diff --git a/client/internal/http/http.go b/client/internal/http/http.go index 3df07b6..f018411 100644 --- a/client/internal/http/http.go +++ b/client/internal/http/http.go @@ -1,6 +1,8 @@ package http import ( + "fmt" + "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/mq" ) @@ -19,10 +21,10 @@ func OK(data any) Response { } } -func Failed(code string, msg string) Response { +func Failed(code string, format string, args ...any) Response { return Response{ Code: code, - Message: msg, + Message: fmt.Sprintf(format, args...), } } diff --git a/client/internal/http/object.go b/client/internal/http/object.go index d0dd259..fe3ebeb 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -83,7 +83,7 @@ func (s *ObjectService) Upload(ctx *gin.Context) { return } - up, err := s.svc.Uploader.BeginUpdate(req.Info.PackageID, req.Info.Affinity, req.Info.LoadTo, req.Info.LoadToPath) + up, err := s.svc.Uploader.BeginUpdate(req.Info.PackageID, req.Info.Affinity, req.Info.CopyTo, req.Info.CopyToPath) if err != nil { log.Warnf("begin update: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("begin update: %v", err))) diff --git a/client/internal/http/package.go b/client/internal/http/package.go index 36c3b45..5e05249 100644 --- a/client/internal/http/package.go +++ b/client/internal/http/package.go @@ -88,31 +88,31 @@ func (s *PackageService) Create(ctx *gin.Context) { })) } -type PackageCreateLoad struct { - Info cliapi.PackageCreateLoadInfo `form:"info" binding:"required"` - Files []*multipart.FileHeader `form:"files"` +type PackageCreateUpload struct { + Info cliapi.PackageCreateUploadInfo `form:"info" binding:"required"` + Files []*multipart.FileHeader `form:"files"` } func (s *PackageService) CreateLoad(ctx *gin.Context) { - log := logger.WithField("HTTP", "Package.CreateLoad") + log := logger.WithField("HTTP", "Package.CreateUpload") - var req PackageCreateLoad + var req PackageCreateUpload if err := ctx.ShouldBind(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - if len(req.Info.LoadTo) != len(req.Info.LoadToPath) { - log.Warnf("load to and load to path count not match") - ctx.JSON(http.StatusOK, Failed(errorcode.BadArgument, "load to and load to path count not match")) + if len(req.Info.CopyTo) != len(req.Info.CopyToPath) { + log.Warnf("CopyTo and CopyToPath count not match") + ctx.JSON(http.StatusOK, Failed(errorcode.BadArgument, "CopyTo and CopyToPath count not match")) return } - up, err := s.svc.Uploader.BeginCreateLoad(req.Info.BucketID, req.Info.Name, req.Info.LoadTo, req.Info.LoadToPath) + up, err := s.svc.Uploader.BeginCreateUpload(req.Info.BucketID, req.Info.Name, req.Info.CopyTo, req.Info.CopyToPath) if err != nil { - log.Warnf("begin package create load: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("begin package create load: %v", err))) + log.Warnf("begin package create upload: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "%v", err)) return } defer up.Abort() @@ -145,8 +145,8 @@ func (s *PackageService) CreateLoad(ctx *gin.Context) { ret, err := up.Commit() if err != nil { - log.Warnf("commit create load: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("commit create load: %v", err))) + log.Warnf("commit create upload: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("commit create upload: %v", err))) return } @@ -155,7 +155,7 @@ func (s *PackageService) CreateLoad(ctx *gin.Context) { objs[i] = ret.Objects[pathes[i]] } - ctx.JSON(http.StatusOK, OK(cliapi.PackageCreateLoadResp{Package: ret.Package, Objects: objs})) + ctx.JSON(http.StatusOK, OK(cliapi.PackageCreateUploadResp{Package: ret.Package, Objects: objs})) } func (s *PackageService) Delete(ctx *gin.Context) { diff --git a/client/internal/http/presigned.go b/client/internal/http/presigned.go index 265c084..a95aa0a 100644 --- a/client/internal/http/presigned.go +++ b/client/internal/http/presigned.go @@ -154,7 +154,7 @@ func (s *PresignedService) ObjectUpload(ctx *gin.Context) { return } - up, err := s.svc.Uploader.BeginUpdate(req.PackageID, req.Affinity, req.LoadTo, req.LoadToPath) + up, err := s.svc.Uploader.BeginUpdate(req.PackageID, req.Affinity, req.CopyTo, req.CopyToPath) if err != nil { log.Warnf("begin update: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("begin update: %v", err))) diff --git a/client/internal/http/server.go b/client/internal/http/server.go index cf397c3..9efa154 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -91,13 +91,13 @@ func (s *Server) initRouters(engine *gin.Engine) { rt.GET(cliapi.PackageGetPath, s.Package().Get) rt.GET(cliapi.PackageGetByFullNamePath, s.Package().GetByFullName) rt.POST(cliapi.PackageCreatePath, s.Package().Create) - rt.POST(cliapi.PackageCreateLoadPath, s.Package().CreateLoad) + rt.POST(cliapi.PackageCreateUploadPath, s.Package().CreateLoad) rt.POST(cliapi.PackageDeletePath, s.Package().Delete) rt.POST(cliapi.PackageClonePath, s.Package().Clone) rt.GET(cliapi.PackageListBucketPackagesPath, s.Package().ListBucketPackages) // rt.GET(cdsapi.PackageGetCachedStoragesPath, s.Package().GetCachedStorages) - rt.POST(cliapi.UserSpaceLoadPackagePath, s.UserSpace().LoadPackage) + rt.POST(cliapi.UserSpaceDownloadPackagePath, s.UserSpace().DownloadPackage) rt.POST(cliapi.UserSpaceCreatePackagePath, s.UserSpace().CreatePackage) rt.GET(cliapi.UserSpaceGetPath, s.UserSpace().Get) @@ -130,13 +130,13 @@ func (s *Server) routeV1(eg *gin.Engine, rt gin.IRoutes) { v1.GET(cliapi.PackageGetPath, awsAuth.Auth, s.Package().Get) v1.GET(cliapi.PackageGetByFullNamePath, awsAuth.Auth, s.Package().GetByFullName) v1.POST(cliapi.PackageCreatePath, awsAuth.Auth, s.Package().Create) - v1.POST(cliapi.PackageCreateLoadPath, awsAuth.Auth, s.Package().CreateLoad) + v1.POST(cliapi.PackageCreateUploadPath, awsAuth.Auth, s.Package().CreateLoad) v1.POST(cliapi.PackageDeletePath, awsAuth.Auth, s.Package().Delete) v1.POST(cliapi.PackageClonePath, awsAuth.Auth, s.Package().Clone) v1.GET(cliapi.PackageListBucketPackagesPath, awsAuth.Auth, s.Package().ListBucketPackages) // v1.GET(cdsapi.PackageGetCachedStoragesPath, awsAuth.Auth, s.Package().GetCachedStorages) - v1.POST(cliapi.UserSpaceLoadPackagePath, awsAuth.Auth, s.UserSpace().LoadPackage) + v1.POST(cliapi.UserSpaceDownloadPackagePath, awsAuth.Auth, s.UserSpace().DownloadPackage) v1.POST(cliapi.UserSpaceCreatePackagePath, awsAuth.Auth, s.UserSpace().CreatePackage) v1.GET(cliapi.UserSpaceGetPath, awsAuth.Auth, s.UserSpace().Get) rt.POST(cliapi.UserSpaceSpaceToSpacePath, s.UserSpace().SpaceToSpace) diff --git a/client/internal/http/user_space.go b/client/internal/http/user_space.go index 9a5234e..3a01683 100644 --- a/client/internal/http/user_space.go +++ b/client/internal/http/user_space.go @@ -20,24 +20,24 @@ func (s *Server) UserSpace() *UserSpaceService { } } -func (s *UserSpaceService) LoadPackage(ctx *gin.Context) { - log := logger.WithField("HTTP", "UserSpace.LoadPackage") +func (s *UserSpaceService) DownloadPackage(ctx *gin.Context) { + log := logger.WithField("HTTP", "UserSpace.DownloadPackage") - var req cliapi.UserSpaceLoadPackageReq + var req cliapi.UserSpaceDownloadPackageReq if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - err := s.svc.UserSpaceSvc().LoadPackage(req.PackageID, req.UserSpaceID, req.RootPath) + err := s.svc.UserSpaceSvc().DownloadPackage(req.PackageID, req.UserSpaceID, req.RootPath) if err != nil { - log.Warnf("loading package: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "loading package failed")) + log.Warnf("downloading package: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "%v", err)) return } - ctx.JSON(http.StatusOK, OK(cliapi.UserSpaceLoadPackageResp{})) + ctx.JSON(http.StatusOK, OK(cliapi.UserSpaceDownloadPackageResp{})) } func (s *UserSpaceService) CreatePackage(ctx *gin.Context) { diff --git a/client/internal/services/user_space.go b/client/internal/services/user_space.go index 56b81ee..64ac883 100644 --- a/client/internal/services/user_space.go +++ b/client/internal/services/user_space.go @@ -39,7 +39,7 @@ func (svc *UserSpaceService) GetByName(name string) (clitypes.UserSpace, error) return svc.DB.UserSpace().GetByName(svc.DB.DefCtx(), name) } -func (svc *UserSpaceService) LoadPackage(packageID clitypes.PackageID, userspaceID clitypes.UserSpaceID, rootPath string) error { +func (svc *UserSpaceService) DownloadPackage(packageID clitypes.PackageID, userspaceID clitypes.UserSpaceID, rootPath string) error { coorCli := stgglb.CoordinatorRPCPool.Get() defer coorCli.Release() diff --git a/client/internal/ticktock/change_redundancy.go b/client/internal/ticktock/change_redundancy.go index ce76093..9aaf739 100644 --- a/client/internal/ticktock/change_redundancy.go +++ b/client/internal/ticktock/change_redundancy.go @@ -34,7 +34,7 @@ func (j *ChangeRedundancy) Execute(t *TickTock) { ctx := &changeRedundancyContext{ ticktock: t, - allUserSpaces: make(map[clitypes.UserSpaceID]*userSpaceLoadInfo), + allUserSpaces: make(map[clitypes.UserSpaceID]*userSpaceUsageInfo), } spaceIDs, err := t.db.UserSpace().GetAllIDs(t.db.DefCtx()) @@ -52,7 +52,7 @@ func (j *ChangeRedundancy) Execute(t *TickTock) { continue } - ctx.allUserSpaces[space.UserSpace.UserSpaceID] = &userSpaceLoadInfo{ + ctx.allUserSpaces[space.UserSpace.UserSpaceID] = &userSpaceUsageInfo{ UserSpace: space, } } @@ -85,11 +85,11 @@ func (j *ChangeRedundancy) Execute(t *TickTock) { type changeRedundancyContext struct { ticktock *TickTock - allUserSpaces map[clitypes.UserSpaceID]*userSpaceLoadInfo + allUserSpaces map[clitypes.UserSpaceID]*userSpaceUsageInfo mostBlockStgIDs []clitypes.UserSpaceID } -type userSpaceLoadInfo struct { +type userSpaceUsageInfo struct { UserSpace *clitypes.UserSpaceDetail AccessAmount float64 } diff --git a/client/internal/ticktock/redundancy_recover.go b/client/internal/ticktock/redundancy_recover.go index f8c12ee..4dbb79c 100644 --- a/client/internal/ticktock/redundancy_recover.go +++ b/client/internal/ticktock/redundancy_recover.go @@ -21,7 +21,7 @@ import ( cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) -func (t *ChangeRedundancy) chooseRedundancy(ctx *changeRedundancyContext, obj clitypes.ObjectDetail) (clitypes.Redundancy, []*userSpaceLoadInfo) { +func (t *ChangeRedundancy) chooseRedundancy(ctx *changeRedundancyContext, obj clitypes.ObjectDetail) (clitypes.Redundancy, []*userSpaceUsageInfo) { switch obj.Object.Redundancy.(type) { case *clitypes.NoneRedundancy: if obj.Object.Size > ctx.ticktock.cfg.ECFileSizeThreshold { @@ -53,7 +53,7 @@ func (t *ChangeRedundancy) chooseRedundancy(ctx *changeRedundancyContext, obj cl return nil, nil } -func (t *ChangeRedundancy) doChangeRedundancy(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, newRed clitypes.Redundancy, selectedUserSpaces []*userSpaceLoadInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) { +func (t *ChangeRedundancy) doChangeRedundancy(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, newRed clitypes.Redundancy, selectedUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) { log := logger.WithType[ChangeRedundancy]("TickTock") var updating *db.UpdatingObjectRedundancy @@ -145,41 +145,41 @@ func (t *ChangeRedundancy) summaryRepObjectBlockUserSpaces(ctx *changeRedundancy return ids } -func (t *ChangeRedundancy) chooseNewUserSpacesForRep(ctx *changeRedundancyContext, red *clitypes.RepRedundancy) []*userSpaceLoadInfo { - sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceLoadInfo, right *userSpaceLoadInfo) int { +func (t *ChangeRedundancy) chooseNewUserSpacesForRep(ctx *changeRedundancyContext, red *clitypes.RepRedundancy) []*userSpaceUsageInfo { + sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceUsageInfo, right *userSpaceUsageInfo) int { return sort2.Cmp(right.AccessAmount, left.AccessAmount) }) return t.chooseSoManyUserSpaces(red.RepCount, sortedUserSpaces) } -func (t *ChangeRedundancy) chooseNewUserSpacesForEC(ctx *changeRedundancyContext, red *clitypes.ECRedundancy) []*userSpaceLoadInfo { - sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceLoadInfo, right *userSpaceLoadInfo) int { +func (t *ChangeRedundancy) chooseNewUserSpacesForEC(ctx *changeRedundancyContext, red *clitypes.ECRedundancy) []*userSpaceUsageInfo { + sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceUsageInfo, right *userSpaceUsageInfo) int { return sort2.Cmp(right.AccessAmount, left.AccessAmount) }) return t.chooseSoManyUserSpaces(red.N, sortedUserSpaces) } -func (t *ChangeRedundancy) chooseNewUserSpacesForLRC(ctx *changeRedundancyContext, red *clitypes.LRCRedundancy) []*userSpaceLoadInfo { - sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceLoadInfo, right *userSpaceLoadInfo) int { +func (t *ChangeRedundancy) chooseNewUserSpacesForLRC(ctx *changeRedundancyContext, red *clitypes.LRCRedundancy) []*userSpaceUsageInfo { + sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceUsageInfo, right *userSpaceUsageInfo) int { return sort2.Cmp(right.AccessAmount, left.AccessAmount) }) return t.chooseSoManyUserSpaces(red.N, sortedUserSpaces) } -func (t *ChangeRedundancy) chooseNewUserSpacesForSeg(ctx *changeRedundancyContext, segCount int) []*userSpaceLoadInfo { - sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceLoadInfo, right *userSpaceLoadInfo) int { +func (t *ChangeRedundancy) chooseNewUserSpacesForSeg(ctx *changeRedundancyContext, segCount int) []*userSpaceUsageInfo { + sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceUsageInfo, right *userSpaceUsageInfo) int { return sort2.Cmp(right.AccessAmount, left.AccessAmount) }) return t.chooseSoManyUserSpaces(segCount, sortedUserSpaces) } -func (t *ChangeRedundancy) rechooseUserSpacesForRep(ctx *changeRedundancyContext, red *clitypes.RepRedundancy) []*userSpaceLoadInfo { +func (t *ChangeRedundancy) rechooseUserSpacesForRep(ctx *changeRedundancyContext, red *clitypes.RepRedundancy) []*userSpaceUsageInfo { type rechooseUserSpace struct { - *userSpaceLoadInfo + *userSpaceUsageInfo HasBlock bool } @@ -194,8 +194,8 @@ func (t *ChangeRedundancy) rechooseUserSpacesForRep(ctx *changeRedundancyContext } rechooseStgs = append(rechooseStgs, &rechooseUserSpace{ - userSpaceLoadInfo: stg, - HasBlock: hasBlock, + userSpaceUsageInfo: stg, + HasBlock: hasBlock, }) } @@ -209,12 +209,12 @@ func (t *ChangeRedundancy) rechooseUserSpacesForRep(ctx *changeRedundancyContext return sort2.Cmp(right.AccessAmount, left.AccessAmount) }) - return t.chooseSoManyUserSpaces(red.RepCount, lo.Map(sortedStgs, func(userspace *rechooseUserSpace, idx int) *userSpaceLoadInfo { return userspace.userSpaceLoadInfo })) + return t.chooseSoManyUserSpaces(red.RepCount, lo.Map(sortedStgs, func(userspace *rechooseUserSpace, idx int) *userSpaceUsageInfo { return userspace.userSpaceUsageInfo })) } -func (t *ChangeRedundancy) rechooseUserSpacesForEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.ECRedundancy) []*userSpaceLoadInfo { +func (t *ChangeRedundancy) rechooseUserSpacesForEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.ECRedundancy) []*userSpaceUsageInfo { type rechooseStg struct { - *userSpaceLoadInfo + *userSpaceUsageInfo CachedBlockIndex int } @@ -229,8 +229,8 @@ func (t *ChangeRedundancy) rechooseUserSpacesForEC(ctx *changeRedundancyContext, } rechooseStgs = append(rechooseStgs, &rechooseStg{ - userSpaceLoadInfo: stg, - CachedBlockIndex: cachedBlockIndex, + userSpaceUsageInfo: stg, + CachedBlockIndex: cachedBlockIndex, }) } @@ -245,12 +245,12 @@ func (t *ChangeRedundancy) rechooseUserSpacesForEC(ctx *changeRedundancyContext, }) // TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择 - return t.chooseSoManyUserSpaces(red.N, lo.Map(sortedStgs, func(userspace *rechooseStg, idx int) *userSpaceLoadInfo { return userspace.userSpaceLoadInfo })) + return t.chooseSoManyUserSpaces(red.N, lo.Map(sortedStgs, func(userspace *rechooseStg, idx int) *userSpaceUsageInfo { return userspace.userSpaceUsageInfo })) } -func (t *ChangeRedundancy) rechooseUserSpacesForLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.LRCRedundancy) []*userSpaceLoadInfo { +func (t *ChangeRedundancy) rechooseUserSpacesForLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.LRCRedundancy) []*userSpaceUsageInfo { type rechooseStg struct { - *userSpaceLoadInfo + *userSpaceUsageInfo CachedBlockIndex int } @@ -265,8 +265,8 @@ func (t *ChangeRedundancy) rechooseUserSpacesForLRC(ctx *changeRedundancyContext } rechooseStgs = append(rechooseStgs, &rechooseStg{ - userSpaceLoadInfo: stg, - CachedBlockIndex: cachedBlockIndex, + userSpaceUsageInfo: stg, + CachedBlockIndex: cachedBlockIndex, }) } @@ -281,12 +281,12 @@ func (t *ChangeRedundancy) rechooseUserSpacesForLRC(ctx *changeRedundancyContext }) // TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择 - return t.chooseSoManyUserSpaces(red.N, lo.Map(sortedStgs, func(userspace *rechooseStg, idx int) *userSpaceLoadInfo { return userspace.userSpaceLoadInfo })) + return t.chooseSoManyUserSpaces(red.N, lo.Map(sortedStgs, func(userspace *rechooseStg, idx int) *userSpaceUsageInfo { return userspace.userSpaceUsageInfo })) } -func (t *ChangeRedundancy) chooseSoManyUserSpaces(count int, stgs []*userSpaceLoadInfo) []*userSpaceLoadInfo { +func (t *ChangeRedundancy) chooseSoManyUserSpaces(count int, stgs []*userSpaceUsageInfo) []*userSpaceUsageInfo { repeateCount := (count + len(stgs) - 1) / len(stgs) - extendStgs := make([]*userSpaceLoadInfo, repeateCount*len(stgs)) + extendStgs := make([]*userSpaceUsageInfo, repeateCount*len(stgs)) // 使用复制的方式将节点数扩充到要求的数量 // 复制之后的结构:ABCD -> AAABBBCCCDDD @@ -298,7 +298,7 @@ func (t *ChangeRedundancy) chooseSoManyUserSpaces(count int, stgs []*userSpaceLo } extendStgs = extendStgs[:count] - var chosen []*userSpaceLoadInfo + var chosen []*userSpaceUsageInfo for len(chosen) < count { // 在每一轮内都选不同地区的节点,如果节点数不够,那么就再来一轮 chosenLocations := make(map[cortypes.LocationID]bool) @@ -320,7 +320,7 @@ func (t *ChangeRedundancy) chooseSoManyUserSpaces(count int, stgs []*userSpaceLo return chosen } -func (t *ChangeRedundancy) noneToRep(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.RepRedundancy, uploadStgs []*userSpaceLoadInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) { +func (t *ChangeRedundancy) noneToRep(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.RepRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) { if len(obj.Blocks) == 0 { return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to rep") } @@ -334,7 +334,7 @@ func (t *ChangeRedundancy) noneToRep(ctx *changeRedundancyContext, obj clitypes. } // 如果选择的备份节点都是同一个,那么就只要上传一次 - uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceLoadInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID }) + uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceUsageInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID }) ft := ioswitch2.NewFromTo() ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.UserSpace.MasterHub, *srcStg.UserSpace, ioswitch2.RawStream())) @@ -394,7 +394,7 @@ func (t *ChangeRedundancy) noneToRep(ctx *changeRedundancyContext, obj clitypes. }, nil } -func (t *ChangeRedundancy) noneToEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.ECRedundancy, uploadStgs []*userSpaceLoadInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) { +func (t *ChangeRedundancy) noneToEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.ECRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) { if len(obj.Blocks) == 0 { return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to ec") } @@ -479,7 +479,7 @@ func (t *ChangeRedundancy) noneToEC(ctx *changeRedundancyContext, obj clitypes.O }, nil } -func (t *ChangeRedundancy) noneToLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.LRCRedundancy, uploadStgs []*userSpaceLoadInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) { +func (t *ChangeRedundancy) noneToLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.LRCRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) { if len(obj.Blocks) == 0 { return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to ec") } @@ -564,7 +564,7 @@ func (t *ChangeRedundancy) noneToLRC(ctx *changeRedundancyContext, obj clitypes. nil } -func (t *ChangeRedundancy) noneToSeg(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.SegmentRedundancy, uploadStgs []*userSpaceLoadInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) { +func (t *ChangeRedundancy) noneToSeg(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.SegmentRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) { if len(obj.Blocks) == 0 { return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to rep") } @@ -578,7 +578,7 @@ func (t *ChangeRedundancy) noneToSeg(ctx *changeRedundancyContext, obj clitypes. } // 如果选择的备份节点都是同一个,那么就只要上传一次 - uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceLoadInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID }) + uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceUsageInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID }) ft := ioswitch2.NewFromTo() ft.SegmentParam = red @@ -655,7 +655,7 @@ func (t *ChangeRedundancy) noneToSeg(ctx *changeRedundancyContext, obj clitypes. nil } -func (t *ChangeRedundancy) repToRep(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.RepRedundancy, uploadStgs []*userSpaceLoadInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) { +func (t *ChangeRedundancy) repToRep(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.RepRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) { if len(obj.Blocks) == 0 { return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to rep") } @@ -669,7 +669,7 @@ func (t *ChangeRedundancy) repToRep(ctx *changeRedundancyContext, obj clitypes.O } // 如果选择的备份节点都是同一个,那么就只要上传一次 - uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceLoadInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID }) + uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceUsageInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID }) ft := ioswitch2.NewFromTo() ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.UserSpace.MasterHub, *srcStg.UserSpace, ioswitch2.RawStream())) @@ -731,11 +731,11 @@ func (t *ChangeRedundancy) repToRep(ctx *changeRedundancyContext, obj clitypes.O nil } -func (t *ChangeRedundancy) repToEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.ECRedundancy, uploadUserSpaces []*userSpaceLoadInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) { +func (t *ChangeRedundancy) repToEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.ECRedundancy, uploadUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) { return t.noneToEC(ctx, obj, red, uploadUserSpaces) } -func (t *ChangeRedundancy) ecToRep(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, srcRed *clitypes.ECRedundancy, tarRed *clitypes.RepRedundancy, uploadStgs []*userSpaceLoadInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) { +func (t *ChangeRedundancy) ecToRep(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, srcRed *clitypes.ECRedundancy, tarRed *clitypes.RepRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) { var chosenBlocks []clitypes.GrouppedObjectBlock var chosenBlockIndexes []int var chosenBlockStg []clitypes.UserSpaceDetail @@ -765,7 +765,7 @@ func (t *ChangeRedundancy) ecToRep(ctx *changeRedundancyContext, obj clitypes.Ob } // 如果选择的备份节点都是同一个,那么就只要上传一次 - uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceLoadInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID }) + uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceUsageInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID }) planBlder := exec.NewPlanBuilder() ft := ioswitch2.NewFromTo() @@ -863,7 +863,7 @@ func (t *ChangeRedundancy) ecToRep(ctx *changeRedundancyContext, obj clitypes.Ob nil } -func (t *ChangeRedundancy) ecToEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, srcRed *clitypes.ECRedundancy, tarRed *clitypes.ECRedundancy, uploadUserSpaces []*userSpaceLoadInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) { +func (t *ChangeRedundancy) ecToEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, srcRed *clitypes.ECRedundancy, tarRed *clitypes.ECRedundancy, uploadUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) { grpBlocks := obj.GroupBlocks() var chosenBlocks []clitypes.GrouppedObjectBlock @@ -1016,7 +1016,7 @@ func (t *ChangeRedundancy) ecToEC(ctx *changeRedundancyContext, obj clitypes.Obj nil } -func (t *ChangeRedundancy) lrcToLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, srcRed *clitypes.LRCRedundancy, tarRed *clitypes.LRCRedundancy, uploadUserSpaces []*userSpaceLoadInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) { +func (t *ChangeRedundancy) lrcToLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, srcRed *clitypes.LRCRedundancy, tarRed *clitypes.LRCRedundancy, uploadUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) { blocksGrpByIndex := obj.GroupBlocks() @@ -1122,7 +1122,7 @@ TODO2 修复这一块的代码 }, nil } */ -func (t *ChangeRedundancy) reconstructLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, grpBlocks []clitypes.GrouppedObjectBlock, red *clitypes.LRCRedundancy, uploadUserSpaces []*userSpaceLoadInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) { +func (t *ChangeRedundancy) reconstructLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, grpBlocks []clitypes.GrouppedObjectBlock, red *clitypes.LRCRedundancy, uploadUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) { var chosenBlocks []clitypes.GrouppedObjectBlock var chosenBlockStg []clitypes.UserSpaceDetail diff --git a/client/internal/uploader/create_load.go b/client/internal/uploader/create_load.go index 05f1cb6..5b92b78 100644 --- a/client/internal/uploader/create_load.go +++ b/client/internal/uploader/create_load.go @@ -17,10 +17,10 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" ) -type CreateLoadUploader struct { +type CreateUploader struct { pkg types.Package targetSpaces []types.UserSpaceDetail - loadRoots []string + copyRoots []string uploader *Uploader pubLock *distlock.Mutex successes []db.AddObjectEntry @@ -28,12 +28,12 @@ type CreateLoadUploader struct { commited bool } -type CreateLoadResult struct { +type CreateUploadResult struct { Package types.Package Objects map[string]types.Object } -func (u *CreateLoadUploader) Upload(pa string, stream io.Reader, opts ...UploadOption) error { +func (u *CreateUploader) Upload(pa string, stream io.Reader, opts ...UploadOption) error { opt := UploadOption{} if len(opts) > 0 { opt = opts[0] @@ -50,7 +50,7 @@ func (u *CreateLoadUploader) Upload(pa string, stream io.Reader, opts ...UploadO ft.AddFrom(fromExec) for i, space := range u.targetSpaces { ft.AddTo(ioswitch2.NewToShardStore(*space.MasterHub, space, ioswitch2.RawStream(), "shardInfo")) - ft.AddTo(ioswitch2.NewToPublicStore(*space.MasterHub, space, path.Join(u.loadRoots[i], pa))) + ft.AddTo(ioswitch2.NewToPublicStore(*space.MasterHub, space, path.Join(u.copyRoots[i], pa))) spaceIDs = append(spaceIDs, space.UserSpace.UserSpaceID) } @@ -84,12 +84,12 @@ func (u *CreateLoadUploader) Upload(pa string, stream io.Reader, opts ...UploadO return nil } -func (u *CreateLoadUploader) Commit() (CreateLoadResult, error) { +func (u *CreateUploader) Commit() (CreateUploadResult, error) { u.lock.Lock() defer u.lock.Unlock() if u.commited { - return CreateLoadResult{}, fmt.Errorf("package already commited") + return CreateUploadResult{}, fmt.Errorf("package already commited") } u.commited = true @@ -102,10 +102,10 @@ func (u *CreateLoadUploader) Commit() (CreateLoadResult, error) { return err }) if err != nil { - return CreateLoadResult{}, fmt.Errorf("adding objects: %w", err) + return CreateUploadResult{}, fmt.Errorf("adding objects: %w", err) } - ret := CreateLoadResult{ + ret := CreateUploadResult{ Package: u.pkg, Objects: make(map[string]types.Object), } @@ -117,7 +117,7 @@ func (u *CreateLoadUploader) Commit() (CreateLoadResult, error) { return ret, nil } -func (u *CreateLoadUploader) Abort() { +func (u *CreateUploader) Abort() { u.lock.Lock() defer u.lock.Unlock() diff --git a/client/internal/uploader/update.go b/client/internal/uploader/update.go index bf9a857..7e81bd6 100644 --- a/client/internal/uploader/update.go +++ b/client/internal/uploader/update.go @@ -23,8 +23,8 @@ type UpdateUploader struct { pkgID types.PackageID targetSpace types.UserSpaceDetail pubLock *distlock.Mutex - loadToSpaces []types.UserSpaceDetail - loadToPath []string + copyToSpaces []types.UserSpaceDetail + copyToPath []string successes []db.AddObjectEntry lock sync.Mutex commited bool @@ -60,8 +60,8 @@ func (w *UpdateUploader) Upload(pat string, stream io.Reader, opts ...UploadOpti ft.AddFrom(fromExec). AddTo(ioswitch2.NewToShardStore(*w.targetSpace.MasterHub, w.targetSpace, ioswitch2.RawStream(), "shardInfo")) - for i, space := range w.loadToSpaces { - ft.AddTo(ioswitch2.NewToPublicStore(*space.MasterHub, space, path.Join(w.loadToPath[i], pat))) + for i, space := range w.copyToSpaces { + ft.AddTo(ioswitch2.NewToPublicStore(*space.MasterHub, space, path.Join(w.copyToPath[i], pat))) } plans := exec.NewPlanBuilder() diff --git a/client/internal/uploader/uploader.go b/client/internal/uploader/uploader.go index 569428b..5bd7d2f 100644 --- a/client/internal/uploader/uploader.go +++ b/client/internal/uploader/uploader.go @@ -43,7 +43,7 @@ func NewUploader(pubLock *distlock.Service, connectivity *connectivity.Collector } } -func (u *Uploader) BeginUpdate(pkgID clitypes.PackageID, affinity clitypes.UserSpaceID, loadTo []clitypes.UserSpaceID, loadToPath []string) (*UpdateUploader, error) { +func (u *Uploader) BeginUpdate(pkgID clitypes.PackageID, affinity clitypes.UserSpaceID, copyTo []clitypes.UserSpaceID, copyToPath []string) (*UpdateUploader, error) { spaceIDs, err := u.db.UserSpace().GetAllIDs(u.db.DefCtx()) if err != nil { return nil, fmt.Errorf("getting user space ids: %w", err) @@ -77,19 +77,19 @@ func (u *Uploader) BeginUpdate(pkgID clitypes.PackageID, affinity clitypes.UserS return nil, fmt.Errorf("user no available userspaces") } - loadToSpaces := make([]clitypes.UserSpaceDetail, len(loadTo)) - for i, spaceID := range loadTo { + copyToSpaces := make([]clitypes.UserSpaceDetail, len(copyTo)) + for i, spaceID := range copyTo { space, ok := lo.Find(spaceDetails, func(space *clitypes.UserSpaceDetail) bool { return space.UserSpace.UserSpaceID == spaceID }) if !ok { - return nil, fmt.Errorf("load to storage %v not found", spaceID) + return nil, fmt.Errorf("user space %v not found", spaceID) } if space.MasterHub == nil { - return nil, fmt.Errorf("load to storage %v has no master hub", spaceID) + return nil, fmt.Errorf("user space %v has no master hub", spaceID) } - loadToSpaces[i] = *space + copyToSpaces[i] = *space } target := u.chooseUploadStorage(uploadSpaces, affinity) @@ -105,8 +105,8 @@ func (u *Uploader) BeginUpdate(pkgID clitypes.PackageID, affinity clitypes.UserS pkgID: pkgID, targetSpace: target.Space, pubLock: pubLock, - loadToSpaces: loadToSpaces, - loadToPath: loadToPath, + copyToSpaces: copyToSpaces, + copyToPath: copyToPath, }, nil } @@ -133,13 +133,13 @@ func (w *Uploader) chooseUploadStorage(spaces []UploadSpaceInfo, spaceAffinity c return spaces[0] } -func (u *Uploader) BeginCreateLoad(bktID clitypes.BucketID, pkgName string, loadTo []clitypes.UserSpaceID, loadToPath []string) (*CreateLoadUploader, error) { - getSpaces := u.spaceMeta.GetMany(loadTo) +func (u *Uploader) BeginCreateUpload(bktID clitypes.BucketID, pkgName string, copyTo []clitypes.UserSpaceID, copyToPath []string) (*CreateUploader, error) { + getSpaces := u.spaceMeta.GetMany(copyTo) - spacesStgs := make([]clitypes.UserSpaceDetail, len(loadTo)) + spacesStgs := make([]clitypes.UserSpaceDetail, len(copyTo)) for i, stg := range getSpaces { if stg == nil { - return nil, fmt.Errorf("storage %v not found", loadTo[i]) + return nil, fmt.Errorf("storage %v not found", copyTo[i]) } spacesStgs[i] = *stg } @@ -165,10 +165,10 @@ func (u *Uploader) BeginCreateLoad(bktID clitypes.BucketID, pkgName string, load return nil, fmt.Errorf("acquire lock: %w", err) } - return &CreateLoadUploader{ + return &CreateUploader{ pkg: pkg, targetSpaces: spacesStgs, - loadRoots: loadToPath, + copyRoots: copyToPath, uploader: u, pubLock: lock, }, nil diff --git a/client/sdk/api/object.go b/client/sdk/api/object.go index 0f22fb8..5aeaea9 100644 --- a/client/sdk/api/object.go +++ b/client/sdk/api/object.go @@ -92,8 +92,8 @@ type ObjectUpload struct { type ObjectUploadInfo struct { PackageID types.PackageID `json:"packageID" binding:"required"` Affinity types.UserSpaceID `json:"affinity"` - LoadTo []types.UserSpaceID `json:"loadTo"` - LoadToPath []string `json:"loadToPath"` + CopyTo []types.UserSpaceID `json:"copyTo"` + CopyToPath []string `json:"copyToPath"` } type UploadingObject struct { diff --git a/client/sdk/api/package.go b/client/sdk/api/package.go index 9b5fee6..7f793d7 100644 --- a/client/sdk/api/package.go +++ b/client/sdk/api/package.go @@ -89,25 +89,25 @@ func (s *PackageService) Create(req PackageCreate) (*PackageCreateResp, error) { return JSONAPI(s.cfg, http.DefaultClient, &req, &PackageCreateResp{}) } -const PackageCreateLoadPath = "/package/createLoad" +const PackageCreateUploadPath = "/package/createUpload" -type PackageCreateLoad struct { - PackageCreateLoadInfo +type PackageCreateUpload struct { + PackageCreateUploadInfo Files UploadObjectIterator `json:"-"` } -type PackageCreateLoadInfo struct { +type PackageCreateUploadInfo struct { BucketID clitypes.BucketID `json:"bucketID" binding:"required"` Name string `json:"name" binding:"required"` - LoadTo []clitypes.UserSpaceID `json:"loadTo"` - LoadToPath []string `json:"loadToPath"` + CopyTo []clitypes.UserSpaceID `json:"copyTo"` + CopyToPath []string `json:"copyToPath"` } -type PackageCreateLoadResp struct { +type PackageCreateUploadResp struct { Package clitypes.Package `json:"package"` Objects []clitypes.Object `json:"objects"` } -func (c *PackageService) CreateLoad(req PackageCreateLoad) (*PackageCreateLoadResp, error) { - url, err := url.JoinPath(c.cfg.URL, PackageCreateLoadPath) +func (c *PackageService) CreateUpload(req PackageCreateUpload) (*PackageCreateUploadResp, error) { + url, err := url.JoinPath(c.cfg.URL, PackageCreateUploadPath) if err != nil { return nil, err } @@ -130,7 +130,7 @@ func (c *PackageService) CreateLoad(req PackageCreateLoad) (*PackageCreateLoadRe return nil, err } - codeResp, err := ParseJSONResponse[response[PackageCreateLoadResp]](resp) + codeResp, err := ParseJSONResponse[response[PackageCreateUploadResp]](resp) if err != nil { return nil, err } diff --git a/client/sdk/api/presigned.go b/client/sdk/api/presigned.go index 7138851..599cbab 100644 --- a/client/sdk/api/presigned.go +++ b/client/sdk/api/presigned.go @@ -64,8 +64,8 @@ type PresignedObjectUpload struct { PackageID clitypes.PackageID `form:"packageID" binding:"required" url:"packageID"` Path string `form:"path" binding:"required" url:"path"` Affinity clitypes.UserSpaceID `form:"affinity" url:"affinity,omitempty"` - LoadTo []clitypes.UserSpaceID `form:"loadTo" url:"loadTo,omitempty"` - LoadToPath []string `form:"loadToPath" url:"loadToPath,omitempty"` + CopyTo []clitypes.UserSpaceID `form:"copyTo" url:"copyTo,omitempty"` + CopyToPath []string `form:"copyToPath" url:"copyToPath,omitempty"` } type PresignedObjectUploadResp struct { diff --git a/client/sdk/api/storage_test.go b/client/sdk/api/storage_test.go index 6253303..f8d3daa 100644 --- a/client/sdk/api/storage_test.go +++ b/client/sdk/api/storage_test.go @@ -168,7 +168,7 @@ func Test_Storage(t *testing.T) { }) So(err, ShouldBeNil) - _, err = cli.UserSpaceLoadPackage(UserSpaceLoadPackageReq{ + _, err = cli.UserSpaceDownloadPackage(UserSpaceDownloadPackageReq{ PackageID: createResp.Package.PackageID, UserSpaceID: 1, }) diff --git a/client/sdk/api/userspace.go b/client/sdk/api/userspace.go index da4b454..82044be 100644 --- a/client/sdk/api/userspace.go +++ b/client/sdk/api/userspace.go @@ -7,26 +7,26 @@ import ( clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" ) -const UserSpaceLoadPackagePath = "/userspace/loadPackage" +const UserSpaceDownloadPackagePath = "/userspace/downloadPackage" -type UserSpaceLoadPackageReq struct { +type UserSpaceDownloadPackageReq struct { PackageID clitypes.PackageID `json:"packageID" binding:"required"` UserSpaceID clitypes.UserSpaceID `json:"userSpaceID" binding:"required"` RootPath string `json:"rootPath"` } -func (r *UserSpaceLoadPackageReq) MakeParam() *sdks.RequestParam { - return sdks.MakeJSONParam(http.MethodPost, UserSpaceLoadPackagePath, r) +func (r *UserSpaceDownloadPackageReq) MakeParam() *sdks.RequestParam { + return sdks.MakeJSONParam(http.MethodPost, UserSpaceDownloadPackagePath, r) } -type UserSpaceLoadPackageResp struct{} +type UserSpaceDownloadPackageResp struct{} -func (r *UserSpaceLoadPackageResp) ParseResponse(resp *http.Response) error { +func (r *UserSpaceDownloadPackageResp) ParseResponse(resp *http.Response) error { return sdks.ParseCodeDataJSONResponse(resp, r) } -func (c *Client) UserSpaceLoadPackage(req UserSpaceLoadPackageReq) (*UserSpaceLoadPackageResp, error) { - return JSONAPI(c.cfg, http.DefaultClient, &req, &UserSpaceLoadPackageResp{}) +func (c *Client) UserSpaceDownloadPackage(req UserSpaceDownloadPackageReq) (*UserSpaceDownloadPackageResp, error) { + return JSONAPI(c.cfg, http.DefaultClient, &req, &UserSpaceDownloadPackageResp{}) } const UserSpaceCreatePackagePath = "/userspace/createPackage" diff --git a/common/pkgs/ioswitch2/parser/opt/multipart.go b/common/pkgs/ioswitch2/parser/opt/multipart.go index 07534f2..b03b164 100644 --- a/common/pkgs/ioswitch2/parser/opt/multipart.go +++ b/common/pkgs/ioswitch2/parser/opt/multipart.go @@ -9,7 +9,7 @@ import ( ) // 将SegmentJoin指令替换成分片上传指令 -func UseMultipartUploadToShardStore(ctx *state.GenerateState) { +func UseMultipartUpcopyToShardStore(ctx *state.GenerateState) { dag.WalkOnlyType[*ops2.SegmentJoinNode](ctx.DAG.Graph, func(joinNode *ops2.SegmentJoinNode) bool { if joinNode.Joined().Dst.Len() != 1 { return true diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index 05d6fc7..089ad1b 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -79,7 +79,7 @@ func Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error { opt.RemoveUnusedFromNode(state) opt.UseECMultiplier(state) opt.UseS2STransfer(state) - opt.UseMultipartUploadToShardStore(state) + opt.UseMultipartUpcopyToShardStore(state) opt.DropUnused(state) opt.StoreShardWriteResult(state) opt.GenerateRange(state) diff --git a/hub/internal/task/cache_move_package.go b/hub/internal/task/cache_move_package.go deleted file mode 100644 index 44b422f..0000000 --- a/hub/internal/task/cache_move_package.go +++ /dev/null @@ -1,92 +0,0 @@ -package task - -/* -import ( - "fmt" - "time" - - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/task" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/reqbuilder" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/iterator" - coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator" -) - -type CacheMovePackage struct { - userID cdssdk.UserID - packageID cdssdk.PackageID - storageID cdssdk.StorageID -} - -func NewCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) *CacheMovePackage { - return &CacheMovePackage{ - userID: userID, - packageID: packageID, - storageID: storageID, - } -} - -func (t *CacheMovePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { - err := t.do(ctx) - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) -} - -func (t *CacheMovePackage) do(ctx TaskContext) error { - log := logger.WithType[CacheMovePackage]("Task") - log.Debugf("begin with %v", logger.FormatStruct(t)) - defer log.Debugf("end") - - store, err := ctx.stgHubs.GetShardStore(t.storageID) - if err != nil { - return fmt.Errorf("get shard store of storage %v: %w", t.storageID, err) - } - - mutex, err := reqbuilder.NewBuilder(). - // 保护解码出来的Object数据 - Shard().Buzy(t.storageID). - MutexLock(ctx.distlock) - if err != nil { - return fmt.Errorf("acquiring distlock: %w", err) - } - defer mutex.Unlock() - - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return fmt.Errorf("new coordinator client: %w", err) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - // TODO 可以考虑优化,比如rep类型的直接pin就可以 - objIter := ctx.downloader.DownloadPackage(t.packageID) - defer objIter.Close() - - for { - obj, err := objIter.MoveNext() - if err != nil { - if err == iterator.ErrNoMoreItem { - break - } - return err - } - defer obj.File.Close() - - _, err = store.Create(obj.File) - if err != nil { - return fmt.Errorf("writing to store: %w", err) - } - - ctx.accessStat.AddAccessCounter(obj.Object.ObjectID, t.packageID, t.storageID, 1) - } - - _, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(t.packageID, t.storageID)) - if err != nil { - return fmt.Errorf("request to coordinator: %w", err) - } - - return nil -} -*/ diff --git a/hub/internal/task/create_package.go b/hub/internal/task/create_package.go deleted file mode 100644 index 906de3f..0000000 --- a/hub/internal/task/create_package.go +++ /dev/null @@ -1,150 +0,0 @@ -package task - -/* -import ( - "fmt" - "path/filepath" - "time" - - "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/task" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/iterator" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator" -) - -// CreatePackageResult 定义创建包的结果结构 -// 包含包的ID和上传的对象列表 -type CreatePackageResult struct { - PackageID cdssdk.PackageID - Objects []cdssdk.Object -} - -// CreatePackage 定义创建包的任务结构 -// 包含用户ID、存储桶ID、包名称、上传对象的迭代器、节点亲和性以及任务结果 -type CreatePackage struct { - userID cdssdk.UserID - bucketID cdssdk.BucketID - name string - objIter iterator.UploadingObjectIterator - stgAffinity cdssdk.StorageID - Result CreatePackageResult -} - -// NewCreatePackage 创建一个新的CreatePackage实例 -// userID: 用户ID -// bucketID: 存储桶ID -// name: 包名称 -// objIter: 上传对象的迭代器 -// stgAffinity: 节点亲和性,指定包应该创建在哪个节点上(可选) -// 返回CreatePackage实例的指针 -func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, stgAffinity cdssdk.StorageID) *CreatePackage { - return &CreatePackage{ - userID: userID, - bucketID: bucketID, - name: name, - objIter: objIter, - stgAffinity: stgAffinity, - } -} - -// Execute 执行创建包的任务 -// task: 任务实例,携带任务上下文 -// ctx: 任务上下文,包含分布式锁和网络连接性等信息 -// complete: 任务完成的回调函数 -func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { - // 获取任务日志记录器 - log := logger.WithType[CreatePackage]("Task") - - log.Debugf("begin") - defer log.Debugf("end") - - // 从MQ池中获取协调器客户端 - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - err = fmt.Errorf("new coordinator client: %w", err) - log.Warn(err.Error()) - // 完成任务并设置移除延迟 - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) - return - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - // 向协调器创建包 - createResp, err := coorCli.CreatePackage(coordinator.NewCreatePackage(t.userID, t.bucketID, t.name)) - if err != nil { - err = fmt.Errorf("creating package: %w", err) - log.Error(err.Error()) - // 完成任务并设置移除延迟 - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) - return - } - - up, err := ctx.uploader.BeginUpdate(t.userID, createResp.Package.PackageID, t.stgAffinity, nil, nil) - if err != nil { - err = fmt.Errorf("begin update: %w", err) - log.Error(err.Error()) - // 完成任务并设置移除延迟 - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) - return - } - defer up.Abort() - - for { - obj, err := t.objIter.MoveNext() - if err == iterator.ErrNoMoreItem { - break - } - if err != nil { - log.Error(err.Error()) - // 完成任务并设置移除延迟 - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) - return - } - path := filepath.ToSlash(obj.Path) - - // 上传对象 - err = up.Upload(path, obj.File) - if err != nil { - err = fmt.Errorf("uploading object: %w", err) - log.Error(err.Error()) - // 完成任务并设置移除延迟 - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) - return - } - - } - - // 结束上传 - uploadRet, err := up.Commit() - if err != nil { - err = fmt.Errorf("uploading objects: %w", err) - log.Error(err.Error()) - // 完成任务并设置移除延迟 - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) - return - } - - t.Result.PackageID = createResp.Package.PackageID - t.Result.Objects = lo.Values(uploadRet.Objects) - - // 完成任务并设置移除延迟 - complete(nil, CompleteOption{ - RemovingDelay: time.Minute, - }) -} -*/ diff --git a/hub/internal/task/task.go b/hub/internal/task/task.go deleted file mode 100644 index 78342b1..0000000 --- a/hub/internal/task/task.go +++ /dev/null @@ -1,49 +0,0 @@ -package task - -/* -import ( - "gitlink.org.cn/cloudream/common/pkgs/distlock" - "gitlink.org.cn/cloudream/common/pkgs/task" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/accessstat" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/connectivity" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/downloader" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/hubpool" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/uploader" -) - -// TaskContext 定义了任务执行的上下文环境,包含分布式锁服务、IO开关和网络连接状态收集器 -type TaskContext struct { - distlock *distlock.Service - connectivity *connectivity.Collector - downloader *downloader.Downloader - accessStat *accessstat.AccessStat - stgHubs *hubpool.HubPool - uploader *uploader.Uploader -} - -// CompleteFn 类型定义了任务完成时需要执行的函数,用于设置任务的执行结果 -type CompleteFn = task.CompleteFn - -// Manager 类型代表任务管理器,用于创建、管理和调度任务 -type Manager = task.Manager[TaskContext] - -// TaskBody 类型定义了任务体,包含了任务的具体执行逻辑 -type TaskBody = task.TaskBody[TaskContext] - -// Task 类型代表一个具体的任务,包含了任务的上下文、执行体和其它相关信息 -type Task = task.Task[TaskContext] - -// CompleteOption 类型定义了任务完成时的选项,可用于定制化任务完成的处理方式 -type CompleteOption = task.CompleteOption - -func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, stgHubs *hubpool.HubPool, uploader *uploader.Uploader) Manager { - return task.NewManager(TaskContext{ - distlock: distlock, - connectivity: connectivity, - downloader: downloader, - accessStat: accessStat, - stgHubs: stgHubs, - uploader: uploader, - }) -} -*/