From f19f274a7d243d3adbb22f7f9baf816324439c78 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Sun, 8 Oct 2023 16:46:36 +0800 Subject: [PATCH 1/4] =?UTF-8?q?Load=E4=B9=8B=E5=90=8E=E8=BF=94=E5=9B=9E?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E8=B7=AF=E5=BE=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/services/mq/storage.go | 12 +++-- agent/internal/task/download_package.go | 27 ----------- agent/internal/task/storage_load_package.go | 29 ++++++++++++ client/internal/cmdline/storage.go | 3 +- client/internal/http/storage.go | 47 +++++++++++++++++++- client/internal/services/storage.go | 23 ++++++++-- client/internal/task/storage_load_package.go | 4 ++ common/pkgs/mq/agent/storage.go | 4 +- 8 files changed, 111 insertions(+), 38 deletions(-) delete mode 100644 agent/internal/task/download_package.go create mode 100644 agent/internal/task/storage_load_package.go diff --git a/agent/internal/services/mq/storage.go b/agent/internal/services/mq/storage.go index c7399d6..0704613 100644 --- a/agent/internal/services/mq/storage.go +++ b/agent/internal/services/mq/storage.go @@ -45,7 +45,7 @@ func (svc *Service) StartStorageLoadPackage(msg *agtmq.StartStorageLoadPackage) return nil, mq.Failed(errorcode.OperationFailed, "create output directory failed") } - tsk := svc.taskManager.StartNew(mytask.NewDownloadPackage(msg.UserID, msg.PackageID, outputDirPath)) + tsk := svc.taskManager.StartNew(mytask.NewStorageLoadPackage(msg.UserID, msg.PackageID, outputDirPath)) return mq.ReplyOK(agtmq.NewStartStorageLoadPackageResp(tsk.ID())) } @@ -65,7 +65,9 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (* errMsg = tsk.Error().Error() } - return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg)) + loadTsk := tsk.Body().(*mytask.StorageLoadPackage) + + return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullPath)) } else { if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { @@ -75,10 +77,12 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (* errMsg = tsk.Error().Error() } - return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg)) + loadTsk := tsk.Body().(*mytask.StorageLoadPackage) + + return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullPath)) } - return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, "")) + return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, "", "")) } } diff --git a/agent/internal/task/download_package.go b/agent/internal/task/download_package.go deleted file mode 100644 index 9e9c275..0000000 --- a/agent/internal/task/download_package.go +++ /dev/null @@ -1,27 +0,0 @@ -package task - -import ( - "time" - - "gitlink.org.cn/cloudream/common/pkgs/task" - "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" -) - -type DownloadPackage struct { - cmd *cmd.DownloadPackage -} - -func NewDownloadPackage(userID int64, packageID int64, outputPath string) *DownloadPackage { - return &DownloadPackage{ - cmd: cmd.NewDownloadPackage(userID, packageID, outputPath), - } -} -func (t *DownloadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { - err := t.cmd.Execute(&cmd.DownloadPackageContext{ - Distlock: ctx.distlock, - }) - - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) -} diff --git a/agent/internal/task/storage_load_package.go b/agent/internal/task/storage_load_package.go new file mode 100644 index 0000000..ef7da0f --- /dev/null +++ b/agent/internal/task/storage_load_package.go @@ -0,0 +1,29 @@ +package task + +import ( + "time" + + "gitlink.org.cn/cloudream/common/pkgs/task" + "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" +) + +type StorageLoadPackage struct { + cmd *cmd.DownloadPackage + FullPath string +} + +func NewStorageLoadPackage(userID int64, packageID int64, outputPath string) *StorageLoadPackage { + return &StorageLoadPackage{ + cmd: cmd.NewDownloadPackage(userID, packageID, outputPath), + FullPath: outputPath, + } +} +func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { + err := t.cmd.Execute(&cmd.DownloadPackageContext{ + Distlock: ctx.distlock, + }) + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/client/internal/cmdline/storage.go b/client/internal/cmdline/storage.go index f727068..22059b9 100644 --- a/client/internal/cmdline/storage.go +++ b/client/internal/cmdline/storage.go @@ -14,12 +14,13 @@ func StorageLoadPackage(ctx CommandContext, packageID int64, storageID int64) er } for { - complete, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10) + complete, fullPath, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10) if complete { if err != nil { return fmt.Errorf("moving complete with: %w", err) } + fmt.Printf("Load To: %s\n", fullPath) return nil } diff --git a/client/internal/http/storage.go b/client/internal/http/storage.go index d16ff68..f707387 100644 --- a/client/internal/http/storage.go +++ b/client/internal/http/storage.go @@ -26,6 +26,10 @@ type StorageLoadPackageReq struct { StorageID *int64 `json:"storageID" binding:"required"` } +type StorageLoadPackageResp struct { + stgsdk.StorageLoadPackageResp +} + func (s *StorageService) LoadPackage(ctx *gin.Context) { log := logger.WithField("HTTP", "Storage.LoadPackage") @@ -44,7 +48,7 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) { } for { - complete, err := s.svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10) + complete, fullPath, err := s.svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10) if complete { if err != nil { log.Warnf("loading complete with: %s", err.Error()) @@ -52,7 +56,11 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(nil)) + ctx.JSON(http.StatusOK, OK(StorageLoadPackageResp{ + StorageLoadPackageResp: stgsdk.StorageLoadPackageResp{ + FullPath: fullPath, + }, + })) return } @@ -118,3 +126,38 @@ func (s *StorageService) CreatePackage(ctx *gin.Context) { } } } + +type StorageGetInfoReq struct { + UserID *int64 `json:"userID" binding:"required"` + StorageID *int64 `json:"storageID" binding:"required"` +} + +type StorageGetInfoResp struct { + stgsdk.StorageGetInfoResp +} + +func (s *StorageService) GetInfo(ctx *gin.Context) { + log := logger.WithField("HTTP", "Storage.GetInfo") + + var req StorageGetInfoReq + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + info, err := s.svc.StorageSvc().GetInfo(*req.UserID, *req.StorageID) + if err != nil { + log.Warnf("getting info: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get storage inf failed")) + return + } + + ctx.JSON(http.StatusOK, OK(StorageGetInfoResp{ + StorageGetInfoResp: stgsdk.StorageGetInfoResp{ + Name: info.Name, + NodeID: info.NodeID, + Directory: info.Directory, + }, + })) +} diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index d5514a8..89a075f 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -8,6 +8,7 @@ import ( "gitlink.org.cn/cloudream/storage/client/internal/task" stgglb "gitlink.org.cn/cloudream/storage/common/globals" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -25,12 +26,13 @@ func (svc *StorageService) StartStorageLoadPackage(userID int64, packageID int64 return tsk.ID(), nil } -func (svc *StorageService) WaitStorageLoadPackage(taskID string, waitTimeout time.Duration) (bool, error) { +func (svc *StorageService) WaitStorageLoadPackage(taskID string, waitTimeout time.Duration) (bool, string, error) { tsk := svc.TaskMgr.FindByID(taskID) if tsk.WaitTimeout(waitTimeout) { - return true, tsk.Error() + loadTsk := tsk.Body().(*task.StorageLoadPackage) + return true, loadTsk.ResultFullPath, tsk.Error() } - return false, nil + return false, "", nil } func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, storageID int64) error { @@ -89,3 +91,18 @@ func (svc *StorageService) WaitStorageCreatePackage(nodeID int64, taskID string, return true, waitResp.PackageID, nil } + +func (svc *StorageService) GetInfo(userID int64, storageID int64) (*model.Storage, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID)) + if err != nil { + return nil, fmt.Errorf("request to coordinator: %w", err) + } + + return &getResp.Storage, nil +} diff --git a/client/internal/task/storage_load_package.go b/client/internal/task/storage_load_package.go index 4c42b15..42e9a77 100644 --- a/client/internal/task/storage_load_package.go +++ b/client/internal/task/storage_load_package.go @@ -11,10 +11,13 @@ import ( coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) +// TODO 可以考虑不用Task来实现这些逻辑 type StorageLoadPackage struct { userID int64 packageID int64 storageID int64 + + ResultFullPath string } func NewStorageLoadPackage(userID int64, packageID int64, storageID int64) *StorageLoadPackage { @@ -97,6 +100,7 @@ func (t *StorageLoadPackage) do(ctx TaskContext) error { return fmt.Errorf("agent loading package: %s", waitResp.Error) } + t.ResultFullPath = waitResp.FullPath break } } diff --git a/common/pkgs/mq/agent/storage.go b/common/pkgs/mq/agent/storage.go index d5f692b..a7187d2 100644 --- a/common/pkgs/mq/agent/storage.go +++ b/common/pkgs/mq/agent/storage.go @@ -61,6 +61,7 @@ type WaitStorageLoadPackageResp struct { mq.MessageBodyBase IsComplete bool `json:"isComplete"` Error string `json:"error"` + FullPath string `json:"fullPath"` } func NewWaitStorageLoadPackage(taskID string, waitTimeoutMs int64) *WaitStorageLoadPackage { @@ -69,10 +70,11 @@ func NewWaitStorageLoadPackage(taskID string, waitTimeoutMs int64) *WaitStorageL WaitTimeoutMs: waitTimeoutMs, } } -func NewWaitStorageLoadPackageResp(isComplete bool, err string) *WaitStorageLoadPackageResp { +func NewWaitStorageLoadPackageResp(isComplete bool, err string, fullPath string) *WaitStorageLoadPackageResp { return &WaitStorageLoadPackageResp{ IsComplete: isComplete, Error: err, + FullPath: fullPath, } } func (client *Client) WaitStorageLoadPackage(msg *WaitStorageLoadPackage, opts ...mq.RequestOption) (*WaitStorageLoadPackageResp, error) { From 221dbc9e89fae8c6f5319fc780e336540a019e61 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Sun, 8 Oct 2023 17:00:48 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E8=B0=83=E6=95=B4Load=E6=93=8D=E4=BD=9C?= =?UTF-8?q?=E7=9A=84=E4=BA=A7=E7=94=9F=E7=9A=84=E7=9B=AE=E5=BD=95=E7=9A=84?= =?UTF-8?q?=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/config/config.go | 1 - agent/internal/services/mq/storage.go | 13 +++++-------- common/assets/confs/agent.config.json | 1 - common/utils/utils.go | 9 +++++---- 4 files changed, 10 insertions(+), 14 deletions(-) diff --git a/agent/internal/config/config.go b/agent/internal/config/config.go index bbf93e1..f699534 100644 --- a/agent/internal/config/config.go +++ b/agent/internal/config/config.go @@ -15,7 +15,6 @@ type Config struct { Local stgmodels.LocalMachineInfo `json:"local"` GRPC *grpc.Config `json:"grpc"` ECPacketSize int64 `json:"ecPacketSize"` - StorageBaseDir string `json:"storageBaseDir"` TempFileLifetime int `json:"tempFileLifetime"` // temp状态的副本最多能保持多久时间,单位:秒 Logger log.Config `json:"logger"` RabbitMQ stgmq.Config `json:"rabbitMQ"` diff --git a/agent/internal/services/mq/storage.go b/agent/internal/services/mq/storage.go index 0704613..1941106 100644 --- a/agent/internal/services/mq/storage.go +++ b/agent/internal/services/mq/storage.go @@ -10,7 +10,6 @@ import ( "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" - "gitlink.org.cn/cloudream/storage/agent/internal/config" mytask "gitlink.org.cn/cloudream/storage/agent/internal/task" "gitlink.org.cn/cloudream/storage/common/consts" stgglb "gitlink.org.cn/cloudream/storage/common/globals" @@ -37,7 +36,7 @@ func (svc *Service) StartStorageLoadPackage(msg *agtmq.StartStorageLoadPackage) return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed") } - outputDirPath := filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, utils.MakeStorageLoadPackageDirName(msg.PackageID, msg.UserID)) + outputDirPath := utils.MakeStorageLoadPackagePath(getStgResp.Directory, msg.UserID, msg.PackageID) if err = os.MkdirAll(outputDirPath, 0755); err != nil { logger.WithField("StorageID", msg.StorageID). Warnf("creating output directory: %s", err.Error()) @@ -87,9 +86,7 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (* } func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckResp, *mq.CodeMessage) { - dirFullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory) - - infos, err := os.ReadDir(dirFullPath) + infos, err := os.ReadDir(msg.Directory) if err != nil { logger.Warnf("list storage directory failed, err: %s", err.Error()) return mq.ReplyOK(agtmq.NewStorageCheckResp( @@ -115,7 +112,7 @@ func (svc *Service) checkStorageIncrement(msg *agtmq.StorageCheck, dirInfos []fs var entries []agtmq.StorageCheckRespEntry for _, obj := range msg.Packages { - dirName := utils.MakeStorageLoadPackageDirName(obj.PackageID, obj.UserID) + dirName := utils.MakeStorageLoadPackagePath(msg.Directory, obj.UserID, obj.PackageID) _, ok := infosMap[dirName] if ok { @@ -143,7 +140,7 @@ func (svc *Service) checkStorageComplete(msg *agtmq.StorageCheck, dirInfos []fs. var entries []agtmq.StorageCheckRespEntry for _, obj := range msg.Packages { - dirName := utils.MakeStorageLoadPackageDirName(obj.PackageID, obj.UserID) + dirName := utils.MakeStorageLoadPackagePath(msg.Directory, obj.UserID, obj.PackageID) _, ok := infosMap[dirName] if ok { @@ -177,7 +174,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed") } - fullPath := filepath.Clean(filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, msg.Path)) + fullPath := filepath.Clean(filepath.Join(getStgResp.Directory, msg.Path)) var uploadFilePathes []string err = filepath.WalkDir(fullPath, func(fname string, fi os.DirEntry, err error) error { diff --git a/common/assets/confs/agent.config.json b/common/assets/confs/agent.config.json index bff9b8f..34cc6a1 100644 --- a/common/assets/confs/agent.config.json +++ b/common/assets/confs/agent.config.json @@ -10,7 +10,6 @@ "port": 5010 }, "ecPacketSize": 10, - "storageBaseDir": ".", "tempFileLifetime": 3600, "logger": { "output": "file", diff --git a/common/utils/utils.go b/common/utils/utils.go index 3639aa9..fc93d42 100644 --- a/common/utils/utils.go +++ b/common/utils/utils.go @@ -1,10 +1,11 @@ package utils import ( - "fmt" + "path/filepath" + "strconv" ) -// MakeStorageLoadPackageDirName Load操作时,写入的文件夹的名称 -func MakeStorageLoadPackageDirName(packageID int64, userID int64) string { - return fmt.Sprintf("%d-%d", packageID, userID) +// MakeStorageLoadPackagePath Load操作时,写入的文件夹的名称 +func MakeStorageLoadPackagePath(stgDir string, userID int64, packageID int64) string { + return filepath.Join(stgDir, strconv.FormatInt(userID, 10), "packages", strconv.FormatInt(packageID, 10)) } From 4d642874216a67a193620d1332e1f6b72288d219 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 10 Oct 2023 17:17:04 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=8E=B7=E5=8F=96package?= =?UTF-8?q?=E9=87=8C=E6=89=80=E6=9C=89object=E7=9A=84filehash=E7=9A=84?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/task/cache_move_package.go | 2 +- client/internal/http/cacah.go | 27 ++++++++++++++++++++ client/internal/http/package.go | 29 +++++++++++++++++++++ client/internal/http/server.go | 2 ++ client/internal/services/cacah.go | 16 ++++++++++++ client/internal/services/object.go | 24 +++++++++++++++++- client/internal/services/package.go | 15 +++++++++++ common/pkgs/db/model/model.go | 17 +++---------- common/pkgs/db/object_rep.go | 22 ++++++++++++++++ common/pkgs/mq/coordinator/cache.go | 31 +++++++++++++++++++++++ common/pkgs/mq/coordinator/object.go | 31 +++++++++++++++++++++++ common/pkgs/mq/coordinator/package.go | 30 ---------------------- coordinator/internal/services/object.go | 25 ++++++++++++++++++ 13 files changed, 226 insertions(+), 45 deletions(-) diff --git a/agent/internal/task/cache_move_package.go b/agent/internal/task/cache_move_package.go index c7c121b..582855d 100644 --- a/agent/internal/task/cache_move_package.go +++ b/agent/internal/task/cache_move_package.go @@ -96,7 +96,7 @@ func (t *CacheMovePackage) moveRep(ctx TaskContext, coorCli *coormq.Client, pkg } fileHashes = append(fileHashes, rep.FileHash) - t.ResultCacheInfos = append(t.ResultCacheInfos, stgsdk.NewObjectCacheInfo(rep.Object.ObjectID, rep.FileHash)) + t.ResultCacheInfos = append(t.ResultCacheInfos, stgsdk.NewObjectCacheInfo(rep.Object, rep.FileHash)) } _, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(pkg.PackageID, *stgglb.Local.NodeID, fileHashes)) diff --git a/client/internal/http/cacah.go b/client/internal/http/cacah.go index c196751..5466399 100644 --- a/client/internal/http/cacah.go +++ b/client/internal/http/cacah.go @@ -68,3 +68,30 @@ func (s *CacheService) MovePackage(ctx *gin.Context) { } } } + +type CacheGetPackageObjectCacheInfosReq struct { + UserID *int64 `form:"userID" binding:"required"` + PackageID *int64 `form:"packageID" binding:"required"` +} + +type CacheGetPackageObjectCacheInfosResp = stgsdk.GetPackageObjectCacheInfosResp + +func (s *CacheService) GetPackageObjectCacheInfos(ctx *gin.Context) { + log := logger.WithField("HTTP", "Cache.GetPackageObjectCacheInfos") + + var req CacheGetPackageObjectCacheInfosReq + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + infos, err := s.svc.CacheSvc().GetPackageObjectCacheInfos(*req.UserID, *req.PackageID) + if err != nil { + log.Warnf("getting package object cache infos: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package object cache infos failed")) + return + } + + ctx.JSON(http.StatusOK, OK(CacheGetPackageObjectCacheInfosResp{Infos: infos})) +} diff --git a/client/internal/http/package.go b/client/internal/http/package.go index 5a86f48..5d3415a 100644 --- a/client/internal/http/package.go +++ b/client/internal/http/package.go @@ -11,6 +11,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" stgiter "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) @@ -24,6 +25,34 @@ func (s *Server) PackageSvc() *PackageService { } } +type PackageGetReq struct { + UserID *int64 `form:"userID" binding:"required"` + PackageID *int64 `form:"packageID" binding:"required"` +} +type PackageGetResp struct { + model.Package +} + +func (s *PackageService) Get(ctx *gin.Context) { + log := logger.WithField("HTTP", "Package.Get") + + var req PackageGetReq + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + pkg, err := s.svc.PackageSvc().Get(*req.UserID, *req.PackageID) + if err != nil { + log.Warnf("getting package: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package failed")) + return + } + + ctx.JSON(http.StatusOK, OK(PackageGetResp{Package: *pkg})) +} + type PackageUploadReq struct { Info PackageUploadInfo `form:"info" binding:"required"` Files []*multipart.FileHeader `form:"files"` diff --git a/client/internal/http/server.go b/client/internal/http/server.go index 9d1abc7..18b1c9a 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -40,6 +40,7 @@ func (s *Server) Serve() error { func (s *Server) initRouters() { s.engine.GET("/object/download", s.ObjectSvc().Download) + s.engine.GET("/package/get", s.PackageSvc().Get) s.engine.POST("/package/upload", s.PackageSvc().Upload) s.engine.POST("/package/delete", s.PackageSvc().Delete) s.engine.GET("/package/getCachedNodes", s.PackageSvc().GetCachedNodes) @@ -49,4 +50,5 @@ func (s *Server) initRouters() { s.engine.POST("/storage/createPackage", s.StorageSvc().CreatePackage) s.engine.POST("/cache/movePackage", s.CacheSvc().MovePackage) + s.engine.GET("/cache/getPackageObjectCacheInfos", s.CacheSvc().GetPackageObjectCacheInfos) } diff --git a/client/internal/services/cacah.go b/client/internal/services/cacah.go index 6b624cd..c7aa3a8 100644 --- a/client/internal/services/cacah.go +++ b/client/internal/services/cacah.go @@ -8,6 +8,7 @@ import ( stgglb "gitlink.org.cn/cloudream/storage/common/globals" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) type CacheService struct { @@ -55,3 +56,18 @@ func (svc *CacheService) WaitCacheMovePackage(nodeID int64, taskID string, waitT return true, waitResp.CacheInfos, nil } + +func (svc *CacheService) GetPackageObjectCacheInfos(userID int64, packageID int64) ([]stgsdk.ObjectCacheInfo, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getResp, err := coorCli.GetPackageObjectCacheInfos(coormq.NewGetPackageObjectCacheInfos(userID, packageID)) + if err != nil { + return nil, fmt.Errorf("requesting to coodinator: %w", err) + } + + return getResp.Infos, nil +} diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 94c6de7..220fecb 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -1,6 +1,13 @@ package services -import "io" +import ( + "fmt" + "io" + + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" +) type ObjectService struct { *Service @@ -13,3 +20,18 @@ func (svc *Service) ObjectSvc() *ObjectService { func (svc *ObjectService) Download(userID int64, objectID int64) (io.ReadCloser, error) { panic("not implement yet!") } + +func (svc *ObjectService) GetPackageObjects(userID int64, packageID int64) ([]model.Object, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getResp, err := coorCli.GetPackageObjects(coormq.NewGetPackageObjects(userID, packageID)) + if err != nil { + return nil, fmt.Errorf("requsting to coodinator: %w", err) + } + + return getResp.Objects, nil +} diff --git a/client/internal/services/package.go b/client/internal/services/package.go index bfc851f..eae76f6 100644 --- a/client/internal/services/package.go +++ b/client/internal/services/package.go @@ -23,6 +23,21 @@ func (svc *Service) PackageSvc() *PackageService { return &PackageService{Service: svc} } +func (svc *PackageService) Get(userID int64, packageID int64) (*model.Package, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getResp, err := coorCli.GetPackage(coormq.NewGetPackage(userID, packageID)) + if err != nil { + return nil, fmt.Errorf("requsting to coodinator: %w", err) + } + + return &getResp.Package, nil +} + func (svc *PackageService) DownloadPackage(userID int64, packageID int64) (iterator.DownloadingObjectIterator, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { diff --git a/common/pkgs/db/model/model.go b/common/pkgs/db/model/model.go index 7e421f1..c11af65 100644 --- a/common/pkgs/db/model/model.go +++ b/common/pkgs/db/model/model.go @@ -6,6 +6,8 @@ import ( stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" ) +// TODO 可以考虑逐步迁移到stgsdk中。迁移思路:数据对象应该包含的字段都迁移到stgsdk中,内部使用的一些特殊字段则留在这里 + type Node struct { NodeID int64 `db:"NodeID" json:"nodeID"` Name string `db:"Name" json:"name"` @@ -56,20 +58,9 @@ type Bucket struct { CreatorID int64 `db:"CreatorID" json:"creatorID"` } -type Package struct { - PackageID int64 `db:"PackageID" json:"packageID"` - Name string `db:"Name" json:"name"` - BucketID int64 `db:"BucketID" json:"bucketID"` - State string `db:"State" json:"state"` - Redundancy stgsdk.TypedRedundancyInfo `db:"Redundancy" json:"redundancy"` -} +type Package = stgsdk.Package -type Object struct { - ObjectID int64 `db:"ObjectID" json:"objectID"` - PackageID int64 `db:"PackageID" json:"packageID"` - Path string `db:"Path" json:"path"` - Size int64 `db:"Size" json:"size,string"` -} +type Object = stgsdk.Object type ObjectRep struct { ObjectID int64 `db:"ObjectID" json:"objectID"` diff --git a/common/pkgs/db/object_rep.go b/common/pkgs/db/object_rep.go index a3ae054..322db6f 100644 --- a/common/pkgs/db/object_rep.go +++ b/common/pkgs/db/object_rep.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/jmoiron/sqlx" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/consts" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" @@ -116,6 +117,27 @@ func (db *ObjectRepDB) GetWithNodeIDInPackage(ctx SQLContext, packageID int64) ( return rets, nil } +func (db *ObjectRepDB) GetPackageObjectCacheInfos(ctx SQLContext, packageID int64) ([]stgsdk.ObjectCacheInfo, error) { + var tmpRet []struct { + stgsdk.Object + FileHash string `db:"FileHash"` + } + + err := sqlx.Select(ctx, &tmpRet, "select Object.*, ObjectRep.FileHash from Object"+ + " left join ObjectRep on Object.ObjectID = ObjectRep.ObjectID"+ + " where Object.PackageID = ? order by Object.ObjectID asc", packageID) + if err != nil { + return nil, err + } + + ret := make([]stgsdk.ObjectCacheInfo, len(tmpRet)) + for i, r := range tmpRet { + ret[i] = stgsdk.NewObjectCacheInfo(r.Object, r.FileHash) + } + + return ret, nil +} + // 按逗号切割字符串,并将每一个部分解析为一个int64的ID。 // 注:需要外部保证分隔的每一个部分都是正确的10进制数字格式 func splitIDStringUnsafe(idStr string) []int64 { diff --git a/common/pkgs/mq/coordinator/cache.go b/common/pkgs/mq/coordinator/cache.go index 4a68a53..4991b3f 100644 --- a/common/pkgs/mq/coordinator/cache.go +++ b/common/pkgs/mq/coordinator/cache.go @@ -2,10 +2,13 @@ package coordinator import ( "gitlink.org.cn/cloudream/common/pkgs/mq" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" ) type CacheService interface { CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, *mq.CodeMessage) + + GetPackageObjectCacheInfos(msg *GetPackageObjectCacheInfos) (*GetPackageObjectCacheInfosResp, *mq.CodeMessage) } // Package的Object移动到了节点的Cache中 @@ -34,3 +37,31 @@ func NewCachePackageMovedResp() *CachePackageMovedResp { func (client *Client) CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, error) { return mq.Request(Service.CachePackageMoved, client.rabbitCli, msg) } + +// 获取Package中所有Object的FileHash +var _ = Register(Service.GetPackageObjectCacheInfos) + +type GetPackageObjectCacheInfos struct { + mq.MessageBodyBase + UserID int64 `json:"userID"` + PackageID int64 `json:"packageID"` +} +type GetPackageObjectCacheInfosResp struct { + mq.MessageBodyBase + Infos []stgsdk.ObjectCacheInfo +} + +func NewGetPackageObjectCacheInfos(userID int64, packageID int64) *GetPackageObjectCacheInfos { + return &GetPackageObjectCacheInfos{ + UserID: userID, + PackageID: packageID, + } +} +func NewGetPackageObjectCacheInfosResp(infos []stgsdk.ObjectCacheInfo) *GetPackageObjectCacheInfosResp { + return &GetPackageObjectCacheInfosResp{ + Infos: infos, + } +} +func (client *Client) GetPackageObjectCacheInfos(msg *GetPackageObjectCacheInfos) (*GetPackageObjectCacheInfosResp, error) { + return mq.Request(Service.GetPackageObjectCacheInfos, client.rabbitCli, msg) +} diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index ae04e68..29eac75 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -4,14 +4,45 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/mq" stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) type ObjectService interface { + GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, *mq.CodeMessage) + GetPackageObjectRepData(msg *GetPackageObjectRepData) (*GetPackageObjectRepDataResp, *mq.CodeMessage) GetPackageObjectECData(msg *GetPackageObjectECData) (*GetPackageObjectECDataResp, *mq.CodeMessage) } +// 查询Package中的所有Object,返回的Objects会按照ObjectID升序 +var _ = Register(Service.GetPackageObjects) + +type GetPackageObjects struct { + mq.MessageBodyBase + UserID int64 `json:"userID"` + PackageID int64 `json:"packageID"` +} +type GetPackageObjectsResp struct { + mq.MessageBodyBase + Objects []model.Object `json:"objects"` +} + +func NewGetPackageObjects(userID int64, packageID int64) *GetPackageObjects { + return &GetPackageObjects{ + UserID: userID, + PackageID: packageID, + } +} +func NewGetPackageObjectsResp(objects []model.Object) *GetPackageObjectsResp { + return &GetPackageObjectsResp{ + Objects: objects, + } +} +func (client *Client) GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, error) { + return mq.Request(Service.GetPackageObjects, client.rabbitCli, msg) +} + // 获取指定Object的Rep数据,返回的Objects会按照ObjectID升序 var _ = Register(Service.GetPackageObjectRepData) diff --git a/common/pkgs/mq/coordinator/package.go b/common/pkgs/mq/coordinator/package.go index 70b0999..2e85bad 100644 --- a/common/pkgs/mq/coordinator/package.go +++ b/common/pkgs/mq/coordinator/package.go @@ -10,8 +10,6 @@ import ( type PackageService interface { GetPackage(msg *GetPackage) (*GetPackageResp, *mq.CodeMessage) - GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, *mq.CodeMessage) - CreatePackage(msg *CreatePackage) (*CreatePackageResp, *mq.CodeMessage) UpdateRepPackage(msg *UpdateRepPackage) (*UpdateRepPackageResp, *mq.CodeMessage) @@ -53,34 +51,6 @@ func (client *Client) GetPackage(msg *GetPackage) (*GetPackageResp, error) { return mq.Request(Service.GetPackage, client.rabbitCli, msg) } -// 查询Package中的所有Object,返回的Objects会按照ObjectID升序 -var _ = Register(Service.GetPackageObjects) - -type GetPackageObjects struct { - mq.MessageBodyBase - UserID int64 `json:"userID"` - PackageID int64 `json:"packageID"` -} -type GetPackageObjectsResp struct { - mq.MessageBodyBase - Objects []model.Object `json:"objects"` -} - -func NewGetPackageObjects(userID int64, packageID int64) *GetPackageObjects { - return &GetPackageObjects{ - UserID: userID, - PackageID: packageID, - } -} -func NewGetPackageObjectsResp(objects []model.Object) *GetPackageObjectsResp { - return &GetPackageObjectsResp{ - Objects: objects, - } -} -func (client *Client) GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, error) { - return mq.Request(Service.GetPackageObjects, client.rabbitCli, msg) -} - // 创建一个Package var _ = Register(Service.CreatePackage) diff --git a/coordinator/internal/services/object.go b/coordinator/internal/services/object.go index 6cd5dbc..30a46bc 100644 --- a/coordinator/internal/services/object.go +++ b/coordinator/internal/services/object.go @@ -7,6 +7,31 @@ import ( coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) +func (svc *Service) GetPackageObjectCacheInfos(msg *coormq.GetPackageObjectCacheInfos) (*coormq.GetPackageObjectCacheInfosResp, *mq.CodeMessage) { + pkg, err := svc.db.Package().GetUserPackage(svc.db.SQLCtx(), msg.UserID, msg.PackageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). + Warnf("getting package: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "get package failed") + } + + if pkg.Redundancy.IsRepInfo() { + infos, err := svc.db.ObjectRep().GetPackageObjectCacheInfos(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). + Warnf("getting rep package object cache infos: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "get rep package object cache infos failed") + } + + return mq.ReplyOK(coormq.NewGetPackageObjectCacheInfosResp(infos)) + } + // TODO EC + + return nil, mq.Failed(errorcode.OperationFailed, "not implement yet") +} + func (svc *Service) GetPackageObjectRepData(msg *coormq.GetPackageObjectRepData) (*coormq.GetPackageObjectRepDataResp, *mq.CodeMessage) { data, err := svc.db.ObjectRep().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) if err != nil { From a45a11742a4af9d5f24e8ef2b3b445c1e5a999f9 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 11 Oct 2023 14:59:35 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E5=88=9D=E6=AD=A5=E5=AE=8C=E6=88=90IMFS?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/http/cacah.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/internal/http/cacah.go b/client/internal/http/cacah.go index 5466399..2832162 100644 --- a/client/internal/http/cacah.go +++ b/client/internal/http/cacah.go @@ -74,7 +74,7 @@ type CacheGetPackageObjectCacheInfosReq struct { PackageID *int64 `form:"packageID" binding:"required"` } -type CacheGetPackageObjectCacheInfosResp = stgsdk.GetPackageObjectCacheInfosResp +type CacheGetPackageObjectCacheInfosResp = stgsdk.CacheGetPackageObjectCacheInfosResp func (s *CacheService) GetPackageObjectCacheInfos(ctx *gin.Context) { log := logger.WithField("HTTP", "Cache.GetPackageObjectCacheInfos")