|
- /**
- * Copyright 2019 Huawei Technologies Co., Ltd
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
- #include <fcntl.h>
- #include <iostream>
- #include <memory>
- #include <vector>
- #include <chrono>
- #include <thread>
-
-
- #include "common/common.h"
- #include "dataset/engine/connector.h"
- #include "dataset/util/task_manager.h"
- #include "utils/log_adapter.h"
-
- using namespace mindspore::dataset;
- using mindspore::MsLogLevel::INFO;
- using mindspore::ExceptionType::NoExceptionType;
- using mindspore::LogStream;
-
- class MindDataTestConnector : public UT::Common {
- public:
-
- MindDataTestConnector();
-
- // Test scenario: single producer, single consumer.
- // This means there is only one queue in the connector.
- Status Run_test_0();
-
- // Test scenario: multiple producers, multiple cosumers
- // A chain of three layer of thread groups connected by two Connectors between
- // two layer. You can set different num of threads on layer 1 and 2, and layer 3
- // that does the serialization to _ouput vector needs to be single thread.
- // A random sleep/delay can be introduced for each thread. See run().
- Status Run_test_1();
-
- void SetSleepMilliSec(uint32_t ms) { sleep_ms_ = ms; }
-
- private:
- std::unique_ptr<TaskGroup> tg_;
- uint32_t last_input_;
- uint32_t sleep_ms_ = 0;
- std::vector<uint32_t> input_;
-
- // This worker loop is to be called by a single thread. It will pop my_conn Connector
- // and populate output vector
- Status SerialWorkerPull(
- int tid,
- std::shared_ptr<Connector<uint32_t>> my_conn,
- std::vector<uint32_t> *output
- );
-
- // This worker loop read from input_ vector that have complete list of tasks/elements.
- // The assignment from the elements in input_ to each worker is ensured in RoundRobin,
- // i.e., tid-0 will pick input_[0], tid-1 will pick input_[1], so-on circularly.
- Status FirstWorkerPush(
- int tid,
- std::shared_ptr<Connector<uint32_t> > my_conn,
- int start_in,
- int offset);
-
- // This worker loop read from a Connector and put the result into another Connector.
- Status MidWorkerJob(
- int tid,
- std::shared_ptr<Connector<uint32_t> > from_conn,
- std::shared_ptr<Connector<uint32_t> > to_conn);
-
- Status ValidateOutput(const std::vector<uint32_t> &output);
-
- uint32_t GenRand(int max);
-
- // Put the current thread to sleep mode for MaxDue milliseconds.
- // (Imitating nondeterministic processing time)
- void GoToSleep(int max_dur);
- };
-
-
- // Test0 : single producer, single consumer which means there is only one queue in the connector
- TEST_F(MindDataTestConnector, Test0) {
- MS_LOG(INFO) << "MindDataTestConnector Test0: single producer, single consumer.";
- Status rc = this->Run_test_0();
- ASSERT_TRUE(rc.IsOk());
- rc = TaskManager::GetMasterThreadRc();
- ASSERT_TRUE(rc.IsOk());
- }
-
- // Test1: multiple producers, multiple consumers without random delay
- // A chain of three layer of thread groups connected by two Connectors between
- // two layer.
- TEST_F(MindDataTestConnector, Test1) {
- MS_LOG(INFO) << "MindDataTestConnector Test1.";
- Status rc = this->Run_test_1();
- ASSERT_TRUE(rc.IsOk());
- rc = TaskManager::GetMasterThreadRc();
- ASSERT_TRUE(rc.IsOk());
- }
-
- // Test1: multiple producers, multiple consumers with random delay after push/pop
- // A chain of three layer of thread groups connected by two Connectors between
- // two layer.
- TEST_F(MindDataTestConnector, Test2) {
- MS_LOG(INFO) << "MindDataTestConnector Test2.";
- this->SetSleepMilliSec(30);
- Status rc = this->Run_test_1();
- ASSERT_TRUE(rc.IsOk());
- rc = TaskManager::GetMasterThreadRc();
- ASSERT_TRUE(rc.IsOk());
- }
-
-
-
- // Implementation of MindDataTestConnector class and the helper functions.
- MindDataTestConnector::MindDataTestConnector() : tg_(new TaskGroup()) {
- last_input_ = 150;
- for (int i = 1; i <= last_input_; i++) {
- input_.push_back(i);
- }
- }
-
- Status MindDataTestConnector::Run_test_0() {
- Status rc;
- std::vector<uint32_t> output;
- auto my_conn = std::make_shared<Connector<uint32_t>>(1, // num of producers
- 1, // num of consumers
- 10); // capacity of each queue
- DS_ASSERT(my_conn != nullptr);
-
- // Spawn a thread to read input_ vector and put it in my_conn
- rc = tg_->CreateAsyncTask("Worker Push",
- std::bind(&MindDataTestConnector::FirstWorkerPush,
- this, // passing this instance
- 0, // id = 0 for this simple one to one case
- my_conn, // the connector
- 0, // start index to read from the input_ list
- 1)); // the offset to read the next index
- RETURN_IF_NOT_OK(rc);
-
- // Spawn another thread to read from my_conn and write to _output vector.
- rc = tg_->CreateAsyncTask("Worker Pull",
- std::bind(&MindDataTestConnector::SerialWorkerPull,
- this,
- 0,
- my_conn,
- &output));
- RETURN_IF_NOT_OK(rc);
-
- tg_->join_all();
- my_conn.reset();
- return ValidateOutput(output);
- }
-
- Status MindDataTestConnector::Run_test_1() {
- std::vector<uint32_t> output;
- Status rc;
-
- // number of threads in each layer
- int l1_threads = 15;
- int l2_threads = 20;
- int l3_threads = 1;
-
- // Capacity for the first and second connectors
- int conn1_qcap = 5;
- int conn2_qcap = 10;
-
- auto conn1 = std::make_shared<Connector<uint32_t>>(l1_threads, // num of producers
- l2_threads, // num of consumers
- conn1_qcap); // the cap of each queue
-
- auto conn2 = std::make_shared<Connector<uint32_t>>(l2_threads,
- l3_threads,
- conn2_qcap);
-
- // Instantiating the threads in the first layer
- for (int i = 0; i < l1_threads; i++) {
- rc = tg_->CreateAsyncTask("First Worker Push",
- std::bind(&MindDataTestConnector::FirstWorkerPush,
- this, // passing this instance
- i, // thread id in this group of thread
- conn1, // the connector
- i, // start index to read from the input_ list
- l1_threads)); // the offset to read the next index
- RETURN_IF_NOT_OK(rc);
- }
-
- // Instantiating the threads in the 2nd layer
- for (int i = 0; i < l2_threads; i++) {
- rc = tg_->CreateAsyncTask("Mid Worker Job",
- std::bind(&MindDataTestConnector::MidWorkerJob,
- this, // passing this instance
- i, // thread id in this group of thread
- conn1, // the 1st connector
- conn2)); // the 2nd connector
- RETURN_IF_NOT_OK(rc);
- }
-
- // Last layer doing serialization to one queue to check if the order is preserved
- rc = tg_->CreateAsyncTask("Worker Pull",
- std::bind(&MindDataTestConnector::SerialWorkerPull,
- this,
- 0, // thread id = 0, since it's the only one
- conn2, // poping the data from conn2
- &output));
- RETURN_IF_NOT_OK(rc);
-
- tg_->join_all();
- conn1.reset();
- conn2.reset();
-
- return ValidateOutput(output);
- }
-
- Status MindDataTestConnector::SerialWorkerPull(
- int tid,
- std::shared_ptr<Connector<uint32_t>> my_conn,
- std::vector<uint32_t> *output
- ) {
- Status rc;
- TaskManager::FindMe()->Post();
- while (1) {
- uint32_t res;
- rc = my_conn->Pop(tid, &res);
- RETURN_IF_NOT_OK(rc);
-
- output->push_back(res);
-
- // Emulate different processing time for each thread
- if (sleep_ms_ != 0) {
- GoToSleep(sleep_ms_);
- }
-
- // Raising interrupt after it processed the last_input_.
- // This will trigger the MidWorkerJob threads to quit their worker loop.
- if (res == last_input_) {
- MS_LOG(INFO) << "All data is collected.";
- tg_->interrupt_all();
- break;
- }
- }
- return Status::OK();
- }
-
- Status MindDataTestConnector::FirstWorkerPush(
- int tid,
- std::shared_ptr<Connector<uint32_t> > my_conn,
- int start_in,
- int offset) {
- TaskManager::FindMe()->Post();
- DS_ASSERT(my_conn != nullptr);
- Status rc;
- for (int i = start_in; i < input_.size(); i += offset) {
- rc = my_conn->Push(tid, input_[i]);
-
- // Emulate different processing time for each thread
- if (sleep_ms_ != 0)
- GoToSleep(sleep_ms_);
- }
- return Status::OK();
- }
-
- // This worker loop read from a Connector and put the result into another Connector.
- Status MindDataTestConnector::MidWorkerJob(
- int tid,
- std::shared_ptr<Connector<uint32_t> > from_conn,
- std::shared_ptr<Connector<uint32_t> > to_conn) {
- DS_ASSERT((from_conn != nullptr) && (to_conn != nullptr));
- Status rc;
- TaskManager::FindMe()->Post();
- while (1) {
- uint32_t el;
- rc = from_conn->Pop(tid, &el);
- RETURN_IF_NOT_OK(rc);
-
- // Emulate different processing time for each thread
- if (sleep_ms_ != 0) {
- GoToSleep(sleep_ms_);
- }
- rc = to_conn->Push(tid, el);
- RETURN_IF_NOT_OK(rc);
- }
- return Status::OK();
- }
-
- Status MindDataTestConnector::ValidateOutput(const std::vector<uint32_t> &output) {
- int prev = 0;
- for (auto el : output) {
- if (prev >= el) {
- return Status(StatusCode::kUnexpectedError, "Output vector are not in-order.");
- }
- prev = el;
- }
- return Status::OK();
- }
-
- uint32_t MindDataTestConnector::GenRand(int max) {
- uint32_t r_int = 0;
- if (max == 0) {
- return r_int;
- }
-
- // open urandom not random
- int fd = open("/dev/urandom", O_RDONLY);
- if (fd > 0) {
- if (read(fd, &r_int, sizeof(uint32_t)) != sizeof(uint32_t)) {
- r_int = max / 2;
- }
- }
- (void)close(fd); // close it!
-
- return r_int % max;
- }
-
- // Put the current thread to sleep mode for MaxDue milliseconds.
- // (Imitating nondeterministic processing time)
- void MindDataTestConnector::GoToSleep(int max_dur) {
- uint32_t duration = GenRand(max_dur);
- std::this_thread::sleep_for(std::chrono::milliseconds(duration));
- }
|