From 8f2e26b18fce02e65706373d1a675b220b70ec4e Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 6 Dec 2023 16:23:50 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E8=B0=83=E8=AF=95=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/cmdline/package.go | 10 +++--- ...create_ec_package.go => create_package.go} | 0 ...update_ec_package.go => update_package.go} | 0 common/assets/scripts/create_database.sql | 4 +-- ...create_ec_package.go => create_package.go} | 0 ...update_ec_package.go => update_package.go} | 0 common/pkgs/db/cache.go | 4 +-- common/pkgs/db/model/model.go | 35 +++++++++++++++++++ common/pkgs/db/object.go | 11 +++--- common/pkgs/db/object_block.go | 10 +++--- coordinator/internal/services/object.go | 4 +-- coordinator/internal/services/package.go | 2 +- 12 files changed, 59 insertions(+), 21 deletions(-) rename client/internal/task/{create_ec_package.go => create_package.go} (100%) rename client/internal/task/{update_ec_package.go => update_package.go} (100%) rename common/pkgs/cmd/{create_ec_package.go => create_package.go} (100%) rename common/pkgs/cmd/{update_ec_package.go => update_package.go} (100%) diff --git a/client/internal/cmdline/package.go b/client/internal/cmdline/package.go index b39ecb6..b0f9bc2 100644 --- a/client/internal/cmdline/package.go +++ b/client/internal/cmdline/package.go @@ -33,7 +33,7 @@ func PackageListBucketPackages(ctx CommandContext, bucketID cdssdk.BucketID) err return nil } -func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID cdssdk.PackageID) error { +func PackageDownloadPackage(ctx CommandContext, packageID cdssdk.PackageID, outputDir string) error { err := os.MkdirAll(outputDir, os.ModePerm) if err != nil { return fmt.Errorf("create output directory %s failed, err: %w", outputDir, err) @@ -86,7 +86,7 @@ func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID cdss return nil } -func PackageCreatePackage(ctx CommandContext, rootPath string, bucketID cdssdk.BucketID, name string, nodeAffinity []cdssdk.NodeID) error { +func PackageCreatePackage(ctx CommandContext, name string, rootPath string, bucketID cdssdk.BucketID, nodeAffinity []cdssdk.NodeID) error { rootPath = filepath.Clean(rootPath) var uploadFilePathes []string @@ -194,7 +194,8 @@ func PackageDeletePackage(ctx CommandContext, packageID cdssdk.PackageID) error return nil } -func PackageGetCachedNodes(ctx CommandContext, packageID cdssdk.PackageID, userID cdssdk.UserID) error { +func PackageGetCachedNodes(ctx CommandContext, packageID cdssdk.PackageID) error { + userID := cdssdk.UserID(0) resp, err := ctx.Cmdline.Svc.PackageSvc().GetCachedNodes(userID, packageID) fmt.Printf("resp: %v\n", resp) if err != nil { @@ -203,7 +204,8 @@ func PackageGetCachedNodes(ctx CommandContext, packageID cdssdk.PackageID, userI return nil } -func PackageGetLoadedNodes(ctx CommandContext, packageID cdssdk.PackageID, userID cdssdk.UserID) error { +func PackageGetLoadedNodes(ctx CommandContext, packageID cdssdk.PackageID) error { + userID := cdssdk.UserID(0) nodeIDs, err := ctx.Cmdline.Svc.PackageSvc().GetLoadedNodes(userID, packageID) fmt.Printf("nodeIDs: %v\n", nodeIDs) if err != nil { diff --git a/client/internal/task/create_ec_package.go b/client/internal/task/create_package.go similarity index 100% rename from client/internal/task/create_ec_package.go rename to client/internal/task/create_package.go diff --git a/client/internal/task/update_ec_package.go b/client/internal/task/update_package.go similarity index 100% rename from client/internal/task/update_ec_package.go rename to client/internal/task/update_package.go diff --git a/common/assets/scripts/create_database.sql b/common/assets/scripts/create_database.sql index 35f25cb..3bfcef2 100644 --- a/common/assets/scripts/create_database.sql +++ b/common/assets/scripts/create_database.sql @@ -112,8 +112,7 @@ create table Package ( PackageID int not null auto_increment primary key comment '包ID', Name varchar(100) not null comment '对象名', BucketID int not null comment '桶ID', - State varchar(100) not null comment '状态', - Redundancy JSON not null comment '冗余策略' + State varchar(100) not null comment '状态' ); create table Object ( @@ -122,6 +121,7 @@ create table Object ( Path varchar(500) not null comment '对象路径', Size bigint not null comment '对象大小(Byte)', FileHash varchar(100) not null comment '完整对象的FileHash', + Redundancy JSON not null comment '冗余策略', UNIQUE KEY PackagePath (PackageID, Path) ) comment = '对象表'; diff --git a/common/pkgs/cmd/create_ec_package.go b/common/pkgs/cmd/create_package.go similarity index 100% rename from common/pkgs/cmd/create_ec_package.go rename to common/pkgs/cmd/create_package.go diff --git a/common/pkgs/cmd/update_ec_package.go b/common/pkgs/cmd/update_package.go similarity index 100% rename from common/pkgs/cmd/update_ec_package.go rename to common/pkgs/cmd/update_package.go diff --git a/common/pkgs/db/cache.go b/common/pkgs/db/cache.go index 1c7301d..25ba742 100644 --- a/common/pkgs/db/cache.go +++ b/common/pkgs/db/cache.go @@ -37,7 +37,7 @@ func (*CacheDB) GetNodeCaches(ctx SQLContext, nodeID cdssdk.NodeID) ([]model.Cac // CreateNew 创建一条新的缓存记录 func (*CacheDB) CreateNew(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID) error { - _, err := ctx.Exec("insert into Cache values(?,?,?,?,?)", fileHash, nodeID, consts.CacheStatePinned, nil, time.Now()) + _, err := ctx.Exec("insert into Cache values(?,?,?,?,?,?)", fileHash, nodeID, consts.CacheStatePinned, nil, time.Now(), 0) if err != nil { return err } @@ -91,7 +91,7 @@ func (*CacheDB) BatchCreatePinned(ctx SQLContext, fileHashes []string, nodeID cd // Create 创建一条Temp状态的缓存记录,如果已存在则不产生效果 func (*CacheDB) CreateTemp(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID) error { - _, err := ctx.Exec("insert ignore into Cache values(?,?,?,?,?)", fileHash, nodeID, nil, consts.CacheStateTemp, time.Now()) + _, err := ctx.Exec("insert ignore into Cache values(?,?,?,?,?,?)", fileHash, nodeID, consts.CacheStateTemp, nil, time.Now(), 0) return err } diff --git a/common/pkgs/db/model/model.go b/common/pkgs/db/model/model.go index 1bf8cc0..a671052 100644 --- a/common/pkgs/db/model/model.go +++ b/common/pkgs/db/model/model.go @@ -1,9 +1,12 @@ package model import ( + "fmt" + "reflect" "time" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/serder" stgmod "gitlink.org.cn/cloudream/storage/common/models" ) @@ -65,6 +68,38 @@ type Package = cdssdk.Package type Object = cdssdk.Object +// 由于Object的Redundancy字段是interface,所以不能直接将查询结果scan成Object,必须先scan成TempObject, +// 再.ToObject()转成Object +type TempObject struct { + cdssdk.Object + Redundancy RedundancyWarpper `db:"Redundancy"` +} + +func (o *TempObject) ToObject() cdssdk.Object { + obj := o.Object + obj.Redundancy = o.Redundancy.Value + return obj +} + +type RedundancyWarpper struct { + Value cdssdk.Redundancy +} + +func (o *RedundancyWarpper) Scan(src interface{}) error { + data, ok := src.([]uint8) + if !ok { + return fmt.Errorf("unknow src type: %v", reflect.TypeOf(data)) + } + + red, err := serder.JSONToObjectEx[cdssdk.Redundancy](data) + if err != nil { + return err + } + + o.Value = red + return nil +} + type ObjectBlock = stgmod.ObjectBlock type Cache struct { diff --git a/common/pkgs/db/object.go b/common/pkgs/db/object.go index 0fbeb23..6020ceb 100644 --- a/common/pkgs/db/object.go +++ b/common/pkgs/db/object.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/jmoiron/sqlx" + "github.com/samber/lo" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" @@ -18,9 +19,9 @@ func (db *DB) Object() *ObjectDB { } func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Object, error) { - var ret model.Object + var ret model.TempObject err := sqlx.Get(ctx, &ret, "select * from Object where ObjectID = ?", objectID) - return ret, err + return ret.ToObject(), err } func (db *ObjectDB) Create(ctx SQLContext, packageID cdssdk.PackageID, path string, size int64, fileHash string, redundancy cdssdk.Redundancy) (int64, error) { @@ -88,9 +89,9 @@ func (*ObjectDB) UpdateFileInfo(ctx SQLContext, objectID cdssdk.ObjectID, fileSi } func (*ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) ([]model.Object, error) { - var ret []model.Object + var ret []model.TempObject err := sqlx.Select(ctx, &ret, "select * from Object where PackageID = ? order by ObjectID asc", packageID) - return ret, err + return lo.Map(ret, func(o model.TempObject, idx int) model.Object { return o.ToObject() }), err } func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, objs []coormq.AddObjectInfo) ([]cdssdk.ObjectID, error) { @@ -112,7 +113,7 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, objs [] } // 首次上传默认使用不分块的rep模式 - err = db.ObjectBlock().Create(ctx, objID, 0, obj.FileHash, obj.NodeID) + err = db.ObjectBlock().Create(ctx, objID, 0, obj.NodeID, obj.FileHash) if err != nil { return nil, fmt.Errorf("creating object block: %w", err) } diff --git a/common/pkgs/db/object_block.go b/common/pkgs/db/object_block.go index 5473b0d..77d3deb 100644 --- a/common/pkgs/db/object_block.go +++ b/common/pkgs/db/object_block.go @@ -20,8 +20,8 @@ func (db *DB) ObjectBlock() *ObjectBlockDB { return &ObjectBlockDB{DB: db} } -func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index int, fileHash string, nodeID cdssdk.NodeID) error { - _, err := ctx.Exec("insert into ObjectBlock values(?,?,?,?)", objectID, index, fileHash, nodeID) +func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index int, nodeID cdssdk.NodeID, fileHash string) error { + _, err := ctx.Exec("insert into ObjectBlock values(?,?,?,?)", objectID, index, nodeID, fileHash) return err } @@ -50,10 +50,10 @@ func (db *ObjectBlockDB) CountBlockWithHash(ctx SQLContext, fileHash string) (in } func (db *ObjectBlockDB) GetPackageBlockDetails(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectDetail, error) { - var objs []model.Object + var objs []model.TempObject err := sqlx.Select(ctx, &objs, "select * from Object where PackageID = ? order by ObjectID asc", packageID) if err != nil { - return nil, fmt.Errorf("query objectIDs: %w", err) + return nil, fmt.Errorf("getting objects: %w", err) } rets := make([]stgmod.ObjectDetail, 0, len(objs)) @@ -101,7 +101,7 @@ func (db *ObjectBlockDB) GetPackageBlockDetails(ctx SQLContext, packageID cdssdk blocks = append(blocks, block) } - rets = append(rets, stgmod.NewObjectDetail(obj, cachedObjectNodeIDs, blocks)) + rets = append(rets, stgmod.NewObjectDetail(obj.ToObject(), cachedObjectNodeIDs, blocks)) } return rets, nil diff --git a/coordinator/internal/services/object.go b/coordinator/internal/services/object.go index 9b87316..f3f9674 100644 --- a/coordinator/internal/services/object.go +++ b/coordinator/internal/services/object.go @@ -24,9 +24,9 @@ func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails) data, err := svc.db.ObjectBlock().GetPackageBlockDetails(svc.db.SQLCtx(), msg.PackageID) if err != nil { logger.WithField("PackageID", msg.PackageID). - Warnf("query object ec and node id in package: %s", err.Error()) + Warnf("getting package block details: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "query object ec and node id in package failed") + return nil, mq.Failed(errorcode.OperationFailed, "get package object block details failed") } return mq.ReplyOK(coormq.NewGetPackageObjectDetailsResp(data)) diff --git a/coordinator/internal/services/package.go b/coordinator/internal/services/package.go index bc9724a..baa7b10 100644 --- a/coordinator/internal/services/package.go +++ b/coordinator/internal/services/package.go @@ -73,7 +73,7 @@ func (svc *Service) UpdateECPackage(msg *coormq.UpdatePackage) (*coormq.UpdatePa }) if err != nil { logger.Warn(err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "update ec package failed") + return nil, mq.Failed(errorcode.OperationFailed, "update package failed") } return mq.ReplyOK(coormq.NewUpdatePackageResp())