diff --git a/client/internal/cmdline/scanner.go b/client/internal/cmdline/scanner.go index f088df6..cbbfcbf 100644 --- a/client/internal/cmdline/scanner.go +++ b/client/internal/cmdline/scanner.go @@ -25,10 +25,14 @@ func ScannerPostEvent(ctx CommandContext, args []string) error { } func init() { + parseScannerEventCmdTrie.MustAdd(scevt.NewAgentCacheGC, myreflect.TypeNameOf[scevt.AgentCacheGC]()) + parseScannerEventCmdTrie.MustAdd(scevt.NewAgentCheckCache, myreflect.TypeNameOf[scevt.AgentCheckCache]()) parseScannerEventCmdTrie.MustAdd(scevt.NewAgentCheckState, myreflect.TypeNameOf[scevt.AgentCheckState]()) + parseScannerEventCmdTrie.MustAdd(scevt.NewAgentStorageGC, myreflect.TypeNameOf[scevt.AgentStorageGC]()) + parseScannerEventCmdTrie.MustAdd(scevt.NewAgentCheckStorage, myreflect.TypeNameOf[scevt.AgentCheckStorage]()) parseScannerEventCmdTrie.MustAdd(scevt.NewCheckPackage, myreflect.TypeNameOf[scevt.CheckPackage]()) diff --git a/common/models/models.go b/common/models/models.go index e6d7988..6430e7b 100644 --- a/common/models/models.go +++ b/common/models/models.go @@ -1,6 +1,7 @@ package stgmod import ( + "github.com/samber/lo" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" ) @@ -11,36 +12,41 @@ type ObjectBlock struct { FileHash string `db:"FileHash" json:"fileHash"` } -type ObjectBlockDetail struct { - ObjectID cdssdk.ObjectID `json:"objectID"` - Index int `json:"index"` - FileHash string `json:"fileHash"` - NodeIDs []cdssdk.NodeID `json:"nodeID"` // 这个块应该在哪些节点上 - CachedNodeIDs []cdssdk.NodeID `json:"cachedNodeIDs"` // 哪些节点实际缓存了这个块 +type ObjectDetail struct { + Object cdssdk.Object `json:"object"` + Blocks []ObjectBlock `json:"blocks"` } -func NewObjectBlockDetail(objID cdssdk.ObjectID, index int, fileHash string, nodeIDs []cdssdk.NodeID, cachedNodeIDs []cdssdk.NodeID) ObjectBlockDetail { - return ObjectBlockDetail{ - ObjectID: objID, - Index: index, - FileHash: fileHash, - NodeIDs: nodeIDs, - CachedNodeIDs: cachedNodeIDs, +func NewObjectDetail(object cdssdk.Object, blocks []ObjectBlock) ObjectDetail { + return ObjectDetail{ + Object: object, + Blocks: blocks, } } -type ObjectDetail struct { - Object cdssdk.Object `json:"object"` - CachedNodeIDs []cdssdk.NodeID `json:"cachedNodeIDs"` // 文件的完整数据在哪些节点上缓存 - Blocks []ObjectBlockDetail `json:"blocks"` +type GrouppedObjectBlock struct { + ObjectID cdssdk.ObjectID + Index int + FileHash string + NodeIDs []cdssdk.NodeID } -func NewObjectDetail(object cdssdk.Object, cachedNodeIDs []cdssdk.NodeID, blocks []ObjectBlockDetail) ObjectDetail { - return ObjectDetail{ - Object: object, - CachedNodeIDs: cachedNodeIDs, - Blocks: blocks, +func (o *ObjectDetail) GroupBlocks() []GrouppedObjectBlock { + grps := make(map[int]GrouppedObjectBlock) + for _, block := range o.Blocks { + grp, ok := grps[block.Index] + if !ok { + grp = GrouppedObjectBlock{ + ObjectID: block.ObjectID, + Index: block.Index, + FileHash: block.FileHash, + } + } + grp.NodeIDs = append(grp.NodeIDs, block.NodeID) + grps[block.Index] = grp } + + return lo.Values(grps) } type LocalMachineInfo struct { diff --git a/common/pkgs/db/cache.go b/common/pkgs/db/cache.go index 0128143..d543592 100644 --- a/common/pkgs/db/cache.go +++ b/common/pkgs/db/cache.go @@ -34,9 +34,9 @@ func (*CacheDB) GetByNodeID(ctx SQLContext, nodeID cdssdk.NodeID) ([]model.Cache return ret, err } -// Create 创建一条新的缓存记录 +// Create 创建一条的缓存记录,如果已有则不进行操作 func (*CacheDB) Create(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID, priority int) error { - _, err := ctx.Exec("insert into Cache values(?,?,?,?)", fileHash, nodeID, time.Now(), priority) + _, err := ctx.Exec("insert ignore into Cache values(?,?,?,?)", fileHash, nodeID, time.Now(), priority) if err != nil { return err } @@ -65,7 +65,11 @@ func (*CacheDB) BatchCreate(ctx SQLContext, fileHashes []string, nodeID cdssdk.N func (*CacheDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, fileHashes []string) error { // TODO in语句有长度限制 - _, err := ctx.Exec("delete from Cache where NodeID = ? and FileHash in (?)", nodeID, fileHashes) + query, args, err := sqlx.In("delete from Cache where NodeID = ? and FileHash in (?)", nodeID, fileHashes) + if err != nil { + return err + } + _, err = ctx.Exec(query, args...) return err } diff --git a/common/pkgs/db/object.go b/common/pkgs/db/object.go index aae7553..9816f78 100644 --- a/common/pkgs/db/object.go +++ b/common/pkgs/db/object.go @@ -169,7 +169,12 @@ func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeOb } func (*ObjectDB) BatchDelete(ctx SQLContext, ids []cdssdk.ObjectID) error { - _, err := ctx.Exec("delete from Object where ObjectID in (?)", ids) + query, args, err := sqlx.In("delete from Object where ObjectID in (?)", ids) + if err != nil { + return err + } + + _, err = ctx.Exec(query, args...) return err } diff --git a/common/pkgs/db/object_block.go b/common/pkgs/db/object_block.go index 2ec166c..93c706f 100644 --- a/common/pkgs/db/object_block.go +++ b/common/pkgs/db/object_block.go @@ -22,7 +22,7 @@ func (db *DB) ObjectBlock() *ObjectBlockDB { func (db *ObjectBlockDB) GetByNodeID(ctx SQLContext, nodeID cdssdk.NodeID) ([]stgmod.ObjectBlock, error) { var rets []stgmod.ObjectBlock - _, err := ctx.Exec("select * from ObjectBlock where NodeID = ?", nodeID) + err := sqlx.Select(ctx, &rets, "select * from ObjectBlock where NodeID = ?", nodeID) return rets, err } @@ -42,7 +42,12 @@ func (db *ObjectBlockDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.Packag } func (db *ObjectBlockDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, fileHashes []string) error { - _, err := ctx.Exec("delete from ObjectBlock where NodeID = ? and FileHash in (?)", nodeID, fileHashes) + query, args, err := sqlx.In("delete from ObjectBlock where NodeID = ? and FileHash in (?)", nodeID, fileHashes) + if err != nil { + return err + } + + _, err = ctx.Exec(query, args...) return err } @@ -70,49 +75,17 @@ func (db *ObjectBlockDB) GetPackageBlockDetails(ctx SQLContext, packageID cdssdk rets := make([]stgmod.ObjectDetail, 0, len(objs)) for _, obj := range objs { - var cachedObjectNodeIDs []cdssdk.NodeID - err := sqlx.Select(ctx, &cachedObjectNodeIDs, - "select NodeID from Object, Cache where"+ - " ObjectID = ? and Object.FileHash = Cache.FileHash", - obj.ObjectID, - ) - if err != nil { - return nil, err - } - - var blockTmpRets []struct { - Index int `db:"Index"` - FileHashes string `db:"FileHashes"` - NodeIDs string `db:"NodeIDs"` - CachedNodeIDs *string `db:"CachedNodeIDs"` - } - + var blocks []stgmod.ObjectBlock err = sqlx.Select(ctx, - &blockTmpRets, - "select ObjectBlock.Index, group_concat(distinct ObjectBlock.FileHash) as FileHashes, group_concat(distinct ObjectBlock.NodeID) as NodeIDs, group_concat(distinct Cache.NodeID) as CachedNodeIDs"+ - " from ObjectBlock left join Cache on ObjectBlock.FileHash = Cache.FileHash"+ - " where ObjectID = ? group by ObjectBlock.Index", + &blocks, + "select * from ObjectBlock where ObjectID = ? order by Index", obj.ObjectID, ) if err != nil { return nil, err } - blocks := make([]stgmod.ObjectBlockDetail, 0, len(blockTmpRets)) - for _, tmp := range blockTmpRets { - var block stgmod.ObjectBlockDetail - block.Index = tmp.Index - - block.FileHash = splitConcatedFileHash(tmp.FileHashes)[0] - block.NodeIDs = splitConcatedNodeID(tmp.NodeIDs) - if tmp.CachedNodeIDs != nil { - block.CachedNodeIDs = splitConcatedNodeID(*tmp.CachedNodeIDs) - } - - blocks = append(blocks, block) - } - - rets = append(rets, stgmod.NewObjectDetail(obj.ToObject(), cachedObjectNodeIDs, blocks)) + rets = append(rets, stgmod.NewObjectDetail(obj.ToObject(), blocks)) } return rets, nil diff --git a/common/pkgs/db/pinned_object.go b/common/pkgs/db/pinned_object.go index 2b08e77..1e71872 100644 --- a/common/pkgs/db/pinned_object.go +++ b/common/pkgs/db/pinned_object.go @@ -4,7 +4,9 @@ import ( "time" "github.com/jmoiron/sqlx" + "github.com/samber/lo" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) type PinnedObjectDB struct { @@ -22,9 +24,9 @@ func (*PinnedObjectDB) GetByNodeID(ctx SQLContext, nodeID cdssdk.NodeID) ([]cdss } func (*PinnedObjectDB) GetObjectsByNodeID(ctx SQLContext, nodeID cdssdk.NodeID) ([]cdssdk.Object, error) { - var ret []cdssdk.Object + var ret []model.TempObject err := sqlx.Select(ctx, &ret, "select Object.* from PinnedObject, Object where PinnedObject.ObjectID = Object.ObjectID and NodeID = ?", nodeID) - return ret, err + return lo.Map(ret, func(o model.TempObject, idx int) cdssdk.Object { return o.ToObject() }), err } func (*PinnedObjectDB) Create(ctx SQLContext, nodeID cdssdk.NodeID, objectID cdssdk.ObjectID, createTime time.Time) error { @@ -34,7 +36,7 @@ func (*PinnedObjectDB) Create(ctx SQLContext, nodeID cdssdk.NodeID, objectID cds func (*PinnedObjectDB) CreateFromPackage(ctx SQLContext, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error { _, err := ctx.Exec( - "insert ignore into PinnedObject(NodeID, ObjectID, CreateTime) select ?, ObjectID, ? from Object where PackageID = ?", + "insert ignore into PinnedObject(NodeID, ObjectID, CreateTime) select ? as NodeID, ObjectID, ? as CreateTime from Object where PackageID = ?", nodeID, time.Now(), packageID, @@ -43,12 +45,12 @@ func (*PinnedObjectDB) CreateFromPackage(ctx SQLContext, packageID cdssdk.Packag } func (*PinnedObjectDB) Delete(ctx SQLContext, nodeID cdssdk.NodeID, objectID cdssdk.ObjectID) error { - _, err := ctx.Exec("delete from PinnedObject where NodeID = ? and ObjectID = ?") + _, err := ctx.Exec("delete from PinnedObject where NodeID = ? and ObjectID = ?", nodeID, objectID) return err } func (*PinnedObjectDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID) error { - _, err := ctx.Exec("delete from PinnedObject where and ObjectID = ?") + _, err := ctx.Exec("delete from PinnedObject where ObjectID = ?", objectID) return err } @@ -58,6 +60,10 @@ func (*PinnedObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageI } func (*PinnedObjectDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, objectIDs []cdssdk.ObjectID) error { - _, err := ctx.Exec("delete from PinnedObject where NodeID = ? and ObjectID in (?)", objectIDs) + query, args, err := sqlx.In("delete from PinnedObject where NodeID = ? and ObjectID in (?)", objectIDs) + if err != nil { + return err + } + _, err = ctx.Exec(query, args...) return err } diff --git a/common/pkgs/iterator/download_object_iterator.go b/common/pkgs/iterator/download_object_iterator.go index 7271a3e..5cb1ef4 100644 --- a/common/pkgs/iterator/download_object_iterator.go +++ b/common/pkgs/iterator/download_object_iterator.go @@ -127,14 +127,16 @@ func (i *DownloadObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) } func (iter *DownloadObjectIterator) downloadNoneOrRepObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail) (io.ReadCloser, error) { + if len(obj.Blocks) == 0 { + return nil, fmt.Errorf("no node has this object") + } + //采取直接读,优先选内网节点 var chosenNodes []DownloadNodeInfo - for i := range obj.Blocks { - if len(obj.Blocks[i].CachedNodeIDs) == 0 { - return nil, fmt.Errorf("no node has block %d", obj.Blocks[i].Index) - } - getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(obj.Blocks[i].CachedNodeIDs)) + grpBlocks := obj.GroupBlocks() + for _, grp := range grpBlocks { + getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(grp.NodeIDs)) if err != nil { continue } @@ -151,8 +153,8 @@ func (iter *DownloadObjectIterator) downloadNoneOrRepObject(coorCli *coormq.Clie var fileStrs []io.ReadCloser - for i := range obj.Blocks { - str, err := downloadFile(ctx, chosenNodes[i], obj.Blocks[i].FileHash) + for i := range grpBlocks { + str, err := downloadFile(ctx, chosenNodes[i], grpBlocks[i].FileHash) if err != nil { for i -= 1; i >= 0; i-- { fileStrs[i].Close() @@ -172,19 +174,14 @@ func (iter *DownloadObjectIterator) downloadNoneOrRepObject(coorCli *coormq.Clie func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) { //采取直接读,优先选内网节点 var chosenNodes []DownloadNodeInfo - var chosenBlocks []stgmodels.ObjectBlockDetail - for i := range obj.Blocks { + var chosenBlocks []stgmodels.GrouppedObjectBlock + grpBlocks := obj.GroupBlocks() + for i := range grpBlocks { if len(chosenBlocks) == ecRed.K { break } - // 块没有被任何节点缓存或者获取失败都没关系,只要能获取到k个块的信息就行 - - if len(obj.Blocks[i].CachedNodeIDs) == 0 { - continue - } - - getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(obj.Blocks[i].CachedNodeIDs)) + getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(grpBlocks[i].NodeIDs)) if err != nil { continue } @@ -196,7 +193,7 @@ func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx } }) - chosenBlocks = append(chosenBlocks, obj.Blocks[i]) + chosenBlocks = append(chosenBlocks, grpBlocks[i]) chosenNodes = append(chosenNodes, iter.chooseDownloadNode(downloadNodes)) } diff --git a/coordinator/internal/services/bucket.go b/coordinator/internal/services/bucket.go index 4ffdab7..564b105 100644 --- a/coordinator/internal/services/bucket.go +++ b/coordinator/internal/services/bucket.go @@ -45,7 +45,7 @@ func (svc *Service) GetBucketPackages(msg *coormq.GetBucketPackages) (*coormq.Ge func (svc *Service) CreateBucket(msg *coormq.CreateBucket) (*coormq.CreateBucketResp, *mq.CodeMessage) { var bucketID cdssdk.BucketID - err := svc.db.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { _, err := svc.db.User().GetByID(tx, msg.UserID) if err != nil { return fmt.Errorf("getting user by id: %w", err) @@ -69,7 +69,7 @@ func (svc *Service) CreateBucket(msg *coormq.CreateBucket) (*coormq.CreateBucket } func (svc *Service) DeleteBucket(msg *coormq.DeleteBucket) (*coormq.DeleteBucketResp, *mq.CodeMessage) { - err := svc.db.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { isAvai, _ := svc.db.Bucket().IsAvailable(tx, msg.BucketID, msg.UserID) if !isAvai { return fmt.Errorf("bucket is not avaiable to the user") diff --git a/coordinator/internal/services/cache.go b/coordinator/internal/services/cache.go index 1cc5a0b..9df882c 100644 --- a/coordinator/internal/services/cache.go +++ b/coordinator/internal/services/cache.go @@ -12,7 +12,7 @@ import ( ) func (svc *Service) CachePackageMoved(msg *coormq.CachePackageMoved) (*coormq.CachePackageMovedResp, *mq.CodeMessage) { - err := svc.db.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { _, err := svc.db.Package().GetByID(tx, msg.PackageID) if err != nil { return fmt.Errorf("getting package by id: %w", err) diff --git a/coordinator/internal/services/object.go b/coordinator/internal/services/object.go index 43cf601..d901c88 100644 --- a/coordinator/internal/services/object.go +++ b/coordinator/internal/services/object.go @@ -28,7 +28,7 @@ func (svc *Service) GetPackageObjects(msg *coormq.GetPackageObjects) (*coormq.Ge func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails) (*coormq.GetPackageObjectDetailsResp, *mq.CodeMessage) { var details []stgmod.ObjectDetail // 必须放在事务里进行,因为GetPackageBlockDetails是由多次数据库操作组成,必须保证数据的一致性 - err := svc.db.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { var err error _, err = svc.db.Package().GetByID(tx, msg.PackageID) if err != nil { @@ -52,7 +52,7 @@ func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails) } func (svc *Service) ChangeObjectRedundancy(msg *coormq.ChangeObjectRedundancy) (*coormq.ChangeObjectRedundancyResp, *mq.CodeMessage) { - err := svc.db.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { return svc.db.Object().BatchUpdateRedundancy(tx, msg.Entries) }) if err != nil { diff --git a/coordinator/internal/services/package.go b/coordinator/internal/services/package.go index 30e110c..b4822e9 100644 --- a/coordinator/internal/services/package.go +++ b/coordinator/internal/services/package.go @@ -28,7 +28,7 @@ func (svc *Service) GetPackage(msg *coormq.GetPackage) (*coormq.GetPackageResp, func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePackageResp, *mq.CodeMessage) { var pkgID cdssdk.PackageID - err := svc.db.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { var err error isAvai, _ := svc.db.Bucket().IsAvailable(tx, msg.BucketID, msg.UserID) @@ -54,7 +54,7 @@ func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePack } func (svc *Service) UpdatePackage(msg *coormq.UpdatePackage) (*coormq.UpdatePackageResp, *mq.CodeMessage) { - err := svc.db.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { _, err := svc.db.Package().GetByID(tx, msg.PackageID) if err != nil { return fmt.Errorf("getting package by id: %w", err) @@ -85,7 +85,7 @@ func (svc *Service) UpdatePackage(msg *coormq.UpdatePackage) (*coormq.UpdatePack } func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePackageResp, *mq.CodeMessage) { - err := svc.db.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { isAvai, _ := svc.db.Package().IsAvailable(tx, msg.UserID, msg.PackageID) if !isAvai { return fmt.Errorf("package is not available to the user") @@ -144,19 +144,17 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c for _, obj := range objDetails { // 只要存了文件的一个块,就认为此节点存了整个文件 for _, block := range obj.Blocks { - for _, nodeID := range block.CachedNodeIDs { - info, ok := nodeInfoMap[nodeID] - if !ok { - info = &cdssdk.NodePackageCachingInfo{ - NodeID: nodeID, - } - nodeInfoMap[nodeID] = info - + info, ok := nodeInfoMap[block.NodeID] + if !ok { + info = &cdssdk.NodePackageCachingInfo{ + NodeID: block.NodeID, } + nodeInfoMap[block.NodeID] = info - info.FileSize += obj.Object.Size - info.ObjectCount++ } + + info.FileSize += obj.Object.Size + info.ObjectCount++ } } diff --git a/coordinator/internal/services/storage.go b/coordinator/internal/services/storage.go index 66b38f0..abdaa6a 100644 --- a/coordinator/internal/services/storage.go +++ b/coordinator/internal/services/storage.go @@ -24,7 +24,7 @@ func (svc *Service) GetStorageInfo(msg *coormq.GetStorageInfo) (*coormq.GetStora } func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coormq.StoragePackageLoadedResp, *mq.CodeMessage) { - err := svc.db.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { err := svc.db.StoragePackage().Create(tx, msg.StorageID, msg.PackageID, msg.UserID) if err != nil { return fmt.Errorf("creating storage package: %w", err) diff --git a/scanner/internal/event/agent_cache_gc.go b/scanner/internal/event/agent_cache_gc.go index 0204579..22e5521 100644 --- a/scanner/internal/event/agent_cache_gc.go +++ b/scanner/internal/event/agent_cache_gc.go @@ -56,7 +56,7 @@ func (t *AgentCacheGC) Execute(execCtx ExecuteContext) { defer mutex.Unlock() var allFileHashes []string - err = execCtx.Args.DB.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { + err = execCtx.Args.DB.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { blocks, err := execCtx.Args.DB.ObjectBlock().GetByNodeID(tx, t.NodeID) if err != nil { return fmt.Errorf("getting object blocks by node id: %w", err) diff --git a/scanner/internal/event/agent_check_cache.go b/scanner/internal/event/agent_check_cache.go index 3c64541..6f9084e 100644 --- a/scanner/internal/event/agent_check_cache.go +++ b/scanner/internal/event/agent_check_cache.go @@ -61,7 +61,7 @@ func (t *AgentCheckCache) Execute(execCtx ExecuteContext) { realFileHashes := lo.SliceToMap(checkResp.FileHashes, func(hash string) (string, bool) { return hash, true }) // 根据IPFS中实际文件情况修改元数据。修改过程中的失败均忽略。(但关联修改需要原子性) - execCtx.Args.DB.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { + execCtx.Args.DB.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { t.checkCache(execCtx, tx, realFileHashes) t.checkPinnedObject(execCtx, tx, realFileHashes) @@ -97,16 +97,19 @@ func (t *AgentCheckCache) checkCache(execCtx ExecuteContext, tx *sqlx.Tx, realFi rms = append(rms, c.FileHash) } - err = execCtx.Args.DB.Cache().NodeBatchDelete(tx, t.NodeID, rms) - if err != nil { - log.Warnf("batch delete node caches: %w", err.Error()) - return + if len(rms) > 0 { + err = execCtx.Args.DB.Cache().NodeBatchDelete(tx, t.NodeID, rms) + if err != nil { + log.Warnf("batch delete node caches: %w", err.Error()) + } } - err = execCtx.Args.DB.Cache().BatchCreate(tx, lo.Keys(realFileHashes), t.NodeID, 0) - if err != nil { - log.Warnf("batch create node caches: %w", err) - return + if len(realFileHashesCp) > 0 { + err = execCtx.Args.DB.Cache().BatchCreate(tx, lo.Keys(realFileHashesCp), t.NodeID, 0) + if err != nil { + log.Warnf("batch create node caches: %w", err) + return + } } } @@ -128,10 +131,11 @@ func (t *AgentCheckCache) checkPinnedObject(execCtx ExecuteContext, tx *sqlx.Tx, rms = append(rms, c.ObjectID) } - err = execCtx.Args.DB.PinnedObject().NodeBatchDelete(tx, t.NodeID, rms) - if err != nil { - log.Warnf("batch delete node pinned objects: %s", err.Error()) - return + if len(rms) > 0 { + err = execCtx.Args.DB.PinnedObject().NodeBatchDelete(tx, t.NodeID, rms) + if err != nil { + log.Warnf("batch delete node pinned objects: %s", err.Error()) + } } } @@ -139,29 +143,25 @@ func (t *AgentCheckCache) checkPinnedObject(execCtx ExecuteContext, tx *sqlx.Tx, func (t *AgentCheckCache) checkObjectBlock(execCtx ExecuteContext, tx *sqlx.Tx, realFileHashes map[string]bool) { log := logger.WithType[AgentCheckCache]("Event") - objs, err := execCtx.Args.DB.ObjectBlock().GetByNodeID(tx, t.NodeID) + blocks, err := execCtx.Args.DB.ObjectBlock().GetByNodeID(tx, t.NodeID) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("getting object blocks by node id: %s", err.Error()) return } - realFileHashesCp := make(map[string]bool) - for k, v := range realFileHashes { - realFileHashesCp[k] = v - } - var rms []string - for _, c := range objs { - if realFileHashesCp[c.FileHash] { + for _, b := range blocks { + if realFileHashes[b.FileHash] { continue } - rms = append(rms, c.FileHash) + rms = append(rms, b.FileHash) } - err = execCtx.Args.DB.ObjectBlock().NodeBatchDelete(tx, t.NodeID, rms) - if err != nil { - log.Warnf("batch delete node object blocks: %w", err) - return + if len(rms) > 0 { + err = execCtx.Args.DB.ObjectBlock().NodeBatchDelete(tx, t.NodeID, rms) + if err != nil { + log.Warnf("batch delete node object blocks: %s", err.Error()) + } } } diff --git a/scanner/internal/event/agent_check_storage.go b/scanner/internal/event/agent_check_storage.go index 66e786c..9edaacf 100644 --- a/scanner/internal/event/agent_check_storage.go +++ b/scanner/internal/event/agent_check_storage.go @@ -88,7 +88,7 @@ func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) { pkgs[pkg.PackageID] = true } - execCtx.Args.DB.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { + execCtx.Args.DB.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { packages, err := execCtx.Args.DB.StoragePackage().GetAllByStorageID(tx, t.StorageID) if err != nil { log.Warnf("getting storage package: %s", err.Error()) diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go index 00e331b..9fa883d 100644 --- a/scanner/internal/event/check_package_redundancy.go +++ b/scanner/internal/event/check_package_redundancy.go @@ -2,6 +2,7 @@ package event import ( "fmt" + "strconv" "time" "github.com/samber/lo" @@ -219,7 +220,7 @@ func (t *CheckPackageRedundancy) rechooseNodesForRep(obj stgmod.ObjectDetail, re for _, node := range allNodes { cachedBlockIndex := -1 for _, block := range obj.Blocks { - if lo.Contains(block.CachedNodeIDs, node.Node.NodeID) { + if block.NodeID == node.Node.NodeID { cachedBlockIndex = block.Index break } @@ -259,7 +260,7 @@ func (t *CheckPackageRedundancy) rechooseNodesForEC(obj stgmod.ObjectDetail, red for _, node := range allNodes { cachedBlockIndex := -1 for _, block := range obj.Blocks { - if lo.Contains(block.CachedNodeIDs, node.Node.NodeID) { + if block.NodeID == node.Node.NodeID { cachedBlockIndex = block.Index break } @@ -327,7 +328,7 @@ func (t *CheckPackageRedundancy) chooseSoManyNodes(count int, nodes []*NodeLoadI } func (t *CheckPackageRedundancy) noneToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) { - if len(obj.CachedNodeIDs) == 0 { + if len(obj.Blocks) == 0 { return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to rep") } @@ -362,11 +363,11 @@ func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.E } defer stgglb.CoordinatorMQPool.Release(coorCli) - if len(obj.CachedNodeIDs) == 0 { + if len(obj.Blocks) == 0 { return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to ec") } - getNodes, err := coorCli.GetNodes(coormq.NewGetNodes([]cdssdk.NodeID{obj.CachedNodeIDs[0]})) + getNodes, err := coorCli.GetNodes(coormq.NewGetNodes([]cdssdk.NodeID{obj.Blocks[0].NodeID})) if err != nil { return nil, fmt.Errorf("requesting to get nodes: %w", err) } @@ -410,7 +411,7 @@ func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.E } func (t *CheckPackageRedundancy) repToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) { - if len(obj.CachedNodeIDs) == 0 { + if len(obj.Blocks) == 0 { return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to rep") } @@ -456,10 +457,10 @@ func (t *CheckPackageRedundancy) ecToRep(obj stgmod.ObjectDetail, srcRed *cdssdk } defer stgglb.CoordinatorMQPool.Release(coorCli) - var chosenBlocks []stgmod.ObjectBlockDetail + var chosenBlocks []stgmod.GrouppedObjectBlock var chosenBlockIndexes []int - for _, block := range obj.Blocks { - if len(block.CachedNodeIDs) > 0 { + for _, block := range obj.GroupBlocks() { + if len(block.NodeIDs) > 0 { chosenBlocks = append(chosenBlocks, block) chosenBlockIndexes = append(chosenBlockIndexes, block.Index) } @@ -529,10 +530,12 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk. } defer stgglb.CoordinatorMQPool.Release(coorCli) - var chosenBlocks []stgmod.ObjectBlockDetail + grpBlocks := obj.GroupBlocks() + + var chosenBlocks []stgmod.GrouppedObjectBlock var chosenBlockIndexes []int - for _, block := range obj.Blocks { - if len(block.CachedNodeIDs) > 0 { + for _, block := range grpBlocks { + if len(block.NodeIDs) > 0 { chosenBlocks = append(chosenBlocks, block) chosenBlockIndexes = append(chosenBlockIndexes, block.Index) } @@ -551,28 +554,26 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk. var newBlocks []stgmod.ObjectBlock shouldUpdateBlocks := false - for i := range obj.Blocks { - newBlocks = append(newBlocks, stgmod.ObjectBlock{ + for i, node := range uploadNodes { + newBlock := stgmod.ObjectBlock{ ObjectID: obj.Object.ObjectID, Index: i, - NodeID: uploadNodes[i].Node.NodeID, - FileHash: obj.Blocks[i].FileHash, - }) + NodeID: node.Node.NodeID, + } + + grp, ok := lo.Find(grpBlocks, func(grp stgmod.GrouppedObjectBlock) bool { return grp.Index == i }) // 如果新选中的节点已经记录在Block表中,那么就不需要任何变更 - if lo.Contains(obj.Blocks[i].NodeIDs, uploadNodes[i].Node.NodeID) { + if ok && lo.Contains(grp.NodeIDs, node.Node.NodeID) { + newBlock.FileHash = grp.FileHash + newBlocks = append(newBlocks, newBlock) continue } shouldUpdateBlocks = true - // 新选的节点不在Block表中,但实际上保存了分块的数据,那么只需建立一条Block记录即可 - if lo.Contains(obj.Blocks[i].CachedNodeIDs, uploadNodes[i].Node.NodeID) { - continue - } - // 否则就要重建出这个节点需要的块 - tarNode := planBlder.AtAgent(uploadNodes[i].Node) + tarNode := planBlder.AtAgent(node.Node) var inputs []*plans.AgentStream for _, block := range chosenBlocks { @@ -580,7 +581,8 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk. } // 输出只需要自己要保存的那一块 - tarNode.ECReconstructAny(*srcRed, chosenBlockIndexes, []int{i}, inputs...).Stream(0).IPFSWrite("") + tarNode.ECReconstructAny(*srcRed, chosenBlockIndexes, []int{i}, inputs...).Stream(0).IPFSWrite(fmt.Sprintf("%d", i)) + newBlocks = append(newBlocks, newBlock) } plan, err := planBlder.Build() @@ -594,7 +596,7 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk. } // 如果没有任何Plan,Wait会直接返回成功 - _, err = exec.Wait() + ret, err := exec.Wait() if err != nil { return nil, fmt.Errorf("executing io plan: %w", err) } @@ -603,6 +605,15 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk. return nil, nil } + for k, v := range ret.ResultValues { + idx, err := strconv.ParseInt(k, 10, 64) + if err != nil { + return nil, fmt.Errorf("parsing result key %s as index: %w", k, err) + } + + newBlocks[idx].FileHash = v.(string) + } + return &coormq.ChangeObjectRedundancyEntry{ ObjectID: obj.Object.ObjectID, Redundancy: tarRed,