Browse Source

选择下载节点时考虑节点延迟

gitlink
Sydonian 1 year ago
parent
commit
3f3d1c86f7
9 changed files with 40 additions and 13 deletions
  1. +1
    -1
      agent/main.go
  2. +1
    -1
      client/main.go
  3. +2
    -1
      common/assets/confs/agent.config.json
  4. +2
    -1
      common/assets/confs/client.config.json
  5. +4
    -3
      common/consts/consts.go
  6. +1
    -2
      common/pkgs/connectivity/collector.go
  7. +2
    -0
      common/pkgs/downloader/config.go
  8. +8
    -2
      common/pkgs/downloader/downloader.go
  9. +19
    -2
      common/pkgs/downloader/iterator.go

+ 1
- 1
agent/main.go View File

@@ -94,7 +94,7 @@ func main() {

sw := ioswitch.NewSwitch()

dlder := downloader.NewDownloader(config.Cfg().Downloader)
dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol)

taskMgr := task.NewManager(distlock, &sw, &conCol, &dlder)



+ 1
- 1
client/main.go View File

@@ -85,7 +85,7 @@ func main() {

taskMgr := task.NewManager(distlockSvc, &conCol)

dlder := downloader.NewDownloader(config.Cfg().Downloader)
dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol)

svc, err := services.NewService(distlockSvc, &taskMgr, &dlder)
if err != nil {


+ 2
- 1
common/assets/confs/agent.config.json View File

@@ -37,6 +37,7 @@
"testInterval": 300
},
"downloader": {
"maxStripCacheCount": 100
"maxStripCacheCount": 100,
"highLatencyNode": 35
}
}

+ 2
- 1
common/assets/confs/client.config.json View File

@@ -30,6 +30,7 @@
"testInterval": 300
},
"downloader": {
"maxStripCacheCount": 100
"maxStripCacheCount": 100,
"highLatencyNode": 35
}
}

+ 4
- 3
common/consts/consts.go View File

@@ -11,7 +11,8 @@ const (
)

const (
NodeDistanceSameNode = 0.1
NodeDistanceSameLocation = 1
NodeDistanceOther = 5
NodeDistanceSameNode = 0.1
NodeDistanceSameLocation = 1
NodeDistanceOther = 5
NodeDistanceHighLatencyNode = 10
)

+ 1
- 2
common/pkgs/connectivity/collector.go View File

@@ -215,8 +215,7 @@ func (r *Collector) ping(node cdssdk.Node) Connectivity {
}
}

// 此时间差为一个来回的时间,因此单程延迟需要除以2
delay := time.Since(start) / 2
delay := time.Since(start)
avgDelay += delay

// 每次ping之间间隔1秒


+ 2
- 0
common/pkgs/downloader/config.go View File

@@ -3,4 +3,6 @@ package downloader
type Config struct {
// EC模式的Object的条带缓存数量
MaxStripCacheCount int `json:"maxStripCacheCount"`
// 当到下载节点的延迟高于这个值时,该节点在评估时会有更高的分数惩罚,单位:ms
HighLatencyNodeMs float64 `json:"highLatencyNodeMs"`
}

+ 8
- 2
common/pkgs/downloader/downloader.go View File

@@ -2,13 +2,15 @@ package downloader

import (
"fmt"
"io"

lru "github.com/hashicorp/golang-lru/v2"
"gitlink.org.cn/cloudream/common/pkgs/iterator"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
"io"
)

const (
@@ -36,9 +38,11 @@ type Downloading struct {

type Downloader struct {
strips *StripCache
conn *connectivity.Collector
cfg Config
}

func NewDownloader(cfg Config) Downloader {
func NewDownloader(cfg Config, conn *connectivity.Collector) Downloader {
if cfg.MaxStripCacheCount == 0 {
cfg.MaxStripCacheCount = DefaultMaxStripCacheCount
}
@@ -46,6 +50,8 @@ func NewDownloader(cfg Config) Downloader {
ch, _ := lru.New[ECStripKey, ObjectECStrip](cfg.MaxStripCacheCount)
return Downloader{
strips: ch,
conn: conn,
cfg: cfg,
}
}



+ 19
- 2
common/pkgs/downloader/iterator.go View File

@@ -5,11 +5,13 @@ import (
"io"
"math"
"reflect"
"time"

"github.com/samber/lo"

"gitlink.org.cn/cloudream/common/pkgs/bitmap"
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

"gitlink.org.cn/cloudream/common/utils/io2"
@@ -25,8 +27,6 @@ import (
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

var errNoDirectReadBlock = fmt.Errorf("no direct read block")

type DownloadNodeInfo struct {
Node cdssdk.Node
ObjectPinned bool
@@ -175,6 +175,7 @@ func (iter *DownloadObjectIterator) downloadNoneOrRepObject(obj downloadReqeust2
bsc, blocks := iter.getMinReadingBlockSolution(allNodes, 1)
osc, node := iter.getMinReadingObjectSolution(allNodes, 1)
if bsc < osc {
logger.Debugf("downloading object from node %v(%v)", blocks[0].Node.Name, blocks[0].Node.NodeID)

return NewIPFSReaderWithRange(blocks[0].Node, blocks[0].Block.FileHash, ipfs.ReadOption{
Offset: obj.Raw.Offset,
@@ -187,6 +188,7 @@ func (iter *DownloadObjectIterator) downloadNoneOrRepObject(obj downloadReqeust2
return nil, fmt.Errorf("no node has this object")
}

logger.Debugf("downloading object from node %v(%v)", node.Name, node.NodeID)
return NewIPFSReaderWithRange(*node, obj.Detail.Object.FileHash, ipfs.ReadOption{
Offset: obj.Raw.Offset,
Length: obj.Raw.Length,
@@ -203,6 +205,15 @@ func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed
osc, node := iter.getMinReadingObjectSolution(allNodes, ecRed.K)

if bsc < osc {
var logStrs []any = []any{"downloading ec object from blocks: "}
for i, b := range blocks {
if i > 0 {
logStrs = append(logStrs, ", ")
}
logStrs = append(logStrs, fmt.Sprintf("%v: %v(%v)", b.Block.Index, b.Node.Name, b.Node.NodeID))
}
logger.Debug(logStrs...)

var fileStrs []*IPFSReader
for _, b := range blocks {
str := NewIPFSReader(b.Node, b.Block.FileHash)
@@ -312,6 +323,7 @@ func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed
return nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(blocks))
}

logger.Debugf("downloading ec object from node %v(%v)", node.Name, node.NodeID)
return NewIPFSReaderWithRange(*node, req.Detail.Object.FileHash, ipfs.ReadOption{
Offset: req.Raw.Offset,
Length: req.Raw.Length,
@@ -419,5 +431,10 @@ func (iter *DownloadObjectIterator) getNodeDistance(node cdssdk.Node) float64 {
return consts.NodeDistanceSameLocation
}

c := iter.downloader.conn.Get(node.NodeID)
if c == nil || c.Delay == nil || *c.Delay > time.Duration(float64(time.Millisecond)*iter.downloader.cfg.HighLatencyNodeMs) {
return consts.NodeDistanceHighLatencyNode
}

return consts.NodeDistanceOther
}

Loading…
Cancel
Save