diff --git a/agent/internal/cmd/serve.go b/agent/internal/cmd/serve.go index 654db1a..6731e29 100644 --- a/agent/internal/cmd/serve.go +++ b/agent/internal/cmd/serve.go @@ -166,7 +166,7 @@ func serve(configPath string) { // 启动命令服务器 // TODO 需要设计AgentID持久化机制 - agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr, stgAgts), config.Cfg().ID, config.Cfg().RabbitMQ) + agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr, stgAgts, uploader), config.Cfg().ID, config.Cfg().RabbitMQ) if err != nil { logger.Fatalf("new agent server failed, err: %s", err.Error()) } diff --git a/agent/internal/mq/service.go b/agent/internal/mq/service.go index b4688ed..95cd4e3 100644 --- a/agent/internal/mq/service.go +++ b/agent/internal/mq/service.go @@ -3,16 +3,19 @@ package mq import ( "gitlink.org.cn/cloudream/storage/agent/internal/task" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" + "gitlink.org.cn/cloudream/storage/common/pkgs/uploader" ) type Service struct { taskManager *task.Manager stgAgts *agtpool.AgentPool + uploader *uploader.Uploader } -func NewService(taskMgr *task.Manager, stgAgts *agtpool.AgentPool) *Service { +func NewService(taskMgr *task.Manager, stgAgts *agtpool.AgentPool, uplodaer *uploader.Uploader) *Service { return &Service{ taskManager: taskMgr, stgAgts: stgAgts, + uploader: uplodaer, } } diff --git a/agent/internal/mq/storage.go b/agent/internal/mq/storage.go index ff5b4ff..cdc3871 100644 --- a/agent/internal/mq/storage.go +++ b/agent/internal/mq/storage.go @@ -1,76 +1,57 @@ package mq import ( - "time" - "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" - mytask "gitlink.org.cn/cloudream/storage/agent/internal/task" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) -func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePackage) (*agtmq.StartStorageCreatePackageResp, *mq.CodeMessage) { - return nil, mq.Failed(errorcode.OperationFailed, "not implemented") - // coorCli, err := stgglb.CoordinatorMQPool.Acquire() - // if err != nil { - // logger.Warnf("new coordinator client: %s", err.Error()) - - // return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed") - // } - // defer stgglb.CoordinatorMQPool.Release(coorCli) - - // getStg, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails([]cdssdk.StorageID{msg.StorageID})) - // if err != nil { - // return nil, mq.Failed(errorcode.OperationFailed, err.Error()) - // } - // if getStg.Storages[0] == nil { - // return nil, mq.Failed(errorcode.OperationFailed, "storage not found") - // } - // if getStg.Storages[0].Shared == nil { - // return nil, mq.Failed(errorcode.OperationFailed, "storage has no shared storage") - // } - - // fullPath := filepath.Clean(filepath.Join(getStg.Storages[0].Shared.LoadBase, msg.Path)) - - // var uploadFilePathes []string - // err = filepath.WalkDir(fullPath, func(fname string, fi os.DirEntry, err error) error { - // if err != nil { - // return nil - // } - - // if !fi.IsDir() { - // uploadFilePathes = append(uploadFilePathes, fname) - // } +func (svc *Service) StorageCreatePackage(msg *agtmq.StorageCreatePackage) (*agtmq.StorageCreatePackageResp, *mq.CodeMessage) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + logger.Warnf("new coordinator client: %s", err.Error()) - // return nil - // }) - // if err != nil { - // logger.Warnf("opening directory %s: %s", fullPath, err.Error()) - - // return nil, mq.Failed(errorcode.OperationFailed, "read directory failed") - // } + return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed") + } + defer stgglb.CoordinatorMQPool.Release(coorCli) - // objIter := iterator.NewUploadingObjectIterator(fullPath, uploadFilePathes) - // tsk := svc.taskManager.StartNew(mytask.NewCreatePackage(msg.UserID, msg.BucketID, msg.Name, objIter, msg.StorageAffinity)) - // return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) -} + pub, err := svc.stgAgts.GetPublicStore(msg.StorageID) + if err != nil { + return nil, mq.Failed(errorcode.OperationFailed, err.Error()) + } -func (svc *Service) WaitStorageCreatePackage(msg *agtmq.WaitStorageCreatePackage) (*agtmq.WaitStorageCreatePackageResp, *mq.CodeMessage) { - tsk := svc.taskManager.FindByID(msg.TaskID) - if tsk == nil { - return nil, mq.Failed(errorcode.TaskNotFound, "task not found") + createResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(msg.UserID, msg.BucketID, msg.Name)) + if err != nil { + return nil, mq.Failed(errorcode.OperationFailed, err.Error()) } - if msg.WaitTimeoutMs == 0 { - tsk.Wait() - } else if !tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) { - return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(false, "", 0)) + uploader, err := svc.uploader.BeginUpdate(msg.UserID, createResp.Package.PackageID, msg.StorageAffinity, nil, nil) + if err != nil { + return nil, mq.Failed(errorcode.OperationFailed, err.Error()) } - if tsk.Error() != nil { - return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, tsk.Error().Error(), 0)) + objPathes, err := pub.List(msg.Path, true) + for _, p := range objPathes { + o, err := pub.Read(p) + if err != nil { + logger.Warnf("read object %s: %v", p, err) + continue + } + + err = uploader.Upload(p, o) + o.Close() + if err != nil { + logger.Warnf("upload object %s: %v", p, err) + continue + } + } + _, err = uploader.Commit() + if err != nil { + return nil, mq.Failed(errorcode.OperationFailed, err.Error()) } - taskBody := tsk.Body().(*mytask.CreatePackage) - return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", taskBody.Result.PackageID)) + return mq.ReplyOK(agtmq.RespStorageCreatePackage(createResp.Package)) } diff --git a/client/internal/cmdline/storage.go b/client/internal/cmdline/storage.go index bb6c09a..7263c58 100644 --- a/client/internal/cmdline/storage.go +++ b/client/internal/cmdline/storage.go @@ -22,27 +22,13 @@ func StorageCreatePackage(ctx CommandContext, bucketID cdssdk.BucketID, name str }() // 开始创建并上传包到存储系统 - hubID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageCreatePackage(1, bucketID, name, storageID, path, 0) + pkg, err := ctx.Cmdline.Svc.StorageSvc().StorageCreatePackage(1, bucketID, name, storageID, path, 0) if err != nil { return fmt.Errorf("start storage uploading package: %w", err) } - // 循环等待上传完成 - for { - complete, packageID, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageCreatePackage(hubID, taskID, time.Second*10) - if complete { - if err != nil { - return fmt.Errorf("uploading complete with: %w", err) - } - - fmt.Printf("%d\n", packageID) - return nil - } - - if err != nil { - return fmt.Errorf("wait uploading: %w", err) - } - } + fmt.Printf("%d\n", pkg.PackageID) + return nil } // 初始化函数,注册加载包和创建包的命令到命令行解析器。 diff --git a/client/internal/http/storage.go b/client/internal/http/storage.go index 96bb9a6..48b72c0 100644 --- a/client/internal/http/storage.go +++ b/client/internal/http/storage.go @@ -1,8 +1,8 @@ package http import ( + "fmt" "net/http" - "time" "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" @@ -50,35 +50,17 @@ func (s *StorageService) CreatePackage(ctx *gin.Context) { return } - hubID, taskID, err := s.svc.StorageSvc().StartStorageCreatePackage( + pkg, err := s.svc.StorageSvc().StorageCreatePackage( req.UserID, req.BucketID, req.Name, req.StorageID, req.Path, req.StorageAffinity) if err != nil { - log.Warnf("start storage create package: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage create package failed")) + log.Warnf("storage create package: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("storage create package: %v", err))) return } - for { - complete, packageID, err := s.svc.StorageSvc().WaitStorageCreatePackage(hubID, taskID, time.Second*10) - if complete { - if err != nil { - log.Warnf("creating complete with: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage create package failed")) - return - } - - ctx.JSON(http.StatusOK, OK(cdsapi.StorageCreatePackageResp{ - PackageID: packageID, - })) - return - } - - if err != nil { - log.Warnf("wait creating: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage create package failed")) - return - } - } + ctx.JSON(http.StatusOK, OK(cdsapi.StorageCreatePackageResp{ + Package: pkg, + })) } func (s *StorageService) Get(ctx *gin.Context) { diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index a9263eb..267e369 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "path" - "time" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" @@ -161,57 +160,32 @@ func (svc *StorageService) LoadPackage(userID cdssdk.UserID, packageID cdssdk.Pa } // 请求节点启动从Storage中上传文件的任务。会返回节点ID和任务ID -func (svc *StorageService) StartStorageCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, storageID cdssdk.StorageID, path string, storageAffinity cdssdk.StorageID) (cdssdk.HubID, string, error) { +func (svc *StorageService) StorageCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, storageID cdssdk.StorageID, path string, storageAffinity cdssdk.StorageID) (cdssdk.Package, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { - return 0, "", fmt.Errorf("new coordinator client: %w", err) + return cdssdk.Package{}, fmt.Errorf("new coordinator client: %w", err) } defer stgglb.CoordinatorMQPool.Release(coorCli) stgResp, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails([]cdssdk.StorageID{storageID})) if err != nil { - return 0, "", fmt.Errorf("getting storage info: %w", err) + return cdssdk.Package{}, fmt.Errorf("getting storage info: %w", err) } if stgResp.Storages[0].Storage.ShardStore == nil { - return 0, "", fmt.Errorf("shard storage is not enabled") + return cdssdk.Package{}, fmt.Errorf("shard storage is not enabled") } agentCli, err := stgglb.AgentMQPool.Acquire(stgResp.Storages[0].MasterHub.HubID) if err != nil { - return 0, "", fmt.Errorf("new agent client: %w", err) + return cdssdk.Package{}, fmt.Errorf("new agent client: %w", err) } defer stgglb.AgentMQPool.Release(agentCli) - startResp, err := agentCli.StartStorageCreatePackage(agtmq.NewStartStorageCreatePackage(userID, bucketID, name, storageID, path, storageAffinity)) + createResp, err := agentCli.StorageCreatePackage(agtmq.ReqStorageCreatePackage(userID, bucketID, name, storageID, path, storageAffinity)) if err != nil { - return 0, "", fmt.Errorf("start storage upload package: %w", err) + return cdssdk.Package{}, err } - return stgResp.Storages[0].MasterHub.HubID, startResp.TaskID, nil -} - -func (svc *StorageService) WaitStorageCreatePackage(hubID cdssdk.HubID, taskID string, waitTimeout time.Duration) (bool, cdssdk.PackageID, error) { - agentCli, err := stgglb.AgentMQPool.Acquire(hubID) - if err != nil { - // TODO 失败是否要当做任务已经结束? - return true, 0, fmt.Errorf("new agent client: %w", err) - } - defer stgglb.AgentMQPool.Release(agentCli) - - waitResp, err := agentCli.WaitStorageCreatePackage(agtmq.NewWaitStorageCreatePackage(taskID, waitTimeout.Milliseconds())) - if err != nil { - // TODO 请求失败是否要当做任务已经结束? - return true, 0, fmt.Errorf("wait storage upload package: %w", err) - } - - if !waitResp.IsComplete { - return false, 0, nil - } - - if waitResp.Error != "" { - return true, 0, fmt.Errorf("%s", waitResp.Error) - } - - return true, waitResp.PackageID, nil + return createResp.Package, nil } diff --git a/common/pkgs/mq/agent/storage.go b/common/pkgs/mq/agent/storage.go index 1182752..bc200bf 100644 --- a/common/pkgs/mq/agent/storage.go +++ b/common/pkgs/mq/agent/storage.go @@ -6,15 +6,13 @@ import ( ) type StorageService interface { - StartStorageCreatePackage(msg *StartStorageCreatePackage) (*StartStorageCreatePackageResp, *mq.CodeMessage) - - WaitStorageCreatePackage(msg *WaitStorageCreatePackage) (*WaitStorageCreatePackageResp, *mq.CodeMessage) + StorageCreatePackage(msg *StorageCreatePackage) (*StorageCreatePackageResp, *mq.CodeMessage) } // 启动从Storage上传Package的任务 -var _ = Register(Service.StartStorageCreatePackage) +var _ = Register(Service.StorageCreatePackage) -type StartStorageCreatePackage struct { +type StorageCreatePackage struct { mq.MessageBodyBase UserID cdssdk.UserID `json:"userID"` BucketID cdssdk.BucketID `json:"bucketID"` @@ -23,13 +21,13 @@ type StartStorageCreatePackage struct { Path string `json:"path"` StorageAffinity cdssdk.StorageID `json:"storageAffinity"` } -type StartStorageCreatePackageResp struct { +type StorageCreatePackageResp struct { mq.MessageBodyBase - TaskID string `json:"taskID"` + Package cdssdk.Package `json:"package"` } -func NewStartStorageCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, storageID cdssdk.StorageID, path string, stgAffinity cdssdk.StorageID) *StartStorageCreatePackage { - return &StartStorageCreatePackage{ +func ReqStorageCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, storageID cdssdk.StorageID, path string, stgAffinity cdssdk.StorageID) *StorageCreatePackage { + return &StorageCreatePackage{ UserID: userID, BucketID: bucketID, Name: name, @@ -38,43 +36,11 @@ func NewStartStorageCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID StorageAffinity: stgAffinity, } } -func NewStartStorageCreatePackageResp(taskID string) *StartStorageCreatePackageResp { - return &StartStorageCreatePackageResp{ - TaskID: taskID, - } -} -func (client *Client) StartStorageCreatePackage(msg *StartStorageCreatePackage, opts ...mq.RequestOption) (*StartStorageCreatePackageResp, error) { - return mq.Request(Service.StartStorageCreatePackage, client.rabbitCli, msg, opts...) -} - -// 等待从Storage上传Package的任务 -var _ = Register(Service.WaitStorageCreatePackage) - -type WaitStorageCreatePackage struct { - mq.MessageBodyBase - TaskID string `json:"taskID"` - WaitTimeoutMs int64 `json:"waitTimeout"` -} -type WaitStorageCreatePackageResp struct { - mq.MessageBodyBase - IsComplete bool `json:"isComplete"` - Error string `json:"error"` - PackageID cdssdk.PackageID `json:"packageID"` -} - -func NewWaitStorageCreatePackage(taskID string, waitTimeoutMs int64) *WaitStorageCreatePackage { - return &WaitStorageCreatePackage{ - TaskID: taskID, - WaitTimeoutMs: waitTimeoutMs, - } -} -func NewWaitStorageCreatePackageResp(isComplete bool, err string, packageID cdssdk.PackageID) *WaitStorageCreatePackageResp { - return &WaitStorageCreatePackageResp{ - IsComplete: isComplete, - Error: err, - PackageID: packageID, +func RespStorageCreatePackage(pkg cdssdk.Package) *StorageCreatePackageResp { + return &StorageCreatePackageResp{ + Package: pkg, } } -func (client *Client) WaitStorageCreatePackage(msg *WaitStorageCreatePackage, opts ...mq.RequestOption) (*WaitStorageCreatePackageResp, error) { - return mq.Request(Service.WaitStorageCreatePackage, client.rabbitCli, msg, opts...) +func (client *Client) StorageCreatePackage(msg *StorageCreatePackage, opts ...mq.RequestOption) (*StorageCreatePackageResp, error) { + return mq.Request(Service.StorageCreatePackage, client.rabbitCli, msg, opts...) } diff --git a/common/pkgs/storage/local/public_store.go b/common/pkgs/storage/local/public_store.go index 344bda6..1d5fab3 100644 --- a/common/pkgs/storage/local/public_store.go +++ b/common/pkgs/storage/local/public_store.go @@ -2,6 +2,7 @@ package local import ( "io" + "io/fs" "os" "path/filepath" @@ -60,6 +61,64 @@ func (s *PublicStore) Write(objPath string, stream io.Reader) error { return nil } +func (s *PublicStore) Read(objPath string) (io.ReadCloser, error) { + fullPath := filepath.Join(s.cfg.LoadBase, objPath) + f, err := os.Open(fullPath) + if err != nil { + return nil, err + } + + return f, nil +} + +func (s *PublicStore) List(path string, recursive bool) ([]string, error) { + fullPath := filepath.Join(s.cfg.LoadBase, path) + + var pathes []string + if recursive { + err := filepath.WalkDir(fullPath, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() { + return nil + } + + relPath, err := filepath.Rel(s.cfg.LoadBase, path) + if err != nil { + return err + } + + pathes = append(pathes, filepath.ToSlash(relPath)) + return nil + }) + if err != nil { + return nil, err + } + + } else { + files, err := os.ReadDir(fullPath) + if err != nil { + return nil, err + } + + for _, f := range files { + if f.IsDir() { + continue + } + + relPath, err := filepath.Rel(s.cfg.LoadBase, filepath.Join(fullPath, f.Name())) + if err != nil { + return nil, err + } + + pathes = append(pathes, filepath.ToSlash(relPath)) + } + } + + return pathes, nil +} + func (s *PublicStore) getLogger() logger.Logger { return logger.WithField("PublicStore", "Local").WithField("Storage", s.agt.Detail.Storage.String()) } diff --git a/common/pkgs/storage/s3/public_store.go b/common/pkgs/storage/s3/public_store.go index ab74b12..2ba5641 100644 --- a/common/pkgs/storage/s3/public_store.go +++ b/common/pkgs/storage/s3/public_store.go @@ -3,6 +3,7 @@ package s3 import ( "context" "io" + "strings" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -52,7 +53,7 @@ func (s *PublicStore) Stop() { } func (s *PublicStore) Write(objPath string, stream io.Reader) error { - key := JoinKey(objPath, s.cfg.LoadBase) + key := JoinKey(s.cfg.LoadBase, objPath) _, err := s.cli.PutObject(context.TODO(), &s3.PutObjectInput{ Bucket: aws.String(s.Bucket), @@ -63,6 +64,58 @@ func (s *PublicStore) Write(objPath string, stream io.Reader) error { return err } +func (s *PublicStore) Read(objPath string) (io.ReadCloser, error) { + key := JoinKey(s.cfg.LoadBase, objPath) + + resp, err := s.cli.GetObject(context.TODO(), &s3.GetObjectInput{ + Bucket: aws.String(s.Bucket), + Key: aws.String(key), + }) + + if err != nil { + return nil, err + } + + return resp.Body, nil +} + +func (s *PublicStore) List(path string, recursive bool) ([]string, error) { + key := JoinKey(s.cfg.LoadBase, path) + // TODO 待测试 + + input := &s3.ListObjectsInput{ + Bucket: aws.String(s.Bucket), + Prefix: aws.String(key), + } + + if !recursive { + input.Delimiter = aws.String("/") + } + + var pathes []string + + var marker *string + for { + input.Marker = marker + resp, err := s.cli.ListObjects(context.Background(), input) + if err != nil { + return nil, err + } + + for _, obj := range resp.Contents { + pathes = append(pathes, strings.TrimPrefix(*obj.Key, s.cfg.LoadBase+"/")) + } + + if !*resp.IsTruncated { + break + } + + marker = resp.NextMarker + } + + return pathes, nil +} + func (s *PublicStore) getLogger() logger.Logger { return logger.WithField("PublicStore", "S3").WithField("Storage", s.Detail.Storage.String()) } diff --git a/common/pkgs/storage/types/public_store.go b/common/pkgs/storage/types/public_store.go index 782f2aa..2f022bd 100644 --- a/common/pkgs/storage/types/public_store.go +++ b/common/pkgs/storage/types/public_store.go @@ -9,4 +9,7 @@ type PublicStore interface { Stop() Write(objectPath string, stream io.Reader) error + Read(objectPath string) (io.ReadCloser, error) + // 返回指定路径下的所有文件 + List(path string, recursive bool) ([]string, error) }