diff --git a/internal/services/package.go b/internal/services/package.go index be6fe90..afe5e4b 100644 --- a/internal/services/package.go +++ b/internal/services/package.go @@ -3,6 +3,7 @@ package services import ( "database/sql" "fmt" + "sort" "github.com/jmoiron/sqlx" "gitlink.org.cn/cloudream/common/consts/errorcode" @@ -189,19 +190,19 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack return mq.ReplyOK(coormq.NewDeletePackageResp()) } -func (svc *Service) GetCacheNodesByPackage(msg *coormq.GetCacheNodesByPackage) (*coormq.GetCacheNodesByPackageResp, *mq.CodeMessage) { +func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*coormq.GetPackageCachedNodesResp, *mq.CodeMessage) { isAva, err := svc.db.Package().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.PackageID) if err != nil { logger.WithField("UserID", msg.UserID). WithField("PackageID", msg.PackageID). Warnf("check package available failed, err: %s", err.Error()) - return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "check package available failed") + return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "check package available failed") } if !isAva { logger.WithField("UserID", msg.UserID). WithField("PackageID", msg.PackageID). Warnf("package is not available to the user") - return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "package is not available to the user") + return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "package is not available to the user") } pkg, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID) @@ -212,61 +213,87 @@ func (svc *Service) GetCacheNodesByPackage(msg *coormq.GetCacheNodesByPackage) ( return nil, mq.Failed(errorcode.OperationFailed, "get package failed") } - redunancyType := pkg.Redundancy.Type - - uniqueNodeIDs := make(map[int64]bool) - var nodeIDs []int64 - if redunancyType == models.RedundancyRep { + var packageSize int64 + nodeInfoMap := make(map[int64]*models.NodePackageCachingInfo) + if pkg.Redundancy.IsRepInfo() { // 备份方式为rep objectRepDatas, err := svc.db.ObjectRep().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) if err != nil { logger.WithField("PackageID", msg.PackageID). Warnf("get objectRepDatas by packageID failed, err: %s", err.Error()) - return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "get objectRepDatas by packageID failed") + return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "get objectRepDatas by packageID failed") } for _, data := range objectRepDatas { + packageSize += data.Object.Size for _, nodeID := range data.NodeIDs { - if !uniqueNodeIDs[nodeID] { - uniqueNodeIDs[nodeID] = true - nodeIDs = append(nodeIDs, nodeID) + + nodeInfo, exists := nodeInfoMap[nodeID] + if !exists { + nodeInfo = &models.NodePackageCachingInfo{ + NodeID: nodeID, + FileSize: data.Object.Size, + ObjectCount: 1, + } + } else { + nodeInfo.FileSize += data.Object.Size + nodeInfo.ObjectCount++ } + nodeInfoMap[nodeID] = nodeInfo } } - } else if redunancyType == models.RedundancyEC { + } else if pkg.Redundancy.IsECInfo() { // 备份方式为ec objectECDatas, err := svc.db.ObjectBlock().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) if err != nil { logger.WithField("PackageID", msg.PackageID). Warnf("get objectECDatas by packageID failed, err: %s", err.Error()) - return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "get objectECDatas by packageID failed") + return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "get objectECDatas by packageID failed") } for _, ecData := range objectECDatas { + packageSize += ecData.Object.Size for _, block := range ecData.Blocks { for _, nodeID := range block.NodeIDs { - if !uniqueNodeIDs[nodeID] { - uniqueNodeIDs[nodeID] = true - nodeIDs = append(nodeIDs, nodeID) + + nodeInfo, exists := nodeInfoMap[nodeID] + if !exists { + nodeInfo = &models.NodePackageCachingInfo{ + NodeID: nodeID, + FileSize: ecData.Object.Size, + ObjectCount: 1, + } + } else { + nodeInfo.FileSize += ecData.Object.Size + nodeInfo.ObjectCount++ } + nodeInfoMap[nodeID] = nodeInfo } } } } else { logger.WithField("PackageID", msg.PackageID). Warnf("Redundancy type %s is wrong", pkg.Redundancy.Type) - return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "redundancy type is wrong") + return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "redundancy type is wrong") + } + + var nodeInfos []models.NodePackageCachingInfo + for _, nodeInfo := range nodeInfoMap { + nodeInfos = append(nodeInfos, *nodeInfo) } - return mq.ReplyOK(coormq.NewGetCacheNodesByPackageResp(nodeIDs, redunancyType)) + sort.Slice(nodeInfos, func(i, j int) bool { + return nodeInfos[i].NodeID < nodeInfos[j].NodeID + }) + return mq.ReplyOK(coormq.NewGetPackageCachedNodesResp(nodeInfos, packageSize, pkg.Redundancy.Type)) } -func (svc *Service) GetStorageNodesByPackage(msg *coormq.GetStorageNodesByPackage) (*coormq.GetStorageNodesByPackageResp, *mq.CodeMessage) { +func (svc *Service) GetPackageLoadedNodes(msg *coormq.GetPackageLoadedNodes) (*coormq.GetPackageLoadedNodesResp, *mq.CodeMessage) { storages, err := svc.db.StoragePackage().FindPackageStorages(svc.db.SQLCtx(), msg.PackageID) if err != nil { logger.WithField("PackageID", msg.PackageID). Warnf("get storages by packageID failed, err: %s", err.Error()) - return mq.ReplyFailed[coormq.GetStorageNodesByPackageResp](errorcode.OperationFailed, "get storages by packageID failed") + return mq.ReplyFailed[coormq.GetPackageLoadedNodesResp](errorcode.OperationFailed, "get storages by packageID failed") } uniqueNodeIDs := make(map[int64]bool) @@ -278,5 +305,5 @@ func (svc *Service) GetStorageNodesByPackage(msg *coormq.GetStorageNodesByPackag } } - return mq.ReplyOK(coormq.NewGetStorageNodesByPackageResp(nodeIDs)) + return mq.ReplyOK(coormq.NewGetPackageLoadedNodesResp(nodeIDs)) }