
thread.cpp

/**
 * \file src/core/impl/utils/thread.cpp
 * MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
 *
 * Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied.
 */
#include "megbrain/utils/thread.h"

#include <thread>
#include <atomic>

using namespace mgb;
#if MGB_THREAD_SAFE
const std::thread::id RecursiveSpinlock::sm_none_owner = std::thread::id();

//! why not use initializer_list for global var, detail: MGE-1738
RecursiveSpinlock::RecursiveSpinlock() {
    m_owner = sm_none_owner;
}

void RecursiveSpinlock::lock() {
    auto tid = std::this_thread::get_id();
    if (m_owner.load(std::memory_order_relaxed) != tid) {
        for (;;) {
            auto id = sm_none_owner;
            if (m_owner.compare_exchange_weak(id, tid,
                                              std::memory_order_acquire,
                                              std::memory_order_relaxed)) {
                break;
            }
        }
    }
    ++m_recur_count;
}

void RecursiveSpinlock::unlock() {
    mgb_assert(m_recur_count &&
               m_owner.load(std::memory_order_relaxed) ==
                       std::this_thread::get_id());
    if (!(--m_recur_count)) {
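        // outermost unlock: release ownership so other threads can acquire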
        m_owner.store(sm_none_owner, std::memory_order_release);
    }
}
#else
#if MGB_HAVE_THREAD
#error "can not disable thread safety while enabling thread support"
#endif
#endif
#if MGB_HAVE_THREAD
#include "megbrain/utils/timer.h"
#include <ctime>

namespace {
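//! RAII helper: clears the given atomic_flag spinlock on scope exit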
class SpinlockReleaser {
    std::atomic_flag& m_lock;

public:
    SpinlockReleaser(std::atomic_flag& lock) : m_lock{lock} {}

    ~SpinlockReleaser() { m_lock.clear(std::memory_order_release); }
};
}  // anonymous namespace
/* =============== SCQueueSynchronizer =============== */
size_t SCQueueSynchronizer::cached_default_max_spin = 0;

#ifdef WIN32
bool SCQueueSynchronizer::is_into_atexit = false;
#endif

size_t SCQueueSynchronizer::get_default_max_spin() {
    if (cached_default_max_spin)
        return cached_default_max_spin;
    if (MGB_GETENV("MGB_WORKER_NO_SLEEP")) {
        mgb_log_warn("worker would not sleep");
        return cached_default_max_spin = std::numeric_limits<size_t>::max();
    }
    if (auto spin_string = MGB_GETENV("MGB_WORKER_MAX_SPIN")) {
        auto spin = std::stoi(spin_string);
        mgb_log_warn("worker would execute with spin of %d", spin);
        return cached_default_max_spin = spin;
    }
    // heuristically, let the CPU spin for at most about 5ms before yielding;
    // measure how many spins take 5ms on the current platform
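    // worked example: if the calibration worker below counts cnt = 1 << 24
    // (~1.7e7) increments in cnt_time = 20ms, the estimate is
    // cnt * (5 / cnt_time) ~= 4.2e6 spins; values below 100000 are clamped
    // up to that floor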
    std::atomic_bool start{false}, stop{false};
    size_t cnt;
    double cnt_time;
    auto worker_fn = [&]() {
        start.store(true);
        volatile size_t cntv = 0;
        RealTimer timer;
        while (!stop.load() && (cntv < (1 << 24))) {
            ++cntv;
        }
        cnt_time = timer.get_msecs();
        cnt = cntv;
    };
    std::thread worker{worker_fn};
    while (!start.load()) {
        std::this_thread::yield();
    }
    {
        using namespace std::chrono_literals;
        std::this_thread::sleep_for(5ms);
    }
    stop.store(true);
    worker.join();
    cached_default_max_spin = std::max<size_t>(cnt * (5 / cnt_time), 100000);
    return cached_default_max_spin;
}
SCQueueSynchronizer::SCQueueSynchronizer(size_t max_spin) {
    m_max_spin = max_spin;
}

SCQueueSynchronizer::~SCQueueSynchronizer() noexcept {
    if (!m_worker_started)
        return;
    if (!m_wait_finish_called) {
        mgb_log_error("async queue not finished in destructor");
        mgb_trap();
    }
    {
        MGB_LOCK_GUARD(m_mtx_more_task);
        m_should_exit = true;
        m_cv_more_task.notify_all();
    }
    m_worker_thread.join();
}

void SCQueueSynchronizer::start_worker(std::thread thread) {
    mgb_assert(!m_worker_started);
    m_worker_started = true;
    m_worker_thread = std::move(thread);
}
void SCQueueSynchronizer::producer_add() {
    m_wait_finish_called = false;
    m_tot_task.fetch_add(1, std::memory_order_release);
    if (m_consumer_waiting.test_and_set(std::memory_order_acquire)) {
        // m_consumer_waiting already acquired by consumer or another producer
        MGB_LOCK_GUARD(m_mtx_more_task);
        m_cv_more_task.notify_all();
    } else {
        m_consumer_waiting.clear(std::memory_order_release);
    }
}
void SCQueueSynchronizer::producer_wait() {
    auto wait_target = m_tot_task.load(std::memory_order_relaxed);
    if (m_worker_started &&
        m_finished_task.load(std::memory_order_acquire) < wait_target) {
        std::unique_lock<std::mutex> lock(m_mtx_finished);
        // update wait_target again in this critical section
        wait_target = m_tot_task.load(std::memory_order_relaxed);
        if (m_waiter_target_queue.empty()) {
            m_waiter_target.store(wait_target, std::memory_order_relaxed);
            m_waiter_target_queue.push_back(wait_target);
        } else {
            mgb_assert(wait_target >= m_waiter_target_queue.back());
            if (wait_target > m_waiter_target_queue.back()) {
                m_waiter_target_queue.push_back(wait_target);
            }
        }
        size_t done;
        for (;;) {
            // ensure that m_waiter_target is visible in consumer
            std::atomic_thread_fence(std::memory_order_seq_cst);
            done = m_finished_task.load(std::memory_order_relaxed);
            if (done >= wait_target)
                break;
            m_cv_finished.wait(lock);
        }
        if (!m_waiter_target_queue.empty()) {
            size_t next_target = 0;
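            // pop every target already satisfied; the first unsatisfied one
            // (or SIZE_MAX if the queue drains) becomes the new wake-up
            // threshold checked by consumer_commit()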
            while (done >= (next_target = m_waiter_target_queue.front())) {
                m_waiter_target_queue.pop_front();
                if (m_waiter_target_queue.empty()) {
                    next_target = std::numeric_limits<size_t>::max();
                    break;
                }
            }
            m_waiter_target.store(next_target, std::memory_order_release);
            // this is necessary in practice, although not needed logically
            m_cv_finished.notify_all();
        }
    }
    m_wait_finish_called = true;
}
size_t SCQueueSynchronizer::consumer_fetch(size_t max, size_t min) {
    mgb_assert(max >= min && min >= 1);
    size_t spin = 0,
           cur_finished = m_finished_task.load(std::memory_order_relaxed);
    // relaxed mem order suffices because acquire would be called for ret
    while (m_tot_task.load(std::memory_order_relaxed) < cur_finished + min) {
        ++spin;
        if (spin >= m_max_spin) {
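            // spin budget exhausted: raise m_consumer_waiting so that
            // producer_add() knows it must notify, then sleep on the condvar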
            while (m_consumer_waiting.test_and_set(std::memory_order_relaxed));
            SpinlockReleaser releaser(m_consumer_waiting);
            std::unique_lock<std::mutex> lock(m_mtx_more_task);
            if (m_should_exit.load(std::memory_order_relaxed))
                return 0;
            if (m_tot_task.load(std::memory_order_relaxed) >=
                cur_finished + min)
                break;
            m_cv_more_task.wait(lock);
        }
        if (m_should_exit.load(std::memory_order_relaxed))
            return 0;
    }
    auto ret = std::min(
            m_tot_task.load(std::memory_order_acquire) - cur_finished, max);
    mgb_assert(ret >= min);
    return ret;
}
void SCQueueSynchronizer::consumer_commit(size_t nr) {
    auto done = m_finished_task.fetch_add(nr, std::memory_order_relaxed) + nr;
    // pair with the thread fence in producer_wait()
    std::atomic_thread_fence(std::memory_order_seq_cst);
    if (done >= m_waiter_target.load(std::memory_order_relaxed)) {
        MGB_LOCK_GUARD(m_mtx_finished);
        m_cv_finished.notify_all();
    }
}
/* =============== SyncableCounter =============== */
SyncableCounter::SyncableCounter() = default;

void SyncableCounter::incr(int delta) {
    MGB_LOCK_GUARD(m_mtx);
    m_val += delta;
    if (!m_val)
        m_cv.notify_all();
}

void SyncableCounter::wait_zero() {
    std::unique_lock<std::mutex> lk{m_mtx};
    for (;;) {
        if (!m_val)
            return;
        m_cv.wait(lk);
    }
}
#else  // MGB_HAVE_THREAD
#pragma message "threading support is disabled"
#if MGB_CUDA
#error "cuda must be disabled if threading is not available"
#endif
#endif  // MGB_HAVE_THREAD

// vim: syntax=cpp.doxygen foldmethod=marker foldmarker=f{{{,f}}}
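
To make the recursive-ownership logic of RecursiveSpinlock concrete, here is a self-contained analogue of the same algorithm (a minimal sketch; ToySpinlock and the surrounding names are illustrative, not MegEngine API):

#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>

class ToySpinlock {
    std::atomic<std::thread::id> m_owner{std::thread::id()};
    std::size_t m_recur_count = 0;

public:
    void lock() {
        auto tid = std::this_thread::get_id();
        // fast path: the owning thread re-enters without touching the CAS
        if (m_owner.load(std::memory_order_relaxed) != tid) {
            auto none = std::thread::id();
            // spin until ownership moves from "no owner" to this thread;
            // a failed CAS overwrites `none`, so reset it each iteration
            while (!m_owner.compare_exchange_weak(
                    none, tid, std::memory_order_acquire,
                    std::memory_order_relaxed)) {
                none = std::thread::id();
            }
        }
        ++m_recur_count;
    }

    void unlock() {
        assert(m_recur_count);
        // only the outermost unlock() releases ownership
        if (!--m_recur_count)
            m_owner.store(std::thread::id(), std::memory_order_release);
    }
};

int main() {
    ToySpinlock lk;
    lk.lock();
    lk.lock();    // same thread: recursion succeeds immediately
    lk.unlock();
    lk.unlock();  // lock fully released here
    std::thread other([&] { lk.lock(); lk.unlock(); });
    other.join();
}

The key point is that unlock() clears the owner only when the recursion count returns to zero, so nested lock()/unlock() pairs from the owning thread are cheap and never deadlock.

The SCQueueSynchronizer contract — producer_add() once per published task, producer_wait() to block until every published task is committed, and a worker loop built from consumer_fetch()/consumer_commit() — can be mimicked with a plain mutex and condition variables. The following minimal sketch shows the protocol only (ToySynchronizer is a hypothetical stand-in; the real class adds the calibrated spin-wait fast path and lock-free counters):

#include <condition_variable>
#include <cstddef>
#include <cstdio>
#include <mutex>
#include <thread>

class ToySynchronizer {
    std::mutex m_mtx;
    std::condition_variable m_cv_more, m_cv_done;
    std::size_t m_tot = 0, m_finished = 0;
    bool m_exit = false;

public:
    void producer_add() {
        std::lock_guard<std::mutex> lk(m_mtx);
        ++m_tot;
        m_cv_more.notify_all();
    }
    void producer_wait() {
        std::unique_lock<std::mutex> lk(m_mtx);
        std::size_t target = m_tot;  // snapshot of tasks published so far
        m_cv_done.wait(lk, [&] { return m_finished >= target; });
    }
    // returns the number of pending tasks to process, or 0 on exit
    std::size_t consumer_fetch() {
        std::unique_lock<std::mutex> lk(m_mtx);
        m_cv_more.wait(lk, [&] { return m_exit || m_finished < m_tot; });
        return m_exit ? 0 : m_tot - m_finished;
    }
    void consumer_commit(std::size_t nr) {
        std::lock_guard<std::mutex> lk(m_mtx);
        m_finished += nr;
        m_cv_done.notify_all();
    }
    void stop() {
        std::lock_guard<std::mutex> lk(m_mtx);
        m_exit = true;
        m_cv_more.notify_all();
    }
};

int main() {
    ToySynchronizer sync;
    std::thread worker([&] {
        while (std::size_t n = sync.consumer_fetch())
            sync.consumer_commit(n);  // "process" n tasks
    });
    for (int i = 0; i < 100; ++i)
        sync.producer_add();
    sync.producer_wait();  // blocks until all 100 tasks are committed
    std::printf("all tasks done\n");
    sync.stop();
    worker.join();
}

SyncableCounter follows the same pattern at a smaller scale: callers incr(+1) when an asynchronous job is issued, incr(-1) when it completes, and wait_zero() blocks until no jobs remain outstanding.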

The MegEngine installation package ships with the CUDA environment needed to run code on a GPU, so there is no separate CPU or GPU build. To run GPU programs, make sure the machine has a GPU device and its driver installed. If you want to try deep-learning development on cloud GPU compute, you are welcome to visit the MegStudio platform.