From 8516ed46bc943f18f78bdf2b80482adf1b9777be Mon Sep 17 00:00:00 2001 From: songjc <969378911@qq.com> Date: Thu, 10 Aug 2023 10:56:40 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=89=B9=E9=87=8F=E8=B0=83?= =?UTF-8?q?=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/cmdline/storage.go | 6 +++--- internal/http/storage.go | 2 +- internal/services/storage.go | 10 +++++----- internal/task/move_dir_to_storage.go | 13 +++++++------ internal/task/move_object_to_storage.go | 7 ++++--- 5 files changed, 20 insertions(+), 18 deletions(-) diff --git a/internal/cmdline/storage.go b/internal/cmdline/storage.go index b15fea4..e1a15ed 100644 --- a/internal/cmdline/storage.go +++ b/internal/cmdline/storage.go @@ -12,7 +12,7 @@ func StorageMoveObject(ctx CommandContext, objectID int64, storageID int64) erro } for { - complete, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageMoveObjectToStorage(taskID, time.Second*10) + complete, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageMoveObject(taskID, time.Second*10) if complete { if err != nil { return fmt.Errorf("moving complete with: %w", err) @@ -28,13 +28,13 @@ func StorageMoveObject(ctx CommandContext, objectID int64, storageID int64) erro } func StorageMoveDir(ctx CommandContext, dirName string, storageID int64) error { - taskID, err := ctx.Cmdline.Svc.StorageSvc().StartMovingObjectDirToStorage(0, dirName, storageID) + taskID, err := ctx.Cmdline.Svc.StorageSvc().StartMovingDir(0, dirName, storageID) if err != nil { return fmt.Errorf("start moving object to storage: %w", err) } for { - complete, results, err := ctx.Cmdline.Svc.StorageSvc().WaitMovingObjectDirToStorage(taskID, time.Second*5) + complete, results, err := ctx.Cmdline.Svc.StorageSvc().WaitMovingDir(taskID, time.Second*5) if complete { if err != nil { return fmt.Errorf("moving complete with: %w", err) diff --git a/internal/http/storage.go b/internal/http/storage.go index 65fe6e6..7e1244d 100644 --- a/internal/http/storage.go +++ b/internal/http/storage.go @@ -43,7 +43,7 @@ func (s *StorageService) MoveObject(ctx *gin.Context) { } for { - complete, err := s.svc.StorageSvc().WaitStorageMoveObjectToStorage(taskID, time.Second*10) + complete, err := s.svc.StorageSvc().WaitStorageMoveObject(taskID, time.Second*10) if complete { if err != nil { log.Warnf("moving complete with: %s", err.Error()) diff --git a/internal/services/storage.go b/internal/services/storage.go index 3f54cd4..96ea7ee 100644 --- a/internal/services/storage.go +++ b/internal/services/storage.go @@ -24,7 +24,7 @@ func (svc *StorageService) StartStorageMoveObject(userID int64, objectID int64, return tsk.ID(), nil } -func (svc *StorageService) WaitStorageMoveObjectToStorage(taskID string, waitTimeout time.Duration) (bool, error) { +func (svc *StorageService) WaitStorageMoveObject(taskID string, waitTimeout time.Duration) (bool, error) { tsk := svc.taskMgr.FindByID(taskID) if tsk.WaitTimeout(waitTimeout) { return true, tsk.Error() @@ -33,15 +33,15 @@ func (svc *StorageService) WaitStorageMoveObjectToStorage(taskID string, waitTim } -func (svc *StorageService) StartMovingObjectDirToStorage(userID int64, dirName string, storageID int64) (string, error) { - tsk := svc.taskMgr.StartNew(task.NewMoveObjectDirToStorage(userID, dirName, storageID)) +func (svc *StorageService) StartMovingDir(userID int64, dirName string, storageID int64) (string, error) { + tsk := svc.taskMgr.StartNew(task.NewMoveDirToStorage(userID, dirName, storageID)) return tsk.ID(), nil } -func (svc *StorageService) WaitMovingObjectDirToStorage(taskID string, waitTimeout time.Duration) (bool, []task.ResultObjectToStorage, error) { +func (svc *StorageService) WaitMovingDir(taskID string, waitTimeout time.Duration) (bool, []task.ResultObjectToStorage, error) { tsk := svc.taskMgr.FindByID(taskID) if tsk.WaitTimeout(waitTimeout) { - return true, tsk.Body().(*task.MoveObjectDirToStorage).ResultObjectToStorages, tsk.Error() + return true, tsk.Body().(*task.MoveDirToStorage).ResultObjectToStorages, tsk.Error() } return false, nil, nil diff --git a/internal/task/move_dir_to_storage.go b/internal/task/move_dir_to_storage.go index fa5d3a2..b26acb1 100644 --- a/internal/task/move_dir_to_storage.go +++ b/internal/task/move_dir_to_storage.go @@ -2,13 +2,14 @@ package task import ( "fmt" + "path/filepath" "time" "gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder" coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator" ) -type MoveObjectDirToStorage struct { +type MoveDirToStorage struct { userID int64 dirName string storageID int64 @@ -20,22 +21,22 @@ type ResultObjectToStorage struct { Error error } -func NewMoveObjectDirToStorage(userID int64, dirName string, storageID int64) *MoveObjectDirToStorage { - return &MoveObjectDirToStorage{ +func NewMoveDirToStorage(userID int64, dirName string, storageID int64) *MoveDirToStorage { + return &MoveDirToStorage{ userID: userID, dirName: dirName, storageID: storageID, } } -func (t *MoveObjectDirToStorage) Execute(ctx TaskContext, complete CompleteFn) { +func (t *MoveDirToStorage) Execute(ctx TaskContext, complete CompleteFn) { err := t.do(ctx) complete(err, CompleteOption{ RemovingDelay: time.Minute, }) } -func (t *MoveObjectDirToStorage) do(ctx TaskContext) error { +func (t *MoveDirToStorage) do(ctx TaskContext) error { //根据dirName查询相关的所有文件 objsResp, err := ctx.Coordinator.GetObjectsByDirName(coormsg.NewGetObjectsByDirName(t.userID, t.dirName)) if err != nil { @@ -73,7 +74,7 @@ func (t *MoveObjectDirToStorage) do(ctx TaskContext) error { defer mutex.Unlock() for i := 0; i < len(objsResp.Objects); i++ { - err := moveSingleObjectToStorage(ctx, t.userID, objsResp.Objects[i].ObjectID, t.storageID) + err := moveSingleObjectToStorage(ctx, t.userID, objsResp.Objects[i].ObjectID, filepath.Dir(objsResp.Objects[i].Name), t.storageID) t.ResultObjectToStorages = append(t.ResultObjectToStorages, ResultObjectToStorage{ ObjectName: objsResp.Objects[i].Name, Error: err, diff --git a/internal/task/move_object_to_storage.go b/internal/task/move_object_to_storage.go index 19727ef..c402b47 100644 --- a/internal/task/move_object_to_storage.go +++ b/internal/task/move_object_to_storage.go @@ -56,16 +56,17 @@ func (t *MoveObjectToStorage) do(ctx TaskContext) error { } defer mutex.Unlock() - err = moveSingleObjectToStorage(ctx, t.userID, t.objectID, t.storageID) + err = moveSingleObjectToStorage(ctx, t.userID, t.objectID, "", t.storageID) return err } -func moveSingleObjectToStorage(ctx TaskContext, userID int64, objectID int64, storageID int64) error { +func moveSingleObjectToStorage(ctx TaskContext, userID int64, objectID int64, dirName string, storageID int64) error { // 先向协调端请求文件相关的元数据 preMoveResp, err := ctx.Coordinator.PreMoveObjectToStorage(coormsg.NewPreMoveObjectToStorage(objectID, storageID, userID)) if err != nil { return fmt.Errorf("pre move object to storage: %w", err) } + fmt.Printf("preMoveResp: %v\n", preMoveResp) // 然后向代理端发送移动文件的请求 agentClient, err := agtcli.NewClient(preMoveResp.NodeID, &config.Cfg().RabbitMQ) @@ -75,7 +76,7 @@ func moveSingleObjectToStorage(ctx TaskContext, userID int64, objectID int64, st defer agentClient.Close() agentMoveResp, err := agentClient.StartStorageMoveObject( - agtmsg.NewStartStorageMoveObject(preMoveResp.Directory, + agtmsg.NewStartStorageMoveObject(preMoveResp.Directory+"/"+dirName, objectID, userID, preMoveResp.FileSize,