| @@ -3,6 +3,7 @@ package services | |||
| import ( | |||
| "database/sql" | |||
| "fmt" | |||
| "sort" | |||
| "github.com/jmoiron/sqlx" | |||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | |||
| @@ -189,19 +190,19 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack | |||
| return mq.ReplyOK(coormq.NewDeletePackageResp()) | |||
| } | |||
| func (svc *Service) GetCacheNodesByPackage(msg *coormq.GetCacheNodesByPackage) (*coormq.GetCacheNodesByPackageResp, *mq.CodeMessage) { | |||
| func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*coormq.GetPackageCachedNodesResp, *mq.CodeMessage) { | |||
| isAva, err := svc.db.Package().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.PackageID) | |||
| if err != nil { | |||
| logger.WithField("UserID", msg.UserID). | |||
| WithField("PackageID", msg.PackageID). | |||
| Warnf("check package available failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "check package available failed") | |||
| return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "check package available failed") | |||
| } | |||
| if !isAva { | |||
| logger.WithField("UserID", msg.UserID). | |||
| WithField("PackageID", msg.PackageID). | |||
| Warnf("package is not available to the user") | |||
| return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "package is not available to the user") | |||
| return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "package is not available to the user") | |||
| } | |||
| pkg, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID) | |||
| @@ -212,61 +213,87 @@ func (svc *Service) GetCacheNodesByPackage(msg *coormq.GetCacheNodesByPackage) ( | |||
| return nil, mq.Failed(errorcode.OperationFailed, "get package failed") | |||
| } | |||
| redunancyType := pkg.Redundancy.Type | |||
| uniqueNodeIDs := make(map[int64]bool) | |||
| var nodeIDs []int64 | |||
| if redunancyType == models.RedundancyRep { | |||
| var packageSize int64 | |||
| nodeInfoMap := make(map[int64]*models.NodePackageCachingInfo) | |||
| if pkg.Redundancy.IsRepInfo() { | |||
| // 备份方式为rep | |||
| objectRepDatas, err := svc.db.ObjectRep().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) | |||
| if err != nil { | |||
| logger.WithField("PackageID", msg.PackageID). | |||
| Warnf("get objectRepDatas by packageID failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "get objectRepDatas by packageID failed") | |||
| return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "get objectRepDatas by packageID failed") | |||
| } | |||
| for _, data := range objectRepDatas { | |||
| packageSize += data.Object.Size | |||
| for _, nodeID := range data.NodeIDs { | |||
| if !uniqueNodeIDs[nodeID] { | |||
| uniqueNodeIDs[nodeID] = true | |||
| nodeIDs = append(nodeIDs, nodeID) | |||
| nodeInfo, exists := nodeInfoMap[nodeID] | |||
| if !exists { | |||
| nodeInfo = &models.NodePackageCachingInfo{ | |||
| NodeID: nodeID, | |||
| FileSize: data.Object.Size, | |||
| ObjectCount: 1, | |||
| } | |||
| } else { | |||
| nodeInfo.FileSize += data.Object.Size | |||
| nodeInfo.ObjectCount++ | |||
| } | |||
| nodeInfoMap[nodeID] = nodeInfo | |||
| } | |||
| } | |||
| } else if redunancyType == models.RedundancyEC { | |||
| } else if pkg.Redundancy.IsECInfo() { | |||
| // 备份方式为ec | |||
| objectECDatas, err := svc.db.ObjectBlock().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) | |||
| if err != nil { | |||
| logger.WithField("PackageID", msg.PackageID). | |||
| Warnf("get objectECDatas by packageID failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "get objectECDatas by packageID failed") | |||
| return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "get objectECDatas by packageID failed") | |||
| } | |||
| for _, ecData := range objectECDatas { | |||
| packageSize += ecData.Object.Size | |||
| for _, block := range ecData.Blocks { | |||
| for _, nodeID := range block.NodeIDs { | |||
| if !uniqueNodeIDs[nodeID] { | |||
| uniqueNodeIDs[nodeID] = true | |||
| nodeIDs = append(nodeIDs, nodeID) | |||
| nodeInfo, exists := nodeInfoMap[nodeID] | |||
| if !exists { | |||
| nodeInfo = &models.NodePackageCachingInfo{ | |||
| NodeID: nodeID, | |||
| FileSize: ecData.Object.Size, | |||
| ObjectCount: 1, | |||
| } | |||
| } else { | |||
| nodeInfo.FileSize += ecData.Object.Size | |||
| nodeInfo.ObjectCount++ | |||
| } | |||
| nodeInfoMap[nodeID] = nodeInfo | |||
| } | |||
| } | |||
| } | |||
| } else { | |||
| logger.WithField("PackageID", msg.PackageID). | |||
| Warnf("Redundancy type %s is wrong", pkg.Redundancy.Type) | |||
| return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "redundancy type is wrong") | |||
| return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "redundancy type is wrong") | |||
| } | |||
| var nodeInfos []models.NodePackageCachingInfo | |||
| for _, nodeInfo := range nodeInfoMap { | |||
| nodeInfos = append(nodeInfos, *nodeInfo) | |||
| } | |||
| return mq.ReplyOK(coormq.NewGetCacheNodesByPackageResp(nodeIDs, redunancyType)) | |||
| sort.Slice(nodeInfos, func(i, j int) bool { | |||
| return nodeInfos[i].NodeID < nodeInfos[j].NodeID | |||
| }) | |||
| return mq.ReplyOK(coormq.NewGetPackageCachedNodesResp(nodeInfos, packageSize, pkg.Redundancy.Type)) | |||
| } | |||
| func (svc *Service) GetStorageNodesByPackage(msg *coormq.GetStorageNodesByPackage) (*coormq.GetStorageNodesByPackageResp, *mq.CodeMessage) { | |||
| func (svc *Service) GetPackageLoadedNodes(msg *coormq.GetPackageLoadedNodes) (*coormq.GetPackageLoadedNodesResp, *mq.CodeMessage) { | |||
| storages, err := svc.db.StoragePackage().FindPackageStorages(svc.db.SQLCtx(), msg.PackageID) | |||
| if err != nil { | |||
| logger.WithField("PackageID", msg.PackageID). | |||
| Warnf("get storages by packageID failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormq.GetStorageNodesByPackageResp](errorcode.OperationFailed, "get storages by packageID failed") | |||
| return mq.ReplyFailed[coormq.GetPackageLoadedNodesResp](errorcode.OperationFailed, "get storages by packageID failed") | |||
| } | |||
| uniqueNodeIDs := make(map[int64]bool) | |||
| @@ -278,5 +305,5 @@ func (svc *Service) GetStorageNodesByPackage(msg *coormq.GetStorageNodesByPackag | |||
| } | |||
| } | |||
| return mq.ReplyOK(coormq.NewGetStorageNodesByPackageResp(nodeIDs)) | |||
| return mq.ReplyOK(coormq.NewGetPackageLoadedNodesResp(nodeIDs)) | |||
| } | |||