You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

task_manager.h 5.0 kB

6 years ago
6 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. /**
  2. * Copyright 2019 Huawei Technologies Co., Ltd
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_TASK_MANAGER_H_
  17. #define MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_TASK_MANAGER_H_
  18. #if !defined(_WIN32) && !defined(_WIN64)
  19. #include <semaphore.h>
  20. #include <signal.h> // for sig_atomic_t
  21. #endif
  22. #include <condition_variable>
  23. #include <functional>
  24. #include <memory>
  25. #include <string>
  26. #include <set>
  27. #include "minddata/dataset/util/allocator.h"
  28. #include "minddata/dataset/util/intrp_service.h"
  29. #include "minddata/dataset/util/lock.h"
  30. #include "minddata/dataset/util/services.h"
  31. #include "minddata/dataset/util/status.h"
  32. #include "minddata/dataset/util/task.h"
  33. namespace mindspore {
  34. namespace dataset {
  35. namespace thread {
  36. using id = std::thread::id;
  37. } // namespace thread
  38. namespace this_thread {
  39. inline thread::id get_id() { return std::this_thread::get_id(); }
  40. } // namespace this_thread
  41. class TaskManager : public Service {
  42. public:
  43. friend class Services;
  44. friend class TaskGroup;
  45. ~TaskManager() override;
  46. TaskManager(const TaskManager &) = delete;
  47. TaskManager &operator=(const TaskManager &) = delete;
  48. static TaskManager &GetInstance() noexcept { return Services::getTaskMgrInstance(); }
  49. Status DoServiceStart() override;
  50. Status DoServiceStop() override;
  51. // A public global interrupt flag for signal handlers
  52. volatile sig_atomic_t global_interrupt_;
  53. // API
  54. // This takes the same parameter as Task constructor. Take a look
  55. // of the test-thread.cc for usage.
  56. Status CreateAsyncTask(const std::string &my_name, const std::function<Status()> &f, TaskGroup *vg, Task **);
  57. // Same usage as boot thread group
  58. Status join_all();
  59. void interrupt_all() noexcept;
  60. // Locate a particular Task.
  61. static Task *FindMe();
  62. static void InterruptGroup(Task &);
  63. static Status GetMasterThreadRc();
  64. static void InterruptMaster(const Status &rc = Status::OK());
  65. static void WakeUpWatchDog() {
  66. #if !defined(_WIN32) && !defined(_WIN64)
  67. TaskManager &tm = TaskManager::GetInstance();
  68. (void)sem_post(&tm.sem_);
  69. #endif
  70. }
  71. void ReturnFreeTask(Task *p) noexcept;
  72. Status GetFreeTask(const std::string &my_name, const std::function<Status()> &f, Task **p);
  73. Status WatchDog();
  74. private:
  75. RWLock lru_lock_;
  76. SpinLock free_lock_;
  77. SpinLock tg_lock_;
  78. std::shared_ptr<Task> master_;
  79. List<Task> lru_;
  80. List<Task> free_lst_;
  81. #if !defined(_WIN32) && !defined(_WIN64)
  82. sem_t sem_;
  83. #endif
  84. TaskGroup *watchdog_grp_;
  85. std::set<TaskGroup *> grp_list_;
  86. Task *watchdog_;
  87. TaskManager();
  88. };
  89. // A group of related tasks.
  90. class TaskGroup : public Service {
  91. public:
  92. friend class Task;
  93. friend class TaskManager;
  94. Status CreateAsyncTask(const std::string &my_name, const std::function<Status()> &f, Task **pTask = nullptr);
  95. void interrupt_all() noexcept;
  96. Status join_all(Task::WaitFlag wf = Task::WaitFlag::kBlocking);
  97. int size() const noexcept { return grp_list_.count; }
  98. Status DoServiceStart() override { return Status::OK(); }
  99. Status DoServiceStop() override;
  100. TaskGroup();
  101. ~TaskGroup() override;
  102. Status GetTaskErrorIfAny();
  103. std::shared_ptr<IntrpService> GetIntrpService();
  104. private:
  105. Status rc_;
  106. // Can't use rw_lock_ as we will lead to deadlatch. Create another mutex to serialize access to rc_.
  107. std::mutex rc_mux_;
  108. RWLock rw_lock_;
  109. List<Task> grp_list_;
  110. std::shared_ptr<IntrpService> intrp_svc_;
  111. };
  112. namespace this_thread {
  113. inline bool is_interrupted() {
  114. TaskManager &tm = TaskManager::GetInstance();
  115. if (tm.global_interrupt_ == 1) {
  116. return true;
  117. }
  118. Task *my_task = TaskManager::FindMe();
  119. return my_task->Interrupted();
  120. }
  121. inline bool is_master_thread() {
  122. Task *my_task = TaskManager::FindMe();
  123. return my_task->IsMasterThread();
  124. }
  125. inline Status GetInterruptStatus() {
  126. Task *my_task = TaskManager::FindMe();
  127. return my_task->GetInterruptStatus();
  128. }
  129. } // namespace this_thread
  130. #define RETURN_IF_INTERRUPTED() \
  131. do { \
  132. if (mindspore::dataset::this_thread::is_interrupted()) { \
  133. return Task::OverrideInterruptRc(this_thread::GetInterruptStatus()); \
  134. } \
  135. } while (false)
  136. } // namespace dataset
  137. } // namespace mindspore
  138. #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_TASK_MANAGER_H_