You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

task_manager.cc 12 kB

6 years ago
6 years ago
6 years ago
6 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. /**
  2. * Copyright 2019 Huawei Technologies Co., Ltd
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  #include <algorithm>
  #include <cerrno>
  #include <functional>
  #include <set>
  #include "./securec.h"
  #include "minddata/dataset/util/task_manager.h"
  21. namespace mindspore {
  22. namespace dataset {
  23. TaskManager *TaskManager::instance_ = nullptr;
  24. std::once_flag TaskManager::init_instance_flag_;
  25. // This takes the same parameter as Task constructor.
  26. Status TaskManager::CreateAsyncTask(const std::string &my_name, const std::function<Status()> &f, TaskGroup *vg,
  27. Task **task, int32_t operator_id) {
  28. // We need to block destructor coming otherwise we will deadlock. We will grab the
  29. // stateLock in shared allowing CreateAsyncTask to run concurrently.
  30. SharedLock stateLck(&state_lock_);
  31. // Now double check the state
  32. if (ServiceState() == STATE::kStopInProg || ServiceState() == STATE::kStopped) {
  33. return Status(StatusCode::kMDInterrupted, __LINE__, __FILE__, "TaskManager is shutting down");
  34. }
  35. RETURN_IF_NOT_OK(GetFreeTask(my_name, f, task, operator_id));
  36. if (vg == nullptr) {
  37. RETURN_STATUS_UNEXPECTED("TaskGroup is null");
  38. }
  39. // Previously there is a timing hole where the thread is spawn but hit error immediately before we can set
  40. // the TaskGroup pointer. We will do the set here before we call run(). The run() will do the registration.
  41. (*task)->set_task_group(vg);
  42. // Link to the master lru list.
  43. {
  44. UniqueLock lck(&lru_lock_);
  45. lru_.Append(*task);
  46. }
  47. // Link to the group list as well before we spawn.
  48. {
  49. UniqueLock lck(&vg->rw_lock_);
  50. vg->grp_list_.Append(*task);
  51. }
  52. // Track all the TaskGroup. Used for control-c
  53. {
  54. LockGuard lck(&tg_lock_);
  55. (void)this->grp_list_.insert(vg);
  56. }
  57. RETURN_IF_NOT_OK((*task)->wp_.Register(vg));
  58. RETURN_IF_NOT_OK((*task)->Run());
  59. // Wait for the thread to initialize successfully.
  60. RETURN_IF_NOT_OK((*task)->Wait());
  61. return Status::OK();
  62. }
  63. Status TaskManager::join_all() {
  64. Status rc;
  65. Status rc2;
  66. SharedLock lck(&lru_lock_);
  67. for (Task &tk : lru_) {
  68. rc = tk.Join();
  69. if (rc.IsError()) {
  70. rc2 = rc;
  71. }
  72. }
  73. return rc2;
  74. }
  75. void TaskManager::interrupt_all() noexcept {
  76. global_interrupt_ = 1;
  77. LockGuard lck(&tg_lock_);
  78. for (TaskGroup *vg : grp_list_) {
  79. auto svc = vg->GetIntrpService();
  80. if (svc) {
  81. // Stop the interrupt service. No new request is accepted.
  82. Status rc = svc->ServiceStop();
  83. if (rc.IsError()) MS_LOG(ERROR) << "Error while stopping the service. Message: " << rc;
  84. svc->InterruptAll();
  85. }
  86. }
  87. master_->Interrupt();
  88. }
  89. Task *TaskManager::FindMe() {
  90. #if !defined(_WIN32) && !defined(_WIN64)
  91. return gMyTask;
  92. #else
  93. TaskManager &tm = TaskManager::GetInstance();
  94. SharedLock lock(&tm.lru_lock_);
  95. auto id = this_thread::get_id();
  96. auto tk = std::find_if(tm.lru_.begin(), tm.lru_.end(), [id](const Task &tk) { return tk.id_ == id; });
  97. if (tk != tm.lru_.end()) {
  98. return &(*tk);
  99. }
  100. // If we get here, either I am the watchdog or the master thread.
  101. if (tm.master_->id_ == id) {
  102. return tm.master_.get();
  103. } else if (tm.watchdog_ != nullptr && tm.watchdog_->id_ == id) {
  104. return tm.watchdog_;
  105. }
  106. MS_LOG(ERROR) << "Task not found.";
  107. return nullptr;
  108. #endif
  109. }
  110. TaskManager::TaskManager() try : global_interrupt_(0),
  111. lru_(&Task::node),
  112. free_lst_(&Task::free),
  113. watchdog_grp_(nullptr),
  114. watchdog_(nullptr) {
  115. auto alloc = Services::GetAllocator<Task>();
  116. // Create a dummy Task for the master thread (this thread)
  117. master_ = std::allocate_shared<Task>(alloc, "master", []() -> Status { return Status::OK(); });
  118. master_->id_ = this_thread::get_id();
  119. master_->running_ = true;
  120. master_->is_master_ = true;
  121. #if !defined(_WIN32) && !defined(_WIN64)
  122. gMyTask = master_.get();
  123. #if !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
  124. // Initialize the semaphore for the watchdog
  125. errno_t rc = sem_init(&sem_, 0, 0);
  126. if (rc == -1) {
  127. MS_LOG(ERROR) << "Unable to initialize a semaphore. Errno = " << rc << ".";
  128. std::terminate();
  129. }
  130. #endif
  131. #endif
  132. } catch (const std::exception &e) {
  133. MS_LOG(ERROR) << "MindData initialization failed: " << e.what() << ".";
  134. std::terminate();
  135. }
  136. TaskManager::~TaskManager() {
  137. if (watchdog_) {
  138. WakeUpWatchDog();
  139. (void)watchdog_->Join();
  140. // watchdog_grp_ and watchdog_ pointers come from Services::GetInstance().GetServiceMemPool() which we will free it
  141. // on shutdown. So no need to free these pointers one by one.
  142. watchdog_grp_ = nullptr;
  143. watchdog_ = nullptr;
  144. }
  145. #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
  146. (void)sem_destroy(&sem_);
  147. #endif
  148. }
  149. Status TaskManager::DoServiceStart() {
  150. MS_LOG(INFO) << "Starting Task Manager.";
  151. #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
  152. // Create a watchdog for control-c
  153. std::shared_ptr<MemoryPool> mp = Services::GetInstance().GetServiceMemPool();
  154. // A dummy group just for the watchdog. We aren't really using it. But most code assumes a thread must
  155. // belong to a group.
  156. auto f = std::bind(&TaskManager::WatchDog, this);
  157. Status rc;
  158. watchdog_grp_ = new (&rc, mp) TaskGroup();
  159. RETURN_IF_NOT_OK(rc);
  160. rc = watchdog_grp_->CreateAsyncTask("Watchdog", f, &watchdog_);
  161. if (rc.IsError()) {
  162. ::operator delete(watchdog_grp_, mp);
  163. watchdog_grp_ = nullptr;
  164. return rc;
  165. }
  166. (void)grp_list_.erase(watchdog_grp_);
  167. lru_.Remove(watchdog_);
  168. #endif
  169. return Status::OK();
  170. }
  171. Status TaskManager::DoServiceStop() {
  172. WakeUpWatchDog();
  173. interrupt_all();
  174. return Status::OK();
  175. }
  176. Status TaskManager::WatchDog() {
  177. TaskManager::FindMe()->Post();
  178. #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
  179. errno_t err = sem_wait(&sem_);
  180. if (err == -1) {
  181. RETURN_STATUS_UNEXPECTED("Errno = " + std::to_string(errno));
  182. }
  183. // We are woken up by control-c and we are going to stop all threads that are running.
  184. // In addition, we also want to prevent new thread from creating. This can be done
  185. // easily by calling the parent function.
  186. RETURN_IF_NOT_OK(ServiceStop());
  187. #endif
  188. return Status::OK();
  189. }
  190. // Follow the group link and interrupt other
  191. // Task in the same group. It is used by
  192. // Watchdog only.
  193. void TaskManager::InterruptGroup(Task &curTk) {
  194. TaskGroup *vg = curTk.MyTaskGroup();
  195. vg->interrupt_all();
  196. }
  197. void TaskManager::InterruptMaster(const Status &rc) {
  198. TaskManager &tm = TaskManager::GetInstance();
  199. std::shared_ptr<Task> master = tm.master_;
  200. std::lock_guard<std::mutex> lck(master->mux_);
  201. master->Interrupt();
  202. if (rc.IsError() && master->rc_.IsOk()) {
  203. master->rc_ = rc;
  204. master->caught_severe_exception_ = true;
  205. // Move log error here for some scenarios didn't call GetMasterThreadRc
  206. MS_LOG(ERROR) << "Task is terminated with err msg(more detail in info level log):" << master->rc_;
  207. }
  208. }
  209. Status TaskManager::GetMasterThreadRc() {
  210. TaskManager &tm = TaskManager::GetInstance();
  211. std::shared_ptr<Task> master = tm.master_;
  212. Status rc = tm.master_->GetTaskErrorIfAny();
  213. if (rc.IsError()) {
  214. // Reset the state once we retrieve the value.
  215. std::lock_guard<std::mutex> lck(master->mux_);
  216. master->rc_ = Status::OK();
  217. master->caught_severe_exception_ = false;
  218. master->ResetIntrpState();
  219. }
  220. return rc;
  221. }
  222. void TaskManager::ReturnFreeTask(Task *p) noexcept {
  223. // Take it out from lru_ if any
  224. {
  225. UniqueLock lck(&lru_lock_);
  226. auto it = std::find(lru_.begin(), lru_.end(), *p);
  227. if (it != lru_.end()) {
  228. lru_.Remove(p);
  229. }
  230. }
  231. // We need to deallocate the string resources associated with the Task class
  232. // before we cache its memory for future use.
  233. p->~Task();
  234. // Put it back into free list
  235. {
  236. LockGuard lck(&free_lock_);
  237. free_lst_.Append(p);
  238. }
  239. }
  240. Status TaskManager::GetFreeTask(const std::string &my_name, const std::function<Status()> &f, Task **p,
  241. int32_t operator_id) {
  242. if (p == nullptr) {
  243. RETURN_STATUS_UNEXPECTED("p is null");
  244. }
  245. Task *q = nullptr;
  246. // First try the free list
  247. {
  248. LockGuard lck(&free_lock_);
  249. if (free_lst_.count > 0) {
  250. q = free_lst_.head;
  251. free_lst_.Remove(q);
  252. }
  253. }
  254. if (q) {
  255. new (q) Task(my_name, f, operator_id);
  256. } else {
  257. std::shared_ptr<MemoryPool> mp = Services::GetInstance().GetServiceMemPool();
  258. Status rc;
  259. q = new (&rc, mp) Task(my_name, f, operator_id);
  260. RETURN_IF_NOT_OK(rc);
  261. }
  262. *p = q;
  263. return Status::OK();
  264. }
  265. Status TaskGroup::CreateAsyncTask(const std::string &my_name, const std::function<Status()> &f, Task **ppTask,
  266. int32_t operator_id) {
  267. auto pMytask = TaskManager::FindMe();
  268. // We need to block ~TaskGroup coming otherwise we will deadlock. We will grab the
  269. // stateLock in shared allowing CreateAsyncTask to run concurrently.
  270. SharedLock state_lck(&state_lock_);
  271. // Now double check the state
  272. if (ServiceState() != STATE::kRunning) {
  273. return Status(StatusCode::kMDInterrupted, __LINE__, __FILE__, "Taskgroup is shutting down");
  274. }
  275. TaskManager &dm = TaskManager::GetInstance();
  276. Task *pTask = nullptr;
  277. // If the group is already in error, early exit too.
  278. // We can't hold the rc_mux_ throughout because the thread spawned by CreateAsyncTask may hit error which
  279. // will try to shutdown the group and grab the rc_mux_ and we will deadlock.
  280. {
  281. std::unique_lock<std::mutex> rcLock(rc_mux_);
  282. if (rc_.IsError()) {
  283. return pMytask->IsMasterThread() ? rc_ : Status(StatusCode::kMDInterrupted);
  284. }
  285. }
  286. RETURN_IF_NOT_OK(dm.CreateAsyncTask(my_name, f, this, &pTask, operator_id));
  287. if (ppTask) {
  288. *ppTask = pTask;
  289. }
  290. return Status::OK();
  291. }
  292. void TaskGroup::interrupt_all() noexcept {
  293. // There is a racing condition if we don't stop the interrupt service at this point. New resource
  294. // may come in and not being picked up after we call InterruptAll(). So stop new comers and then
  295. // interrupt any existing resources.
  296. (void)intrp_svc_->ServiceStop();
  297. intrp_svc_->InterruptAll();
  298. }
  299. Status TaskGroup::join_all(Task::WaitFlag wf) {
  300. Status rc;
  301. Status rc2;
  302. SharedLock lck(&rw_lock_);
  303. for (Task &tk : grp_list_) {
  304. rc = tk.Join(wf);
  305. if (rc.IsError()) {
  306. rc2 = rc;
  307. }
  308. }
  309. return rc2;
  310. }
  311. Status TaskGroup::DoServiceStop() {
  312. interrupt_all();
  313. return (join_all(Task::WaitFlag::kNonBlocking));
  314. }
  315. TaskGroup::TaskGroup() : grp_list_(&Task::group), intrp_svc_(nullptr) {
  316. auto alloc = Services::GetAllocator<IntrpService>();
  317. intrp_svc_ = std::allocate_shared<IntrpService>(alloc);
  318. (void)Service::ServiceStart();
  319. }
  320. TaskGroup::~TaskGroup() {
  321. (void)Service::ServiceStop();
  322. // The TaskGroup is going out of scope, and we can return the Task list to the free list.
  323. Task *cur = grp_list_.head;
  324. TaskManager &tm = TaskManager::GetInstance();
  325. while (cur) {
  326. Task *next = cur->group.next;
  327. grp_list_.Remove(cur);
  328. tm.ReturnFreeTask(cur);
  329. cur = next;
  330. }
  331. {
  332. LockGuard lck(&tm.tg_lock_);
  333. (void)tm.grp_list_.erase(this);
  334. }
  335. }
  336. Status TaskGroup::GetTaskErrorIfAny() {
  337. SharedLock lck(&rw_lock_);
  338. for (Task &tk : grp_list_) {
  339. RETURN_IF_NOT_OK(tk.GetTaskErrorIfAny());
  340. }
  341. return Status::OK();
  342. }
  343. std::shared_ptr<IntrpService> TaskGroup::GetIntrpService() { return intrp_svc_; }
  344. } // namespace dataset
  345. } // namespace mindspore