From 278b418a01fef1cc6d0d17807f6613220d13c05a Mon Sep 17 00:00:00 2001 From: songjc <969378911@qq.com> Date: Thu, 24 Aug 2023 15:40:51 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/services/package.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/internal/services/package.go b/internal/services/package.go index be6fe90..ec756af 100644 --- a/internal/services/package.go +++ b/internal/services/package.go @@ -189,19 +189,19 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack return mq.ReplyOK(coormq.NewDeletePackageResp()) } -func (svc *Service) GetCacheNodesByPackage(msg *coormq.GetCacheNodesByPackage) (*coormq.GetCacheNodesByPackageResp, *mq.CodeMessage) { +func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*coormq.GetPackageCachedNodesResp, *mq.CodeMessage) { isAva, err := svc.db.Package().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.PackageID) if err != nil { logger.WithField("UserID", msg.UserID). WithField("PackageID", msg.PackageID). Warnf("check package available failed, err: %s", err.Error()) - return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "check package available failed") + return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "check package available failed") } if !isAva { logger.WithField("UserID", msg.UserID). WithField("PackageID", msg.PackageID). Warnf("package is not available to the user") - return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "package is not available to the user") + return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "package is not available to the user") } pkg, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID) @@ -222,7 +222,7 @@ func (svc *Service) GetCacheNodesByPackage(msg *coormq.GetCacheNodesByPackage) ( if err != nil { logger.WithField("PackageID", msg.PackageID). Warnf("get objectRepDatas by packageID failed, err: %s", err.Error()) - return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "get objectRepDatas by packageID failed") + return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "get objectRepDatas by packageID failed") } for _, data := range objectRepDatas { @@ -239,7 +239,7 @@ func (svc *Service) GetCacheNodesByPackage(msg *coormq.GetCacheNodesByPackage) ( if err != nil { logger.WithField("PackageID", msg.PackageID). Warnf("get objectECDatas by packageID failed, err: %s", err.Error()) - return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "get objectECDatas by packageID failed") + return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "get objectECDatas by packageID failed") } for _, ecData := range objectECDatas { @@ -255,18 +255,18 @@ func (svc *Service) GetCacheNodesByPackage(msg *coormq.GetCacheNodesByPackage) ( } else { logger.WithField("PackageID", msg.PackageID). Warnf("Redundancy type %s is wrong", pkg.Redundancy.Type) - return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "redundancy type is wrong") + return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "redundancy type is wrong") } - return mq.ReplyOK(coormq.NewGetCacheNodesByPackageResp(nodeIDs, redunancyType)) + return mq.ReplyOK(coormq.NewGetPackageCachedNodesResp(nodeIDs, redunancyType)) } -func (svc *Service) GetStorageNodesByPackage(msg *coormq.GetStorageNodesByPackage) (*coormq.GetStorageNodesByPackageResp, *mq.CodeMessage) { +func (svc *Service) GetPackageLoadedNodes(msg *coormq.GetPackageLoadedNodes) (*coormq.GetPackageLoadedNodesResp, *mq.CodeMessage) { storages, err := svc.db.StoragePackage().FindPackageStorages(svc.db.SQLCtx(), msg.PackageID) if err != nil { logger.WithField("PackageID", msg.PackageID). Warnf("get storages by packageID failed, err: %s", err.Error()) - return mq.ReplyFailed[coormq.GetStorageNodesByPackageResp](errorcode.OperationFailed, "get storages by packageID failed") + return mq.ReplyFailed[coormq.GetPackageLoadedNodesResp](errorcode.OperationFailed, "get storages by packageID failed") } uniqueNodeIDs := make(map[int64]bool) @@ -278,5 +278,5 @@ func (svc *Service) GetStorageNodesByPackage(msg *coormq.GetStorageNodesByPackag } } - return mq.ReplyOK(coormq.NewGetStorageNodesByPackageResp(nodeIDs)) + return mq.ReplyOK(coormq.NewGetPackageLoadedNodesResp(nodeIDs)) } From 20b279e5df8a1643025f747b7d9d5c4653e88b9c Mon Sep 17 00:00:00 2001 From: songjc <969378911@qq.com> Date: Fri, 25 Aug 2023 09:30:57 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E8=BF=94=E5=9B=9E=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/services/package.go | 54 ++++++++++++++++++++++++++++-------- 1 file changed, 42 insertions(+), 12 deletions(-) diff --git a/internal/services/package.go b/internal/services/package.go index ec756af..8944e86 100644 --- a/internal/services/package.go +++ b/internal/services/package.go @@ -3,6 +3,7 @@ package services import ( "database/sql" "fmt" + "sort" "github.com/jmoiron/sqlx" "gitlink.org.cn/cloudream/common/consts/errorcode" @@ -212,11 +213,12 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c return nil, mq.Failed(errorcode.OperationFailed, "get package failed") } - redunancyType := pkg.Redundancy.Type + // uniqueNodeIDs := make(map[int64]bool) + // var nodeIDs []int64 + var packageSize int64 - uniqueNodeIDs := make(map[int64]bool) - var nodeIDs []int64 - if redunancyType == models.RedundancyRep { + nodeInfoMap := make(map[int64]*models.NodePackageCachingInfo) + if pkg.Redundancy.IsRepInfo() { // 备份方式为rep objectRepDatas, err := svc.db.ObjectRep().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) if err != nil { @@ -226,14 +228,24 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c } for _, data := range objectRepDatas { + packageSize += data.Object.Size for _, nodeID := range data.NodeIDs { - if !uniqueNodeIDs[nodeID] { - uniqueNodeIDs[nodeID] = true - nodeIDs = append(nodeIDs, nodeID) + + nodeInfo, exists := nodeInfoMap[nodeID] + if !exists { + nodeInfo = &models.NodePackageCachingInfo{ + NodeID: nodeID, + FileSize: data.Object.Size, + ObjectCount: 1, + } + } else { + nodeInfo.FileSize += data.Object.Size + nodeInfo.ObjectCount++ } + nodeInfoMap[nodeID] = nodeInfo } } - } else if redunancyType == models.RedundancyEC { + } else if pkg.Redundancy.IsECInfo() { // 备份方式为ec objectECDatas, err := svc.db.ObjectBlock().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) if err != nil { @@ -243,12 +255,22 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c } for _, ecData := range objectECDatas { + packageSize += ecData.Object.Size for _, block := range ecData.Blocks { for _, nodeID := range block.NodeIDs { - if !uniqueNodeIDs[nodeID] { - uniqueNodeIDs[nodeID] = true - nodeIDs = append(nodeIDs, nodeID) + + nodeInfo, exists := nodeInfoMap[nodeID] + if !exists { + nodeInfo = &models.NodePackageCachingInfo{ + NodeID: nodeID, + FileSize: ecData.Object.Size, + ObjectCount: 1, + } + } else { + nodeInfo.FileSize += ecData.Object.Size + nodeInfo.ObjectCount++ } + nodeInfoMap[nodeID] = nodeInfo } } } @@ -258,7 +280,15 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "redundancy type is wrong") } - return mq.ReplyOK(coormq.NewGetPackageCachedNodesResp(nodeIDs, redunancyType)) + var nodeInfos []models.NodePackageCachingInfo + for _, nodeInfo := range nodeInfoMap { + nodeInfos = append(nodeInfos, *nodeInfo) + } + + sort.Slice(nodeInfos, func(i, j int) bool { + return nodeInfos[i].NodeID < nodeInfos[j].NodeID + }) + return mq.ReplyOK(coormq.NewGetPackageCachedNodesResp(nodeInfos, packageSize, pkg.Redundancy.Type)) } func (svc *Service) GetPackageLoadedNodes(msg *coormq.GetPackageLoadedNodes) (*coormq.GetPackageLoadedNodesResp, *mq.CodeMessage) { From 5096d5db59e6f6781074a1ce02a512dd99466e64 Mon Sep 17 00:00:00 2001 From: songjc <969378911@qq.com> Date: Fri, 25 Aug 2023 09:45:44 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/services/package.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/internal/services/package.go b/internal/services/package.go index 8944e86..afe5e4b 100644 --- a/internal/services/package.go +++ b/internal/services/package.go @@ -213,10 +213,7 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c return nil, mq.Failed(errorcode.OperationFailed, "get package failed") } - // uniqueNodeIDs := make(map[int64]bool) - // var nodeIDs []int64 var packageSize int64 - nodeInfoMap := make(map[int64]*models.NodePackageCachingInfo) if pkg.Redundancy.IsRepInfo() { // 备份方式为rep