From b6f2f51f483475a5831eae7e60323857b00ec9b1 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 5 Jun 2024 15:33:45 +0800 Subject: [PATCH] =?UTF-8?q?Storage=E5=A2=9E=E5=8A=A0LocalBase=E5=92=8CRemo?= =?UTF-8?q?teBase=E5=AD=97=E6=AE=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/mq/storage.go | 52 +++++++++++++++---- agent/internal/task/storage_load_package.go | 13 +++-- client/internal/http/server.go | 2 +- client/internal/http/storage.go | 20 +++---- client/internal/services/storage.go | 22 +++++--- common/assets/scripts/create_database.sql | 1 + common/pkgs/db/model/model.go | 9 +--- common/pkgs/mq/agent/storage.go | 26 +++++----- common/utils/utils.go | 9 ++-- scanner/internal/event/agent_check_storage.go | 2 +- scanner/internal/event/agent_storage_gc.go | 2 +- 11 files changed, 98 insertions(+), 60 deletions(-) diff --git a/agent/internal/mq/storage.go b/agent/internal/mq/storage.go index 37780e2..7a36b92 100644 --- a/agent/internal/mq/storage.go +++ b/agent/internal/mq/storage.go @@ -46,7 +46,7 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (* loadTsk := tsk.Body().(*mytask.StorageLoadPackage) - return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullOutputPath)) + return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.PackagePath, loadTsk.LocalBase, loadTsk.RemoteBase)) } else { if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) { @@ -58,15 +58,33 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (* loadTsk := tsk.Body().(*mytask.StorageLoadPackage) - return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullOutputPath)) + return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.PackagePath, loadTsk.LocalBase, loadTsk.RemoteBase)) } - return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, "", "")) + return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, "", "", "", "")) } } func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckResp, *mq.CodeMessage) { - infos, err := os.ReadDir(msg.Directory) + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return mq.ReplyOK(agtmq.NewStorageCheckResp( + err.Error(), + nil, + )) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + // TODO UserID。应该设计两种接口,一种需要UserID,一种不需要。 + getStg, err := coorCli.GetStorage(coormq.ReqGetStorage(cdssdk.UserID(1), msg.StorageID)) + if err != nil { + return mq.ReplyOK(agtmq.NewStorageCheckResp( + err.Error(), + nil, + )) + } + + entries, err := os.ReadDir(utils.MakeStorageLoadDirectory(getStg.Storage.LocalBase)) if err != nil { logger.Warnf("list storage directory failed, err: %s", err.Error()) return mq.ReplyOK(agtmq.NewStorageCheckResp( @@ -77,7 +95,7 @@ func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckRe var stgPkgs []model.StoragePackage - userDirs := lo.Filter(infos, func(info fs.DirEntry, index int) bool { return info.IsDir() }) + userDirs := lo.Filter(entries, func(info fs.DirEntry, index int) bool { return info.IsDir() }) for _, dir := range userDirs { userIDInt, err := strconv.ParseInt(dir.Name(), 10, 64) if err != nil { @@ -85,7 +103,7 @@ func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckRe continue } - pkgDir := utils.MakeStorageLoadDirectory(msg.Directory, dir.Name()) + pkgDir := filepath.Join(utils.MakeStorageLoadDirectory(getStg.Storage.LocalBase), dir.Name()) pkgDirs, err := os.ReadDir(pkgDir) if err != nil { logger.Warnf("reading package dir %s: %s", pkgDir, err.Error()) @@ -111,7 +129,19 @@ func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckRe } func (svc *Service) StorageGC(msg *agtmq.StorageGC) (*agtmq.StorageGCResp, *mq.CodeMessage) { - infos, err := os.ReadDir(msg.Directory) + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, mq.Failed(errorcode.OperationFailed, err.Error()) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + // TODO UserID。应该设计两种接口,一种需要UserID,一种不需要。 + getStg, err := coorCli.GetStorage(coormq.ReqGetStorage(cdssdk.UserID(1), msg.StorageID)) + if err != nil { + return nil, mq.Failed(errorcode.OperationFailed, err.Error()) + } + + entries, err := os.ReadDir(utils.MakeStorageLoadDirectory(getStg.Storage.LocalBase)) if err != nil { logger.Warnf("list storage directory failed, err: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "list directory files failed") @@ -132,12 +162,12 @@ func (svc *Service) StorageGC(msg *agtmq.StorageGC) (*agtmq.StorageGCResp, *mq.C pkgs[pkgIDStr] = true } - userDirs := lo.Filter(infos, func(info fs.DirEntry, index int) bool { return info.IsDir() }) + userDirs := lo.Filter(entries, func(info fs.DirEntry, index int) bool { return info.IsDir() }) for _, dir := range userDirs { pkgMap, ok := userPkgs[dir.Name()] // 第一级目录名是UserID,先删除UserID在StoragePackage表里没出现过的文件夹 if !ok { - rmPath := filepath.Join(msg.Directory, dir.Name()) + rmPath := filepath.Join(utils.MakeStorageLoadDirectory(getStg.Storage.LocalBase), dir.Name()) err := os.RemoveAll(rmPath) if err != nil { logger.Warnf("removing user dir %s: %s", rmPath, err.Error()) @@ -147,7 +177,7 @@ func (svc *Service) StorageGC(msg *agtmq.StorageGC) (*agtmq.StorageGCResp, *mq.C continue } - pkgDir := utils.MakeStorageLoadDirectory(msg.Directory, dir.Name()) + pkgDir := filepath.Join(utils.MakeStorageLoadDirectory(getStg.Storage.LocalBase), dir.Name()) // 遍历每个UserID目录的packages目录里的内容 pkgs, err := os.ReadDir(pkgDir) if err != nil { @@ -188,7 +218,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed") } - fullPath := filepath.Clean(filepath.Join(getStgResp.Storage.Directory, msg.Path)) + fullPath := filepath.Clean(filepath.Join(getStgResp.Storage.LocalBase, msg.Path)) var uploadFilePathes []string err = filepath.WalkDir(fullPath, func(fname string, fi os.DirEntry, err error) error { diff --git a/agent/internal/task/storage_load_package.go b/agent/internal/task/storage_load_package.go index 93c8374..28f26bc 100644 --- a/agent/internal/task/storage_load_package.go +++ b/agent/internal/task/storage_load_package.go @@ -26,7 +26,9 @@ import ( ) type StorageLoadPackage struct { - FullOutputPath string + PackagePath string + LocalBase string + RemoteBase string userID cdssdk.UserID packageID cdssdk.PackageID @@ -67,11 +69,12 @@ func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) e return fmt.Errorf("request to coordinator: %w", err) } - outputDirPath := utils.MakeStorageLoadPackagePath(getStgResp.Storage.Directory, t.userID, t.packageID) - if err = os.MkdirAll(outputDirPath, 0755); err != nil { + t.PackagePath = utils.MakeLoadedPackagePath(t.userID, t.packageID) + fullLocalPath := filepath.Join(getStgResp.Storage.LocalBase, t.PackagePath) + + if err = os.MkdirAll(fullLocalPath, 0755); err != nil { return fmt.Errorf("creating output directory: %w", err) } - t.FullOutputPath = outputDirPath getObjectDetails, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(t.packageID)) if err != nil { @@ -92,7 +95,7 @@ func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) e defer mutex.Unlock() for _, obj := range getObjectDetails.Objects { - err := t.downloadOne(coorCli, ipfsCli, outputDirPath, obj) + err := t.downloadOne(coorCli, ipfsCli, fullLocalPath, obj) if err != nil { return err } diff --git a/client/internal/http/server.go b/client/internal/http/server.go index 2c93745..a316283 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -58,7 +58,7 @@ func (s *Server) initRouters() { rt.POST(cdssdk.StorageLoadPackagePath, s.Storage().LoadPackage) rt.POST(cdssdk.StorageCreatePackagePath, s.Storage().CreatePackage) - rt.GET(cdssdk.StorageGetInfoPath, s.Storage().GetInfo) + rt.GET(cdssdk.StorageGetPath, s.Storage().Get) rt.POST(cdssdk.CacheMovePackagePath, s.Cache().MovePackage) diff --git a/client/internal/http/storage.go b/client/internal/http/storage.go index c098914..7587c41 100644 --- a/client/internal/http/storage.go +++ b/client/internal/http/storage.go @@ -2,6 +2,7 @@ package http import ( "net/http" + "path/filepath" "time" "github.com/gin-gonic/gin" @@ -38,7 +39,7 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) { } for { - complete, fullPath, err := s.svc.StorageSvc().WaitStorageLoadPackage(nodeID, taskID, time.Second*10) + complete, ret, err := s.svc.StorageSvc().WaitStorageLoadPackage(nodeID, taskID, time.Second*10) if complete { if err != nil { log.Warnf("loading complete with: %s", err.Error()) @@ -47,7 +48,10 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) { } ctx.JSON(http.StatusOK, OK(cdssdk.StorageLoadPackageResp{ - FullPath: fullPath, + FullPath: filepath.Join(ret.RemoteBase, ret.PackagePath), + PackagePath: ret.PackagePath, + LocalBase: ret.LocalBase, + RemoteBase: ret.RemoteBase, })) return } @@ -101,10 +105,10 @@ func (s *StorageService) CreatePackage(ctx *gin.Context) { } } -func (s *StorageService) GetInfo(ctx *gin.Context) { - log := logger.WithField("HTTP", "Storage.GetInfo") +func (s *StorageService) Get(ctx *gin.Context) { + log := logger.WithField("HTTP", "Storage.Get") - var req cdssdk.StorageGetInfoReq + var req cdssdk.StorageGet if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding query: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -118,9 +122,7 @@ func (s *StorageService) GetInfo(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdssdk.StorageGetInfoResp{ - Name: info.Name, - NodeID: info.NodeID, - Directory: info.Directory, + ctx.JSON(http.StatusOK, OK(cdssdk.StorageGetResp{ + Storage: *info, })) } diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index 448c9a6..e961319 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -76,29 +76,39 @@ func (svc *StorageService) StartStorageLoadPackage(userID cdssdk.UserID, package return stgResp.Storage.NodeID, startResp.TaskID, nil } -func (svc *StorageService) WaitStorageLoadPackage(nodeID cdssdk.NodeID, taskID string, waitTimeout time.Duration) (bool, string, error) { +type StorageLoadPackageResult struct { + PackagePath string + LocalBase string + RemoteBase string +} + +func (svc *StorageService) WaitStorageLoadPackage(nodeID cdssdk.NodeID, taskID string, waitTimeout time.Duration) (bool, *StorageLoadPackageResult, error) { agentCli, err := stgglb.AgentMQPool.Acquire(nodeID) if err != nil { // TODO 失败是否要当做任务已经结束? - return true, "", fmt.Errorf("new agent client: %w", err) + return true, nil, fmt.Errorf("new agent client: %w", err) } defer stgglb.AgentMQPool.Release(agentCli) waitResp, err := agentCli.WaitStorageLoadPackage(agtmq.NewWaitStorageLoadPackage(taskID, waitTimeout.Milliseconds())) if err != nil { // TODO 请求失败是否要当做任务已经结束? - return true, "", fmt.Errorf("wait storage load package: %w", err) + return true, nil, fmt.Errorf("wait storage load package: %w", err) } if !waitResp.IsComplete { - return false, "", nil + return false, nil, nil } if waitResp.Error != "" { - return true, "", fmt.Errorf("%s", waitResp.Error) + return true, nil, fmt.Errorf("%s", waitResp.Error) } - return true, waitResp.FullPath, nil + return true, &StorageLoadPackageResult{ + PackagePath: waitResp.PackagePath, + LocalBase: waitResp.LocalBase, + RemoteBase: waitResp.RemoteBase, + }, nil } func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, storageID int64) error { diff --git a/common/assets/scripts/create_database.sql b/common/assets/scripts/create_database.sql index 42f3249..f543931 100644 --- a/common/assets/scripts/create_database.sql +++ b/common/assets/scripts/create_database.sql @@ -44,6 +44,7 @@ create table Storage ( Name varchar(100) not null comment '存储服务名称', NodeID int not null comment '存储服务所在节点的ID', Directory varchar(4096) not null comment '存储服务所在节点的目录', + Remote varchar(4096) not null, State varchar(100) comment '状态' ) comment = "存储服务表"; diff --git a/common/pkgs/db/model/model.go b/common/pkgs/db/model/model.go index 8044dcb..961f697 100644 --- a/common/pkgs/db/model/model.go +++ b/common/pkgs/db/model/model.go @@ -11,14 +11,7 @@ import ( ) // TODO 可以考虑逐步迁移到cdssdk中。迁移思路:数据对象应该包含的字段都迁移到cdssdk中,内部使用的一些特殊字段则留在这里 - -type Storage struct { - StorageID cdssdk.StorageID `db:"StorageID" json:"storageID"` - Name string `db:"Name" json:"name"` - NodeID cdssdk.NodeID `db:"NodeID" json:"nodeID"` - Directory string `db:"Directory" json:"directory"` - State string `db:"State" json:"state"` -} +type Storage = cdssdk.Storage type User struct { UserID cdssdk.UserID `db:"UserID" json:"userID"` diff --git a/common/pkgs/mq/agent/storage.go b/common/pkgs/mq/agent/storage.go index dec7fb6..1087a94 100644 --- a/common/pkgs/mq/agent/storage.go +++ b/common/pkgs/mq/agent/storage.go @@ -61,9 +61,11 @@ type WaitStorageLoadPackage struct { } type WaitStorageLoadPackageResp struct { mq.MessageBodyBase - IsComplete bool `json:"isComplete"` - Error string `json:"error"` - FullPath string `json:"fullPath"` + IsComplete bool `json:"isComplete"` + Error string `json:"error"` + PackagePath string `json:"packagePath"` // 加载后的Package的路径,相对于数据库中配置的Directory + LocalBase string `json:"localBase"` // 存储服务本地的目录,LocalBase + PackagePath = Package在代理节点上的完整路径 + RemoteBase string `json:"remoteBase"` // 存储服务远程的目录,RemoteBase + PackagePath = Package在存储服务中的完整路径 } func NewWaitStorageLoadPackage(taskID string, waitTimeoutMs int64) *WaitStorageLoadPackage { @@ -72,11 +74,13 @@ func NewWaitStorageLoadPackage(taskID string, waitTimeoutMs int64) *WaitStorageL WaitTimeoutMs: waitTimeoutMs, } } -func NewWaitStorageLoadPackageResp(isComplete bool, err string, fullPath string) *WaitStorageLoadPackageResp { +func NewWaitStorageLoadPackageResp(isComplete bool, err string, packagePath string, localBase string, remoteBase string) *WaitStorageLoadPackageResp { return &WaitStorageLoadPackageResp{ - IsComplete: isComplete, - Error: err, - FullPath: fullPath, + IsComplete: isComplete, + Error: err, + PackagePath: packagePath, + LocalBase: localBase, + RemoteBase: remoteBase, } } func (client *Client) WaitStorageLoadPackage(msg *WaitStorageLoadPackage, opts ...mq.RequestOption) (*WaitStorageLoadPackageResp, error) { @@ -89,7 +93,6 @@ var _ = Register(Service.StorageCheck) type StorageCheck struct { mq.MessageBodyBase StorageID cdssdk.StorageID `json:"storageID"` - Directory string `json:"directory"` } type StorageCheckResp struct { mq.MessageBodyBase @@ -97,10 +100,9 @@ type StorageCheckResp struct { Packages []model.StoragePackage `json:"packages"` } -func NewStorageCheck(storageID cdssdk.StorageID, directory string) *StorageCheck { +func NewStorageCheck(storageID cdssdk.StorageID) *StorageCheck { return &StorageCheck{ StorageID: storageID, - Directory: directory, } } func NewStorageCheckResp(dirState string, packages []model.StoragePackage) *StorageCheckResp { @@ -119,17 +121,15 @@ var _ = Register(Service.StorageGC) type StorageGC struct { mq.MessageBodyBase StorageID cdssdk.StorageID `json:"storageID"` - Directory string `json:"directory"` Packages []model.StoragePackage `json:"packages"` } type StorageGCResp struct { mq.MessageBodyBase } -func ReqStorageGC(storageID cdssdk.StorageID, directory string, packages []model.StoragePackage) *StorageGC { +func ReqStorageGC(storageID cdssdk.StorageID, packages []model.StoragePackage) *StorageGC { return &StorageGC{ StorageID: storageID, - Directory: directory, Packages: packages, } } diff --git a/common/utils/utils.go b/common/utils/utils.go index 3dc97a9..29a98a3 100644 --- a/common/utils/utils.go +++ b/common/utils/utils.go @@ -7,11 +7,10 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" ) -// MakeStorageLoadPackagePath Load操作时,写入的文件夹的名称 -func MakeStorageLoadPackagePath(stgDir string, userID cdssdk.UserID, packageID cdssdk.PackageID) string { - return filepath.Join(stgDir, strconv.FormatInt(int64(userID), 10), "packages", strconv.FormatInt(int64(packageID), 10)) +func MakeLoadedPackagePath(userID cdssdk.UserID, packageID cdssdk.PackageID) string { + return filepath.Join("packages", strconv.FormatInt(int64(userID), 10), strconv.FormatInt(int64(packageID), 10)) } -func MakeStorageLoadDirectory(stgDir string, userIDStr string) string { - return filepath.Join(stgDir, userIDStr, "packages") +func MakeStorageLoadDirectory(stgDir string) string { + return filepath.Join(stgDir, "packages") } diff --git a/scanner/internal/event/agent_check_storage.go b/scanner/internal/event/agent_check_storage.go index 9edaacf..207b2fc 100644 --- a/scanner/internal/event/agent_check_storage.go +++ b/scanner/internal/event/agent_check_storage.go @@ -72,7 +72,7 @@ func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) { } defer stgglb.AgentMQPool.Release(agtCli) - checkResp, err := agtCli.StorageCheck(agtmq.NewStorageCheck(stg.StorageID, stg.Directory), mq.RequestOption{Timeout: time.Minute}) + checkResp, err := agtCli.StorageCheck(agtmq.NewStorageCheck(stg.StorageID), mq.RequestOption{Timeout: time.Minute}) if err != nil { log.WithField("NodeID", stg.NodeID).Warnf("checking storage: %s", err.Error()) return diff --git a/scanner/internal/event/agent_storage_gc.go b/scanner/internal/event/agent_storage_gc.go index c3c22ad..beeab3a 100644 --- a/scanner/internal/event/agent_storage_gc.go +++ b/scanner/internal/event/agent_storage_gc.go @@ -74,7 +74,7 @@ func (t *AgentStorageGC) Execute(execCtx ExecuteContext) { } defer stgglb.AgentMQPool.Release(agtCli) - _, err = agtCli.StorageGC(agtmq.ReqStorageGC(t.StorageID, getStg.Directory, stgPkgs), mq.RequestOption{Timeout: time.Minute}) + _, err = agtCli.StorageGC(agtmq.ReqStorageGC(t.StorageID, stgPkgs), mq.RequestOption{Timeout: time.Minute}) if err != nil { log.WithField("StorageID", t.StorageID).Warnf("storage gc: %s", err.Error()) return