diff --git a/agent/internal/mq/object.go b/agent/internal/mq/object.go index 06a106f..dbdaacc 100644 --- a/agent/internal/mq/object.go +++ b/agent/internal/mq/object.go @@ -2,19 +2,19 @@ package mq import ( "gitlink.org.cn/cloudream/common/consts/errorcode" - log "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/storage/agent/internal/task" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" ) func (svc *Service) PinObject(msg *agtmq.PinObject) (*agtmq.PinObjectResp, *mq.CodeMessage) { - log.WithField("FileHash", msg.FileHash).Debugf("pin object") + logger.WithField("FileHash", msg.FileHashes).Debugf("pin object") - tsk := svc.taskManager.StartComparable(task.NewIPFSPin(msg.FileHash)) + tsk := svc.taskManager.StartNew(task.NewIPFSPin(msg.FileHashes)) if tsk.Error() != nil { - log.WithField("FileHash", msg.FileHash). + logger.WithField("FileHash", msg.FileHashes). Warnf("pin object failed, err: %s", tsk.Error().Error()) return nil, mq.Failed(errorcode.OperationFailed, "pin object failed") } diff --git a/agent/internal/task/ipfs_pin.go b/agent/internal/task/ipfs_pin.go index 4807c99..622b45d 100644 --- a/agent/internal/task/ipfs_pin.go +++ b/agent/internal/task/ipfs_pin.go @@ -10,24 +10,15 @@ import ( ) type IPFSPin struct { - FileHash string + FileHashes []string } -func NewIPFSPin(fileHash string) *IPFSPin { +func NewIPFSPin(fileHashes []string) *IPFSPin { return &IPFSPin{ - FileHash: fileHash, + FileHashes: fileHashes, } } -func (t *IPFSPin) Compare(other *Task) bool { - tsk, ok := other.Body().(*IPFSPin) - if !ok { - return false - } - - return t.FileHash == tsk.FileHash -} - func (t *IPFSPin) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { log := logger.WithType[IPFSPin]("Task") log.Debugf("begin with %v", logger.FormatStruct(t)) @@ -45,15 +36,17 @@ func (t *IPFSPin) Execute(task *task.Task[TaskContext], ctx 
TaskContext, complet } defer ipfsCli.Close() - err = ipfsCli.Pin(t.FileHash) - if err != nil { - err := fmt.Errorf("pin file failed, err: %w", err) - log.WithField("FileHash", t.FileHash).Warn(err.Error()) - - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) - return + for _, fileHash := range t.FileHashes { + err = ipfsCli.Pin(fileHash) + if err != nil { + err := fmt.Errorf("pin file failed, err: %w", err) + log.WithField("FileHash", fileHash).Warn(err.Error()) + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) + return + } } complete(nil, CompleteOption{ diff --git a/agent/internal/task/storage_load_package.go b/agent/internal/task/storage_load_package.go index 23c7b17..ba40773 100644 --- a/agent/internal/task/storage_load_package.go +++ b/agent/internal/task/storage_load_package.go @@ -15,7 +15,7 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" myio "gitlink.org.cn/cloudream/common/utils/io" myref "gitlink.org.cn/cloudream/common/utils/reflect" - mysort "gitlink.org.cn/cloudream/common/utils/sort" + "gitlink.org.cn/cloudream/common/utils/sort2" "gitlink.org.cn/cloudream/storage/common/consts" stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" @@ -103,6 +103,7 @@ func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) e return fmt.Errorf("loading package to storage: %w", err) } + // TODO 要防止下载的临时文件被删除 return err } @@ -289,8 +290,8 @@ func (t *StorageLoadPackage) sortDownloadNodes(coorCli *coormq.Client, obj stgmo node.Blocks = append(node.Blocks, b) } - return mysort.Sort(lo.Values(downloadNodeMap), func(left, right *downloadNodeInfo) int { - return mysort.Cmp(left.Distance, right.Distance) + return sort2.Sort(lo.Values(downloadNodeMap), func(left, right *downloadNodeInfo) int { + return sort2.Cmp(left.Distance, right.Distance) }), nil } diff --git a/client/internal/cmdline/cache.go b/client/internal/cmdline/cache.go index 
7a65745..a94859e 100644 --- a/client/internal/cmdline/cache.go +++ b/client/internal/cmdline/cache.go @@ -29,6 +29,12 @@ func CacheMovePackage(ctx CommandContext, packageID cdssdk.PackageID, nodeID cds } } +func CacheRemovePackage(ctx CommandContext, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error { + return ctx.Cmdline.Svc.CacheSvc().CacheRemovePackage(packageID, nodeID) +} + func init() { commands.Add(CacheMovePackage, "cache", "move") + + commands.Add(CacheRemovePackage, "cache", "remove") } diff --git a/client/internal/cmdline/package.go b/client/internal/cmdline/package.go index b0f9bc2..a4a7dfd 100644 --- a/client/internal/cmdline/package.go +++ b/client/internal/cmdline/package.go @@ -46,6 +46,8 @@ func PackageDownloadPackage(ctx CommandContext, packageID cdssdk.PackageID, outp } defer objIter.Close() + madeDirs := make(map[string]bool) + for { objInfo, err := objIter.MoveNext() if err == iterator.ErrNoMoreItem { @@ -61,8 +63,11 @@ func PackageDownloadPackage(ctx CommandContext, packageID cdssdk.PackageID, outp fullPath := filepath.Join(outputDir, objInfo.Object.Path) dirPath := filepath.Dir(fullPath) - if err := os.MkdirAll(dirPath, 0755); err != nil { - return fmt.Errorf("creating object dir: %w", err) + if !madeDirs[dirPath] { + if err := os.MkdirAll(dirPath, 0755); err != nil { + return fmt.Errorf("creating object dir: %w", err) + } + madeDirs[dirPath] = true } outputFile, err := os.Create(fullPath) @@ -135,6 +140,7 @@ func PackageCreatePackage(ctx CommandContext, name string, rootPath string, buck }) } fmt.Print(tb.Render()) + fmt.Printf("\n%v", uploadObjectResult.PackageID) return nil } diff --git a/client/internal/services/cache.go b/client/internal/services/cache.go index ac37be9..a32e456 100644 --- a/client/internal/services/cache.go +++ b/client/internal/services/cache.go @@ -8,6 +8,7 @@ import ( stgglb "gitlink.org.cn/cloudream/storage/common/globals" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" + coormq 
"gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) type CacheService struct { @@ -55,3 +56,18 @@ func (svc *CacheService) WaitCacheMovePackage(nodeID cdssdk.NodeID, taskID strin return true, nil } + +func (svc *CacheService) CacheRemovePackage(packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + _, err = coorCli.CacheRemovePackage(coormq.ReqCacheRemoveMovedPackage(packageID, nodeID)) + if err != nil { + return fmt.Errorf("requesting to coordinator: %w", err) + } + + return nil +} diff --git a/common/assets/confs/agent.config.json b/common/assets/confs/agent.config.json index e706fd4..226711e 100644 --- a/common/assets/confs/agent.config.json +++ b/common/assets/confs/agent.config.json @@ -30,6 +30,7 @@ "etcdUsername": "", "etcdPassword": "", "etcdLockLeaseTimeSec": 5, - "description": "I am a agent" + "randomReleasingDelayMs": 3000, + "serviceDescription": "I am a agent" } } \ No newline at end of file diff --git a/common/assets/confs/client.config.json b/common/assets/confs/client.config.json index 10e4265..d7ce483 100644 --- a/common/assets/confs/client.config.json +++ b/common/assets/confs/client.config.json @@ -23,6 +23,7 @@ "etcdUsername": "", "etcdPassword": "", "etcdLockLeaseTimeSec": 5, - "description": "I am a client" + "randomReleasingDelayMs": 3000, + "serviceDescription": "I am a client" } } \ No newline at end of file diff --git a/common/assets/confs/scanner.config.json b/common/assets/confs/scanner.config.json index 4dbd650..e84b602 100644 --- a/common/assets/confs/scanner.config.json +++ b/common/assets/confs/scanner.config.json @@ -24,6 +24,7 @@ "etcdUsername": "", "etcdPassword": "", "etcdLockLeaseTimeSec": 5, - "description": "I am a scanner" + "randomReleasingDelayMs": 3000, + "serviceDescription": "I am a scanner" } } \ No newline at end of file diff
--git a/common/assets/scripts/create_database.sql b/common/assets/scripts/create_database.sql index 850bfd4..3652c7a 100644 --- a/common/assets/scripts/create_database.sql +++ b/common/assets/scripts/create_database.sql @@ -157,8 +157,8 @@ create table StoragePackage ( ); create table StoragePackageLog ( - PackageID int not null comment '包ID', StorageID int not null comment '存储服务ID', + PackageID int not null comment '包ID', UserID int not null comment '调度了此文件的用户ID', CreateTime timestamp not null comment '加载Package完成的时间' ); diff --git a/common/pkgs/cmd/create_package.go b/common/pkgs/cmd/create_package.go index 7a5dd77..f250dc5 100644 --- a/common/pkgs/cmd/create_package.go +++ b/common/pkgs/cmd/create_package.go @@ -139,6 +139,10 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + // 为所有文件选择相同的上传节点 + uploadNode := chooseUploadNode(userNodes, nodeAffinity) var uploadRets []ObjectUploadResult //上传文件夹 @@ -154,8 +158,6 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo err = func() error { defer objInfo.File.Close() - uploadNode := chooseUploadNode(userNodes, nodeAffinity) - fileHash, err := uploadFile(objInfo.File, uploadNode) if err != nil { return fmt.Errorf("uploading file: %w", err) @@ -165,9 +167,6 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo Info: objInfo, Error: err, }) - if err != nil { - return fmt.Errorf("uploading object: %w", err) - } adds = append(adds, coormq.NewAddObjectEntry(objInfo.Path, objInfo.Size, fileHash, uploadNode.Node.NodeID)) return nil @@ -177,7 +176,7 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo } } - _, err = coorCli.UpdateECPackage(coormq.NewUpdatePackage(packageID, adds, nil)) + _, err = coorCli.UpdatePackage(coormq.NewUpdatePackage(packageID, adds, nil)) if err != nil 
{ return nil, fmt.Errorf("updating package: %w", err) } @@ -262,7 +261,7 @@ func pinIPFSFile(nodeID cdssdk.NodeID, fileHash string) error { defer stgglb.AgentMQPool.Release(agtCli) // 然后让最近节点pin本地上传的文件 - _, err = agtCli.PinObject(agtmq.ReqPinObject(fileHash, false)) + _, err = agtCli.PinObject(agtmq.ReqPinObject([]string{fileHash}, false)) if err != nil { return fmt.Errorf("start pinning object: %w", err) } diff --git a/common/pkgs/db/cache.go b/common/pkgs/db/cache.go index ea59e8a..84ff1ca 100644 --- a/common/pkgs/db/cache.go +++ b/common/pkgs/db/cache.go @@ -44,7 +44,19 @@ func (*CacheDB) Create(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID, pr return nil } -func (*CacheDB) BatchCreate(ctx SQLContext, fileHashes []string, nodeID cdssdk.NodeID, priority int) error { +// 批量创建缓存记录 +func (*CacheDB) BatchCreate(ctx SQLContext, caches []model.Cache) error { + return BatchNamedExec( + ctx, + "insert into Cache(FileHash,NodeID,CreateTime,Priority) values(:FileHash,:NodeID,:CreateTime,:Priority)"+ + " on duplicate key update CreateTime=values(CreateTime), Priority=values(Priority)", + 4, + caches, + nil, + ) +} + +func (*CacheDB) BatchCreateOnSameNode(ctx SQLContext, fileHashes []string, nodeID cdssdk.NodeID, priority int) error { var caches []model.Cache var nowTime = time.Now() for _, hash := range fileHashes { @@ -56,11 +68,13 @@ func (*CacheDB) BatchCreate(ctx SQLContext, fileHashes []string, nodeID cdssdk.N }) } - _, err := sqlx.NamedExec(ctx, "insert into Cache(FileHash,NodeID,CreateTime,Priority) values(:FileHash,:NodeID,:CreateTime,:Priority)"+ - " on duplicate key update CreateTime=values(CreateTime), Priority=values(Priority)", + return BatchNamedExec(ctx, + "insert into Cache(FileHash,NodeID,CreateTime,Priority) values(:FileHash,:NodeID,:CreateTime,:Priority)"+ + " on duplicate key update CreateTime=values(CreateTime), Priority=values(Priority)", + 4, caches, + nil, ) - return err } func (*CacheDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, 
fileHashes []string) error { diff --git a/common/pkgs/db/db.go b/common/pkgs/db/db.go index 77761a3..d04f878 100644 --- a/common/pkgs/db/db.go +++ b/common/pkgs/db/db.go @@ -18,6 +18,11 @@ type SQLContext interface { sqlx.Queryer sqlx.Execer sqlx.Ext + sqlx.Preparer + + NamedQuery(query string, arg interface{}) (*sqlx.Rows, error) + NamedExec(query string, arg interface{}) (sql.Result, error) + PrepareNamed(query string) (*sqlx.NamedStmt, error) } func NewDB(cfg *config.Config) (*DB, error) { diff --git a/common/pkgs/db/object.go b/common/pkgs/db/object.go index af4369e..24e3941 100644 --- a/common/pkgs/db/object.go +++ b/common/pkgs/db/object.go @@ -2,6 +2,7 @@ package db import ( "fmt" + "time" "github.com/jmoiron/sqlx" "github.com/samber/lo" @@ -25,6 +26,23 @@ func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Obj return ret.ToObject(), err } +func (db *ObjectDB) BatchGetPackageObjectIDs(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.ObjectID, error) { + // TODO In语句 + stmt, args, err := sqlx.In("select ObjectID from Object force index(PackagePath) where PackageID=? and Path in (?)", pkgID, pathes) + if err != nil { + return nil, err + } + stmt = ctx.Rebind(stmt) + + objIDs := make([]cdssdk.ObjectID, 0, len(pathes)) + err = sqlx.Select(ctx, &objIDs, stmt, args...) 
+ if err != nil { + return nil, err + } + + return objIDs, nil +} + func (db *ObjectDB) Create(ctx SQLContext, packageID cdssdk.PackageID, path string, size int64, fileHash string, redundancy cdssdk.Redundancy) (int64, error) { sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy) values(?,?,?,?,?)" @@ -75,6 +93,15 @@ func (db *ObjectDB) CreateOrUpdate(ctx SQLContext, packageID cdssdk.PackageID, p return objID, false, nil } +// 批量创建或者更新记录 +func (db *ObjectDB) BatchCreateOrUpdate(ctx SQLContext, objs []cdssdk.Object) error { + sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy)" + + " values(:PackageID,:Path,:Size,:FileHash,:Redundancy)" + + " on duplicate key update Size = values(Size), FileHash = values(FileHash), Redundancy = values(Redundancy)" + + return BatchNamedExec(ctx, sql, 5, objs, nil) +} + func (*ObjectDB) UpdateFileInfo(ctx SQLContext, objectID cdssdk.ObjectID, fileSize int64) (bool, error) { ret, err := ctx.Exec("update Object set FileSize = ? where ObjectID = ?", fileSize, objectID) if err != nil { @@ -104,103 +131,189 @@ func (db *ObjectDB) GetPackageObjectDetails(ctx SQLContext, packageID cdssdk.Pac rets := make([]stgmod.ObjectDetail, 0, len(objs)) - for _, obj := range objs { - var blocks []stgmod.ObjectBlock - err = sqlx.Select(ctx, - &blocks, - "select * from ObjectBlock where ObjectID = ? order by `Index`", - obj.ObjectID, - ) - if err != nil { - return nil, err + var allBlocks []stgmod.ObjectBlock + err = sqlx.Select(ctx, &allBlocks, "select ObjectBlock.* from ObjectBlock, Object where PackageID = ? and ObjectBlock.ObjectID = Object.ObjectID order by ObjectBlock.ObjectID, `Index` asc", packageID) + if err != nil { + return nil, fmt.Errorf("getting all object blocks: %w", err) + } + + var allPinnedObjs []cdssdk.PinnedObject + err = sqlx.Select(ctx, &allPinnedObjs, "select PinnedObject.* from PinnedObject, Object where PackageID = ? 
and PinnedObject.ObjectID = Object.ObjectID order by PinnedObject.ObjectID", packageID) + if err != nil { + return nil, fmt.Errorf("getting all pinned objects: %w", err) + } + + blksCur := 0 + pinnedsCur := 0 + for _, temp := range objs { + detail := stgmod.ObjectDetail{ + Object: temp.ToObject(), } - var pinnedAt []cdssdk.NodeID - err = sqlx.Select(ctx, &pinnedAt, "select NodeID from PinnedObject where ObjectID = ?", obj.ObjectID) - if err != nil { - return nil, err + // 1. 查询Object和ObjectBlock时均按照ObjectID升序排序 + // 2. ObjectBlock结果集中的不同ObjectID数只会比Object结果集的少 + // 因此在两个结果集上同时从头开始遍历时,如果两边的ObjectID字段不同,那么一定是ObjectBlock这边的ObjectID > Object的ObjectID, + // 此时让Object的遍历游标前进,直到两边的ObjectID再次相等 + for ; blksCur < len(allBlocks); blksCur++ { + if allBlocks[blksCur].ObjectID != temp.ObjectID { + break + } + detail.Blocks = append(detail.Blocks, allBlocks[blksCur]) } - rets = append(rets, stgmod.NewObjectDetail(obj.ToObject(), pinnedAt, blocks)) + for ; pinnedsCur < len(allPinnedObjs); pinnedsCur++ { + if allPinnedObjs[pinnedsCur].ObjectID != temp.ObjectID { + break + } + detail.PinnedAt = append(detail.PinnedAt, allPinnedObjs[pinnedsCur].NodeID) + } + + rets = append(rets, detail) } return rets, nil } -func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, objs []coormq.AddObjectEntry) ([]cdssdk.ObjectID, error) { - objIDs := make([]cdssdk.ObjectID, 0, len(objs)) - for _, obj := range objs { - // 创建对象的记录 - objID, isCreate, err := db.CreateOrUpdate(ctx, packageID, obj.Path, obj.Size, obj.FileHash) - if err != nil { - return nil, fmt.Errorf("creating object: %w", err) - } +func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []coormq.AddObjectEntry) ([]cdssdk.ObjectID, error) { + objs := make([]cdssdk.Object, 0, len(adds)) + for _, add := range adds { + objs = append(objs, cdssdk.Object{ + PackageID: packageID, + Path: add.Path, + Size: add.Size, + FileHash: add.FileHash, + Redundancy: cdssdk.NewNoneRedundancy(), // 首次上传默认使用不分块的none模式 
+ }) + } - objIDs = append(objIDs, objID) + err := db.BatchCreateOrUpdate(ctx, objs) + if err != nil { + return nil, fmt.Errorf("batch create or update objects: %w", err) + } - if !isCreate { - // 删除原本所有的编码块记录,重新添加 - if err = db.ObjectBlock().DeleteByObjectID(ctx, objID); err != nil { - return nil, fmt.Errorf("deleting all object block: %w", err) - } + pathes := make([]string, 0, len(adds)) + for _, add := range adds { + pathes = append(pathes, add.Path) + } + objIDs, err := db.BatchGetPackageObjectIDs(ctx, packageID, pathes) + if err != nil { + return nil, fmt.Errorf("batch get object ids: %w", err) + } - // 删除原本Pin住的Object。暂不考虑FileHash没有变化的情况 - if err = db.PinnedObject().DeleteByObjectID(ctx, objID); err != nil { - return nil, fmt.Errorf("deleting all pinned object: %w", err) - } - } + err = db.ObjectBlock().BatchDeleteByObjectID(ctx, objIDs) + if err != nil { + return nil, fmt.Errorf("batch delete object blocks: %w", err) + } - // 首次上传默认使用不分块的none模式 - err = db.ObjectBlock().Create(ctx, objID, 0, obj.NodeID, obj.FileHash) - if err != nil { - return nil, fmt.Errorf("creating object block: %w", err) - } + err = db.PinnedObject().BatchDeleteByObjectID(ctx, objIDs) + if err != nil { + return nil, fmt.Errorf("batch delete pinned objects: %w", err) + } - // 创建缓存记录 - err = db.Cache().Create(ctx, obj.FileHash, obj.NodeID, 0) - if err != nil { - return nil, fmt.Errorf("creating cache: %w", err) - } + objBlocks := make([]stgmod.ObjectBlock, 0, len(adds)) + for i, add := range adds { + objBlocks = append(objBlocks, stgmod.ObjectBlock{ + ObjectID: objIDs[i], + Index: 0, + NodeID: add.NodeID, + FileHash: add.FileHash, + }) + } + + err = db.ObjectBlock().BatchCreate(ctx, objBlocks) + if err != nil { + return nil, fmt.Errorf("batch create object blocks: %w", err) + } + + caches := make([]model.Cache, 0, len(adds)) + for _, add := range adds { + caches = append(caches, model.Cache{ + FileHash: add.FileHash, + NodeID: add.NodeID, + CreateTime: time.Now(), + Priority: 0, + }) + } 
+ + err = db.Cache().BatchCreate(ctx, caches) + if err != nil { + return nil, fmt.Errorf("batch create caches: %w", err) } return objIDs, nil } func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeObjectRedundancyEntry) error { + objIDs := make([]cdssdk.ObjectID, 0, len(objs)) + dummyObjs := make([]cdssdk.Object, 0, len(objs)) for _, obj := range objs { - _, err := ctx.Exec("update Object set Redundancy = ? where ObjectID = ?", obj.Redundancy, obj.ObjectID) - if err != nil { - return fmt.Errorf("updating object: %w", err) - } + objIDs = append(objIDs, obj.ObjectID) + dummyObjs = append(dummyObjs, cdssdk.Object{ + ObjectID: obj.ObjectID, + Redundancy: obj.Redundancy, + }) + } - // 删除原本所有的编码块记录,重新添加 - if err = db.ObjectBlock().DeleteByObjectID(ctx, obj.ObjectID); err != nil { - return fmt.Errorf("deleting all object block: %w", err) - } + // 目前只能使用这种方式来同时更新大量数据 + err := BatchNamedExec(ctx, + "insert into Object(ObjectID, PackageID, Path, Size, FileHash, Redundancy)"+ + " values(:ObjectID, :PackageID, :Path, :Size, :FileHash, :Redundancy) as new"+ + " on duplicate key update Redundancy=new.Redundancy", 6, dummyObjs, nil) + if err != nil { + return fmt.Errorf("batch update object redundancy: %w", err) + } - // 删除原本Pin住的Object。暂不考虑FileHash没有变化的情况 - if err = db.PinnedObject().DeleteByObjectID(ctx, obj.ObjectID); err != nil { - return fmt.Errorf("deleting all pinned object: %w", err) - } + // 删除原本所有的编码块记录,重新添加 + err = db.ObjectBlock().BatchDeleteByObjectID(ctx, objIDs) + if err != nil { + return fmt.Errorf("batch delete object blocks: %w", err) + } - for _, block := range obj.Blocks { - err = db.ObjectBlock().Create(ctx, obj.ObjectID, block.Index, block.NodeID, block.FileHash) - if err != nil { - return fmt.Errorf("creating object block: %w", err) - } + // 删除原本Pin住的Object。暂不考虑FileHash没有变化的情况 + err = db.PinnedObject().BatchDeleteByObjectID(ctx, objIDs) + if err != nil { + return fmt.Errorf("batch delete pinned object: %w", err) + } - // 创建缓存记录 - 
err = db.Cache().Create(ctx, block.FileHash, block.NodeID, 0) - if err != nil { - return fmt.Errorf("creating cache: %w", err) - } + blocks := make([]stgmod.ObjectBlock, 0, len(objs)) + for _, obj := range objs { + blocks = append(blocks, obj.Blocks...) + } + err = db.ObjectBlock().BatchCreate(ctx, blocks) + if err != nil { + return fmt.Errorf("batch create object blocks: %w", err) + } + + caches := make([]model.Cache, 0, len(objs)) + for _, obj := range objs { + for _, blk := range obj.Blocks { + caches = append(caches, model.Cache{ + FileHash: blk.FileHash, + NodeID: blk.NodeID, + CreateTime: time.Now(), + Priority: 0, + }) } + } + err = db.Cache().BatchCreate(ctx, caches) + if err != nil { + return fmt.Errorf("batch create object caches: %w", err) + } - err = db.PinnedObject().ObjectBatchCreate(ctx, obj.ObjectID, obj.PinnedAt) - if err != nil { - return fmt.Errorf("creating pinned object: %w", err) + pinneds := make([]cdssdk.PinnedObject, 0, len(objs)) + for _, obj := range objs { + for _, p := range obj.PinnedAt { + pinneds = append(pinneds, cdssdk.PinnedObject{ + ObjectID: obj.ObjectID, + NodeID: p, + CreateTime: time.Now(), + }) } } + err = db.PinnedObject().BatchTryCreate(ctx, pinneds) + if err != nil { + return fmt.Errorf("batch create pinned objects: %w", err) + } return nil } diff --git a/common/pkgs/db/object_block.go b/common/pkgs/db/object_block.go index e560e03..f420bfb 100644 --- a/common/pkgs/db/object_block.go +++ b/common/pkgs/db/object_block.go @@ -30,11 +30,12 @@ func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index } func (db *ObjectBlockDB) BatchCreate(ctx SQLContext, blocks []stgmod.ObjectBlock) error { - _, err := sqlx.NamedExec(ctx, + return BatchNamedExec(ctx, "insert ignore into ObjectBlock(ObjectID, `Index`, NodeID, FileHash) values(:ObjectID, :Index, :NodeID, :FileHash)", + 4, blocks, + nil, ) - return err } func (db *ObjectBlockDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID) error { @@ -42,6 
+43,16 @@ func (db *ObjectBlockDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.Object return err } +func (db *ObjectBlockDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) error { + // TODO in语句有长度限制 + query, args, err := sqlx.In("delete from ObjectBlock where ObjectID in (?)", objectIDs) + if err != nil { + return err + } + _, err = ctx.Exec(query, args...) + return err +} + func (db *ObjectBlockDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error { _, err := ctx.Exec("delete ObjectBlock from ObjectBlock inner join Object on ObjectBlock.ObjectID = Object.ObjectID where PackageID = ?", packageID) return err diff --git a/common/pkgs/db/package.go b/common/pkgs/db/package.go index 1491bec..4310ced 100644 --- a/common/pkgs/db/package.go +++ b/common/pkgs/db/package.go @@ -80,7 +80,7 @@ func (db *PackageDB) GetUserPackage(ctx SQLContext, userID cdssdk.UserID, packag func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name string) (cdssdk.PackageID, error) { // 根据packagename和bucketid查询,若不存在则插入,若存在则返回错误 var packageID int64 - err := sqlx.Get(ctx, &packageID, "select PackageID from Package where Name = ? AND BucketID = ?", name, bucketID) + err := sqlx.Get(ctx, &packageID, "select PackageID from Package where Name = ? AND BucketID = ? 
for update", name, bucketID) // 无错误代表存在记录 if err == nil { return 0, fmt.Errorf("package with given Name and BucketID already exists") diff --git a/common/pkgs/db/pinned_object.go b/common/pkgs/db/pinned_object.go index 026e5ca..6853315 100644 --- a/common/pkgs/db/pinned_object.go +++ b/common/pkgs/db/pinned_object.go @@ -39,6 +39,10 @@ func (*PinnedObjectDB) TryCreate(ctx SQLContext, nodeID cdssdk.NodeID, objectID return err } +func (*PinnedObjectDB) BatchTryCreate(ctx SQLContext, pinneds []cdssdk.PinnedObject) error { + return BatchNamedExec(ctx, "insert ignore into PinnedObject values(:NodeID,:ObjectID,:CreateTime)", 3, pinneds, nil) +} + func (*PinnedObjectDB) CreateFromPackage(ctx SQLContext, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error { _, err := ctx.Exec( "insert ignore into PinnedObject(NodeID, ObjectID, CreateTime) select ? as NodeID, ObjectID, ? as CreateTime from Object where PackageID = ?", @@ -69,11 +73,26 @@ func (*PinnedObjectDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID return err } +func (*PinnedObjectDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) error { + // TODO in语句有长度限制 + query, args, err := sqlx.In("delete from PinnedObject where ObjectID in (?)", objectIDs) + if err != nil { + return err + } + _, err = ctx.Exec(query, args...) + return err +} + func (*PinnedObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error { _, err := ctx.Exec("delete PinnedObject from PinnedObject inner join Object on PinnedObject.ObjectID = Object.ObjectID where PackageID = ?", packageID) return err } +func (*PinnedObjectDB) DeleteInPackageAtNode(ctx SQLContext, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error { + _, err := ctx.Exec("delete PinnedObject from PinnedObject inner join Object on PinnedObject.ObjectID = Object.ObjectID where PackageID = ? 
and NodeID = ?", packageID, nodeID) + return err +} + func (*PinnedObjectDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, objectIDs []cdssdk.ObjectID) error { query, args, err := sqlx.In("delete from PinnedObject where NodeID = ? and ObjectID in (?)", objectIDs) if err != nil { diff --git a/common/pkgs/db/storage_package.go b/common/pkgs/db/storage_package.go index b29842e..042fd44 100644 --- a/common/pkgs/db/storage_package.go +++ b/common/pkgs/db/storage_package.go @@ -34,8 +34,9 @@ func (*StoragePackageDB) GetAllByStorageID(ctx SQLContext, storageID cdssdk.Stor return ret, err } -func (*StoragePackageDB) Create(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) error { - _, err := ctx.Exec("insert into StoragePackage values(?,?,?,?)", storageID, packageID, userID, model.StoragePackageStateNormal) +func (*StoragePackageDB) CreateOrUpdate(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) error { + _, err := ctx.Exec("insert into StoragePackage values(?,?,?,?)"+ + " on duplicate key update State=values(State)", storageID, packageID, userID, model.StoragePackageStateNormal) return err } diff --git a/common/pkgs/db/utils.go b/common/pkgs/db/utils.go new file mode 100644 index 0000000..5dde56f --- /dev/null +++ b/common/pkgs/db/utils.go @@ -0,0 +1,75 @@ +package db + +import ( + "database/sql" + + "github.com/jmoiron/sqlx" + "gitlink.org.cn/cloudream/common/utils/math" +) + +const ( + maxPlaceholderCount = 65535 +) + +func BatchNamedExec[T any](ctx SQLContext, sql string, argCnt int, arr []T, callback func(sql.Result) bool) error { + if argCnt == 0 { + ret, err := ctx.NamedExec(sql, arr) + if err != nil { + return err + } + + if callback != nil { + callback(ret) + } + + return nil + } + + batchSize := maxPlaceholderCount / argCnt + for len(arr) > 0 { + curBatchSize := math.Min(batchSize, len(arr)) + + ret, err := ctx.NamedExec(sql, arr[:curBatchSize]) + if err != nil { + 
return err + } + if callback != nil && !callback(ret) { + return nil + } + + arr = arr[curBatchSize:] + } + + return nil +} + +func BatchNamedQuery[T any](ctx SQLContext, sql string, argCnt int, arr []T, callback func(*sqlx.Rows) bool) error { + if argCnt == 0 { + ret, err := ctx.NamedQuery(sql, arr) + if err != nil { + return err + } + + if callback != nil { + callback(ret) + } + + return nil + } + + batchSize := maxPlaceholderCount / argCnt + for len(arr) > 0 { + curBatchSize := math.Min(batchSize, len(arr)) + + ret, err := ctx.NamedQuery(sql, arr[:curBatchSize]) + if err != nil { + return err + } + if callback != nil && !callback(ret) { + return nil + } + + arr = arr[curBatchSize:] + } + return nil +} diff --git a/common/pkgs/distlock/lockprovider/ipfs_lock.go b/common/pkgs/distlock/lockprovider/ipfs_lock.go index 3872982..19de3a5 100644 --- a/common/pkgs/distlock/lockprovider/ipfs_lock.go +++ b/common/pkgs/distlock/lockprovider/ipfs_lock.go @@ -4,7 +4,7 @@ import ( "fmt" "gitlink.org.cn/cloudream/common/pkgs/distlock" - mylo "gitlink.org.cn/cloudream/common/utils/lo" + "gitlink.org.cn/cloudream/common/utils/lo2" ) const ( @@ -129,9 +129,9 @@ func (l *IPFSNodeLock) Lock(reqID string, lock distlock.Lock) error { func (l *IPFSNodeLock) Unlock(reqID string, lock distlock.Lock) error { switch lock.Name { case IPFSBuzyLock: - l.buzyReqIDs = mylo.Remove(l.buzyReqIDs, reqID) + l.buzyReqIDs = lo2.Remove(l.buzyReqIDs, reqID) case IPFSGCLock: - l.gcReqIDs = mylo.Remove(l.gcReqIDs, reqID) + l.gcReqIDs = lo2.Remove(l.gcReqIDs, reqID) default: return fmt.Errorf("unknow lock name: %s", lock.Name) } diff --git a/common/pkgs/distlock/lockprovider/metadata_lock.go b/common/pkgs/distlock/lockprovider/metadata_lock.go index ab8a6ca..cdb6f16 100644 --- a/common/pkgs/distlock/lockprovider/metadata_lock.go +++ b/common/pkgs/distlock/lockprovider/metadata_lock.go @@ -5,7 +5,7 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/distlock" - mylo 
"gitlink.org.cn/cloudream/common/utils/lo" + "gitlink.org.cn/cloudream/common/utils/lo2" ) const ( @@ -96,10 +96,10 @@ func (l *MetadataLock) removeElementLock(lock distlock.Lock, locks []*metadataEl return locks } - lck.requestIDs = mylo.Remove(lck.requestIDs, reqID) + lck.requestIDs = lo2.Remove(lck.requestIDs, reqID) if len(lck.requestIDs) == 0 { - locks = mylo.RemoveAt(locks, index) + locks = lo2.RemoveAt(locks, index) } return locks diff --git a/common/pkgs/distlock/lockprovider/storage_lock.go b/common/pkgs/distlock/lockprovider/storage_lock.go index df5671f..01c903f 100644 --- a/common/pkgs/distlock/lockprovider/storage_lock.go +++ b/common/pkgs/distlock/lockprovider/storage_lock.go @@ -4,7 +4,7 @@ import ( "fmt" "gitlink.org.cn/cloudream/common/pkgs/distlock" - mylo "gitlink.org.cn/cloudream/common/utils/lo" + "gitlink.org.cn/cloudream/common/utils/lo2" ) const ( @@ -129,9 +129,9 @@ func (l *StorageNodeLock) Lock(reqID string, lock distlock.Lock) error { func (l *StorageNodeLock) Unlock(reqID string, lock distlock.Lock) error { switch lock.Name { case StorageBuzyLock: - l.buzyReqIDs = mylo.Remove(l.buzyReqIDs, reqID) + l.buzyReqIDs = lo2.Remove(l.buzyReqIDs, reqID) case StorageGCLock: - l.gcReqIDs = mylo.Remove(l.gcReqIDs, reqID) + l.gcReqIDs = lo2.Remove(l.gcReqIDs, reqID) default: return fmt.Errorf("unknow lock name: %s", lock.Name) } diff --git a/common/pkgs/distlock/reqbuilder/lock_request_builder.go b/common/pkgs/distlock/reqbuilder/lock_request_builder.go index 4afb577..9acbde3 100644 --- a/common/pkgs/distlock/reqbuilder/lock_request_builder.go +++ b/common/pkgs/distlock/reqbuilder/lock_request_builder.go @@ -2,7 +2,7 @@ package reqbuilder import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" - mylo "gitlink.org.cn/cloudream/common/utils/lo" + "gitlink.org.cn/cloudream/common/utils/lo2" ) type LockRequestBuilder struct { @@ -15,7 +15,7 @@ func NewBuilder() *LockRequestBuilder { func (b *LockRequestBuilder) Build() distlock.LockRequest { return 
distlock.LockRequest{ - Locks: mylo.ArrayClone(b.locks), + Locks: lo2.ArrayClone(b.locks), } } diff --git a/common/pkgs/grpc/agent/pool.go b/common/pkgs/grpc/agent/pool.go index 9d00619..3b06ec9 100644 --- a/common/pkgs/grpc/agent/pool.go +++ b/common/pkgs/grpc/agent/pool.go @@ -2,6 +2,7 @@ package agent import ( "fmt" + sync "sync" ) type PoolConfig struct { @@ -18,28 +19,42 @@ func (c *PoolClient) Close() { type Pool struct { grpcCfg *PoolConfig + shareds map[string]*PoolClient + lock sync.Mutex } func NewPool(grpcCfg *PoolConfig) *Pool { return &Pool{ grpcCfg: grpcCfg, + shareds: make(map[string]*PoolClient), } } // 获取一个GRPC客户端。由于事先不能知道所有agent的GRPC配置信息,所以只能让调用者把建立连接所需的配置都传递进来, // Pool来决定要不要新建客户端。 func (p *Pool) Acquire(ip string, port int) (*PoolClient, error) { - cli, err := NewClient(fmt.Sprintf("%s:%d", ip, port)) - if err != nil { - return nil, err + addr := fmt.Sprintf("%s:%d", ip, port) + + p.lock.Lock() + defer p.lock.Unlock() + + cli, ok := p.shareds[addr] + if !ok { + c, err := NewClient(addr) + if err != nil { + return nil, err + } + cli = &PoolClient{ + Client: c, + owner: p, + } + p.shareds[addr] = cli } - return &PoolClient{ - Client: cli, - owner: p, - }, nil + return cli, nil + } func (p *Pool) Release(cli *PoolClient) { - cli.Client.Close() + // TODO 释放长时间未使用的client } diff --git a/common/pkgs/ioswitch/ops/clone.go b/common/pkgs/ioswitch/ops/clone.go new file mode 100644 index 0000000..670159a --- /dev/null +++ b/common/pkgs/ioswitch/ops/clone.go @@ -0,0 +1,43 @@ +package ops + +import ( + "io" + "sync" + + myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +type Clone struct { + InputID ioswitch.StreamID `json:"inputID"` + OutputIDs []ioswitch.StreamID `json:"outputIDs"` +} + +func (o *Clone) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + strs, err := sw.WaitStreams(planID, o.InputID) + if err != nil { + return err + } + defer strs[0].Stream.Close() + + wg := 
sync.WaitGroup{} + cloned := myio.Clone(strs[0].Stream, len(o.OutputIDs)) + for i, s := range cloned { + wg.Add(1) + + sw.StreamReady(planID, + ioswitch.NewStream(o.OutputIDs[i], + myio.AfterReadClosedOnce(s, func(closer io.ReadCloser) { + wg.Done() + }), + ), + ) + } + + wg.Wait() + return nil +} + +func init() { + OpUnion.AddT((*Clone)(nil)) +} diff --git a/common/pkgs/ioswitch/ops/file.go b/common/pkgs/ioswitch/ops/file.go index 0219a53..8f9adcd 100644 --- a/common/pkgs/ioswitch/ops/file.go +++ b/common/pkgs/ioswitch/ops/file.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "os" + "path" "gitlink.org.cn/cloudream/common/pkgs/future" myio "gitlink.org.cn/cloudream/common/utils/io" @@ -23,6 +24,12 @@ func (o *FileWrite) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { } defer str[0].Stream.Close() + dir := path.Dir(o.FilePath) + err = os.MkdirAll(dir, 0777) + if err != nil { + return fmt.Errorf("mkdir: %w", err) + } + file, err := os.Create(o.FilePath) if err != nil { return fmt.Errorf("opening file: %w", err) diff --git a/common/pkgs/ioswitch/plans/agent_plan.go b/common/pkgs/ioswitch/plans/agent_plan.go index 1f78a37..9c75a8a 100644 --- a/common/pkgs/ioswitch/plans/agent_plan.go +++ b/common/pkgs/ioswitch/plans/agent_plan.go @@ -243,3 +243,24 @@ func (b *AgentPlanBuilder) ChunkedJoin(chunkSize int, streams ...*AgentStream) * return agtStr } + +func (s *AgentStream) Clone(cnt int) *MultiStream { + mstr := &MultiStream{} + + var outputStrIDs []ioswitch.StreamID + for i := 0; i < cnt; i++ { + info := s.owner.owner.newStream() + mstr.Streams = append(mstr.Streams, &AgentStream{ + owner: s.owner, + info: info, + }) + outputStrIDs = append(outputStrIDs, info.ID) + } + + s.owner.ops = append(s.owner.ops, &ops.Clone{ + InputID: s.info.ID, + OutputIDs: outputStrIDs, + }) + + return mstr +} diff --git a/common/pkgs/ioswitch/switch.go b/common/pkgs/ioswitch/switch.go index aa1c584..682a21a 100644 --- a/common/pkgs/ioswitch/switch.go +++ b/common/pkgs/ioswitch/switch.go 
@@ -8,7 +8,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/utils/lo" + "gitlink.org.cn/cloudream/common/utils/lo2" ) var ErrPlanFinished = errors.New("plan is finished") @@ -232,7 +232,7 @@ func (s *Switch) StreamReady(planID PlanID, stream Stream) { return } - plan.waittings = lo.RemoveAt(plan.waittings, i) + plan.waittings = lo2.RemoveAt(plan.waittings, i) wa.Complete() return } diff --git a/common/pkgs/iterator/download_object_iterator.go b/common/pkgs/iterator/download_object_iterator.go index 722b653..792f9cc 100644 --- a/common/pkgs/iterator/download_object_iterator.go +++ b/common/pkgs/iterator/download_object_iterator.go @@ -13,7 +13,7 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" myio "gitlink.org.cn/cloudream/common/utils/io" - mysort "gitlink.org.cn/cloudream/common/utils/sort" + "gitlink.org.cn/cloudream/common/utils/sort2" "gitlink.org.cn/cloudream/storage/common/consts" stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" @@ -46,8 +46,11 @@ type DownloadObjectIterator struct { objectDetails []stgmodels.ObjectDetail currentIndex int + inited bool downloadCtx *DownloadContext + coorCli *coormq.Client + allNodes map[cdssdk.NodeID]cdssdk.Node } func NewDownloadObjectIterator(objectDetails []stgmodels.ObjectDetail, downloadCtx *DownloadContext) *DownloadObjectIterator { @@ -58,27 +61,60 @@ func NewDownloadObjectIterator(objectDetails []stgmodels.ObjectDetail, downloadC } func (i *DownloadObjectIterator) MoveNext() (*IterDownloadingObject, error) { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new coordinator client: %w", err) + if !i.inited { + if err := i.init(); err != nil { + return nil, err + } + + i.inited = true } - defer stgglb.CoordinatorMQPool.Release(coorCli) if i.currentIndex >= len(i.objectDetails) { return nil, 
ErrNoMoreItem } - item, err := i.doMove(coorCli) + item, err := i.doMove() i.currentIndex++ return item, err } -func (iter *DownloadObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingObject, error) { +func (i *DownloadObjectIterator) init() error { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return fmt.Errorf("new coordinator client: %w", err) + } + i.coorCli = coorCli + + allNodeIDs := make(map[cdssdk.NodeID]bool) + for _, obj := range i.objectDetails { + for _, p := range obj.PinnedAt { + allNodeIDs[p] = true + } + + for _, b := range obj.Blocks { + allNodeIDs[b.NodeID] = true + } + } + + getNodes, err := coorCli.GetNodes(coormq.NewGetNodes(lo.Keys(allNodeIDs))) + if err != nil { + return fmt.Errorf("getting nodes: %w", err) + } + + i.allNodes = make(map[cdssdk.NodeID]cdssdk.Node) + for _, n := range getNodes.Nodes { + i.allNodes[n.NodeID] = n + } + + return nil +} + +func (iter *DownloadObjectIterator) doMove() (*IterDownloadingObject, error) { obj := iter.objectDetails[iter.currentIndex] switch red := obj.Object.Redundancy.(type) { case *cdssdk.NoneRedundancy: - reader, err := iter.downloadNoneOrRepObject(coorCli, iter.downloadCtx, obj) + reader, err := iter.downloadNoneOrRepObject(obj) if err != nil { return nil, fmt.Errorf("downloading object: %w", err) } @@ -89,7 +125,7 @@ func (iter *DownloadObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloa }, nil case *cdssdk.RepRedundancy: - reader, err := iter.downloadNoneOrRepObject(coorCli, iter.downloadCtx, obj) + reader, err := iter.downloadNoneOrRepObject(obj) if err != nil { return nil, fmt.Errorf("downloading rep object: %w", err) } @@ -100,7 +136,7 @@ func (iter *DownloadObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloa }, nil case *cdssdk.ECRedundancy: - reader, err := iter.downloadECObject(coorCli, iter.downloadCtx, obj, red) + reader, err := iter.downloadECObject(obj, red) if err != nil { return nil, fmt.Errorf("downloading ec object: %w", err) } 
@@ -120,15 +156,15 @@ func (i *DownloadObjectIterator) Close() { } } -func (iter *DownloadObjectIterator) downloadNoneOrRepObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail) (io.ReadCloser, error) { - allNodes, err := iter.sortDownloadNodes(coorCli, ctx, obj) +func (iter *DownloadObjectIterator) downloadNoneOrRepObject(obj stgmodels.ObjectDetail) (io.ReadCloser, error) { + allNodes, err := iter.sortDownloadNodes(obj) if err != nil { return nil, err } bsc, blocks := iter.getMinReadingBlockSolution(allNodes, 1) osc, node := iter.getMinReadingObjectSolution(allNodes, 1) if bsc < osc { - return downloadFile(ctx, blocks[0].Node, blocks[0].Block.FileHash) + return downloadFile(iter.downloadCtx, blocks[0].Node, blocks[0].Block.FileHash) } // bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件 @@ -136,11 +172,11 @@ func (iter *DownloadObjectIterator) downloadNoneOrRepObject(coorCli *coormq.Clie return nil, fmt.Errorf("no node has this object") } - return downloadFile(ctx, *node, obj.Object.FileHash) + return downloadFile(iter.downloadCtx, *node, obj.Object.FileHash) } -func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) { - allNodes, err := iter.sortDownloadNodes(coorCli, ctx, obj) +func (iter *DownloadObjectIterator) downloadECObject(obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) { + allNodes, err := iter.sortDownloadNodes(obj) if err != nil { return nil, err } @@ -155,7 +191,7 @@ func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx } for i, b := range blocks { - str, err := downloadFile(ctx, b.Node, b.Block.FileHash) + str, err := downloadFile(iter.downloadCtx, b.Node, b.Block.FileHash) if err != nil { for i -= 1; i >= 0; i-- { fileStrs[i].Close() @@ -185,10 +221,10 @@ func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx return 
nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(blocks)) } - return downloadFile(ctx, *node, obj.Object.FileHash) + return downloadFile(iter.downloadCtx, *node, obj.Object.FileHash) } -func (iter *DownloadObjectIterator) sortDownloadNodes(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail) ([]*DownloadNodeInfo, error) { +func (iter *DownloadObjectIterator) sortDownloadNodes(obj stgmodels.ObjectDetail) ([]*DownloadNodeInfo, error) { var nodeIDs []cdssdk.NodeID for _, id := range obj.PinnedAt { if !lo.Contains(nodeIDs, id) { @@ -201,16 +237,11 @@ func (iter *DownloadObjectIterator) sortDownloadNodes(coorCli *coormq.Client, ct } } - getNodes, err := coorCli.GetNodes(coormq.NewGetNodes(nodeIDs)) - if err != nil { - return nil, fmt.Errorf("getting nodes: %w", err) - } - downloadNodeMap := make(map[cdssdk.NodeID]*DownloadNodeInfo) for _, id := range obj.PinnedAt { node, ok := downloadNodeMap[id] if !ok { - mod := *getNodes.GetNode(id) + mod := iter.allNodes[id] node = &DownloadNodeInfo{ Node: mod, ObjectPinned: true, @@ -225,7 +256,7 @@ func (iter *DownloadObjectIterator) sortDownloadNodes(coorCli *coormq.Client, ct for _, b := range obj.Blocks { node, ok := downloadNodeMap[b.NodeID] if !ok { - mod := *getNodes.GetNode(b.NodeID) + mod := iter.allNodes[b.NodeID] node = &DownloadNodeInfo{ Node: mod, Distance: iter.getNodeDistance(mod), @@ -236,8 +267,8 @@ func (iter *DownloadObjectIterator) sortDownloadNodes(coorCli *coormq.Client, ct node.Blocks = append(node.Blocks, b) } - return mysort.Sort(lo.Values(downloadNodeMap), func(left, right *DownloadNodeInfo) int { - return mysort.Cmp(left.Distance, right.Distance) + return sort2.Sort(lo.Values(downloadNodeMap), func(left, right *DownloadNodeInfo) int { + return sort2.Cmp(left.Distance, right.Distance) }), nil } diff --git a/common/pkgs/mq/agent/client.go b/common/pkgs/mq/agent/client.go index d984e47..016c0fe 100644 --- a/common/pkgs/mq/agent/client.go +++ 
b/common/pkgs/mq/agent/client.go @@ -1,6 +1,8 @@ package agent import ( + "sync" + "gitlink.org.cn/cloudream/common/pkgs/mq" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" @@ -33,18 +35,34 @@ type Pool interface { } type pool struct { - mqcfg *stgmq.Config + mqcfg *stgmq.Config + shareds map[cdssdk.NodeID]*Client + lock sync.Mutex } func NewPool(mqcfg *stgmq.Config) Pool { return &pool{ - mqcfg: mqcfg, + mqcfg: mqcfg, + shareds: make(map[cdssdk.NodeID]*Client), } } func (p *pool) Acquire(id cdssdk.NodeID) (*Client, error) { - return NewClient(id, p.mqcfg) + p.lock.Lock() + defer p.lock.Unlock() + + cli, ok := p.shareds[id] + if !ok { + var err error + cli, err = NewClient(id, p.mqcfg) + if err != nil { + return nil, err + } + p.shareds[id] = cli + } + + return cli, nil } func (p *pool) Release(cli *Client) { - cli.Close() + // TODO 定时关闭 } diff --git a/common/pkgs/mq/agent/object.go b/common/pkgs/mq/agent/object.go index a3b56ac..fd1b952 100644 --- a/common/pkgs/mq/agent/object.go +++ b/common/pkgs/mq/agent/object.go @@ -11,16 +11,16 @@ var _ = Register(Service.PinObject) type PinObject struct { mq.MessageBodyBase - FileHash string `json:"fileHash"` - IsBackground bool `json:"isBackground"` + FileHashes []string `json:"fileHashes"` + IsBackground bool `json:"isBackground"` } type PinObjectResp struct { mq.MessageBodyBase } -func ReqPinObject(fileHash string, isBackground bool) *PinObject { +func ReqPinObject(fileHashes []string, isBackground bool) *PinObject { return &PinObject{ - FileHash: fileHash, + FileHashes: fileHashes, IsBackground: isBackground, } } diff --git a/common/pkgs/mq/coordinator/cache.go b/common/pkgs/mq/coordinator/cache.go index 92acbf6..ff75895 100644 --- a/common/pkgs/mq/coordinator/cache.go +++ b/common/pkgs/mq/coordinator/cache.go @@ -7,6 +7,8 @@ import ( type CacheService interface { CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, *mq.CodeMessage) + + 
CacheRemovePackage(msg *CacheRemovePackage) (*CacheRemovePackageResp, *mq.CodeMessage) } // Package的Object移动到了节点的Cache中 @@ -33,3 +35,28 @@ func NewCachePackageMovedResp() *CachePackageMovedResp { func (client *Client) CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, error) { return mq.Request(Service.CachePackageMoved, client.rabbitCli, msg) } + +// 删除移动到指定节点Cache中的Package +var _ = Register(Service.CacheRemovePackage) + +type CacheRemovePackage struct { + mq.MessageBodyBase + PackageID cdssdk.PackageID `json:"packageID"` + NodeID cdssdk.NodeID `json:"nodeID"` +} +type CacheRemovePackageResp struct { + mq.MessageBodyBase +} + +func ReqCacheRemoveMovedPackage(packageID cdssdk.PackageID, nodeID cdssdk.NodeID) *CacheRemovePackage { + return &CacheRemovePackage{ + PackageID: packageID, + NodeID: nodeID, + } +} +func RespCacheRemovePackage() *CacheRemovePackageResp { + return &CacheRemovePackageResp{} +} +func (client *Client) CacheRemovePackage(msg *CacheRemovePackage) (*CacheRemovePackageResp, error) { + return mq.Request(Service.CacheRemovePackage, client.rabbitCli, msg) +} diff --git a/common/pkgs/mq/coordinator/client.go b/common/pkgs/mq/coordinator/client.go index 8d25532..84a4f28 100644 --- a/common/pkgs/mq/coordinator/client.go +++ b/common/pkgs/mq/coordinator/client.go @@ -1,6 +1,8 @@ package coordinator import ( + "sync" + "gitlink.org.cn/cloudream/common/pkgs/mq" stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" ) @@ -30,7 +32,9 @@ type Pool interface { } type pool struct { - mqcfg *stgmq.Config + mqcfg *stgmq.Config + shared *Client + lock sync.Mutex } func NewPool(mqcfg *stgmq.Config) Pool { @@ -39,9 +43,18 @@ func NewPool(mqcfg *stgmq.Config) Pool { } } func (p *pool) Acquire() (*Client, error) { - return NewClient(p.mqcfg) + p.lock.Lock() + defer p.lock.Unlock() + if p.shared == nil { + var err error + p.shared, err = NewClient(p.mqcfg) + if err != nil { + return nil, err + } + } + + return p.shared, nil } func (p *pool) Release(cli 
*Client) { - cli.Close() } diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index 6c665d6..45b5b24 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -81,8 +81,8 @@ type ChangeObjectRedundancyResp struct { mq.MessageBodyBase } type ChangeObjectRedundancyEntry struct { - ObjectID cdssdk.ObjectID `json:"objectID"` - Redundancy cdssdk.Redundancy `json:"redundancy"` + ObjectID cdssdk.ObjectID `json:"objectID" db:"ObjectID"` + Redundancy cdssdk.Redundancy `json:"redundancy" db:"Redundancy"` PinnedAt []cdssdk.NodeID `json:"pinnedAt"` Blocks []stgmod.ObjectBlock `json:"blocks"` } diff --git a/common/pkgs/mq/coordinator/package.go b/common/pkgs/mq/coordinator/package.go index 52c51f6..e831069 100644 --- a/common/pkgs/mq/coordinator/package.go +++ b/common/pkgs/mq/coordinator/package.go @@ -79,7 +79,7 @@ func (client *Client) CreatePackage(msg *CreatePackage) (*CreatePackageResp, err return mq.Request(Service.CreatePackage, client.rabbitCli, msg) } -// 更新EC备份模式的Package +// 更新Package var _ = Register(Service.UpdatePackage) type UpdatePackage struct { @@ -116,7 +116,7 @@ func NewAddObjectEntry(path string, size int64, fileHash string, nodeIDs cdssdk. 
NodeID: nodeIDs, } } -func (client *Client) UpdateECPackage(msg *UpdatePackage) (*UpdatePackageResp, error) { +func (client *Client) UpdatePackage(msg *UpdatePackage) (*UpdatePackageResp, error) { return mq.Request(Service.UpdatePackage, client.rabbitCli, msg) } diff --git a/common/pkgs/mq/scanner/client.go b/common/pkgs/mq/scanner/client.go index 8cb70c3..156a16d 100644 --- a/common/pkgs/mq/scanner/client.go +++ b/common/pkgs/mq/scanner/client.go @@ -1,6 +1,8 @@ package scanner import ( + "sync" + "gitlink.org.cn/cloudream/common/pkgs/mq" stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" ) @@ -30,7 +32,9 @@ type Pool interface { } type pool struct { - mqcfg *stgmq.Config + mqcfg *stgmq.Config + shared *Client + lock sync.Mutex } func NewPool(mqcfg *stgmq.Config) Pool { @@ -39,9 +43,18 @@ func NewPool(mqcfg *stgmq.Config) Pool { } } func (p *pool) Acquire() (*Client, error) { - return NewClient(p.mqcfg) + p.lock.Lock() + defer p.lock.Unlock() + if p.shared == nil { + var err error + p.shared, err = NewClient(p.mqcfg) + if err != nil { + return nil, err + } + } + + return p.shared, nil } func (p *pool) Release(cli *Client) { - cli.Close() } diff --git a/coordinator/internal/mq/cache.go b/coordinator/internal/mq/cache.go index 215f8a0..0e097f8 100644 --- a/coordinator/internal/mq/cache.go +++ b/coordinator/internal/mq/cache.go @@ -37,3 +37,30 @@ func (svc *Service) CachePackageMoved(msg *coormq.CachePackageMoved) (*coormq.Ca return mq.ReplyOK(coormq.NewCachePackageMovedResp()) } + +func (svc *Service) CacheRemovePackage(msg *coormq.CacheRemovePackage) (*coormq.CacheRemovePackageResp, *mq.CodeMessage) { + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { + _, err := svc.db.Package().GetByID(tx, msg.PackageID) + if err != nil { + return fmt.Errorf("getting package by id: %w", err) + } + + _, err = svc.db.Node().GetByID(tx, msg.NodeID) + if err != nil { + return fmt.Errorf("getting node by id: %w", err) + } + + err = 
svc.db.PinnedObject().DeleteInPackageAtNode(tx, msg.PackageID, msg.NodeID) + if err != nil { + return fmt.Errorf("delete pinned objects in package at node: %w", err) + } + + return nil + }) + if err != nil { + logger.WithField("PackageID", msg.PackageID).WithField("NodeID", msg.NodeID).Warn(err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "remove pinned package failed") + } + + return mq.ReplyOK(coormq.RespCacheRemovePackage()) +} diff --git a/coordinator/internal/mq/storage.go b/coordinator/internal/mq/storage.go index 06ec0f6..6060bd0 100644 --- a/coordinator/internal/mq/storage.go +++ b/coordinator/internal/mq/storage.go @@ -34,7 +34,7 @@ func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coo return fmt.Errorf("storage is not available to user") } - err := svc.db.StoragePackage().Create(tx, msg.StorageID, msg.PackageID, msg.UserID) + err := svc.db.StoragePackage().CreateOrUpdate(tx, msg.StorageID, msg.PackageID, msg.UserID) if err != nil { return fmt.Errorf("creating storage package: %w", err) } diff --git a/scanner/internal/event/agent_check_cache.go b/scanner/internal/event/agent_check_cache.go index 6f9084e..7f4cd45 100644 --- a/scanner/internal/event/agent_check_cache.go +++ b/scanner/internal/event/agent_check_cache.go @@ -105,7 +105,7 @@ func (t *AgentCheckCache) checkCache(execCtx ExecuteContext, tx *sqlx.Tx, realFi } if len(realFileHashesCp) > 0 { - err = execCtx.Args.DB.Cache().BatchCreate(tx, lo.Keys(realFileHashesCp), t.NodeID, 0) + err = execCtx.Args.DB.Cache().BatchCreateOnSameNode(tx, lo.Keys(realFileHashesCp), t.NodeID, 0) if err != nil { log.Warnf("batch create node caches: %w", err) return diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go index 357218a..b1cc02c 100644 --- a/scanner/internal/event/check_package_redundancy.go +++ b/scanner/internal/event/check_package_redundancy.go @@ -8,7 +8,7 @@ import ( "github.com/samber/lo" 
"gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/common/utils/sort" + "gitlink.org.cn/cloudream/common/utils/sort2" stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" @@ -73,7 +73,8 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { return } - getNodes, err := coorCli.GetNodes(coormq.NewGetNodes(nil)) + // TODO UserID + getNodes, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(0)) if err != nil { log.Warnf("getting all nodes: %s", err.Error()) return @@ -110,7 +111,10 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { defRep := cdssdk.DefaultRepRedundancy defEC := cdssdk.DefaultECRedundancy + // TODO 目前rep的备份数量固定为2,所以这里直接选出两个节点 + mostBlockNodeIDs := t.summaryRepObjectBlockNodes(getObjs.Objects, 2) newRepNodes := t.chooseNewNodesForRep(&defRep, allNodes) + rechoosedRepNodes := t.rechooseNodesForRep(mostBlockNodeIDs, &defRep, allNodes) newECNodes := t.chooseNewNodesForEC(&defEC, allNodes) // 加锁 @@ -149,8 +153,7 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: rep -> ec") entry, err = t.repToEC(obj, &defEC, newECNodes) } else { - uploadNodes := t.rechooseNodesForRep(obj, red, allNodes) - entry, err = t.repToRep(obj, &defRep, uploadNodes) + entry, err = t.repToRep(obj, &defRep, rechoosedRepNodes) } case *cdssdk.ECRedundancy: @@ -183,8 +186,43 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { } } +// 统计每个对象块所在的节点,选出块最多的不超过nodeCnt个节点 +func (t *CheckPackageRedundancy) summaryRepObjectBlockNodes(objs []stgmod.ObjectDetail, nodeCnt int) []cdssdk.NodeID { + type nodeBlocks struct { + NodeID cdssdk.NodeID + Count int + } + + nodeBlocksMap := make(map[cdssdk.NodeID]*nodeBlocks) + for _, obj := range objs { + shouldUseEC := 
obj.Object.Size > config.Cfg().ECFileSizeThreshold + if _, ok := obj.Object.Redundancy.(*cdssdk.RepRedundancy); ok && !shouldUseEC { + for _, block := range obj.Blocks { + if _, ok := nodeBlocksMap[block.NodeID]; !ok { + nodeBlocksMap[block.NodeID] = &nodeBlocks{ + NodeID: block.NodeID, + Count: 0, + } + } + nodeBlocksMap[block.NodeID].Count++ + } + } + } + + nodes := lo.Values(nodeBlocksMap) + sort2.Sort(nodes, func(left *nodeBlocks, right *nodeBlocks) int { + return right.Count - left.Count + }) + + ids := lo.Map(nodes, func(item *nodeBlocks, idx int) cdssdk.NodeID { return item.NodeID }) + if len(ids) > nodeCnt { + ids = ids[:nodeCnt] + } + return ids +} + func (t *CheckPackageRedundancy) chooseNewNodesForRep(red *cdssdk.RepRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo { - sortedNodes := sort.Sort(lo.Values(allNodes), func(left *NodeLoadInfo, right *NodeLoadInfo) int { + sortedNodes := sort2.Sort(lo.Values(allNodes), func(left *NodeLoadInfo, right *NodeLoadInfo) int { dm := right.LoadsRecentMonth - left.LoadsRecentMonth if dm != 0 { return dm @@ -197,7 +235,7 @@ func (t *CheckPackageRedundancy) chooseNewNodesForRep(red *cdssdk.RepRedundancy, } func (t *CheckPackageRedundancy) chooseNewNodesForEC(red *cdssdk.ECRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo { - sortedNodes := sort.Sort(lo.Values(allNodes), func(left *NodeLoadInfo, right *NodeLoadInfo) int { + sortedNodes := sort2.Sort(lo.Values(allNodes), func(left *NodeLoadInfo, right *NodeLoadInfo) int { dm := right.LoadsRecentMonth - left.LoadsRecentMonth if dm != 0 { return dm @@ -209,36 +247,36 @@ func (t *CheckPackageRedundancy) chooseNewNodesForEC(red *cdssdk.ECRedundancy, a return t.chooseSoManyNodes(red.N, sortedNodes) } -func (t *CheckPackageRedundancy) rechooseNodesForRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo { +func (t *CheckPackageRedundancy) rechooseNodesForRep(mostBlockNodeIDs 
[]cdssdk.NodeID, red *cdssdk.RepRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo { type rechooseNode struct { *NodeLoadInfo - CachedBlockIndex int + HasBlock bool } var rechooseNodes []*rechooseNode for _, node := range allNodes { - cachedBlockIndex := -1 - for _, block := range obj.Blocks { - if block.NodeID == node.Node.NodeID { - cachedBlockIndex = block.Index + hasBlock := false + for _, id := range mostBlockNodeIDs { + if id == node.Node.NodeID { + hasBlock = true break } } rechooseNodes = append(rechooseNodes, &rechooseNode{ - NodeLoadInfo: node, - CachedBlockIndex: cachedBlockIndex, + NodeLoadInfo: node, + HasBlock: hasBlock, }) } - sortedNodes := sort.Sort(rechooseNodes, func(left *rechooseNode, right *rechooseNode) int { + sortedNodes := sort2.Sort(rechooseNodes, func(left *rechooseNode, right *rechooseNode) int { dm := right.LoadsRecentMonth - left.LoadsRecentMonth if dm != 0 { return dm } // 已经缓存了文件块的节点优先选择 - v := sort.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1) + v := sort2.CmpBool(right.HasBlock, left.HasBlock) if v != 0 { return v } @@ -271,14 +309,14 @@ func (t *CheckPackageRedundancy) rechooseNodesForEC(obj stgmod.ObjectDetail, red }) } - sortedNodes := sort.Sort(rechooseNodes, func(left *rechooseNode, right *rechooseNode) int { + sortedNodes := sort2.Sort(rechooseNodes, func(left *rechooseNode, right *rechooseNode) int { dm := right.LoadsRecentMonth - left.LoadsRecentMonth if dm != 0 { return dm } // 已经缓存了文件块的节点优先选择 - v := sort.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1) + v := sort2.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1) if v != 0 { return v } @@ -627,7 +665,7 @@ func (t *CheckPackageRedundancy) pinObject(nodeID cdssdk.NodeID, fileHash string } defer stgglb.AgentMQPool.Release(agtCli) - _, err = agtCli.PinObject(agtmq.ReqPinObject(fileHash, false)) + _, err = agtCli.PinObject(agtmq.ReqPinObject([]string{fileHash}, false)) if err != nil { return 
fmt.Errorf("start pinning object: %w", err) } diff --git a/scanner/internal/event/clean_pinned.go b/scanner/internal/event/clean_pinned.go index efa6d1b..4957770 100644 --- a/scanner/internal/event/clean_pinned.go +++ b/scanner/internal/event/clean_pinned.go @@ -4,21 +4,21 @@ import ( "fmt" "math" "math/rand" - "strconv" + "sync" "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/bitmap" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - mylo "gitlink.org.cn/cloudream/common/utils/lo" + "gitlink.org.cn/cloudream/common/utils/lo2" mymath "gitlink.org.cn/cloudream/common/utils/math" - myref "gitlink.org.cn/cloudream/common/utils/reflect" - mysort "gitlink.org.cn/cloudream/common/utils/sort" + "gitlink.org.cn/cloudream/common/utils/sort2" "gitlink.org.cn/cloudream/storage/common/consts" stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans" + agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" ) @@ -67,20 +67,83 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) { } readerNodeIDs := lo.Map(getLoadLog.Logs, func(item coormq.PackageLoadLogDetail, idx int) cdssdk.NodeID { return item.Storage.NodeID }) - var changeRedEntries []coormq.ChangeObjectRedundancyEntry + // 注意!需要保证allNodeID包含所有之后可能用到的节点ID + // TOOD 可以考虑设计Cache机制 + var allNodeID []cdssdk.NodeID for _, obj := range getObjs.Objects { - entry, err := t.doOne(execCtx, readerNodeIDs, coorCli, obj) - if err != nil { - log.WithField("PackageID", obj).Warn(err.Error()) - continue + for _, block := range obj.Blocks { + allNodeID = append(allNodeID, block.NodeID) } - if entry != nil { - changeRedEntries = 
append(changeRedEntries, *entry) + allNodeID = append(allNodeID, obj.PinnedAt...) + } + allNodeID = append(allNodeID, readerNodeIDs...) + + getNodeResp, err := coorCli.GetNodes(coormq.NewGetNodes(lo.Union(allNodeID))) + if err != nil { + log.Warnf("getting nodes: %s", err.Error()) + return + } + + allNodeInfos := make(map[cdssdk.NodeID]*cdssdk.Node) + for _, node := range getNodeResp.Nodes { + n := node + allNodeInfos[node.NodeID] = &n + } + + // 只对ec和rep对象进行处理 + var ecObjects []stgmod.ObjectDetail + var repObjects []stgmod.ObjectDetail + for _, obj := range getObjs.Objects { + if _, ok := obj.Object.Redundancy.(*cdssdk.ECRedundancy); ok { + ecObjects = append(ecObjects, obj) + } else if _, ok := obj.Object.Redundancy.(*cdssdk.RepRedundancy); ok { + repObjects = append(repObjects, obj) } } - if len(changeRedEntries) > 0 { - _, err = coorCli.ChangeObjectRedundancy(coormq.ReqChangeObjectRedundancy(changeRedEntries)) + planBld := plans.NewPlanBuilder() + pinPlans := make(map[cdssdk.NodeID]*[]string) + + // 对于rep对象,统计出所有对象块分布最多的两个节点,用这两个节点代表所有rep对象块的分布,去进行退火算法 + var repObjectsUpdateEntries []coormq.ChangeObjectRedundancyEntry + repMostNodeIDs := t.summaryRepObjectBlockNodes(repObjects) + solu := t.startAnnealing(allNodeInfos, readerNodeIDs, annealingObject{ + totalBlockCount: 1, + minBlockCnt: 1, + pinnedAt: repMostNodeIDs, + blocks: nil, + }) + for _, obj := range repObjects { + repObjectsUpdateEntries = append(repObjectsUpdateEntries, t.makePlansForRepObject(solu, obj, pinPlans)) + } + + // 对于ec对象,则每个对象单独进行退火算法 + var ecObjectsUpdateEntries []coormq.ChangeObjectRedundancyEntry + for _, obj := range ecObjects { + ecRed := obj.Object.Redundancy.(*cdssdk.ECRedundancy) + solu := t.startAnnealing(allNodeInfos, readerNodeIDs, annealingObject{ + totalBlockCount: ecRed.N, + minBlockCnt: ecRed.K, + pinnedAt: obj.PinnedAt, + blocks: obj.Blocks, + }) + ecObjectsUpdateEntries = append(ecObjectsUpdateEntries, t.makePlansForECObject(allNodeInfos, solu, obj, &planBld)) + } + + 
ioSwRets, err := t.executePlans(execCtx, pinPlans, &planBld) + if err != nil { + log.Warn(err.Error()) + return + } + + // 根据按照方案进行调整的结果,填充更新元数据的命令 + for i := range ecObjectsUpdateEntries { + t.populateECObjectEntry(&ecObjectsUpdateEntries[i], ecObjects[i], ioSwRets) + } + + finalEntries := append(repObjectsUpdateEntries, ecObjectsUpdateEntries...) + if len(finalEntries) > 0 { + _, err = coorCli.ChangeObjectRedundancy(coormq.ReqChangeObjectRedundancy(finalEntries)) if err != nil { log.Warnf("changing object redundancy: %s", err.Error()) return @@ -88,15 +151,64 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) { } } -type doingContext struct { - execCtx ExecuteContext - readerNodeIDs []cdssdk.NodeID // 近期可能访问此对象的节点 - nodesSortedByReader map[cdssdk.NodeID][]nodeDist // 拥有数据的节点到每个可能访问对象的节点按距离排序 - nodeInfos map[cdssdk.NodeID]*cdssdk.Node +func (t *CleanPinned) summaryRepObjectBlockNodes(objs []stgmod.ObjectDetail) []cdssdk.NodeID { + type nodeBlocks struct { + NodeID cdssdk.NodeID + Count int + } + + nodeBlocksMap := make(map[cdssdk.NodeID]*nodeBlocks) + for _, obj := range objs { + cacheBlockNodes := make(map[cdssdk.NodeID]bool) + for _, block := range obj.Blocks { + if _, ok := nodeBlocksMap[block.NodeID]; !ok { + nodeBlocksMap[block.NodeID] = &nodeBlocks{ + NodeID: block.NodeID, + Count: 0, + } + } + nodeBlocksMap[block.NodeID].Count++ + cacheBlockNodes[block.NodeID] = true + } + + for _, nodeID := range obj.PinnedAt { + if cacheBlockNodes[nodeID] { + continue + } + + if _, ok := nodeBlocksMap[nodeID]; !ok { + nodeBlocksMap[nodeID] = &nodeBlocks{ + NodeID: nodeID, + Count: 0, + } + } + nodeBlocksMap[nodeID].Count++ + } + } + + nodes := lo.Values(nodeBlocksMap) + sort2.Sort(nodes, func(left *nodeBlocks, right *nodeBlocks) int { + return right.Count - left.Count + }) + + // 只选出块数超过一半的节点,但要保证至少有两个节点 + for i := 2; i < len(nodes); i++ { + if nodes[i].Count < len(objs)/2 { + nodes = nodes[:i] + break + } + } + + return lo.Map(nodes, func(item *nodeBlocks, idx int) 
cdssdk.NodeID { return item.NodeID }) +} + +type annealingState struct { + allNodeInfos map[cdssdk.NodeID]*cdssdk.Node // 所有节点的信息 + readerNodeIDs []cdssdk.NodeID // 近期可能访问此对象的节点 + nodesSortedByReader map[cdssdk.NodeID][]nodeDist // 拥有数据的节点到每个可能访问对象的节点按距离排序 + object annealingObject // 进行退火的对象 blockList []objectBlock // 排序后的块分布情况 nodeBlockBitmaps map[cdssdk.NodeID]*bitmap.Bitmap64 // 用位图的形式表示每一个节点上有哪些块 - allBlockTypeCount int // object总共被分成了几块 - minBlockTypeCount int // 最少要几块才能恢复出完整的object nodeCombTree combinatorialTree // 节点组合树,用于加速计算容灾度 maxScore float64 // 搜索过程中得到过的最大分数 @@ -127,6 +239,13 @@ type combinatorialTree struct { localNodeIDToNodeID []cdssdk.NodeID } +type annealingObject struct { + totalBlockCount int + minBlockCnt int + pinnedAt []cdssdk.NodeID + blocks []stgmod.ObjectBlock +} + const ( iterActionNone = 0 iterActionSkip = 1 @@ -331,125 +450,84 @@ type combinatorialTreeNode struct { blocksBitmap bitmap.Bitmap64 // 选择了这个中心之后,所有中心一共包含多少种块 } -func (t *CleanPinned) doOne(execCtx ExecuteContext, readerNodeIDs []cdssdk.NodeID, coorCli *coormq.Client, obj stgmod.ObjectDetail) (*coormq.ChangeObjectRedundancyEntry, error) { - if len(obj.PinnedAt) == 0 && len(obj.Blocks) == 0 { - return nil, nil - } +type annealingSolution struct { + blockList []objectBlock // 所有节点的块分布情况 + rmBlocks []bool // 要删除哪些块 +} - ctx := doingContext{ - execCtx: execCtx, +func (t *CleanPinned) startAnnealing(allNodeInfos map[cdssdk.NodeID]*cdssdk.Node, readerNodeIDs []cdssdk.NodeID, object annealingObject) annealingSolution { + state := &annealingState{ + allNodeInfos: allNodeInfos, readerNodeIDs: readerNodeIDs, nodesSortedByReader: make(map[cdssdk.NodeID][]nodeDist), - nodeInfos: make(map[cdssdk.NodeID]*cdssdk.Node), + object: object, nodeBlockBitmaps: make(map[cdssdk.NodeID]*bitmap.Bitmap64), } - err := t.getNodeInfos(&ctx, coorCli, obj) - if err != nil { - return nil, err - } - - err = t.makeBlockList(&ctx, obj) - if err != nil { - return nil, err - } - - if ctx.blockList == nil { - return 
nil, nil + t.initBlockList(state) + if state.blockList == nil { + return annealingSolution{} } - t.makeNodeBlockBitmap(&ctx) + t.initNodeBlockBitmap(state) - t.sortNodeByReaderDistance(&ctx) + t.sortNodeByReaderDistance(state) - ctx.rmBlocks = make([]bool, len(ctx.blockList)) - ctx.inversedIndex = -1 - ctx.nodeCombTree = newCombinatorialTree(ctx.nodeBlockBitmaps) + state.rmBlocks = make([]bool, len(state.blockList)) + state.inversedIndex = -1 + state.nodeCombTree = newCombinatorialTree(state.nodeBlockBitmaps) - ctx.lastScore = t.calcScore(&ctx) - ctx.maxScore = ctx.lastScore - ctx.maxScoreRmBlocks = mylo.ArrayClone(ctx.rmBlocks) + state.lastScore = t.calcScore(state) + state.maxScore = state.lastScore + state.maxScoreRmBlocks = lo2.ArrayClone(state.rmBlocks) // 模拟退火算法的温度 - curTemp := ctx.lastScore + curTemp := state.lastScore // 结束温度 finalTemp := curTemp * 0.2 // 冷却率 coolingRate := 0.95 for curTemp > finalTemp { - ctx.inversedIndex = rand.Intn(len(ctx.rmBlocks)) - block := ctx.blockList[ctx.inversedIndex] - ctx.rmBlocks[ctx.inversedIndex] = !ctx.rmBlocks[ctx.inversedIndex] - ctx.nodeBlockBitmaps[block.NodeID].Set(block.Index, !ctx.rmBlocks[ctx.inversedIndex]) - ctx.nodeCombTree.UpdateBitmap(block.NodeID, *ctx.nodeBlockBitmaps[block.NodeID], ctx.minBlockTypeCount) + state.inversedIndex = rand.Intn(len(state.rmBlocks)) + block := state.blockList[state.inversedIndex] + state.rmBlocks[state.inversedIndex] = !state.rmBlocks[state.inversedIndex] + state.nodeBlockBitmaps[block.NodeID].Set(block.Index, !state.rmBlocks[state.inversedIndex]) + state.nodeCombTree.UpdateBitmap(block.NodeID, *state.nodeBlockBitmaps[block.NodeID], state.object.minBlockCnt) - curScore := t.calcScore(&ctx) + curScore := t.calcScore(state) - dScore := curScore - ctx.lastScore + dScore := curScore - state.lastScore // 如果新方案比旧方案得分低,且没有要求强制接受新方案,那么就将变化改回去 if curScore == 0 || (dScore < 0 && !t.alwaysAccept(curTemp, dScore, coolingRate)) { - ctx.rmBlocks[ctx.inversedIndex] = 
!ctx.rmBlocks[ctx.inversedIndex] - ctx.nodeBlockBitmaps[block.NodeID].Set(block.Index, !ctx.rmBlocks[ctx.inversedIndex]) - ctx.nodeCombTree.UpdateBitmap(block.NodeID, *ctx.nodeBlockBitmaps[block.NodeID], ctx.minBlockTypeCount) - fmt.Printf("\n") + state.rmBlocks[state.inversedIndex] = !state.rmBlocks[state.inversedIndex] + state.nodeBlockBitmaps[block.NodeID].Set(block.Index, !state.rmBlocks[state.inversedIndex]) + state.nodeCombTree.UpdateBitmap(block.NodeID, *state.nodeBlockBitmaps[block.NodeID], state.object.minBlockCnt) + // fmt.Printf("\n") } else { - fmt.Printf(" accept!\n") - ctx.lastScore = curScore - if ctx.maxScore < curScore { - ctx.maxScore = ctx.lastScore - ctx.maxScoreRmBlocks = mylo.ArrayClone(ctx.rmBlocks) + // fmt.Printf(" accept!\n") + state.lastScore = curScore + if state.maxScore < curScore { + state.maxScore = state.lastScore + state.maxScoreRmBlocks = lo2.ArrayClone(state.rmBlocks) } } curTemp *= coolingRate } - - return t.applySolution(ctx, obj) -} - -func (t *CleanPinned) getNodeInfos(ctx *doingContext, coorCli *coormq.Client, obj stgmod.ObjectDetail) error { - var nodeIDs []cdssdk.NodeID - for _, b := range obj.Blocks { - nodeIDs = append(nodeIDs, b.NodeID) + // fmt.Printf("final: %v\n", state.maxScoreRmBlocks) + return annealingSolution{ + blockList: state.blockList, + rmBlocks: state.maxScoreRmBlocks, } - nodeIDs = append(nodeIDs, obj.PinnedAt...) - - nodeIDs = append(nodeIDs, ctx.readerNodeIDs...) 
- - getNode, err := coorCli.GetNodes(coormq.NewGetNodes(lo.Uniq(nodeIDs))) - if err != nil { - return fmt.Errorf("requesting to coordinator: %w", err) - } - - for _, n := range getNode.Nodes { - ctx.nodeInfos[n.NodeID] = &n - } - - return nil } -func (t *CleanPinned) makeBlockList(ctx *doingContext, obj stgmod.ObjectDetail) error { - blockCnt := 1 - minBlockCnt := 1 - switch red := obj.Object.Redundancy.(type) { - case *cdssdk.NoneRedundancy: - return nil - case *cdssdk.RepRedundancy: - blockCnt = 1 - minBlockCnt = 1 - case *cdssdk.ECRedundancy: - blockCnt = red.N - minBlockCnt = red.K - default: - return fmt.Errorf("unknow redundancy type: %v", myref.TypeOfValue(obj.Object.Redundancy)) - } - +func (t *CleanPinned) initBlockList(ctx *annealingState) { blocksMap := make(map[cdssdk.NodeID][]objectBlock) // 先生成所有的影子块 - for _, pinned := range obj.PinnedAt { - blocks := make([]objectBlock, 0, blockCnt) - for i := 0; i < blockCnt; i++ { + for _, pinned := range ctx.object.pinnedAt { + blocks := make([]objectBlock, 0, ctx.object.totalBlockCount) + for i := 0; i < ctx.object.totalBlockCount; i++ { blocks = append(blocks, objectBlock{ Index: i, NodeID: pinned, @@ -460,7 +538,7 @@ func (t *CleanPinned) makeBlockList(ctx *doingContext, obj stgmod.ObjectDetail) } // 再填充实际块 - for _, b := range obj.Blocks { + for _, b := range ctx.object.blocks { blocks := blocksMap[b.NodeID] has := false @@ -490,7 +568,7 @@ func (t *CleanPinned) makeBlockList(ctx *doingContext, obj stgmod.ObjectDetail) for _, bs := range blocksMap { sortedBlocks = append(sortedBlocks, bs...) 
} - sortedBlocks = mysort.Sort(sortedBlocks, func(left objectBlock, right objectBlock) int { + sortedBlocks = sort2.Sort(sortedBlocks, func(left objectBlock, right objectBlock) int { d := left.NodeID - right.NodeID if d != 0 { return int(d) @@ -499,36 +577,33 @@ func (t *CleanPinned) makeBlockList(ctx *doingContext, obj stgmod.ObjectDetail) return left.Index - right.Index }) - ctx.allBlockTypeCount = blockCnt - ctx.minBlockTypeCount = minBlockCnt ctx.blockList = sortedBlocks - return nil } -func (t *CleanPinned) makeNodeBlockBitmap(ctx *doingContext) { - for _, b := range ctx.blockList { - mp, ok := ctx.nodeBlockBitmaps[b.NodeID] +func (t *CleanPinned) initNodeBlockBitmap(state *annealingState) { + for _, b := range state.blockList { + mp, ok := state.nodeBlockBitmaps[b.NodeID] if !ok { nb := bitmap.Bitmap64(0) mp = &nb - ctx.nodeBlockBitmaps[b.NodeID] = mp + state.nodeBlockBitmaps[b.NodeID] = mp } mp.Set(b.Index, true) } } -func (t *CleanPinned) sortNodeByReaderDistance(ctx *doingContext) { - for _, r := range ctx.readerNodeIDs { +func (t *CleanPinned) sortNodeByReaderDistance(state *annealingState) { + for _, r := range state.readerNodeIDs { var nodeDists []nodeDist - for n := range ctx.nodeBlockBitmaps { + for n := range state.nodeBlockBitmaps { if r == n { // 同节点时距离视为0.1 nodeDists = append(nodeDists, nodeDist{ NodeID: n, Distance: consts.NodeDistanceSameNode, }) - } else if ctx.nodeInfos[r].LocationID == ctx.nodeInfos[n].LocationID { + } else if state.allNodeInfos[r].LocationID == state.allNodeInfos[n].LocationID { // 同地区时距离视为1 nodeDists = append(nodeDists, nodeDist{ NodeID: n, @@ -543,14 +618,14 @@ func (t *CleanPinned) sortNodeByReaderDistance(ctx *doingContext) { } } - ctx.nodesSortedByReader[r] = mysort.Sort(nodeDists, func(left, right nodeDist) int { return mysort.Cmp(left.Distance, right.Distance) }) + state.nodesSortedByReader[r] = sort2.Sort(nodeDists, func(left, right nodeDist) int { return sort2.Cmp(left.Distance, right.Distance) }) } } -func (t 
*CleanPinned) calcScore(ctx *doingContext) float64 { - dt := t.calcDisasterTolerance(ctx) - ac := t.calcMinAccessCost(ctx) - sc := t.calcSpaceCost(ctx) +func (t *CleanPinned) calcScore(state *annealingState) float64 { + dt := t.calcDisasterTolerance(state) + ac := t.calcMinAccessCost(state) + sc := t.calcSpaceCost(state) dtSc := 1.0 if dt < 1 { @@ -566,42 +641,43 @@ func (t *CleanPinned) calcScore(ctx *doingContext) float64 { newSc = dtSc / (sc * ac) } - fmt.Printf("solu: %v, cur: %v, dt: %v, ac: %v, sc: %v ", ctx.rmBlocks, newSc, dt, ac, sc) + // fmt.Printf("solu: %v, cur: %v, dt: %v, ac: %v, sc: %v \n", state.rmBlocks, newSc, dt, ac, sc) return newSc } // 计算容灾度 -func (t *CleanPinned) calcDisasterTolerance(ctx *doingContext) float64 { - if ctx.inversedIndex != -1 { - node := ctx.blockList[ctx.inversedIndex] - ctx.nodeCombTree.UpdateBitmap(node.NodeID, *ctx.nodeBlockBitmaps[node.NodeID], ctx.minBlockTypeCount) +func (t *CleanPinned) calcDisasterTolerance(state *annealingState) float64 { + if state.inversedIndex != -1 { + node := state.blockList[state.inversedIndex] + state.nodeCombTree.UpdateBitmap(node.NodeID, *state.nodeBlockBitmaps[node.NodeID], state.object.minBlockCnt) } - return float64(len(ctx.nodeBlockBitmaps) - ctx.nodeCombTree.FindKBlocksMaxDepth(ctx.minBlockTypeCount)) + return float64(len(state.nodeBlockBitmaps) - state.nodeCombTree.FindKBlocksMaxDepth(state.object.minBlockCnt)) } // 计算最小访问数据的代价 -func (t *CleanPinned) calcMinAccessCost(ctx *doingContext) float64 { +func (t *CleanPinned) calcMinAccessCost(state *annealingState) float64 { cost := math.MaxFloat64 - for _, reader := range ctx.readerNodeIDs { - tarNodes := ctx.nodesSortedByReader[reader] + for _, reader := range state.readerNodeIDs { + tarNodes := state.nodesSortedByReader[reader] gotBlocks := bitmap.Bitmap64(0) thisCost := 0.0 for _, tar := range tarNodes { - tarNodeMp := ctx.nodeBlockBitmaps[tar.NodeID] + tarNodeMp := state.nodeBlockBitmaps[tar.NodeID] // 只需要从目的节点上获得缺少的块 curWeigth := 
gotBlocks.Weight() // 下面的if会在拿到k个块之后跳出循环,所以or多了块也没关系 gotBlocks.Or(tarNodeMp) - willGetBlocks := mymath.Min(gotBlocks.Weight()-curWeigth, ctx.minBlockTypeCount-curWeigth) + // 但是算读取块的消耗时,不能多算,最多算读了k个块的消耗 + willGetBlocks := mymath.Min(gotBlocks.Weight()-curWeigth, state.object.minBlockCnt-curWeigth) thisCost += float64(willGetBlocks) * float64(tar.Distance) - if gotBlocks.Weight() >= ctx.minBlockTypeCount { + if gotBlocks.Weight() >= state.object.minBlockCnt { break } } - if gotBlocks.Weight() >= ctx.minBlockTypeCount { + if gotBlocks.Weight() >= state.object.minBlockCnt { cost = math.Min(cost, thisCost) } } @@ -610,7 +686,7 @@ func (t *CleanPinned) calcMinAccessCost(ctx *doingContext) float64 { } // 计算冗余度 -func (t *CleanPinned) calcSpaceCost(ctx *doingContext) float64 { +func (t *CleanPinned) calcSpaceCost(ctx *annealingState) float64 { blockCount := 0 for i, b := range ctx.blockList { if ctx.rmBlocks[i] { @@ -625,26 +701,58 @@ func (t *CleanPinned) calcSpaceCost(ctx *doingContext) float64 { } } // 所有算力中心上拥有的块的总数 / 一个对象被分成了几个块 - return float64(blockCount) / float64(ctx.minBlockTypeCount) + return float64(blockCount) / float64(ctx.object.minBlockCnt) } // 如果新方案得分比旧方案小,那么在一定概率内也接受新方案 func (t *CleanPinned) alwaysAccept(curTemp float64, dScore float64, coolingRate float64) bool { v := math.Exp(dScore / curTemp / coolingRate) - fmt.Printf(" -- chance: %v, temp: %v", v, curTemp) + // fmt.Printf(" -- chance: %v, temp: %v", v, curTemp) return v > rand.Float64() } -func (t *CleanPinned) applySolution(ctx doingContext, obj stgmod.ObjectDetail) (*coormq.ChangeObjectRedundancyEntry, error) { +func (t *CleanPinned) makePlansForRepObject(solu annealingSolution, obj stgmod.ObjectDetail, pinPlans map[cdssdk.NodeID]*[]string) coormq.ChangeObjectRedundancyEntry { + entry := coormq.ChangeObjectRedundancyEntry{ + ObjectID: obj.Object.ObjectID, + Redundancy: obj.Object.Redundancy, + } + + for i, f := range solu.rmBlocks { + hasCache := lo.ContainsBy(obj.Blocks, func(b 
stgmod.ObjectBlock) bool { return b.NodeID == solu.blockList[i].NodeID }) || + lo.ContainsBy(obj.PinnedAt, func(n cdssdk.NodeID) bool { return n == solu.blockList[i].NodeID }) + willRm := f + + if !willRm { + // 如果对象在退火后要保留副本的节点没有副本,则需要在这个节点创建副本 + if !hasCache { + pinPlan, ok := pinPlans[solu.blockList[i].NodeID] + if !ok { + pinPlan = &[]string{} + pinPlans[solu.blockList[i].NodeID] = pinPlan + } + *pinPlan = append(*pinPlan, obj.Object.FileHash) + } + entry.Blocks = append(entry.Blocks, stgmod.ObjectBlock{ + ObjectID: obj.Object.ObjectID, + Index: solu.blockList[i].Index, + NodeID: solu.blockList[i].NodeID, + FileHash: obj.Object.FileHash, + }) + } + } + + return entry +} + +func (t *CleanPinned) makePlansForECObject(allNodeInfos map[cdssdk.NodeID]*cdssdk.Node, solu annealingSolution, obj stgmod.ObjectDetail, planBld *plans.PlanBuilder) coormq.ChangeObjectRedundancyEntry { entry := coormq.ChangeObjectRedundancyEntry{ ObjectID: obj.Object.ObjectID, Redundancy: obj.Object.Redundancy, } - fmt.Printf("final solu: %v, score: %v\n", ctx.maxScoreRmBlocks, ctx.maxScore) reconstrct := make(map[cdssdk.NodeID]*[]int) - for i, f := range ctx.maxScoreRmBlocks { - block := ctx.blockList[i] + for i, f := range solu.rmBlocks { + block := solu.blockList[i] if !f { entry.Blocks = append(entry.Blocks, stgmod.ObjectBlock{ ObjectID: obj.Object.ObjectID, @@ -666,64 +774,109 @@ func (t *CleanPinned) applySolution(ctx doingContext, obj stgmod.ObjectDetail) ( } } - bld := reqbuilder.NewBuilder() - for id := range reconstrct { - bld.IPFS().Buzy(id) + ecRed := obj.Object.Redundancy.(*cdssdk.ECRedundancy) + + for id, idxs := range reconstrct { + agt := planBld.AtAgent(*allNodeInfos[id]) + + strs := agt.IPFSRead(obj.Object.FileHash).ChunkedSplit(ecRed.ChunkSize, ecRed.K, true) + ss := agt.ECReconstructAny(*ecRed, lo.Range(ecRed.K), *idxs, strs.Streams...) 
+ for i, s := range ss.Streams { + s.IPFSWrite(fmt.Sprintf("%d.%d", obj.Object.ObjectID, (*idxs)[i])) + } } + return entry +} + +func (t *CleanPinned) executePlans(execCtx ExecuteContext, pinPlans map[cdssdk.NodeID]*[]string, planBld *plans.PlanBuilder) (map[string]any, error) { + log := logger.WithType[CleanPinned]("Event") - mutex, err := bld.MutexLock(ctx.execCtx.Args.DistLock) + ioPlan, err := planBld.Build() + if err != nil { + return nil, fmt.Errorf("building io switch plan: %w", err) + } + + // 统一加锁,有重复也没关系 + lockBld := reqbuilder.NewBuilder() + for nodeID := range pinPlans { + lockBld.IPFS().Buzy(nodeID) + } + for _, plan := range ioPlan.AgentPlans { + lockBld.IPFS().Buzy(plan.Node.NodeID) + } + lock, err := lockBld.MutexLock(execCtx.Args.DistLock) if err != nil { return nil, fmt.Errorf("acquiring distlock: %w", err) } - defer mutex.Unlock() + defer lock.Unlock() - if ecRed, ok := obj.Object.Redundancy.(*cdssdk.ECRedundancy); ok { - for id, idxs := range reconstrct { - bld := plans.NewPlanBuilder() - agt := bld.AtAgent(*ctx.nodeInfos[id]) + wg := sync.WaitGroup{} - strs := agt.IPFSRead(obj.Object.FileHash).ChunkedSplit(ecRed.ChunkSize, ecRed.K, true) - ss := agt.ECReconstructAny(*ecRed, lo.Range(ecRed.K), *idxs, strs.Streams...) 
- for i, s := range ss.Streams { - s.IPFSWrite(fmt.Sprintf("%d", (*idxs)[i])) - } + // 执行pin操作 + var anyPinErr error + for nodeID, pin := range pinPlans { + wg.Add(1) + go func(nodeID cdssdk.NodeID, pin *[]string) { + defer wg.Done() - plan, err := bld.Build() + agtCli, err := stgglb.AgentMQPool.Acquire(nodeID) if err != nil { - return nil, fmt.Errorf("building io switch plan: %w", err) + log.Warnf("new agent client: %s", err.Error()) + return } + defer stgglb.AgentMQPool.Release(agtCli) - exec, err := plans.Execute(*plan) + _, err = agtCli.PinObject(agtmq.ReqPinObject(*pin, false)) if err != nil { - return nil, fmt.Errorf("executing io switch plan: %w", err) + log.Warnf("pinning object: %s", err.Error()) + anyPinErr = err } - ret, err := exec.Wait() - if err != nil { - return nil, fmt.Errorf("executing io switch plan: %w", err) - } - - for k, v := range ret.ResultValues { - idx, err := strconv.ParseInt(k, 10, 32) - if err != nil { - return nil, fmt.Errorf("parsing plan result: %w", err) - } + }(nodeID, pin) + } - for i := range entry.Blocks { - if entry.Blocks[i].NodeID == id && entry.Blocks[i].Index == int(idx) { - entry.Blocks[i].FileHash = v.(string) - } - } - } + // 执行IO计划 + var ioSwRets map[string]any + var ioSwErr error + wg.Add(1) + go func() { + defer wg.Done() + exec, err := plans.Execute(*ioPlan) + if err != nil { + ioSwErr = fmt.Errorf("executing io switch plan: %w", err) + return } - } else if _, ok := obj.Object.Redundancy.(*cdssdk.RepRedundancy); ok { - // rep模式不分块,所以每一个Block的FileHash就是完整文件的FileHash - for i := range entry.Blocks { - entry.Blocks[i].FileHash = obj.Object.FileHash + ret, err := exec.Wait() + if err != nil { + ioSwErr = fmt.Errorf("waiting io switch plan: %w", err) + return } + ioSwRets = ret.ResultValues + }() + + wg.Wait() + + if anyPinErr != nil { + return nil, anyPinErr + } + + if ioSwErr != nil { + return nil, ioSwErr } - return &entry, nil + return ioSwRets, nil +} + +func (t *CleanPinned) populateECObjectEntry(entry 
*coormq.ChangeObjectRedundancyEntry, obj stgmod.ObjectDetail, ioRets map[string]any) { + for i := range entry.Blocks { + if entry.Blocks[i].FileHash != "" { + continue + } + + key := fmt.Sprintf("%d.%d", obj.Object.ObjectID, entry.Blocks[i].Index) + // 不应该出现key不存在的情况 + entry.Blocks[i].FileHash = ioRets[key].(string) + } } func init() {