From 3b80c10b1ed03247218ffc12aaed536b70f9eec0 Mon Sep 17 00:00:00 2001 From: Jesse Lee Date: Wed, 27 May 2020 14:22:37 -0400 Subject: [PATCH] Fix some timing hole in these two testcases --- tests/ut/cpp/dataset/connector_test.cc | 22 ++++++++++++++++------ tests/ut/cpp/dataset/task_manager_test.cc | 5 +++-- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/tests/ut/cpp/dataset/connector_test.cc b/tests/ut/cpp/dataset/connector_test.cc index eb33cc3e8d..6d9e538ab4 100644 --- a/tests/ut/cpp/dataset/connector_test.cc +++ b/tests/ut/cpp/dataset/connector_test.cc @@ -55,6 +55,7 @@ private: uint32_t last_input_; uint32_t sleep_ms_ = 0; std::vector input_; + WaitPost wp; // This worker loop is to be called by a single thread. It will pop my_conn Connector // and populate output vector @@ -129,11 +130,13 @@ MindDataTestConnector::MindDataTestConnector() : tg_(new TaskGroup()) { for (int i = 1; i <= last_input_; i++) { input_.push_back(i); } + wp.Register(tg_.get()); } Status MindDataTestConnector::Run_test_0() { Status rc; std::vector output; + wp.Clear(); auto my_conn = std::make_shared>(1, // num of producers 1, // num of consumers 10); // capacity of each queue @@ -160,8 +163,11 @@ Status MindDataTestConnector::Run_test_0() { my_conn, &output)); RETURN_IF_NOT_OK(rc); - - tg_->join_all(); + // Wait for the threads to finish. + rc = wp.Wait(); + EXPECT_TRUE(rc.IsOk()); + tg_->interrupt_all(); + tg_->join_all(Task::WaitFlag::kNonBlocking); my_conn.reset(); return ValidateOutput(output); } @@ -169,6 +175,7 @@ Status MindDataTestConnector::Run_test_0() { Status MindDataTestConnector::Run_test_1() { std::vector output; Status rc; + wp.Clear(); // number of threads in each layer int l1_threads = 15; @@ -223,8 +230,11 @@ Status MindDataTestConnector::Run_test_1() { conn2, // poping the data from conn2 &output)); RETURN_IF_NOT_OK(rc); - - tg_->join_all(); + // Wait for the threads to finish. + rc = wp.Wait(); + EXPECT_TRUE(rc.IsOk()); + tg_->interrupt_all(); + tg_->join_all(Task::WaitFlag::kNonBlocking); conn1.reset(); conn2.reset(); @@ -250,11 +260,11 @@ Status MindDataTestConnector::SerialWorkerPull( GoToSleep(sleep_ms_); } - // Raising interrupt after it processed the last_input_. + // Signal master thread after it processed the last_input_. // This will trigger the MidWorkerJob threads to quit their worker loop. if (res == last_input_) { MS_LOG(INFO) << "All data is collected."; - tg_->interrupt_all(); + wp.Set(); break; } } diff --git a/tests/ut/cpp/dataset/task_manager_test.cc b/tests/ut/cpp/dataset/task_manager_test.cc index a28b10a1fe..3d34ec9ec5 100644 --- a/tests/ut/cpp/dataset/task_manager_test.cc +++ b/tests/ut/cpp/dataset/task_manager_test.cc @@ -38,12 +38,13 @@ TEST_F(MindDataTestTaskManager, Test1) { ASSERT_TRUE(vg_rc.IsOk() || vg_rc.IsOutofMemory()); ASSERT_TRUE(vg.join_all().IsOk()); ASSERT_TRUE(vg.GetTaskErrorIfAny().IsOutofMemory()); - // Test the error is passed back to the master thread. + // Test the error is passed back to the master thread if vg_rc above is OK. + // If vg_rc is kOutOfMemory, the group error is already passed back. // Some compiler may choose to run the next line in parallel with the above 3 lines // and this will cause some mismatch once a while. // To block this racing condition, we need to create a dependency that the next line // depends on previous lines. - if (vg.GetTaskErrorIfAny().IsError()) { + if (vg.GetTaskErrorIfAny().IsError() && vg_rc.IsOk()) { Status rc = TaskManager::GetMasterThreadRc(); ASSERT_TRUE(rc.IsOutofMemory()); }