From f2b2762082a63ddc454ba86754e8e3479c9a3b0e Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 27 Feb 2024 11:40:59 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=B5=8B=E8=AF=95=E4=B8=AD?= =?UTF-8?q?=E5=8F=91=E7=8E=B0=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/mq/object.go | 6 +- client/internal/cmdline/package.go | 10 +- common/assets/confs/agent.config.json | 3 +- common/assets/confs/client.config.json | 3 +- common/assets/confs/scanner.config.json | 3 +- common/pkgs/db/cache.go | 19 +- common/pkgs/db/db.go | 5 + common/pkgs/db/object.go | 249 +++++++++++++----- common/pkgs/db/object_block.go | 15 +- common/pkgs/db/package.go | 2 +- common/pkgs/db/pinned_object.go | 14 + common/pkgs/db/utils.go | 75 ++++++ common/pkgs/grpc/agent/pool.go | 31 ++- .../pkgs/iterator/download_object_iterator.go | 81 ++++-- common/pkgs/mq/agent/client.go | 26 +- common/pkgs/mq/coordinator/client.go | 19 +- common/pkgs/mq/coordinator/object.go | 4 +- common/pkgs/mq/scanner/client.go | 19 +- scanner/internal/event/agent_check_cache.go | 2 +- 19 files changed, 458 insertions(+), 128 deletions(-) create mode 100644 common/pkgs/db/utils.go diff --git a/agent/internal/mq/object.go b/agent/internal/mq/object.go index 06a106f..66bcfd7 100644 --- a/agent/internal/mq/object.go +++ b/agent/internal/mq/object.go @@ -2,19 +2,19 @@ package mq import ( "gitlink.org.cn/cloudream/common/consts/errorcode" - log "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/storage/agent/internal/task" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" ) func (svc *Service) PinObject(msg *agtmq.PinObject) (*agtmq.PinObjectResp, *mq.CodeMessage) { - log.WithField("FileHash", msg.FileHash).Debugf("pin object") + logger.WithField("FileHash", msg.FileHash).Debugf("pin object") 
tsk := svc.taskManager.StartComparable(task.NewIPFSPin(msg.FileHash)) if tsk.Error() != nil { - log.WithField("FileHash", msg.FileHash). + logger.WithField("FileHash", msg.FileHash). Warnf("pin object failed, err: %s", tsk.Error().Error()) return nil, mq.Failed(errorcode.OperationFailed, "pin object failed") } diff --git a/client/internal/cmdline/package.go b/client/internal/cmdline/package.go index b0f9bc2..a4a7dfd 100644 --- a/client/internal/cmdline/package.go +++ b/client/internal/cmdline/package.go @@ -46,6 +46,8 @@ func PackageDownloadPackage(ctx CommandContext, packageID cdssdk.PackageID, outp } defer objIter.Close() + madeDirs := make(map[string]bool) + for { objInfo, err := objIter.MoveNext() if err == iterator.ErrNoMoreItem { @@ -61,8 +63,11 @@ func PackageDownloadPackage(ctx CommandContext, packageID cdssdk.PackageID, outp fullPath := filepath.Join(outputDir, objInfo.Object.Path) dirPath := filepath.Dir(fullPath) - if err := os.MkdirAll(dirPath, 0755); err != nil { - return fmt.Errorf("creating object dir: %w", err) + if !madeDirs[dirPath] { + if err := os.MkdirAll(dirPath, 0755); err != nil { + return fmt.Errorf("creating object dir: %w", err) + } + madeDirs[dirPath] = true } outputFile, err := os.Create(fullPath) @@ -135,6 +140,7 @@ func PackageCreatePackage(ctx CommandContext, name string, rootPath string, buck }) } fmt.Print(tb.Render()) + fmt.Printf("\n%v", uploadObjectResult.PackageID) return nil } diff --git a/common/assets/confs/agent.config.json b/common/assets/confs/agent.config.json index e706fd4..226711e 100644 --- a/common/assets/confs/agent.config.json +++ b/common/assets/confs/agent.config.json @@ -30,6 +30,7 @@ "etcdUsername": "", "etcdPassword": "", "etcdLockLeaseTimeSec": 5, - "description": "I am a agent" + "randomReleasingDelayMs": 3000, + "serviceDescription": "I am a agent" } } \ No newline at end of file diff --git a/common/assets/confs/client.config.json b/common/assets/confs/client.config.json index 10e4265..d7ce483 100644 --- 
a/common/assets/confs/client.config.json +++ b/common/assets/confs/client.config.json @@ -23,6 +23,7 @@ "etcdUsername": "", "etcdPassword": "", "etcdLockLeaseTimeSec": 5, - "description": "I am a client" + "randomReleasingDelayMs": 3000, + "serviceDescription": "I am a client" } } \ No newline at end of file diff --git a/common/assets/confs/scanner.config.json b/common/assets/confs/scanner.config.json index 4dbd650..e84b602 100644 --- a/common/assets/confs/scanner.config.json +++ b/common/assets/confs/scanner.config.json @@ -24,6 +24,7 @@ "etcdUsername": "", "etcdPassword": "", "etcdLockLeaseTimeSec": 5, - "description": "I am a scanner" + "randomReleasingDelayMs": 3000, + "serviceDescription": "I am a scanner" } } \ No newline at end of file diff --git a/common/pkgs/db/cache.go b/common/pkgs/db/cache.go index ea59e8a..964326f 100644 --- a/common/pkgs/db/cache.go +++ b/common/pkgs/db/cache.go @@ -44,7 +44,19 @@ func (*CacheDB) Create(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID, pr return nil } -func (*CacheDB) BatchCreate(ctx SQLContext, fileHashes []string, nodeID cdssdk.NodeID, priority int) error { +// 批量创建缓存记录 +func (*CacheDB) BatchCreate(ctx SQLContext, caches []model.Cache) error { + return BatchNamedExec( + ctx, + "insert into Cache(FileHash,NodeID,CreateTime,Priority) values(:FileHash,:NodeID,:CreateTime,:Priority)"+ + " on duplicate key update CreateTime=values(CreateTime), Priority=values(Priority)", + 4, + caches, + nil, + ) +} + +func (*CacheDB) BatchCreateOnSameNode(ctx SQLContext, fileHashes []string, nodeID cdssdk.NodeID, priority int) error { var caches []model.Cache var nowTime = time.Now() for _, hash := range fileHashes { @@ -56,8 +68,9 @@ func (*CacheDB) BatchCreate(ctx SQLContext, fileHashes []string, nodeID cdssdk.N }) } - _, err := sqlx.NamedExec(ctx, "insert into Cache(FileHash,NodeID,CreateTime,Priority) values(:FileHash,:NodeID,:CreateTime,:Priority)"+ - " on duplicate key update CreateTime=values(CreateTime), 
Priority=values(Priority)", + _, err := sqlx.NamedExec(ctx, + "insert into Cache(FileHash,NodeID,CreateTime,Priority) values(:FileHash,:NodeID,:CreateTime,:Priority)"+ + " on duplicate key update CreateTime=values(CreateTime), Priority=values(Priority)", caches, ) return err diff --git a/common/pkgs/db/db.go b/common/pkgs/db/db.go index 77761a3..d04f878 100644 --- a/common/pkgs/db/db.go +++ b/common/pkgs/db/db.go @@ -18,6 +18,11 @@ type SQLContext interface { sqlx.Queryer sqlx.Execer sqlx.Ext + sqlx.Preparer + + NamedQuery(query string, arg interface{}) (*sqlx.Rows, error) + NamedExec(query string, arg interface{}) (sql.Result, error) + PrepareNamed(query string) (*sqlx.NamedStmt, error) } func NewDB(cfg *config.Config) (*DB, error) { diff --git a/common/pkgs/db/object.go b/common/pkgs/db/object.go index af4369e..24e3941 100644 --- a/common/pkgs/db/object.go +++ b/common/pkgs/db/object.go @@ -2,6 +2,7 @@ package db import ( "fmt" + "time" "github.com/jmoiron/sqlx" "github.com/samber/lo" @@ -25,6 +26,23 @@ func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Obj return ret.ToObject(), err } +func (db *ObjectDB) BatchGetPackageObjectIDs(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.ObjectID, error) { + // TODO In语句 + stmt, args, err := sqlx.In("select ObjectID from Object force index(PackagePath) where PackageID=? and Path in (?)", pkgID, pathes) + if err != nil { + return nil, err + } + stmt = ctx.Rebind(stmt) + + objIDs := make([]cdssdk.ObjectID, 0, len(pathes)) + err = sqlx.Select(ctx, &objIDs, stmt, args...) 
+ if err != nil { + return nil, err + } + + return objIDs, nil +} + func (db *ObjectDB) Create(ctx SQLContext, packageID cdssdk.PackageID, path string, size int64, fileHash string, redundancy cdssdk.Redundancy) (int64, error) { sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy) values(?,?,?,?,?)" @@ -75,6 +93,15 @@ func (db *ObjectDB) CreateOrUpdate(ctx SQLContext, packageID cdssdk.PackageID, p return objID, false, nil } +// 批量创建或者更新记录 +func (db *ObjectDB) BatchCreateOrUpdate(ctx SQLContext, objs []cdssdk.Object) error { + sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy)" + + " values(:PackageID,:Path,:Size,:FileHash,:Redundancy)" + + " on duplicate key update Size = values(Size), FileHash = values(FileHash), Redundancy = values(Redundancy)" + + return BatchNamedExec(ctx, sql, 5, objs, nil) +} + func (*ObjectDB) UpdateFileInfo(ctx SQLContext, objectID cdssdk.ObjectID, fileSize int64) (bool, error) { ret, err := ctx.Exec("update Object set FileSize = ? where ObjectID = ?", fileSize, objectID) if err != nil { @@ -104,103 +131,189 @@ func (db *ObjectDB) GetPackageObjectDetails(ctx SQLContext, packageID cdssdk.Pac rets := make([]stgmod.ObjectDetail, 0, len(objs)) - for _, obj := range objs { - var blocks []stgmod.ObjectBlock - err = sqlx.Select(ctx, - &blocks, - "select * from ObjectBlock where ObjectID = ? order by `Index`", - obj.ObjectID, - ) - if err != nil { - return nil, err + var allBlocks []stgmod.ObjectBlock + err = sqlx.Select(ctx, &allBlocks, "select ObjectBlock.* from ObjectBlock, Object where PackageID = ? and ObjectBlock.ObjectID = Object.ObjectID order by ObjectBlock.ObjectID, `Index` asc", packageID) + if err != nil { + return nil, fmt.Errorf("getting all object blocks: %w", err) + } + + var allPinnedObjs []cdssdk.PinnedObject + err = sqlx.Select(ctx, &allPinnedObjs, "select PinnedObject.* from PinnedObject, Object where PackageID = ? 
and PinnedObject.ObjectID = Object.ObjectID order by PinnedObject.ObjectID", packageID) + if err != nil { + return nil, fmt.Errorf("getting all pinned objects: %w", err) + } + + blksCur := 0 + pinnedsCur := 0 + for _, temp := range objs { + detail := stgmod.ObjectDetail{ + Object: temp.ToObject(), } - var pinnedAt []cdssdk.NodeID - err = sqlx.Select(ctx, &pinnedAt, "select NodeID from PinnedObject where ObjectID = ?", obj.ObjectID) - if err != nil { - return nil, err + // 1. 查询Object和ObjectBlock时均按照ObjectID升序排序 + // 2. ObjectBlock结果集中的不同ObjectID数只会比Object结果集的少 + // 因此在两个结果集上同时从头开始遍历时,如果两边的ObjectID字段不同,那么一定是ObjectBlock这边的ObjectID > Object的ObjectID, + // 此时让Object的遍历游标前进,直到两边的ObjectID再次相等 + for ; blksCur < len(allBlocks); blksCur++ { + if allBlocks[blksCur].ObjectID != temp.ObjectID { + break + } + detail.Blocks = append(detail.Blocks, allBlocks[blksCur]) } - rets = append(rets, stgmod.NewObjectDetail(obj.ToObject(), pinnedAt, blocks)) + for ; pinnedsCur < len(allPinnedObjs); pinnedsCur++ { + if allPinnedObjs[pinnedsCur].ObjectID != temp.ObjectID { + break + } + detail.PinnedAt = append(detail.PinnedAt, allPinnedObjs[pinnedsCur].NodeID) + } + + rets = append(rets, detail) } return rets, nil } -func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, objs []coormq.AddObjectEntry) ([]cdssdk.ObjectID, error) { - objIDs := make([]cdssdk.ObjectID, 0, len(objs)) - for _, obj := range objs { - // 创建对象的记录 - objID, isCreate, err := db.CreateOrUpdate(ctx, packageID, obj.Path, obj.Size, obj.FileHash) - if err != nil { - return nil, fmt.Errorf("creating object: %w", err) - } +func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []coormq.AddObjectEntry) ([]cdssdk.ObjectID, error) { + objs := make([]cdssdk.Object, 0, len(adds)) + for _, add := range adds { + objs = append(objs, cdssdk.Object{ + PackageID: packageID, + Path: add.Path, + Size: add.Size, + FileHash: add.FileHash, + Redundancy: cdssdk.NewNoneRedundancy(), // 首次上传默认使用不分块的none模式 
+ }) + } - objIDs = append(objIDs, objID) + err := db.BatchCreateOrUpdate(ctx, objs) + if err != nil { + return nil, fmt.Errorf("batch create or update objects: %w", err) + } - if !isCreate { - // 删除原本所有的编码块记录,重新添加 - if err = db.ObjectBlock().DeleteByObjectID(ctx, objID); err != nil { - return nil, fmt.Errorf("deleting all object block: %w", err) - } + pathes := make([]string, 0, len(adds)) + for _, add := range adds { + pathes = append(pathes, add.Path) + } + objIDs, err := db.BatchGetPackageObjectIDs(ctx, packageID, pathes) + if err != nil { + return nil, fmt.Errorf("batch get object ids: %w", err) + } - // 删除原本Pin住的Object。暂不考虑FileHash没有变化的情况 - if err = db.PinnedObject().DeleteByObjectID(ctx, objID); err != nil { - return nil, fmt.Errorf("deleting all pinned object: %w", err) - } - } + err = db.ObjectBlock().BatchDeleteByObjectID(ctx, objIDs) + if err != nil { + return nil, fmt.Errorf("batch delete object blocks: %w", err) + } - // 首次上传默认使用不分块的none模式 - err = db.ObjectBlock().Create(ctx, objID, 0, obj.NodeID, obj.FileHash) - if err != nil { - return nil, fmt.Errorf("creating object block: %w", err) - } + err = db.PinnedObject().BatchDeleteByObjectID(ctx, objIDs) + if err != nil { + return nil, fmt.Errorf("batch delete pinned objects: %w", err) + } - // 创建缓存记录 - err = db.Cache().Create(ctx, obj.FileHash, obj.NodeID, 0) - if err != nil { - return nil, fmt.Errorf("creating cache: %w", err) - } + objBlocks := make([]stgmod.ObjectBlock, 0, len(adds)) + for i, add := range adds { + objBlocks = append(objBlocks, stgmod.ObjectBlock{ + ObjectID: objIDs[i], + Index: 0, + NodeID: add.NodeID, + FileHash: add.FileHash, + }) + } + + err = db.ObjectBlock().BatchCreate(ctx, objBlocks) + if err != nil { + return nil, fmt.Errorf("batch create object blocks: %w", err) + } + + caches := make([]model.Cache, 0, len(adds)) + for _, add := range adds { + caches = append(caches, model.Cache{ + FileHash: add.FileHash, + NodeID: add.NodeID, + CreateTime: time.Now(), + Priority: 0, + }) + } 
+ + err = db.Cache().BatchCreate(ctx, caches) + if err != nil { + return nil, fmt.Errorf("batch create caches: %w", err) } return objIDs, nil } func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeObjectRedundancyEntry) error { + objIDs := make([]cdssdk.ObjectID, 0, len(objs)) + dummyObjs := make([]cdssdk.Object, 0, len(objs)) for _, obj := range objs { - _, err := ctx.Exec("update Object set Redundancy = ? where ObjectID = ?", obj.Redundancy, obj.ObjectID) - if err != nil { - return fmt.Errorf("updating object: %w", err) - } + objIDs = append(objIDs, obj.ObjectID) + dummyObjs = append(dummyObjs, cdssdk.Object{ + ObjectID: obj.ObjectID, + Redundancy: obj.Redundancy, + }) + } - // 删除原本所有的编码块记录,重新添加 - if err = db.ObjectBlock().DeleteByObjectID(ctx, obj.ObjectID); err != nil { - return fmt.Errorf("deleting all object block: %w", err) - } + // 目前只能使用这种方式来同时更新大量数据 + err := BatchNamedExec(ctx, + "insert into Object(ObjectID, PackageID, Path, Size, FileHash, Redundancy)"+ + " values(:ObjectID, :PackageID, :Path, :Size, :FileHash, :Redundancy) as new"+ + " on duplicate key update Redundancy=new.Redundancy", 6, dummyObjs, nil) + if err != nil { + return fmt.Errorf("batch update object redundancy: %w", err) + } - // 删除原本Pin住的Object。暂不考虑FileHash没有变化的情况 - if err = db.PinnedObject().DeleteByObjectID(ctx, obj.ObjectID); err != nil { - return fmt.Errorf("deleting all pinned object: %w", err) - } + // 删除原本所有的编码块记录,重新添加 + err = db.ObjectBlock().BatchDeleteByObjectID(ctx, objIDs) + if err != nil { + return fmt.Errorf("batch delete object blocks: %w", err) + } - for _, block := range obj.Blocks { - err = db.ObjectBlock().Create(ctx, obj.ObjectID, block.Index, block.NodeID, block.FileHash) - if err != nil { - return fmt.Errorf("creating object block: %w", err) - } + // 删除原本Pin住的Object。暂不考虑FileHash没有变化的情况 + err = db.PinnedObject().BatchDeleteByObjectID(ctx, objIDs) + if err != nil { + return fmt.Errorf("batch delete pinned object: %w", err) + } - // 创建缓存记录 - 
err = db.Cache().Create(ctx, block.FileHash, block.NodeID, 0) - if err != nil { - return fmt.Errorf("creating cache: %w", err) - } + blocks := make([]stgmod.ObjectBlock, 0, len(objs)) + for _, obj := range objs { + blocks = append(blocks, obj.Blocks...) + } + err = db.ObjectBlock().BatchCreate(ctx, blocks) + if err != nil { + return fmt.Errorf("batch create object blocks: %w", err) + } + + caches := make([]model.Cache, 0, len(objs)) + for _, obj := range objs { + for _, blk := range obj.Blocks { + caches = append(caches, model.Cache{ + FileHash: blk.FileHash, + NodeID: blk.NodeID, + CreateTime: time.Now(), + Priority: 0, + }) } + } + err = db.Cache().BatchCreate(ctx, caches) + if err != nil { + return fmt.Errorf("batch create object caches: %w", err) + } - err = db.PinnedObject().ObjectBatchCreate(ctx, obj.ObjectID, obj.PinnedAt) - if err != nil { - return fmt.Errorf("creating pinned object: %w", err) + pinneds := make([]cdssdk.PinnedObject, 0, len(objs)) + for _, obj := range objs { + for _, p := range obj.PinnedAt { + pinneds = append(pinneds, cdssdk.PinnedObject{ + ObjectID: obj.ObjectID, + NodeID: p, + CreateTime: time.Now(), + }) } } + err = db.PinnedObject().BatchTryCreate(ctx, pinneds) + if err != nil { + return fmt.Errorf("batch create pinned objects: %w", err) + } return nil } diff --git a/common/pkgs/db/object_block.go b/common/pkgs/db/object_block.go index e560e03..f420bfb 100644 --- a/common/pkgs/db/object_block.go +++ b/common/pkgs/db/object_block.go @@ -30,11 +30,12 @@ func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index } func (db *ObjectBlockDB) BatchCreate(ctx SQLContext, blocks []stgmod.ObjectBlock) error { - _, err := sqlx.NamedExec(ctx, + return BatchNamedExec(ctx, "insert ignore into ObjectBlock(ObjectID, `Index`, NodeID, FileHash) values(:ObjectID, :Index, :NodeID, :FileHash)", + 4, blocks, + nil, ) - return err } func (db *ObjectBlockDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID) error { @@ -42,6 
+43,16 @@ func (db *ObjectBlockDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.Object return err } +func (db *ObjectBlockDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) error { + // TODO in语句有长度限制 + query, args, err := sqlx.In("delete from ObjectBlock where ObjectID in (?)", objectIDs) + if err != nil { + return err + } + _, err = ctx.Exec(query, args...) + return err +} + func (db *ObjectBlockDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error { _, err := ctx.Exec("delete ObjectBlock from ObjectBlock inner join Object on ObjectBlock.ObjectID = Object.ObjectID where PackageID = ?", packageID) return err diff --git a/common/pkgs/db/package.go b/common/pkgs/db/package.go index 1491bec..4310ced 100644 --- a/common/pkgs/db/package.go +++ b/common/pkgs/db/package.go @@ -80,7 +80,7 @@ func (db *PackageDB) GetUserPackage(ctx SQLContext, userID cdssdk.UserID, packag func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name string) (cdssdk.PackageID, error) { // 根据packagename和bucketid查询,若不存在则插入,若存在则返回错误 var packageID int64 - err := sqlx.Get(ctx, &packageID, "select PackageID from Package where Name = ? AND BucketID = ?", name, bucketID) + err := sqlx.Get(ctx, &packageID, "select PackageID from Package where Name = ? AND BucketID = ? 
for update", name, bucketID) // 无错误代表存在记录 if err == nil { return 0, fmt.Errorf("package with given Name and BucketID already exists") diff --git a/common/pkgs/db/pinned_object.go b/common/pkgs/db/pinned_object.go index 026e5ca..1cfd981 100644 --- a/common/pkgs/db/pinned_object.go +++ b/common/pkgs/db/pinned_object.go @@ -39,6 +39,10 @@ func (*PinnedObjectDB) TryCreate(ctx SQLContext, nodeID cdssdk.NodeID, objectID return err } +func (*PinnedObjectDB) BatchTryCreate(ctx SQLContext, pinneds []cdssdk.PinnedObject) error { + return BatchNamedExec(ctx, "insert ignore into PinnedObject values(:NodeID,:ObjectID,:CreateTime)", 3, pinneds, nil) +} + func (*PinnedObjectDB) CreateFromPackage(ctx SQLContext, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error { _, err := ctx.Exec( "insert ignore into PinnedObject(NodeID, ObjectID, CreateTime) select ? as NodeID, ObjectID, ? as CreateTime from Object where PackageID = ?", @@ -69,6 +73,16 @@ func (*PinnedObjectDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID return err } +func (*PinnedObjectDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) error { + // TODO in语句有长度限制 + query, args, err := sqlx.In("delete from PinnedObject where ObjectID in (?)", objectIDs) + if err != nil { + return err + } + _, err = ctx.Exec(query, args...) 
+ return err +} + func (*PinnedObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error { _, err := ctx.Exec("delete PinnedObject from PinnedObject inner join Object on PinnedObject.ObjectID = Object.ObjectID where PackageID = ?", packageID) return err diff --git a/common/pkgs/db/utils.go b/common/pkgs/db/utils.go new file mode 100644 index 0000000..5dde56f --- /dev/null +++ b/common/pkgs/db/utils.go @@ -0,0 +1,75 @@ +package db + +import ( + "database/sql" + + "github.com/jmoiron/sqlx" + "gitlink.org.cn/cloudream/common/utils/math" +) + +const ( + maxPlaceholderCount = 65535 +) + +func BatchNamedExec[T any](ctx SQLContext, sql string, argCnt int, arr []T, callback func(sql.Result) bool) error { + if argCnt == 0 { + ret, err := ctx.NamedExec(sql, arr) + if err != nil { + return err + } + + if callback != nil { + callback(ret) + } + + return nil + } + + batchSize := maxPlaceholderCount / argCnt + for len(arr) > 0 { + curBatchSize := math.Min(batchSize, len(arr)) + + ret, err := ctx.NamedExec(sql, arr[:curBatchSize]) + if err != nil { + return err + } + if callback != nil && !callback(ret) { + return nil + } + + arr = arr[curBatchSize:] + } + + return nil +} + +func BatchNamedQuery[T any](ctx SQLContext, sql string, argCnt int, arr []T, callback func(*sqlx.Rows) bool) error { + if argCnt == 0 { + ret, err := ctx.NamedQuery(sql, arr) + if err != nil { + return err + } + + if callback != nil { + callback(ret) + } + + return nil + } + + batchSize := maxPlaceholderCount / argCnt + for len(arr) > 0 { + curBatchSize := math.Min(batchSize, len(arr)) + + ret, err := ctx.NamedQuery(sql, arr[:curBatchSize]) + if err != nil { + return err + } + if callback != nil && !callback(ret) { + return nil + } + + arr = arr[curBatchSize:] + } + return nil +} diff --git a/common/pkgs/grpc/agent/pool.go b/common/pkgs/grpc/agent/pool.go index 9d00619..3b06ec9 100644 --- a/common/pkgs/grpc/agent/pool.go +++ b/common/pkgs/grpc/agent/pool.go @@ -2,6 +2,7 @@ package agent
import ( "fmt" + sync "sync" ) type PoolConfig struct { @@ -18,28 +19,42 @@ func (c *PoolClient) Close() { type Pool struct { grpcCfg *PoolConfig + shareds map[string]*PoolClient + lock sync.Mutex } func NewPool(grpcCfg *PoolConfig) *Pool { return &Pool{ grpcCfg: grpcCfg, + shareds: make(map[string]*PoolClient), } } // 获取一个GRPC客户端。由于事先不能知道所有agent的GRPC配置信息,所以只能让调用者把建立连接所需的配置都传递进来, // Pool来决定要不要新建客户端。 func (p *Pool) Acquire(ip string, port int) (*PoolClient, error) { - cli, err := NewClient(fmt.Sprintf("%s:%d", ip, port)) - if err != nil { - return nil, err + addr := fmt.Sprintf("%s:%d", ip, port) + + p.lock.Lock() + defer p.lock.Unlock() + + cli, ok := p.shareds[addr] + if !ok { + c, err := NewClient(addr) + if err != nil { + return nil, err + } + cli = &PoolClient{ + Client: c, + owner: p, + } + p.shareds[addr] = cli } - return &PoolClient{ - Client: cli, - owner: p, - }, nil + return cli, nil + } func (p *Pool) Release(cli *PoolClient) { - cli.Client.Close() + // TODO 释放长时间未使用的client } diff --git a/common/pkgs/iterator/download_object_iterator.go b/common/pkgs/iterator/download_object_iterator.go index 722b653..4fdfcd3 100644 --- a/common/pkgs/iterator/download_object_iterator.go +++ b/common/pkgs/iterator/download_object_iterator.go @@ -46,8 +46,11 @@ type DownloadObjectIterator struct { objectDetails []stgmodels.ObjectDetail currentIndex int + inited bool downloadCtx *DownloadContext + coorCli *coormq.Client + allNodes map[cdssdk.NodeID]cdssdk.Node } func NewDownloadObjectIterator(objectDetails []stgmodels.ObjectDetail, downloadCtx *DownloadContext) *DownloadObjectIterator { @@ -58,27 +61,60 @@ func NewDownloadObjectIterator(objectDetails []stgmodels.ObjectDetail, downloadC } func (i *DownloadObjectIterator) MoveNext() (*IterDownloadingObject, error) { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new coordinator client: %w", err) + if !i.inited { + if err := i.init(); err != nil { + return nil, err + } + + 
i.inited = true } - defer stgglb.CoordinatorMQPool.Release(coorCli) if i.currentIndex >= len(i.objectDetails) { return nil, ErrNoMoreItem } - item, err := i.doMove(coorCli) + item, err := i.doMove() i.currentIndex++ return item, err } -func (iter *DownloadObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingObject, error) { +func (i *DownloadObjectIterator) init() error { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return fmt.Errorf("new coordinator client: %w", err) + } + i.coorCli = coorCli + + allNodeIDs := make(map[cdssdk.NodeID]bool) + for _, obj := range i.objectDetails { + for _, p := range obj.PinnedAt { + allNodeIDs[p] = true + } + + for _, b := range obj.Blocks { + allNodeIDs[b.NodeID] = true + } + } + + getNodes, err := coorCli.GetNodes(coormq.NewGetNodes(lo.Keys(allNodeIDs))) + if err != nil { + return fmt.Errorf("getting nodes: %w", err) + } + + i.allNodes = make(map[cdssdk.NodeID]cdssdk.Node) + for _, n := range getNodes.Nodes { + i.allNodes[n.NodeID] = n + } + + return nil +} + +func (iter *DownloadObjectIterator) doMove() (*IterDownloadingObject, error) { obj := iter.objectDetails[iter.currentIndex] switch red := obj.Object.Redundancy.(type) { case *cdssdk.NoneRedundancy: - reader, err := iter.downloadNoneOrRepObject(coorCli, iter.downloadCtx, obj) + reader, err := iter.downloadNoneOrRepObject(obj) if err != nil { return nil, fmt.Errorf("downloading object: %w", err) } @@ -89,7 +125,7 @@ func (iter *DownloadObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloa }, nil case *cdssdk.RepRedundancy: - reader, err := iter.downloadNoneOrRepObject(coorCli, iter.downloadCtx, obj) + reader, err := iter.downloadNoneOrRepObject(obj) if err != nil { return nil, fmt.Errorf("downloading rep object: %w", err) } @@ -100,7 +136,7 @@ func (iter *DownloadObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloa }, nil case *cdssdk.ECRedundancy: - reader, err := iter.downloadECObject(coorCli, iter.downloadCtx, obj, red) 
+ reader, err := iter.downloadECObject(obj, red) if err != nil { return nil, fmt.Errorf("downloading ec object: %w", err) } @@ -120,15 +156,15 @@ func (i *DownloadObjectIterator) Close() { } } -func (iter *DownloadObjectIterator) downloadNoneOrRepObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail) (io.ReadCloser, error) { - allNodes, err := iter.sortDownloadNodes(coorCli, ctx, obj) +func (iter *DownloadObjectIterator) downloadNoneOrRepObject(obj stgmodels.ObjectDetail) (io.ReadCloser, error) { + allNodes, err := iter.sortDownloadNodes(obj) if err != nil { return nil, err } bsc, blocks := iter.getMinReadingBlockSolution(allNodes, 1) osc, node := iter.getMinReadingObjectSolution(allNodes, 1) if bsc < osc { - return downloadFile(ctx, blocks[0].Node, blocks[0].Block.FileHash) + return downloadFile(iter.downloadCtx, blocks[0].Node, blocks[0].Block.FileHash) } // bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件 @@ -136,11 +172,11 @@ func (iter *DownloadObjectIterator) downloadNoneOrRepObject(coorCli *coormq.Clie return nil, fmt.Errorf("no node has this object") } - return downloadFile(ctx, *node, obj.Object.FileHash) + return downloadFile(iter.downloadCtx, *node, obj.Object.FileHash) } -func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) { - allNodes, err := iter.sortDownloadNodes(coorCli, ctx, obj) +func (iter *DownloadObjectIterator) downloadECObject(obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) { + allNodes, err := iter.sortDownloadNodes(obj) if err != nil { return nil, err } @@ -155,7 +191,7 @@ func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx } for i, b := range blocks { - str, err := downloadFile(ctx, b.Node, b.Block.FileHash) + str, err := downloadFile(iter.downloadCtx, b.Node, b.Block.FileHash) if err != nil { for i -= 1; i >= 0; i-- { 
fileStrs[i].Close() @@ -185,10 +221,10 @@ func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx return nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(blocks)) } - return downloadFile(ctx, *node, obj.Object.FileHash) + return downloadFile(iter.downloadCtx, *node, obj.Object.FileHash) } -func (iter *DownloadObjectIterator) sortDownloadNodes(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail) ([]*DownloadNodeInfo, error) { +func (iter *DownloadObjectIterator) sortDownloadNodes(obj stgmodels.ObjectDetail) ([]*DownloadNodeInfo, error) { var nodeIDs []cdssdk.NodeID for _, id := range obj.PinnedAt { if !lo.Contains(nodeIDs, id) { @@ -201,16 +237,11 @@ func (iter *DownloadObjectIterator) sortDownloadNodes(coorCli *coormq.Client, ct } } - getNodes, err := coorCli.GetNodes(coormq.NewGetNodes(nodeIDs)) - if err != nil { - return nil, fmt.Errorf("getting nodes: %w", err) - } - downloadNodeMap := make(map[cdssdk.NodeID]*DownloadNodeInfo) for _, id := range obj.PinnedAt { node, ok := downloadNodeMap[id] if !ok { - mod := *getNodes.GetNode(id) + mod := iter.allNodes[id] node = &DownloadNodeInfo{ Node: mod, ObjectPinned: true, @@ -225,7 +256,7 @@ func (iter *DownloadObjectIterator) sortDownloadNodes(coorCli *coormq.Client, ct for _, b := range obj.Blocks { node, ok := downloadNodeMap[b.NodeID] if !ok { - mod := *getNodes.GetNode(b.NodeID) + mod := iter.allNodes[b.NodeID] node = &DownloadNodeInfo{ Node: mod, Distance: iter.getNodeDistance(mod), diff --git a/common/pkgs/mq/agent/client.go b/common/pkgs/mq/agent/client.go index d984e47..016c0fe 100644 --- a/common/pkgs/mq/agent/client.go +++ b/common/pkgs/mq/agent/client.go @@ -1,6 +1,8 @@ package agent import ( + "sync" + "gitlink.org.cn/cloudream/common/pkgs/mq" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" @@ -33,18 +35,34 @@ type Pool interface { } type pool struct { - 
mqcfg *stgmq.Config + mqcfg *stgmq.Config + shareds map[cdssdk.NodeID]*Client + lock sync.Mutex } func NewPool(mqcfg *stgmq.Config) Pool { return &pool{ - mqcfg: mqcfg, + mqcfg: mqcfg, + shareds: make(map[cdssdk.NodeID]*Client), } } func (p *pool) Acquire(id cdssdk.NodeID) (*Client, error) { - return NewClient(id, p.mqcfg) + p.lock.Lock() + defer p.lock.Unlock() + + cli, ok := p.shareds[id] + if !ok { + var err error + cli, err = NewClient(id, p.mqcfg) + if err != nil { + return nil, err + } + p.shareds[id] = cli + } + + return cli, nil } func (p *pool) Release(cli *Client) { - cli.Close() + // TODO 定时关闭 } diff --git a/common/pkgs/mq/coordinator/client.go b/common/pkgs/mq/coordinator/client.go index 8d25532..84a4f28 100644 --- a/common/pkgs/mq/coordinator/client.go +++ b/common/pkgs/mq/coordinator/client.go @@ -1,6 +1,8 @@ package coordinator import ( + "sync" + "gitlink.org.cn/cloudream/common/pkgs/mq" stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" ) @@ -30,7 +32,9 @@ type Pool interface { } type pool struct { - mqcfg *stgmq.Config + mqcfg *stgmq.Config + shared *Client + lock sync.Mutex } func NewPool(mqcfg *stgmq.Config) Pool { @@ -39,9 +43,18 @@ func NewPool(mqcfg *stgmq.Config) Pool { } } func (p *pool) Acquire() (*Client, error) { - return NewClient(p.mqcfg) + p.lock.Lock() + defer p.lock.Unlock() + if p.shared == nil { + var err error + p.shared, err = NewClient(p.mqcfg) + if err != nil { + return nil, err + } + } + + return p.shared, nil } func (p *pool) Release(cli *Client) { - cli.Close() } diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index 6c665d6..45b5b24 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -81,8 +81,8 @@ type ChangeObjectRedundancyResp struct { mq.MessageBodyBase } type ChangeObjectRedundancyEntry struct { - ObjectID cdssdk.ObjectID `json:"objectID"` - Redundancy cdssdk.Redundancy `json:"redundancy"` + ObjectID cdssdk.ObjectID 
`json:"objectID" db:"ObjectID"` + Redundancy cdssdk.Redundancy `json:"redundancy" db:"Redundancy"` PinnedAt []cdssdk.NodeID `json:"pinnedAt"` Blocks []stgmod.ObjectBlock `json:"blocks"` } diff --git a/common/pkgs/mq/scanner/client.go b/common/pkgs/mq/scanner/client.go index 8cb70c3..156a16d 100644 --- a/common/pkgs/mq/scanner/client.go +++ b/common/pkgs/mq/scanner/client.go @@ -1,6 +1,8 @@ package scanner import ( + "sync" + "gitlink.org.cn/cloudream/common/pkgs/mq" stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" ) @@ -30,7 +32,9 @@ type Pool interface { } type pool struct { - mqcfg *stgmq.Config + mqcfg *stgmq.Config + shared *Client + lock sync.Mutex } func NewPool(mqcfg *stgmq.Config) Pool { @@ -39,9 +43,18 @@ func NewPool(mqcfg *stgmq.Config) Pool { } } func (p *pool) Acquire() (*Client, error) { - return NewClient(p.mqcfg) + p.lock.Lock() + defer p.lock.Unlock() + if p.shared == nil { + var err error + p.shared, err = NewClient(p.mqcfg) + if err != nil { + return nil, err + } + } + + return p.shared, nil } func (p *pool) Release(cli *Client) { - cli.Close() } diff --git a/scanner/internal/event/agent_check_cache.go b/scanner/internal/event/agent_check_cache.go index 6f9084e..7f4cd45 100644 --- a/scanner/internal/event/agent_check_cache.go +++ b/scanner/internal/event/agent_check_cache.go @@ -105,7 +105,7 @@ func (t *AgentCheckCache) checkCache(execCtx ExecuteContext, tx *sqlx.Tx, realFi } if len(realFileHashesCp) > 0 { - err = execCtx.Args.DB.Cache().BatchCreate(tx, lo.Keys(realFileHashesCp), t.NodeID, 0) + err = execCtx.Args.DB.Cache().BatchCreateOnSameNode(tx, lo.Keys(realFileHashesCp), t.NodeID, 0) if err != nil { log.Warnf("batch create node caches: %w", err) return