| @@ -203,7 +203,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka | |||
| return nil, mq.Failed(errorcode.OperationFailed, "get rep redundancy info failed") | |||
| } | |||
| tsk := svc.taskManager.StartNew(mytask.NewCreateRepPackage(msg.UserID, msg.BucketID, msg.Name, objIter, repInfo)) | |||
| tsk := svc.taskManager.StartNew(mytask.NewCreateRepPackage(msg.UserID, msg.BucketID, msg.Name, objIter, repInfo, msg.NodeAffinity)) | |||
| return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) | |||
| } | |||
| @@ -214,7 +214,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka | |||
| return nil, mq.Failed(errorcode.OperationFailed, "get ec redundancy info failed") | |||
| } | |||
| tsk := svc.taskManager.StartNew(mytask.NewCreateECPackage(msg.UserID, msg.BucketID, msg.Name, objIter, ecInfo)) | |||
| tsk := svc.taskManager.StartNew(mytask.NewCreateECPackage(msg.UserID, msg.BucketID, msg.Name, objIter, ecInfo, msg.NodeAffinity)) | |||
| return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) | |||
| } | |||
| @@ -18,9 +18,9 @@ type CreateECPackage struct { | |||
| Result *CreateECPackageResult | |||
| } | |||
| func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo) *CreateECPackage { | |||
| func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage { | |||
| return &CreateECPackage{ | |||
| cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy), | |||
| cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity), | |||
| } | |||
| } | |||
| @@ -18,9 +18,9 @@ type CreateRepPackage struct { | |||
| Result *CreateRepPackageResult | |||
| } | |||
| func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo) *CreateRepPackage { | |||
| func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage { | |||
| return &CreateRepPackage{ | |||
| cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy), | |||
| cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity), | |||
| } | |||
| } | |||
| @@ -87,7 +87,7 @@ func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID int6 | |||
| return nil | |||
| } | |||
| func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64, name string, repCount int) error { | |||
| func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64, name string, repCount int, nodeAffinity []int64) error { | |||
| rootPath = filepath.Clean(rootPath) | |||
| var uploadFilePathes []string | |||
| @@ -106,8 +106,13 @@ func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64 | |||
| return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) | |||
| } | |||
| var nodeAff *int64 | |||
| if len(nodeAffinity) > 0 { | |||
| nodeAff = &nodeAffinity[0] | |||
| } | |||
| objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) | |||
| taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingRepPackage(0, bucketID, name, objIter, models.NewRepRedundancyInfo(repCount)) | |||
| taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingRepPackage(0, bucketID, name, objIter, models.NewRepRedundancyInfo(repCount), nodeAff) | |||
| if err != nil { | |||
| return fmt.Errorf("upload file data failed, err: %w", err) | |||
| @@ -181,7 +186,7 @@ func PackageUpdateRepPackage(ctx CommandContext, packageID int64, rootPath strin | |||
| } | |||
| } | |||
| func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, name string, ecName string) error { | |||
| func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, name string, ecName string, nodeAffinity []int64) error { | |||
| var uploadFilePathes []string | |||
| err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { | |||
| if err != nil { | |||
| @@ -198,8 +203,13 @@ func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, | |||
| return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) | |||
| } | |||
| var nodeAff *int64 | |||
| if len(nodeAffinity) > 0 { | |||
| nodeAff = &nodeAffinity[0] | |||
| } | |||
| objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) | |||
| taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, models.NewECRedundancyInfo(ecName, config.Cfg().ECPacketSize)) | |||
| taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, models.NewECRedundancyInfo(ecName, config.Cfg().ECPacketSize), nodeAff) | |||
| if err != nil { | |||
| return fmt.Errorf("upload file data failed, err: %w", err) | |||
| @@ -31,7 +31,7 @@ func StorageLoadPackage(ctx CommandContext, packageID int64, storageID int64) er | |||
| func StorageCreateRepPackage(ctx CommandContext, bucketID int64, name string, storageID int64, path string, repCount int) error { | |||
| nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageCreatePackage(0, bucketID, name, storageID, path, | |||
| models.NewTypedRepRedundancyInfo(repCount)) | |||
| models.NewTypedRepRedundancyInfo(repCount), nil) | |||
| if err != nil { | |||
| return fmt.Errorf("start storage uploading rep package: %w", err) | |||
| } | |||
| @@ -30,10 +30,11 @@ type PackageUploadReq struct { | |||
| } | |||
| type PackageUploadInfo struct { | |||
| UserID *int64 `json:"userID" binding:"required"` | |||
| BucketID *int64 `json:"bucketID" binding:"required"` | |||
| Name string `json:"name" binding:"required"` | |||
| Redundancy models.TypedRedundancyInfo `json:"redundancy" binding:"required"` | |||
| UserID *int64 `json:"userID" binding:"required"` | |||
| BucketID *int64 `json:"bucketID" binding:"required"` | |||
| Name string `json:"name" binding:"required"` | |||
| Redundancy models.TypedRedundancyInfo `json:"redundancy" binding:"required"` | |||
| NodeAffinity *int64 `json:"nodeAffinity"` | |||
| } | |||
| type PackageUploadResp struct { | |||
| @@ -76,7 +77,7 @@ func (s *PackageService) uploadRep(ctx *gin.Context, req *PackageUploadReq) { | |||
| objIter := mapMultiPartFileToUploadingObject(req.Files) | |||
| taskID, err := s.svc.PackageSvc().StartCreatingRepPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, repInfo) | |||
| taskID, err := s.svc.PackageSvc().StartCreatingRepPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, repInfo, req.Info.NodeAffinity) | |||
| if err != nil { | |||
| log.Warnf("start uploading rep package task: %s", err.Error()) | |||
| @@ -120,7 +121,7 @@ func (s *PackageService) uploadEC(ctx *gin.Context, req *PackageUploadReq) { | |||
| objIter := mapMultiPartFileToUploadingObject(req.Files) | |||
| taskID, err := s.svc.PackageSvc().StartCreatingECPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, ecInfo) | |||
| taskID, err := s.svc.PackageSvc().StartCreatingECPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, ecInfo, req.Info.NodeAffinity) | |||
| if err != nil { | |||
| log.Warnf("start uploading ec package task: %s", err.Error()) | |||
| @@ -65,12 +65,13 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) { | |||
| } | |||
| type StorageCreatePackageReq struct { | |||
| UserID *int64 `json:"userID" binding:"required"` | |||
| StorageID *int64 `json:"storageID" binding:"required"` | |||
| Path string `json:"path" binding:"required"` | |||
| BucketID *int64 `json:"bucketID" binding:"required"` | |||
| Name string `json:"name" binding:"required"` | |||
| Redundancy models.TypedRedundancyInfo `json:"redundancy" binding:"required"` | |||
| UserID *int64 `json:"userID" binding:"required"` | |||
| StorageID *int64 `json:"storageID" binding:"required"` | |||
| Path string `json:"path" binding:"required"` | |||
| BucketID *int64 `json:"bucketID" binding:"required"` | |||
| Name string `json:"name" binding:"required"` | |||
| Redundancy models.TypedRedundancyInfo `json:"redundancy" binding:"required"` | |||
| NodeAffinity *int64 `json:"nodeAffinity"` | |||
| } | |||
| type StorageCreatePackageResp struct { | |||
| @@ -88,7 +89,7 @@ func (s *StorageService) CreatePackage(ctx *gin.Context) { | |||
| } | |||
| nodeID, taskID, err := s.svc.StorageSvc().StartStorageCreatePackage( | |||
| *req.UserID, *req.BucketID, req.Name, *req.StorageID, req.Path, req.Redundancy) | |||
| *req.UserID, *req.BucketID, req.Name, *req.StorageID, req.Path, req.Redundancy, req.NodeAffinity) | |||
| if err != nil { | |||
| log.Warnf("start storage create package: %s", err.Error()) | |||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage create package failed")) | |||
| @@ -123,8 +123,8 @@ func (svc *PackageService) downloadECPackage(pkg model.Package, objects []model. | |||
| return iter, nil | |||
| } | |||
| func (svc *PackageService) StartCreatingRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, repInfo models.RepRedundancyInfo) (string, error) { | |||
| tsk := svc.TaskMgr.StartNew(mytask.NewCreateRepPackage(userID, bucketID, name, objIter, repInfo)) | |||
| func (svc *PackageService) StartCreatingRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, repInfo models.RepRedundancyInfo, nodeAffinity *int64) (string, error) { | |||
| tsk := svc.TaskMgr.StartNew(mytask.NewCreateRepPackage(userID, bucketID, name, objIter, repInfo, nodeAffinity)) | |||
| return tsk.ID(), nil | |||
| } | |||
| @@ -151,8 +151,8 @@ func (svc *PackageService) WaitUpdatingRepPackage(taskID string, waitTimeout tim | |||
| return false, nil, nil | |||
| } | |||
| func (svc *PackageService) StartCreatingECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, ecInfo models.ECRedundancyInfo) (string, error) { | |||
| tsk := svc.TaskMgr.StartNew(mytask.NewCreateECPackage(userID, bucketID, name, objIter, ecInfo)) | |||
| func (svc *PackageService) StartCreatingECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, ecInfo models.ECRedundancyInfo, nodeAffinity *int64) (string, error) { | |||
| tsk := svc.TaskMgr.StartNew(mytask.NewCreateECPackage(userID, bucketID, name, objIter, ecInfo, nodeAffinity)) | |||
| return tsk.ID(), nil | |||
| } | |||
| @@ -38,7 +38,7 @@ func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, s | |||
| } | |||
| // 请求节点启动从Storage中上传文件的任务。会返回节点ID和任务ID | |||
| func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy models.TypedRedundancyInfo) (int64, string, error) { | |||
| func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy models.TypedRedundancyInfo, nodeAffinity *int64) (int64, string, error) { | |||
| coorCli, err := globals.CoordinatorMQPool.Acquire() | |||
| if err != nil { | |||
| return 0, "", fmt.Errorf("new coordinator client: %w", err) | |||
| @@ -56,7 +56,7 @@ func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int6 | |||
| } | |||
| defer agentCli.Close() | |||
| startResp, err := agentCli.StartStorageCreatePackage(agtmq.NewStartStorageCreatePackage(userID, bucketID, name, storageID, path, redundancy)) | |||
| startResp, err := agentCli.StartStorageCreatePackage(agtmq.NewStartStorageCreatePackage(userID, bucketID, name, storageID, path, redundancy, nodeAffinity)) | |||
| if err != nil { | |||
| return 0, "", fmt.Errorf("start storage upload package: %w", err) | |||
| } | |||
| @@ -17,9 +17,9 @@ type CreateECPackage struct { | |||
| Result *CreateECPackageResult | |||
| } | |||
| func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo) *CreateECPackage { | |||
| func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage { | |||
| return &CreateECPackage{ | |||
| cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy), | |||
| cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity), | |||
| } | |||
| } | |||
| @@ -17,9 +17,9 @@ type CreateRepPackage struct { | |||
| Result *CreateRepPackageResult | |||
| } | |||
| func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo) *CreateRepPackage { | |||
| func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage { | |||
| return &CreateRepPackage{ | |||
| cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy), | |||
| cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity), | |||
| } | |||
| } | |||
| @@ -20,11 +20,12 @@ import ( | |||
| ) | |||
| type CreateECPackage struct { | |||
| userID int64 | |||
| bucketID int64 | |||
| name string | |||
| objectIter iterator.UploadingObjectIterator | |||
| redundancy models.ECRedundancyInfo | |||
| userID int64 | |||
| bucketID int64 | |||
| name string | |||
| objectIter iterator.UploadingObjectIterator | |||
| redundancy models.ECRedundancyInfo | |||
| nodeAffinity *int64 | |||
| } | |||
| type CreateECPackageResult struct { | |||
| @@ -38,13 +39,14 @@ type ECObjectUploadResult struct { | |||
| ObjectID int64 | |||
| } | |||
| func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo) *CreateECPackage { | |||
| func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage { | |||
| return &CreateECPackage{ | |||
| userID: userID, | |||
| bucketID: bucketID, | |||
| name: name, | |||
| objectIter: objIter, | |||
| redundancy: redundancy, | |||
| userID: userID, | |||
| bucketID: bucketID, | |||
| name: name, | |||
| objectIter: objIter, | |||
| redundancy: redundancy, | |||
| nodeAffinity: nodeAffinity, | |||
| } | |||
| } | |||
| @@ -124,6 +126,7 @@ func (t *CreateECPackage) Execute(ctx *UpdatePackageContext) (*CreateECPackageRe | |||
| } | |||
| defer ipfsMutex.Unlock() | |||
| // TODO 需要支持设置节点亲和性 | |||
| rets, err := uploadAndUpdateECPackage(createPkgResp.PackageID, t.objectIter, uploadNodeInfos, t.redundancy, getECResp.Config) | |||
| if err != nil { | |||
| return nil, err | |||
| @@ -25,11 +25,12 @@ type UploadNodeInfo struct { | |||
| } | |||
| type CreateRepPackage struct { | |||
| userID int64 | |||
| bucketID int64 | |||
| name string | |||
| objectIter iterator.UploadingObjectIterator | |||
| redundancy models.RepRedundancyInfo | |||
| userID int64 | |||
| bucketID int64 | |||
| name string | |||
| objectIter iterator.UploadingObjectIterator | |||
| redundancy models.RepRedundancyInfo | |||
| nodeAffinity *int64 | |||
| } | |||
| type UpdatePackageContext struct { | |||
| @@ -48,13 +49,14 @@ type RepObjectUploadResult struct { | |||
| ObjectID int64 | |||
| } | |||
| func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo) *CreateRepPackage { | |||
| func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage { | |||
| return &CreateRepPackage{ | |||
| userID: userID, | |||
| bucketID: bucketID, | |||
| name: name, | |||
| objectIter: objIter, | |||
| redundancy: redundancy, | |||
| userID: userID, | |||
| bucketID: bucketID, | |||
| name: name, | |||
| objectIter: objIter, | |||
| redundancy: redundancy, | |||
| nodeAffinity: nodeAffinity, | |||
| } | |||
| } | |||
| @@ -113,7 +115,7 @@ func (t *CreateRepPackage) Execute(ctx *UpdatePackageContext) (*CreateRepPackage | |||
| IsSameLocation: node.LocationID == findCliLocResp.Location.LocationID, | |||
| } | |||
| }) | |||
| uploadNode := t.chooseUploadNode(nodeInfos) | |||
| uploadNode := t.chooseUploadNode(nodeInfos, t.nodeAffinity) | |||
| // 防止上传的副本被清除 | |||
| ipfsMutex, err := reqbuilder.NewBuilder(). | |||
| @@ -214,9 +216,17 @@ func uploadFile(file io.Reader, uploadNode UploadNodeInfo) (string, error) { | |||
| } | |||
| // chooseUploadNode 选择一个上传文件的节点 | |||
| // 1. 从与当前客户端相同地域的节点中随机选一个 | |||
| // 2. 没有用的话从所有节点中随机选一个 | |||
| func (t *CreateRepPackage) chooseUploadNode(nodes []UploadNodeInfo) UploadNodeInfo { | |||
| // 1. 选择设置了亲和性的节点 | |||
| // 2. 从与当前客户端相同地域的节点中随机选一个 | |||
| // 3. 没有用的话从所有节点中随机选一个 | |||
| func (t *CreateRepPackage) chooseUploadNode(nodes []UploadNodeInfo, nodeAffinity *int64) UploadNodeInfo { | |||
| if nodeAffinity != nil { | |||
| aff, ok := lo.Find(nodes, func(node UploadNodeInfo) bool { return node.Node.NodeID == *nodeAffinity }) | |||
| if ok { | |||
| return aff | |||
| } | |||
| } | |||
| sameLocationNodes := lo.Filter(nodes, func(e UploadNodeInfo, i int) bool { return e.IsSameLocation }) | |||
| if len(sameLocationNodes) > 0 { | |||
| return sameLocationNodes[rand.Intn(len(sameLocationNodes))] | |||
| @@ -127,25 +127,27 @@ func (client *Client) StorageCheck(msg StorageCheck, opts ...mq.RequestOption) ( | |||
| var _ = Register(StorageService.StartStorageCreatePackage) | |||
| type StartStorageCreatePackage struct { | |||
| UserID int64 `json:"userID"` | |||
| BucketID int64 `json:"bucketID"` | |||
| Name string `json:"name"` | |||
| StorageID int64 `json:"storageID"` | |||
| Path string `json:"path"` | |||
| Redundancy models.TypedRedundancyInfo `json:"redundancy"` | |||
| UserID int64 `json:"userID"` | |||
| BucketID int64 `json:"bucketID"` | |||
| Name string `json:"name"` | |||
| StorageID int64 `json:"storageID"` | |||
| Path string `json:"path"` | |||
| Redundancy models.TypedRedundancyInfo `json:"redundancy"` | |||
| NodeAffinity *int64 `json:"nodeAffinity"` | |||
| } | |||
| type StartStorageCreatePackageResp struct { | |||
| TaskID string `json:"taskID"` | |||
| } | |||
| func NewStartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy models.TypedRedundancyInfo) StartStorageCreatePackage { | |||
| func NewStartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy models.TypedRedundancyInfo, nodeAffinity *int64) StartStorageCreatePackage { | |||
| return StartStorageCreatePackage{ | |||
| UserID: userID, | |||
| BucketID: bucketID, | |||
| Name: name, | |||
| StorageID: storageID, | |||
| Path: path, | |||
| Redundancy: redundancy, | |||
| UserID: userID, | |||
| BucketID: bucketID, | |||
| Name: name, | |||
| StorageID: storageID, | |||
| Path: path, | |||
| Redundancy: redundancy, | |||
| NodeAffinity: nodeAffinity, | |||
| } | |||
| } | |||
| func NewStartStorageCreatePackageResp(taskID string) StartStorageCreatePackageResp { | |||