You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

executor.go 3.8 kB

2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package plans
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "sync"
  7. "sync/atomic"
  8. "gitlink.org.cn/cloudream/common/pkgs/future"
  9. myio "gitlink.org.cn/cloudream/common/utils/io"
  10. stgglb "gitlink.org.cn/cloudream/storage/common/globals"
  11. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
  12. agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
  13. )
  // ExecutorResult is the value delivered through an Executor's callback
  // future once every agent plan has completed successfully.
  type ExecutorResult struct {
  	// ResultValues is the union of the Values maps of all agent plan
  	// results; pollResult copies every key/value pair into this one map.
  	ResultValues map[string]any
  }
  17. type Executor struct {
  18. plan ComposedPlan
  19. callback *future.SetValueFuture[ExecutorResult]
  20. mqClis []*agtmq.Client
  21. planTaskIDs []string
  22. }
  23. func Execute(plan ComposedPlan) (*Executor, error) {
  24. executor := Executor{
  25. plan: plan,
  26. callback: future.NewSetValue[ExecutorResult](),
  27. }
  28. var err error
  29. for _, a := range plan.AgentPlans {
  30. var cli *agtmq.Client
  31. cli, err = stgglb.AgentMQPool.Acquire(a.Node.NodeID)
  32. if err != nil {
  33. executor.Close()
  34. return nil, fmt.Errorf("new mq client for %d: %w", a.Node.NodeID, err)
  35. }
  36. executor.mqClis = append(executor.mqClis, cli)
  37. }
  38. for i, a := range plan.AgentPlans {
  39. cli := executor.mqClis[i]
  40. _, err := cli.SetupIOPlan(agtmq.NewSetupIOPlan(a.Plan))
  41. if err != nil {
  42. for i -= 1; i >= 0; i-- {
  43. executor.mqClis[i].CancelIOPlan(agtmq.NewCancelIOPlan(plan.ID))
  44. }
  45. executor.Close()
  46. return nil, fmt.Errorf("setup plan at %d: %w", a.Node.NodeID, err)
  47. }
  48. }
  49. for i, a := range plan.AgentPlans {
  50. cli := executor.mqClis[i]
  51. resp, err := cli.StartIOPlan(agtmq.NewStartIOPlan(a.Plan.ID))
  52. if err != nil {
  53. executor.cancelAll()
  54. executor.Close()
  55. return nil, fmt.Errorf("setup plan at %d: %w", a.Node.NodeID, err)
  56. }
  57. executor.planTaskIDs = append(executor.planTaskIDs, resp.TaskID)
  58. }
  59. go executor.pollResult()
  60. return &executor, nil
  61. }
  62. func (e *Executor) SendStream(info *FromExecutorStream, stream io.Reader) error {
  63. // TODO 根据地域选择IP
  64. agtCli, err := stgglb.AgentRPCPool.Acquire(info.toNode.ExternalIP, info.toNode.ExternalGRPCPort)
  65. if err != nil {
  66. return fmt.Errorf("new agent rpc client: %w", err)
  67. }
  68. defer stgglb.AgentRPCPool.Release(agtCli)
  69. return agtCli.SendStream(e.plan.ID, info.info.ID, stream)
  70. }
  71. func (e *Executor) ReadStream(info *ToExecutorStream) (io.ReadCloser, error) {
  72. // TODO 根据地域选择IP
  73. agtCli, err := stgglb.AgentRPCPool.Acquire(info.fromNode.ExternalIP, info.fromNode.ExternalGRPCPort)
  74. if err != nil {
  75. return nil, fmt.Errorf("new agent rpc client: %w", err)
  76. }
  77. str, err := agtCli.FetchStream(e.plan.ID, info.info.ID)
  78. if err != nil {
  79. return nil, err
  80. }
  81. return myio.AfterReadClosed(str, func(closer io.ReadCloser) {
  82. stgglb.AgentRPCPool.Release(agtCli)
  83. }), nil
  84. }
  85. func (e *Executor) cancelAll() {
  86. for _, cli := range e.mqClis {
  87. cli.CancelIOPlan(agtmq.NewCancelIOPlan(e.plan.ID))
  88. }
  89. }
  90. func (e *Executor) Close() {
  91. for _, c := range e.mqClis {
  92. stgglb.AgentMQPool.Release(c)
  93. }
  94. }
  95. func (e *Executor) pollResult() {
  96. wg := sync.WaitGroup{}
  97. anyErr := atomic.Value{}
  98. anyErr.Store(nil)
  99. rets := make([]*ioswitch.PlanResult, len(e.plan.AgentPlans))
  100. for i, id := range e.planTaskIDs {
  101. idx := i
  102. taskID := id
  103. wg.Add(1)
  104. go func() {
  105. defer wg.Done()
  106. for {
  107. resp, err := e.mqClis[idx].WaitIOPlan(agtmq.NewWaitIOPlan(taskID, 5000))
  108. if err != nil {
  109. anyErr.Store(err)
  110. break
  111. }
  112. if resp.IsComplete {
  113. if resp.Error != "" {
  114. anyErr.Store(errors.New(resp.Error))
  115. } else {
  116. rets[idx] = &resp.Result
  117. }
  118. break
  119. }
  120. if anyErr.Load() != nil {
  121. break
  122. }
  123. }
  124. }()
  125. }
  126. wg.Wait()
  127. err := anyErr.Load().(error)
  128. if err != nil {
  129. e.callback.SetError(err)
  130. return
  131. }
  132. reducedRet := ExecutorResult{
  133. ResultValues: make(map[string]any),
  134. }
  135. for _, ret := range rets {
  136. for k, v := range ret.Values {
  137. reducedRet.ResultValues[k] = v
  138. }
  139. }
  140. e.callback.SetValue(reducedRet)
  141. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。