Browse Source

优化批量调度

gitlink
songjc 2 years ago
parent
commit
8516ed46bc
5 changed files with 20 additions and 18 deletions
  1. +3
    -3
      internal/cmdline/storage.go
  2. +1
    -1
      internal/http/storage.go
  3. +5
    -5
      internal/services/storage.go
  4. +7
    -6
      internal/task/move_dir_to_storage.go
  5. +4
    -3
      internal/task/move_object_to_storage.go

+ 3
- 3
internal/cmdline/storage.go View File

@@ -12,7 +12,7 @@ func StorageMoveObject(ctx CommandContext, objectID int64, storageID int64) erro
} }


for { for {
complete, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageMoveObjectToStorage(taskID, time.Second*10)
complete, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageMoveObject(taskID, time.Second*10)
if complete { if complete {
if err != nil { if err != nil {
return fmt.Errorf("moving complete with: %w", err) return fmt.Errorf("moving complete with: %w", err)
@@ -28,13 +28,13 @@ func StorageMoveObject(ctx CommandContext, objectID int64, storageID int64) erro
} }


func StorageMoveDir(ctx CommandContext, dirName string, storageID int64) error { func StorageMoveDir(ctx CommandContext, dirName string, storageID int64) error {
taskID, err := ctx.Cmdline.Svc.StorageSvc().StartMovingObjectDirToStorage(0, dirName, storageID)
taskID, err := ctx.Cmdline.Svc.StorageSvc().StartMovingDir(0, dirName, storageID)
if err != nil { if err != nil {
return fmt.Errorf("start moving object to storage: %w", err) return fmt.Errorf("start moving object to storage: %w", err)
} }


for { for {
complete, results, err := ctx.Cmdline.Svc.StorageSvc().WaitMovingObjectDirToStorage(taskID, time.Second*5)
complete, results, err := ctx.Cmdline.Svc.StorageSvc().WaitMovingDir(taskID, time.Second*5)
if complete { if complete {
if err != nil { if err != nil {
return fmt.Errorf("moving complete with: %w", err) return fmt.Errorf("moving complete with: %w", err)


+ 1
- 1
internal/http/storage.go View File

@@ -43,7 +43,7 @@ func (s *StorageService) MoveObject(ctx *gin.Context) {
} }


for { for {
complete, err := s.svc.StorageSvc().WaitStorageMoveObjectToStorage(taskID, time.Second*10)
complete, err := s.svc.StorageSvc().WaitStorageMoveObject(taskID, time.Second*10)
if complete { if complete {
if err != nil { if err != nil {
log.Warnf("moving complete with: %s", err.Error()) log.Warnf("moving complete with: %s", err.Error())


+ 5
- 5
internal/services/storage.go View File

@@ -24,7 +24,7 @@ func (svc *StorageService) StartStorageMoveObject(userID int64, objectID int64,
return tsk.ID(), nil return tsk.ID(), nil
} }


func (svc *StorageService) WaitStorageMoveObjectToStorage(taskID string, waitTimeout time.Duration) (bool, error) {
func (svc *StorageService) WaitStorageMoveObject(taskID string, waitTimeout time.Duration) (bool, error) {
tsk := svc.taskMgr.FindByID(taskID) tsk := svc.taskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) { if tsk.WaitTimeout(waitTimeout) {
return true, tsk.Error() return true, tsk.Error()
@@ -33,15 +33,15 @@ func (svc *StorageService) WaitStorageMoveObjectToStorage(taskID string, waitTim


} }


func (svc *StorageService) StartMovingObjectDirToStorage(userID int64, dirName string, storageID int64) (string, error) {
tsk := svc.taskMgr.StartNew(task.NewMoveObjectDirToStorage(userID, dirName, storageID))
func (svc *StorageService) StartMovingDir(userID int64, dirName string, storageID int64) (string, error) {
tsk := svc.taskMgr.StartNew(task.NewMoveDirToStorage(userID, dirName, storageID))
return tsk.ID(), nil return tsk.ID(), nil
} }


func (svc *StorageService) WaitMovingObjectDirToStorage(taskID string, waitTimeout time.Duration) (bool, []task.ResultObjectToStorage, error) {
func (svc *StorageService) WaitMovingDir(taskID string, waitTimeout time.Duration) (bool, []task.ResultObjectToStorage, error) {
tsk := svc.taskMgr.FindByID(taskID) tsk := svc.taskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) { if tsk.WaitTimeout(waitTimeout) {
return true, tsk.Body().(*task.MoveObjectDirToStorage).ResultObjectToStorages, tsk.Error()
return true, tsk.Body().(*task.MoveDirToStorage).ResultObjectToStorages, tsk.Error()
} }


return false, nil, nil return false, nil, nil


+ 7
- 6
internal/task/move_dir_to_storage.go View File

@@ -2,13 +2,14 @@ package task


import ( import (
"fmt" "fmt"
"path/filepath"
"time" "time"


"gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder" "gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder"
coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator" coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
) )


type MoveObjectDirToStorage struct {
type MoveDirToStorage struct {
userID int64 userID int64
dirName string dirName string
storageID int64 storageID int64
@@ -20,22 +21,22 @@ type ResultObjectToStorage struct {
Error error Error error
} }


func NewMoveObjectDirToStorage(userID int64, dirName string, storageID int64) *MoveObjectDirToStorage {
return &MoveObjectDirToStorage{
func NewMoveDirToStorage(userID int64, dirName string, storageID int64) *MoveDirToStorage {
return &MoveDirToStorage{
userID: userID, userID: userID,
dirName: dirName, dirName: dirName,
storageID: storageID, storageID: storageID,
} }
} }


func (t *MoveObjectDirToStorage) Execute(ctx TaskContext, complete CompleteFn) {
func (t *MoveDirToStorage) Execute(ctx TaskContext, complete CompleteFn) {
err := t.do(ctx) err := t.do(ctx)
complete(err, CompleteOption{ complete(err, CompleteOption{
RemovingDelay: time.Minute, RemovingDelay: time.Minute,
}) })
} }


func (t *MoveObjectDirToStorage) do(ctx TaskContext) error {
func (t *MoveDirToStorage) do(ctx TaskContext) error {
//根据dirName查询相关的所有文件 //根据dirName查询相关的所有文件
objsResp, err := ctx.Coordinator.GetObjectsByDirName(coormsg.NewGetObjectsByDirName(t.userID, t.dirName)) objsResp, err := ctx.Coordinator.GetObjectsByDirName(coormsg.NewGetObjectsByDirName(t.userID, t.dirName))
if err != nil { if err != nil {
@@ -73,7 +74,7 @@ func (t *MoveObjectDirToStorage) do(ctx TaskContext) error {
defer mutex.Unlock() defer mutex.Unlock()


for i := 0; i < len(objsResp.Objects); i++ { for i := 0; i < len(objsResp.Objects); i++ {
err := moveSingleObjectToStorage(ctx, t.userID, objsResp.Objects[i].ObjectID, t.storageID)
err := moveSingleObjectToStorage(ctx, t.userID, objsResp.Objects[i].ObjectID, filepath.Dir(objsResp.Objects[i].Name), t.storageID)
t.ResultObjectToStorages = append(t.ResultObjectToStorages, ResultObjectToStorage{ t.ResultObjectToStorages = append(t.ResultObjectToStorages, ResultObjectToStorage{
ObjectName: objsResp.Objects[i].Name, ObjectName: objsResp.Objects[i].Name,
Error: err, Error: err,


+ 4
- 3
internal/task/move_object_to_storage.go View File

@@ -56,16 +56,17 @@ func (t *MoveObjectToStorage) do(ctx TaskContext) error {
} }
defer mutex.Unlock() defer mutex.Unlock()


err = moveSingleObjectToStorage(ctx, t.userID, t.objectID, t.storageID)
err = moveSingleObjectToStorage(ctx, t.userID, t.objectID, "", t.storageID)
return err return err
} }


func moveSingleObjectToStorage(ctx TaskContext, userID int64, objectID int64, storageID int64) error {
func moveSingleObjectToStorage(ctx TaskContext, userID int64, objectID int64, dirName string, storageID int64) error {
// 先向协调端请求文件相关的元数据 // 先向协调端请求文件相关的元数据
preMoveResp, err := ctx.Coordinator.PreMoveObjectToStorage(coormsg.NewPreMoveObjectToStorage(objectID, storageID, userID)) preMoveResp, err := ctx.Coordinator.PreMoveObjectToStorage(coormsg.NewPreMoveObjectToStorage(objectID, storageID, userID))
if err != nil { if err != nil {
return fmt.Errorf("pre move object to storage: %w", err) return fmt.Errorf("pre move object to storage: %w", err)
} }
fmt.Printf("preMoveResp: %v\n", preMoveResp)


// 然后向代理端发送移动文件的请求 // 然后向代理端发送移动文件的请求
agentClient, err := agtcli.NewClient(preMoveResp.NodeID, &config.Cfg().RabbitMQ) agentClient, err := agtcli.NewClient(preMoveResp.NodeID, &config.Cfg().RabbitMQ)
@@ -75,7 +76,7 @@ func moveSingleObjectToStorage(ctx TaskContext, userID int64, objectID int64, st
defer agentClient.Close() defer agentClient.Close()


agentMoveResp, err := agentClient.StartStorageMoveObject( agentMoveResp, err := agentClient.StartStorageMoveObject(
agtmsg.NewStartStorageMoveObject(preMoveResp.Directory,
agtmsg.NewStartStorageMoveObject(preMoveResp.Directory+"/"+dirName,
objectID, objectID,
userID, userID,
preMoveResp.FileSize, preMoveResp.FileSize,


Loading…
Cancel
Save