From 3c71eba4a9925257af86eacc28e585ad38f1e5ed Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 7 Sep 2023 11:36:39 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E4=B8=8A=E4=BC=A0Package?= =?UTF-8?q?=E6=97=B6=E8=AE=BE=E7=BD=AE=E8=8A=82=E7=82=B9=E4=BA=B2=E5=92=8C?= =?UTF-8?q?=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/services/mq/storage.go | 4 +-- agent/internal/task/create_ec_package.go | 4 +-- agent/internal/task/create_rep_package.go | 4 +-- client/internal/cmdline/package.go | 18 +++++++--- client/internal/cmdline/storage.go | 2 +- client/internal/http/package.go | 13 +++---- client/internal/http/storage.go | 15 ++++---- client/internal/services/package.go | 8 ++--- client/internal/services/storage.go | 4 +-- client/internal/task/create_ec_package.go | 4 +-- client/internal/task/create_rep_package.go | 4 +-- common/pkgs/cmd/create_ec_package.go | 25 ++++++++------ common/pkgs/cmd/create_rep_package.go | 40 ++++++++++++++-------- common/pkgs/mq/agent/storage.go | 28 ++++++++------- 14 files changed, 100 insertions(+), 73 deletions(-) diff --git a/agent/internal/services/mq/storage.go b/agent/internal/services/mq/storage.go index 2283d07..df845f9 100644 --- a/agent/internal/services/mq/storage.go +++ b/agent/internal/services/mq/storage.go @@ -203,7 +203,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka return nil, mq.Failed(errorcode.OperationFailed, "get rep redundancy info failed") } - tsk := svc.taskManager.StartNew(mytask.NewCreateRepPackage(msg.UserID, msg.BucketID, msg.Name, objIter, repInfo)) + tsk := svc.taskManager.StartNew(mytask.NewCreateRepPackage(msg.UserID, msg.BucketID, msg.Name, objIter, repInfo, msg.NodeAffinity)) return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) } @@ -214,7 +214,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka return nil, mq.Failed(errorcode.OperationFailed, "get ec redundancy info failed") } - tsk := svc.taskManager.StartNew(mytask.NewCreateECPackage(msg.UserID, msg.BucketID, msg.Name, objIter, ecInfo)) + tsk := svc.taskManager.StartNew(mytask.NewCreateECPackage(msg.UserID, msg.BucketID, msg.Name, objIter, ecInfo, msg.NodeAffinity)) return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) } diff --git a/agent/internal/task/create_ec_package.go b/agent/internal/task/create_ec_package.go index 44b78e0..b651e95 100644 --- a/agent/internal/task/create_ec_package.go +++ b/agent/internal/task/create_ec_package.go @@ -18,9 +18,9 @@ type CreateECPackage struct { Result *CreateECPackageResult } -func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo) *CreateECPackage { +func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage { return &CreateECPackage{ - cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy), + cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity), } } diff --git a/agent/internal/task/create_rep_package.go b/agent/internal/task/create_rep_package.go index 6167e4a..1fc7b59 100644 --- a/agent/internal/task/create_rep_package.go +++ b/agent/internal/task/create_rep_package.go @@ -18,9 +18,9 @@ type CreateRepPackage struct { Result *CreateRepPackageResult } -func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo) *CreateRepPackage { +func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage { return &CreateRepPackage{ - cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy), + cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity), } } diff --git a/client/internal/cmdline/package.go b/client/internal/cmdline/package.go index 2837d07..50db31f 100644 --- a/client/internal/cmdline/package.go +++ b/client/internal/cmdline/package.go @@ -87,7 +87,7 @@ func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID int6 return nil } -func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64, name string, repCount int) error { +func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64, name string, repCount int, nodeAffinity []int64) error { rootPath = filepath.Clean(rootPath) var uploadFilePathes []string @@ -106,8 +106,13 @@ func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64 return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) } + var nodeAff *int64 + if len(nodeAffinity) > 0 { + nodeAff = &nodeAffinity[0] + } + objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) - taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingRepPackage(0, bucketID, name, objIter, models.NewRepRedundancyInfo(repCount)) + taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingRepPackage(0, bucketID, name, objIter, models.NewRepRedundancyInfo(repCount), nodeAff) if err != nil { return fmt.Errorf("upload file data failed, err: %w", err) @@ -181,7 +186,7 @@ func PackageUpdateRepPackage(ctx CommandContext, packageID int64, rootPath strin } } -func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, name string, ecName string) error { +func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, name string, ecName string, nodeAffinity []int64) error { var uploadFilePathes []string err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { if err != nil { @@ -198,8 +203,13 @@ func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) } + var nodeAff *int64 + if len(nodeAffinity) > 0 { + nodeAff = &nodeAffinity[0] + } + objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) - taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, models.NewECRedundancyInfo(ecName, config.Cfg().ECPacketSize)) + taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, models.NewECRedundancyInfo(ecName, config.Cfg().ECPacketSize), nodeAff) if err != nil { return fmt.Errorf("upload file data failed, err: %w", err) diff --git a/client/internal/cmdline/storage.go b/client/internal/cmdline/storage.go index a8a22f9..38e6e23 100644 --- a/client/internal/cmdline/storage.go +++ b/client/internal/cmdline/storage.go @@ -31,7 +31,7 @@ func StorageLoadPackage(ctx CommandContext, packageID int64, storageID int64) er func StorageCreateRepPackage(ctx CommandContext, bucketID int64, name string, storageID int64, path string, repCount int) error { nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageCreatePackage(0, bucketID, name, storageID, path, - models.NewTypedRepRedundancyInfo(repCount)) + models.NewTypedRepRedundancyInfo(repCount), nil) if err != nil { return fmt.Errorf("start storage uploading rep package: %w", err) } diff --git a/client/internal/http/package.go b/client/internal/http/package.go index 8eb5b2e..da764a6 100644 --- a/client/internal/http/package.go +++ b/client/internal/http/package.go @@ -30,10 +30,11 @@ type PackageUploadReq struct { } type PackageUploadInfo struct { - UserID *int64 `json:"userID" binding:"required"` - BucketID *int64 `json:"bucketID" binding:"required"` - Name string `json:"name" binding:"required"` - Redundancy models.TypedRedundancyInfo `json:"redundancy" binding:"required"` + UserID *int64 `json:"userID" binding:"required"` + BucketID *int64 `json:"bucketID" binding:"required"` + Name string `json:"name" binding:"required"` + Redundancy models.TypedRedundancyInfo `json:"redundancy" binding:"required"` + NodeAffinity *int64 `json:"nodeAffinity"` } type PackageUploadResp struct { @@ -76,7 +77,7 @@ func (s *PackageService) uploadRep(ctx *gin.Context, req *PackageUploadReq) { objIter := mapMultiPartFileToUploadingObject(req.Files) - taskID, err := s.svc.PackageSvc().StartCreatingRepPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, repInfo) + taskID, err := s.svc.PackageSvc().StartCreatingRepPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, repInfo, req.Info.NodeAffinity) if err != nil { log.Warnf("start uploading rep package task: %s", err.Error()) @@ -120,7 +121,7 @@ func (s *PackageService) uploadEC(ctx *gin.Context, req *PackageUploadReq) { objIter := mapMultiPartFileToUploadingObject(req.Files) - taskID, err := s.svc.PackageSvc().StartCreatingECPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, ecInfo) + taskID, err := s.svc.PackageSvc().StartCreatingECPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, ecInfo, req.Info.NodeAffinity) if err != nil { log.Warnf("start uploading ec package task: %s", err.Error()) diff --git a/client/internal/http/storage.go b/client/internal/http/storage.go index 1334170..60a5587 100644 --- a/client/internal/http/storage.go +++ b/client/internal/http/storage.go @@ -65,12 +65,13 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) { } type StorageCreatePackageReq struct { - UserID *int64 `json:"userID" binding:"required"` - StorageID *int64 `json:"storageID" binding:"required"` - Path string `json:"path" binding:"required"` - BucketID *int64 `json:"bucketID" binding:"required"` - Name string `json:"name" binding:"required"` - Redundancy models.TypedRedundancyInfo `json:"redundancy" binding:"required"` + UserID *int64 `json:"userID" binding:"required"` + StorageID *int64 `json:"storageID" binding:"required"` + Path string `json:"path" binding:"required"` + BucketID *int64 `json:"bucketID" binding:"required"` + Name string `json:"name" binding:"required"` + Redundancy models.TypedRedundancyInfo `json:"redundancy" binding:"required"` + NodeAffinity *int64 `json:"nodeAffinity"` } type StorageCreatePackageResp struct { @@ -88,7 +89,7 @@ func (s *StorageService) CreatePackage(ctx *gin.Context) { } nodeID, taskID, err := s.svc.StorageSvc().StartStorageCreatePackage( - *req.UserID, *req.BucketID, req.Name, *req.StorageID, req.Path, req.Redundancy) + *req.UserID, *req.BucketID, req.Name, *req.StorageID, req.Path, req.Redundancy, req.NodeAffinity) if err != nil { log.Warnf("start storage create package: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage create package failed")) diff --git a/client/internal/services/package.go b/client/internal/services/package.go index f518c42..f65907f 100644 --- a/client/internal/services/package.go +++ b/client/internal/services/package.go @@ -123,8 +123,8 @@ func (svc *PackageService) downloadECPackage(pkg model.Package, objects []model. return iter, nil } -func (svc *PackageService) StartCreatingRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, repInfo models.RepRedundancyInfo) (string, error) { - tsk := svc.TaskMgr.StartNew(mytask.NewCreateRepPackage(userID, bucketID, name, objIter, repInfo)) +func (svc *PackageService) StartCreatingRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, repInfo models.RepRedundancyInfo, nodeAffinity *int64) (string, error) { + tsk := svc.TaskMgr.StartNew(mytask.NewCreateRepPackage(userID, bucketID, name, objIter, repInfo, nodeAffinity)) return tsk.ID(), nil } @@ -151,8 +151,8 @@ func (svc *PackageService) WaitUpdatingRepPackage(taskID string, waitTimeout tim return false, nil, nil } -func (svc *PackageService) StartCreatingECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, ecInfo models.ECRedundancyInfo) (string, error) { - tsk := svc.TaskMgr.StartNew(mytask.NewCreateECPackage(userID, bucketID, name, objIter, ecInfo)) +func (svc *PackageService) StartCreatingECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, ecInfo models.ECRedundancyInfo, nodeAffinity *int64) (string, error) { + tsk := svc.TaskMgr.StartNew(mytask.NewCreateECPackage(userID, bucketID, name, objIter, ecInfo, nodeAffinity)) return tsk.ID(), nil } diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index 09c21cd..3083ed6 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -38,7 +38,7 @@ func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, s } // 请求节点启动从Storage中上传文件的任务。会返回节点ID和任务ID -func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy models.TypedRedundancyInfo) (int64, string, error) { +func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy models.TypedRedundancyInfo, nodeAffinity *int64) (int64, string, error) { coorCli, err := globals.CoordinatorMQPool.Acquire() if err != nil { return 0, "", fmt.Errorf("new coordinator client: %w", err) @@ -56,7 +56,7 @@ func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int6 } defer agentCli.Close() - startResp, err := agentCli.StartStorageCreatePackage(agtmq.NewStartStorageCreatePackage(userID, bucketID, name, storageID, path, redundancy)) + startResp, err := agentCli.StartStorageCreatePackage(agtmq.NewStartStorageCreatePackage(userID, bucketID, name, storageID, path, redundancy, nodeAffinity)) if err != nil { return 0, "", fmt.Errorf("start storage upload package: %w", err) } diff --git a/client/internal/task/create_ec_package.go b/client/internal/task/create_ec_package.go index d181e1f..2452676 100644 --- a/client/internal/task/create_ec_package.go +++ b/client/internal/task/create_ec_package.go @@ -17,9 +17,9 @@ type CreateECPackage struct { Result *CreateECPackageResult } -func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo) *CreateECPackage { +func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage { return &CreateECPackage{ - cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy), + cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity), } } diff --git a/client/internal/task/create_rep_package.go b/client/internal/task/create_rep_package.go index b102a698..30c6131 100644 --- a/client/internal/task/create_rep_package.go +++ b/client/internal/task/create_rep_package.go @@ -17,9 +17,9 @@ type CreateRepPackage struct { Result *CreateRepPackageResult } -func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo) *CreateRepPackage { +func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage { return &CreateRepPackage{ - cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy), + cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity), } } diff --git a/common/pkgs/cmd/create_ec_package.go b/common/pkgs/cmd/create_ec_package.go index 5193796..c6148c4 100644 --- a/common/pkgs/cmd/create_ec_package.go +++ b/common/pkgs/cmd/create_ec_package.go @@ -20,11 +20,12 @@ import ( ) type CreateECPackage struct { - userID int64 - bucketID int64 - name string - objectIter iterator.UploadingObjectIterator - redundancy models.ECRedundancyInfo + userID int64 + bucketID int64 + name string + objectIter iterator.UploadingObjectIterator + redundancy models.ECRedundancyInfo + nodeAffinity *int64 } type CreateECPackageResult struct { @@ -38,13 +39,14 @@ type ECObjectUploadResult struct { ObjectID int64 } -func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo) *CreateECPackage { +func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage { return &CreateECPackage{ - userID: userID, - bucketID: bucketID, - name: name, - objectIter: objIter, - redundancy: redundancy, + userID: userID, + bucketID: bucketID, + name: name, + objectIter: objIter, + redundancy: redundancy, + nodeAffinity: nodeAffinity, } } @@ -124,6 +126,7 @@ func (t *CreateECPackage) Execute(ctx *UpdatePackageContext) (*CreateECPackageRe } defer ipfsMutex.Unlock() + // TODO 需要支持设置节点亲和性 rets, err := uploadAndUpdateECPackage(createPkgResp.PackageID, t.objectIter, uploadNodeInfos, t.redundancy, getECResp.Config) if err != nil { return nil, err diff --git a/common/pkgs/cmd/create_rep_package.go b/common/pkgs/cmd/create_rep_package.go index 7fd4d8b..9814b14 100644 --- a/common/pkgs/cmd/create_rep_package.go +++ b/common/pkgs/cmd/create_rep_package.go @@ -25,11 +25,12 @@ type UploadNodeInfo struct { } type CreateRepPackage struct { - userID int64 - bucketID int64 - name string - objectIter iterator.UploadingObjectIterator - redundancy models.RepRedundancyInfo + userID int64 + bucketID int64 + name string + objectIter iterator.UploadingObjectIterator + redundancy models.RepRedundancyInfo + nodeAffinity *int64 } type UpdatePackageContext struct { @@ -48,13 +49,14 @@ type RepObjectUploadResult struct { ObjectID int64 } -func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo) *CreateRepPackage { +func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage { return &CreateRepPackage{ - userID: userID, - bucketID: bucketID, - name: name, - objectIter: objIter, - redundancy: redundancy, + userID: userID, + bucketID: bucketID, + name: name, + objectIter: objIter, + redundancy: redundancy, + nodeAffinity: nodeAffinity, } } @@ -113,7 +115,7 @@ func (t *CreateRepPackage) Execute(ctx *UpdatePackageContext) (*CreateRepPackage IsSameLocation: node.LocationID == findCliLocResp.Location.LocationID, } }) - uploadNode := t.chooseUploadNode(nodeInfos) + uploadNode := t.chooseUploadNode(nodeInfos, t.nodeAffinity) // 防止上传的副本被清除 ipfsMutex, err := reqbuilder.NewBuilder(). @@ -214,9 +216,17 @@ func uploadFile(file io.Reader, uploadNode UploadNodeInfo) (string, error) { } // chooseUploadNode 选择一个上传文件的节点 -// 1. 从与当前客户端相同地域的节点中随机选一个 -// 2. 没有用的话从所有节点中随机选一个 -func (t *CreateRepPackage) chooseUploadNode(nodes []UploadNodeInfo) UploadNodeInfo { +// 1. 选择设置了亲和性的节点 +// 2. 从与当前客户端相同地域的节点中随机选一个 +// 3. 没有用的话从所有节点中随机选一个 +func (t *CreateRepPackage) chooseUploadNode(nodes []UploadNodeInfo, nodeAffinity *int64) UploadNodeInfo { + if nodeAffinity != nil { + aff, ok := lo.Find(nodes, func(node UploadNodeInfo) bool { return node.Node.NodeID == *nodeAffinity }) + if ok { + return aff + } + } + sameLocationNodes := lo.Filter(nodes, func(e UploadNodeInfo, i int) bool { return e.IsSameLocation }) if len(sameLocationNodes) > 0 { return sameLocationNodes[rand.Intn(len(sameLocationNodes))] diff --git a/common/pkgs/mq/agent/storage.go b/common/pkgs/mq/agent/storage.go index 8a95979..d2b96be 100644 --- a/common/pkgs/mq/agent/storage.go +++ b/common/pkgs/mq/agent/storage.go @@ -127,25 +127,27 @@ func (client *Client) StorageCheck(msg StorageCheck, opts ...mq.RequestOption) ( var _ = Register(StorageService.StartStorageCreatePackage) type StartStorageCreatePackage struct { - UserID int64 `json:"userID"` - BucketID int64 `json:"bucketID"` - Name string `json:"name"` - StorageID int64 `json:"storageID"` - Path string `json:"path"` - Redundancy models.TypedRedundancyInfo `json:"redundancy"` + UserID int64 `json:"userID"` + BucketID int64 `json:"bucketID"` + Name string `json:"name"` + StorageID int64 `json:"storageID"` + Path string `json:"path"` + Redundancy models.TypedRedundancyInfo `json:"redundancy"` + NodeAffinity *int64 `json:"nodeAffinity"` } type StartStorageCreatePackageResp struct { TaskID string `json:"taskID"` } -func NewStartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy models.TypedRedundancyInfo) StartStorageCreatePackage { +func NewStartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy models.TypedRedundancyInfo, nodeAffinity *int64) StartStorageCreatePackage { return StartStorageCreatePackage{ - UserID: userID, - BucketID: bucketID, - Name: name, - StorageID: storageID, - Path: path, - Redundancy: redundancy, + UserID: userID, + BucketID: bucketID, + Name: name, + StorageID: storageID, + Path: path, + Redundancy: redundancy, + NodeAffinity: nodeAffinity, } } func NewStartStorageCreatePackageResp(taskID string) StartStorageCreatePackageResp {