You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

collector.go 4.6 kB


  1. package connectivity
  2. import (
  3. "math/rand"
  4. "sync"
  5. "time"
  6. "gitlink.org.cn/cloudream/common/pkgs/logger"
  7. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  8. stgglb "gitlink.org.cn/cloudream/storage/common/globals"
  9. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  10. )
  11. type Connectivity struct {
  12. ToNodeID cdssdk.NodeID
  13. Delay *time.Duration
  14. TestTime time.Time
  15. }
  16. type Collector struct {
  17. cfg *Config
  18. onCollected func(collector *Collector)
  19. collectNow chan any
  20. close chan any
  21. connectivities map[cdssdk.NodeID]Connectivity
  22. lock *sync.RWMutex
  23. }
  24. func NewCollector(cfg *Config, onCollected func(collector *Collector)) Collector {
  25. rpt := Collector{
  26. cfg: cfg,
  27. collectNow: make(chan any),
  28. close: make(chan any),
  29. connectivities: make(map[cdssdk.NodeID]Connectivity),
  30. lock: &sync.RWMutex{},
  31. onCollected: onCollected,
  32. }
  33. go rpt.serve()
  34. return rpt
  35. }
  36. func (r *Collector) Get(nodeID cdssdk.NodeID) *Connectivity {
  37. r.lock.RLock()
  38. defer r.lock.RUnlock()
  39. con, ok := r.connectivities[nodeID]
  40. if ok {
  41. return &con
  42. }
  43. return nil
  44. }
  45. func (r *Collector) GetAll() map[cdssdk.NodeID]Connectivity {
  46. r.lock.RLock()
  47. defer r.lock.RUnlock()
  48. ret := make(map[cdssdk.NodeID]Connectivity)
  49. for k, v := range r.connectivities {
  50. ret[k] = v
  51. }
  52. return ret
  53. }
  54. // 启动一次收集
  55. func (r *Collector) CollecNow() {
  56. select {
  57. case r.collectNow <- nil:
  58. default:
  59. }
  60. }
  61. // 就地进行收集,会阻塞当前线程
  62. func (r *Collector) CollectInPlace() {
  63. r.testing()
  64. }
  65. func (r *Collector) Close() {
  66. select {
  67. case r.close <- nil:
  68. default:
  69. }
  70. }
  71. func (r *Collector) serve() {
  72. log := logger.WithType[Collector]("")
  73. log.Info("start connectivity reporter")
  74. // 为了防止同时启动的节点会集中进行Ping,所以第一次上报间隔为0-TestInterval秒之间随机
  75. startup := true
  76. firstReportDelay := time.Duration(float64(r.cfg.TestInterval) * float64(time.Second) * rand.Float64())
  77. ticker := time.NewTicker(firstReportDelay)
  78. loop:
  79. for {
  80. select {
  81. case <-ticker.C:
  82. r.testing()
  83. if startup {
  84. startup = false
  85. ticker.Reset(time.Duration(r.cfg.TestInterval) * time.Second)
  86. }
  87. case <-r.collectNow:
  88. r.testing()
  89. case <-r.close:
  90. ticker.Stop()
  91. break loop
  92. }
  93. }
  94. log.Info("stop connectivity reporter")
  95. }
  96. func (r *Collector) testing() {
  97. log := logger.WithType[Collector]("")
  98. log.Debug("do testing")
  99. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  100. if err != nil {
  101. return
  102. }
  103. defer stgglb.CoordinatorMQPool.Release(coorCli)
  104. getNodeResp, err := coorCli.GetNodes(coormq.NewGetNodes(nil))
  105. if err != nil {
  106. return
  107. }
  108. wg := sync.WaitGroup{}
  109. cons := make([]Connectivity, len(getNodeResp.Nodes))
  110. for i, node := range getNodeResp.Nodes {
  111. tmpIdx := i
  112. tmpNode := node
  113. wg.Add(1)
  114. go func() {
  115. defer wg.Done()
  116. cons[tmpIdx] = r.ping(tmpNode)
  117. }()
  118. }
  119. wg.Wait()
  120. r.lock.Lock()
  121. // 删除所有node的记录,然后重建,避免node数量变化时导致残余数据
  122. r.connectivities = make(map[cdssdk.NodeID]Connectivity)
  123. for _, con := range cons {
  124. r.connectivities[con.ToNodeID] = con
  125. }
  126. r.lock.Unlock()
  127. if r.onCollected != nil {
  128. r.onCollected(r)
  129. }
  130. }
  131. func (r *Collector) ping(node cdssdk.Node) Connectivity {
  132. log := logger.WithType[Collector]("").WithField("NodeID", node.NodeID)
  133. ip := node.ExternalIP
  134. port := node.ExternalGRPCPort
  135. if node.LocationID == stgglb.Local.LocationID {
  136. ip = node.LocalIP
  137. port = node.LocalGRPCPort
  138. }
  139. agtCli, err := stgglb.AgentRPCPool.Acquire(ip, port)
  140. if err != nil {
  141. log.Warnf("new agent %v:%v rpc client: %w", ip, port, err)
  142. return Connectivity{
  143. ToNodeID: node.NodeID,
  144. Delay: nil,
  145. TestTime: time.Now(),
  146. }
  147. }
  148. defer stgglb.AgentRPCPool.Release(agtCli)
  149. // 第一次ping保证网络连接建立成功
  150. err = agtCli.Ping()
  151. if err != nil {
  152. log.Warnf("pre ping: %v", err)
  153. return Connectivity{
  154. ToNodeID: node.NodeID,
  155. Delay: nil,
  156. TestTime: time.Now(),
  157. }
  158. }
  159. // 后几次ping计算延迟
  160. var avgDelay time.Duration
  161. for i := 0; i < 3; i++ {
  162. start := time.Now()
  163. err = agtCli.Ping()
  164. if err != nil {
  165. log.Warnf("ping: %v", err)
  166. return Connectivity{
  167. ToNodeID: node.NodeID,
  168. Delay: nil,
  169. TestTime: time.Now(),
  170. }
  171. }
  172. // 此时间差为一个来回的时间,因此单程延迟需要除以2
  173. delay := time.Since(start) / 2
  174. avgDelay += delay
  175. // 每次ping之间间隔1秒
  176. <-time.After(time.Second)
  177. }
  178. delay := avgDelay / 3
  179. return Connectivity{
  180. ToNodeID: node.NodeID,
  181. Delay: &delay,
  182. TestTime: time.Now(),
  183. }
  184. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。