From d4f8540dfaca2e5eb680ef572c984f3a7755e9a8 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 13 Mar 2024 11:02:26 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E5=92=8C=E8=B0=83=E6=95=B4?= =?UTF-8?q?=E8=BF=87=E7=A8=8B=E4=B8=AD=E5=B0=86=E6=89=80=E6=9C=89=E7=9A=84?= =?UTF-8?q?=E5=B0=8F=E6=96=87=E4=BB=B6=E4=B8=80=E8=B5=B7=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/mq/object.go | 6 +- agent/internal/task/ipfs_pin.go | 35 +- common/pkgs/cmd/create_package.go | 13 +- common/pkgs/mq/agent/object.go | 8 +- common/pkgs/mq/coordinator/package.go | 4 +- .../event/check_package_redundancy.go | 73 ++- scanner/internal/event/clean_pinned.go | 503 +++++++++++------- 7 files changed, 392 insertions(+), 250 deletions(-) diff --git a/agent/internal/mq/object.go b/agent/internal/mq/object.go index 66bcfd7..dbdaacc 100644 --- a/agent/internal/mq/object.go +++ b/agent/internal/mq/object.go @@ -9,12 +9,12 @@ import ( ) func (svc *Service) PinObject(msg *agtmq.PinObject) (*agtmq.PinObjectResp, *mq.CodeMessage) { - logger.WithField("FileHash", msg.FileHash).Debugf("pin object") + logger.WithField("FileHash", msg.FileHashes).Debugf("pin object") - tsk := svc.taskManager.StartComparable(task.NewIPFSPin(msg.FileHash)) + tsk := svc.taskManager.StartNew(task.NewIPFSPin(msg.FileHashes)) if tsk.Error() != nil { - logger.WithField("FileHash", msg.FileHash). + logger.WithField("FileHash", msg.FileHashes). Warnf("pin object failed, err: %s", tsk.Error().Error()) return nil, mq.Failed(errorcode.OperationFailed, "pin object failed") } diff --git a/agent/internal/task/ipfs_pin.go b/agent/internal/task/ipfs_pin.go index 4807c99..f58ed77 100644 --- a/agent/internal/task/ipfs_pin.go +++ b/agent/internal/task/ipfs_pin.go @@ -10,24 +10,15 @@ import ( ) type IPFSPin struct { - FileHash string + FileHashes []string } -func NewIPFSPin(fileHash string) *IPFSPin { +func NewIPFSPin(fileHashes []string) *IPFSPin { return &IPFSPin{ - FileHash: fileHash, + FileHashes: fileHashes, } } -func (t *IPFSPin) Compare(other *Task) bool { - tsk, ok := other.Body().(*IPFSPin) - if !ok { - return false - } - - return t.FileHash == tsk.FileHash -} - func (t *IPFSPin) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { log := logger.WithType[IPFSPin]("Task") log.Debugf("begin with %v", logger.FormatStruct(t)) @@ -45,15 +36,17 @@ func (t *IPFSPin) Execute(task *task.Task[TaskContext], ctx TaskContext, complet } defer ipfsCli.Close() - err = ipfsCli.Pin(t.FileHash) - if err != nil { - err := fmt.Errorf("pin file failed, err: %w", err) - log.WithField("FileHash", t.FileHash).Warn(err.Error()) - - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) - return + for _, fileHash := range t.FileHashes { + err = ipfsCli.Pin(fileHash) + if err != nil { + err := fmt.Errorf("pin file failed, err: %w", err) + log.WithField("FileHash", t.FileHashes).Warn(err.Error()) + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) + return + } } complete(nil, CompleteOption{ diff --git a/common/pkgs/cmd/create_package.go b/common/pkgs/cmd/create_package.go index 7a5dd77..f250dc5 100644 --- a/common/pkgs/cmd/create_package.go +++ b/common/pkgs/cmd/create_package.go @@ -139,6 +139,10 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + // 为所有文件选择相同的上传节点 + uploadNode := chooseUploadNode(userNodes, nodeAffinity) var uploadRets []ObjectUploadResult //上传文件夹 @@ -154,8 +158,6 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo err = func() error { defer objInfo.File.Close() - uploadNode := chooseUploadNode(userNodes, nodeAffinity) - fileHash, err := uploadFile(objInfo.File, uploadNode) if err != nil { return fmt.Errorf("uploading file: %w", err) @@ -165,9 +167,6 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo Info: objInfo, Error: err, }) - if err != nil { - return fmt.Errorf("uploading object: %w", err) - } adds = append(adds, coormq.NewAddObjectEntry(objInfo.Path, objInfo.Size, fileHash, uploadNode.Node.NodeID)) return nil @@ -177,7 +176,7 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo } } - _, err = coorCli.UpdateECPackage(coormq.NewUpdatePackage(packageID, adds, nil)) + _, err = coorCli.UpdatePackage(coormq.NewUpdatePackage(packageID, adds, nil)) if err != nil { return nil, fmt.Errorf("updating package: %w", err) } @@ -262,7 +261,7 @@ func pinIPFSFile(nodeID cdssdk.NodeID, fileHash string) error { defer stgglb.AgentMQPool.Release(agtCli) // 然后让最近节点pin本地上传的文件 - _, err = agtCli.PinObject(agtmq.ReqPinObject(fileHash, false)) + _, err = agtCli.PinObject(agtmq.ReqPinObject([]string{fileHash}, false)) if err != nil { return fmt.Errorf("start pinning object: %w", err) } diff --git a/common/pkgs/mq/agent/object.go b/common/pkgs/mq/agent/object.go index a3b56ac..fd1b952 100644 --- a/common/pkgs/mq/agent/object.go +++ b/common/pkgs/mq/agent/object.go @@ -11,16 +11,16 @@ var _ = Register(Service.PinObject) type PinObject struct { mq.MessageBodyBase - FileHash string `json:"fileHash"` - IsBackground bool `json:"isBackground"` + FileHashes []string `json:"fileHashes"` + IsBackground bool `json:"isBackground"` } type PinObjectResp struct { mq.MessageBodyBase } -func ReqPinObject(fileHash string, isBackground bool) *PinObject { +func ReqPinObject(fileHashes []string, isBackground bool) *PinObject { return &PinObject{ - FileHash: fileHash, + FileHashes: fileHashes, IsBackground: isBackground, } } diff --git a/common/pkgs/mq/coordinator/package.go b/common/pkgs/mq/coordinator/package.go index 52c51f6..e831069 100644 --- a/common/pkgs/mq/coordinator/package.go +++ b/common/pkgs/mq/coordinator/package.go @@ -79,7 +79,7 @@ func (client *Client) CreatePackage(msg *CreatePackage) (*CreatePackageResp, err return mq.Request(Service.CreatePackage, client.rabbitCli, msg) } -// 更新EC备份模式的Package +// 更新Package var _ = Register(Service.UpdatePackage) type UpdatePackage struct { @@ -116,7 +116,7 @@ func NewAddObjectEntry(path string, size int64, fileHash string, nodeIDs cdssdk. NodeID: nodeIDs, } } -func (client *Client) UpdateECPackage(msg *UpdatePackage) (*UpdatePackageResp, error) { +func (client *Client) UpdatePackage(msg *UpdatePackage) (*UpdatePackageResp, error) { return mq.Request(Service.UpdatePackage, client.rabbitCli, msg) } diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go index 357218a..339a09a 100644 --- a/scanner/internal/event/check_package_redundancy.go +++ b/scanner/internal/event/check_package_redundancy.go @@ -8,7 +8,7 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/common/utils/sort" + "gitlink.org.cn/cloudream/common/utils/sort2" stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" @@ -110,7 +110,10 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { defRep := cdssdk.DefaultRepRedundancy defEC := cdssdk.DefaultECRedundancy + // TODO 目前rep的备份数量固定为2,所以这里直接选出两个节点 + mostBlockNodeIDs := t.summaryRepObjectBlockNodes(getObjs.Objects, 2) newRepNodes := t.chooseNewNodesForRep(&defRep, allNodes) + rechoosedRepNodes := t.rechooseNodesForRep(mostBlockNodeIDs, &defRep, allNodes) newECNodes := t.chooseNewNodesForEC(&defEC, allNodes) // 加锁 @@ -149,8 +152,7 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: rep -> ec") entry, err = t.repToEC(obj, &defEC, newECNodes) } else { - uploadNodes := t.rechooseNodesForRep(obj, red, allNodes) - entry, err = t.repToRep(obj, &defRep, uploadNodes) + entry, err = t.repToRep(obj, &defRep, rechoosedRepNodes) } case *cdssdk.ECRedundancy: @@ -183,8 +185,43 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { } } +// 统计每个对象块所在的节点,选出块最多的不超过nodeCnt个节点 +func (t *CheckPackageRedundancy) summaryRepObjectBlockNodes(objs []stgmod.ObjectDetail, nodeCnt int) []cdssdk.NodeID { + type nodeBlocks struct { + NodeID cdssdk.NodeID + Count int + } + + nodeBlocksMap := make(map[cdssdk.NodeID]*nodeBlocks) + for _, obj := range objs { + shouldUseEC := obj.Object.Size > config.Cfg().ECFileSizeThreshold + if _, ok := obj.Object.Redundancy.(*cdssdk.RepRedundancy); ok && !shouldUseEC { + for _, block := range obj.Blocks { + if _, ok := nodeBlocksMap[block.NodeID]; !ok { + nodeBlocksMap[block.NodeID] = &nodeBlocks{ + NodeID: block.NodeID, + Count: 0, + } + } + nodeBlocksMap[block.NodeID].Count++ + } + } + } + + nodes := lo.Values(nodeBlocksMap) + sort2.Sort(nodes, func(left *nodeBlocks, right *nodeBlocks) int { + return right.Count - left.Count + }) + + ids := lo.Map(nodes, func(item *nodeBlocks, idx int) cdssdk.NodeID { return item.NodeID }) + if len(ids) > nodeCnt { + ids = ids[:nodeCnt] + } + return ids +} + func (t *CheckPackageRedundancy) chooseNewNodesForRep(red *cdssdk.RepRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo { - sortedNodes := sort.Sort(lo.Values(allNodes), func(left *NodeLoadInfo, right *NodeLoadInfo) int { + sortedNodes := sort2.Sort(lo.Values(allNodes), func(left *NodeLoadInfo, right *NodeLoadInfo) int { dm := right.LoadsRecentMonth - left.LoadsRecentMonth if dm != 0 { return dm @@ -197,7 +234,7 @@ func (t *CheckPackageRedundancy) chooseNewNodesForRep(red *cdssdk.RepRedundancy, } func (t *CheckPackageRedundancy) chooseNewNodesForEC(red *cdssdk.ECRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo { - sortedNodes := sort.Sort(lo.Values(allNodes), func(left *NodeLoadInfo, right *NodeLoadInfo) int { + sortedNodes := sort2.Sort(lo.Values(allNodes), func(left *NodeLoadInfo, right *NodeLoadInfo) int { dm := right.LoadsRecentMonth - left.LoadsRecentMonth if dm != 0 { return dm @@ -209,36 +246,36 @@ func (t *CheckPackageRedundancy) chooseNewNodesForEC(red *cdssdk.ECRedundancy, a return t.chooseSoManyNodes(red.N, sortedNodes) } -func (t *CheckPackageRedundancy) rechooseNodesForRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo { +func (t *CheckPackageRedundancy) rechooseNodesForRep(mostBlockNodeIDs []cdssdk.NodeID, red *cdssdk.RepRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo { type rechooseNode struct { *NodeLoadInfo - CachedBlockIndex int + HasBlock bool } var rechooseNodes []*rechooseNode for _, node := range allNodes { - cachedBlockIndex := -1 - for _, block := range obj.Blocks { - if block.NodeID == node.Node.NodeID { - cachedBlockIndex = block.Index + hasBlock := false + for _, id := range mostBlockNodeIDs { + if id == node.Node.NodeID { + hasBlock = true break } } rechooseNodes = append(rechooseNodes, &rechooseNode{ - NodeLoadInfo: node, - CachedBlockIndex: cachedBlockIndex, + NodeLoadInfo: node, + HasBlock: hasBlock, }) } - sortedNodes := sort.Sort(rechooseNodes, func(left *rechooseNode, right *rechooseNode) int { + sortedNodes := sort2.Sort(rechooseNodes, func(left *rechooseNode, right *rechooseNode) int { dm := right.LoadsRecentMonth - left.LoadsRecentMonth if dm != 0 { return dm } // 已经缓存了文件块的节点优先选择 - v := sort.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1) + v := sort2.CmpBool(right.HasBlock, left.HasBlock) if v != 0 { return v } @@ -271,14 +308,14 @@ func (t *CheckPackageRedundancy) rechooseNodesForEC(obj stgmod.ObjectDetail, red }) } - sortedNodes := sort.Sort(rechooseNodes, func(left *rechooseNode, right *rechooseNode) int { + sortedNodes := sort2.Sort(rechooseNodes, func(left *rechooseNode, right *rechooseNode) int { dm := right.LoadsRecentMonth - left.LoadsRecentMonth if dm != 0 { return dm } // 已经缓存了文件块的节点优先选择 - v := sort.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1) + v := sort2.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1) if v != 0 { return v } @@ -627,7 +664,7 @@ func (t *CheckPackageRedundancy) pinObject(nodeID cdssdk.NodeID, fileHash string } defer stgglb.AgentMQPool.Release(agtCli) - _, err = agtCli.PinObject(agtmq.ReqPinObject(fileHash, false)) + _, err = agtCli.PinObject(agtmq.ReqPinObject([]string{fileHash}, false)) if err != nil { return fmt.Errorf("start pinning object: %w", err) } diff --git a/scanner/internal/event/clean_pinned.go b/scanner/internal/event/clean_pinned.go index efa6d1b..0f7bd6e 100644 --- a/scanner/internal/event/clean_pinned.go +++ b/scanner/internal/event/clean_pinned.go @@ -4,23 +4,23 @@ import ( "fmt" "math" "math/rand" - "strconv" + "sync" "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/bitmap" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - mylo "gitlink.org.cn/cloudream/common/utils/lo" + "gitlink.org.cn/cloudream/common/utils/lo2" mymath "gitlink.org.cn/cloudream/common/utils/math" - myref "gitlink.org.cn/cloudream/common/utils/reflect" - mysort "gitlink.org.cn/cloudream/common/utils/sort" + "gitlink.org.cn/cloudream/common/utils/sort2" "gitlink.org.cn/cloudream/storage/common/consts" stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" - "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans" + agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" + "gitlink.org.cn/cloudream/storage/scanner/internal/config" ) type CleanPinned struct { @@ -67,20 +67,134 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) { } readerNodeIDs := lo.Map(getLoadLog.Logs, func(item coormq.PackageLoadLogDetail, idx int) cdssdk.NodeID { return item.Storage.NodeID }) - var changeRedEntries []coormq.ChangeObjectRedundancyEntry + var allNodeID []cdssdk.NodeID for _, obj := range getObjs.Objects { - entry, err := t.doOne(execCtx, readerNodeIDs, coorCli, obj) + for _, block := range obj.Blocks { + allNodeID = append(allNodeID, block.NodeID) + } + allNodeID = append(allNodeID, obj.PinnedAt...) + } + + getNodeResp, err := coorCli.GetNodes(coormq.NewGetNodes(lo.Union(allNodeID))) + if err != nil { + log.Warnf("getting nodes: %s", err.Error()) + return + } + + allNodeInfos := make(map[cdssdk.NodeID]*cdssdk.Node) + for _, node := range getNodeResp.Nodes { + n := node + allNodeInfos[node.NodeID] = &n + } + + // 只对ec和rep对象进行处理 + var ecObjects []stgmod.ObjectDetail + var repObjects []stgmod.ObjectDetail + for _, obj := range getObjs.Objects { + if _, ok := obj.Object.Redundancy.(*cdssdk.ECRedundancy); ok { + ecObjects = append(ecObjects, obj) + } else if _, ok := obj.Object.Redundancy.(*cdssdk.RepRedundancy); ok { + repObjects = append(repObjects, obj) + } + } + + planBld := plans.NewPlanBuilder() + pinPlans := make(map[cdssdk.NodeID]*[]string) + + // 对于rep对象,统计出所有对象块分布最多的两个节点,用这两个节点代表所有rep对象块的分布,去进行退火算法 + var repObjectsUpdateEntries []coormq.ChangeObjectRedundancyEntry + repMostNodeIDs := t.summaryRepObjectBlockNodes(repObjects) + solu := t.startAnnealing(allNodeInfos, readerNodeIDs, annealingObject{ + totalBlockCount: 1, + minBlockCnt: 1, + pinnedAt: repMostNodeIDs, + blocks: nil, + }) + for _, obj := range repObjects { + repObjectsUpdateEntries = append(repObjectsUpdateEntries, t.makePlansForRepObject(solu, obj, pinPlans)) + } + + // 对于ec对象,则每个对象单独进行退火算法 + var ecObjectsUpdateEntries []coormq.ChangeObjectRedundancyEntry + for _, obj := range ecObjects { + ecRed := obj.Object.Redundancy.(*cdssdk.ECRedundancy) + solu := t.startAnnealing(allNodeInfos, readerNodeIDs, annealingObject{ + totalBlockCount: ecRed.N, + minBlockCnt: ecRed.K, + pinnedAt: obj.PinnedAt, + blocks: obj.Blocks, + }) + ecObjectsUpdateEntries = append(ecObjectsUpdateEntries, t.makePlansForECObject(allNodeInfos, solu, obj, &planBld)) + } + + wg := sync.WaitGroup{} + + // 执行pin操作 + var anyPinErr error + for nodeID, pin := range pinPlans { + wg.Add(1) + go func(nodeID cdssdk.NodeID, pin *[]string) { + defer wg.Done() + + agtCli, err := stgglb.AgentMQPool.Acquire(nodeID) + if err != nil { + log.Warnf("new agent client: %s", err.Error()) + return + } + defer stgglb.AgentMQPool.Release(agtCli) + + _, err = agtCli.PinObject(agtmq.ReqPinObject(*pin, false)) + if err != nil { + log.Warnf("pinning object: %s", err.Error()) + anyPinErr = err + } + }(nodeID, pin) + } + + // 执行IO计划 + var ioSwRets map[string]any + var ioSwErr error + wg.Add(1) + go func() { + defer wg.Done() + plan, err := planBld.Build() if err != nil { - log.WithField("PackageID", obj).Warn(err.Error()) - continue + ioSwErr = fmt.Errorf("building io switch plan: %w", err) + return } - if entry != nil { - changeRedEntries = append(changeRedEntries, *entry) + + exec, err := plans.Execute(*plan) + if err != nil { + ioSwErr = fmt.Errorf("executing io switch plan: %w", err) + return + } + ret, err := exec.Wait() + if err != nil { + ioSwErr = fmt.Errorf("waiting io switch plan: %w", err) + return } + ioSwRets = ret.ResultValues + }() + + wg.Wait() + + if anyPinErr != nil { + log.Warn(anyPinErr.Error()) + return + } + if ioSwErr != nil { + log.Warn(ioSwErr.Error()) + return } - if len(changeRedEntries) > 0 { - _, err = coorCli.ChangeObjectRedundancy(coormq.ReqChangeObjectRedundancy(changeRedEntries)) + // 根据按照方案进行调整的结果,填充更新元数据的命令 + for i := range ecObjectsUpdateEntries { + t.populateECObjectEntry(&ecObjectsUpdateEntries[i], ecObjects[i], ioSwRets) + } + + finalEntries := append(repObjectsUpdateEntries, ecObjectsUpdateEntries...) + if len(finalEntries) > 0 { + _, err = coorCli.ChangeObjectRedundancy(coormq.ReqChangeObjectRedundancy(finalEntries)) if err != nil { log.Warnf("changing object redundancy: %s", err.Error()) return @@ -88,15 +202,52 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) { } } -type doingContext struct { - execCtx ExecuteContext - readerNodeIDs []cdssdk.NodeID // 近期可能访问此对象的节点 - nodesSortedByReader map[cdssdk.NodeID][]nodeDist // 拥有数据的节点到每个可能访问对象的节点按距离排序 - nodeInfos map[cdssdk.NodeID]*cdssdk.Node +// 统计每个对象块所在的节点,选出块最多的不超过nodeCnt个节点 +func (t *CleanPinned) summaryRepObjectBlockNodes(objs []stgmod.ObjectDetail) []cdssdk.NodeID { + type nodeBlocks struct { + NodeID cdssdk.NodeID + Count int + } + + nodeBlocksMap := make(map[cdssdk.NodeID]*nodeBlocks) + for _, obj := range objs { + shouldUseEC := obj.Object.Size > config.Cfg().ECFileSizeThreshold + if _, ok := obj.Object.Redundancy.(*cdssdk.RepRedundancy); ok && !shouldUseEC { + for _, block := range obj.Blocks { + if _, ok := nodeBlocksMap[block.NodeID]; !ok { + nodeBlocksMap[block.NodeID] = &nodeBlocks{ + NodeID: block.NodeID, + Count: 0, + } + } + nodeBlocksMap[block.NodeID].Count++ + } + } + } + + nodes := lo.Values(nodeBlocksMap) + sort2.Sort(nodes, func(left *nodeBlocks, right *nodeBlocks) int { + return right.Count - left.Count + }) + + // 只选出块数超过一半的节点,但要保证至少有两个节点 + for i := 2; i < len(nodes); i++ { + if nodes[i].Count < len(objs)/2 { + nodes = nodes[:i] + break + } + } + + return lo.Map(nodes, func(item *nodeBlocks, idx int) cdssdk.NodeID { return item.NodeID }) +} + +type annealingState struct { + allNodeInfos map[cdssdk.NodeID]*cdssdk.Node // 所有节点的信息 + readerNodeIDs []cdssdk.NodeID // 近期可能访问此对象的节点 + nodesSortedByReader map[cdssdk.NodeID][]nodeDist // 拥有数据的节点到每个可能访问对象的节点按距离排序 + object annealingObject // 进行退火的对象 blockList []objectBlock // 排序后的块分布情况 nodeBlockBitmaps map[cdssdk.NodeID]*bitmap.Bitmap64 // 用位图的形式表示每一个节点上有哪些块 - allBlockTypeCount int // object总共被分成了几块 - minBlockTypeCount int // 最少要几块才能恢复出完整的object nodeCombTree combinatorialTree // 节点组合树,用于加速计算容灾度 maxScore float64 // 搜索过程中得到过的最大分数 @@ -127,6 +278,13 @@ type combinatorialTree struct { localNodeIDToNodeID []cdssdk.NodeID } +type annealingObject struct { + totalBlockCount int + minBlockCnt int + pinnedAt []cdssdk.NodeID + blocks []stgmod.ObjectBlock +} + const ( iterActionNone = 0 iterActionSkip = 1 @@ -331,125 +489,84 @@ type combinatorialTreeNode struct { blocksBitmap bitmap.Bitmap64 // 选择了这个中心之后,所有中心一共包含多少种块 } -func (t *CleanPinned) doOne(execCtx ExecuteContext, readerNodeIDs []cdssdk.NodeID, coorCli *coormq.Client, obj stgmod.ObjectDetail) (*coormq.ChangeObjectRedundancyEntry, error) { - if len(obj.PinnedAt) == 0 && len(obj.Blocks) == 0 { - return nil, nil - } +type annealingSolution struct { + blockList []objectBlock // 所有节点的块分布情况 + rmBlocks []bool // 要删除哪些块 +} - ctx := doingContext{ - execCtx: execCtx, +func (t *CleanPinned) startAnnealing(allNodeInfos map[cdssdk.NodeID]*cdssdk.Node, readerNodeIDs []cdssdk.NodeID, object annealingObject) annealingSolution { + state := &annealingState{ + allNodeInfos: allNodeInfos, readerNodeIDs: readerNodeIDs, nodesSortedByReader: make(map[cdssdk.NodeID][]nodeDist), - nodeInfos: make(map[cdssdk.NodeID]*cdssdk.Node), + object: object, nodeBlockBitmaps: make(map[cdssdk.NodeID]*bitmap.Bitmap64), } - err := t.getNodeInfos(&ctx, coorCli, obj) - if err != nil { - return nil, err - } - - err = t.makeBlockList(&ctx, obj) - if err != nil { - return nil, err + t.initBlockList(state) + if state.blockList == nil { + return annealingSolution{} } - if ctx.blockList == nil { - return nil, nil - } - - t.makeNodeBlockBitmap(&ctx) + t.initNodeBlockBitmap(state) - t.sortNodeByReaderDistance(&ctx) + t.sortNodeByReaderDistance(state) - ctx.rmBlocks = make([]bool, len(ctx.blockList)) - ctx.inversedIndex = -1 - ctx.nodeCombTree = newCombinatorialTree(ctx.nodeBlockBitmaps) + state.rmBlocks = make([]bool, len(state.blockList)) + state.inversedIndex = -1 + state.nodeCombTree = newCombinatorialTree(state.nodeBlockBitmaps) - ctx.lastScore = t.calcScore(&ctx) - ctx.maxScore = ctx.lastScore - ctx.maxScoreRmBlocks = mylo.ArrayClone(ctx.rmBlocks) + state.lastScore = t.calcScore(state) + state.maxScore = state.lastScore + state.maxScoreRmBlocks = lo2.ArrayClone(state.rmBlocks) // 模拟退火算法的温度 - curTemp := ctx.lastScore + curTemp := state.lastScore // 结束温度 finalTemp := curTemp * 0.2 // 冷却率 coolingRate := 0.95 for curTemp > finalTemp { - ctx.inversedIndex = rand.Intn(len(ctx.rmBlocks)) - block := ctx.blockList[ctx.inversedIndex] - ctx.rmBlocks[ctx.inversedIndex] = !ctx.rmBlocks[ctx.inversedIndex] - ctx.nodeBlockBitmaps[block.NodeID].Set(block.Index, !ctx.rmBlocks[ctx.inversedIndex]) - ctx.nodeCombTree.UpdateBitmap(block.NodeID, *ctx.nodeBlockBitmaps[block.NodeID], ctx.minBlockTypeCount) + state.inversedIndex = rand.Intn(len(state.rmBlocks)) + block := state.blockList[state.inversedIndex] + state.rmBlocks[state.inversedIndex] = !state.rmBlocks[state.inversedIndex] + state.nodeBlockBitmaps[block.NodeID].Set(block.Index, !state.rmBlocks[state.inversedIndex]) + state.nodeCombTree.UpdateBitmap(block.NodeID, *state.nodeBlockBitmaps[block.NodeID], state.object.minBlockCnt) - curScore := t.calcScore(&ctx) + curScore := t.calcScore(state) - dScore := curScore - ctx.lastScore + dScore := curScore - state.lastScore // 如果新方案比旧方案得分低,且没有要求强制接受新方案,那么就将变化改回去 if curScore == 0 || (dScore < 0 && !t.alwaysAccept(curTemp, dScore, coolingRate)) { - ctx.rmBlocks[ctx.inversedIndex] = !ctx.rmBlocks[ctx.inversedIndex] - ctx.nodeBlockBitmaps[block.NodeID].Set(block.Index, !ctx.rmBlocks[ctx.inversedIndex]) - ctx.nodeCombTree.UpdateBitmap(block.NodeID, *ctx.nodeBlockBitmaps[block.NodeID], ctx.minBlockTypeCount) - fmt.Printf("\n") + state.rmBlocks[state.inversedIndex] = !state.rmBlocks[state.inversedIndex] + state.nodeBlockBitmaps[block.NodeID].Set(block.Index, !state.rmBlocks[state.inversedIndex]) + state.nodeCombTree.UpdateBitmap(block.NodeID, *state.nodeBlockBitmaps[block.NodeID], state.object.minBlockCnt) + // fmt.Printf("\n") } else { - fmt.Printf(" accept!\n") - ctx.lastScore = curScore - if ctx.maxScore < curScore { - ctx.maxScore = ctx.lastScore - ctx.maxScoreRmBlocks = mylo.ArrayClone(ctx.rmBlocks) + // fmt.Printf(" accept!\n") + state.lastScore = curScore + if state.maxScore < curScore { + state.maxScore = state.lastScore + state.maxScoreRmBlocks = lo2.ArrayClone(state.rmBlocks) } } curTemp *= coolingRate } - return t.applySolution(ctx, obj) -} - -func (t *CleanPinned) getNodeInfos(ctx *doingContext, coorCli *coormq.Client, obj stgmod.ObjectDetail) error { - var nodeIDs []cdssdk.NodeID - for _, b := range obj.Blocks { - nodeIDs = append(nodeIDs, b.NodeID) + return annealingSolution{ + blockList: state.blockList, + rmBlocks: state.maxScoreRmBlocks, } - nodeIDs = append(nodeIDs, obj.PinnedAt...) - - nodeIDs = append(nodeIDs, ctx.readerNodeIDs...) - - getNode, err := coorCli.GetNodes(coormq.NewGetNodes(lo.Uniq(nodeIDs))) - if err != nil { - return fmt.Errorf("requesting to coordinator: %w", err) - } - - for _, n := range getNode.Nodes { - ctx.nodeInfos[n.NodeID] = &n - } - - return nil } -func (t *CleanPinned) makeBlockList(ctx *doingContext, obj stgmod.ObjectDetail) error { - blockCnt := 1 - minBlockCnt := 1 - switch red := obj.Object.Redundancy.(type) { - case *cdssdk.NoneRedundancy: - return nil - case *cdssdk.RepRedundancy: - blockCnt = 1 - minBlockCnt = 1 - case *cdssdk.ECRedundancy: - blockCnt = red.N - minBlockCnt = red.K - default: - return fmt.Errorf("unknow redundancy type: %v", myref.TypeOfValue(obj.Object.Redundancy)) - } - +func (t *CleanPinned) initBlockList(ctx *annealingState) { blocksMap := make(map[cdssdk.NodeID][]objectBlock) // 先生成所有的影子块 - for _, pinned := range obj.PinnedAt { - blocks := make([]objectBlock, 0, blockCnt) - for i := 0; i < blockCnt; i++ { + for _, pinned := range ctx.object.pinnedAt { + blocks := make([]objectBlock, 0, ctx.object.totalBlockCount) + for i := 0; i < ctx.object.totalBlockCount; i++ { blocks = append(blocks, objectBlock{ Index: i, NodeID: pinned, @@ -460,7 +577,7 @@ func (t *CleanPinned) makeBlockList(ctx *doingContext, obj stgmod.ObjectDetail) } // 再填充实际块 - for _, b := range obj.Blocks { + for _, b := range ctx.object.blocks { blocks := blocksMap[b.NodeID] has := false @@ -490,7 +607,7 @@ func (t *CleanPinned) makeBlockList(ctx *doingContext, obj stgmod.ObjectDetail) for _, bs := range blocksMap { sortedBlocks = append(sortedBlocks, bs...) } - sortedBlocks = mysort.Sort(sortedBlocks, func(left objectBlock, right objectBlock) int { + sortedBlocks = sort2.Sort(sortedBlocks, func(left objectBlock, right objectBlock) int { d := left.NodeID - right.NodeID if d != 0 { return int(d) @@ -499,36 +616,33 @@ func (t *CleanPinned) makeBlockList(ctx *doingContext, obj stgmod.ObjectDetail) return left.Index - right.Index }) - ctx.allBlockTypeCount = blockCnt - ctx.minBlockTypeCount = minBlockCnt ctx.blockList = sortedBlocks - return nil } -func (t *CleanPinned) makeNodeBlockBitmap(ctx *doingContext) { - for _, b := range ctx.blockList { - mp, ok := ctx.nodeBlockBitmaps[b.NodeID] +func (t *CleanPinned) initNodeBlockBitmap(state *annealingState) { + for _, b := range state.blockList { + mp, ok := state.nodeBlockBitmaps[b.NodeID] if !ok { nb := bitmap.Bitmap64(0) mp = &nb - ctx.nodeBlockBitmaps[b.NodeID] = mp + state.nodeBlockBitmaps[b.NodeID] = mp } mp.Set(b.Index, true) } } -func (t *CleanPinned) sortNodeByReaderDistance(ctx *doingContext) { - for _, r := range ctx.readerNodeIDs { +func (t *CleanPinned) sortNodeByReaderDistance(state *annealingState) { + for _, r := range state.readerNodeIDs { var nodeDists []nodeDist - for n := range ctx.nodeBlockBitmaps { + for n := range state.nodeBlockBitmaps { if r == n { // 同节点时距离视为0.1 nodeDists = append(nodeDists, nodeDist{ NodeID: n, Distance: consts.NodeDistanceSameNode, }) - } else if ctx.nodeInfos[r].LocationID == ctx.nodeInfos[n].LocationID { + } else if state.allNodeInfos[r].LocationID == state.allNodeInfos[n].LocationID { // 同地区时距离视为1 nodeDists = append(nodeDists, nodeDist{ NodeID: n, @@ -543,14 +657,14 @@ func (t *CleanPinned) sortNodeByReaderDistance(ctx *doingContext) { } } - ctx.nodesSortedByReader[r] = mysort.Sort(nodeDists, func(left, right nodeDist) int { return mysort.Cmp(left.Distance, right.Distance) }) + state.nodesSortedByReader[r] = sort2.Sort(nodeDists, func(left, right nodeDist) int { return sort2.Cmp(left.Distance, right.Distance) }) } } -func (t *CleanPinned) calcScore(ctx *doingContext) float64 { - dt := t.calcDisasterTolerance(ctx) - ac := t.calcMinAccessCost(ctx) - sc := t.calcSpaceCost(ctx) +func (t *CleanPinned) calcScore(state *annealingState) float64 { + dt := t.calcDisasterTolerance(state) + ac := t.calcMinAccessCost(state) + sc := t.calcSpaceCost(state) dtSc := 1.0 if dt < 1 { @@ -566,42 +680,43 @@ func (t *CleanPinned) calcScore(ctx *doingContext) float64 { newSc = dtSc / (sc * ac) } - fmt.Printf("solu: %v, cur: %v, dt: %v, ac: %v, sc: %v ", ctx.rmBlocks, newSc, dt, ac, sc) + // fmt.Printf("solu: %v, cur: %v, dt: %v, ac: %v, sc: %v ", ctx.rmBlocks, newSc, dt, ac, sc) return newSc } // 计算容灾度 -func (t *CleanPinned) calcDisasterTolerance(ctx *doingContext) float64 { - if ctx.inversedIndex != -1 { - node := ctx.blockList[ctx.inversedIndex] - ctx.nodeCombTree.UpdateBitmap(node.NodeID, *ctx.nodeBlockBitmaps[node.NodeID], ctx.minBlockTypeCount) +func (t *CleanPinned) calcDisasterTolerance(state *annealingState) float64 { + if state.inversedIndex != -1 { + node := state.blockList[state.inversedIndex] + state.nodeCombTree.UpdateBitmap(node.NodeID, *state.nodeBlockBitmaps[node.NodeID], state.object.minBlockCnt) } - return float64(len(ctx.nodeBlockBitmaps) - ctx.nodeCombTree.FindKBlocksMaxDepth(ctx.minBlockTypeCount)) + return float64(len(state.nodeBlockBitmaps) - state.nodeCombTree.FindKBlocksMaxDepth(state.object.minBlockCnt)) } // 计算最小访问数据的代价 -func (t *CleanPinned) calcMinAccessCost(ctx *doingContext) float64 { +func (t *CleanPinned) calcMinAccessCost(state *annealingState) float64 { cost := math.MaxFloat64 - for _, reader := range ctx.readerNodeIDs { - tarNodes := ctx.nodesSortedByReader[reader] + for _, reader := range state.readerNodeIDs { + tarNodes := state.nodesSortedByReader[reader] gotBlocks := bitmap.Bitmap64(0) thisCost := 0.0 for _, tar := range tarNodes { - tarNodeMp := ctx.nodeBlockBitmaps[tar.NodeID] + tarNodeMp := state.nodeBlockBitmaps[tar.NodeID] // 只需要从目的节点上获得缺少的块 curWeigth := gotBlocks.Weight() // 下面的if会在拿到k个块之后跳出循环,所以or多了块也没关系 gotBlocks.Or(tarNodeMp) - willGetBlocks := mymath.Min(gotBlocks.Weight()-curWeigth, ctx.minBlockTypeCount-curWeigth) + // 但是算读取块的消耗时,不能多算,最多算读了k个块的消耗 + willGetBlocks := mymath.Min(gotBlocks.Weight()-curWeigth, state.object.minBlockCnt-curWeigth) thisCost += float64(willGetBlocks) * float64(tar.Distance) - if gotBlocks.Weight() >= ctx.minBlockTypeCount { + if gotBlocks.Weight() >= state.object.minBlockCnt { break } } - if gotBlocks.Weight() >= ctx.minBlockTypeCount { + if gotBlocks.Weight() >= state.object.minBlockCnt { cost = math.Min(cost, thisCost) } } @@ -610,7 +725,7 @@ func (t *CleanPinned) calcMinAccessCost(ctx *doingContext) float64 { } // 计算冗余度 -func (t *CleanPinned) calcSpaceCost(ctx *doingContext) float64 { +func (t *CleanPinned) calcSpaceCost(ctx *annealingState) float64 { blockCount := 0 for i, b := range ctx.blockList { if ctx.rmBlocks[i] { @@ -625,26 +740,58 @@ func (t *CleanPinned) calcSpaceCost(ctx *doingContext) float64 { } } // 所有算力中心上拥有的块的总数 / 一个对象被分成了几个块 - return float64(blockCount) / float64(ctx.minBlockTypeCount) + return float64(blockCount) / float64(ctx.object.minBlockCnt) } // 如果新方案得分比旧方案小,那么在一定概率内也接受新方案 func (t *CleanPinned) alwaysAccept(curTemp float64, dScore float64, coolingRate float64) bool { v := math.Exp(dScore / curTemp / coolingRate) - fmt.Printf(" -- chance: %v, temp: %v", v, curTemp) + // fmt.Printf(" -- chance: %v, temp: %v", v, curTemp) return v > rand.Float64() } -func (t *CleanPinned) applySolution(ctx doingContext, obj stgmod.ObjectDetail) (*coormq.ChangeObjectRedundancyEntry, error) { +func (t *CleanPinned) makePlansForRepObject(solu annealingSolution, obj stgmod.ObjectDetail, pinPlans map[cdssdk.NodeID]*[]string) coormq.ChangeObjectRedundancyEntry { + entry := coormq.ChangeObjectRedundancyEntry{ + ObjectID: obj.Object.ObjectID, + Redundancy: obj.Object.Redundancy, + } + + for i, f := range solu.rmBlocks { + hasCache := lo.ContainsBy(obj.Blocks, func(b stgmod.ObjectBlock) bool { return b.NodeID == solu.blockList[i].NodeID }) || + lo.ContainsBy(obj.PinnedAt, func(n cdssdk.NodeID) bool { return n == solu.blockList[i].NodeID }) + willRm := f + + if !willRm { + // 如果对象在退火后要保留副本的节点没有副本,则需要在这个节点创建副本 + if !hasCache { + pinPlan, ok := pinPlans[solu.blockList[i].NodeID] + if !ok { + pinPlan = &[]string{} + pinPlans[solu.blockList[i].NodeID] = pinPlan + } + *pinPlan = append(*pinPlan, obj.Object.FileHash) + } + entry.Blocks = append(entry.Blocks, stgmod.ObjectBlock{ + ObjectID: obj.Object.ObjectID, + Index: solu.blockList[i].Index, + NodeID: solu.blockList[i].NodeID, + FileHash: obj.Object.FileHash, + }) + } + } + + return entry +} + +func (t *CleanPinned) makePlansForECObject(allNodeInfos map[cdssdk.NodeID]*cdssdk.Node, solu annealingSolution, obj stgmod.ObjectDetail, planBld *plans.PlanBuilder) coormq.ChangeObjectRedundancyEntry { entry := coormq.ChangeObjectRedundancyEntry{ ObjectID: obj.Object.ObjectID, Redundancy: obj.Object.Redundancy, } - fmt.Printf("final solu: %v, score: %v\n", ctx.maxScoreRmBlocks, ctx.maxScore) reconstrct := make(map[cdssdk.NodeID]*[]int) - for i, f := range ctx.maxScoreRmBlocks { - block := ctx.blockList[i] + for i, f := range solu.rmBlocks { + block := solu.blockList[i] if !f { entry.Blocks = append(entry.Blocks, stgmod.ObjectBlock{ ObjectID: obj.Object.ObjectID, @@ -666,64 +813,30 @@ func (t *CleanPinned) applySolution(ctx doingContext, obj stgmod.ObjectDetail) ( } } - bld := reqbuilder.NewBuilder() - for id := range reconstrct { - bld.IPFS().Buzy(id) - } - - mutex, err := bld.MutexLock(ctx.execCtx.Args.DistLock) - if err != nil { - return nil, fmt.Errorf("acquiring distlock: %w", err) - } - defer mutex.Unlock() - - if ecRed, ok := obj.Object.Redundancy.(*cdssdk.ECRedundancy); ok { - for id, idxs := range reconstrct { - bld := plans.NewPlanBuilder() - agt := bld.AtAgent(*ctx.nodeInfos[id]) - - strs := agt.IPFSRead(obj.Object.FileHash).ChunkedSplit(ecRed.ChunkSize, ecRed.K, true) - ss := agt.ECReconstructAny(*ecRed, lo.Range(ecRed.K), *idxs, strs.Streams...) - for i, s := range ss.Streams { - s.IPFSWrite(fmt.Sprintf("%d", (*idxs)[i])) - } - - plan, err := bld.Build() - if err != nil { - return nil, fmt.Errorf("building io switch plan: %w", err) - } - - exec, err := plans.Execute(*plan) - if err != nil { - return nil, fmt.Errorf("executing io switch plan: %w", err) - } - ret, err := exec.Wait() - if err != nil { - return nil, fmt.Errorf("executing io switch plan: %w", err) - } + ecRed := obj.Object.Redundancy.(*cdssdk.ECRedundancy) - for k, v := range ret.ResultValues { - idx, err := strconv.ParseInt(k, 10, 32) - if err != nil { - return nil, fmt.Errorf("parsing plan result: %w", err) - } + for id, idxs := range reconstrct { + agt := planBld.AtAgent(*allNodeInfos[id]) - for i := range entry.Blocks { - if entry.Blocks[i].NodeID == id && entry.Blocks[i].Index == int(idx) { - entry.Blocks[i].FileHash = v.(string) - } - } - } - - } - } else if _, ok := obj.Object.Redundancy.(*cdssdk.RepRedundancy); ok { - // rep模式不分块,所以每一个Block的FileHash就是完整文件的FileHash - for i := range entry.Blocks { - entry.Blocks[i].FileHash = obj.Object.FileHash + strs := agt.IPFSRead(obj.Object.FileHash).ChunkedSplit(ecRed.ChunkSize, ecRed.K, true) + ss := agt.ECReconstructAny(*ecRed, lo.Range(ecRed.K), *idxs, strs.Streams...) + for i, s := range ss.Streams { + s.IPFSWrite(fmt.Sprintf("%d.%d", obj.Object.ObjectID, (*idxs)[i])) } } + return entry +} - return &entry, nil +func (t *CleanPinned) populateECObjectEntry(entry *coormq.ChangeObjectRedundancyEntry, obj stgmod.ObjectDetail, ioRets map[string]any) { + for i := range entry.Blocks { + if entry.Blocks[i].FileHash != "" { + continue + } + + key := fmt.Sprintf("%d.%d", obj.Object.ObjectID, entry.Blocks[i].Index) + // 不应该出现key不存在的情况 + entry.Blocks[i].FileHash = ioRets[key].(string) + } } func init() {