| @@ -25,10 +25,14 @@ func ScannerPostEvent(ctx CommandContext, args []string) error { | |||
| } | |||
| func init() { | |||
| parseScannerEventCmdTrie.MustAdd(scevt.NewAgentCacheGC, myreflect.TypeNameOf[scevt.AgentCacheGC]()) | |||
| parseScannerEventCmdTrie.MustAdd(scevt.NewAgentCheckCache, myreflect.TypeNameOf[scevt.AgentCheckCache]()) | |||
| parseScannerEventCmdTrie.MustAdd(scevt.NewAgentCheckState, myreflect.TypeNameOf[scevt.AgentCheckState]()) | |||
| parseScannerEventCmdTrie.MustAdd(scevt.NewAgentStorageGC, myreflect.TypeNameOf[scevt.AgentStorageGC]()) | |||
| parseScannerEventCmdTrie.MustAdd(scevt.NewAgentCheckStorage, myreflect.TypeNameOf[scevt.AgentCheckStorage]()) | |||
| parseScannerEventCmdTrie.MustAdd(scevt.NewCheckPackage, myreflect.TypeNameOf[scevt.CheckPackage]()) | |||
| @@ -1,6 +1,7 @@ | |||
| package stgmod | |||
| import ( | |||
| "github.com/samber/lo" | |||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||
| ) | |||
| @@ -11,36 +12,41 @@ type ObjectBlock struct { | |||
| FileHash string `db:"FileHash" json:"fileHash"` | |||
| } | |||
| type ObjectBlockDetail struct { | |||
| ObjectID cdssdk.ObjectID `json:"objectID"` | |||
| Index int `json:"index"` | |||
| FileHash string `json:"fileHash"` | |||
| NodeIDs []cdssdk.NodeID `json:"nodeID"` // 这个块应该在哪些节点上 | |||
| CachedNodeIDs []cdssdk.NodeID `json:"cachedNodeIDs"` // 哪些节点实际缓存了这个块 | |||
| type ObjectDetail struct { | |||
| Object cdssdk.Object `json:"object"` | |||
| Blocks []ObjectBlock `json:"blocks"` | |||
| } | |||
| func NewObjectBlockDetail(objID cdssdk.ObjectID, index int, fileHash string, nodeIDs []cdssdk.NodeID, cachedNodeIDs []cdssdk.NodeID) ObjectBlockDetail { | |||
| return ObjectBlockDetail{ | |||
| ObjectID: objID, | |||
| Index: index, | |||
| FileHash: fileHash, | |||
| NodeIDs: nodeIDs, | |||
| CachedNodeIDs: cachedNodeIDs, | |||
| func NewObjectDetail(object cdssdk.Object, blocks []ObjectBlock) ObjectDetail { | |||
| return ObjectDetail{ | |||
| Object: object, | |||
| Blocks: blocks, | |||
| } | |||
| } | |||
| type ObjectDetail struct { | |||
| Object cdssdk.Object `json:"object"` | |||
| CachedNodeIDs []cdssdk.NodeID `json:"cachedNodeIDs"` // 文件的完整数据在哪些节点上缓存 | |||
| Blocks []ObjectBlockDetail `json:"blocks"` | |||
| type GrouppedObjectBlock struct { | |||
| ObjectID cdssdk.ObjectID | |||
| Index int | |||
| FileHash string | |||
| NodeIDs []cdssdk.NodeID | |||
| } | |||
| func NewObjectDetail(object cdssdk.Object, cachedNodeIDs []cdssdk.NodeID, blocks []ObjectBlockDetail) ObjectDetail { | |||
| return ObjectDetail{ | |||
| Object: object, | |||
| CachedNodeIDs: cachedNodeIDs, | |||
| Blocks: blocks, | |||
| func (o *ObjectDetail) GroupBlocks() []GrouppedObjectBlock { | |||
| grps := make(map[int]GrouppedObjectBlock) | |||
| for _, block := range o.Blocks { | |||
| grp, ok := grps[block.Index] | |||
| if !ok { | |||
| grp = GrouppedObjectBlock{ | |||
| ObjectID: block.ObjectID, | |||
| Index: block.Index, | |||
| FileHash: block.FileHash, | |||
| } | |||
| } | |||
| grp.NodeIDs = append(grp.NodeIDs, block.NodeID) | |||
| grps[block.Index] = grp | |||
| } | |||
| return lo.Values(grps) | |||
| } | |||
| type LocalMachineInfo struct { | |||
| @@ -34,9 +34,9 @@ func (*CacheDB) GetByNodeID(ctx SQLContext, nodeID cdssdk.NodeID) ([]model.Cache | |||
| return ret, err | |||
| } | |||
| // Create 创建一条新的缓存记录 | |||
| // Create 创建一条的缓存记录,如果已有则不进行操作 | |||
| func (*CacheDB) Create(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID, priority int) error { | |||
| _, err := ctx.Exec("insert into Cache values(?,?,?,?)", fileHash, nodeID, time.Now(), priority) | |||
| _, err := ctx.Exec("insert ignore into Cache values(?,?,?,?)", fileHash, nodeID, time.Now(), priority) | |||
| if err != nil { | |||
| return err | |||
| } | |||
| @@ -65,7 +65,11 @@ func (*CacheDB) BatchCreate(ctx SQLContext, fileHashes []string, nodeID cdssdk.N | |||
| func (*CacheDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, fileHashes []string) error { | |||
| // TODO in语句有长度限制 | |||
| _, err := ctx.Exec("delete from Cache where NodeID = ? and FileHash in (?)", nodeID, fileHashes) | |||
| query, args, err := sqlx.In("delete from Cache where NodeID = ? and FileHash in (?)", nodeID, fileHashes) | |||
| if err != nil { | |||
| return err | |||
| } | |||
| _, err = ctx.Exec(query, args...) | |||
| return err | |||
| } | |||
| @@ -169,7 +169,12 @@ func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeOb | |||
| } | |||
| func (*ObjectDB) BatchDelete(ctx SQLContext, ids []cdssdk.ObjectID) error { | |||
| _, err := ctx.Exec("delete from Object where ObjectID in (?)", ids) | |||
| query, args, err := sqlx.In("delete from Object where ObjectID in (?)", ids) | |||
| if err != nil { | |||
| return err | |||
| } | |||
| _, err = ctx.Exec(query, args...) | |||
| return err | |||
| } | |||
| @@ -22,7 +22,7 @@ func (db *DB) ObjectBlock() *ObjectBlockDB { | |||
| func (db *ObjectBlockDB) GetByNodeID(ctx SQLContext, nodeID cdssdk.NodeID) ([]stgmod.ObjectBlock, error) { | |||
| var rets []stgmod.ObjectBlock | |||
| _, err := ctx.Exec("select * from ObjectBlock where NodeID = ?", nodeID) | |||
| err := sqlx.Select(ctx, &rets, "select * from ObjectBlock where NodeID = ?", nodeID) | |||
| return rets, err | |||
| } | |||
| @@ -42,7 +42,12 @@ func (db *ObjectBlockDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.Packag | |||
| } | |||
| func (db *ObjectBlockDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, fileHashes []string) error { | |||
| _, err := ctx.Exec("delete from ObjectBlock where NodeID = ? and FileHash in (?)", nodeID, fileHashes) | |||
| query, args, err := sqlx.In("delete from ObjectBlock where NodeID = ? and FileHash in (?)", nodeID, fileHashes) | |||
| if err != nil { | |||
| return err | |||
| } | |||
| _, err = ctx.Exec(query, args...) | |||
| return err | |||
| } | |||
| @@ -70,49 +75,17 @@ func (db *ObjectBlockDB) GetPackageBlockDetails(ctx SQLContext, packageID cdssdk | |||
| rets := make([]stgmod.ObjectDetail, 0, len(objs)) | |||
| for _, obj := range objs { | |||
| var cachedObjectNodeIDs []cdssdk.NodeID | |||
| err := sqlx.Select(ctx, &cachedObjectNodeIDs, | |||
| "select NodeID from Object, Cache where"+ | |||
| " ObjectID = ? and Object.FileHash = Cache.FileHash", | |||
| obj.ObjectID, | |||
| ) | |||
| if err != nil { | |||
| return nil, err | |||
| } | |||
| var blockTmpRets []struct { | |||
| Index int `db:"Index"` | |||
| FileHashes string `db:"FileHashes"` | |||
| NodeIDs string `db:"NodeIDs"` | |||
| CachedNodeIDs *string `db:"CachedNodeIDs"` | |||
| } | |||
| var blocks []stgmod.ObjectBlock | |||
| err = sqlx.Select(ctx, | |||
| &blockTmpRets, | |||
| "select ObjectBlock.Index, group_concat(distinct ObjectBlock.FileHash) as FileHashes, group_concat(distinct ObjectBlock.NodeID) as NodeIDs, group_concat(distinct Cache.NodeID) as CachedNodeIDs"+ | |||
| " from ObjectBlock left join Cache on ObjectBlock.FileHash = Cache.FileHash"+ | |||
| " where ObjectID = ? group by ObjectBlock.Index", | |||
| &blocks, | |||
| "select * from ObjectBlock where ObjectID = ? order by Index", | |||
| obj.ObjectID, | |||
| ) | |||
| if err != nil { | |||
| return nil, err | |||
| } | |||
| blocks := make([]stgmod.ObjectBlockDetail, 0, len(blockTmpRets)) | |||
| for _, tmp := range blockTmpRets { | |||
| var block stgmod.ObjectBlockDetail | |||
| block.Index = tmp.Index | |||
| block.FileHash = splitConcatedFileHash(tmp.FileHashes)[0] | |||
| block.NodeIDs = splitConcatedNodeID(tmp.NodeIDs) | |||
| if tmp.CachedNodeIDs != nil { | |||
| block.CachedNodeIDs = splitConcatedNodeID(*tmp.CachedNodeIDs) | |||
| } | |||
| blocks = append(blocks, block) | |||
| } | |||
| rets = append(rets, stgmod.NewObjectDetail(obj.ToObject(), cachedObjectNodeIDs, blocks)) | |||
| rets = append(rets, stgmod.NewObjectDetail(obj.ToObject(), blocks)) | |||
| } | |||
| return rets, nil | |||
| @@ -4,7 +4,9 @@ import ( | |||
| "time" | |||
| "github.com/jmoiron/sqlx" | |||
| "github.com/samber/lo" | |||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||
| "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" | |||
| ) | |||
| type PinnedObjectDB struct { | |||
| @@ -22,9 +24,9 @@ func (*PinnedObjectDB) GetByNodeID(ctx SQLContext, nodeID cdssdk.NodeID) ([]cdss | |||
| } | |||
| func (*PinnedObjectDB) GetObjectsByNodeID(ctx SQLContext, nodeID cdssdk.NodeID) ([]cdssdk.Object, error) { | |||
| var ret []cdssdk.Object | |||
| var ret []model.TempObject | |||
| err := sqlx.Select(ctx, &ret, "select Object.* from PinnedObject, Object where PinnedObject.ObjectID = Object.ObjectID and NodeID = ?", nodeID) | |||
| return ret, err | |||
| return lo.Map(ret, func(o model.TempObject, idx int) cdssdk.Object { return o.ToObject() }), err | |||
| } | |||
| func (*PinnedObjectDB) Create(ctx SQLContext, nodeID cdssdk.NodeID, objectID cdssdk.ObjectID, createTime time.Time) error { | |||
| @@ -34,7 +36,7 @@ func (*PinnedObjectDB) Create(ctx SQLContext, nodeID cdssdk.NodeID, objectID cds | |||
| func (*PinnedObjectDB) CreateFromPackage(ctx SQLContext, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error { | |||
| _, err := ctx.Exec( | |||
| "insert ignore into PinnedObject(NodeID, ObjectID, CreateTime) select ?, ObjectID, ? from Object where PackageID = ?", | |||
| "insert ignore into PinnedObject(NodeID, ObjectID, CreateTime) select ? as NodeID, ObjectID, ? as CreateTime from Object where PackageID = ?", | |||
| nodeID, | |||
| time.Now(), | |||
| packageID, | |||
| @@ -43,12 +45,12 @@ func (*PinnedObjectDB) CreateFromPackage(ctx SQLContext, packageID cdssdk.Packag | |||
| } | |||
| func (*PinnedObjectDB) Delete(ctx SQLContext, nodeID cdssdk.NodeID, objectID cdssdk.ObjectID) error { | |||
| _, err := ctx.Exec("delete from PinnedObject where NodeID = ? and ObjectID = ?") | |||
| _, err := ctx.Exec("delete from PinnedObject where NodeID = ? and ObjectID = ?", nodeID, objectID) | |||
| return err | |||
| } | |||
| func (*PinnedObjectDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID) error { | |||
| _, err := ctx.Exec("delete from PinnedObject where and ObjectID = ?") | |||
| _, err := ctx.Exec("delete from PinnedObject where ObjectID = ?", objectID) | |||
| return err | |||
| } | |||
| @@ -58,6 +60,10 @@ func (*PinnedObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageI | |||
| } | |||
| func (*PinnedObjectDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, objectIDs []cdssdk.ObjectID) error { | |||
| _, err := ctx.Exec("delete from PinnedObject where NodeID = ? and ObjectID in (?)", objectIDs) | |||
| query, args, err := sqlx.In("delete from PinnedObject where NodeID = ? and ObjectID in (?)", objectIDs) | |||
| if err != nil { | |||
| return err | |||
| } | |||
| _, err = ctx.Exec(query, args...) | |||
| return err | |||
| } | |||
| @@ -127,14 +127,16 @@ func (i *DownloadObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) | |||
| } | |||
| func (iter *DownloadObjectIterator) downloadNoneOrRepObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail) (io.ReadCloser, error) { | |||
| if len(obj.Blocks) == 0 { | |||
| return nil, fmt.Errorf("no node has this object") | |||
| } | |||
| //采取直接读,优先选内网节点 | |||
| var chosenNodes []DownloadNodeInfo | |||
| for i := range obj.Blocks { | |||
| if len(obj.Blocks[i].CachedNodeIDs) == 0 { | |||
| return nil, fmt.Errorf("no node has block %d", obj.Blocks[i].Index) | |||
| } | |||
| getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(obj.Blocks[i].CachedNodeIDs)) | |||
| grpBlocks := obj.GroupBlocks() | |||
| for _, grp := range grpBlocks { | |||
| getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(grp.NodeIDs)) | |||
| if err != nil { | |||
| continue | |||
| } | |||
| @@ -151,8 +153,8 @@ func (iter *DownloadObjectIterator) downloadNoneOrRepObject(coorCli *coormq.Clie | |||
| var fileStrs []io.ReadCloser | |||
| for i := range obj.Blocks { | |||
| str, err := downloadFile(ctx, chosenNodes[i], obj.Blocks[i].FileHash) | |||
| for i := range grpBlocks { | |||
| str, err := downloadFile(ctx, chosenNodes[i], grpBlocks[i].FileHash) | |||
| if err != nil { | |||
| for i -= 1; i >= 0; i-- { | |||
| fileStrs[i].Close() | |||
| @@ -172,19 +174,14 @@ func (iter *DownloadObjectIterator) downloadNoneOrRepObject(coorCli *coormq.Clie | |||
| func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) { | |||
| //采取直接读,优先选内网节点 | |||
| var chosenNodes []DownloadNodeInfo | |||
| var chosenBlocks []stgmodels.ObjectBlockDetail | |||
| for i := range obj.Blocks { | |||
| var chosenBlocks []stgmodels.GrouppedObjectBlock | |||
| grpBlocks := obj.GroupBlocks() | |||
| for i := range grpBlocks { | |||
| if len(chosenBlocks) == ecRed.K { | |||
| break | |||
| } | |||
| // 块没有被任何节点缓存或者获取失败都没关系,只要能获取到k个块的信息就行 | |||
| if len(obj.Blocks[i].CachedNodeIDs) == 0 { | |||
| continue | |||
| } | |||
| getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(obj.Blocks[i].CachedNodeIDs)) | |||
| getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(grpBlocks[i].NodeIDs)) | |||
| if err != nil { | |||
| continue | |||
| } | |||
| @@ -196,7 +193,7 @@ func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx | |||
| } | |||
| }) | |||
| chosenBlocks = append(chosenBlocks, obj.Blocks[i]) | |||
| chosenBlocks = append(chosenBlocks, grpBlocks[i]) | |||
| chosenNodes = append(chosenNodes, iter.chooseDownloadNode(downloadNodes)) | |||
| } | |||
| @@ -45,7 +45,7 @@ func (svc *Service) GetBucketPackages(msg *coormq.GetBucketPackages) (*coormq.Ge | |||
| func (svc *Service) CreateBucket(msg *coormq.CreateBucket) (*coormq.CreateBucketResp, *mq.CodeMessage) { | |||
| var bucketID cdssdk.BucketID | |||
| err := svc.db.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { | |||
| err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { | |||
| _, err := svc.db.User().GetByID(tx, msg.UserID) | |||
| if err != nil { | |||
| return fmt.Errorf("getting user by id: %w", err) | |||
| @@ -69,7 +69,7 @@ func (svc *Service) CreateBucket(msg *coormq.CreateBucket) (*coormq.CreateBucket | |||
| } | |||
| func (svc *Service) DeleteBucket(msg *coormq.DeleteBucket) (*coormq.DeleteBucketResp, *mq.CodeMessage) { | |||
| err := svc.db.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { | |||
| err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { | |||
| isAvai, _ := svc.db.Bucket().IsAvailable(tx, msg.BucketID, msg.UserID) | |||
| if !isAvai { | |||
| return fmt.Errorf("bucket is not avaiable to the user") | |||
| @@ -12,7 +12,7 @@ import ( | |||
| ) | |||
| func (svc *Service) CachePackageMoved(msg *coormq.CachePackageMoved) (*coormq.CachePackageMovedResp, *mq.CodeMessage) { | |||
| err := svc.db.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { | |||
| err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { | |||
| _, err := svc.db.Package().GetByID(tx, msg.PackageID) | |||
| if err != nil { | |||
| return fmt.Errorf("getting package by id: %w", err) | |||
| @@ -28,7 +28,7 @@ func (svc *Service) GetPackageObjects(msg *coormq.GetPackageObjects) (*coormq.Ge | |||
| func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails) (*coormq.GetPackageObjectDetailsResp, *mq.CodeMessage) { | |||
| var details []stgmod.ObjectDetail | |||
| // 必须放在事务里进行,因为GetPackageBlockDetails是由多次数据库操作组成,必须保证数据的一致性 | |||
| err := svc.db.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { | |||
| err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { | |||
| var err error | |||
| _, err = svc.db.Package().GetByID(tx, msg.PackageID) | |||
| if err != nil { | |||
| @@ -52,7 +52,7 @@ func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails) | |||
| } | |||
| func (svc *Service) ChangeObjectRedundancy(msg *coormq.ChangeObjectRedundancy) (*coormq.ChangeObjectRedundancyResp, *mq.CodeMessage) { | |||
| err := svc.db.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { | |||
| err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { | |||
| return svc.db.Object().BatchUpdateRedundancy(tx, msg.Entries) | |||
| }) | |||
| if err != nil { | |||
| @@ -28,7 +28,7 @@ func (svc *Service) GetPackage(msg *coormq.GetPackage) (*coormq.GetPackageResp, | |||
| func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePackageResp, *mq.CodeMessage) { | |||
| var pkgID cdssdk.PackageID | |||
| err := svc.db.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { | |||
| err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { | |||
| var err error | |||
| isAvai, _ := svc.db.Bucket().IsAvailable(tx, msg.BucketID, msg.UserID) | |||
| @@ -54,7 +54,7 @@ func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePack | |||
| } | |||
| func (svc *Service) UpdatePackage(msg *coormq.UpdatePackage) (*coormq.UpdatePackageResp, *mq.CodeMessage) { | |||
| err := svc.db.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { | |||
| err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { | |||
| _, err := svc.db.Package().GetByID(tx, msg.PackageID) | |||
| if err != nil { | |||
| return fmt.Errorf("getting package by id: %w", err) | |||
| @@ -85,7 +85,7 @@ func (svc *Service) UpdatePackage(msg *coormq.UpdatePackage) (*coormq.UpdatePack | |||
| } | |||
| func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePackageResp, *mq.CodeMessage) { | |||
| err := svc.db.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { | |||
| err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { | |||
| isAvai, _ := svc.db.Package().IsAvailable(tx, msg.UserID, msg.PackageID) | |||
| if !isAvai { | |||
| return fmt.Errorf("package is not available to the user") | |||
| @@ -144,19 +144,17 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c | |||
| for _, obj := range objDetails { | |||
| // 只要存了文件的一个块,就认为此节点存了整个文件 | |||
| for _, block := range obj.Blocks { | |||
| for _, nodeID := range block.CachedNodeIDs { | |||
| info, ok := nodeInfoMap[nodeID] | |||
| if !ok { | |||
| info = &cdssdk.NodePackageCachingInfo{ | |||
| NodeID: nodeID, | |||
| } | |||
| nodeInfoMap[nodeID] = info | |||
| info, ok := nodeInfoMap[block.NodeID] | |||
| if !ok { | |||
| info = &cdssdk.NodePackageCachingInfo{ | |||
| NodeID: block.NodeID, | |||
| } | |||
| nodeInfoMap[block.NodeID] = info | |||
| info.FileSize += obj.Object.Size | |||
| info.ObjectCount++ | |||
| } | |||
| info.FileSize += obj.Object.Size | |||
| info.ObjectCount++ | |||
| } | |||
| } | |||
| @@ -24,7 +24,7 @@ func (svc *Service) GetStorageInfo(msg *coormq.GetStorageInfo) (*coormq.GetStora | |||
| } | |||
| func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coormq.StoragePackageLoadedResp, *mq.CodeMessage) { | |||
| err := svc.db.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { | |||
| err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { | |||
| err := svc.db.StoragePackage().Create(tx, msg.StorageID, msg.PackageID, msg.UserID) | |||
| if err != nil { | |||
| return fmt.Errorf("creating storage package: %w", err) | |||
| @@ -56,7 +56,7 @@ func (t *AgentCacheGC) Execute(execCtx ExecuteContext) { | |||
| defer mutex.Unlock() | |||
| var allFileHashes []string | |||
| err = execCtx.Args.DB.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { | |||
| err = execCtx.Args.DB.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { | |||
| blocks, err := execCtx.Args.DB.ObjectBlock().GetByNodeID(tx, t.NodeID) | |||
| if err != nil { | |||
| return fmt.Errorf("getting object blocks by node id: %w", err) | |||
| @@ -61,7 +61,7 @@ func (t *AgentCheckCache) Execute(execCtx ExecuteContext) { | |||
| realFileHashes := lo.SliceToMap(checkResp.FileHashes, func(hash string) (string, bool) { return hash, true }) | |||
| // 根据IPFS中实际文件情况修改元数据。修改过程中的失败均忽略。(但关联修改需要原子性) | |||
| execCtx.Args.DB.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { | |||
| execCtx.Args.DB.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { | |||
| t.checkCache(execCtx, tx, realFileHashes) | |||
| t.checkPinnedObject(execCtx, tx, realFileHashes) | |||
| @@ -97,16 +97,19 @@ func (t *AgentCheckCache) checkCache(execCtx ExecuteContext, tx *sqlx.Tx, realFi | |||
| rms = append(rms, c.FileHash) | |||
| } | |||
| err = execCtx.Args.DB.Cache().NodeBatchDelete(tx, t.NodeID, rms) | |||
| if err != nil { | |||
| log.Warnf("batch delete node caches: %w", err.Error()) | |||
| return | |||
| if len(rms) > 0 { | |||
| err = execCtx.Args.DB.Cache().NodeBatchDelete(tx, t.NodeID, rms) | |||
| if err != nil { | |||
| log.Warnf("batch delete node caches: %w", err.Error()) | |||
| } | |||
| } | |||
| err = execCtx.Args.DB.Cache().BatchCreate(tx, lo.Keys(realFileHashes), t.NodeID, 0) | |||
| if err != nil { | |||
| log.Warnf("batch create node caches: %w", err) | |||
| return | |||
| if len(realFileHashesCp) > 0 { | |||
| err = execCtx.Args.DB.Cache().BatchCreate(tx, lo.Keys(realFileHashesCp), t.NodeID, 0) | |||
| if err != nil { | |||
| log.Warnf("batch create node caches: %w", err) | |||
| return | |||
| } | |||
| } | |||
| } | |||
| @@ -128,10 +131,11 @@ func (t *AgentCheckCache) checkPinnedObject(execCtx ExecuteContext, tx *sqlx.Tx, | |||
| rms = append(rms, c.ObjectID) | |||
| } | |||
| err = execCtx.Args.DB.PinnedObject().NodeBatchDelete(tx, t.NodeID, rms) | |||
| if err != nil { | |||
| log.Warnf("batch delete node pinned objects: %s", err.Error()) | |||
| return | |||
| if len(rms) > 0 { | |||
| err = execCtx.Args.DB.PinnedObject().NodeBatchDelete(tx, t.NodeID, rms) | |||
| if err != nil { | |||
| log.Warnf("batch delete node pinned objects: %s", err.Error()) | |||
| } | |||
| } | |||
| } | |||
| @@ -139,29 +143,25 @@ func (t *AgentCheckCache) checkPinnedObject(execCtx ExecuteContext, tx *sqlx.Tx, | |||
| func (t *AgentCheckCache) checkObjectBlock(execCtx ExecuteContext, tx *sqlx.Tx, realFileHashes map[string]bool) { | |||
| log := logger.WithType[AgentCheckCache]("Event") | |||
| objs, err := execCtx.Args.DB.ObjectBlock().GetByNodeID(tx, t.NodeID) | |||
| blocks, err := execCtx.Args.DB.ObjectBlock().GetByNodeID(tx, t.NodeID) | |||
| if err != nil { | |||
| log.WithField("NodeID", t.NodeID).Warnf("getting object blocks by node id: %s", err.Error()) | |||
| return | |||
| } | |||
| realFileHashesCp := make(map[string]bool) | |||
| for k, v := range realFileHashes { | |||
| realFileHashesCp[k] = v | |||
| } | |||
| var rms []string | |||
| for _, c := range objs { | |||
| if realFileHashesCp[c.FileHash] { | |||
| for _, b := range blocks { | |||
| if realFileHashes[b.FileHash] { | |||
| continue | |||
| } | |||
| rms = append(rms, c.FileHash) | |||
| rms = append(rms, b.FileHash) | |||
| } | |||
| err = execCtx.Args.DB.ObjectBlock().NodeBatchDelete(tx, t.NodeID, rms) | |||
| if err != nil { | |||
| log.Warnf("batch delete node object blocks: %w", err) | |||
| return | |||
| if len(rms) > 0 { | |||
| err = execCtx.Args.DB.ObjectBlock().NodeBatchDelete(tx, t.NodeID, rms) | |||
| if err != nil { | |||
| log.Warnf("batch delete node object blocks: %s", err.Error()) | |||
| } | |||
| } | |||
| } | |||
| @@ -88,7 +88,7 @@ func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) { | |||
| pkgs[pkg.PackageID] = true | |||
| } | |||
| execCtx.Args.DB.DoTx(sql.LevelLinearizable, func(tx *sqlx.Tx) error { | |||
| execCtx.Args.DB.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { | |||
| packages, err := execCtx.Args.DB.StoragePackage().GetAllByStorageID(tx, t.StorageID) | |||
| if err != nil { | |||
| log.Warnf("getting storage package: %s", err.Error()) | |||
| @@ -2,6 +2,7 @@ package event | |||
| import ( | |||
| "fmt" | |||
| "strconv" | |||
| "time" | |||
| "github.com/samber/lo" | |||
| @@ -219,7 +220,7 @@ func (t *CheckPackageRedundancy) rechooseNodesForRep(obj stgmod.ObjectDetail, re | |||
| for _, node := range allNodes { | |||
| cachedBlockIndex := -1 | |||
| for _, block := range obj.Blocks { | |||
| if lo.Contains(block.CachedNodeIDs, node.Node.NodeID) { | |||
| if block.NodeID == node.Node.NodeID { | |||
| cachedBlockIndex = block.Index | |||
| break | |||
| } | |||
| @@ -259,7 +260,7 @@ func (t *CheckPackageRedundancy) rechooseNodesForEC(obj stgmod.ObjectDetail, red | |||
| for _, node := range allNodes { | |||
| cachedBlockIndex := -1 | |||
| for _, block := range obj.Blocks { | |||
| if lo.Contains(block.CachedNodeIDs, node.Node.NodeID) { | |||
| if block.NodeID == node.Node.NodeID { | |||
| cachedBlockIndex = block.Index | |||
| break | |||
| } | |||
| @@ -327,7 +328,7 @@ func (t *CheckPackageRedundancy) chooseSoManyNodes(count int, nodes []*NodeLoadI | |||
| } | |||
| func (t *CheckPackageRedundancy) noneToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) { | |||
| if len(obj.CachedNodeIDs) == 0 { | |||
| if len(obj.Blocks) == 0 { | |||
| return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to rep") | |||
| } | |||
| @@ -362,11 +363,11 @@ func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.E | |||
| } | |||
| defer stgglb.CoordinatorMQPool.Release(coorCli) | |||
| if len(obj.CachedNodeIDs) == 0 { | |||
| if len(obj.Blocks) == 0 { | |||
| return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to ec") | |||
| } | |||
| getNodes, err := coorCli.GetNodes(coormq.NewGetNodes([]cdssdk.NodeID{obj.CachedNodeIDs[0]})) | |||
| getNodes, err := coorCli.GetNodes(coormq.NewGetNodes([]cdssdk.NodeID{obj.Blocks[0].NodeID})) | |||
| if err != nil { | |||
| return nil, fmt.Errorf("requesting to get nodes: %w", err) | |||
| } | |||
| @@ -410,7 +411,7 @@ func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.E | |||
| } | |||
| func (t *CheckPackageRedundancy) repToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) { | |||
| if len(obj.CachedNodeIDs) == 0 { | |||
| if len(obj.Blocks) == 0 { | |||
| return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to rep") | |||
| } | |||
| @@ -456,10 +457,10 @@ func (t *CheckPackageRedundancy) ecToRep(obj stgmod.ObjectDetail, srcRed *cdssdk | |||
| } | |||
| defer stgglb.CoordinatorMQPool.Release(coorCli) | |||
| var chosenBlocks []stgmod.ObjectBlockDetail | |||
| var chosenBlocks []stgmod.GrouppedObjectBlock | |||
| var chosenBlockIndexes []int | |||
| for _, block := range obj.Blocks { | |||
| if len(block.CachedNodeIDs) > 0 { | |||
| for _, block := range obj.GroupBlocks() { | |||
| if len(block.NodeIDs) > 0 { | |||
| chosenBlocks = append(chosenBlocks, block) | |||
| chosenBlockIndexes = append(chosenBlockIndexes, block.Index) | |||
| } | |||
| @@ -529,10 +530,12 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk. | |||
| } | |||
| defer stgglb.CoordinatorMQPool.Release(coorCli) | |||
| var chosenBlocks []stgmod.ObjectBlockDetail | |||
| grpBlocks := obj.GroupBlocks() | |||
| var chosenBlocks []stgmod.GrouppedObjectBlock | |||
| var chosenBlockIndexes []int | |||
| for _, block := range obj.Blocks { | |||
| if len(block.CachedNodeIDs) > 0 { | |||
| for _, block := range grpBlocks { | |||
| if len(block.NodeIDs) > 0 { | |||
| chosenBlocks = append(chosenBlocks, block) | |||
| chosenBlockIndexes = append(chosenBlockIndexes, block.Index) | |||
| } | |||
| @@ -551,28 +554,26 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk. | |||
| var newBlocks []stgmod.ObjectBlock | |||
| shouldUpdateBlocks := false | |||
| for i := range obj.Blocks { | |||
| newBlocks = append(newBlocks, stgmod.ObjectBlock{ | |||
| for i, node := range uploadNodes { | |||
| newBlock := stgmod.ObjectBlock{ | |||
| ObjectID: obj.Object.ObjectID, | |||
| Index: i, | |||
| NodeID: uploadNodes[i].Node.NodeID, | |||
| FileHash: obj.Blocks[i].FileHash, | |||
| }) | |||
| NodeID: node.Node.NodeID, | |||
| } | |||
| grp, ok := lo.Find(grpBlocks, func(grp stgmod.GrouppedObjectBlock) bool { return grp.Index == i }) | |||
| // 如果新选中的节点已经记录在Block表中,那么就不需要任何变更 | |||
| if lo.Contains(obj.Blocks[i].NodeIDs, uploadNodes[i].Node.NodeID) { | |||
| if ok && lo.Contains(grp.NodeIDs, node.Node.NodeID) { | |||
| newBlock.FileHash = grp.FileHash | |||
| newBlocks = append(newBlocks, newBlock) | |||
| continue | |||
| } | |||
| shouldUpdateBlocks = true | |||
| // 新选的节点不在Block表中,但实际上保存了分块的数据,那么只需建立一条Block记录即可 | |||
| if lo.Contains(obj.Blocks[i].CachedNodeIDs, uploadNodes[i].Node.NodeID) { | |||
| continue | |||
| } | |||
| // 否则就要重建出这个节点需要的块 | |||
| tarNode := planBlder.AtAgent(uploadNodes[i].Node) | |||
| tarNode := planBlder.AtAgent(node.Node) | |||
| var inputs []*plans.AgentStream | |||
| for _, block := range chosenBlocks { | |||
| @@ -580,7 +581,8 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk. | |||
| } | |||
| // 输出只需要自己要保存的那一块 | |||
| tarNode.ECReconstructAny(*srcRed, chosenBlockIndexes, []int{i}, inputs...).Stream(0).IPFSWrite("") | |||
| tarNode.ECReconstructAny(*srcRed, chosenBlockIndexes, []int{i}, inputs...).Stream(0).IPFSWrite(fmt.Sprintf("%d", i)) | |||
| newBlocks = append(newBlocks, newBlock) | |||
| } | |||
| plan, err := planBlder.Build() | |||
| @@ -594,7 +596,7 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk. | |||
| } | |||
| // 如果没有任何Plan,Wait会直接返回成功 | |||
| _, err = exec.Wait() | |||
| ret, err := exec.Wait() | |||
| if err != nil { | |||
| return nil, fmt.Errorf("executing io plan: %w", err) | |||
| } | |||
| @@ -603,6 +605,15 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk. | |||
| return nil, nil | |||
| } | |||
| for k, v := range ret.ResultValues { | |||
| idx, err := strconv.ParseInt(k, 10, 64) | |||
| if err != nil { | |||
| return nil, fmt.Errorf("parsing result key %s as index: %w", k, err) | |||
| } | |||
| newBlocks[idx].FileHash = v.(string) | |||
| } | |||
| return &coormq.ChangeObjectRedundancyEntry{ | |||
| ObjectID: obj.Object.ObjectID, | |||
| Redundancy: tarRed, | |||