diff --git a/client/main.go b/client/main.go index 7020ec8..4b471ac 100644 --- a/client/main.go +++ b/client/main.go @@ -3,10 +3,12 @@ package main import ( "fmt" "os" + "time" _ "google.golang.org/grpc/balancer/grpclb" "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/client/internal/cmdline" "gitlink.org.cn/cloudream/storage/client/internal/config" "gitlink.org.cn/cloudream/storage/client/internal/services" @@ -15,6 +17,7 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) func main() { @@ -39,9 +42,39 @@ func main() { stgglb.InitIPFSPool(config.Cfg().IPFS) } - // 启动网络连通性检测,并就地检测一次 - conCol := connectivity.NewCollector(&config.Cfg().Connectivity, nil) - conCol.CollectInPlace() + var conCol connectivity.Collector + if config.Cfg().Local.NodeID != nil { + //如果client与某个node处于同一台机器,则使用这个node的连通性信息 + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + logger.Warnf("acquire coordinator mq failed, err: %s", err.Error()) + os.Exit(1) + } + getCons, err := coorCli.GetNodeConnectivities(coormq.ReqGetNodeConnectivities([]cdssdk.NodeID{*config.Cfg().Local.NodeID})) + if err != nil { + logger.Warnf("get node connectivities failed, err: %s", err.Error()) + os.Exit(1) + } + consMap := make(map[cdssdk.NodeID]connectivity.Connectivity) + for _, con := range getCons.Connectivities { + var delay *time.Duration + if con.Delay != nil { + d := time.Duration(*con.Delay * float32(time.Millisecond)) + delay = &d + } + consMap[con.FromNodeID] = connectivity.Connectivity{ + ToNodeID: con.ToNodeID, + Delay: delay, + } + } + conCol = connectivity.NewCollectorWithInitData(&config.Cfg().Connectivity, nil, consMap) + logger.Info("use local node connectivities") + + } else { + // 否则需要就地收集连通性信息 + conCol = connectivity.NewCollector(&config.Cfg().Connectivity, nil) + conCol.CollectInPlace() + } distlockSvc, err := distlock.NewService(&config.Cfg().DistLock) if err != nil { diff --git a/common/pkgs/connectivity/collector.go b/common/pkgs/connectivity/collector.go index 6234cd9..289e547 100644 --- a/common/pkgs/connectivity/collector.go +++ b/common/pkgs/connectivity/collector.go @@ -39,6 +39,19 @@ func NewCollector(cfg *Config, onCollected func(collector *Collector)) Collector return rpt } +func NewCollectorWithInitData(cfg *Config, onCollected func(collector *Collector), initData map[cdssdk.NodeID]Connectivity) Collector { + rpt := Collector{ + cfg: cfg, + collectNow: make(chan any), + close: make(chan any), + connectivities: initData, + lock: &sync.RWMutex{}, + onCollected: onCollected, + } + go rpt.serve() + return rpt +} + func (r *Collector) Get(nodeID cdssdk.NodeID) *Connectivity { r.lock.RLock() defer r.lock.RUnlock() diff --git a/common/pkgs/db/node_connectivity.go b/common/pkgs/db/node_connectivity.go index 8461563..1b57ad7 100644 --- a/common/pkgs/db/node_connectivity.go +++ b/common/pkgs/db/node_connectivity.go @@ -14,14 +14,14 @@ func (db *DB) NodeConnectivity() *NodeConnectivityDB { return &NodeConnectivityDB{DB: db} } -func (db *NodeConnectivityDB) BatchGetByFromNode(ctx SQLContext, nodeIDs []cdssdk.NodeID) ([]model.NodeConnectivity, error) { - if len(nodeIDs) == 0 { +func (db *NodeConnectivityDB) BatchGetByFromNode(ctx SQLContext, fromNodeIDs []cdssdk.NodeID) ([]model.NodeConnectivity, error) { + if len(fromNodeIDs) == 0 { return nil, nil } var ret []model.NodeConnectivity - sql, args, err := sqlx.In("select * from NodeConnectivity where NodeID in (?)", nodeIDs) + sql, args, err := sqlx.In("select * from NodeConnectivity where FromNodeID in (?)", fromNodeIDs) if err != nil { return nil, err }