From 36ca07b8942465d75e48ed9f788624146a9147df Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 23 Aug 2023 16:12:50 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=BC=93=E5=AD=98=E7=A7=BB?= =?UTF-8?q?=E5=8A=A8=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/cmdline/cache.go | 32 ++++++++++++++++++ internal/http/cacah.go | 64 ++++++++++++++++++++++++++++++++++++ internal/http/server.go | 4 +++ internal/http/storage.go | 55 +++++++++++++++++++++++++++++++ internal/services/cacah.go | 55 +++++++++++++++++++++++++++++++ internal/services/storage.go | 1 - 6 files changed, 210 insertions(+), 1 deletion(-) create mode 100644 internal/cmdline/cache.go create mode 100644 internal/http/cacah.go create mode 100644 internal/services/cacah.go diff --git a/internal/cmdline/cache.go b/internal/cmdline/cache.go new file mode 100644 index 0000000..3c8e3e7 --- /dev/null +++ b/internal/cmdline/cache.go @@ -0,0 +1,32 @@ +package cmdline + +import ( + "fmt" + "time" +) + +func CacheMovePackage(ctx CommandContext, packageID int64, nodeID int64) error { + taskID, err := ctx.Cmdline.Svc.CacheSvc().StartCacheMovePackage(0, packageID, nodeID) + if err != nil { + return fmt.Errorf("start cache moving package: %w", err) + } + + for { + complete, err := ctx.Cmdline.Svc.CacheSvc().WaitCacheMovePackage(nodeID, taskID, time.Second*10) + if complete { + if err != nil { + return fmt.Errorf("moving complete with: %w", err) + } + + return nil + } + + if err != nil { + return fmt.Errorf("wait moving: %w", err) + } + } +} + +func init() { + commands.Add(CacheMovePackage, "cache", "move") +} diff --git a/internal/http/cacah.go b/internal/http/cacah.go new file mode 100644 index 0000000..ebe58e6 --- /dev/null +++ b/internal/http/cacah.go @@ -0,0 +1,64 @@ +package http + +import ( + "net/http" + "time" + + "github.com/gin-gonic/gin" + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" +) + +type CacheService struct { + *Server +} + +func (s *Server) CacheSvc() *CacheService { + return &CacheService{ + Server: s, + } +} + +type CacheMovePackageReq struct { + UserID *int64 `json:"userID" binding:"required"` + PackageID *int64 `json:"packageID" binding:"required"` + NodeID *int64 `json:"nodeID" binding:"required"` +} + +func (s *CacheService) MovePackage(ctx *gin.Context) { + log := logger.WithField("HTTP", "Cache.LoadPackage") + + var req CacheMovePackageReq + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + taskID, err := s.svc.CacheSvc().StartCacheMovePackage(*req.UserID, *req.PackageID, *req.NodeID) + if err != nil { + log.Warnf("start cache move package: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "cache move package failed")) + return + } + + for { + complete, err := s.svc.CacheSvc().WaitCacheMovePackage(*req.NodeID, taskID, time.Second*10) + if complete { + if err != nil { + log.Warnf("moving complete with: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "cache move package failed")) + return + } + + ctx.JSON(http.StatusOK, OK(nil)) + return + } + + if err != nil { + log.Warnf("wait moving: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "cache move package failed")) + return + } + } +} diff --git a/internal/http/server.go b/internal/http/server.go index 996c676..c216d35 100644 --- a/internal/http/server.go +++ b/internal/http/server.go @@ -39,8 +39,12 @@ func (s *Server) Serve() error { func (s *Server) initRouters() { s.engine.GET("/object/download", s.ObjectSvc().Download) + s.engine.POST("/package/upload", s.PackageSvc().Upload) s.engine.POST("/package/delete", s.PackageSvc().Delete) s.engine.POST("/storage/loadPackage", s.StorageSvc().LoadPackage) + s.engine.POST("/storage/createPackage", s.StorageSvc().CreatePackage) + + s.engine.POST("/cache/movePackage", s.CacheSvc().MovePackage) } diff --git a/internal/http/storage.go b/internal/http/storage.go index 8574fa2..1334170 100644 --- a/internal/http/storage.go +++ b/internal/http/storage.go @@ -6,6 +6,7 @@ import ( "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/logger" ) @@ -62,3 +63,57 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) { } } } + +type StorageCreatePackageReq struct { + UserID *int64 `json:"userID" binding:"required"` + StorageID *int64 `json:"storageID" binding:"required"` + Path string `json:"path" binding:"required"` + BucketID *int64 `json:"bucketID" binding:"required"` + Name string `json:"name" binding:"required"` + Redundancy models.TypedRedundancyInfo `json:"redundancy" binding:"required"` +} + +type StorageCreatePackageResp struct { + PackageID int64 `json:"packageID"` +} + +func (s *StorageService) CreatePackage(ctx *gin.Context) { + log := logger.WithField("HTTP", "Storage.CreatePackage") + + var req StorageCreatePackageReq + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + nodeID, taskID, err := s.svc.StorageSvc().StartStorageCreatePackage( + *req.UserID, *req.BucketID, req.Name, *req.StorageID, req.Path, req.Redundancy) + if err != nil { + log.Warnf("start storage create package: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage create package failed")) + return + } + + for { + complete, packageID, err := s.svc.StorageSvc().WaitStorageCreatePackage(nodeID, taskID, time.Second*10) + if complete { + if err != nil { + log.Warnf("creating complete with: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage create package failed")) + return + } + + ctx.JSON(http.StatusOK, OK(StorageCreatePackageResp{ + PackageID: packageID, + })) + return + } + + if err != nil { + log.Warnf("wait creating: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage create package failed")) + return + } + } +} diff --git a/internal/services/cacah.go b/internal/services/cacah.go new file mode 100644 index 0000000..107fe69 --- /dev/null +++ b/internal/services/cacah.go @@ -0,0 +1,55 @@ +package services + +import ( + "fmt" + "time" + + "gitlink.org.cn/cloudream/storage-common/globals" + agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" +) + +type CacheService struct { + *Service +} + +func (svc *Service) CacheSvc() *CacheService { + return &CacheService{Service: svc} +} + +func (svc *CacheService) StartCacheMovePackage(userID int64, packageID int64, nodeID int64) (string, error) { + agentCli, err := globals.AgentMQPool.Acquire(nodeID) + if err != nil { + return "", fmt.Errorf("new agent client: %w", err) + } + defer agentCli.Close() + + startResp, err := agentCli.StartCacheMovePackage(agtmq.NewStartCacheMovePackage(userID, packageID)) + if err != nil { + return "", fmt.Errorf("start cache move package: %w", err) + } + + return startResp.TaskID, nil +} + +func (svc *CacheService) WaitCacheMovePackage(nodeID int64, taskID string, waitTimeout time.Duration) (bool, error) { + agentCli, err := globals.AgentMQPool.Acquire(nodeID) + if err != nil { + return true, fmt.Errorf("new agent client: %w", err) + } + defer agentCli.Close() + + waitResp, err := agentCli.WaitCacheMovePackage(agtmq.NewWaitCacheMovePackage(taskID, waitTimeout.Milliseconds())) + if err != nil { + return true, fmt.Errorf("wait cache move package: %w", err) + } + + if !waitResp.IsComplete { + return false, nil + } + + if waitResp.Error != "" { + return true, fmt.Errorf("%s", waitResp.Error) + } + + return true, nil +} diff --git a/internal/services/storage.go b/internal/services/storage.go index 9e9c44d..900ad84 100644 --- a/internal/services/storage.go +++ b/internal/services/storage.go @@ -30,7 +30,6 @@ func (svc *StorageService) WaitStorageLoadPackage(taskID string, waitTimeout tim return true, tsk.Error() } return false, nil - } func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, storageID int64) error {