From 97bd7717b6f80f448d1b7b51962a9e956b20546b Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 15 Sep 2023 10:03:10 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A7=BB=E5=8A=A8=E7=BC=93=E5=AD=98=E7=9A=84?= =?UTF-8?q?=E5=87=BD=E6=95=B0=E8=BF=94=E5=9B=9E=E7=A7=BB=E5=8A=A8=E5=90=8E?= =?UTF-8?q?=E7=9A=84FileHash?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/services/mq/cache.go | 8 +++++--- agent/internal/task/cache_move_package.go | 4 ++++ client/internal/cmdline/cache.go | 2 +- client/internal/http/cacah.go | 10 ++++++++-- client/internal/services/cacah.go | 13 +++++++------ common/pkgs/mq/agent/cache.go | 9 ++++++--- 6 files changed, 31 insertions(+), 15 deletions(-) diff --git a/agent/internal/services/mq/cache.go b/agent/internal/services/mq/cache.go index cbc298b..cd053d3 100644 --- a/agent/internal/services/mq/cache.go +++ b/agent/internal/services/mq/cache.go @@ -127,6 +127,8 @@ func (svc *Service) WaitCacheMovePackage(msg *agtmq.WaitCacheMovePackage) (*agtm return nil, mq.Failed(errorcode.TaskNotFound, "task not found") } + mvPkgTask := tsk.Body().(*mytask.CacheMovePackage) + if msg.WaitTimeoutMs == 0 { tsk.Wait() @@ -135,7 +137,7 @@ func (svc *Service) WaitCacheMovePackage(msg *agtmq.WaitCacheMovePackage) (*agtm errMsg = tsk.Error().Error() } - return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg)) + return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg, mvPkgTask.ResultCacheInfos)) } else { if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { @@ -145,9 +147,9 @@ func (svc *Service) WaitCacheMovePackage(msg *agtmq.WaitCacheMovePackage) (*agtm errMsg = tsk.Error().Error() } - return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg)) + return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg, nil)) } - return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(false, "")) + return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(false, "", mvPkgTask.ResultCacheInfos)) } } diff --git a/agent/internal/task/cache_move_package.go b/agent/internal/task/cache_move_package.go index 6319cfd..4793f0a 100644 --- a/agent/internal/task/cache_move_package.go +++ b/agent/internal/task/cache_move_package.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/task" "gitlink.org.cn/cloudream/storage/common/globals" @@ -15,6 +16,8 @@ import ( type CacheMovePackage struct { userID int64 packageID int64 + + ResultCacheInfos []models.ObjectCacheInfo } func NewCacheMovePackage(userID int64, packageID int64) *CacheMovePackage { @@ -92,6 +95,7 @@ func (t *CacheMovePackage) moveRep(ctx TaskContext, coorCli *coormq.PoolClient, } fileHashes = append(fileHashes, rep.FileHash) + t.ResultCacheInfos = append(t.ResultCacheInfos, models.NewObjectCacheInfo(rep.Object.ObjectID, rep.FileHash)) } _, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(pkg.PackageID, *globals.Local.NodeID, fileHashes)) diff --git a/client/internal/cmdline/cache.go b/client/internal/cmdline/cache.go index 3c8e3e7..1a849ce 100644 --- a/client/internal/cmdline/cache.go +++ b/client/internal/cmdline/cache.go @@ -12,7 +12,7 @@ func CacheMovePackage(ctx CommandContext, packageID int64, nodeID int64) error { } for { - complete, err := ctx.Cmdline.Svc.CacheSvc().WaitCacheMovePackage(nodeID, taskID, time.Second*10) + complete, _, err := ctx.Cmdline.Svc.CacheSvc().WaitCacheMovePackage(nodeID, taskID, time.Second*10) if complete { if err != nil { return fmt.Errorf("moving complete with: %w", err) diff --git a/client/internal/http/cacah.go b/client/internal/http/cacah.go index ebe58e6..a8635cd 100644 --- a/client/internal/http/cacah.go +++ b/client/internal/http/cacah.go @@ -6,6 +6,7 @@ import ( "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/logger" ) @@ -24,6 +25,9 @@ type CacheMovePackageReq struct { PackageID *int64 `json:"packageID" binding:"required"` NodeID *int64 `json:"nodeID" binding:"required"` } +type CacheMovePackageResp struct { + CacheInfos []models.ObjectCacheInfo `json:"cacheInfos"` +} func (s *CacheService) MovePackage(ctx *gin.Context) { log := logger.WithField("HTTP", "Cache.LoadPackage") @@ -43,7 +47,7 @@ func (s *CacheService) MovePackage(ctx *gin.Context) { } for { - complete, err := s.svc.CacheSvc().WaitCacheMovePackage(*req.NodeID, taskID, time.Second*10) + complete, cacheInfos, err := s.svc.CacheSvc().WaitCacheMovePackage(*req.NodeID, taskID, time.Second*10) if complete { if err != nil { log.Warnf("moving complete with: %s", err.Error()) @@ -51,7 +55,9 @@ func (s *CacheService) MovePackage(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(nil)) + ctx.JSON(http.StatusOK, OK(CacheMovePackageResp{ + CacheInfos: cacheInfos, + })) return } diff --git a/client/internal/services/cacah.go b/client/internal/services/cacah.go index d4ed2cb..9091520 100644 --- a/client/internal/services/cacah.go +++ b/client/internal/services/cacah.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/storage/common/globals" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" ) @@ -31,25 +32,25 @@ func (svc *CacheService) StartCacheMovePackage(userID int64, packageID int64, no return startResp.TaskID, nil } -func (svc *CacheService) WaitCacheMovePackage(nodeID int64, taskID string, waitTimeout time.Duration) (bool, error) { +func (svc *CacheService) WaitCacheMovePackage(nodeID int64, taskID string, waitTimeout time.Duration) (bool, []models.ObjectCacheInfo, error) { agentCli, err := globals.AgentMQPool.Acquire(nodeID) if err != nil { - return true, fmt.Errorf("new agent client: %w", err) + return true, nil, fmt.Errorf("new agent client: %w", err) } defer agentCli.Close() waitResp, err := agentCli.WaitCacheMovePackage(agtmq.NewWaitCacheMovePackage(taskID, waitTimeout.Milliseconds())) if err != nil { - return true, fmt.Errorf("wait cache move package: %w", err) + return true, nil, fmt.Errorf("wait cache move package: %w", err) } if !waitResp.IsComplete { - return false, nil + return false, nil, nil } if waitResp.Error != "" { - return true, fmt.Errorf("%s", waitResp.Error) + return true, nil, fmt.Errorf("%s", waitResp.Error) } - return true, nil + return true, waitResp.CacheInfos, nil } diff --git a/common/pkgs/mq/agent/cache.go b/common/pkgs/mq/agent/cache.go index 4dbf7c0..1fb24f3 100644 --- a/common/pkgs/mq/agent/cache.go +++ b/common/pkgs/mq/agent/cache.go @@ -1,6 +1,7 @@ package agent import ( + "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -93,8 +94,9 @@ type WaitCacheMovePackage struct { } type WaitCacheMovePackageResp struct { mq.MessageBodyBase - IsComplete bool `json:"isComplete"` - Error string `json:"error"` + IsComplete bool `json:"isComplete"` + Error string `json:"error"` + CacheInfos []models.ObjectCacheInfo `json:"cacheInfos"` } func NewWaitCacheMovePackage(taskID string, waitTimeoutMs int64) *WaitCacheMovePackage { @@ -103,10 +105,11 @@ func NewWaitCacheMovePackage(taskID string, waitTimeoutMs int64) *WaitCacheMoveP WaitTimeoutMs: waitTimeoutMs, } } -func NewWaitCacheMovePackageResp(isComplete bool, err string) *WaitCacheMovePackageResp { +func NewWaitCacheMovePackageResp(isComplete bool, err string, cacheInfos []models.ObjectCacheInfo) *WaitCacheMovePackageResp { return &WaitCacheMovePackageResp{ IsComplete: isComplete, Error: err, + CacheInfos: cacheInfos, } } func (client *Client) WaitCacheMovePackage(msg *WaitCacheMovePackage, opts ...mq.RequestOption) (*WaitCacheMovePackageResp, error) {