diff --git a/agent/main.go b/agent/main.go index 25c8eee..a9e49a7 100644 --- a/agent/main.go +++ b/agent/main.go @@ -94,7 +94,7 @@ func main() { sw := ioswitch.NewSwitch() - dlder := downloader.NewDownloader(config.Cfg().Downloader) + dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol) taskMgr := task.NewManager(distlock, &sw, &conCol, &dlder) diff --git a/client/main.go b/client/main.go index 4b471ac..47502ac 100644 --- a/client/main.go +++ b/client/main.go @@ -85,7 +85,7 @@ func main() { taskMgr := task.NewManager(distlockSvc, &conCol) - dlder := downloader.NewDownloader(config.Cfg().Downloader) + dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol) svc, err := services.NewService(distlockSvc, &taskMgr, &dlder) if err != nil { diff --git a/common/assets/confs/agent.config.json b/common/assets/confs/agent.config.json index 61c323f..e347b41 100644 --- a/common/assets/confs/agent.config.json +++ b/common/assets/confs/agent.config.json @@ -37,6 +37,7 @@ "testInterval": 300 }, "downloader": { - "maxStripCacheCount": 100 + "maxStripCacheCount": 100, + "highLatencyNode": 35 } } \ No newline at end of file diff --git a/common/assets/confs/client.config.json b/common/assets/confs/client.config.json index df81039..a0b6a12 100644 --- a/common/assets/confs/client.config.json +++ b/common/assets/confs/client.config.json @@ -30,6 +30,7 @@ "testInterval": 300 }, "downloader": { - "maxStripCacheCount": 100 + "maxStripCacheCount": 100, + "highLatencyNode": 35 } } \ No newline at end of file diff --git a/common/consts/consts.go b/common/consts/consts.go index b7f849d..1fb4563 100644 --- a/common/consts/consts.go +++ b/common/consts/consts.go @@ -11,7 +11,8 @@ const ( ) const ( - NodeDistanceSameNode = 0.1 - NodeDistanceSameLocation = 1 - NodeDistanceOther = 5 + NodeDistanceSameNode = 0.1 + NodeDistanceSameLocation = 1 + NodeDistanceOther = 5 + NodeDistanceHighLatencyNode = 10 ) diff --git a/common/pkgs/connectivity/collector.go b/common/pkgs/connectivity/collector.go index 289e547..b54ab9e 100644 --- a/common/pkgs/connectivity/collector.go +++ b/common/pkgs/connectivity/collector.go @@ -215,8 +215,7 @@ func (r *Collector) ping(node cdssdk.Node) Connectivity { } } - // 此时间差为一个来回的时间,因此单程延迟需要除以2 - delay := time.Since(start) / 2 + delay := time.Since(start) avgDelay += delay // 每次ping之间间隔1秒 diff --git a/common/pkgs/downloader/config.go b/common/pkgs/downloader/config.go index c89ef6a..103bb9a 100644 --- a/common/pkgs/downloader/config.go +++ b/common/pkgs/downloader/config.go @@ -3,4 +3,6 @@ package downloader type Config struct { // EC模式的Object的条带缓存数量 MaxStripCacheCount int `json:"maxStripCacheCount"` + // 当到下载节点的延迟高于这个值时,该节点在评估时会有更高的分数惩罚,单位:ms + HighLatencyNodeMs float64 `json:"highLatencyNodeMs"` } diff --git a/common/pkgs/downloader/downloader.go b/common/pkgs/downloader/downloader.go index d81e8f9..fd8356f 100644 --- a/common/pkgs/downloader/downloader.go +++ b/common/pkgs/downloader/downloader.go @@ -2,13 +2,15 @@ package downloader import ( "fmt" + "io" + lru "github.com/hashicorp/golang-lru/v2" "gitlink.org.cn/cloudream/common/pkgs/iterator" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" - "io" ) const ( @@ -36,9 +38,11 @@ type Downloading struct { type Downloader struct { strips *StripCache + conn *connectivity.Collector + cfg Config } -func NewDownloader(cfg Config) Downloader { +func NewDownloader(cfg Config, conn *connectivity.Collector) Downloader { if cfg.MaxStripCacheCount == 0 { cfg.MaxStripCacheCount = DefaultMaxStripCacheCount } @@ -46,6 +50,8 @@ func NewDownloader(cfg Config) Downloader { ch, _ := lru.New[ECStripKey, ObjectECStrip](cfg.MaxStripCacheCount) return Downloader{ strips: ch, + conn: conn, + cfg: cfg, } } diff --git a/common/pkgs/downloader/iterator.go b/common/pkgs/downloader/iterator.go index 54e495b..4af2ea7 100644 --- a/common/pkgs/downloader/iterator.go +++ b/common/pkgs/downloader/iterator.go @@ -5,11 +5,13 @@ import ( "io" "math" "reflect" + "time" "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/bitmap" "gitlink.org.cn/cloudream/common/pkgs/ipfs" + "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" @@ -25,8 +27,6 @@ import ( coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) -var errNoDirectReadBlock = fmt.Errorf("no direct read block") - type DownloadNodeInfo struct { Node cdssdk.Node ObjectPinned bool @@ -175,6 +175,7 @@ func (iter *DownloadObjectIterator) downloadNoneOrRepObject(obj downloadReqeust2 bsc, blocks := iter.getMinReadingBlockSolution(allNodes, 1) osc, node := iter.getMinReadingObjectSolution(allNodes, 1) if bsc < osc { + logger.Debugf("downloading object from node %v(%v)", blocks[0].Node.Name, blocks[0].Node.NodeID) return NewIPFSReaderWithRange(blocks[0].Node, blocks[0].Block.FileHash, ipfs.ReadOption{ Offset: obj.Raw.Offset, @@ -187,6 +188,7 @@ func (iter *DownloadObjectIterator) downloadNoneOrRepObject(obj downloadReqeust2 return nil, fmt.Errorf("no node has this object") } + logger.Debugf("downloading object from node %v(%v)", node.Name, node.NodeID) return NewIPFSReaderWithRange(*node, obj.Detail.Object.FileHash, ipfs.ReadOption{ Offset: obj.Raw.Offset, Length: obj.Raw.Length, @@ -203,6 +205,15 @@ func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed osc, node := iter.getMinReadingObjectSolution(allNodes, ecRed.K) if bsc < osc { + var logStrs []any = []any{"downloading ec object from blocks: "} + for i, b := range blocks { + if i > 0 { + logStrs = append(logStrs, ", ") + } + logStrs = append(logStrs, fmt.Sprintf("%v: %v(%v)", b.Block.Index, b.Node.Name, b.Node.NodeID)) + } + logger.Debug(logStrs...) + var fileStrs []*IPFSReader for _, b := range blocks { str := NewIPFSReader(b.Node, b.Block.FileHash) @@ -312,6 +323,7 @@ func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed return nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(blocks)) } + logger.Debugf("downloading ec object from node %v(%v)", node.Name, node.NodeID) return NewIPFSReaderWithRange(*node, req.Detail.Object.FileHash, ipfs.ReadOption{ Offset: req.Raw.Offset, Length: req.Raw.Length, @@ -419,5 +431,10 @@ func (iter *DownloadObjectIterator) getNodeDistance(node cdssdk.Node) float64 { return consts.NodeDistanceSameLocation } + c := iter.downloader.conn.Get(node.NodeID) + if c == nil || c.Delay == nil || *c.Delay > time.Duration(float64(time.Millisecond)*iter.downloader.cfg.HighLatencyNodeMs) { + return consts.NodeDistanceHighLatencyNode + } + return consts.NodeDistanceOther }