Browse Source

修复从Storage上传Package的功能

gitlink
Sydonian 8 months ago
parent
commit
2144482eea
10 changed files with 190 additions and 183 deletions
  1. +1
    -1
      agent/internal/cmd/serve.go
  2. +4
    -1
      agent/internal/mq/service.go
  3. +39
    -58
      agent/internal/mq/storage.go
  4. +3
    -17
      client/internal/cmdline/storage.go
  5. +7
    -25
      client/internal/http/storage.go
  6. +8
    -34
      client/internal/services/storage.go
  7. +12
    -46
      common/pkgs/mq/agent/storage.go
  8. +59
    -0
      common/pkgs/storage/local/public_store.go
  9. +54
    -1
      common/pkgs/storage/s3/public_store.go
  10. +3
    -0
      common/pkgs/storage/types/public_store.go

+ 1
- 1
agent/internal/cmd/serve.go View File

@@ -166,7 +166,7 @@ func serve(configPath string) {

// 启动命令服务器
// TODO 需要设计AgentID持久化机制
agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr, stgAgts), config.Cfg().ID, config.Cfg().RabbitMQ)
agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr, stgAgts, uploader), config.Cfg().ID, config.Cfg().RabbitMQ)
if err != nil {
logger.Fatalf("new agent server failed, err: %s", err.Error())
}


+ 4
- 1
agent/internal/mq/service.go View File

@@ -3,16 +3,19 @@ package mq
import (
"gitlink.org.cn/cloudream/storage/agent/internal/task"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool"
"gitlink.org.cn/cloudream/storage/common/pkgs/uploader"
)

type Service struct {
taskManager *task.Manager
stgAgts *agtpool.AgentPool
uploader *uploader.Uploader
}

func NewService(taskMgr *task.Manager, stgAgts *agtpool.AgentPool) *Service {
func NewService(taskMgr *task.Manager, stgAgts *agtpool.AgentPool, uplodaer *uploader.Uploader) *Service {
return &Service{
taskManager: taskMgr,
stgAgts: stgAgts,
uploader: uplodaer,
}
}

+ 39
- 58
agent/internal/mq/storage.go View File

@@ -1,76 +1,57 @@
package mq

import (
"time"

"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
mytask "gitlink.org.cn/cloudream/storage/agent/internal/task"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePackage) (*agtmq.StartStorageCreatePackageResp, *mq.CodeMessage) {
return nil, mq.Failed(errorcode.OperationFailed, "not implemented")
// coorCli, err := stgglb.CoordinatorMQPool.Acquire()
// if err != nil {
// logger.Warnf("new coordinator client: %s", err.Error())

// return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed")
// }
// defer stgglb.CoordinatorMQPool.Release(coorCli)

// getStg, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails([]cdssdk.StorageID{msg.StorageID}))
// if err != nil {
// return nil, mq.Failed(errorcode.OperationFailed, err.Error())
// }
// if getStg.Storages[0] == nil {
// return nil, mq.Failed(errorcode.OperationFailed, "storage not found")
// }
// if getStg.Storages[0].Shared == nil {
// return nil, mq.Failed(errorcode.OperationFailed, "storage has no shared storage")
// }

// fullPath := filepath.Clean(filepath.Join(getStg.Storages[0].Shared.LoadBase, msg.Path))

// var uploadFilePathes []string
// err = filepath.WalkDir(fullPath, func(fname string, fi os.DirEntry, err error) error {
// if err != nil {
// return nil
// }

// if !fi.IsDir() {
// uploadFilePathes = append(uploadFilePathes, fname)
// }
func (svc *Service) StorageCreatePackage(msg *agtmq.StorageCreatePackage) (*agtmq.StorageCreatePackageResp, *mq.CodeMessage) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
logger.Warnf("new coordinator client: %s", err.Error())

// return nil
// })
// if err != nil {
// logger.Warnf("opening directory %s: %s", fullPath, err.Error())

// return nil, mq.Failed(errorcode.OperationFailed, "read directory failed")
// }
return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed")
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

// objIter := iterator.NewUploadingObjectIterator(fullPath, uploadFilePathes)
// tsk := svc.taskManager.StartNew(mytask.NewCreatePackage(msg.UserID, msg.BucketID, msg.Name, objIter, msg.StorageAffinity))
// return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID()))
}
pub, err := svc.stgAgts.GetPublicStore(msg.StorageID)
if err != nil {
return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}

func (svc *Service) WaitStorageCreatePackage(msg *agtmq.WaitStorageCreatePackage) (*agtmq.WaitStorageCreatePackageResp, *mq.CodeMessage) {
tsk := svc.taskManager.FindByID(msg.TaskID)
if tsk == nil {
return nil, mq.Failed(errorcode.TaskNotFound, "task not found")
createResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(msg.UserID, msg.BucketID, msg.Name))
if err != nil {
return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}

if msg.WaitTimeoutMs == 0 {
tsk.Wait()
} else if !tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) {
return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(false, "", 0))
uploader, err := svc.uploader.BeginUpdate(msg.UserID, createResp.Package.PackageID, msg.StorageAffinity, nil, nil)
if err != nil {
return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}

if tsk.Error() != nil {
return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, tsk.Error().Error(), 0))
objPathes, err := pub.List(msg.Path, true)
for _, p := range objPathes {
o, err := pub.Read(p)
if err != nil {
logger.Warnf("read object %s: %v", p, err)
continue
}

err = uploader.Upload(p, o)
o.Close()
if err != nil {
logger.Warnf("upload object %s: %v", p, err)
continue
}
}
_, err = uploader.Commit()
if err != nil {
return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}

taskBody := tsk.Body().(*mytask.CreatePackage)
return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", taskBody.Result.PackageID))
return mq.ReplyOK(agtmq.RespStorageCreatePackage(createResp.Package))
}

+ 3
- 17
client/internal/cmdline/storage.go View File

@@ -22,27 +22,13 @@ func StorageCreatePackage(ctx CommandContext, bucketID cdssdk.BucketID, name str
}()

// 开始创建并上传包到存储系统
hubID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageCreatePackage(1, bucketID, name, storageID, path, 0)
pkg, err := ctx.Cmdline.Svc.StorageSvc().StorageCreatePackage(1, bucketID, name, storageID, path, 0)
if err != nil {
return fmt.Errorf("start storage uploading package: %w", err)
}

// 循环等待上传完成
for {
complete, packageID, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageCreatePackage(hubID, taskID, time.Second*10)
if complete {
if err != nil {
return fmt.Errorf("uploading complete with: %w", err)
}

fmt.Printf("%d\n", packageID)
return nil
}

if err != nil {
return fmt.Errorf("wait uploading: %w", err)
}
}
fmt.Printf("%d\n", pkg.PackageID)
return nil
}

// 初始化函数,注册加载包和创建包的命令到命令行解析器。


+ 7
- 25
client/internal/http/storage.go View File

@@ -1,8 +1,8 @@
package http

import (
"fmt"
"net/http"
"time"

"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/consts/errorcode"
@@ -50,35 +50,17 @@ func (s *StorageService) CreatePackage(ctx *gin.Context) {
return
}

hubID, taskID, err := s.svc.StorageSvc().StartStorageCreatePackage(
pkg, err := s.svc.StorageSvc().StorageCreatePackage(
req.UserID, req.BucketID, req.Name, req.StorageID, req.Path, req.StorageAffinity)
if err != nil {
log.Warnf("start storage create package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage create package failed"))
log.Warnf("storage create package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("storage create package: %v", err)))
return
}

for {
complete, packageID, err := s.svc.StorageSvc().WaitStorageCreatePackage(hubID, taskID, time.Second*10)
if complete {
if err != nil {
log.Warnf("creating complete with: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage create package failed"))
return
}

ctx.JSON(http.StatusOK, OK(cdsapi.StorageCreatePackageResp{
PackageID: packageID,
}))
return
}

if err != nil {
log.Warnf("wait creating: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage create package failed"))
return
}
}
ctx.JSON(http.StatusOK, OK(cdsapi.StorageCreatePackageResp{
Package: pkg,
}))
}

func (s *StorageService) Get(ctx *gin.Context) {


+ 8
- 34
client/internal/services/storage.go View File

@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"path"
"time"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
@@ -161,57 +160,32 @@ func (svc *StorageService) LoadPackage(userID cdssdk.UserID, packageID cdssdk.Pa
}

// 请求节点启动从Storage中上传文件的任务。会返回节点ID和任务ID
func (svc *StorageService) StartStorageCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, storageID cdssdk.StorageID, path string, storageAffinity cdssdk.StorageID) (cdssdk.HubID, string, error) {
func (svc *StorageService) StorageCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, storageID cdssdk.StorageID, path string, storageAffinity cdssdk.StorageID) (cdssdk.Package, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return 0, "", fmt.Errorf("new coordinator client: %w", err)
return cdssdk.Package{}, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

stgResp, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails([]cdssdk.StorageID{storageID}))
if err != nil {
return 0, "", fmt.Errorf("getting storage info: %w", err)
return cdssdk.Package{}, fmt.Errorf("getting storage info: %w", err)
}

if stgResp.Storages[0].Storage.ShardStore == nil {
return 0, "", fmt.Errorf("shard storage is not enabled")
return cdssdk.Package{}, fmt.Errorf("shard storage is not enabled")
}

agentCli, err := stgglb.AgentMQPool.Acquire(stgResp.Storages[0].MasterHub.HubID)
if err != nil {
return 0, "", fmt.Errorf("new agent client: %w", err)
return cdssdk.Package{}, fmt.Errorf("new agent client: %w", err)
}
defer stgglb.AgentMQPool.Release(agentCli)

startResp, err := agentCli.StartStorageCreatePackage(agtmq.NewStartStorageCreatePackage(userID, bucketID, name, storageID, path, storageAffinity))
createResp, err := agentCli.StorageCreatePackage(agtmq.ReqStorageCreatePackage(userID, bucketID, name, storageID, path, storageAffinity))
if err != nil {
return 0, "", fmt.Errorf("start storage upload package: %w", err)
return cdssdk.Package{}, err
}

return stgResp.Storages[0].MasterHub.HubID, startResp.TaskID, nil
}

func (svc *StorageService) WaitStorageCreatePackage(hubID cdssdk.HubID, taskID string, waitTimeout time.Duration) (bool, cdssdk.PackageID, error) {
agentCli, err := stgglb.AgentMQPool.Acquire(hubID)
if err != nil {
// TODO 失败是否要当做任务已经结束?
return true, 0, fmt.Errorf("new agent client: %w", err)
}
defer stgglb.AgentMQPool.Release(agentCli)

waitResp, err := agentCli.WaitStorageCreatePackage(agtmq.NewWaitStorageCreatePackage(taskID, waitTimeout.Milliseconds()))
if err != nil {
// TODO 请求失败是否要当做任务已经结束?
return true, 0, fmt.Errorf("wait storage upload package: %w", err)
}

if !waitResp.IsComplete {
return false, 0, nil
}

if waitResp.Error != "" {
return true, 0, fmt.Errorf("%s", waitResp.Error)
}

return true, waitResp.PackageID, nil
return createResp.Package, nil
}

+ 12
- 46
common/pkgs/mq/agent/storage.go View File

@@ -6,15 +6,13 @@ import (
)

type StorageService interface {
StartStorageCreatePackage(msg *StartStorageCreatePackage) (*StartStorageCreatePackageResp, *mq.CodeMessage)

WaitStorageCreatePackage(msg *WaitStorageCreatePackage) (*WaitStorageCreatePackageResp, *mq.CodeMessage)
StorageCreatePackage(msg *StorageCreatePackage) (*StorageCreatePackageResp, *mq.CodeMessage)
}

// 启动从Storage上传Package的任务
var _ = Register(Service.StartStorageCreatePackage)
var _ = Register(Service.StorageCreatePackage)

type StartStorageCreatePackage struct {
type StorageCreatePackage struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
BucketID cdssdk.BucketID `json:"bucketID"`
@@ -23,13 +21,13 @@ type StartStorageCreatePackage struct {
Path string `json:"path"`
StorageAffinity cdssdk.StorageID `json:"storageAffinity"`
}
type StartStorageCreatePackageResp struct {
type StorageCreatePackageResp struct {
mq.MessageBodyBase
TaskID string `json:"taskID"`
Package cdssdk.Package `json:"package"`
}

func NewStartStorageCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, storageID cdssdk.StorageID, path string, stgAffinity cdssdk.StorageID) *StartStorageCreatePackage {
return &StartStorageCreatePackage{
func ReqStorageCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, storageID cdssdk.StorageID, path string, stgAffinity cdssdk.StorageID) *StorageCreatePackage {
return &StorageCreatePackage{
UserID: userID,
BucketID: bucketID,
Name: name,
@@ -38,43 +36,11 @@ func NewStartStorageCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID
StorageAffinity: stgAffinity,
}
}
func NewStartStorageCreatePackageResp(taskID string) *StartStorageCreatePackageResp {
return &StartStorageCreatePackageResp{
TaskID: taskID,
}
}
func (client *Client) StartStorageCreatePackage(msg *StartStorageCreatePackage, opts ...mq.RequestOption) (*StartStorageCreatePackageResp, error) {
return mq.Request(Service.StartStorageCreatePackage, client.rabbitCli, msg, opts...)
}

// 等待从Storage上传Package的任务
var _ = Register(Service.WaitStorageCreatePackage)

type WaitStorageCreatePackage struct {
mq.MessageBodyBase
TaskID string `json:"taskID"`
WaitTimeoutMs int64 `json:"waitTimeout"`
}
type WaitStorageCreatePackageResp struct {
mq.MessageBodyBase
IsComplete bool `json:"isComplete"`
Error string `json:"error"`
PackageID cdssdk.PackageID `json:"packageID"`
}

func NewWaitStorageCreatePackage(taskID string, waitTimeoutMs int64) *WaitStorageCreatePackage {
return &WaitStorageCreatePackage{
TaskID: taskID,
WaitTimeoutMs: waitTimeoutMs,
}
}
func NewWaitStorageCreatePackageResp(isComplete bool, err string, packageID cdssdk.PackageID) *WaitStorageCreatePackageResp {
return &WaitStorageCreatePackageResp{
IsComplete: isComplete,
Error: err,
PackageID: packageID,
func RespStorageCreatePackage(pkg cdssdk.Package) *StorageCreatePackageResp {
return &StorageCreatePackageResp{
Package: pkg,
}
}
func (client *Client) WaitStorageCreatePackage(msg *WaitStorageCreatePackage, opts ...mq.RequestOption) (*WaitStorageCreatePackageResp, error) {
return mq.Request(Service.WaitStorageCreatePackage, client.rabbitCli, msg, opts...)
func (client *Client) StorageCreatePackage(msg *StorageCreatePackage, opts ...mq.RequestOption) (*StorageCreatePackageResp, error) {
return mq.Request(Service.StorageCreatePackage, client.rabbitCli, msg, opts...)
}

+ 59
- 0
common/pkgs/storage/local/public_store.go View File

@@ -2,6 +2,7 @@ package local

import (
"io"
"io/fs"
"os"
"path/filepath"

@@ -60,6 +61,64 @@ func (s *PublicStore) Write(objPath string, stream io.Reader) error {
return nil
}

func (s *PublicStore) Read(objPath string) (io.ReadCloser, error) {
fullPath := filepath.Join(s.cfg.LoadBase, objPath)
f, err := os.Open(fullPath)
if err != nil {
return nil, err
}

return f, nil
}

func (s *PublicStore) List(path string, recursive bool) ([]string, error) {
fullPath := filepath.Join(s.cfg.LoadBase, path)

var pathes []string
if recursive {
err := filepath.WalkDir(fullPath, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if d.IsDir() {
return nil
}

relPath, err := filepath.Rel(s.cfg.LoadBase, path)
if err != nil {
return err
}

pathes = append(pathes, filepath.ToSlash(relPath))
return nil
})
if err != nil {
return nil, err
}

} else {
files, err := os.ReadDir(fullPath)
if err != nil {
return nil, err
}

for _, f := range files {
if f.IsDir() {
continue
}

relPath, err := filepath.Rel(s.cfg.LoadBase, filepath.Join(fullPath, f.Name()))
if err != nil {
return nil, err
}

pathes = append(pathes, filepath.ToSlash(relPath))
}
}

return pathes, nil
}

func (s *PublicStore) getLogger() logger.Logger {
return logger.WithField("PublicStore", "Local").WithField("Storage", s.agt.Detail.Storage.String())
}

+ 54
- 1
common/pkgs/storage/s3/public_store.go View File

@@ -3,6 +3,7 @@ package s3
import (
"context"
"io"
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
@@ -52,7 +53,7 @@ func (s *PublicStore) Stop() {
}

func (s *PublicStore) Write(objPath string, stream io.Reader) error {
key := JoinKey(objPath, s.cfg.LoadBase)
key := JoinKey(s.cfg.LoadBase, objPath)

_, err := s.cli.PutObject(context.TODO(), &s3.PutObjectInput{
Bucket: aws.String(s.Bucket),
@@ -63,6 +64,58 @@ func (s *PublicStore) Write(objPath string, stream io.Reader) error {
return err
}

func (s *PublicStore) Read(objPath string) (io.ReadCloser, error) {
key := JoinKey(s.cfg.LoadBase, objPath)

resp, err := s.cli.GetObject(context.TODO(), &s3.GetObjectInput{
Bucket: aws.String(s.Bucket),
Key: aws.String(key),
})

if err != nil {
return nil, err
}

return resp.Body, nil
}

func (s *PublicStore) List(path string, recursive bool) ([]string, error) {
key := JoinKey(s.cfg.LoadBase, path)
// TODO 待测试

input := &s3.ListObjectsInput{
Bucket: aws.String(s.Bucket),
Prefix: aws.String(key),
}

if !recursive {
input.Delimiter = aws.String("/")
}

var pathes []string

var marker *string
for {
input.Marker = marker
resp, err := s.cli.ListObjects(context.Background(), input)
if err != nil {
return nil, err
}

for _, obj := range resp.Contents {
pathes = append(pathes, strings.TrimPrefix(*obj.Key, s.cfg.LoadBase+"/"))
}

if !*resp.IsTruncated {
break
}

marker = resp.NextMarker
}

return pathes, nil
}

func (s *PublicStore) getLogger() logger.Logger {
return logger.WithField("PublicStore", "S3").WithField("Storage", s.Detail.Storage.String())
}

+ 3
- 0
common/pkgs/storage/types/public_store.go View File

@@ -9,4 +9,7 @@ type PublicStore interface {
Stop()

Write(objectPath string, stream io.Reader) error
Read(objectPath string) (io.ReadCloser, error)
// 返回指定路径下的所有文件
List(path string, recursive bool) ([]string, error)
}

Loading…
Cancel
Save