You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

executor.go 3.9 kB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package plans
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "sync"
  8. "sync/atomic"
  9. "gitlink.org.cn/cloudream/common/pkgs/future"
  10. myio "gitlink.org.cn/cloudream/common/utils/io"
  11. stgglb "gitlink.org.cn/cloudream/storage/common/globals"
  12. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
  13. agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
  14. )
  // ExecutorResult is the value delivered by Executor.Wait once a plan has
  // finished successfully.
  type ExecutorResult struct {
  	// ResultValues holds the key/value pairs merged from every agent's
  	// plan result; when two agents report the same key, one overwrites
  	// the other.
  	ResultValues map[string]any
  }
  18. type Executor struct {
  19. plan ComposedPlan
  20. callback *future.SetValueFuture[ExecutorResult]
  21. mqClis []*agtmq.Client
  22. planTaskIDs []string
  23. }
  24. func Execute(plan ComposedPlan) (*Executor, error) {
  25. executor := Executor{
  26. plan: plan,
  27. callback: future.NewSetValue[ExecutorResult](),
  28. }
  29. var err error
  30. for _, a := range plan.AgentPlans {
  31. var cli *agtmq.Client
  32. cli, err = stgglb.AgentMQPool.Acquire(a.Node.NodeID)
  33. if err != nil {
  34. executor.Close()
  35. return nil, fmt.Errorf("new mq client for %d: %w", a.Node.NodeID, err)
  36. }
  37. executor.mqClis = append(executor.mqClis, cli)
  38. }
  39. for i, a := range plan.AgentPlans {
  40. cli := executor.mqClis[i]
  41. _, err := cli.SetupIOPlan(agtmq.NewSetupIOPlan(a.Plan))
  42. if err != nil {
  43. for i -= 1; i >= 0; i-- {
  44. executor.mqClis[i].CancelIOPlan(agtmq.NewCancelIOPlan(plan.ID))
  45. }
  46. executor.Close()
  47. return nil, fmt.Errorf("setup plan at %d: %w", a.Node.NodeID, err)
  48. }
  49. }
  50. for i, a := range plan.AgentPlans {
  51. cli := executor.mqClis[i]
  52. resp, err := cli.StartIOPlan(agtmq.NewStartIOPlan(a.Plan.ID))
  53. if err != nil {
  54. executor.cancelAll()
  55. executor.Close()
  56. return nil, fmt.Errorf("setup plan at %d: %w", a.Node.NodeID, err)
  57. }
  58. executor.planTaskIDs = append(executor.planTaskIDs, resp.TaskID)
  59. }
  60. go executor.pollResult()
  61. return &executor, nil
  62. }
  63. func (e *Executor) SendStream(info *FromExecutorStream, stream io.Reader) error {
  64. // TODO 根据地域选择IP
  65. agtCli, err := stgglb.AgentRPCPool.Acquire(info.toNode.ExternalIP, info.toNode.ExternalGRPCPort)
  66. if err != nil {
  67. return fmt.Errorf("new agent rpc client: %w", err)
  68. }
  69. defer stgglb.AgentRPCPool.Release(agtCli)
  70. return agtCli.SendStream(e.plan.ID, info.info.ID, stream)
  71. }
  72. func (e *Executor) ReadStream(info *ToExecutorStream) (io.ReadCloser, error) {
  73. // TODO 根据地域选择IP
  74. agtCli, err := stgglb.AgentRPCPool.Acquire(info.fromNode.ExternalIP, info.fromNode.ExternalGRPCPort)
  75. if err != nil {
  76. return nil, fmt.Errorf("new agent rpc client: %w", err)
  77. }
  78. str, err := agtCli.FetchStream(e.plan.ID, info.info.ID)
  79. if err != nil {
  80. return nil, err
  81. }
  82. return myio.AfterReadClosed(str, func(closer io.ReadCloser) {
  83. stgglb.AgentRPCPool.Release(agtCli)
  84. }), nil
  85. }
  86. func (e *Executor) Wait() (ExecutorResult, error) {
  87. return e.callback.WaitValue(context.TODO())
  88. }
  89. func (e *Executor) cancelAll() {
  90. for _, cli := range e.mqClis {
  91. cli.CancelIOPlan(agtmq.NewCancelIOPlan(e.plan.ID))
  92. }
  93. }
  94. func (e *Executor) Close() {
  95. for _, c := range e.mqClis {
  96. stgglb.AgentMQPool.Release(c)
  97. }
  98. }
  99. func (e *Executor) pollResult() {
  100. wg := sync.WaitGroup{}
  101. var anyErr error
  102. var done atomic.Bool
  103. rets := make([]*ioswitch.PlanResult, len(e.plan.AgentPlans))
  104. for i, id := range e.planTaskIDs {
  105. idx := i
  106. taskID := id
  107. wg.Add(1)
  108. go func() {
  109. defer wg.Done()
  110. for {
  111. resp, err := e.mqClis[idx].WaitIOPlan(agtmq.NewWaitIOPlan(taskID, 5000))
  112. if err != nil {
  113. anyErr = err
  114. break
  115. }
  116. if resp.IsComplete {
  117. if resp.Error != "" {
  118. anyErr = errors.New(resp.Error)
  119. done.Store(true)
  120. } else {
  121. rets[idx] = &resp.Result
  122. }
  123. break
  124. }
  125. if done.Load() {
  126. break
  127. }
  128. }
  129. }()
  130. }
  131. wg.Wait()
  132. if anyErr != nil {
  133. e.callback.SetError(anyErr)
  134. return
  135. }
  136. reducedRet := ExecutorResult{
  137. ResultValues: make(map[string]any),
  138. }
  139. for _, ret := range rets {
  140. for k, v := range ret.Values {
  141. reducedRet.ResultValues[k] = v
  142. }
  143. }
  144. e.callback.SetValue(reducedRet)
  145. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。