diff --git a/agent/internal/task/storage_load_package.go b/agent/internal/task/storage_load_package.go index a2654a2..2f88b0b 100644 --- a/agent/internal/task/storage_load_package.go +++ b/agent/internal/task/storage_load_package.go @@ -3,17 +3,23 @@ package task import ( "fmt" "io" + "math" "os" "path/filepath" "time" + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/bitmap" "gitlink.org.cn/cloudream/common/pkgs/ipfs" "gitlink.org.cn/cloudream/common/pkgs/task" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" myio "gitlink.org.cn/cloudream/common/utils/io" myref "gitlink.org.cn/cloudream/common/utils/reflect" + mysort "gitlink.org.cn/cloudream/common/utils/sort" + "gitlink.org.cn/cloudream/storage/common/consts" stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage/common/pkgs/ec" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" @@ -87,7 +93,7 @@ func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) e defer mutex.Unlock() for _, obj := range getObjectDetails.Objects { - err := t.downloadOne(ipfsCli, outputDirPath, obj) + err := t.downloadOne(coorCli, ipfsCli, outputDirPath, obj) if err != nil { return err } @@ -101,7 +107,7 @@ func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) e return err } -func (t *StorageLoadPackage) downloadOne(ipfsCli *ipfs.PoolClient, dir string, obj stgmod.ObjectDetail) error { +func (t *StorageLoadPackage) downloadOne(coorCli *coormq.Client, ipfsCli *ipfs.PoolClient, dir string, obj stgmod.ObjectDetail) error { var file io.ReadCloser switch red := obj.Object.Redundancy.(type) { @@ -120,7 +126,7 @@ func (t *StorageLoadPackage) downloadOne(ipfsCli *ipfs.PoolClient, dir string, o file = reader case *cdssdk.ECRedundancy: - reader, pinnedBlocks, err := t.downloadECObject(ipfsCli, obj, red) + reader, pinnedBlocks, err := t.downloadECObject(coorCli, ipfsCli, obj, red) if err != nil { return fmt.Errorf("downloading ec object: %w", err) } @@ -153,14 +159,12 @@ func (t *StorageLoadPackage) downloadOne(ipfsCli *ipfs.PoolClient, dir string, o } func (t *StorageLoadPackage) downloadNoneOrRepObject(ipfsCli *ipfs.PoolClient, obj stgmod.ObjectDetail) (io.ReadCloser, error) { - if len(obj.Blocks) == 0 { + if len(obj.Blocks) == 0 && len(obj.PinnedAt) == 0 { return nil, fmt.Errorf("no node has this object") } - // 异步pin,不管实际有没有成功 - go func() { - ipfsCli.Pin(obj.Object.FileHash) - }() + // 不管实际有没有成功 + ipfsCli.Pin(obj.Object.FileHash) file, err := ipfsCli.OpenRead(obj.Object.FileHash) if err != nil { @@ -170,62 +174,179 @@ func (t *StorageLoadPackage) downloadNoneOrRepObject(ipfsCli *ipfs.PoolClient, o return file, nil } -func (t *StorageLoadPackage) downloadECObject(ipfsCli *ipfs.PoolClient, obj stgmod.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, []stgmod.ObjectBlock, error) { - var chosenBlocks []stgmod.GrouppedObjectBlock - grpBlocks := obj.GroupBlocks() - for i := range grpBlocks { - if len(chosenBlocks) == ecRed.K { - break +func (t *StorageLoadPackage) downloadECObject(coorCli *coormq.Client, ipfsCli *ipfs.PoolClient, obj stgmod.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, []stgmod.ObjectBlock, error) { + allNodes, err := t.sortDownloadNodes(coorCli, obj) + if err != nil { + return nil, nil, err + } + bsc, blocks := t.getMinReadingBlockSolution(allNodes, ecRed.K) + osc, _ := t.getMinReadingObjectSolution(allNodes, ecRed.K) + if bsc < osc { + var fileStrs []io.ReadCloser + + rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize) + if err != nil { + return nil, nil, fmt.Errorf("new rs: %w", err) + } + + for i := range blocks { + // 不管实际有没有成功 + ipfsCli.Pin(blocks[i].Block.FileHash) + + str, err := ipfsCli.OpenRead(blocks[i].Block.FileHash) + if err != nil { + for i -= 1; i >= 0; i-- { + fileStrs[i].Close() + } + return nil, nil, fmt.Errorf("donwloading file: %w", err) + } + + fileStrs = append(fileStrs, str) + } + + fileReaders, filesCloser := myio.ToReaders(fileStrs) + + var indexes []int + var pinnedBlocks []stgmod.ObjectBlock + for _, b := range blocks { + indexes = append(indexes, b.Block.Index) + pinnedBlocks = append(pinnedBlocks, stgmod.ObjectBlock{ + ObjectID: b.Block.ObjectID, + Index: b.Block.Index, + NodeID: *stgglb.Local.NodeID, + FileHash: b.Block.FileHash, + }) } - chosenBlocks = append(chosenBlocks, grpBlocks[i]) + outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes)) + return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) { + filesCloser() + outputsCloser() + }), pinnedBlocks, nil } - if len(chosenBlocks) < ecRed.K { - return nil, nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(chosenBlocks)) + // bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件 + if osc == math.MaxFloat64 { + return nil, nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(blocks)) } - var fileStrs []io.ReadCloser + // 如果是直接读取的文件,那么就不需要Pin文件块 + str, err := ipfsCli.OpenRead(obj.Object.FileHash) + return str, nil, err +} + +type downloadNodeInfo struct { + Node model.Node + ObjectPinned bool + Blocks []stgmod.ObjectBlock + Distance float64 +} - rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize) +func (t *StorageLoadPackage) sortDownloadNodes(coorCli *coormq.Client, obj stgmod.ObjectDetail) ([]*downloadNodeInfo, error) { + var nodeIDs []cdssdk.NodeID + for _, id := range obj.PinnedAt { + if !lo.Contains(nodeIDs, id) { + nodeIDs = append(nodeIDs, id) + } + } + for _, b := range obj.Blocks { + if !lo.Contains(nodeIDs, b.NodeID) { + nodeIDs = append(nodeIDs, b.NodeID) + } + } + + getNodes, err := coorCli.GetNodes(coormq.NewGetNodes(nodeIDs)) if err != nil { - return nil, nil, fmt.Errorf("new rs: %w", err) + return nil, fmt.Errorf("getting nodes: %w", err) } - for i := range chosenBlocks { - // 异步pin,不管实际有没有成功 - go func() { - ipfsCli.Pin(chosenBlocks[i].FileHash) - }() + downloadNodeMap := make(map[cdssdk.NodeID]*downloadNodeInfo) + for _, id := range obj.PinnedAt { + node, ok := downloadNodeMap[id] + if !ok { + mod := *getNodes.GetNode(id) + node = &downloadNodeInfo{ + Node: mod, + ObjectPinned: true, + Distance: t.getNodeDistance(mod), + } + downloadNodeMap[id] = node + } - str, err := ipfsCli.OpenRead(chosenBlocks[i].FileHash) - if err != nil { - for i -= 1; i >= 0; i-- { - fileStrs[i].Close() + node.ObjectPinned = true + } + + for _, b := range obj.Blocks { + node, ok := downloadNodeMap[b.NodeID] + if !ok { + mod := *getNodes.GetNode(b.NodeID) + node = &downloadNodeInfo{ + Node: mod, + Distance: t.getNodeDistance(mod), } - return nil, nil, fmt.Errorf("donwloading file: %w", err) + downloadNodeMap[b.NodeID] = node } - fileStrs = append(fileStrs, str) + node.Blocks = append(node.Blocks, b) } - fileReaders, filesCloser := myio.ToReaders(fileStrs) + return mysort.Sort(lo.Values(downloadNodeMap), func(left, right *downloadNodeInfo) int { + return mysort.Cmp(left.Distance, right.Distance) + }), nil +} + +type downloadBlock struct { + Node model.Node + Block stgmod.ObjectBlock +} + +func (t *StorageLoadPackage) getMinReadingBlockSolution(sortedNodes []*downloadNodeInfo, k int) (float64, []downloadBlock) { + gotBlocksMap := bitmap.Bitmap64(0) + var gotBlocks []downloadBlock + dist := float64(0.0) + for _, n := range sortedNodes { + for _, b := range n.Blocks { + if !gotBlocksMap.Get(b.Index) { + gotBlocks = append(gotBlocks, downloadBlock{ + Node: n.Node, + Block: b, + }) + gotBlocksMap.Set(b.Index, true) + dist += n.Distance + } + + if len(gotBlocks) >= k { + return dist, gotBlocks + } + } + } + + return math.MaxFloat64, gotBlocks +} + +func (t *StorageLoadPackage) getMinReadingObjectSolution(sortedNodes []*downloadNodeInfo, k int) (float64, *model.Node) { + dist := math.MaxFloat64 + var downloadNode *model.Node + for _, n := range sortedNodes { + if n.ObjectPinned && float64(k)*n.Distance < dist { + dist = float64(k) * n.Distance + downloadNode = &n.Node + } + } + + return dist, downloadNode +} + +func (t *StorageLoadPackage) getNodeDistance(node model.Node) float64 { + if stgglb.Local.NodeID != nil { + if node.NodeID == *stgglb.Local.NodeID { + return consts.NodeDistanceSameNode + } + } - var indexes []int - var pinnedBlocks []stgmod.ObjectBlock - for _, b := range chosenBlocks { - indexes = append(indexes, b.Index) - pinnedBlocks = append(pinnedBlocks, stgmod.ObjectBlock{ - ObjectID: b.ObjectID, - Index: b.Index, - NodeID: *stgglb.Local.NodeID, - FileHash: b.FileHash, - }) + if node.LocationID == stgglb.Local.LocationID { + return consts.NodeDistanceSameLocation } - outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes)) - return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) { - filesCloser() - outputsCloser() - }), pinnedBlocks, nil + return consts.NodeDistanceOther } diff --git a/common/consts/consts.go b/common/consts/consts.go index 0e5c658..b7f849d 100644 --- a/common/consts/consts.go +++ b/common/consts/consts.go @@ -9,3 +9,9 @@ const ( NodeStateNormal = "Normal" NodeStateUnavailable = "Unavailable" ) + +const ( + NodeDistanceSameNode = 0.1 + NodeDistanceSameLocation = 1 + NodeDistanceOther = 5 +) diff --git a/common/pkgs/iterator/download_object_iterator.go b/common/pkgs/iterator/download_object_iterator.go index 5cb1ef4..a219f76 100644 --- a/common/pkgs/iterator/download_object_iterator.go +++ b/common/pkgs/iterator/download_object_iterator.go @@ -3,16 +3,20 @@ package iterator import ( "fmt" "io" - "math/rand" + "math" "reflect" "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/bitmap" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" myio "gitlink.org.cn/cloudream/common/utils/io" + mysort "gitlink.org.cn/cloudream/common/utils/sort" + "gitlink.org.cn/cloudream/storage/common/consts" stgglb "gitlink.org.cn/cloudream/storage/common/globals" + stgmod "gitlink.org.cn/cloudream/storage/common/models" stgmodels "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" @@ -28,8 +32,10 @@ type IterDownloadingObject struct { } type DownloadNodeInfo struct { - Node model.Node - IsSameLocation bool + Node model.Node + ObjectPinned bool + Blocks []stgmod.ObjectBlock + Distance float64 } type DownloadContext struct { @@ -114,136 +120,192 @@ func (i *DownloadObjectIterator) Close() { } } -// chooseDownloadNode 选择一个下载节点 -// 1. 从与当前客户端相同地域的节点中随机选一个 -// 2. 没有用的话从所有节点中随机选一个 -func (i *DownloadObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) DownloadNodeInfo { - sameLocationEntries := lo.Filter(entries, func(e DownloadNodeInfo, i int) bool { return e.IsSameLocation }) - if len(sameLocationEntries) > 0 { - return sameLocationEntries[rand.Intn(len(sameLocationEntries))] +func (iter *DownloadObjectIterator) downloadNoneOrRepObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail) (io.ReadCloser, error) { + allNodes, err := iter.sortDownloadNodes(coorCli, ctx, obj) + if err != nil { + return nil, err + } + bsc, blocks := iter.getMinReadingBlockSolution(allNodes, 1) + osc, node := iter.getMinReadingObjectSolution(allNodes, 1) + if bsc < osc { + return downloadFile(ctx, blocks[0].Node, blocks[0].Block.FileHash) } - return entries[rand.Intn(len(entries))] -} - -func (iter *DownloadObjectIterator) downloadNoneOrRepObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail) (io.ReadCloser, error) { - if len(obj.Blocks) == 0 { + // bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件 + if osc == math.MaxFloat64 { return nil, fmt.Errorf("no node has this object") } - //采取直接读,优先选内网节点 - var chosenNodes []DownloadNodeInfo + return downloadFile(ctx, *node, obj.Object.FileHash) +} + +func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) { + allNodes, err := iter.sortDownloadNodes(coorCli, ctx, obj) + if err != nil { + return nil, err + } + bsc, blocks := iter.getMinReadingBlockSolution(allNodes, ecRed.K) + osc, node := iter.getMinReadingObjectSolution(allNodes, ecRed.K) + if bsc < osc { + var fileStrs []io.ReadCloser - grpBlocks := obj.GroupBlocks() - for _, grp := range grpBlocks { - getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(grp.NodeIDs)) + rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize) if err != nil { - continue + return nil, fmt.Errorf("new rs: %w", err) } - downloadNodes := lo.Map(getNodesResp.Nodes, func(node model.Node, index int) DownloadNodeInfo { - return DownloadNodeInfo{ - Node: node, - IsSameLocation: node.LocationID == stgglb.Local.LocationID, + for i, b := range blocks { + str, err := downloadFile(ctx, b.Node, b.Block.FileHash) + if err != nil { + for i -= 1; i >= 0; i-- { + fileStrs[i].Close() + } + return nil, fmt.Errorf("donwloading file: %w", err) } - }) - chosenNodes = append(chosenNodes, iter.chooseDownloadNode(downloadNodes)) - } + fileStrs = append(fileStrs, str) + } - var fileStrs []io.ReadCloser + fileReaders, filesCloser := myio.ToReaders(fileStrs) - for i := range grpBlocks { - str, err := downloadFile(ctx, chosenNodes[i], grpBlocks[i].FileHash) - if err != nil { - for i -= 1; i >= 0; i-- { - fileStrs[i].Close() - } - return nil, fmt.Errorf("donwloading file: %w", err) + var indexes []int + for _, b := range blocks { + indexes = append(indexes, b.Block.Index) } - fileStrs = append(fileStrs, str) + outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes)) + return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) { + filesCloser() + outputsCloser() + }), nil } - fileReaders, filesCloser := myio.ToReaders(fileStrs) - return myio.AfterReadClosed(myio.Length(myio.Join(fileReaders), obj.Object.Size), func(c io.ReadCloser) { - filesCloser() - }), nil + // bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件 + if osc == math.MaxFloat64 { + return nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(blocks)) + } + + return downloadFile(ctx, *node, obj.Object.FileHash) } -func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) { - //采取直接读,优先选内网节点 - var chosenNodes []DownloadNodeInfo - var chosenBlocks []stgmodels.GrouppedObjectBlock - grpBlocks := obj.GroupBlocks() - for i := range grpBlocks { - if len(chosenBlocks) == ecRed.K { - break +func (iter *DownloadObjectIterator) sortDownloadNodes(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail) ([]*DownloadNodeInfo, error) { + var nodeIDs []cdssdk.NodeID + for _, id := range obj.PinnedAt { + if !lo.Contains(nodeIDs, id) { + nodeIDs = append(nodeIDs, id) } - - getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(grpBlocks[i].NodeIDs)) - if err != nil { - continue + } + for _, b := range obj.Blocks { + if !lo.Contains(nodeIDs, b.NodeID) { + nodeIDs = append(nodeIDs, b.NodeID) } + } - downloadNodes := lo.Map(getNodesResp.Nodes, func(node model.Node, index int) DownloadNodeInfo { - return DownloadNodeInfo{ - Node: node, - IsSameLocation: node.LocationID == stgglb.Local.LocationID, - } - }) + getNodes, err := coorCli.GetNodes(coormq.NewGetNodes(nodeIDs)) + if err != nil { + return nil, fmt.Errorf("getting nodes: %w", err) + } - chosenBlocks = append(chosenBlocks, grpBlocks[i]) - chosenNodes = append(chosenNodes, iter.chooseDownloadNode(downloadNodes)) + downloadNodeMap := make(map[cdssdk.NodeID]*DownloadNodeInfo) + for _, id := range obj.PinnedAt { + node, ok := downloadNodeMap[id] + if !ok { + mod := *getNodes.GetNode(id) + node = &DownloadNodeInfo{ + Node: mod, + ObjectPinned: true, + Distance: iter.getNodeDistance(mod), + } + downloadNodeMap[id] = node + } + node.ObjectPinned = true } - if len(chosenBlocks) < ecRed.K { - return nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(chosenBlocks)) + for _, b := range obj.Blocks { + node, ok := downloadNodeMap[b.NodeID] + if !ok { + mod := *getNodes.GetNode(b.NodeID) + node = &DownloadNodeInfo{ + Node: mod, + Distance: iter.getNodeDistance(mod), + } + downloadNodeMap[b.NodeID] = node + } + + node.Blocks = append(node.Blocks, b) } - var fileStrs []io.ReadCloser + return mysort.Sort(lo.Values(downloadNodeMap), func(left, right *DownloadNodeInfo) int { + return mysort.Cmp(left.Distance, right.Distance) + }), nil +} - rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize) - if err != nil { - return nil, fmt.Errorf("new rs: %w", err) - } +type downloadBlock struct { + Node model.Node + Block stgmod.ObjectBlock +} - for i := range chosenBlocks { - str, err := downloadFile(ctx, chosenNodes[i], chosenBlocks[i].FileHash) - if err != nil { - for i -= 1; i >= 0; i-- { - fileStrs[i].Close() +func (iter *DownloadObjectIterator) getMinReadingBlockSolution(sortedNodes []*DownloadNodeInfo, k int) (float64, []downloadBlock) { + gotBlocksMap := bitmap.Bitmap64(0) + var gotBlocks []downloadBlock + dist := float64(0.0) + for _, n := range sortedNodes { + for _, b := range n.Blocks { + if !gotBlocksMap.Get(b.Index) { + gotBlocks = append(gotBlocks, downloadBlock{ + Node: n.Node, + Block: b, + }) + gotBlocksMap.Set(b.Index, true) + dist += n.Distance + } + + if len(gotBlocks) >= k { + return dist, gotBlocks } - return nil, fmt.Errorf("donwloading file: %w", err) } + } + + return math.MaxFloat64, gotBlocks +} - fileStrs = append(fileStrs, str) +func (iter *DownloadObjectIterator) getMinReadingObjectSolution(sortedNodes []*DownloadNodeInfo, k int) (float64, *model.Node) { + dist := math.MaxFloat64 + var downloadNode *model.Node + for _, n := range sortedNodes { + if n.ObjectPinned && float64(k)*n.Distance < dist { + dist = float64(k) * n.Distance + downloadNode = &n.Node + } } - fileReaders, filesCloser := myio.ToReaders(fileStrs) + return dist, downloadNode +} + +func (iter *DownloadObjectIterator) getNodeDistance(node model.Node) float64 { + if stgglb.Local.NodeID != nil { + if node.NodeID == *stgglb.Local.NodeID { + return consts.NodeDistanceSameNode + } + } - var indexes []int - for _, b := range chosenBlocks { - indexes = append(indexes, b.Index) + if node.LocationID == stgglb.Local.LocationID { + return consts.NodeDistanceSameLocation } - outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes)) - return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) { - filesCloser() - outputsCloser() - }), nil + return consts.NodeDistanceOther } -func downloadFile(ctx *DownloadContext, node DownloadNodeInfo, fileHash string) (io.ReadCloser, error) { +func downloadFile(ctx *DownloadContext, node model.Node, fileHash string) (io.ReadCloser, error) { // 如果客户端与节点在同一个地域,则使用内网地址连接节点 - nodeIP := node.Node.ExternalIP - grpcPort := node.Node.ExternalGRPCPort - if node.IsSameLocation { - nodeIP = node.Node.LocalIP - grpcPort = node.Node.LocalGRPCPort + nodeIP := node.ExternalIP + grpcPort := node.ExternalGRPCPort + if node.LocationID == stgglb.Local.LocationID { + nodeIP = node.LocalIP + grpcPort = node.LocalGRPCPort - logger.Infof("client and node %d are at the same location, use local ip", node.Node.NodeID) + logger.Infof("client and node %d are at the same location, use local ip", node.NodeID) } if stgglb.IPFSPool != nil { @@ -257,7 +319,7 @@ func downloadFile(ctx *DownloadContext, node DownloadNodeInfo, fileHash string) logger.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error()) } - return downloadFromNode(ctx, node.Node.NodeID, nodeIP, grpcPort, fileHash) + return downloadFromNode(ctx, node.NodeID, nodeIP, grpcPort, fileHash) } func downloadFromNode(ctx *DownloadContext, nodeID cdssdk.NodeID, nodeIP string, grpcPort int, fileHash string) (io.ReadCloser, error) { diff --git a/common/pkgs/mq/coordinator/node.go b/common/pkgs/mq/coordinator/node.go index 7285f83..eb307a2 100644 --- a/common/pkgs/mq/coordinator/node.go +++ b/common/pkgs/mq/coordinator/node.go @@ -60,6 +60,15 @@ func NewGetNodesResp(nodes []model.Node) *GetNodesResp { Nodes: nodes, } } +func (r *GetNodesResp) GetNode(id cdssdk.NodeID) *model.Node { + for _, n := range r.Nodes { + if n.NodeID == id { + return &n + } + } + + return nil +} func (client *Client) GetNodes(msg *GetNodes) (*GetNodesResp, error) { return mq.Request(Service.GetNodes, client.rabbitCli, msg) } diff --git a/scanner/internal/event/clean_pinned.go b/scanner/internal/event/clean_pinned.go index fbe15f1..15a9ee1 100644 --- a/scanner/internal/event/clean_pinned.go +++ b/scanner/internal/event/clean_pinned.go @@ -7,12 +7,14 @@ import ( "strconv" "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/bitmap" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" mylo "gitlink.org.cn/cloudream/common/utils/lo" mymath "gitlink.org.cn/cloudream/common/utils/math" myref "gitlink.org.cn/cloudream/common/utils/reflect" mysort "gitlink.org.cn/cloudream/common/utils/sort" + "gitlink.org.cn/cloudream/storage/common/consts" stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" @@ -92,11 +94,11 @@ type doingContext struct { readerNodeIDs []cdssdk.NodeID // 近期可能访问此对象的节点 nodesSortedByReader map[cdssdk.NodeID][]nodeDist // 拥有数据的节点到每个可能访问对象的节点按距离排序 nodeInfos map[cdssdk.NodeID]*model.Node - blockList []objectBlock // 排序后的块分布情况 - nodeBlockBitmaps map[cdssdk.NodeID]*bitmap // 用位图的形式表示每一个节点上有哪些块 - allBlockTypeCount int // object总共被分成了几块 - minBlockTypeCount int // 最少要几块才能恢复出完整的object - nodeCombTree combinatorialTree // 节点组合树,用于加速计算容灾度 + blockList []objectBlock // 排序后的块分布情况 + nodeBlockBitmaps map[cdssdk.NodeID]*bitmap.Bitmap64 // 用位图的形式表示每一个节点上有哪些块 + allBlockTypeCount int // object总共被分成了几块 + minBlockTypeCount int // 最少要几块才能恢复出完整的object + nodeCombTree combinatorialTree // 节点组合树,用于加速计算容灾度 maxScore float64 // 搜索过程中得到过的最大分数 maxScoreRmBlocks []bool // 最大分数对应的删除方案 @@ -121,7 +123,7 @@ type nodeDist struct { type combinatorialTree struct { nodes []combinatorialTreeNode - blocksMaps map[int]bitmap + blocksMaps map[int]bitmap.Bitmap64 nodeIDToLocalNodeID map[cdssdk.NodeID]int localNodeIDToNodeID []cdssdk.NodeID } @@ -132,9 +134,9 @@ const ( iterActionBreak = 2 ) -func newCombinatorialTree(nodeBlocksMaps map[cdssdk.NodeID]*bitmap) combinatorialTree { +func newCombinatorialTree(nodeBlocksMaps map[cdssdk.NodeID]*bitmap.Bitmap64) combinatorialTree { tree := combinatorialTree{ - blocksMaps: make(map[int]bitmap), + blocksMaps: make(map[int]bitmap.Bitmap64), nodeIDToLocalNodeID: make(map[cdssdk.NodeID]int), } @@ -193,7 +195,7 @@ func (t *combinatorialTree) GetDepth(index int) int { // 更新某一个算力中心节点的块分布位图,同时更新它对应组合树节点的所有子节点。 // 如果更新到某个节点时,已有K个块,那么就不会再更新它的子节点 -func (t *combinatorialTree) UpdateBitmap(nodeID cdssdk.NodeID, mp bitmap, k int) { +func (t *combinatorialTree) UpdateBitmap(nodeID cdssdk.NodeID, mp bitmap.Bitmap64, k int) { t.blocksMaps[t.nodeIDToLocalNodeID[nodeID]] = mp // 首先定义两种遍历树节点时的移动方式: // 1. 竖直移动(深度增加):从一个节点移动到它最左边的子节点。每移动一步,index+1 @@ -327,31 +329,7 @@ func (t *combinatorialTree) itering(index int, parentIndex int, depth int, do fu type combinatorialTreeNode struct { localNodeID int parent *combinatorialTreeNode - blocksBitmap bitmap // 选择了这个中心之后,所有中心一共包含多少种块 -} - -type bitmap uint64 - -func (b *bitmap) Set(index int, val bool) { - if val { - *b |= 1 << index - } else { - *b &= ^(1 << index) - } -} - -func (b *bitmap) Or(other *bitmap) { - *b |= *other -} - -func (b *bitmap) Weight() int { - v := *b - cnt := 0 - for v > 0 { - cnt++ - v &= (v - 1) - } - return cnt + blocksBitmap bitmap.Bitmap64 // 选择了这个中心之后,所有中心一共包含多少种块 } func (t *CleanPinned) doOne(execCtx ExecuteContext, readerNodeIDs []cdssdk.NodeID, coorCli *coormq.Client, obj stgmod.ObjectDetail) (*coormq.ChangeObjectRedundancyEntry, error) { @@ -364,7 +342,7 @@ func (t *CleanPinned) doOne(execCtx ExecuteContext, readerNodeIDs []cdssdk.NodeI readerNodeIDs: readerNodeIDs, nodesSortedByReader: make(map[cdssdk.NodeID][]nodeDist), nodeInfos: make(map[cdssdk.NodeID]*model.Node), - nodeBlockBitmaps: make(map[cdssdk.NodeID]*bitmap), + nodeBlockBitmaps: make(map[cdssdk.NodeID]*bitmap.Bitmap64), } err := t.getNodeInfos(&ctx, coorCli, obj) @@ -532,7 +510,7 @@ func (t *CleanPinned) makeNodeBlockBitmap(ctx *doingContext) { for _, b := range ctx.blockList { mp, ok := ctx.nodeBlockBitmaps[b.NodeID] if !ok { - nb := bitmap(0) + nb := bitmap.Bitmap64(0) mp = &nb ctx.nodeBlockBitmaps[b.NodeID] = mp } @@ -549,19 +527,19 @@ func (t *CleanPinned) sortNodeByReaderDistance(ctx *doingContext) { // 同节点时距离视为0.1 nodeDists = append(nodeDists, nodeDist{ NodeID: n, - Distance: 0.1, + Distance: consts.NodeDistanceSameNode, }) } else if ctx.nodeInfos[r].LocationID == ctx.nodeInfos[n].LocationID { // 同地区时距离视为1 nodeDists = append(nodeDists, nodeDist{ NodeID: n, - Distance: 1, + Distance: consts.NodeDistanceSameLocation, }) } else { // 不同地区时距离视为5 nodeDists = append(nodeDists, nodeDist{ NodeID: n, - Distance: 5, + Distance: consts.NodeDistanceOther, }) } } @@ -607,7 +585,7 @@ func (t *CleanPinned) calcMinAccessCost(ctx *doingContext) float64 { cost := math.MaxFloat64 for _, reader := range ctx.readerNodeIDs { tarNodes := ctx.nodesSortedByReader[reader] - gotBlocks := bitmap(0) + gotBlocks := bitmap.Bitmap64(0) thisCost := 0.0 for _, tar := range tarNodes { diff --git a/scanner/internal/event/clean_pinned_test.go b/scanner/internal/event/clean_pinned_test.go index 6066c2a..0f167f8 100644 --- a/scanner/internal/event/clean_pinned_test.go +++ b/scanner/internal/event/clean_pinned_test.go @@ -4,12 +4,13 @@ import ( "testing" . "github.com/smartystreets/goconvey/convey" + "gitlink.org.cn/cloudream/common/pkgs/bitmap" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" ) -func newTreeTest(nodeBlocksMap []bitmap) combinatorialTree { +func newTreeTest(nodeBlocksMap []bitmap.Bitmap64) combinatorialTree { tree := combinatorialTree{ - blocksMaps: make(map[int]bitmap), + blocksMaps: make(map[int]bitmap.Bitmap64), nodeIDToLocalNodeID: make(map[cdssdk.NodeID]int), } @@ -79,25 +80,25 @@ func Test_iterCombBits(t *testing.T) { func Test_newCombinatorialTree(t *testing.T) { testcases := []struct { title string - nodeBlocks []bitmap + nodeBlocks []bitmap.Bitmap64 expectedTreeNodeLocalIDs []int expectedTreeNodeBitmaps []int }{ { title: "1个节点", - nodeBlocks: []bitmap{1}, + nodeBlocks: []bitmap.Bitmap64{1}, expectedTreeNodeLocalIDs: []int{-1, 0}, expectedTreeNodeBitmaps: []int{0, 1}, }, { title: "2个节点", - nodeBlocks: []bitmap{1, 0}, + nodeBlocks: []bitmap.Bitmap64{1, 0}, expectedTreeNodeLocalIDs: []int{-1, 0, 1, 1}, expectedTreeNodeBitmaps: []int{0, 1, 1, 0}, }, { title: "4个节点", - nodeBlocks: []bitmap{1, 2, 4, 8}, + nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8}, expectedTreeNodeLocalIDs: []int{-1, 0, 1, 2, 3, 3, 2, 3, 3, 1, 2, 3, 3, 2, 3, 3}, expectedTreeNodeBitmaps: []int{0, 1, 3, 7, 15, 11, 5, 13, 9, 2, 6, 14, 10, 4, 12, 8}, }, @@ -123,71 +124,71 @@ func Test_newCombinatorialTree(t *testing.T) { func Test_UpdateBitmap(t *testing.T) { testcases := []struct { title string - nodeBlocks []bitmap + nodeBlocks []bitmap.Bitmap64 updatedNodeID cdssdk.NodeID - updatedBitmap bitmap + updatedBitmap bitmap.Bitmap64 k int expectedTreeNodeBitmaps []int }{ { title: "4个节点,更新但值不变", - nodeBlocks: []bitmap{1, 2, 4, 8}, + nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8}, updatedNodeID: cdssdk.NodeID(0), - updatedBitmap: bitmap(1), + updatedBitmap: bitmap.Bitmap64(1), k: 4, expectedTreeNodeBitmaps: []int{0, 1, 3, 7, 15, 11, 5, 13, 9, 2, 6, 14, 10, 4, 12, 8}, }, { title: "4个节点,更新0", - nodeBlocks: []bitmap{1, 2, 4, 8}, + nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8}, updatedNodeID: cdssdk.NodeID(0), - updatedBitmap: bitmap(2), + updatedBitmap: bitmap.Bitmap64(2), k: 4, expectedTreeNodeBitmaps: []int{0, 2, 2, 6, 14, 10, 6, 14, 10, 2, 6, 14, 10, 4, 12, 8}, }, { title: "4个节点,更新1", - nodeBlocks: []bitmap{1, 2, 4, 8}, + nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8}, updatedNodeID: cdssdk.NodeID(1), - updatedBitmap: bitmap(1), + updatedBitmap: bitmap.Bitmap64(1), k: 4, expectedTreeNodeBitmaps: []int{0, 1, 1, 5, 13, 9, 5, 13, 9, 1, 5, 13, 9, 4, 12, 8}, }, { title: "4个节点,更新2", - nodeBlocks: []bitmap{1, 2, 4, 8}, + nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8}, updatedNodeID: cdssdk.NodeID(2), - updatedBitmap: bitmap(1), + updatedBitmap: bitmap.Bitmap64(1), k: 4, expectedTreeNodeBitmaps: []int{0, 1, 3, 3, 11, 11, 1, 9, 9, 2, 3, 11, 10, 1, 9, 8}, }, { title: "4个节点,更新3", - nodeBlocks: []bitmap{1, 2, 4, 8}, + nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8}, updatedNodeID: cdssdk.NodeID(3), - updatedBitmap: bitmap(1), + updatedBitmap: bitmap.Bitmap64(1), k: 4, expectedTreeNodeBitmaps: []int{0, 1, 3, 7, 7, 3, 5, 5, 1, 2, 6, 7, 3, 4, 5, 1}, }, { title: "4个节点,k<4,更新0,0之前没有k个块,现在拥有", - nodeBlocks: []bitmap{1, 2, 4, 8}, + nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8}, updatedNodeID: cdssdk.NodeID(0), - updatedBitmap: bitmap(3), + updatedBitmap: bitmap.Bitmap64(3), k: 2, expectedTreeNodeBitmaps: []int{0, 3, 3, 7, 15, 11, 5, 13, 9, 2, 6, 14, 10, 4, 12, 8}, }, { title: "4个节点,k<4,更新0,0之前有k个块,现在没有", - nodeBlocks: []bitmap{3, 4, 0, 0}, + nodeBlocks: []bitmap.Bitmap64{3, 4, 0, 0}, updatedNodeID: cdssdk.NodeID(0), - updatedBitmap: bitmap(0), + updatedBitmap: bitmap.Bitmap64(0), k: 2, expectedTreeNodeBitmaps: []int{0, 0, 4, 4, 4, 4, 0, 0, 0, 4, 4, 4, 4, 0, 0, 0}, }, @@ -211,43 +212,43 @@ func Test_UpdateBitmap(t *testing.T) { func Test_FindKBlocksMaxDepth(t *testing.T) { testcases := []struct { title string - nodeBlocks []bitmap + nodeBlocks []bitmap.Bitmap64 k int expected int }{ { title: "每个节点各有一个块", - nodeBlocks: []bitmap{1, 2, 4, 8}, + nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8}, k: 2, expected: 2, }, { title: "所有节点加起来块数不足", - nodeBlocks: []bitmap{1, 1, 1, 1}, + nodeBlocks: []bitmap.Bitmap64{1, 1, 1, 1}, k: 2, expected: 4, }, { title: "不同节点有相同块", - nodeBlocks: []bitmap{1, 1, 2, 4}, + nodeBlocks: []bitmap.Bitmap64{1, 1, 2, 4}, k: 2, expected: 3, }, { title: "一个节点就拥有所有块", - nodeBlocks: []bitmap{3, 6, 12, 24}, + nodeBlocks: []bitmap.Bitmap64{3, 6, 12, 24}, k: 2, expected: 1, }, { title: "只有一块,且只在某一个节点1", - nodeBlocks: []bitmap{1, 0}, + nodeBlocks: []bitmap.Bitmap64{1, 0}, k: 1, expected: 2, }, { title: "只有一块,且只在某一个节点2", - nodeBlocks: []bitmap{0, 1}, + nodeBlocks: []bitmap.Bitmap64{0, 1}, k: 1, expected: 2, },