|
|
|
@@ -55,6 +55,7 @@ private: |
|
|
|
uint32_t last_input_; |
|
|
|
uint32_t sleep_ms_ = 0; |
|
|
|
std::vector<uint32_t> input_; |
|
|
|
WaitPost wp; |
|
|
|
|
|
|
|
// This worker loop is to be called by a single thread. It will pop my_conn Connector |
|
|
|
// and populate output vector |
|
|
|
@@ -129,11 +130,13 @@ MindDataTestConnector::MindDataTestConnector() : tg_(new TaskGroup()) { |
|
|
|
for (int i = 1; i <= last_input_; i++) { |
|
|
|
input_.push_back(i); |
|
|
|
} |
|
|
|
wp.Register(tg_.get()); |
|
|
|
} |
|
|
|
|
|
|
|
Status MindDataTestConnector::Run_test_0() { |
|
|
|
Status rc; |
|
|
|
std::vector<uint32_t> output; |
|
|
|
wp.Clear(); |
|
|
|
auto my_conn = std::make_shared<Connector<uint32_t>>(1, // num of producers |
|
|
|
1, // num of consumers |
|
|
|
10); // capacity of each queue |
|
|
|
@@ -160,8 +163,11 @@ Status MindDataTestConnector::Run_test_0() { |
|
|
|
my_conn, |
|
|
|
&output)); |
|
|
|
RETURN_IF_NOT_OK(rc); |
|
|
|
|
|
|
|
tg_->join_all(); |
|
|
|
// Wait for the threads to finish. |
|
|
|
rc = wp.Wait(); |
|
|
|
EXPECT_TRUE(rc.IsOk()); |
|
|
|
tg_->interrupt_all(); |
|
|
|
tg_->join_all(Task::WaitFlag::kNonBlocking); |
|
|
|
my_conn.reset(); |
|
|
|
return ValidateOutput(output); |
|
|
|
} |
|
|
|
@@ -169,6 +175,7 @@ Status MindDataTestConnector::Run_test_0() { |
|
|
|
Status MindDataTestConnector::Run_test_1() { |
|
|
|
std::vector<uint32_t> output; |
|
|
|
Status rc; |
|
|
|
wp.Clear(); |
|
|
|
|
|
|
|
// number of threads in each layer |
|
|
|
int l1_threads = 15; |
|
|
|
@@ -223,8 +230,11 @@ Status MindDataTestConnector::Run_test_1() { |
|
|
|
conn2, // poping the data from conn2 |
|
|
|
&output)); |
|
|
|
RETURN_IF_NOT_OK(rc); |
|
|
|
|
|
|
|
tg_->join_all(); |
|
|
|
// Wait for the threads to finish. |
|
|
|
rc = wp.Wait(); |
|
|
|
EXPECT_TRUE(rc.IsOk()); |
|
|
|
tg_->interrupt_all(); |
|
|
|
tg_->join_all(Task::WaitFlag::kNonBlocking); |
|
|
|
conn1.reset(); |
|
|
|
conn2.reset(); |
|
|
|
|
|
|
|
@@ -250,11 +260,11 @@ Status MindDataTestConnector::SerialWorkerPull( |
|
|
|
GoToSleep(sleep_ms_); |
|
|
|
} |
|
|
|
|
|
|
|
// Raising interrupt after it processed the last_input_. |
|
|
|
// Signal master thread after it processed the last_input_. |
|
|
|
// This will trigger the MidWorkerJob threads to quit their worker loop. |
|
|
|
if (res == last_input_) { |
|
|
|
MS_LOG(INFO) << "All data is collected."; |
|
|
|
tg_->interrupt_all(); |
|
|
|
wp.Set(); |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
|