You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

manager.go 2.4 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package task
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. type Manager[TCtx any] struct {
  7. tasks []*Task[TCtx]
  8. lock sync.Mutex
  9. ctx TCtx
  10. }
  11. func NewManager[TCtx any](ctx TCtx) Manager[TCtx] {
  12. return Manager[TCtx]{
  13. ctx: ctx,
  14. }
  15. }
  16. // StartNew 启动一个新任务
  17. func (m *Manager[TCtx]) StartNew(body TaskBody[TCtx]) *Task[TCtx] {
  18. m.lock.Lock()
  19. defer m.lock.Unlock()
  20. task := &Task[TCtx]{
  21. body: body,
  22. }
  23. m.tasks = append(m.tasks, task)
  24. m.executeTask(task)
  25. return task
  26. }
  27. // Start 遍历正在运行的任务列表,如果存在相同的任务,则直接返回这个任务,否则创建一个新任务
  28. func (m *Manager[TCtx]) Start(body TaskBody[TCtx], cmp func(self, other TaskBody[TCtx]) bool) *Task[TCtx] {
  29. m.lock.Lock()
  30. defer m.lock.Unlock()
  31. for _, t := range m.tasks {
  32. if cmp(body, t.body) {
  33. return t
  34. }
  35. }
  36. task := &Task[TCtx]{
  37. body: body,
  38. }
  39. m.tasks = append(m.tasks, task)
  40. m.executeTask(task)
  41. return task
  42. }
  43. func (m *Manager[TCtx]) StartCmp(body ComparableTaskBody[TCtx]) *Task[TCtx] {
  44. return m.Start(body, func(self, other TaskBody[TCtx]) bool {
  45. return body.Compare(other)
  46. })
  47. }
  48. func (m *Manager[TCtx]) Find(predicate func(body TaskBody[TCtx]) bool) *Task[TCtx] {
  49. m.lock.Lock()
  50. defer m.lock.Unlock()
  51. for _, t := range m.tasks {
  52. if predicate(t.body) {
  53. return t
  54. }
  55. }
  56. return nil
  57. }
  58. func (m *Manager[TCtx]) executeTask(task *Task[TCtx]) {
  59. go func() {
  60. task.body.Execute(m.ctx, func(err error, completing func()) {
  61. // 删除任务
  62. m.lock.Lock()
  63. for i, t := range m.tasks {
  64. if t == task {
  65. m.tasks[i] = m.tasks[len(m.tasks)-1]
  66. m.tasks = m.tasks[:len(m.tasks)-1]
  67. break
  68. }
  69. }
  70. completing()
  71. m.lock.Unlock()
  72. task.waiterLock.Lock()
  73. task.isCompleted = true
  74. task.err = err
  75. task.waiterLock.Unlock()
  76. // 触发回调
  77. for _, w := range task.waiters {
  78. close(w)
  79. }
  80. for _, c := range task.onCompleted {
  81. c(task)
  82. }
  83. })
  84. // 如果Task没有调用complete函数就退出了,那么就认为是出错结束
  85. notCompletedYet := false
  86. task.waiterLock.Lock()
  87. if !task.isCompleted {
  88. task.isCompleted = true
  89. task.err = fmt.Errorf("task exit without calling complete function")
  90. notCompletedYet = true
  91. }
  92. task.waiterLock.Unlock()
  93. if notCompletedYet {
  94. // 触发回调
  95. for _, w := range task.waiters {
  96. close(w)
  97. }
  98. for _, c := range task.onCompleted {
  99. c(task)
  100. }
  101. }
  102. }()
  103. }

公共库

Contributors (1)