| @@ -203,7 +203,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka | |||||
| return nil, mq.Failed(errorcode.OperationFailed, "get rep redundancy info failed") | return nil, mq.Failed(errorcode.OperationFailed, "get rep redundancy info failed") | ||||
| } | } | ||||
| tsk := svc.taskManager.StartNew(mytask.NewCreateRepPackage(msg.UserID, msg.BucketID, msg.Name, objIter, repInfo)) | |||||
| tsk := svc.taskManager.StartNew(mytask.NewCreateRepPackage(msg.UserID, msg.BucketID, msg.Name, objIter, repInfo, msg.NodeAffinity)) | |||||
| return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) | return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) | ||||
| } | } | ||||
| @@ -214,7 +214,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka | |||||
| return nil, mq.Failed(errorcode.OperationFailed, "get ec redundancy info failed") | return nil, mq.Failed(errorcode.OperationFailed, "get ec redundancy info failed") | ||||
| } | } | ||||
| tsk := svc.taskManager.StartNew(mytask.NewCreateECPackage(msg.UserID, msg.BucketID, msg.Name, objIter, ecInfo)) | |||||
| tsk := svc.taskManager.StartNew(mytask.NewCreateECPackage(msg.UserID, msg.BucketID, msg.Name, objIter, ecInfo, msg.NodeAffinity)) | |||||
| return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) | return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) | ||||
| } | } | ||||
| @@ -18,9 +18,9 @@ type CreateECPackage struct { | |||||
| Result *CreateECPackageResult | Result *CreateECPackageResult | ||||
| } | } | ||||
| func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo) *CreateECPackage { | |||||
| func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage { | |||||
| return &CreateECPackage{ | return &CreateECPackage{ | ||||
| cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy), | |||||
| cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity), | |||||
| } | } | ||||
| } | } | ||||
| @@ -18,9 +18,9 @@ type CreateRepPackage struct { | |||||
| Result *CreateRepPackageResult | Result *CreateRepPackageResult | ||||
| } | } | ||||
| func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo) *CreateRepPackage { | |||||
| func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage { | |||||
| return &CreateRepPackage{ | return &CreateRepPackage{ | ||||
| cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy), | |||||
| cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity), | |||||
| } | } | ||||
| } | } | ||||
| @@ -87,7 +87,7 @@ func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID int6 | |||||
| return nil | return nil | ||||
| } | } | ||||
| func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64, name string, repCount int) error { | |||||
| func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64, name string, repCount int, nodeAffinity []int64) error { | |||||
| rootPath = filepath.Clean(rootPath) | rootPath = filepath.Clean(rootPath) | ||||
| var uploadFilePathes []string | var uploadFilePathes []string | ||||
| @@ -106,8 +106,13 @@ func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64 | |||||
| return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) | return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) | ||||
| } | } | ||||
| var nodeAff *int64 | |||||
| if len(nodeAffinity) > 0 { | |||||
| nodeAff = &nodeAffinity[0] | |||||
| } | |||||
| objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) | objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) | ||||
| taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingRepPackage(0, bucketID, name, objIter, models.NewRepRedundancyInfo(repCount)) | |||||
| taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingRepPackage(0, bucketID, name, objIter, models.NewRepRedundancyInfo(repCount), nodeAff) | |||||
| if err != nil { | if err != nil { | ||||
| return fmt.Errorf("upload file data failed, err: %w", err) | return fmt.Errorf("upload file data failed, err: %w", err) | ||||
| @@ -181,7 +186,7 @@ func PackageUpdateRepPackage(ctx CommandContext, packageID int64, rootPath strin | |||||
| } | } | ||||
| } | } | ||||
| func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, name string, ecName string) error { | |||||
| func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, name string, ecName string, nodeAffinity []int64) error { | |||||
| var uploadFilePathes []string | var uploadFilePathes []string | ||||
| err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { | err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { | ||||
| if err != nil { | if err != nil { | ||||
| @@ -198,8 +203,13 @@ func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, | |||||
| return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) | return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) | ||||
| } | } | ||||
| var nodeAff *int64 | |||||
| if len(nodeAffinity) > 0 { | |||||
| nodeAff = &nodeAffinity[0] | |||||
| } | |||||
| objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) | objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) | ||||
| taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, models.NewECRedundancyInfo(ecName, config.Cfg().ECPacketSize)) | |||||
| taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, models.NewECRedundancyInfo(ecName, config.Cfg().ECPacketSize), nodeAff) | |||||
| if err != nil { | if err != nil { | ||||
| return fmt.Errorf("upload file data failed, err: %w", err) | return fmt.Errorf("upload file data failed, err: %w", err) | ||||
| @@ -31,7 +31,7 @@ func StorageLoadPackage(ctx CommandContext, packageID int64, storageID int64) er | |||||
| func StorageCreateRepPackage(ctx CommandContext, bucketID int64, name string, storageID int64, path string, repCount int) error { | func StorageCreateRepPackage(ctx CommandContext, bucketID int64, name string, storageID int64, path string, repCount int) error { | ||||
| nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageCreatePackage(0, bucketID, name, storageID, path, | nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageCreatePackage(0, bucketID, name, storageID, path, | ||||
| models.NewTypedRepRedundancyInfo(repCount)) | |||||
| models.NewTypedRepRedundancyInfo(repCount), nil) | |||||
| if err != nil { | if err != nil { | ||||
| return fmt.Errorf("start storage uploading rep package: %w", err) | return fmt.Errorf("start storage uploading rep package: %w", err) | ||||
| } | } | ||||
| @@ -30,10 +30,11 @@ type PackageUploadReq struct { | |||||
| } | } | ||||
| type PackageUploadInfo struct { | type PackageUploadInfo struct { | ||||
| UserID *int64 `json:"userID" binding:"required"` | |||||
| BucketID *int64 `json:"bucketID" binding:"required"` | |||||
| Name string `json:"name" binding:"required"` | |||||
| Redundancy models.TypedRedundancyInfo `json:"redundancy" binding:"required"` | |||||
| UserID *int64 `json:"userID" binding:"required"` | |||||
| BucketID *int64 `json:"bucketID" binding:"required"` | |||||
| Name string `json:"name" binding:"required"` | |||||
| Redundancy models.TypedRedundancyInfo `json:"redundancy" binding:"required"` | |||||
| NodeAffinity *int64 `json:"nodeAffinity"` | |||||
| } | } | ||||
| type PackageUploadResp struct { | type PackageUploadResp struct { | ||||
| @@ -76,7 +77,7 @@ func (s *PackageService) uploadRep(ctx *gin.Context, req *PackageUploadReq) { | |||||
| objIter := mapMultiPartFileToUploadingObject(req.Files) | objIter := mapMultiPartFileToUploadingObject(req.Files) | ||||
| taskID, err := s.svc.PackageSvc().StartCreatingRepPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, repInfo) | |||||
| taskID, err := s.svc.PackageSvc().StartCreatingRepPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, repInfo, req.Info.NodeAffinity) | |||||
| if err != nil { | if err != nil { | ||||
| log.Warnf("start uploading rep package task: %s", err.Error()) | log.Warnf("start uploading rep package task: %s", err.Error()) | ||||
| @@ -120,7 +121,7 @@ func (s *PackageService) uploadEC(ctx *gin.Context, req *PackageUploadReq) { | |||||
| objIter := mapMultiPartFileToUploadingObject(req.Files) | objIter := mapMultiPartFileToUploadingObject(req.Files) | ||||
| taskID, err := s.svc.PackageSvc().StartCreatingECPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, ecInfo) | |||||
| taskID, err := s.svc.PackageSvc().StartCreatingECPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, ecInfo, req.Info.NodeAffinity) | |||||
| if err != nil { | if err != nil { | ||||
| log.Warnf("start uploading ec package task: %s", err.Error()) | log.Warnf("start uploading ec package task: %s", err.Error()) | ||||
| @@ -65,12 +65,13 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) { | |||||
| } | } | ||||
| type StorageCreatePackageReq struct { | type StorageCreatePackageReq struct { | ||||
| UserID *int64 `json:"userID" binding:"required"` | |||||
| StorageID *int64 `json:"storageID" binding:"required"` | |||||
| Path string `json:"path" binding:"required"` | |||||
| BucketID *int64 `json:"bucketID" binding:"required"` | |||||
| Name string `json:"name" binding:"required"` | |||||
| Redundancy models.TypedRedundancyInfo `json:"redundancy" binding:"required"` | |||||
| UserID *int64 `json:"userID" binding:"required"` | |||||
| StorageID *int64 `json:"storageID" binding:"required"` | |||||
| Path string `json:"path" binding:"required"` | |||||
| BucketID *int64 `json:"bucketID" binding:"required"` | |||||
| Name string `json:"name" binding:"required"` | |||||
| Redundancy models.TypedRedundancyInfo `json:"redundancy" binding:"required"` | |||||
| NodeAffinity *int64 `json:"nodeAffinity"` | |||||
| } | } | ||||
| type StorageCreatePackageResp struct { | type StorageCreatePackageResp struct { | ||||
| @@ -88,7 +89,7 @@ func (s *StorageService) CreatePackage(ctx *gin.Context) { | |||||
| } | } | ||||
| nodeID, taskID, err := s.svc.StorageSvc().StartStorageCreatePackage( | nodeID, taskID, err := s.svc.StorageSvc().StartStorageCreatePackage( | ||||
| *req.UserID, *req.BucketID, req.Name, *req.StorageID, req.Path, req.Redundancy) | |||||
| *req.UserID, *req.BucketID, req.Name, *req.StorageID, req.Path, req.Redundancy, req.NodeAffinity) | |||||
| if err != nil { | if err != nil { | ||||
| log.Warnf("start storage create package: %s", err.Error()) | log.Warnf("start storage create package: %s", err.Error()) | ||||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage create package failed")) | ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage create package failed")) | ||||
| @@ -123,8 +123,8 @@ func (svc *PackageService) downloadECPackage(pkg model.Package, objects []model. | |||||
| return iter, nil | return iter, nil | ||||
| } | } | ||||
| func (svc *PackageService) StartCreatingRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, repInfo models.RepRedundancyInfo) (string, error) { | |||||
| tsk := svc.TaskMgr.StartNew(mytask.NewCreateRepPackage(userID, bucketID, name, objIter, repInfo)) | |||||
| func (svc *PackageService) StartCreatingRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, repInfo models.RepRedundancyInfo, nodeAffinity *int64) (string, error) { | |||||
| tsk := svc.TaskMgr.StartNew(mytask.NewCreateRepPackage(userID, bucketID, name, objIter, repInfo, nodeAffinity)) | |||||
| return tsk.ID(), nil | return tsk.ID(), nil | ||||
| } | } | ||||
| @@ -151,8 +151,8 @@ func (svc *PackageService) WaitUpdatingRepPackage(taskID string, waitTimeout tim | |||||
| return false, nil, nil | return false, nil, nil | ||||
| } | } | ||||
| func (svc *PackageService) StartCreatingECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, ecInfo models.ECRedundancyInfo) (string, error) { | |||||
| tsk := svc.TaskMgr.StartNew(mytask.NewCreateECPackage(userID, bucketID, name, objIter, ecInfo)) | |||||
| func (svc *PackageService) StartCreatingECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, ecInfo models.ECRedundancyInfo, nodeAffinity *int64) (string, error) { | |||||
| tsk := svc.TaskMgr.StartNew(mytask.NewCreateECPackage(userID, bucketID, name, objIter, ecInfo, nodeAffinity)) | |||||
| return tsk.ID(), nil | return tsk.ID(), nil | ||||
| } | } | ||||
| @@ -38,7 +38,7 @@ func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, s | |||||
| } | } | ||||
| // 请求节点启动从Storage中上传文件的任务。会返回节点ID和任务ID | // 请求节点启动从Storage中上传文件的任务。会返回节点ID和任务ID | ||||
| func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy models.TypedRedundancyInfo) (int64, string, error) { | |||||
| func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy models.TypedRedundancyInfo, nodeAffinity *int64) (int64, string, error) { | |||||
| coorCli, err := globals.CoordinatorMQPool.Acquire() | coorCli, err := globals.CoordinatorMQPool.Acquire() | ||||
| if err != nil { | if err != nil { | ||||
| return 0, "", fmt.Errorf("new coordinator client: %w", err) | return 0, "", fmt.Errorf("new coordinator client: %w", err) | ||||
| @@ -56,7 +56,7 @@ func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int6 | |||||
| } | } | ||||
| defer agentCli.Close() | defer agentCli.Close() | ||||
| startResp, err := agentCli.StartStorageCreatePackage(agtmq.NewStartStorageCreatePackage(userID, bucketID, name, storageID, path, redundancy)) | |||||
| startResp, err := agentCli.StartStorageCreatePackage(agtmq.NewStartStorageCreatePackage(userID, bucketID, name, storageID, path, redundancy, nodeAffinity)) | |||||
| if err != nil { | if err != nil { | ||||
| return 0, "", fmt.Errorf("start storage upload package: %w", err) | return 0, "", fmt.Errorf("start storage upload package: %w", err) | ||||
| } | } | ||||
| @@ -17,9 +17,9 @@ type CreateECPackage struct { | |||||
| Result *CreateECPackageResult | Result *CreateECPackageResult | ||||
| } | } | ||||
| func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo) *CreateECPackage { | |||||
| func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage { | |||||
| return &CreateECPackage{ | return &CreateECPackage{ | ||||
| cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy), | |||||
| cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity), | |||||
| } | } | ||||
| } | } | ||||
| @@ -17,9 +17,9 @@ type CreateRepPackage struct { | |||||
| Result *CreateRepPackageResult | Result *CreateRepPackageResult | ||||
| } | } | ||||
| func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo) *CreateRepPackage { | |||||
| func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage { | |||||
| return &CreateRepPackage{ | return &CreateRepPackage{ | ||||
| cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy), | |||||
| cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity), | |||||
| } | } | ||||
| } | } | ||||
| @@ -20,11 +20,12 @@ import ( | |||||
| ) | ) | ||||
| type CreateECPackage struct { | type CreateECPackage struct { | ||||
| userID int64 | |||||
| bucketID int64 | |||||
| name string | |||||
| objectIter iterator.UploadingObjectIterator | |||||
| redundancy models.ECRedundancyInfo | |||||
| userID int64 | |||||
| bucketID int64 | |||||
| name string | |||||
| objectIter iterator.UploadingObjectIterator | |||||
| redundancy models.ECRedundancyInfo | |||||
| nodeAffinity *int64 | |||||
| } | } | ||||
| type CreateECPackageResult struct { | type CreateECPackageResult struct { | ||||
| @@ -38,13 +39,14 @@ type ECObjectUploadResult struct { | |||||
| ObjectID int64 | ObjectID int64 | ||||
| } | } | ||||
| func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo) *CreateECPackage { | |||||
| func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage { | |||||
| return &CreateECPackage{ | return &CreateECPackage{ | ||||
| userID: userID, | |||||
| bucketID: bucketID, | |||||
| name: name, | |||||
| objectIter: objIter, | |||||
| redundancy: redundancy, | |||||
| userID: userID, | |||||
| bucketID: bucketID, | |||||
| name: name, | |||||
| objectIter: objIter, | |||||
| redundancy: redundancy, | |||||
| nodeAffinity: nodeAffinity, | |||||
| } | } | ||||
| } | } | ||||
| @@ -124,6 +126,7 @@ func (t *CreateECPackage) Execute(ctx *UpdatePackageContext) (*CreateECPackageRe | |||||
| } | } | ||||
| defer ipfsMutex.Unlock() | defer ipfsMutex.Unlock() | ||||
| // TODO 需要支持设置节点亲和性 | |||||
| rets, err := uploadAndUpdateECPackage(createPkgResp.PackageID, t.objectIter, uploadNodeInfos, t.redundancy, getECResp.Config) | rets, err := uploadAndUpdateECPackage(createPkgResp.PackageID, t.objectIter, uploadNodeInfos, t.redundancy, getECResp.Config) | ||||
| if err != nil { | if err != nil { | ||||
| return nil, err | return nil, err | ||||
| @@ -25,11 +25,12 @@ type UploadNodeInfo struct { | |||||
| } | } | ||||
| type CreateRepPackage struct { | type CreateRepPackage struct { | ||||
| userID int64 | |||||
| bucketID int64 | |||||
| name string | |||||
| objectIter iterator.UploadingObjectIterator | |||||
| redundancy models.RepRedundancyInfo | |||||
| userID int64 | |||||
| bucketID int64 | |||||
| name string | |||||
| objectIter iterator.UploadingObjectIterator | |||||
| redundancy models.RepRedundancyInfo | |||||
| nodeAffinity *int64 | |||||
| } | } | ||||
| type UpdatePackageContext struct { | type UpdatePackageContext struct { | ||||
| @@ -48,13 +49,14 @@ type RepObjectUploadResult struct { | |||||
| ObjectID int64 | ObjectID int64 | ||||
| } | } | ||||
| func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo) *CreateRepPackage { | |||||
| func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage { | |||||
| return &CreateRepPackage{ | return &CreateRepPackage{ | ||||
| userID: userID, | |||||
| bucketID: bucketID, | |||||
| name: name, | |||||
| objectIter: objIter, | |||||
| redundancy: redundancy, | |||||
| userID: userID, | |||||
| bucketID: bucketID, | |||||
| name: name, | |||||
| objectIter: objIter, | |||||
| redundancy: redundancy, | |||||
| nodeAffinity: nodeAffinity, | |||||
| } | } | ||||
| } | } | ||||
| @@ -113,7 +115,7 @@ func (t *CreateRepPackage) Execute(ctx *UpdatePackageContext) (*CreateRepPackage | |||||
| IsSameLocation: node.LocationID == findCliLocResp.Location.LocationID, | IsSameLocation: node.LocationID == findCliLocResp.Location.LocationID, | ||||
| } | } | ||||
| }) | }) | ||||
| uploadNode := t.chooseUploadNode(nodeInfos) | |||||
| uploadNode := t.chooseUploadNode(nodeInfos, t.nodeAffinity) | |||||
| // 防止上传的副本被清除 | // 防止上传的副本被清除 | ||||
| ipfsMutex, err := reqbuilder.NewBuilder(). | ipfsMutex, err := reqbuilder.NewBuilder(). | ||||
| @@ -214,9 +216,17 @@ func uploadFile(file io.Reader, uploadNode UploadNodeInfo) (string, error) { | |||||
| } | } | ||||
| // chooseUploadNode 选择一个上传文件的节点 | // chooseUploadNode 选择一个上传文件的节点 | ||||
| // 1. 从与当前客户端相同地域的节点中随机选一个 | |||||
| // 2. 没有用的话从所有节点中随机选一个 | |||||
| func (t *CreateRepPackage) chooseUploadNode(nodes []UploadNodeInfo) UploadNodeInfo { | |||||
| // 1. 选择设置了亲和性的节点 | |||||
| // 2. 从与当前客户端相同地域的节点中随机选一个 | |||||
| // 3. 没有用的话从所有节点中随机选一个 | |||||
| func (t *CreateRepPackage) chooseUploadNode(nodes []UploadNodeInfo, nodeAffinity *int64) UploadNodeInfo { | |||||
| if nodeAffinity != nil { | |||||
| aff, ok := lo.Find(nodes, func(node UploadNodeInfo) bool { return node.Node.NodeID == *nodeAffinity }) | |||||
| if ok { | |||||
| return aff | |||||
| } | |||||
| } | |||||
| sameLocationNodes := lo.Filter(nodes, func(e UploadNodeInfo, i int) bool { return e.IsSameLocation }) | sameLocationNodes := lo.Filter(nodes, func(e UploadNodeInfo, i int) bool { return e.IsSameLocation }) | ||||
| if len(sameLocationNodes) > 0 { | if len(sameLocationNodes) > 0 { | ||||
| return sameLocationNodes[rand.Intn(len(sameLocationNodes))] | return sameLocationNodes[rand.Intn(len(sameLocationNodes))] | ||||
| @@ -127,25 +127,27 @@ func (client *Client) StorageCheck(msg StorageCheck, opts ...mq.RequestOption) ( | |||||
| var _ = Register(StorageService.StartStorageCreatePackage) | var _ = Register(StorageService.StartStorageCreatePackage) | ||||
| type StartStorageCreatePackage struct { | type StartStorageCreatePackage struct { | ||||
| UserID int64 `json:"userID"` | |||||
| BucketID int64 `json:"bucketID"` | |||||
| Name string `json:"name"` | |||||
| StorageID int64 `json:"storageID"` | |||||
| Path string `json:"path"` | |||||
| Redundancy models.TypedRedundancyInfo `json:"redundancy"` | |||||
| UserID int64 `json:"userID"` | |||||
| BucketID int64 `json:"bucketID"` | |||||
| Name string `json:"name"` | |||||
| StorageID int64 `json:"storageID"` | |||||
| Path string `json:"path"` | |||||
| Redundancy models.TypedRedundancyInfo `json:"redundancy"` | |||||
| NodeAffinity *int64 `json:"nodeAffinity"` | |||||
| } | } | ||||
| type StartStorageCreatePackageResp struct { | type StartStorageCreatePackageResp struct { | ||||
| TaskID string `json:"taskID"` | TaskID string `json:"taskID"` | ||||
| } | } | ||||
| func NewStartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy models.TypedRedundancyInfo) StartStorageCreatePackage { | |||||
| func NewStartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy models.TypedRedundancyInfo, nodeAffinity *int64) StartStorageCreatePackage { | |||||
| return StartStorageCreatePackage{ | return StartStorageCreatePackage{ | ||||
| UserID: userID, | |||||
| BucketID: bucketID, | |||||
| Name: name, | |||||
| StorageID: storageID, | |||||
| Path: path, | |||||
| Redundancy: redundancy, | |||||
| UserID: userID, | |||||
| BucketID: bucketID, | |||||
| Name: name, | |||||
| StorageID: storageID, | |||||
| Path: path, | |||||
| Redundancy: redundancy, | |||||
| NodeAffinity: nodeAffinity, | |||||
| } | } | ||||
| } | } | ||||
| func NewStartStorageCreatePackageResp(taskID string) StartStorageCreatePackageResp { | func NewStartStorageCreatePackageResp(taskID string) StartStorageCreatePackageResp { | ||||