You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number; they can include dashes ('-') and can be up to 35 characters long.

thread.cpp 13 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449
  1. #include "megbrain/utils/thread.h"
  2. #include <atomic>
  3. #include <random>
  4. #include "megbrain/test/helper.h"
  5. #include "megbrain/utils/timer.h"
  6. #if MGB_HAVE_THREAD
  7. using namespace mgb;
  8. namespace {
  9. #if MGB_ENABLE_EXCEPTION
  10. class ExcMaker final : public AsyncQueueSC<int, ExcMaker> {
  11. public:
  12. void process_one_task(int) { throw std::runtime_error("test"); }
  13. };
  14. #endif
  15. class FuncExecutor final : public AsyncQueueSC<thin_function<void()>, FuncExecutor> {
  16. public:
  17. void process_one_task(const thin_function<void()>& task) { task(); }
  18. };
//! stress SCQueueSynchronizer with M concurrent producers and one consumer
//! \tparam producer_sleep upper bound (us) of a random sleep between two
//!         produces; 0 disables sleeping
//! \tparam consumer_sleep upper bound (us) of a random sleep per consumed
//!         task; 0 disables sleeping
template <int producer_sleep, int consumer_sleep>
void test_scq_sync_multi_producer() {
    // written only by the worker thread; read by the main thread after the
    // final producer_wait() below
    size_t nr_worker_call = 0;
    SCQueueSynchronizer sync(0);
    auto worker = [&]() {
        RNGxorshf rng{next_rand_seed()};
        // consumer_fetch(1) returning 0 terminates the worker loop
        while (auto nr = sync.consumer_fetch(1)) {
            nr_worker_call += nr;
            ASSERT_EQ(1u, nr);
            if (consumer_sleep) {
                std::this_thread::sleep_for(
                        std::chrono::microseconds(rng() % consumer_sleep));
            }
            sync.consumer_commit(nr);
        }
    };
    sync.start_worker(std::thread{worker});
    constexpr size_t N = 500, M = 8;
    std::atomic_size_t nr_worker_started{0};
    auto producer_impl = [&]() {
        RNGxorshf rng{next_rand_seed()};
        ++nr_worker_started;
        // spin until all M producers are ready, so they contend concurrently
        while (nr_worker_started.load() != M)
            ;
        for (size_t i = 0; i < N; ++i) {
            if (producer_sleep) {
                std::this_thread::sleep_for(
                        std::chrono::microseconds(rng() % producer_sleep));
            }
            sync.producer_add();
            // occasionally block until the consumer catches up
            if (i % 4 == 0)
                sync.producer_wait();
        }
    };
    std::vector<std::thread> producer_threads;
    for (size_t i = 0; i < M; ++i) {
        producer_threads.emplace_back(producer_impl);
    }
    for (auto&& i : producer_threads)
        i.join();
    sync.producer_wait();
    // every produced item must have been consumed exactly once
    ASSERT_EQ(N * M, nr_worker_call);
}
  62. } // namespace
  63. TEST(TestAsyncQueue, Synchronizer) {
  64. size_t nr_worker_call = 0;
  65. SCQueueSynchronizer sync(0);
  66. auto worker = [&]() {
  67. for (;;) {
  68. auto nr = sync.consumer_fetch(1);
  69. if (!nr)
  70. return;
  71. nr_worker_call += nr;
  72. ASSERT_EQ(1u, nr);
  73. sync.consumer_commit(nr);
  74. }
  75. };
  76. sync.start_worker(std::thread{worker});
  77. constexpr size_t N = 3000000;
  78. RealTimer timer;
  79. for (size_t i = 0; i < N; ++i) {
  80. sync.producer_add();
  81. }
  82. auto tadd = timer.get_secs_reset() * 1e9 / N;
  83. sync.producer_wait();
  84. auto twait = timer.get_secs_reset() * 1e9 / N;
  85. ASSERT_EQ(N, nr_worker_call);
  86. printf("tadd=%.3f twait=%.3f [ns]\n", tadd, twait);
  87. }
  88. TEST(TestAsyncQueue, SynchronizerWaitOverhead) {
  89. {
  90. size_t nr_worker_call = 0;
  91. SCQueueSynchronizer sync(0);
  92. auto worker = [&]() {
  93. for (;;) {
  94. auto nr = sync.consumer_fetch(1);
  95. if (!nr)
  96. return;
  97. nr_worker_call += nr;
  98. ASSERT_EQ(1u, nr);
  99. sync.consumer_commit(nr);
  100. }
  101. };
  102. sync.start_worker(std::thread{worker});
  103. constexpr size_t N = 300000;
  104. RealTimer timer;
  105. for (size_t i = 0; i < N; ++i) {
  106. sync.producer_add();
  107. sync.producer_wait();
  108. }
  109. ASSERT_EQ(N, nr_worker_call);
  110. printf("avg_twait=%.3f [us]\n", timer.get_msecs() * 1e3 / N);
  111. }
  112. {
  113. double worker_time = 0, avg_await;
  114. {
  115. size_t nr_worker_call = 0;
  116. SCQueueSynchronizer sync(0);
  117. auto worker = [&]() {
  118. for (;;) {
  119. auto nr = sync.consumer_fetch(1);
  120. if (!nr)
  121. return;
  122. RealTimer timer;
  123. nr_worker_call += nr;
  124. ASSERT_EQ(1u, nr);
  125. using namespace std::chrono_literals;
  126. std::this_thread::sleep_for(100ms);
  127. sync.consumer_commit(nr);
  128. worker_time += timer.get_msecs();
  129. }
  130. };
  131. sync.start_worker(std::thread{worker});
  132. constexpr size_t N = 5;
  133. RealTimer timer;
  134. for (size_t i = 0; i < N; ++i) {
  135. sync.producer_add();
  136. sync.producer_wait();
  137. }
  138. ASSERT_EQ(N, nr_worker_call);
  139. avg_await = (timer.get_msecs() - worker_time) * 1e3 / N;
  140. }
  141. printf("with workload: avg_twait=%.3f [us]\n", avg_await);
  142. }
  143. }
// producers never sleep; consumer sleeps up to 100us per task
TEST(TestAsyncQueue, SynchronizerMultiProducer0) {
    test_scq_sync_multi_producer<0, 100>();
}
// producers sleep up to 100us between adds; consumer never sleeps
TEST(TestAsyncQueue, SynchronizerMultiProducer1) {
    test_scq_sync_multi_producer<100, 0>();
}
// neither side sleeps: maximum contention between producers and consumer
TEST(TestAsyncQueue, SynchronizerMultiProducer2) {
    test_scq_sync_multi_producer<0, 0>();
}
// both producers and consumer sleep up to 100us, randomizing interleavings
TEST(TestAsyncQueue, SynchronizerMultiProducer3) {
    test_scq_sync_multi_producer<100, 100>();
}
//! ASAN reports stack-use-after-scope when the machine has only one
//! physical CPU, or when the test is pinned to a single core
//! (e.g. `taskset 01 xxx`); moving `processed` to a global variable
//! works around this.
namespace {
// count of tasks fully processed by the worker; kept at global scope as an
// ASAN workaround (see the comment above)
std::atomic_size_t processed{0};
}  // namespace
//! check that producer_wait() is not starved while another producer keeps
//! adding tasks at full speed
TEST(TestAsyncQueue, SynchronizerWaiterStarving) {
    SCQueueSynchronizer sync(0);
    processed = 0;
    auto worker = [&]() {
        while (sync.consumer_fetch(1)) {
            // simulate a small fixed workload per task; volatile keeps the
            // loop from being optimized away
            for (int volatile i = 0; i < 1000; ++i)
                ;
            sync.consumer_commit(1);
            ++processed;
        }
    };
    sync.start_worker(std::thread{worker});
    std::atomic_bool producer_run{true};
    std::atomic_size_t nr_added{0};
    auto producer = [&]() {
        while (producer_run) {
            size_t cur = ++nr_added;
            // throttle: keep at most ~1000 unprocessed tasks in flight
            while (cur - processed > 1000)
                ;
            sync.producer_add();
        }
    };
    std::thread th_producer{producer};
    // wait until the producer is actively adding before we start waiting
    while (nr_added.load() < 3)
        ;
    for (int i = 0; i < 10; ++i) {
        sync.producer_wait(); // this should not block long
    }
    producer_run = false;
    th_producer.join();
    sync.producer_wait();
}
//! check correctness when tasks are re-queued from within the worker
//! thread itself (add_task() re-entrancy)
TEST(TestAsyncQueue, Correctness0) {
    class Adder final : public AsyncQueueSC<int, Adder> {
        int m_sum = 0;
        std::mt19937 m_rng;
    public:
        // while set, roughly half of the tasks are re-queued instead of
        // being summed
        std::atomic_bool add_task_in_worker{true};
        // number of tasks that were re-queued from inside the worker
        std::atomic_size_t nr_task_added_in_worker{0};
        void process_one_task(int val) {
            if (add_task_in_worker && (m_rng() & 2)) {
                ++nr_task_added_in_worker;
                add_task(val);
            } else {
                m_sum += val;
            }
        }
        int sum() const { return m_sum; }
    };
    Adder adder;
    std::atomic_size_t nr_started{0};
    auto worker = [&](bool neg) {
        ++nr_started;
        // spin until both producers are ready so their adds interleave
        while (nr_started != 2)
            ;
        for (int i = 0; i < 10000; ++i)
            adder.add_task(neg ? i : -i);
        // trailing task: false adds 0, true adds 1; the +i/-i pairs cancel,
        // so the expected total is 1
        adder.add_task(neg);
    };
    std::thread th0(worker, false), th1(worker, true);
    th0.join();
    th1.join();
    // make sure in-worker re-queueing actually happened before disabling it
    while (adder.nr_task_added_in_worker < 100)
        ;
    adder.add_task_in_worker = false;
    adder.wait_all_task_finish();
    ASSERT_EQ(1, adder.sum());
}
//! like Correctness0, but re-queueing never stops and the test
//! synchronizes via wait_task_queue_empty() instead
TEST(TestAsyncQueue, Correctness1) {
    class Adder final : public AsyncQueueSC<int, Adder> {
        int m_sum = 0;
        std::mt19937 m_rng;
    public:
        void process_one_task(int val) {
            // randomly re-queue about half of the tasks
            if ((m_rng() & 2)) {
                add_task(val);
            } else {
                m_sum += val;
            }
        }
        int sum() const { return m_sum; }
    };
    Adder adder;
    std::atomic_size_t nr_started{0};
    auto worker = [&](bool neg) {
        ++nr_started;
        // spin until both producers are ready so their adds interleave
        while (nr_started != 2)
            ;
        for (int i = 0; i < 10000; ++i)
            adder.add_task(neg ? i : -i);
        // trailing task: false adds 0, true adds 1, so the total is 1
        adder.add_task(neg);
    };
    std::thread th0(worker, false), th1(worker, true);
    th0.join();
    th1.join();
    adder.wait_task_queue_empty();
    ASSERT_EQ(1, adder.sum());
}
  261. TEST(TestAsyncQueue, OutOfOrderCtor) {
  262. FuncExecutor fe;
  263. std::atomic_bool started{false};
  264. class Adder {
  265. int* m_sum = nullptr;
  266. bool m_slow_ctor = false;
  267. public:
  268. Adder(int* sum, bool slow_ctor) : m_sum{sum}, m_slow_ctor{slow_ctor} {}
  269. Adder(const Adder& src) {
  270. if (m_slow_ctor) {
  271. using namespace std::literals;
  272. std::this_thread::sleep_for(300us);
  273. }
  274. m_sum = src.m_sum;
  275. }
  276. void operator()(int i) { (*m_sum) += i; }
  277. };
  278. int sum = 0;
  279. std::atomic_size_t worker_ready{0};
  280. auto worker = [&sum, &worker_ready, &started, &fe](
  281. int n, std::mt19937::result_type seed) {
  282. Adder adder{&sum, !n};
  283. std::mt19937 rng{seed};
  284. ++worker_ready;
  285. while (!started.load())
  286. ;
  287. for (int i = 0; i < 500; ++i) {
  288. if (n) {
  289. using namespace std::literals;
  290. std::this_thread::sleep_for(300us);
  291. }
  292. fe.add_task(std::bind(adder, (n ^ (i & 1)) ? i : -i));
  293. }
  294. fe.add_task(std::bind(adder, n));
  295. };
  296. std::thread th0(worker, 0, next_rand_seed()), th1(worker, 1, next_rand_seed());
  297. while (worker_ready.load() != 2)
  298. ;
  299. started.store(true);
  300. th0.join();
  301. th1.join();
  302. fe.wait_all_task_finish();
  303. ASSERT_EQ(1, sum);
  304. }
#if MGB_ENABLE_EXCEPTION
//! exceptions thrown on the worker thread must be rethrown to the waiter
TEST(TestAsyncQueue, Exception) {
    ExcMaker exc_maker;
    // waiting with no pending task must not throw
    exc_maker.wait_all_task_finish();
    exc_maker.add_task(0);
    // the worker throws std::runtime_error; it surfaces at the wait call
    ASSERT_THROW(exc_maker.wait_all_task_finish(), std::runtime_error);
    // a subsequent wait must succeed without rethrowing
    exc_maker.wait_all_task_finish();
}
#endif
//! compare per-item cost of dispatch through the queue vs direct calls,
//! for both a small closure and one with a 16-byte payload
TEST(TestAsyncQueue, Benchmark) {
    // payload larger than a pointer, to exercise copying a bigger closure
    struct Big {
        uint8_t data[16];
    };
    int nr_call = 0;
    // noinline + empty asm barrier keep the compiler from optimizing the
    // "work" away
    auto func = [&](int i) __attribute__((noinline)) {
        asm volatile("" : : "r"(i / 12345));
        ++nr_call;
    };
    Big big;
    for (int i = 0; i < 16; ++i)
        big.data[i] = i;
    // captures the 16-byte payload by value
    auto big_func = [ b = big, &nr_call ](int i) __attribute__((noinline)) {
        asm volatile("" : : "r"(i / 12345), "r"(&b));
        ++nr_call;
    };
    // indirect call through thin_function, mirroring what the queue does
    auto call = [](const thin_function<void()>& f) __attribute__((noinline)) { f(); };
    FuncExecutor queue;
    constexpr int N = 100000;
    RealTimer timer;
    // 1) small closure dispatched through the queue
    for (int i = 0; i < N; ++i) {
        auto g = [func, i]() { func(i); };
        queue.add_task(g);
    }
    auto t0_add = timer.get_secs() * 1e9 / N;
    queue.wait_all_task_finish();
    auto t0_all = timer.get_secs_reset() * 1e9 / N;
    // 2) small closure via direct thin_function invocation
    for (int i = 0; i < N; ++i) {
        auto g = [func, i]() { func(i); };
        call(g);
    }
    auto t1 = timer.get_secs_reset() * 1e9 / N;
    // 3) plain direct call, as the baseline
    for (int i = 0; i < N; ++i)
        func(i);
    auto t2 = timer.get_secs_reset() * 1e9 / N;
    // 4) big closure dispatched through the queue
    for (int i = 0; i < N; ++i) {
        auto g = [big_func, i]() { big_func(i); };
        queue.add_task(g);
    }
    auto t3_add = timer.get_secs() * 1e9 / N;
    queue.wait_all_task_finish();
    auto t3_all = timer.get_secs_reset() * 1e9 / N;
    // 5) big closure via direct thin_function invocation
    for (int i = 0; i < N; ++i) {
        auto g = [big_func, i]() { big_func(i); };
        call(g);
    }
    auto t4 = timer.get_secs_reset() * 1e9 / N;
    // these profiling message should always be seen even if compiled without
    // logging support
    printf("time_per_iter: queue=(add=%.3f,all=%.3f) call=%.3f empty=%.3f "
           "big_queue=(add=%.3f,all=%.3f) big_call=%.3f [ns]\n",
           t0_add, t0_all, t1, t2, t3_add, t3_all, t4);
    // five loops of N calls each must all have executed
    ASSERT_EQ(N * 5, nr_call);
}
  368. TEST(TestThread, Spinlock) {
  369. Spinlock lock;
  370. int cnt = 0;
  371. auto worker = [&](int tot) {
  372. for (int i = 0; i < tot; ++i) {
  373. MGB_LOCK_GUARD(lock);
  374. ++cnt;
  375. }
  376. };
  377. std::vector<std::thread> th;
  378. for (int i = 0; i < 10; ++i) {
  379. th.emplace_back(worker, i + 1000);
  380. }
  381. for (auto&& i : th)
  382. i.join();
  383. ASSERT_EQ((1000 + 1009) * 5, cnt);
  384. }
  385. TEST(TestThread, RecursiveSpinlock) {
  386. RecursiveSpinlock lock;
  387. int cnt = 0;
  388. auto worker = [&](int tot) {
  389. for (int i = 0; i < tot; ++i) {
  390. MGB_LOCK_GUARD(lock);
  391. {
  392. MGB_LOCK_GUARD(lock);
  393. {
  394. MGB_LOCK_GUARD(lock);
  395. ++cnt;
  396. }
  397. }
  398. }
  399. };
  400. std::vector<std::thread> th;
  401. for (int i = 0; i < 10; ++i) {
  402. th.emplace_back(worker, i + 1000);
  403. }
  404. for (auto&& i : th)
  405. i.join();
  406. ASSERT_EQ((1000 + 1009) * 5, cnt);
  407. }
  408. #else
  409. #pragma message "tests are disabled as thread is not enabled."
  410. #endif // MGB_HAVE_THREAD
  411. // vim: syntax=cpp.doxygen foldmethod=marker foldmarker=f{{{,f}}}