Browse Source

上传文件接口支持调度

gitlink
Sydonian 1 year ago
parent
commit
f4159320a9
6 changed files with 49 additions and 19 deletions
  1. +1
    -1
      agent/internal/task/create_package.go
  2. +1
    -1
      client/internal/cmdline/object.go
  3. +1
    -1
      client/internal/cmdline/put.go
  4. +1
    -1
      client/internal/http/object.go
  5. +18
    -10
      common/pkgs/uploader/update.go
  6. +27
    -5
      common/pkgs/uploader/uploader.go

+ 1
- 1
agent/internal/task/create_package.go View File

@@ -85,7 +85,7 @@ func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, c
return
}

up, err := ctx.uploader.BeginUpdate(t.userID, createResp.Package.PackageID, t.stgAffinity)
up, err := ctx.uploader.BeginUpdate(t.userID, createResp.Package.PackageID, t.stgAffinity, nil, nil)
if err != nil {
err = fmt.Errorf("begin update: %w", err)
log.Error(err.Error())


+ 1
- 1
client/internal/cmdline/object.go View File

@@ -33,7 +33,7 @@ var _ = MustAddCmd(func(ctx CommandContext, packageID cdssdk.PackageID, rootPath
storageAff = storageAffinity[0]
}

up, err := ctx.Cmdline.Svc.Uploader.BeginUpdate(userID, packageID, storageAff)
up, err := ctx.Cmdline.Svc.Uploader.BeginUpdate(userID, packageID, storageAff, nil, nil)
if err != nil {
return fmt.Errorf("begin updating package: %w", err)
}


+ 1
- 1
client/internal/cmdline/put.go View File

@@ -68,7 +68,7 @@ func init() {
storageAff = cdssdk.StorageID(stgID)
}

up, err := cmdCtx.Cmdline.Svc.Uploader.BeginUpdate(userID, pkg.PackageID, storageAff)
up, err := cmdCtx.Cmdline.Svc.Uploader.BeginUpdate(userID, pkg.PackageID, storageAff, nil, nil)
if err != nil {
fmt.Printf("begin updating package: %v\n", err)
return


+ 1
- 1
client/internal/http/object.go View File

@@ -63,7 +63,7 @@ func (s *ObjectService) Upload(ctx *gin.Context) {
return
}

up, err := s.svc.Uploader.BeginUpdate(req.Info.UserID, req.Info.PackageID, req.Info.Affinity)
up, err := s.svc.Uploader.BeginUpdate(req.Info.UserID, req.Info.PackageID, req.Info.Affinity, req.Info.LoadTo, req.Info.LoadToPath)
if err != nil {
log.Warnf("begin update: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("begin update: %v", err)))


+ 18
- 10
common/pkgs/uploader/update.go View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"path"
"sync"
"time"

@@ -19,13 +20,15 @@ import (
)

type UpdateUploader struct {
uploader *Uploader
pkgID cdssdk.PackageID
targetStg stgmod.StorageDetail
distMutex *distlock.Mutex
successes []coormq.AddObjectEntry
lock sync.Mutex
commited bool
uploader *Uploader
pkgID cdssdk.PackageID
targetStg stgmod.StorageDetail
distMutex *distlock.Mutex
loadToStgs []stgmod.StorageDetail
loadToPath []string
successes []coormq.AddObjectEntry
lock sync.Mutex
commited bool
}

type UploadStorageInfo struct {
@@ -39,12 +42,17 @@ type UpdateResult struct {
Objects map[string]cdssdk.Object
}

func (w *UpdateUploader) Upload(path string, size int64, stream io.Reader) error {
func (w *UpdateUploader) Upload(pat string, size int64, stream io.Reader) error {
uploadTime := time.Now()

ft := ioswitch2.NewFromTo()
fromExec, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream())
ft.AddFrom(fromExec).AddTo(ioswitch2.NewToShardStore(*w.targetStg.MasterHub, w.targetStg, ioswitch2.RawStream(), "fileHash"))
ft.AddFrom(fromExec).
AddTo(ioswitch2.NewToShardStore(*w.targetStg.MasterHub, w.targetStg, ioswitch2.RawStream(), "fileHash"))

for i, stg := range w.loadToStgs {
ft.AddTo(ioswitch2.NewLoadToShared(*stg.MasterHub, stg.Storage, path.Join(w.loadToPath[i], pat)))
}

plans := exec.NewPlanBuilder()
err := parser.Parse(ft, plans)
@@ -66,7 +74,7 @@ func (w *UpdateUploader) Upload(path string, size int64, stream io.Reader) error

// 记录上传结果
w.successes = append(w.successes, coormq.AddObjectEntry{
Path: path,
Path: pat,
Size: size,
FileHash: ret["fileHash"].(*ops2.FileHashValue).Hash,
UploadTime: uploadTime,


+ 27
- 5
common/pkgs/uploader/uploader.go View File

@@ -24,6 +24,8 @@ type Uploader struct {
connectivity *connectivity.Collector
stgMgr *svcmgr.Manager
stgMeta *metacache.StorageMeta
loadTo []cdssdk.StorageID
loadToPath []string
}

func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgMgr *svcmgr.Manager, stgMeta *metacache.StorageMeta) *Uploader {
@@ -35,7 +37,7 @@ func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collecto
}
}

func (u *Uploader) BeginUpdate(userID cdssdk.UserID, pkgID cdssdk.PackageID, affinity cdssdk.StorageID) (*UpdateUploader, error) {
func (u *Uploader) BeginUpdate(userID cdssdk.UserID, pkgID cdssdk.PackageID, affinity cdssdk.StorageID, loadTo []cdssdk.StorageID, loadToPath []string) (*UpdateUploader, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
@@ -72,6 +74,24 @@ func (u *Uploader) BeginUpdate(userID cdssdk.UserID, pkgID cdssdk.PackageID, aff
return nil, fmt.Errorf("user no available storages")
}

loadToStgs := make([]stgmod.StorageDetail, len(loadTo))
for i, stgID := range loadTo {
stg, ok := lo.Find(getUserStgsResp.Storages, func(stg stgmod.StorageDetail) bool {
return stg.Storage.StorageID == stgID
})
if !ok {
return nil, fmt.Errorf("load to storage %v not found", stgID)
}
if stg.MasterHub == nil {
return nil, fmt.Errorf("load to storage %v has no master hub", stgID)
}
if stg.Storage.SharedStore == nil {
return nil, fmt.Errorf("load to storage %v has no shared store", stgID)
}

loadToStgs[i] = stg
}

target := u.chooseUploadStorage(userStgs, affinity)

// 给上传节点的IPFS加锁
@@ -83,10 +103,12 @@ func (u *Uploader) BeginUpdate(userID cdssdk.UserID, pkgID cdssdk.PackageID, aff
}

return &UpdateUploader{
uploader: u,
pkgID: pkgID,
targetStg: target.Storage,
distMutex: distMutex,
uploader: u,
pkgID: pkgID,
targetStg: target.Storage,
distMutex: distMutex,
loadToStgs: loadToStgs,
loadToPath: loadToPath,
}, nil
}



Loading…
Cancel
Save