From 33b1a4ea2daa612156a5188adbbf793afa454e37 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 9 Apr 2024 16:54:41 +0800 Subject: [PATCH] =?UTF-8?q?cds=E6=94=AF=E6=8C=81rclone=E6=8C=82=E8=BD=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/task/create_package.go | 4 +- client/internal/http/bucket.go | 30 +++++++-- client/internal/http/object.go | 67 +++++++++---------- client/internal/http/package.go | 38 +++++++++-- client/internal/http/server.go | 2 + client/internal/services/bucket.go | 23 +++++-- client/internal/services/object.go | 5 +- client/internal/services/package.go | 26 +++++-- common/pkgs/cmd/upload_objects.go | 24 +++++-- common/pkgs/db/bucket.go | 15 +++++ common/pkgs/db/object.go | 4 +- common/pkgs/db/package.go | 14 ++++ .../pkgs/iterator/download_object_iterator.go | 3 +- common/pkgs/mq/coordinator/bucket.go | 36 +++++++++- common/pkgs/mq/coordinator/package.go | 45 +++++++++++-- coordinator/internal/mq/bucket.go | 22 +++++- coordinator/internal/mq/package.go | 32 +++++++-- 17 files changed, 306 insertions(+), 84 deletions(-) diff --git a/agent/internal/task/create_package.go b/agent/internal/task/create_package.go index 1ddd848..9bf7684 100644 --- a/agent/internal/task/create_package.go +++ b/agent/internal/task/create_package.go @@ -63,7 +63,7 @@ func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, c return } - uploadRet, err := cmd.NewUploadObjects(t.userID, createResp.PackageID, t.objIter, t.nodeAffinity).Execute(&cmd.UploadObjectsContext{ + uploadRet, err := cmd.NewUploadObjects(t.userID, createResp.Package.PackageID, t.objIter, t.nodeAffinity).Execute(&cmd.UploadObjectsContext{ Distlock: ctx.distlock, Connectivity: ctx.connectivity, }) @@ -76,7 +76,7 @@ func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, c return } - t.Result.PackageID = createResp.PackageID + t.Result.PackageID = createResp.Package.PackageID t.Result.Objects = uploadRet.Objects complete(nil, CompleteOption{ diff --git a/client/internal/http/bucket.go b/client/internal/http/bucket.go index 7dcfe50..515bc20 100644 --- a/client/internal/http/bucket.go +++ b/client/internal/http/bucket.go @@ -19,17 +19,39 @@ func (s *Server) Bucket() *BucketService { } } +func (s *BucketService) GetByName(ctx *gin.Context) { + log := logger.WithField("HTTP", "Bucket.GetByName") + + var req cdssdk.BucketGetByName + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + bucket, err := s.svc.BucketSvc().GetBucketByName(req.UserID, req.Name) + if err != nil { + log.Warnf("getting bucket by name: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get bucket by name failed")) + return + } + + ctx.JSON(http.StatusOK, OK(cdssdk.BucketGetByNameResp{ + Bucket: bucket, + })) +} + func (s *BucketService) Create(ctx *gin.Context) { log := logger.WithField("HTTP", "Bucket.Create") - var req cdssdk.BucketCreateReq + var req cdssdk.BucketCreate if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - bucketID, err := s.svc.BucketSvc().CreateBucket(req.UserID, req.Name) + bucket, err := s.svc.BucketSvc().CreateBucket(req.UserID, req.Name) if err != nil { log.Warnf("creating bucket: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "create bucket failed")) @@ -37,14 +59,14 @@ func (s *BucketService) Create(ctx *gin.Context) { } ctx.JSON(http.StatusOK, OK(cdssdk.BucketCreateResp{ - BucketID: bucketID, + Bucket: bucket, })) } func (s *BucketService) Delete(ctx *gin.Context) { log := logger.WithField("HTTP", "Bucket.Delete") - var req cdssdk.BucketDeleteReq + var req cdssdk.BucketDelete if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) diff --git a/client/internal/http/object.go b/client/internal/http/object.go index ae2eb6d..9c1428e 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -1,16 +1,18 @@ package http import ( + "fmt" "io" "mime/multipart" "net/http" + "path" "time" "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - myio "gitlink.org.cn/cloudream/common/utils/io" + myhttp "gitlink.org.cn/cloudream/common/utils/http" ) type ObjectService struct { @@ -51,7 +53,7 @@ func (s *ObjectService) Upload(ctx *gin.Context) { } for { - complete, _, err := s.svc.ObjectSvc().WaitUploading(taskID, time.Second*5) + complete, objs, err := s.svc.ObjectSvc().WaitUploading(taskID, time.Second*5) if complete { if err != nil { log.Warnf("uploading object: %s", err.Error()) @@ -59,7 +61,20 @@ func (s *ObjectService) Upload(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(nil)) + uploadeds := make([]cdssdk.UploadedObject, len(objs.Objects)) + for i, obj := range objs.Objects { + err := "" + if obj.Error != nil { + err = obj.Error.Error() + } + o := obj.Object + uploadeds[i] = cdssdk.UploadedObject{ + Object: &o, + Error: err, + } + } + + ctx.JSON(http.StatusOK, OK(cdssdk.ObjectUploadResp{Uploadeds: uploadeds})) return } @@ -74,7 +89,7 @@ func (s *ObjectService) Upload(ctx *gin.Context) { func (s *ObjectService) Download(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.Download") - var req cdssdk.ObjectDownloadReq + var req cdssdk.ObjectDownload if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -88,41 +103,25 @@ func (s *ObjectService) Download(ctx *gin.Context) { return } - ctx.Writer.WriteHeader(http.StatusOK) - // TODO 需要设置FileName - ctx.Header("Content-Disposition", "attachment; filename=filename") - ctx.Header("Content-Type", "application/octet-stream") - - buf := make([]byte, 4096) - ctx.Stream(func(w io.Writer) bool { - rd, err := file.Read(buf) - if err == io.EOF { - err = myio.WriteAll(w, buf[:rd]) - if err != nil { - log.Warnf("writing data to response: %s", err.Error()) - } - return false - } + mw := multipart.NewWriter(ctx.Writer) + defer mw.Close() - if err != nil { - log.Warnf("reading file data: %s", err.Error()) - return false - } + ctx.Writer.Header().Set("Content-Type", fmt.Sprintf("%s;boundary=%s", myhttp.ContentTypeMultiPart, mw.Boundary())) + ctx.Writer.WriteHeader(http.StatusOK) - err = myio.WriteAll(w, buf[:rd]) - if err != nil { - log.Warnf("writing data to response: %s", err.Error()) - return false - } + fw, err := mw.CreateFormFile("file", path.Base(file.Object.Path)) + if err != nil { + log.Warnf("creating form file: %s", err.Error()) + return + } - return true - }) + io.Copy(fw, file.File) } func (s *ObjectService) UpdateInfo(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.UpdateInfo") - var req cdssdk.ObjectUpdateInfoReq + var req cdssdk.ObjectUpdateInfo if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -142,7 +141,7 @@ func (s *ObjectService) UpdateInfo(ctx *gin.Context) { func (s *ObjectService) Move(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.Move") - var req cdssdk.ObjectMoveReq + var req cdssdk.ObjectMove if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -162,7 +161,7 @@ func (s *ObjectService) Move(ctx *gin.Context) { func (s *ObjectService) Delete(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.Delete") - var req cdssdk.ObjectDeleteReq + var req cdssdk.ObjectDelete if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -182,7 +181,7 @@ func (s *ObjectService) Delete(ctx *gin.Context) { func (s *ObjectService) GetPackageObjects(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.GetPackageObjects") - var req cdssdk.ObjectGetPackageObjectsReq + var req cdssdk.ObjectGetPackageObjects if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) diff --git a/client/internal/http/package.go b/client/internal/http/package.go index c18559d..b549dd5 100644 --- a/client/internal/http/package.go +++ b/client/internal/http/package.go @@ -3,6 +3,7 @@ package http import ( "mime/multipart" "net/http" + "net/url" "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" @@ -43,16 +44,36 @@ func (s *PackageService) Get(ctx *gin.Context) { ctx.JSON(http.StatusOK, OK(cdssdk.PackageGetResp{Package: *pkg})) } +func (s *PackageService) GetByName(ctx *gin.Context) { + log := logger.WithField("HTTP", "Package.GetByName") + + var req cdssdk.PackageGetByName + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + pkg, err := s.svc.PackageSvc().GetByName(req.UserID, req.BucketName, req.PackageName) + if err != nil { + log.Warnf("getting package by name: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package by name failed")) + return + } + + ctx.JSON(http.StatusOK, OK(cdssdk.PackageGetByNameResp{Package: *pkg})) +} + func (s *PackageService) Create(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.Create") - var req cdssdk.PackageCreateReq + var req cdssdk.PackageCreate if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - pkgID, err := s.svc.PackageSvc().Create(req.UserID, req.BucketID, req.Name) + pkg, err := s.svc.PackageSvc().Create(req.UserID, req.BucketID, req.Name) if err != nil { log.Warnf("creating package: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "create package failed")) @@ -60,14 +81,14 @@ func (s *PackageService) Create(ctx *gin.Context) { } ctx.JSON(http.StatusOK, OK(cdssdk.PackageCreateResp{ - PackageID: pkgID, + Package: pkg, })) } func (s *PackageService) Delete(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.Delete") - var req cdssdk.PackageDeleteReq + var req cdssdk.PackageDelete if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -87,7 +108,7 @@ func (s *PackageService) Delete(ctx *gin.Context) { func (s *PackageService) ListBucketPackages(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.ListBucketPackages") - var req cdssdk.PackageListBucketPackagesReq + var req cdssdk.PackageListBucketPackages if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding query: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -157,8 +178,13 @@ func mapMultiPartFileToUploadingObject(files []*multipart.FileHeader) stgiter.Up return nil, err } + fileName, err := url.PathUnescape(file.Filename) + if err != nil { + return nil, err + } + return &stgiter.IterUploadingObject{ - Path: file.Filename, + Path: fileName, Size: file.Size, File: stream, }, nil diff --git a/client/internal/http/server.go b/client/internal/http/server.go index d3c8fa9..44823ba 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -47,6 +47,7 @@ func (s *Server) initRouters() { s.engine.POST(cdssdk.ObjectDeletePath, s.Object().Delete) s.engine.GET(cdssdk.PackageGetPath, s.Package().Get) + s.engine.GET(cdssdk.PackageGetByNamePath, s.Package().GetByName) s.engine.POST(cdssdk.PackageCreatePath, s.Package().Create) s.engine.POST(cdssdk.PackageDeletePath, s.Package().Delete) s.engine.GET(cdssdk.PackageListBucketPackagesPath, s.Package().ListBucketPackages) @@ -59,6 +60,7 @@ func (s *Server) initRouters() { s.engine.POST(cdssdk.CacheMovePackagePath, s.Cache().MovePackage) + s.engine.GET(cdssdk.BucketGetByNamePath, s.Bucket().GetByName) s.engine.POST(cdssdk.BucketCreatePath, s.Bucket().Create) s.engine.POST(cdssdk.BucketDeletePath, s.Bucket().Delete) s.engine.GET(cdssdk.BucketListUserBucketsPath, s.Bucket().ListUserBuckets) diff --git a/client/internal/services/bucket.go b/client/internal/services/bucket.go index d402b54..57bc466 100644 --- a/client/internal/services/bucket.go +++ b/client/internal/services/bucket.go @@ -22,6 +22,21 @@ func (svc *BucketService) GetBucket(userID cdssdk.UserID, bucketID cdssdk.Bucket panic("not implement yet") } +func (svc *BucketService) GetBucketByName(userID cdssdk.UserID, bucketName string) (model.Bucket, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return model.Bucket{}, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + resp, err := coorCli.GetBucketByName(coormq.ReqGetBucketByName(userID, bucketName)) + if err != nil { + return model.Bucket{}, fmt.Errorf("get bucket by name failed, err: %w", err) + } + + return resp.Bucket, nil +} + func (svc *BucketService) GetUserBuckets(userID cdssdk.UserID) ([]model.Bucket, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { @@ -52,19 +67,19 @@ func (svc *BucketService) GetBucketPackages(userID cdssdk.UserID, bucketID cdssd return resp.Packages, nil } -func (svc *BucketService) CreateBucket(userID cdssdk.UserID, bucketName string) (cdssdk.BucketID, error) { +func (svc *BucketService) CreateBucket(userID cdssdk.UserID, bucketName string) (cdssdk.Bucket, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { - return 0, fmt.Errorf("new coordinator client: %w", err) + return cdssdk.Bucket{}, fmt.Errorf("new coordinator client: %w", err) } defer stgglb.CoordinatorMQPool.Release(coorCli) resp, err := coorCli.CreateBucket(coormq.NewCreateBucket(userID, bucketName)) if err != nil { - return 0, fmt.Errorf("creating bucket: %w", err) + return cdssdk.Bucket{}, fmt.Errorf("creating bucket: %w", err) } - return resp.BucketID, nil + return resp.Bucket, nil } func (svc *BucketService) DeleteBucket(userID cdssdk.UserID, bucketID cdssdk.BucketID) error { diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 620ce35..b351d04 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -2,7 +2,6 @@ package services import ( "fmt" - "io" "time" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" @@ -66,7 +65,7 @@ func (svc *ObjectService) Move(userID cdssdk.UserID, movings []cdssdk.MovingObje return resp.Successes, nil } -func (svc *ObjectService) Download(userID cdssdk.UserID, objectID cdssdk.ObjectID) (io.ReadCloser, error) { +func (svc *ObjectService) Download(userID cdssdk.UserID, objectID cdssdk.ObjectID) (*iterator.IterDownloadingObject, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) @@ -92,7 +91,7 @@ func (svc *ObjectService) Download(userID cdssdk.UserID, objectID cdssdk.ObjectI return nil, err } - return downloading.File, nil + return downloading, nil } func (svc *ObjectService) Delete(userID cdssdk.UserID, objectIDs []cdssdk.ObjectID) error { diff --git a/client/internal/services/package.go b/client/internal/services/package.go index 405d53f..5b002ab 100644 --- a/client/internal/services/package.go +++ b/client/internal/services/package.go @@ -6,7 +6,6 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -19,7 +18,7 @@ func (svc *Service) PackageSvc() *PackageService { return &PackageService{Service: svc} } -func (svc *PackageService) Get(userID cdssdk.UserID, packageID cdssdk.PackageID) (*model.Package, error) { +func (svc *PackageService) Get(userID cdssdk.UserID, packageID cdssdk.PackageID) (*cdssdk.Package, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) @@ -34,6 +33,21 @@ func (svc *PackageService) Get(userID cdssdk.UserID, packageID cdssdk.PackageID) return &getResp.Package, nil } +func (svc *PackageService) GetByName(userID cdssdk.UserID, bucketName string, packageName string) (*cdssdk.Package, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getResp, err := coorCli.GetPackageByName(coormq.ReqGetPackageByName(userID, bucketName, packageName)) + if err != nil { + return nil, fmt.Errorf("requsting to coodinator: %w", err) + } + + return &getResp.Package, nil +} + func (svc *PackageService) GetBucketPackages(userID cdssdk.UserID, bucketID cdssdk.BucketID) ([]cdssdk.Package, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { @@ -49,19 +63,19 @@ func (svc *PackageService) GetBucketPackages(userID cdssdk.UserID, bucketID cdss return getResp.Packages, nil } -func (svc *PackageService) Create(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string) (cdssdk.PackageID, error) { +func (svc *PackageService) Create(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string) (cdssdk.Package, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { - return 0, fmt.Errorf("new coordinator client: %w", err) + return cdssdk.Package{}, fmt.Errorf("new coordinator client: %w", err) } defer stgglb.CoordinatorMQPool.Release(coorCli) resp, err := coorCli.CreatePackage(coormq.NewCreatePackage(userID, bucketID, name)) if err != nil { - return 0, fmt.Errorf("creating package: %w", err) + return cdssdk.Package{}, fmt.Errorf("creating package: %w", err) } - return resp.PackageID, nil + return resp.Package, nil } func (svc *PackageService) DownloadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID) (iterator.DownloadingObjectIterator, error) { diff --git a/common/pkgs/cmd/upload_objects.go b/common/pkgs/cmd/upload_objects.go index 59a66bc..96c53d1 100644 --- a/common/pkgs/cmd/upload_objects.go +++ b/common/pkgs/cmd/upload_objects.go @@ -34,10 +34,9 @@ type UploadObjectsResult struct { } type ObjectUploadResult struct { - Info *iterator.IterUploadingObject - Error error - // TODO 这个字段没有被赋值 - ObjectID cdssdk.ObjectID + Info *iterator.IterUploadingObject + Error error + Object cdssdk.Object } type UploadNodeInfo struct { @@ -189,11 +188,26 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo } } - _, err = coorCli.UpdatePackage(coormq.NewUpdatePackage(packageID, adds, nil)) + updateResp, err := coorCli.UpdatePackage(coormq.NewUpdatePackage(packageID, adds, nil)) if err != nil { return nil, fmt.Errorf("updating package: %w", err) } + updatedObjs := make(map[string]*cdssdk.Object) + for _, obj := range updateResp.Added { + o := obj + updatedObjs[obj.Path] = &o + } + + for i := range uploadRets { + obj := updatedObjs[uploadRets[i].Info.Path] + if obj == nil { + uploadRets[i].Error = fmt.Errorf("object %s not found in package", uploadRets[i].Info.Path) + continue + } + uploadRets[i].Object = *obj + } + return uploadRets, nil } diff --git a/common/pkgs/db/bucket.go b/common/pkgs/db/bucket.go index 5fdda6a..50d290a 100644 --- a/common/pkgs/db/bucket.go +++ b/common/pkgs/db/bucket.go @@ -18,6 +18,12 @@ func (db *DB) Bucket() *BucketDB { return &BucketDB{DB: db} } +func (db *BucketDB) GetByID(ctx SQLContext, bucketID cdssdk.BucketID) (cdssdk.Bucket, error) { + var ret cdssdk.Bucket + err := sqlx.Get(ctx, &ret, "select * from Bucket where BucketID = ?", bucketID) + return ret, err +} + // GetIDByName 根据BucketName查询BucketID func (db *BucketDB) GetIDByName(bucketName string) (int64, error) { //桶结构体 @@ -57,6 +63,15 @@ func (*BucketDB) GetUserBucket(ctx SQLContext, userID cdssdk.UserID, bucketID cd return ret, err } +func (*BucketDB) GetUserBucketByName(ctx SQLContext, userID cdssdk.UserID, bucketName string) (model.Bucket, error) { + var ret model.Bucket + err := sqlx.Get(ctx, &ret, + "select Bucket.* from UserBucket, Bucket where UserID = ? and"+ + " UserBucket.BucketID = Bucket.BucketID and"+ + " Bucket.Name = ?", userID, bucketName) + return ret, err +} + func (*BucketDB) GetUserBuckets(ctx SQLContext, userID cdssdk.UserID) ([]model.Bucket, error) { var ret []model.Bucket err := sqlx.Select(ctx, &ret, "select Bucket.* from UserBucket, Bucket where UserID = ? and UserBucket.BucketID = Bucket.BucketID", userID) diff --git a/common/pkgs/db/object.go b/common/pkgs/db/object.go index 7fd20ef..036620d 100644 --- a/common/pkgs/db/object.go +++ b/common/pkgs/db/object.go @@ -169,7 +169,7 @@ func (db *ObjectDB) GetPackageObjectDetails(ctx SQLContext, packageID cdssdk.Pac return rets, nil } -func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []coormq.AddObjectEntry) ([]cdssdk.ObjectID, error) { +func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []coormq.AddObjectEntry) ([]cdssdk.Object, error) { if len(adds) == 0 { return nil, nil } @@ -244,7 +244,7 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] return nil, fmt.Errorf("batch create caches: %w", err) } - return addedObjIDs, nil + return addedObjs, nil } func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.UpdatingObjectRedundancy) error { diff --git a/common/pkgs/db/package.go b/common/pkgs/db/package.go index 4310ced..8c506ea 100644 --- a/common/pkgs/db/package.go +++ b/common/pkgs/db/package.go @@ -77,6 +77,20 @@ func (db *PackageDB) GetUserPackage(ctx SQLContext, userID cdssdk.UserID, packag return ret, err } +// 在指定名称的Bucket中查找指定名称的Package +func (*PackageDB) GetUserPackageByName(ctx SQLContext, userID cdssdk.UserID, bucketName string, packageName string) (cdssdk.Package, error) { + var ret model.Package + err := sqlx.Get(ctx, &ret, + "select Package.* from Package, Bucket, UserBucket where"+ + " Package.Name = ? and"+ + " Package.BucketID = Bucket.BucketID and"+ + " Bucket.Name = ? and"+ + " UserBucket.UserID = ? and"+ + " UserBucket.BucketID = Bucket.BucketID", + packageName, bucketName, userID) + return ret, err +} + func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name string) (cdssdk.PackageID, error) { // 根据packagename和bucketid查询,若不存在则插入,若存在则返回错误 var packageID int64 diff --git a/common/pkgs/iterator/download_object_iterator.go b/common/pkgs/iterator/download_object_iterator.go index 792f9cc..cd58dc4 100644 --- a/common/pkgs/iterator/download_object_iterator.go +++ b/common/pkgs/iterator/download_object_iterator.go @@ -18,7 +18,6 @@ import ( stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" stgmodels "gitlink.org.cn/cloudream/storage/common/models" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" "gitlink.org.cn/cloudream/storage/common/pkgs/ec" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" @@ -27,7 +26,7 @@ import ( type DownloadingObjectIterator = Iterator[*IterDownloadingObject] type IterDownloadingObject struct { - Object model.Object + Object cdssdk.Object File io.ReadCloser } diff --git a/common/pkgs/mq/coordinator/bucket.go b/common/pkgs/mq/coordinator/bucket.go index a129bac..2dc7c7a 100644 --- a/common/pkgs/mq/coordinator/bucket.go +++ b/common/pkgs/mq/coordinator/bucket.go @@ -7,6 +7,8 @@ import ( ) type BucketService interface { + GetBucketByName(msg *GetBucketByName) (*GetBucketByNameResp, *mq.CodeMessage) + GetUserBuckets(msg *GetUserBuckets) (*GetUserBucketsResp, *mq.CodeMessage) GetBucketPackages(msg *GetBucketPackages) (*GetBucketPackagesResp, *mq.CodeMessage) @@ -16,6 +18,34 @@ type BucketService interface { DeleteBucket(msg *DeleteBucket) (*DeleteBucketResp, *mq.CodeMessage) } +// 根据桶名获取桶 +var _ = Register(Service.GetBucketByName) + +type GetBucketByName struct { + mq.MessageBodyBase + UserID cdssdk.UserID `json:"userID"` + Name string `json:"name"` +} +type GetBucketByNameResp struct { + mq.MessageBodyBase + Bucket cdssdk.Bucket `json:"bucket"` +} + +func ReqGetBucketByName(userID cdssdk.UserID, name string) *GetBucketByName { + return &GetBucketByName{ + UserID: userID, + Name: name, + } +} +func RespGetBucketByName(bucket cdssdk.Bucket) *GetBucketByNameResp { + return &GetBucketByNameResp{ + Bucket: bucket, + } +} +func (client *Client) GetBucketByName(msg *GetBucketByName) (*GetBucketByNameResp, error) { + return mq.Request(Service.GetBucketByName, client.rabbitCli, msg) +} + // 获取用户所有的桶 var _ = Register(Service.GetUserBuckets) @@ -80,7 +110,7 @@ type CreateBucket struct { } type CreateBucketResp struct { mq.MessageBodyBase - BucketID cdssdk.BucketID `json:"bucketID"` + Bucket cdssdk.Bucket `json:"bucket"` } func NewCreateBucket(userID cdssdk.UserID, bucketName string) *CreateBucket { @@ -89,9 +119,9 @@ func NewCreateBucket(userID cdssdk.UserID, bucketName string) *CreateBucket { BucketName: bucketName, } } -func NewCreateBucketResp(bucketID cdssdk.BucketID) *CreateBucketResp { +func NewCreateBucketResp(bucket cdssdk.Bucket) *CreateBucketResp { return &CreateBucketResp{ - BucketID: bucketID, + Bucket: bucket, } } func (client *Client) CreateBucket(msg *CreateBucket) (*CreateBucketResp, error) { diff --git a/common/pkgs/mq/coordinator/package.go b/common/pkgs/mq/coordinator/package.go index 3d01e5d..d80cc37 100644 --- a/common/pkgs/mq/coordinator/package.go +++ b/common/pkgs/mq/coordinator/package.go @@ -12,6 +12,8 @@ import ( type PackageService interface { GetPackage(msg *GetPackage) (*GetPackageResp, *mq.CodeMessage) + GetPackageByName(msg *GetPackageByName) (*GetPackageByNameResp, *mq.CodeMessage) + CreatePackage(msg *CreatePackage) (*CreatePackageResp, *mq.CodeMessage) UpdatePackage(msg *UpdatePackage) (*UpdatePackageResp, *mq.CodeMessage) @@ -51,6 +53,36 @@ func (client *Client) GetPackage(msg *GetPackage) (*GetPackageResp, error) { return mq.Request(Service.GetPackage, client.rabbitCli, msg) } +// 根据名称获取Package +var _ = Register(Service.GetPackageByName) + +type GetPackageByName struct { + mq.MessageBodyBase + UserID cdssdk.UserID `json:"userID"` + BucketName string `json:"bucketName"` + PackageName string `json:"packageName"` +} +type GetPackageByNameResp struct { + mq.MessageBodyBase + Package cdssdk.Package `json:"package"` +} + +func ReqGetPackageByName(userID cdssdk.UserID, bucketName string, packageName string) *GetPackageByName { + return &GetPackageByName{ + UserID: userID, + BucketName: bucketName, + PackageName: packageName, + } +} +func NewGetPackageByNameResp(pkg cdssdk.Package) *GetPackageByNameResp { + return &GetPackageByNameResp{ + Package: pkg, + } +} +func (client *Client) GetPackageByName(msg *GetPackageByName) (*GetPackageByNameResp, error) { + return mq.Request(Service.GetPackageByName, client.rabbitCli, msg) +} + // 创建一个Package var _ = Register(Service.CreatePackage) @@ -62,7 +94,7 @@ type CreatePackage struct { } type CreatePackageResp struct { mq.MessageBodyBase - PackageID cdssdk.PackageID `json:"packageID"` + Package cdssdk.Package `json:"package"` } func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string) *CreatePackage { @@ -72,9 +104,9 @@ func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name strin Name: name, } } -func NewCreatePackageResp(packageID cdssdk.PackageID) *CreatePackageResp { +func NewCreatePackageResp(pkg cdssdk.Package) *CreatePackageResp { return &CreatePackageResp{ - PackageID: packageID, + Package: pkg, } } func (client *Client) CreatePackage(msg *CreatePackage) (*CreatePackageResp, error) { @@ -92,6 +124,7 @@ type UpdatePackage struct { } type UpdatePackageResp struct { mq.MessageBodyBase + Added []cdssdk.Object `json:"added"` } type AddObjectEntry struct { Path string `json:"path"` @@ -108,8 +141,10 @@ func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectEntry, deletes Deletes: deletes, } } -func NewUpdatePackageResp() *UpdatePackageResp { - return &UpdatePackageResp{} +func NewUpdatePackageResp(added []cdssdk.Object) *UpdatePackageResp { + return &UpdatePackageResp{ + Added: added, + } } func NewAddObjectEntry(path string, size int64, fileHash string, uploadTime time.Time, nodeID cdssdk.NodeID) AddObjectEntry { return AddObjectEntry{ diff --git a/coordinator/internal/mq/bucket.go b/coordinator/internal/mq/bucket.go index 78e5b41..e8278e9 100644 --- a/coordinator/internal/mq/bucket.go +++ b/coordinator/internal/mq/bucket.go @@ -18,6 +18,18 @@ func (svc *Service) GetBucket(userID cdssdk.UserID, bucketID cdssdk.BucketID) (m panic("not implement yet") } +func (svc *Service) GetBucketByName(msg *coormq.GetBucketByName) (*coormq.GetBucketByNameResp, *mq.CodeMessage) { + bucket, err := svc.db.Bucket().GetUserBucketByName(svc.db.SQLCtx(), msg.UserID, msg.Name) + if err != nil { + logger.WithField("UserID", msg.UserID). + WithField("Name", msg.Name). + Warnf("getting bucket by name: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "get bucket by name failed") + } + + return mq.ReplyOK(coormq.RespGetBucketByName(bucket)) +} + func (svc *Service) GetUserBuckets(msg *coormq.GetUserBuckets) (*coormq.GetUserBucketsResp, *mq.CodeMessage) { buckets, err := svc.db.Bucket().GetUserBuckets(svc.db.SQLCtx(), msg.UserID) @@ -44,18 +56,22 @@ func (svc *Service) GetBucketPackages(msg *coormq.GetBucketPackages) (*coormq.Ge } func (svc *Service) CreateBucket(msg *coormq.CreateBucket) (*coormq.CreateBucketResp, *mq.CodeMessage) { - var bucketID cdssdk.BucketID + var bucket cdssdk.Bucket err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { _, err := svc.db.User().GetByID(tx, msg.UserID) if err != nil { return fmt.Errorf("getting user by id: %w", err) } - bucketID, err = svc.db.Bucket().Create(tx, msg.UserID, msg.BucketName) + bucketID, err := svc.db.Bucket().Create(tx, msg.UserID, msg.BucketName) if err != nil { return fmt.Errorf("creating bucket: %w", err) } + bucket, err = svc.db.Bucket().GetByID(tx, bucketID) + if err != nil { + return fmt.Errorf("getting bucket by id: %w", err) + } return nil }) if err != nil { @@ -65,7 +81,7 @@ func (svc *Service) CreateBucket(msg *coormq.CreateBucket) (*coormq.CreateBucket return nil, mq.Failed(errorcode.OperationFailed, "create bucket failed") } - return mq.ReplyOK(coormq.NewCreateBucketResp(bucketID)) + return mq.ReplyOK(coormq.NewCreateBucketResp(bucket)) } func (svc *Service) DeleteBucket(msg *coormq.DeleteBucket) (*coormq.DeleteBucketResp, *mq.CodeMessage) { diff --git a/coordinator/internal/mq/package.go b/coordinator/internal/mq/package.go index ddab4ee..73acd1c 100644 --- a/coordinator/internal/mq/package.go +++ b/coordinator/internal/mq/package.go @@ -26,8 +26,22 @@ func (svc *Service) GetPackage(msg *coormq.GetPackage) (*coormq.GetPackageResp, return mq.ReplyOK(coormq.NewGetPackageResp(pkg)) } +func (svc *Service) GetPackageByName(msg *coormq.GetPackageByName) (*coormq.GetPackageByNameResp, *mq.CodeMessage) { + pkg, err := svc.db.Package().GetUserPackageByName(svc.db.SQLCtx(), msg.UserID, msg.BucketName, msg.PackageName) + if err != nil { + logger.WithField("UserID", msg.UserID). + WithField("BucketName", msg.BucketName). + WithField("PackageName", msg.PackageName). + Warnf("get package by name: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "get package by name failed") + } + + return mq.ReplyOK(coormq.NewGetPackageByNameResp(pkg)) +} + func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePackageResp, *mq.CodeMessage) { - var pkgID cdssdk.PackageID + var pkg cdssdk.Package err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { var err error @@ -36,11 +50,16 @@ func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePack return fmt.Errorf("bucket is not avaiable to the user") } - pkgID, err = svc.db.Package().Create(tx, msg.BucketID, msg.Name) + pkgID, err := svc.db.Package().Create(tx, msg.BucketID, msg.Name) if err != nil { return fmt.Errorf("creating package: %w", err) } + pkg, err = svc.db.Package().GetByID(tx, pkgID) + if err != nil { + return fmt.Errorf("getting package by id: %w", err) + } + return nil }) if err != nil { @@ -50,10 +69,11 @@ func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePack return nil, mq.Failed(errorcode.OperationFailed, "creating package failed") } - return mq.ReplyOK(coormq.NewCreatePackageResp(pkgID)) + return mq.ReplyOK(coormq.NewCreatePackageResp(pkg)) } func (svc *Service) UpdatePackage(msg *coormq.UpdatePackage) (*coormq.UpdatePackageResp, *mq.CodeMessage) { + var added []cdssdk.Object err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { _, err := svc.db.Package().GetByID(tx, msg.PackageID) if err != nil { @@ -69,9 +89,11 @@ func (svc *Service) UpdatePackage(msg *coormq.UpdatePackage) (*coormq.UpdatePack // 再执行添加操作 if len(msg.Adds) > 0 { - if _, err := svc.db.Object().BatchAdd(tx, msg.PackageID, msg.Adds); err != nil { + ad, err := svc.db.Object().BatchAdd(tx, msg.PackageID, msg.Adds) + if err != nil { return fmt.Errorf("adding objects: %w", err) } + added = ad } return nil @@ -81,7 +103,7 @@ func (svc *Service) UpdatePackage(msg *coormq.UpdatePackage) (*coormq.UpdatePack return nil, mq.Failed(errorcode.OperationFailed, "update package failed") } - return mq.ReplyOK(coormq.NewUpdatePackageResp()) + return mq.ReplyOK(coormq.NewUpdatePackageResp(added)) } func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePackageResp, *mq.CodeMessage) {