From 4b88628c1076ec60f64d666aa1a48afd99a15894 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 20 Dec 2024 16:18:22 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=A4=8D=E5=88=B6Package?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/http/package.go | 22 ++++++++ client/internal/http/server.go | 1 + client/internal/services/package.go | 15 ++++++ common/pkgs/db2/object_block.go | 10 ++++ common/pkgs/db2/package.go | 10 ++-- common/pkgs/mq/coordinator/package.go | 35 +++++++++++++ coordinator/internal/mq/package.go | 74 ++++++++++++++++++++++++--- 7 files changed, 156 insertions(+), 11 deletions(-) diff --git a/client/internal/http/package.go b/client/internal/http/package.go index 5b9aaf0..a088a51 100644 --- a/client/internal/http/package.go +++ b/client/internal/http/package.go @@ -172,6 +172,28 @@ func (s *PackageService) Delete(ctx *gin.Context) { ctx.JSON(http.StatusOK, OK(nil)) } +func (s *PackageService) Clone(ctx *gin.Context) { + log := logger.WithField("HTTP", "Package.Clone") + + var req cdsapi.PackageClone + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + pkg, err := s.svc.PackageSvc().Clone(req.UserID, req.PackageID, req.BucketID, req.Name) + if err != nil { + log.Warnf("cloning package: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "clone package failed")) + return + } + + ctx.JSON(http.StatusOK, OK(cdsapi.PackageCloneResp{ + Package: pkg, + })) +} + func (s *PackageService) ListBucketPackages(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.ListBucketPackages") diff --git a/client/internal/http/server.go b/client/internal/http/server.go index de73e52..c3abc00 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -59,6 +59,7 @@ func (s *Server) initRouters() { rt.POST(cdsapi.PackageCreatePath, s.Package().Create) rt.POST(cdsapi.PackageCreateLoadPath, s.Package().CreateLoad) rt.POST(cdsapi.PackageDeletePath, s.Package().Delete) + rt.POST(cdsapi.PackageClonePath, s.Package().Clone) rt.GET(cdsapi.PackageListBucketPackagesPath, s.Package().ListBucketPackages) rt.GET(cdsapi.PackageGetCachedStoragesPath, s.Package().GetCachedStorages) rt.GET(cdsapi.PackageGetLoadedStoragesPath, s.Package().GetLoadedStorages) diff --git a/client/internal/services/package.go b/client/internal/services/package.go index 4991565..6356a6a 100644 --- a/client/internal/services/package.go +++ b/client/internal/services/package.go @@ -106,6 +106,21 @@ func (svc *PackageService) DeletePackage(userID cdssdk.UserID, packageID cdssdk. return nil } +func (svc *PackageService) Clone(userID cdssdk.UserID, packageID cdssdk.PackageID, bucketID cdssdk.BucketID, name string) (cdssdk.Package, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return cdssdk.Package{}, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + resp, err := coorCli.ClonePackage(coormq.ReqClonePackage(userID, packageID, bucketID, name)) + if err != nil { + return cdssdk.Package{}, fmt.Errorf("cloning package: %w", err) + } + + return resp.Package, nil +} + // GetCachedStorages 获取指定包的缓存节点信息 func (svc *PackageService) GetCachedStorages(userID cdssdk.UserID, packageID cdssdk.PackageID) (cdssdk.PackageCachingInfo, error) { // 从协调器MQ池中获取客户端 diff --git a/common/pkgs/db2/object_block.go b/common/pkgs/db2/object_block.go index 6f52112..53550a6 100644 --- a/common/pkgs/db2/object_block.go +++ b/common/pkgs/db2/object_block.go @@ -33,6 +33,16 @@ func (db *ObjectBlockDB) BatchGetByObjectID(ctx SQLContext, objectIDs []cdssdk.O return blocks, err } +func (*ObjectBlockDB) GetInPackageID(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectBlock, error) { + var rets []stgmod.ObjectBlock + err := ctx.Table("ObjectBlock"). + Joins("INNER JOIN Object ON ObjectBlock.ObjectID = Object.ObjectID"). + Where("Object.PackageID = ?", packageID). + Order("ObjectBlock.ObjectID, ObjectBlock.`Index` ASC"). + Find(&rets).Error + return rets, err +} + func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index int, stgID cdssdk.StorageID, fileHash cdssdk.FileHash) error { block := stgmod.ObjectBlock{ObjectID: objectID, Index: index, StorageID: stgID, FileHash: fileHash} return ctx.Table("ObjectBlock").Create(&block).Error diff --git a/common/pkgs/db2/package.go b/common/pkgs/db2/package.go index 37a7012..8fa805e 100644 --- a/common/pkgs/db2/package.go +++ b/common/pkgs/db2/package.go @@ -110,7 +110,7 @@ func (*PackageDB) GetUserPackageByName(ctx SQLContext, userID cdssdk.UserID, buc return ret, err } -func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name string) (cdssdk.PackageID, error) { +func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name string) (cdssdk.Package, error) { var packageID int64 err := ctx.Table("Package"). Select("PackageID"). @@ -118,18 +118,18 @@ func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name strin Scan(&packageID).Error if err != nil { - return 0, err + return cdssdk.Package{}, err } if packageID != 0 { - return 0, gorm.ErrDuplicatedKey + return cdssdk.Package{}, gorm.ErrDuplicatedKey } newPackage := cdssdk.Package{Name: name, BucketID: bucketID, State: cdssdk.PackageStateNormal} if err := ctx.Create(&newPackage).Error; err != nil { - return 0, fmt.Errorf("insert package failed, err: %w", err) + return cdssdk.Package{}, fmt.Errorf("insert package failed, err: %w", err) } - return newPackage.PackageID, nil + return newPackage, nil } // SoftDelete 设置一个对象被删除,并将相关数据删除 diff --git a/common/pkgs/mq/coordinator/package.go b/common/pkgs/mq/coordinator/package.go index 1ae60b7..86e363e 100644 --- a/common/pkgs/mq/coordinator/package.go +++ b/common/pkgs/mq/coordinator/package.go @@ -20,6 +20,8 @@ type PackageService interface { DeletePackage(msg *DeletePackage) (*DeletePackageResp, *mq.CodeMessage) + ClonePackage(msg *ClonePackage) (*ClonePackageResp, *mq.CodeMessage) + GetPackageCachedStorages(msg *GetPackageCachedStorages) (*GetPackageCachedStoragesResp, *mq.CodeMessage) GetPackageLoadedStorages(msg *GetPackageLoadedStorages) (*GetPackageLoadedStoragesResp, *mq.CodeMessage) @@ -186,6 +188,39 @@ func (client *Client) DeletePackage(msg *DeletePackage) (*DeletePackageResp, err return mq.Request(Service.DeletePackage, client.rabbitCli, msg) } +// 克隆Package +var _ = Register(Service.ClonePackage) + +type ClonePackage struct { + mq.MessageBodyBase + UserID cdssdk.UserID `json:"userID"` + PackageID cdssdk.PackageID `json:"packageID"` + BucketID cdssdk.BucketID `json:"bucketID"` + Name string `json:"name"` +} +type ClonePackageResp struct { + mq.MessageBodyBase + Package cdssdk.Package `json:"package"` +} + +func ReqClonePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, bucketID cdssdk.BucketID, name string) *ClonePackage { + return &ClonePackage{ + UserID: userID, + PackageID: packageID, + BucketID: bucketID, + Name: name, + } +} +func RespClonePackage(pkg cdssdk.Package) *ClonePackageResp { + return &ClonePackageResp{ + Package: pkg, + } +} + +func (client *Client) ClonePackage(msg *ClonePackage) (*ClonePackageResp, error) { + return mq.Request(Service.ClonePackage, client.rabbitCli, msg) +} + // 根据PackageID获取object分布情况 var _ = Register(Service.GetPackageCachedStorages) diff --git a/coordinator/internal/mq/package.go b/coordinator/internal/mq/package.go index 6ad4ba6..7b398b2 100644 --- a/coordinator/internal/mq/package.go +++ b/coordinator/internal/mq/package.go @@ -5,6 +5,7 @@ import ( "fmt" "sort" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db2" "gorm.io/gorm" @@ -59,16 +60,11 @@ func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePack return fmt.Errorf("bucket is not avaiable to the user") } - pkgID, err := svc.db2.Package().Create(tx, msg.BucketID, msg.Name) + pkg, err = svc.db2.Package().Create(tx, msg.BucketID, msg.Name) if err != nil { return fmt.Errorf("creating package: %w", err) } - pkg, err = svc.db2.Package().GetByID(tx, pkgID) - if err != nil { - return fmt.Errorf("getting package by id: %w", err) - } - return nil }) if err != nil { @@ -158,6 +154,72 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack return mq.ReplyOK(coormq.NewDeletePackageResp()) } +func (svc *Service) ClonePackage(msg *coormq.ClonePackage) (*coormq.ClonePackageResp, *mq.CodeMessage) { + var pkg cdssdk.Package + err := svc.db2.DoTx(func(tx db2.SQLContext) error { + var err error + + isAvai, _ := svc.db2.Bucket().IsAvailable(tx, msg.BucketID, msg.UserID) + if !isAvai { + return fmt.Errorf("bucket is not avaiable to the user") + } + + pkg, err = svc.db2.Package().Create(tx, msg.BucketID, msg.Name) + if err != nil { + return fmt.Errorf("creating package: %w", err) + } + + objs, err := svc.db2.Object().GetPackageObjects(tx, msg.PackageID) + if err != nil { + return fmt.Errorf("getting package objects: %w", err) + } + + objBlks, err := svc.db2.ObjectBlock().GetInPackageID(tx, msg.PackageID) + if err != nil { + return fmt.Errorf("getting object blocks: %w", err) + } + + clonedObjs := make([]cdssdk.Object, len(objs)) + for i, obj := range objs { + clonedObjs[i] = obj + clonedObjs[i].ObjectID = 0 + clonedObjs[i].PackageID = pkg.PackageID + } + + err = svc.db2.Object().BatchCreate(tx, &clonedObjs) + if err != nil { + return fmt.Errorf("batch creating objects: %w", err) + } + + oldToNew := make(map[cdssdk.ObjectID]cdssdk.ObjectID) + for i, obj := range clonedObjs { + oldToNew[objs[i].ObjectID] = obj.ObjectID + } + + clonedBlks := make([]stgmod.ObjectBlock, len(objBlks)) + for i, blk := range objBlks { + clonedBlks[i] = blk + clonedBlks[i].ObjectID = oldToNew[blk.ObjectID] + } + + err = svc.db2.ObjectBlock().BatchCreate(tx, clonedBlks) + if err != nil { + return fmt.Errorf("batch creating object blocks: %w", err) + } + + return nil + }) + if err != nil { + if errors.Is(err, gorm.ErrDuplicatedKey) { + return nil, mq.Failed(errorcode.DataExists, "package already exists") + } + + return nil, mq.Failed(errorcode.OperationFailed, err.Error()) + } + + return mq.ReplyOK(coormq.RespClonePackage(pkg)) +} + func (svc *Service) GetPackageCachedStorages(msg *coormq.GetPackageCachedStorages) (*coormq.GetPackageCachedStoragesResp, *mq.CodeMessage) { isAva, err := svc.db2.Package().IsAvailable(svc.db2.DefCtx(), msg.UserID, msg.PackageID) if err != nil {