From 83cb87bcd17fed63249bd41b0f6ff93b6d6c7f85 Mon Sep 17 00:00:00 2001 From: yanghaitao1 Date: Mon, 16 Nov 2020 14:24:15 +0800 Subject: [PATCH] cpu profiling add operator profiling part add op_id param when create task repair bugs in op cpu sampling fix wrong order of write and parse cpu_info add process cpu profiling part add multiprocessing part(part 1) adapter cpu_sampling c layer fix little error in sampling descrease cirle complex in voc descrease cirle complex in device_que --- .../dataset/api/python/pybind_conversion.cc | 20 +- .../dataset/api/python/pybind_conversion.h | 6 + .../dataset/engine/dataset_iterator.cc | 4 +- .../dataset/engine/datasetops/batch_op.cc | 2 +- .../build_sentence_piece_vocab_op.cc | 4 +- .../engine/datasetops/build_vocab_op.cc | 5 +- .../engine/datasetops/cache_lookup_op.cc | 2 +- .../engine/datasetops/cache_merge_op.cc | 7 +- .../engine/datasetops/device_queue_op.cc | 57 +- .../engine/datasetops/device_queue_op.h | 6 + .../dataset/engine/datasetops/filter_op.cc | 2 +- .../engine/datasetops/source/album_op.cc | 117 ++-- .../engine/datasetops/source/album_op.h | 34 +- .../engine/datasetops/source/celeba_op.cc | 6 +- .../engine/datasetops/source/cifar_op.cc | 5 +- .../engine/datasetops/source/clue_op.cc | 5 +- .../engine/datasetops/source/coco_op.cc | 3 +- .../engine/datasetops/source/csv_op.cc | 5 +- .../datasetops/source/image_folder_op.cc | 7 +- .../engine/datasetops/source/manifest_op.cc | 2 +- .../engine/datasetops/source/mindrecord_op.cc | 2 +- .../engine/datasetops/source/mnist_op.cc | 3 +- .../datasetops/source/random_data_op.cc | 2 +- .../engine/datasetops/source/text_file_op.cc | 4 +- .../engine/datasetops/source/tf_reader_op.cc | 4 +- .../engine/datasetops/source/voc_op.cc | 4 +- .../minddata/dataset/engine/execution_tree.cc | 7 +- .../minddata/dataset/engine/execution_tree.h | 5 +- .../dataset/engine/perf/CMakeLists.txt | 1 + .../dataset/engine/perf/cpu_sampling.cc | 567 ++++++++++++++++++ 
.../dataset/engine/perf/cpu_sampling.h | 201 +++++++ .../engine/perf/dataset_iterator_tracing.cc | 4 +- .../engine/perf/dataset_iterator_tracing.h | 3 +- .../engine/perf/device_queue_tracing.cc | 9 +- .../engine/perf/device_queue_tracing.h | 3 +- .../minddata/dataset/engine/perf/profiling.cc | 6 +- .../minddata/dataset/engine/perf/profiling.h | 3 +- .../minddata/dataset/engine/tree_adapter.cc | 4 +- mindspore/ccsrc/minddata/dataset/util/task.cc | 5 +- mindspore/ccsrc/minddata/dataset/util/task.h | 9 +- .../minddata/dataset/util/task_manager.cc | 16 +- .../minddata/dataset/util/task_manager.h | 10 +- mindspore/dataset/engine/datasets.py | 66 +- 43 files changed, 1076 insertions(+), 161 deletions(-) create mode 100644 mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampling.cc create mode 100644 mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampling.h diff --git a/mindspore/ccsrc/minddata/dataset/api/python/pybind_conversion.cc b/mindspore/ccsrc/minddata/dataset/api/python/pybind_conversion.cc index 835d20c38f..82299edba2 100644 --- a/mindspore/ccsrc/minddata/dataset/api/python/pybind_conversion.cc +++ b/mindspore/ccsrc/minddata/dataset/api/python/pybind_conversion.cc @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - #include "minddata/dataset/api/python/pybind_conversion.h" namespace mindspore { @@ -63,6 +62,25 @@ std::vector toStringVector(const py::list list) { return vector; } +std::vector toIntVector(const py::list input_list) { + std::vector vector; + if (!input_list.empty()) { + std::transform(input_list.begin(), input_list.end(), std::back_inserter(vector), + [&](const py::handle &handle) { return static_cast(toInt(handle)); }); + } + return vector; +} + +std::unordered_map> toIntMap(const py::dict input_dict) { + std::unordered_map> map; + if (!input_dict.empty()) { + for (auto p : input_dict) { + (void)map.emplace(toInt(p.first), toIntVector(py::reinterpret_borrow(p.second))); + } + } + return map; +} + std::pair toIntPair(const py::tuple tuple) { std::pair pair; if (!tuple.empty()) { diff --git a/mindspore/ccsrc/minddata/dataset/api/python/pybind_conversion.h b/mindspore/ccsrc/minddata/dataset/api/python/pybind_conversion.h index 4f9a95d2d1..d22a861f5d 100644 --- a/mindspore/ccsrc/minddata/dataset/api/python/pybind_conversion.h +++ b/mindspore/ccsrc/minddata/dataset/api/python/pybind_conversion.h @@ -17,11 +17,13 @@ #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_API_PYTHON_PYBIND_CONVERSION_H_ #define MINDSPORE_CCSRC_MINDDATA_DATASET_API_PYTHON_PYBIND_CONVERSION_H_ +#include #include #include #include #include #include +#include #include #include "pybind11/pybind11.h" #include "pybind11/stl.h" @@ -53,6 +55,10 @@ std::map toStringMap(const py::dict dict); std::vector toStringVector(const py::list list); +std::vector toIntVector(const py::list input_list); + +std::unordered_map> toIntMap(const py::dict input_dict); + std::pair toIntPair(const py::tuple tuple); std::vector> toPairVector(const py::list list); diff --git a/mindspore/ccsrc/minddata/dataset/engine/dataset_iterator.cc b/mindspore/ccsrc/minddata/dataset/engine/dataset_iterator.cc index 855e891242..714b9e5647 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/dataset_iterator.cc +++ 
b/mindspore/ccsrc/minddata/dataset/engine/dataset_iterator.cc @@ -23,6 +23,7 @@ #include "minddata/dataset/engine/execution_tree.h" #include "minddata/dataset/util/status.h" #include "minddata/dataset/engine/datasetops/dataset_op.h" +#include "minddata/dataset/engine/perf/profiling.h" namespace mindspore { namespace dataset { @@ -185,7 +186,8 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) { RETURN_IF_NOT_OK(curr_buffer_->PopRow(out_row)); if (tracing_ != nullptr) { cur_batch_num_++; - tracing_->Record(CONNECTOR_DEPTH, cur_connector_capacity_, cur_batch_num_, cur_connector_size_); + tracing_->Record(CONNECTOR_DEPTH, cur_connector_capacity_, cur_batch_num_, cur_connector_size_, + ProfilingTime::GetCurMilliSecond()); } return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc index 502c0707ad..4c64c47057 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc @@ -265,7 +265,7 @@ Status BatchOp::LaunchThreadsAndInitOp() { } RETURN_IF_NOT_OK(worker_queues_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK( - tree_->LaunchWorkers(num_workers_, std::bind(&BatchOp::WorkerEntry, this, std::placeholders::_1), Name())); + tree_->LaunchWorkers(num_workers_, std::bind(&BatchOp::WorkerEntry, this, std::placeholders::_1), Name(), id())); return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/build_sentence_piece_vocab_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/build_sentence_piece_vocab_op.cc index 540e8db408..a633784fc5 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/build_sentence_piece_vocab_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/build_sentence_piece_vocab_op.cc @@ -45,8 +45,8 @@ Status BuildSentencePieceVocabOp::operator()() { return Status(StatusCode::kUnexpectedError, __LINE__, 
__FILE__, "Pipeline init failed, Execution tree not set."); } RETURN_IF_NOT_OK(sentence_queue_->Register(tree_->AllTasks())); - RETURN_IF_NOT_OK( - tree_->AllTasks()->CreateAsyncTask("sentenceTask", std::bind(&BuildSentencePieceVocabOp::SentenceThread, this))); + RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask( + "sentenceTask", std::bind(&BuildSentencePieceVocabOp::SentenceThread, this), nullptr, id())); TaskManager::FindMe()->Post(); child_iterator_ = std::make_unique(this, 0, 0); TensorRow new_row; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/build_vocab_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/build_vocab_op.cc index f5b18c8f7c..c2145e0e42 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/build_vocab_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/build_vocab_op.cc @@ -86,8 +86,9 @@ Status BuildVocabOp::operator()() { RETURN_IF_NOT_OK(collector_queue_->Register(tree_->AllTasks())); // launch worker threads and collector thread RETURN_IF_NOT_OK( - tree_->LaunchWorkers(num_workers_, std::bind(&BuildVocabOp::WorkerEntry, this, std::placeholders::_1))); - RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("collector", std::bind(&BuildVocabOp::CollectorThread, this))); + tree_->LaunchWorkers(num_workers_, std::bind(&BuildVocabOp::WorkerEntry, this, std::placeholders::_1), "", id())); + RETURN_IF_NOT_OK( + tree_->AllTasks()->CreateAsyncTask("collector", std::bind(&BuildVocabOp::CollectorThread, this), nullptr, id())); TaskManager::FindMe()->Post(); child_iterator_ = std::make_unique(this, 0, 0); TensorRow new_row; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_lookup_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_lookup_op.cc index 658a590267..8549caf82c 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_lookup_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_lookup_op.cc @@ -65,7 +65,7 @@ Status 
CacheLookupOp::operator()() { RETURN_IF_NOT_OK(RegisterResources()); // Kick off the workers RETURN_IF_NOT_OK( - tree_->LaunchWorkers(num_workers_, std::bind(&CacheLookupOp::WorkerEntry, this, std::placeholders::_1))); + tree_->LaunchWorkers(num_workers_, std::bind(&CacheLookupOp::WorkerEntry, this, std::placeholders::_1), "", id())); // required task group sync after launching workers TaskManager::FindMe()->Post(); // We have to wait until the leaf op has handshake with us. diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_merge_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_merge_op.cc index a9d1dcec41..6037897fad 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_merge_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_merge_op.cc @@ -60,13 +60,14 @@ Status CacheMergeOp::operator()() { io_que_ = std::make_unique>(queue_sz); RETURN_IF_NOT_OK(io_que_->Register(tree_->AllTasks())); RETURN_IF_NOT_OK(tree_->LaunchWorkers( - num_workers_, std::bind(&CacheMergeOp::WorkerEntry, this, std::placeholders::_1), Name() + "::WorkerEntry")); + num_workers_, std::bind(&CacheMergeOp::WorkerEntry, this, std::placeholders::_1), Name() + "::WorkerEntry", id())); RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&CacheMergeOp::CacheMissWorkerEntry, this, std::placeholders::_1), - Name() + "::CacheMissWorkerEntry")); + Name() + "::CacheMissWorkerEntry", id())); // One dedicated thread to move TensorRow from the pool to the cache server for (auto i = 0; i < num_cleaners_; ++i) { - RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Cleaner", std::bind(&CacheMergeOp::Cleaner, this))); + RETURN_IF_NOT_OK( + tree_->AllTasks()->CreateAsyncTask("Cleaner", std::bind(&CacheMergeOp::Cleaner, this), nullptr, id())); } TaskManager::FindMe()->Post(); return Status::OK(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc 
b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc index 3b89a64e55..e1876e232d 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc @@ -26,7 +26,6 @@ #include "minddata/dataset/engine/dataset_iterator.h" #include "minddata/dataset/engine/datasetops/epoch_ctrl_op.h" #include "minddata/dataset/engine/opt/pass.h" -#include "minddata/dataset/engine/perf/device_queue_tracing.h" #include "minddata/dataset/engine/perf/profiling.h" #include "minddata/dataset/util/status.h" #include "minddata/dataset/util/task_manager.h" @@ -134,8 +133,8 @@ Status DeviceQueueOp::operator()() { Status DeviceQueueOp::SendDataToAscend() { MS_LOG(INFO) << "Device queue, sending data to Ascend."; int64_t send_batch = 0; - double batch_start_time, end_time; - int32_t batch_cost, tdt_cost; + uint64_t batch_start_time, end_time; + int32_t tdt_cost; int32_t connector_size = 0; int32_t connector_capacity; bool is_break_loop = false; @@ -178,20 +177,8 @@ Status DeviceQueueOp::SendDataToAscend() { [](const std::shared_ptr &ts) { return std::make_pair(ts->type(), ts->shape()); }); RETURN_IF_NOT_OK(data_info_queue_ptr_->Add(data_info)); } - - if (isProfilingEnable) { - end_time = ProfilingTime::GetCurMilliSecond(); - // record push tdt time - profiling_node->Record(TIME, TDT_PUSH_TIME, send_batch + 1, tdt_cost); - batch_cost = (int32_t)(end_time - batch_start_time); - // record batch time - profiling_node->Record(TIME, BATCH_TIME, send_batch + 1, batch_cost); - // record pipeline time - profiling_node->Record(TIME, PIPELINE_TIME, send_batch + 1, batch_cost - tdt_cost); - batch_start_time = end_time; - // record connector depth - profiling_node->Record(CONNECTOR_DEPTH, connector_capacity, send_batch + 1, connector_size); - } + ProfilingRecorder(isProfilingEnable, profiling_node, send_batch, tdt_cost, &batch_start_time, &end_time, + connector_capacity, connector_size); 
send_batch++; if (total_batch_ > 0 && send_batch >= total_batch_) { @@ -273,9 +260,9 @@ Status DeviceQueueOp::LaunchParallelCopyThread() { receive_queues_.Init(num_workers_, queue_capacity_); RETURN_IF_NOT_OK(receive_queues_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK( - tree_->LaunchWorkers(num_workers_, std::bind(&DeviceQueueOp::WorkerEntry, this, std::placeholders::_1))); - RETURN_IF_NOT_OK( - tree_->AllTasks()->CreateAsyncTask("Push data to GPU queue", std::bind(&DeviceQueueOp::PushDataToGPU, this))); + tree_->LaunchWorkers(num_workers_, std::bind(&DeviceQueueOp::WorkerEntry, this, std::placeholders::_1), "", id())); + RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Push data to GPU queue", + std::bind(&DeviceQueueOp::PushDataToGPU, this), nullptr, id())); return Status::OK(); } @@ -285,8 +272,8 @@ Status DeviceQueueOp::PushDataToGPU() { // and will overload in distribute scenario, so don't remove this line cudaSetDevice(rank_id_); TaskManager::FindMe()->Post(); - double batch_start_time = 0.0; - double end_time = 0.0; + uint64_t batch_start_time = 0; + uint64_t end_time = 0; int32_t batch_cost = 0; int32_t push_cost = 0; int32_t connector_size = 0; @@ -345,15 +332,15 @@ Status DeviceQueueOp::PushDataToGPU() { if (isProfilingEnable) { end_time = ProfilingTime::GetCurMilliSecond(); // record push data time - profiling_node->Record(TIME, TDT_PUSH_TIME, send_batch, push_cost); + profiling_node->Record(TIME, TDT_PUSH_TIME, send_batch, push_cost, end_time); batch_cost = (int32_t)(end_time - batch_start_time); // record batch time - profiling_node->Record(TIME, BATCH_TIME, send_batch, batch_cost); + profiling_node->Record(TIME, BATCH_TIME, send_batch, batch_cost, end_time); // record pipeline time - profiling_node->Record(TIME, PIPELINE_TIME, send_batch, batch_cost - push_cost); + profiling_node->Record(TIME, PIPELINE_TIME, send_batch, batch_cost - push_cost, end_time); batch_start_time = end_time; // record connector depth - 
profiling_node->Record(CONNECTOR_DEPTH, connector_capacity, send_batch, connector_size); + profiling_node->Record(CONNECTOR_DEPTH, connector_capacity, send_batch, connector_size, end_time); connector_size = gpu_item_connector_->size(); connector_capacity = gpu_item_connector_->capacity(); } @@ -508,5 +495,23 @@ Status DeviceQueueOp::Accept(NodePass *p, bool *const modified) { return p->RunOnNode(shared_from_base(), modified); } +void DeviceQueueOp::ProfilingRecorder(bool isProfilingEnable, std::shared_ptr profiling_node, + int64_t send_batch, int32_t tdt_cost, uint64_t *batch_start_time, + uint64_t *end_time, int32_t connector_capacity, int32_t connector_size) { + // Record the pipeline profiling info + if (isProfilingEnable) { + *end_time = ProfilingTime::GetCurMilliSecond(); + // record push tdt time + profiling_node->Record(TIME, TDT_PUSH_TIME, send_batch + 1, tdt_cost, *end_time); + int32_t batch_cost = (int32_t)(*end_time - *batch_start_time); + // record batch time + profiling_node->Record(TIME, BATCH_TIME, send_batch + 1, batch_cost, *end_time); + // record pipeline time + profiling_node->Record(TIME, PIPELINE_TIME, send_batch + 1, batch_cost - tdt_cost, *end_time); + *batch_start_time = *end_time; + // record connector depth + profiling_node->Record(CONNECTOR_DEPTH, connector_capacity, send_batch + 1, connector_size, *end_time); + } +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h index 1905b62711..16b45b5511 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h @@ -23,6 +23,7 @@ #include "minddata/dataset/engine/datasetops/pipeline_op.h" #include "minddata/dataset/engine/datasetops/repeat_op.h" +#include "minddata/dataset/engine/perf/device_queue_tracing.h" #include "minddata/dataset/util/status.h" 
#ifdef ENABLE_TDTQUE @@ -173,6 +174,11 @@ class DeviceQueueOp : public PipelineOp { // @return - Status of the node visit. Status Accept(NodePass *p, bool *const modified) override; + // Record the pipeline profiling info + void ProfilingRecorder(bool isProfilingEnable, std::shared_ptr profiling_node, int64_t send_batch, + int32_t tdt_cost, uint64_t *batch_start_time, uint64_t *end_time, int32_t connector_capacity, + int32_t connector_size); + // Op name getter // @return Name of the current Op std::string Name() const override { return kDeviceQueueOp; } diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/filter_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/filter_op.cc index 3bd7d13b42..487501ba73 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/filter_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/filter_op.cc @@ -71,7 +71,7 @@ Status FilterOp::operator()() { filter_queues_.Init(num_workers_, oc_queue_size_); RETURN_IF_NOT_OK(filter_queues_.Register(tree_->AllTasks())); Status rc = - tree_->LaunchWorkers(num_workers_, std::bind(&FilterOp::WorkerEntry, this, std::placeholders::_1), Name()); + tree_->LaunchWorkers(num_workers_, std::bind(&FilterOp::WorkerEntry, this, std::placeholders::_1), Name(), id()); // Synchronize with TaskManager. 
TaskManager::FindMe()->Post(); RETURN_IF_NOT_OK(rc); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc index f6c828db7c..97e7104a98 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc @@ -464,65 +464,7 @@ Status AlbumOp::LoadTensorRow(row_id_type row_id, const std::string &file, Tenso // loop over each column descriptor, this can optimized by switch cases for (int32_t i = 0; i < columns; i++) { - // special case to handle - if (data_schema_->column(i).name() == "id") { - // id is internal, special case to load from file - RETURN_IF_NOT_OK(LoadIDTensor(file, i, row)); - continue; - } - // find if key does not exist, insert placeholder nullptr if not found - if (js.find(data_schema_->column(i).name()) == js.end()) { - // iterator not found, push nullptr as placeholder - MS_LOG(INFO) << "Pushing empty tensor for column: " << data_schema_->column(i).name() << "."; - RETURN_IF_NOT_OK(LoadEmptyTensor(i, row)); - continue; - } - nlohmann::json column_value = js.at(data_schema_->column(i).name()); - MS_LOG(INFO) << "This column is: " << data_schema_->column(i).name() << "."; - bool is_array = column_value.is_array(); - // load single string - if (column_value.is_string() && data_schema_->column(i).type() == DataType::DE_STRING) { - RETURN_IF_NOT_OK(LoadStringTensor(column_value, i, row)); - continue; - } - // load string array - if (is_array && data_schema_->column(i).type() == DataType::DE_STRING) { - RETURN_IF_NOT_OK(LoadStringArrayTensor(column_value, i, row)); - continue; - } - // load image file - if (column_value.is_string() && data_schema_->column(i).type() != DataType::DE_STRING) { - std::string image_file_path = column_value; - RETURN_IF_NOT_OK(LoadImageTensor(image_file_path, i, row)); - continue; - } - // load float value - if (!is_array && 
(data_schema_->column(i).type() == DataType::DE_FLOAT32 || - data_schema_->column(i).type() == DataType::DE_FLOAT64)) { - RETURN_IF_NOT_OK(LoadFloatTensor(column_value, i, row)); - continue; - } - // load float array - if (is_array && (data_schema_->column(i).type() == DataType::DE_FLOAT32 || - data_schema_->column(i).type() == DataType::DE_FLOAT64)) { - RETURN_IF_NOT_OK(LoadFloatArrayTensor(column_value, i, row)); - continue; - } - // int value - if (!is_array && (data_schema_->column(i).type() == DataType::DE_INT64 || - data_schema_->column(i).type() == DataType::DE_INT32)) { - RETURN_IF_NOT_OK(LoadIntTensor(column_value, i, row)); - continue; - } - // int array - if (is_array && (data_schema_->column(i).type() == DataType::DE_INT64 || - data_schema_->column(i).type() == DataType::DE_INT32)) { - RETURN_IF_NOT_OK(LoadIntArrayTensor(column_value, i, row)); - continue; - } else { - MS_LOG(WARNING) << "Value type for column: " << data_schema_->column(i).name() << " is not supported."; - continue; - } + RETURN_IF_NOT_OK(loadColumnData(file, i, js, row)); } } catch (const std::exception &err) { file_handle.close(); @@ -535,6 +477,60 @@ Status AlbumOp::LoadTensorRow(row_id_type row_id, const std::string &file, Tenso return Status::OK(); } +Status AlbumOp::loadColumnData(const std::string &file, int32_t index, nlohmann::json js, TensorRow *row) { + int32_t i = index; + // special case to handle + if (data_schema_->column(i).name() == "id") { + // id is internal, special case to load from file + return LoadIDTensor(file, i, row); + } + // find if key does not exist, insert placeholder nullptr if not found + if (js.find(data_schema_->column(i).name()) == js.end()) { + // iterator not found, push nullptr as placeholder + MS_LOG(INFO) << "Pushing empty tensor for column: " << data_schema_->column(i).name() << "."; + return LoadEmptyTensor(i, row); + } + nlohmann::json column_value = js.at(data_schema_->column(i).name()); + MS_LOG(INFO) << "This column is: " << 
data_schema_->column(i).name() << "."; + bool is_array = column_value.is_array(); + // load single string + if (column_value.is_string() && data_schema_->column(i).type() == DataType::DE_STRING) { + return LoadStringTensor(column_value, i, row); + } + // load string array + if (is_array && data_schema_->column(i).type() == DataType::DE_STRING) { + return LoadStringArrayTensor(column_value, i, row); + } + // load image file + if (column_value.is_string() && data_schema_->column(i).type() != DataType::DE_STRING) { + std::string image_file_path = column_value; + return LoadImageTensor(image_file_path, i, row); + } + // load float value + bool judge_float = (data_schema_->column(i).type() == DataType::DE_FLOAT32) || + (data_schema_->column(i).type() == DataType::DE_FLOAT64); + if (!is_array && judge_float) { + return LoadFloatTensor(column_value, i, row); + } + // load float array + if (is_array && judge_float) { + return LoadFloatArrayTensor(column_value, i, row); + } + // int value + if (!is_array && + (data_schema_->column(i).type() == DataType::DE_INT64 || data_schema_->column(i).type() == DataType::DE_INT32)) { + return LoadIntTensor(column_value, i, row); + } + // int array + if (is_array && + (data_schema_->column(i).type() == DataType::DE_INT64 || data_schema_->column(i).type() == DataType::DE_INT32)) { + return LoadIntArrayTensor(column_value, i, row); + } else { + MS_LOG(WARNING) << "Value type for column: " << data_schema_->column(i).name() << " is not supported."; + return Status::OK(); + } +} + // Looping over LoadTensorRow to make 1 DataBuffer. 
1 function call produces 1 buffer Status AlbumOp::LoadBuffer(const std::vector &keys, std::unique_ptr *db) { std::unique_ptr deq = std::make_unique(); @@ -587,7 +583,8 @@ Status AlbumOp::LaunchThreadsAndInitOp() { RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks())); // launch main workers that load DataBuffers by reading all images - RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&AlbumOp::WorkerEntry, this, std::placeholders::_1))); + RETURN_IF_NOT_OK( + tree_->LaunchWorkers(num_workers_, std::bind(&AlbumOp::WorkerEntry, this, std::placeholders::_1), "", id())); TaskManager::FindMe()->Post(); RETURN_IF_NOT_OK(this->InitSampler()); // pass numRows to Sampler return Status::OK(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h index 46aece4f42..764e666da2 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h @@ -134,7 +134,7 @@ class AlbumOp : public ParallelOp, public RandomAccessOp { Status SanityCheck(); /// \brief The builder "build" method creates the final object. 
- /// \param[inout] std::shared_ptr *op - DatasetOp + /// \param[in, out] std::shared_ptr *op - DatasetOp /// \return Status The status code returned Status Build(std::shared_ptr *op); @@ -210,74 +210,82 @@ class AlbumOp : public ParallelOp, public RandomAccessOp { /// \brief Load image to tensor row /// \param[in] image_file Image name of file /// \param[in] col_num Column num in schema - /// \param[inout] row Tensor row to push to + /// \param[in, out] row Tensor row to push to /// \return Status The status code returned Status LoadImageTensor(const std::string &image_file, uint32_t col_num, TensorRow *row); /// \brief Load vector of ints to tensor, append tensor to tensor row /// \param[in] json_obj Json object containing multi-dimensional label /// \param[in] col_num Column num in schema - /// \param[inout] row Tensor row to push to + /// \param[in, out] row Tensor row to push to /// \return Status The status code returned Status LoadIntArrayTensor(const nlohmann::json &json_obj, uint32_t col_num, TensorRow *row); /// \brief Load vector of floatss to tensor, append tensor to tensor row /// \param[in] json_obj Json object containing array data /// \param[in] col_num Column num in schema - /// \param[inout] row Tensor row to push to + /// \param[in, out] row Tensor row to push to /// \return Status The status code returned Status LoadFloatArrayTensor(const nlohmann::json &json_obj, uint32_t col_num, TensorRow *row); /// \brief Load string array into a tensor, append tensor to tensor row /// \param[in] json_obj Json object containing string tensor /// \param[in] col_num Column num in schema - /// \param[inout] row Tensor row to push to + /// \param[in, out] row Tensor row to push to /// \return Status The status code returned Status LoadStringArrayTensor(const nlohmann::json &json_obj, uint32_t col_num, TensorRow *row); /// \brief Load string into a tensor, append tensor to tensor row /// \param[in] json_obj Json object containing string tensor /// \param[in] 
col_num Column num in schema - /// \param[inout] row Tensor row to push to + /// \param[in, out] row Tensor row to push to /// \return Status The status code returned Status LoadStringTensor(const nlohmann::json &json_obj, uint32_t col_num, TensorRow *row); /// \brief Load float value to tensor row /// \param[in] json_obj Json object containing float /// \param[in] col_num Column num in schema - /// \param[inout] row Tensor row to push to + /// \param[in, out] row Tensor row to push to /// \return Status The status code returned Status LoadFloatTensor(const nlohmann::json &json_obj, uint32_t col_num, TensorRow *row); /// \brief Load int value to tensor row /// \param[in] json_obj Json object containing int /// \param[in] col_num Column num in schema - /// \param[inout] row Tensor row to push to + /// \param[in, out] row Tensor row to push to /// \return Status The status code returned Status LoadIntTensor(const nlohmann::json &json_obj, uint32_t col_num, TensorRow *row); - /// \brief Load emtpy tensor to tensor row + /// \brief Load empty tensor to tensor row /// \param[in] col_num Column num in schema - /// \param[inout] row Tensor row to push to + /// \param[in, out] row Tensor row to push to /// \return Status The status code returned Status LoadEmptyTensor(uint32_t col_num, TensorRow *row); /// \brief Load id from file name to tensor row /// \param[in] file The file name to get ID from /// \param[in] col_num Column num in schema - /// \param[inout] row Tensor row to push to + /// \param[in, out] row Tensor row to push to /// \return Status The status code returned Status LoadIDTensor(const std::string &file, uint32_t col_num, TensorRow *row); /// \brief Load a tensor row according to a json file /// \param[in] row_id_type row_id - id for this tensor row /// \param[in] ImageColumns file Json file location - /// \param[inout] TensorRow row Json content stored into a tensor row + /// \param[in, out] TensorRow row Json content stored into a tensor row /// \return 
Status The status code returned Status LoadTensorRow(row_id_type row_id, const std::string &file, TensorRow *row); + /// \brief Load a tensor column according to a json file + /// \param[in] ImageColumns file Json file location + /// \param[in] index - certain column index + /// \param[in] js - json object + /// \param[in, out] TensorRow row Json content stored into a tensor row + /// \return Status The status code returned + Status loadColumnData(const std::string &file, int32_t index, nlohmann::json js, TensorRow *row); + /// \param[in] const std::vector &keys Keys in ioblock - /// \param[inout] std::unique_ptr db Databuffer to push to + /// \param[in, out] std::unique_ptr db Databuffer to push to /// \return Status The status code returned Status LoadBuffer(const std::vector &keys, std::unique_ptr *db); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/celeba_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/celeba_op.cc index 67db42ec40..3dae849e65 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/celeba_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/celeba_op.cc @@ -102,8 +102,10 @@ Status CelebAOp::LaunchThreadsAndInitOp() { RETURN_IF_NOT_OK(attr_info_queue_->Register(tree_->AllTasks())); RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks())); - RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Walking attr file", std::bind(&CelebAOp::ParseAttrFile, this))); - RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&CelebAOp::WorkerEntry, this, std::placeholders::_1))); + RETURN_IF_NOT_OK( + tree_->AllTasks()->CreateAsyncTask("Walking attr file", std::bind(&CelebAOp::ParseAttrFile, this), nullptr, id())); + RETURN_IF_NOT_OK( + tree_->LaunchWorkers(num_workers_, std::bind(&CelebAOp::WorkerEntry, this, std::placeholders::_1), Name(), id())); TaskManager::FindMe()->Post(); RETURN_IF_NOT_OK(ParseImageAttrInfo()); 
RETURN_IF_NOT_OK(sampler_->HandshakeRandomAccessOp(this)); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/cifar_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/cifar_op.cc index f32fb07691..6158a8957f 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/cifar_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/cifar_op.cc @@ -165,9 +165,10 @@ Status CifarOp::LaunchThreadsAndInitOp() { } RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks())); + RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask( + "Get cifar data block", std::bind(&CifarOp::ReadCifarBlockDataAsync, this), nullptr, id())); RETURN_IF_NOT_OK( - tree_->AllTasks()->CreateAsyncTask("Get cifar data block", std::bind(&CifarOp::ReadCifarBlockDataAsync, this))); - RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&CifarOp::WorkerEntry, this, std::placeholders::_1))); + tree_->LaunchWorkers(num_workers_, std::bind(&CifarOp::WorkerEntry, this, std::placeholders::_1), "", id())); TaskManager::FindMe()->Post(); // The order of the following 2 functions must not be changed! 
RETURN_IF_NOT_OK(ParseCifarData()); // Parse cifar data and get num rows, blocking diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.cc index 8287191d5a..affd60be2a 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.cc @@ -237,9 +237,10 @@ Status ClueOp::operator()() { RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks())); // launch one thread, responsible for filling IoBlockQueue - RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&ClueOp::WaitToFillIOBlockQueue, this))); + RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&ClueOp::WaitToFillIOBlockQueue, this), "", id())); - RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&ClueOp::WorkerEntry, this, std::placeholders::_1))); + RETURN_IF_NOT_OK( + tree_->LaunchWorkers(num_workers_, std::bind(&ClueOp::WorkerEntry, this, std::placeholders::_1), "", id())); // must be called after launching workers. 
TaskManager::FindMe()->Post(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/coco_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/coco_op.cc index 2bf690de35..223b12499e 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/coco_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/coco_op.cc @@ -638,7 +638,8 @@ Status CocoOp::LaunchThreadsAndInitOp() { } RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks())); - RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&CocoOp::WorkerEntry, this, std::placeholders::_1))); + RETURN_IF_NOT_OK( + tree_->LaunchWorkers(num_workers_, std::bind(&CocoOp::WorkerEntry, this, std::placeholders::_1), "", id())); TaskManager::FindMe()->Post(); RETURN_IF_NOT_OK(this->ParseAnnotationIds()); RETURN_IF_NOT_OK(this->InitSampler()); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.cc index 9938811a5b..32118e7e78 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.cc @@ -555,9 +555,10 @@ Status CsvOp::operator()() { RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks())); // launch one thread, responsible for filling IoBlockQueue - RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&CsvOp::WaitToFillIOBlockQueue, this))); + RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&CsvOp::WaitToFillIOBlockQueue, this), "", id())); - RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&CsvOp::WorkerEntry, this, std::placeholders::_1))); + RETURN_IF_NOT_OK( + tree_->LaunchWorkers(num_workers_, std::bind(&CsvOp::WorkerEntry, this, std::placeholders::_1), "", id())); // must be called after launching workers. 
TaskManager::FindMe()->Post(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.cc index efd02b1709..dbece1371d 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.cc @@ -385,12 +385,13 @@ Status ImageFolderOp::LaunchThreadsAndInitOp() { // 1) A thread that walks all folders and push the folder names to a util:Queue folder_name_queue_. // 2) Workers that pull foldername from folder_name_queue_, walk it and return the sorted images to image_name_queue // 3) Launch main workers that load DataBuffers by reading all images - RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("walk dir", std::bind(&ImageFolderOp::StartAsyncWalk, this))); + RETURN_IF_NOT_OK( + tree_->AllTasks()->CreateAsyncTask("walk dir", std::bind(&ImageFolderOp::StartAsyncWalk, this), nullptr, id())); RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&ImageFolderOp::PrescanWorkerEntry, this, std::placeholders::_1), - Name() + "::PrescanWorkerEntry")); + Name() + "::PrescanWorkerEntry", id())); RETURN_IF_NOT_OK(tree_->LaunchWorkers( - num_workers_, std::bind(&ImageFolderOp::WorkerEntry, this, std::placeholders::_1), Name() + "::WorkerEntry")); + num_workers_, std::bind(&ImageFolderOp::WorkerEntry, this, std::placeholders::_1), Name() + "::WorkerEntry", id())); TaskManager::FindMe()->Post(); // The order of the following 2 functions must not be changed! 
RETURN_IF_NOT_OK(this->PrescanMasterEntry(folder_path_)); // Master thread of pre-scan workers, blocking diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/manifest_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/manifest_op.cc index 3c6827bcdb..a7e83d48f3 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/manifest_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/manifest_op.cc @@ -152,7 +152,7 @@ Status ManifestOp::LaunchThreadsAndInitOp() { RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK( - tree_->LaunchWorkers(num_workers_, std::bind(&ManifestOp::WorkerEntry, this, std::placeholders::_1))); + tree_->LaunchWorkers(num_workers_, std::bind(&ManifestOp::WorkerEntry, this, std::placeholders::_1), "", id())); TaskManager::FindMe()->Post(); RETURN_IF_NOT_OK(ParseManifestFile()); RETURN_IF_NOT_OK(CountDatasetInfo()); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc index da24b5e5fb..d3d68b2521 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc @@ -446,7 +446,7 @@ Status MindRecordOp::LaunchThreadAndInitOp() { } // Launch main workers that load DataBuffers by reading all images RETURN_IF_NOT_OK( - tree_->LaunchWorkers(num_workers_, std::bind(&MindRecordOp::WorkerEntry, this, std::placeholders::_1))); + tree_->LaunchWorkers(num_workers_, std::bind(&MindRecordOp::WorkerEntry, this, std::placeholders::_1), "", id())); TaskManager::FindMe()->Post(); return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mnist_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mnist_op.cc index 3afbe771d7..363747b10c 100644 --- 
a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mnist_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mnist_op.cc @@ -420,7 +420,8 @@ Status MnistOp::LaunchThreadsAndInitOp() { } RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks())); - RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&MnistOp::WorkerEntry, this, std::placeholders::_1))); + RETURN_IF_NOT_OK( + tree_->LaunchWorkers(num_workers_, std::bind(&MnistOp::WorkerEntry, this, std::placeholders::_1), "", id())); TaskManager::FindMe()->Post(); RETURN_IF_NOT_OK(this->WalkAllFiles()); RETURN_IF_NOT_OK(this->ParseMnistData()); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/random_data_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/random_data_op.cc index bfccc97ff3..0beb69e1be 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/random_data_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/random_data_op.cc @@ -197,7 +197,7 @@ Status RandomDataOp::operator()() { // RandomDataOp doesn't need the master thread to stay around. Kick off the workers and then master exits. 
RETURN_IF_NOT_OK( - tree_->LaunchWorkers(num_workers_, std::bind(&RandomDataOp::WorkerEntry, this, std::placeholders::_1))); + tree_->LaunchWorkers(num_workers_, std::bind(&RandomDataOp::WorkerEntry, this, std::placeholders::_1), "", id())); // required task group setup after launching workers TaskManager::FindMe()->Post(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/text_file_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/text_file_op.cc index 5ddc4732ec..0680f9aca8 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/text_file_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/text_file_op.cc @@ -385,11 +385,11 @@ Status TextFileOp::operator()() { RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks())); // launch one thread, responsible for filling IoBlockQueue - RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&TextFileOp::WaitToFillIOBlockQueue, this))); + RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&TextFileOp::WaitToFillIOBlockQueue, this), Name(), id())); // Read data from disk into buffers RETURN_IF_NOT_OK( - tree_->LaunchWorkers(num_workers_, std::bind(&TextFileOp::WorkerEntry, this, std::placeholders::_1))); + tree_->LaunchWorkers(num_workers_, std::bind(&TextFileOp::WorkerEntry, this, std::placeholders::_1), Name(), id())); // must be called after launching workers. 
TaskManager::FindMe()->Post(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/tf_reader_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/tf_reader_op.cc index 4b9db81fed..562545b313 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/tf_reader_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/tf_reader_op.cc @@ -234,12 +234,12 @@ Status TFReaderOp::operator()() { RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks())); // launch one thread, responsible for filling mIOBlockQueue - RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&TFReaderOp::WaitToFillIOBlockQueue, this))); + RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&TFReaderOp::WaitToFillIOBlockQueue, this), "", id())); // launch num_workers_ worker threads, responsible for pulling from the IOBlockQueue and reading // data from disk into buffers RETURN_IF_NOT_OK( - tree_->LaunchWorkers(num_workers_, std::bind(&TFReaderOp::WorkerEntry, this, std::placeholders::_1))); + tree_->LaunchWorkers(num_workers_, std::bind(&TFReaderOp::WorkerEntry, this, std::placeholders::_1), "", id())); // must be called after launching workers. 
workers can't be spawned after this post, // so workers have to be kept alive until the end of the program diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/voc_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/voc_op.cc index a876d78a1c..bb4946d910 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/voc_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/voc_op.cc @@ -366,6 +366,7 @@ Status VOCOp::ParseAnnotationBbox(const std::string &path) { } else { RETURN_STATUS_UNEXPECTED("Invalid data, bndbox dismatch in " + path); } + if (label_name != "" && (class_index_.empty() || class_index_.find(label_name) != class_index_.end()) && xmin > 0 && ymin > 0 && xmax > xmin && ymax > ymin) { std::vector bbox_list = {xmin, ymin, xmax - xmin, ymax - ymin, difficult, truncated}; @@ -389,7 +390,8 @@ Status VOCOp::LaunchThreadsAndInitOp() { } RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks())); - RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&VOCOp::WorkerEntry, this, std::placeholders::_1))); + RETURN_IF_NOT_OK( + tree_->LaunchWorkers(num_workers_, std::bind(&VOCOp::WorkerEntry, this, std::placeholders::_1), "", id())); TaskManager::FindMe()->Post(); RETURN_IF_NOT_OK(this->ParseImageIds()); if (task_type_ == TaskType::Detection) { diff --git a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc index d10d70a224..27543bc6c2 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc @@ -208,7 +208,7 @@ Status ExecutionTree::Launch() { // the launching tree/user thread. Do not exec any thread for an inlined op. 
itr->state_ = DatasetOp::OpState::kDeOpRunning; if (!itr->inlined()) { - RETURN_IF_NOT_OK(tg_->CreateAsyncTask(itr->NameWithID(), std::ref(*itr))); + RETURN_IF_NOT_OK(tg_->CreateAsyncTask(itr->NameWithID(), std::ref(*itr), nullptr, itr->id())); // Set the state of the Operator as running. This only matters in Leaf ops, CacheOp and TakeOp } } @@ -237,7 +237,8 @@ ExecutionTree::Iterator::Iterator(const std::shared_ptr &root) : ind_ // Given the number of workers, launches the worker entry function for each. Essentially a // wrapper for the TaskGroup handling that is stored inside the execution tree. -Status ExecutionTree::LaunchWorkers(int32_t num_workers, std::function func, std::string name) { +Status ExecutionTree::LaunchWorkers(int32_t num_workers, std::function func, std::string name, + int32_t operator_id) { int32_t num_cpu_threads = GlobalContext::Instance()->config_manager()->num_cpu_threads(); // this performs check that num_workers is positive and not unreasonably large which could happen // for example, un-initialized variable. uint16 max is 65536 which is large enough to cover everything @@ -249,7 +250,7 @@ Status ExecutionTree::LaunchWorkers(int32_t num_workers, std::functionCreateAsyncTask(name, std::bind(func, i))); + RETURN_IF_NOT_OK(tg_->CreateAsyncTask(name, std::bind(func, i), nullptr, operator_id)); } return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h index 3c4f1d2fe2..87a6bd069f 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h +++ b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h @@ -155,8 +155,11 @@ class ExecutionTree { // wrapper for the TaskGroup handling that is stored inside the execution tree. 
// @param num_workers - The number of workers to launch // @param func - The function entry point that workers will execute + // @param name - The description of worker to launch + // @param op_id - The id of corresponding operator, if not inherit from dataset op then it is -1. // @return Status The status code returned - Status LaunchWorkers(int32_t num_workers, std::function func, std::string name = ""); + Status LaunchWorkers(int32_t num_workers, std::function func, std::string name = "", + int32_t operator_id = -1); // Getter method // @return shared_ptr to the root operator diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/CMakeLists.txt b/mindspore/ccsrc/minddata/dataset/engine/perf/CMakeLists.txt index e611add983..d072ea0bbe 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/CMakeLists.txt +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/CMakeLists.txt @@ -5,4 +5,5 @@ add_library(engine-perf OBJECT connector_size.cc dataset_iterator_tracing.cc connector_throughput.cc + cpu_sampling.cc ) diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampling.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampling.cc new file mode 100644 index 0000000000..5b1ef29f2d --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampling.cc @@ -0,0 +1,567 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "minddata/dataset/engine/perf/cpu_sampling.h" +#if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__) +#include +#endif +#include +#include +#include +#include +#include +#include +#include "minddata/dataset/api/python/pybind_conversion.h" +#include "minddata/dataset/core/config_manager.h" +#include "minddata/dataset/engine/execution_tree.h" +#include "minddata/dataset/util/path.h" + +using json = nlohmann::json; +namespace mindspore { +namespace dataset { +bool BaseCpu::fetched_all_process_shared = false; +std::unordered_map> BaseCpu::op_process_shared = {}; + +#if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__) +#define USING_LINUX +#endif + +BaseCpu::BaseCpu() { + pre_cpu_stat_.user_stat_ = 0; + pre_cpu_stat_.sys_stat_ = 0; + pre_cpu_stat_.io_stat_ = 0; + pre_cpu_stat_.idle_stat_ = 0; + pre_cpu_stat_.total_stat_ = 0; +} + +Status DeviceCpu::ParseCpuInfo(const std::string &str) { + CpuStat cpu_stat; + uint64_t nice = 0; + uint64_t irq = 0; + uint64_t softirq = 0; + if (std::sscanf(str.c_str(), "%*s %lu %lu %lu %lu %lu %lu %lu", &cpu_stat.user_stat_, &nice, &cpu_stat.sys_stat_, + &cpu_stat.idle_stat_, &cpu_stat.io_stat_, &irq, &softirq) == EOF) { + return Status(StatusCode::kUnexpectedError, "Get device CPU failed."); + } + + cpu_stat.total_stat_ = + cpu_stat.user_stat_ + nice + cpu_stat.sys_stat_ + cpu_stat.idle_stat_ + cpu_stat.io_stat_ + irq + softirq; + // Calculate the utilization from the second sampling + if (!first_collect_) { + CpuUtil info; + info.user_utilization_ = floor((cpu_stat.user_stat_ - pre_cpu_stat_.user_stat_) * 1.0 / + (cpu_stat.total_stat_ - pre_cpu_stat_.total_stat_) * 100 + + 0.5); + info.sys_utilization_ = floor((cpu_stat.sys_stat_ - pre_cpu_stat_.sys_stat_) * 1.0 / + (cpu_stat.total_stat_ - pre_cpu_stat_.total_stat_) * 100 + + 0.5); + info.io_utilization_ = floor((cpu_stat.io_stat_ - pre_cpu_stat_.io_stat_) * 
1.0 / + (cpu_stat.total_stat_ - pre_cpu_stat_.total_stat_) * 100 + + 0.5); + info.idle_utilization_ = floor((cpu_stat.idle_stat_ - pre_cpu_stat_.idle_stat_) * 1.0 / + (cpu_stat.total_stat_ - pre_cpu_stat_.total_stat_) * 100 + + 0.5); + cpu_util_.emplace_back(info); + } + pre_cpu_stat_.user_stat_ = cpu_stat.user_stat_; + pre_cpu_stat_.sys_stat_ = cpu_stat.sys_stat_; + pre_cpu_stat_.io_stat_ = cpu_stat.io_stat_; + pre_cpu_stat_.idle_stat_ = cpu_stat.idle_stat_; + pre_cpu_stat_.total_stat_ = cpu_stat.total_stat_; + + return Status::OK(); +} + +Status DeviceCpu::ParseCtxt(const std::string &str) { + uint64_t ctxt; + if (std::sscanf(str.c_str(), "%*s %lu", &ctxt) == EOF) { + return Status(StatusCode::kUnexpectedError, "Get context switch count failed."); + } + // Calculate the utilization from the second sampling + if (!first_collect_) { + context_switch_count_.push_back(ctxt - pre_context_switch_count_); + } + pre_context_switch_count_ = ctxt; + return Status::OK(); +} + +Status DeviceCpu::ParseRunningProcess(const std::string &str) { + uint32_t running_process; + if (std::sscanf(str.c_str(), "%*s %ud", &running_process) == EOF) { + return Status(StatusCode::kUnexpectedError, "Get context switch count failed."); + } + // Drop the first value in order to collect same amount of CPU utilization + if (!first_collect_) { + running_process_.push_back(running_process); + } + + return Status::OK(); +} + +Status DeviceCpu::Collect(ExecutionTree *tree) { + std::ifstream file("/proc/stat"); + if (!file.is_open()) { + MS_LOG(WARNING) << "Open CPU file failed when collect CPU information"; + return Status::OK(); + } + bool first_line = true; + std::string line; + while (getline(file, line)) { + if (first_line) { + first_line = false; + RETURN_IF_NOT_OK(ParseCpuInfo(line)); + } + if (line.find("ctxt") != std::string::npos) { + RETURN_IF_NOT_OK(ParseCtxt(line)); + } + if (line.find("procs_running") != std::string::npos) { + RETURN_IF_NOT_OK(ParseRunningProcess(line)); + } + } + 
file.close(); + + first_collect_ = false; + return Status::OK(); +} + +Status DeviceCpu::SaveToFile(const std::string &file_path) { + Path path = Path(file_path); + json output; + if (path.Exists()) { + MS_LOG(DEBUG) << file_path << " exists already"; + std::ifstream file(file_path); + file >> output; + } else { + output["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval(); + } + + std::vector user_util; + std::transform(cpu_util_.begin(), cpu_util_.end(), std::back_inserter(user_util), + [&](const CpuUtil &info) { return info.user_utilization_; }); + std::vector sys_util; + std::transform(cpu_util_.begin(), cpu_util_.end(), std::back_inserter(sys_util), + [&](const CpuUtil &info) { return info.sys_utilization_; }); + std::vector io_util; + std::transform(cpu_util_.begin(), cpu_util_.end(), std::back_inserter(io_util), + [&](const CpuUtil &info) { return info.io_utilization_; }); + std::vector idle_util; + std::transform(cpu_util_.begin(), cpu_util_.end(), std::back_inserter(idle_util), + [&](const CpuUtil &info) { return info.idle_utilization_; }); + + output["device_info"] = {{"user_utilization", user_util}, + {"sys_utilization", sys_util}, + {"io_utilization", io_util}, + {"idle_utilization", idle_util}, + {"runable_processes", running_process_}, + {"context_switch_count", context_switch_count_}}; + + // Discard the content of the file when opening. 
+ std::ofstream os(file_path, std::ios::trunc); + os << output; + + MS_LOG(INFO) << "Save device CPU success."; + return Status::OK(); +} + +Status OperatorCpu::ParseCpuInfo(int32_t op_id, int64_t thread_id, + std::unordered_map> *op_stat) { + pid_t pid = 0; +#if defined(USING_LINUX) + pid = syscall(SYS_getpid); +#endif + std::string stat_path = "/proc/" + std::to_string(pid) + "/task/" + std::to_string(thread_id) + "/stat"; + + // Judge whether file exist first + Path temp_path(stat_path); + if (!temp_path.Exists()) { + (*op_stat)[op_id][thread_id].user_stat_ = 0; + (*op_stat)[op_id][thread_id].sys_stat_ = 0; + return Status(StatusCode::kFileNotExist); + } + + std::ifstream file(stat_path); + if (!file.is_open()) { + MS_LOG(WARNING) << "Open CPU file failed when collect CPU information"; + return Status::OK(); + } + std::string str; + getline(file, str); + uint64_t utime; + uint64_t stime; + if (std::sscanf(str.c_str(), "%*d %*s %*s %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %lu %lu", &utime, + &stime) == EOF) { + file.close(); + return Status(StatusCode::kUnexpectedError, "Get device CPU failed."); + } + file.close(); + (*op_stat)[op_id][thread_id].user_stat_ = utime; + (*op_stat)[op_id][thread_id].sys_stat_ = stime; + + return Status::OK(); +} + +Status OperatorCpu::GetTotalCpuTime(uint64_t *total_stat) { + std::ifstream file("/proc/stat"); + if (!file.is_open()) { + MS_LOG(WARNING) << "Open CPU file failed when collect CPU information"; + return Status::OK(); + } + std::string str; + getline(file, str); + uint64_t user = 0, sys = 0, idle = 0, iowait = 0, nice = 0, irq = 0, softirq = 0; + if (std::sscanf(str.c_str(), "%*s %lu %lu %lu %lu %lu %lu %lu", &user, &nice, &sys, &idle, &iowait, &irq, &softirq) == + EOF) { + file.close(); + return Status(StatusCode::kUnexpectedError, "Get device CPU failed."); + } + file.close(); + *total_stat = user + nice + sys + idle + iowait + irq + softirq; + + return Status::OK(); +} + +Status 
OperatorCpu::Collect(ExecutionTree *tree) { + if (first_collect_) { + for (auto iter = tree->begin(); iter != tree->end(); ++iter) { + id_count++; + } +#if defined(USING_LINUX) + cpu_processor_num = get_nprocs_conf(); +#endif + } + + // Obtain the op and thread mapping + op_thread.clear(); + List allTasks = tree->AllTasks()->GetTask(); + for (auto &task1 : allTasks) { + int32_t op_id = task1.get_operator_id(); + op_thread[op_id].emplace_back(task1.get_linux_id()); + } + + // add process id into op_thread + if (!fetched_all_process) { + { + py::gil_scoped_acquire gil_acquire; + py::module ds = py::module::import("mindspore.dataset.engine.datasets"); + py::tuple process_info = ds.attr("_get_operator_process")(); + py::dict sub_process = py::reinterpret_borrow(process_info[0]); + fetched_all_process = py::reinterpret_borrow(process_info[1]); + // parse dict value + op_process = toIntMap(sub_process); + BaseCpu::op_process_shared = op_process; + BaseCpu::fetched_all_process_shared = fetched_all_process; + } + + // judge whether there is device_que operator, if so operator id may need increase by one, temp use directly + for (auto item : op_process) { + if (!item.second.empty()) { + if (op_thread.find(item.first) != op_thread.end()) { + op_thread[item.first].insert(op_thread[item.first].end(), item.second.begin(), item.second.end()); + } else { + op_thread[item.first] = item.second; + } + } + } + } + + uint64_t total_stat_; + RETURN_IF_NOT_OK(GetTotalCpuTime(&total_stat_)); + std::vector cpu_step_util_; + std::unordered_map> op_stat_; + + if (!first_collect_) { + // obtain all the op id in current tasks + std::vector total_op_id; + for (auto iter = op_thread.begin(); iter != op_thread.end(); iter++) { + total_op_id.emplace_back(iter->first); + } + + // iter all the op, and obtain the CPU utilization of each operator + for (auto op_id = -1; op_id < id_count; op_id++) { + float user_util = 0, sys_util = 0; + auto iter = std::find(total_op_id.begin(), total_op_id.end(), 
op_id); + if (iter != total_op_id.end()) { + for (auto thread_id : op_thread[op_id]) { + if (ParseCpuInfo(op_id, thread_id, &op_stat_) == Status::OK()) { + user_util += (op_stat_[op_id][thread_id].user_stat_ - pre_op_stat_[op_id][thread_id].user_stat_) * 1.0 / + (total_stat_ - pre_total_stat_) * 100; + sys_util += (op_stat_[op_id][thread_id].sys_stat_ - pre_op_stat_[op_id][thread_id].sys_stat_) * 1.0 / + (total_stat_ - pre_total_stat_) * 100; + } + } + } + CpuOpUtil info; + info.op_id = op_id; + info.sys_utilization_ = sys_util; + info.user_utilization_ = user_util; + cpu_step_util_.emplace_back(info); + } + cpu_op_util_.emplace_back(cpu_step_util_); + } else { + // mainly obtain the init CPU execute time in first collect + for (auto iter = op_thread.begin(); iter != op_thread.end(); iter++) { + int32_t op_id = iter->first; + for (auto thread_id : iter->second) { + ParseCpuInfo(op_id, thread_id, &op_stat_); + } + } + } + + // copy current op_stat into pre_op_stat + pre_op_stat_ = op_stat_; + pre_total_stat_ = total_stat_; + + first_collect_ = false; + return Status::OK(); +} + +Status OperatorCpu::SaveToFile(const std::string &file_path) { + Path path = Path(file_path); + json output; + if (path.Exists()) { + MS_LOG(DEBUG) << file_path << "already exist."; + std::ifstream file(file_path); + file >> output; + } + + uint8_t index = 0; + json OpWriter; + for (auto op_id = -1; op_id < id_count; op_id++) { + std::vector user_util; + std::vector sys_util; + std::transform( + cpu_op_util_.begin(), cpu_op_util_.end(), std::back_inserter(user_util), + [&](const std::vector &info) { return int16_t(info[index].user_utilization_ * cpu_processor_num); }); + std::transform( + cpu_op_util_.begin(), cpu_op_util_.end(), std::back_inserter(sys_util), + [&](const std::vector &info) { return int16_t(info[index].sys_utilization_ * cpu_processor_num); }); + + json per_op_info = {{"metrics", {{"user_utilization", user_util}, {"sys_utilization", sys_util}}}, + {"op_id", op_id}}; + 
OpWriter.emplace_back(per_op_info); + index++; + } + output["op_info"] = OpWriter; + + // Discard the content of the file when opening. + std::ofstream os(file_path, std::ios::trunc); + os << output; + + MS_LOG(INFO) << "Save device CPU success."; + return Status::OK(); +} + +Status ProcessCpu::ParseCpuInfo() { + uint64_t total_stat_; + RETURN_IF_NOT_OK(GetTotalCpuTime(&total_stat_)); + + if (!pre_fetched_state) { + process_id.clear(); + pid_t main_pid = 0; +#if defined(USING_LINUX) + main_pid = syscall(SYS_getpid); +#endif + process_id.emplace_back(main_pid); + op_process = BaseCpu::op_process_shared; + fetched_all_process = BaseCpu::fetched_all_process_shared; + for (auto item : op_process) { + for (auto id : item.second) { + process_id.emplace_back(id); + } + } + } + + float user_util = 0, sys_util = 0; + for (auto pid : process_id) { + std::string stat_path = "/proc/" + std::to_string(pid) + "/stat"; + + std::ifstream file(stat_path); + if (!file.is_open()) { + MS_LOG(WARNING) << "Open CPU file failed when collect CPU information"; + continue; + } + std::string str; + getline(file, str); + uint64_t user = 0, sys = 0; + if (std::sscanf(str.c_str(), "%*d %*s %*s %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %lu %lu", &user, + &sys) == EOF) { + file.close(); + return Status(StatusCode::kUnexpectedError, "Get device CPU failed."); + } + file.close(); + + // Calculate the utilization from the second sampling + if (!first_collect_ && (pre_process_stat_.find(pid) != pre_process_stat_.end())) { + user_util += (user - pre_process_stat_[pid].user_stat_) * 1.0 / (total_stat_ - pre_total_stat_) * 100; + sys_util += (sys - pre_process_stat_[pid].sys_stat_) * 1.0 / (total_stat_ - pre_total_stat_) * 100; + } + pre_process_stat_[pid].user_stat_ = user; + pre_process_stat_[pid].sys_stat_ = sys; + } + if (!first_collect_) { + CpuProcessUtil info; + info.user_utilization_ = user_util; + info.sys_utilization_ = sys_util; + process_util_.emplace_back(info); + } + 
pre_total_stat_ = total_stat_; + first_collect_ = false; + pre_fetched_state = fetched_all_process; + return Status::OK(); +} + +Status ProcessCpu::GetTotalCpuTime(uint64_t *total_stat) { + std::ifstream file("/proc/stat"); + if (!file.is_open()) { + MS_LOG(WARNING) << "Open CPU file failed when collect CPU information"; + return Status::OK(); + } + std::string str; + getline(file, str); + uint64_t user = 0, sys = 0, idle = 0, iowait = 0, nice = 0, irq = 0, softirq = 0; + if (std::sscanf(str.c_str(), "%*s %lu %lu %lu %lu %lu %lu %lu", &user, &nice, &sys, &idle, &iowait, &irq, &softirq) == + EOF) { + file.close(); + return Status(StatusCode::kUnexpectedError, "Get device CPU failed."); + } + file.close(); + *total_stat = user + nice + sys + idle + iowait + irq + softirq; + + return Status::OK(); +} + +Status ProcessCpu::Collect(ExecutionTree *tree) { + if (first_collect_) { +#if defined(USING_LINUX) + cpu_processor_num = get_nprocs_conf(); +#endif + } + RETURN_IF_NOT_OK(ParseCpuInfo()); + + return Status::OK(); +} + +Status ProcessCpu::SaveToFile(const std::string &file_path) { + Path path = Path(file_path); + json output; + if (path.Exists()) { + MS_LOG(DEBUG) << file_path << "already exist."; + std::ifstream file(file_path); + file >> output; + } else { + output["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval(); + } + + std::vector user_util; + std::transform(process_util_.begin(), process_util_.end(), std::back_inserter(user_util), + [&](const CpuProcessUtil &info) { return uint16_t(info.user_utilization_ * cpu_processor_num); }); + std::vector sys_util; + std::transform(process_util_.begin(), process_util_.end(), std::back_inserter(sys_util), + [&](const CpuProcessUtil &info) { return uint16_t(info.sys_utilization_ * cpu_processor_num); }); + + output["process_info"] = {{"user_utilization", user_util}, {"sys_utilization", sys_util}}; + output["cpu_processor_num"] = cpu_processor_num; + // Discard the content of the file when 
opening. + std::ofstream os(file_path, std::ios::trunc); + os << output; + + MS_LOG(INFO) << "Save process CPU success."; + return Status::OK(); +} + +Status CpuSampling::CollectTimeStamp() { + time_stamp_.emplace_back(ProfilingTime::GetCurMilliSecond()); + return Status::OK(); +} + +// Sample action +Status CpuSampling::Sample() { + // Collect cpu information + for (auto cpu : cpu_) { + RETURN_IF_NOT_OK(cpu->Collect(this->tree_)); + } + + // Collect time stamp + RETURN_IF_NOT_OK(CollectTimeStamp()); + return Status::OK(); +} + +Status CpuSampling::SaveTimeStampToFile() { + // Save time stamp to json file + // If the file is already exist, simply add the data to corresponding field. + Path path = Path(file_path_); + json output; + if (path.Exists()) { + std::ifstream file(file_path_); + file >> output; + } + output["time_stamp"] = time_stamp_; + std::ofstream os(file_path_, std::ios::trunc); + os << output; + + return Status::OK(); +} + +Status CpuSampling::SaveSamplingItervalToFile() { + // If the file is already exist, simply add the data to corresponding field. 
+ Path path = Path(file_path_); + json output; + if (path.Exists()) { + std::ifstream file(file_path_); + file >> output; + } + output["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval(); + std::ofstream os(file_path_, std::ios::trunc); + os << output; + + return Status::OK(); +} + +// Save profiling data to file +Status CpuSampling::SaveToFile() { + // Save time stamp to json file + RETURN_IF_NOT_OK(SaveTimeStampToFile()); + + // Save sampling interval to json file + RETURN_IF_NOT_OK(SaveSamplingItervalToFile()); + + // Save cpu information to json file + for (auto cpu : cpu_) { + RETURN_IF_NOT_OK(cpu->SaveToFile(file_path_)); + } + + return Status::OK(); +} + +Status CpuSampling::Init(const std::string &dir_path, const std::string &device_id) { + file_path_ = (Path(dir_path) / Path("minddata_cpu_utilization_" + device_id + ".json")).toString(); + std::shared_ptr device_cpu = std::make_shared(); + std::shared_ptr operator_cpu = std::make_shared(); + std::shared_ptr process_cpu = std::make_shared(); + cpu_.push_back(device_cpu); + cpu_.push_back(operator_cpu); + cpu_.push_back(process_cpu); + return Status::OK(); +} + +Status CpuSampling::ChangeFileMode() { + if (chmod(common::SafeCStr(file_path_), S_IRUSR | S_IWUSR) == -1) { + std::string err_str = "Change file mode failed," + file_path_; + return Status(StatusCode::kUnexpectedError, err_str); + } + return Status::OK(); +} +} // namespace dataset +} // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampling.h b/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampling.h new file mode 100644 index 0000000000..a987f0d898 --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampling.h @@ -0,0 +1,201 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_CPU_SAMPLING_H +#define MINDSPORE_CCSRC_MINDDATA_DATASET_CPU_SAMPLING_H + +#include +#include +#include +#include +#include +#include "minddata/dataset/engine/perf/profiling.h" +#include "minddata/dataset/engine/datasetops/dataset_op.h" + +namespace mindspore { +namespace dataset { +class ExecutionTree; + +// CPU information from /proc/stat or /proc/pid/stat file +typedef struct CpuStat_s { + uint64_t user_stat_; + uint64_t sys_stat_; + uint64_t io_stat_; + uint64_t idle_stat_; + uint64_t total_stat_; +} CpuStat; + +// Cpu utilization +typedef struct CpuInfo_s { + uint8_t user_utilization_; + uint8_t sys_utilization_; + uint8_t io_utilization_; + uint8_t idle_utilization_; +} CpuUtil; + +// CPU utilization of operator +typedef struct CpuOpInfo_s { + float user_utilization_; + float sys_utilization_; + int32_t op_id; +} CpuOpUtil; + +// CPU utilization of process +typedef struct CpuProcessInfo_s { + float user_utilization_; + float sys_utilization_; +} CpuProcessUtil; + +// CPU stat of operator +typedef struct CpuOpStat_s { + uint64_t user_stat_; + uint64_t sys_stat_; +} CpuOpStat; + +class BaseCpu { + public: + BaseCpu(); + ~BaseCpu() = default; + // Collect CPU information + virtual Status Collect(ExecutionTree *tree) = 0; + virtual Status SaveToFile(const std::string &file_path) = 0; + + protected: + std::vector cpu_util_; + CpuStat pre_cpu_stat_; + static bool fetched_all_process_shared; + static std::unordered_map> op_process_shared; + bool fetched_all_process = false; + bool pre_fetched_state = 
false; + std::unordered_map> op_process; + int32_t cpu_processor_num; +}; + +// Collect device CPU information +class DeviceCpu : public BaseCpu { + public: + DeviceCpu() : pre_running_process_(0), pre_context_switch_count_(0), first_collect_(true) {} + ~DeviceCpu() = default; + Status Collect(ExecutionTree *tree) override; + Status SaveToFile(const std::string &file_path) override; + + private: + // Get CPU information, include use/sys/idle/io utilization + Status ParseCpuInfo(const std::string &str); + + // Get context switch count + Status ParseCtxt(const std::string &str); + + // Get running process count + Status ParseRunningProcess(const std::string &str); + + std::vector running_process_; + std::vector context_switch_count_; + uint32_t pre_running_process_; + uint64_t pre_context_switch_count_; + bool first_collect_; +}; + +// Collect operator CPU information +class OperatorCpu : public BaseCpu { + public: + OperatorCpu() : first_collect_(true) {} + ~OperatorCpu() = default; + Status Collect(ExecutionTree *tree) override; + Status SaveToFile(const std::string &file_path) override; + + private: + // Get cpu information, include use/sys/idle/io utilization + Status ParseCpuInfo(int32_t op_id, int64_t thread_id, + std::unordered_map> *op_stat); + + // Get the total CPU time of device + Status GetTotalCpuTime(uint64_t *total_stat); + + // Store the CPU utilization of each operator + std::vector> cpu_op_util_; + + bool first_collect_; + + // Store the id and its corresponding threads. 
+ std::unordered_map> op_thread; + std::unordered_map> pre_op_stat_; + uint64_t pre_total_stat_; + int32_t id_count = 0; +}; + +// Collect process CPU information +class ProcessCpu : public BaseCpu { + public: + ProcessCpu() : first_collect_(true) {} + ~ProcessCpu() = default; + Status Collect(ExecutionTree *tree) override; + Status SaveToFile(const std::string &file_path) override; + + private: + // Get CPU information, include use/sys/idle/io utilization + Status ParseCpuInfo(); + + // Get the total CPU time of device + Status GetTotalCpuTime(uint64_t *total_stat); + + bool first_collect_; + std::vector process_util_; + uint64_t pre_total_stat_; + std::unordered_map pre_process_stat_; + std::vector process_id; +}; + +// Sampling CPU information +// It supports JSON serialization for external usage. +class CpuSampling : public Sampling { + using TimeStamp = std::vector; + + public: + explicit CpuSampling(ExecutionTree *tree) : tree_(tree) {} + + ~CpuSampling() = default; + + // Driver function for CPU sampling.
+ // This function samples the CPU information of device/process/op + Status Sample() override; + + std::string Name() const override { return kCpuSamplingName; } + + // Save sampling data to file + // @return Status - The error code return + Status SaveToFile() override; + + Status Init(const std::string &dir_path, const std::string &device_id) override; + + // Change file mode after save CPU data + Status ChangeFileMode() override; + + private: + Status CollectTimeStamp(); + + Status SaveTimeStampToFile(); + + Status SaveSamplingItervalToFile(); + + ExecutionTree *tree_ = nullptr; // ExecutionTree pointer + std::vector> cpu_; // CPU information of device/process/op + TimeStamp time_stamp_; // Time stamp +}; + +} // namespace dataset +} // namespace mindspore + +#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_CPU_SAMPLING_H diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/dataset_iterator_tracing.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/dataset_iterator_tracing.cc index 90197dbc22..b2188ed0cd 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/dataset_iterator_tracing.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/dataset_iterator_tracing.cc @@ -24,7 +24,7 @@ namespace mindspore { namespace dataset { Status DatasetIteratorTracing::Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, - const int32_t value) { + const int32_t value, const uint64_t time_stamp) { // Format: "type extra-info batch-num value" // type: 0: time, 1: connector size // extra-info: if type is 0 - 0: pipeline time, 1: push tdt time, 2: batch time @@ -36,7 +36,7 @@ Status DatasetIteratorTracing::Record(const int32_t type, const int32_t extra_in // 0 0 20 10 - The 20th batch took 10ms to get data from pipeline. // 1 64 20 5 - Connector size is 5 when get the 20th batch.Connector capacity is 64. 
std::string data = std::to_string(type) + " " + std::to_string(extra_info) + " " + std::to_string(batch_num) + " " + - std::to_string(value); + std::to_string(value) + " " + std::to_string(time_stamp); value_.emplace_back(data); return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/dataset_iterator_tracing.h b/mindspore/ccsrc/minddata/dataset/engine/perf/dataset_iterator_tracing.h index 1ddcb03e64..55c3f3108c 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/dataset_iterator_tracing.h +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/dataset_iterator_tracing.h @@ -33,7 +33,8 @@ class DatasetIteratorTracing : public Tracing { // Record tracing data // @return Status The status code returned - Status Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, const int32_t value); + Status Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, const int32_t value, + const uint64_t time_stamp); std::string Name() const override { return kDatasetIteratorTracingName; }; diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/device_queue_tracing.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/device_queue_tracing.cc index 174fbf5129..8255ed677b 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/device_queue_tracing.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/device_queue_tracing.cc @@ -24,7 +24,7 @@ namespace mindspore { namespace dataset { Status DeviceQueueTracing::Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, - const int32_t value) { + const int32_t value, const uint64_t time_stamp) { // Format: "type extra-info batch-num value" // type: 0: time, 1: connector size // extra-info: if type is 0 - 0: pipeline time, 1: push tdt time, 2: batch time @@ -32,11 +32,12 @@ Status DeviceQueueTracing::Record(const int32_t type, const int32_t extra_info, // batch-num: batch number // value: if type is 0 - value is time(ms) // if type is 1 - value is 
connector size + // time-stamp: time stamp // Examples: - // 0 0 20 10 - The 20th batch took 10ms to get data from pipeline. - // 1 64 20 5 - Connector size is 5 when get the 20th batch.Connector capacity is 64. + // 0 0 20 10 xxx- The 20th batch took 10ms to get data from pipeline. + // 1 64 20 5 xxx- Connector size is 5 when get the 20th batch.Connector capacity is 64. std::string data = std::to_string(type) + " " + std::to_string(extra_info) + " " + std::to_string(batch_num) + " " + - std::to_string(value); + std::to_string(value) + " " + std::to_string(time_stamp); value_.emplace_back(data); return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/device_queue_tracing.h b/mindspore/ccsrc/minddata/dataset/engine/perf/device_queue_tracing.h index 15338f1f70..1ed646876d 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/device_queue_tracing.h +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/device_queue_tracing.h @@ -33,7 +33,8 @@ class DeviceQueueTracing : public Tracing { // Record tracing data // @return Status The status code returned - Status Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, const int32_t value); + Status Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, const int32_t value, + const uint64_t time_stamp); std::string Name() const override { return kDeviceQueueTracingName; }; diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.cc index a218402b44..49fc54188d 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.cc @@ -24,6 +24,7 @@ #include "minddata/dataset/engine/perf/device_queue_tracing.h" #include "minddata/dataset/engine/perf/connector_size.h" #include "minddata/dataset/engine/perf/connector_throughput.h" +#include "minddata/dataset/engine/perf/cpu_sampling.h" #include 
"minddata/dataset/engine/perf/dataset_iterator_tracing.h" #include "minddata/dataset/util/log_adapter.h" @@ -79,6 +80,9 @@ Status ProfilingManager::Initialize() { std::shared_ptr connector_thr_sampling = std::make_shared(tree_); RETURN_IF_NOT_OK(RegisterSamplingNode(connector_thr_sampling)); + std::shared_ptr cpu_sampling = std::make_shared(tree_); + RETURN_IF_NOT_OK(RegisterSamplingNode(cpu_sampling)); + return Status::OK(); } @@ -168,7 +172,7 @@ Status ProfilingManager::ChangeFileMode() { return Status::OK(); } -int64_t ProfilingTime::GetCurMilliSecond() { +uint64_t ProfilingTime::GetCurMilliSecond() { // because cpplint does not allow using namespace using std::chrono::duration_cast; using std::chrono::milliseconds; diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h b/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h index c76f87af72..0e267c770e 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h @@ -33,6 +33,7 @@ const char kDeviceQueueTracingName[] = "Device_Queue_Tracing"; const char kDatasetIteratorTracingName[] = "Dataset_Iterator_Tracing"; const char kConnectorSizeSamplingName[] = "Connector_Size_Sampling"; const char kConnectorThroughputSamplingName[] = "Connector_Throughput_Sampling"; +const char kCpuSamplingName[] = "Cpu_Sampling"; // Profiling is a class of basic unit of profiling action // This base class encapsulate the serialization output logic @@ -150,7 +151,7 @@ enum ProfilingTimeSubType { class ProfilingTime { public: - static int64_t GetCurMilliSecond(); + static uint64_t GetCurMilliSecond(); }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.cc b/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.cc index 097fa08c6c..a2726d2c61 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.cc @@ -246,8 
+246,10 @@ Status TreeAdapter::GetNext(TensorRow *row) { RETURN_IF_NOT_OK(cur_db_->PopRow(row)); // Record profiling info if (tracing_ != nullptr) { + uint64_t end_time = ProfilingTime::GetCurMilliSecond(); cur_batch_num_++; - RETURN_IF_NOT_OK(tracing_->Record(CONNECTOR_DEPTH, cur_connector_capacity_, cur_batch_num_, cur_connector_size_)); + RETURN_IF_NOT_OK( + tracing_->Record(CONNECTOR_DEPTH, cur_connector_capacity_, cur_batch_num_, cur_connector_size_, end_time)); } return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/util/task.cc b/mindspore/ccsrc/minddata/dataset/util/task.cc index b20327d1c9..514b6bb991 100644 --- a/mindspore/ccsrc/minddata/dataset/util/task.cc +++ b/mindspore/ccsrc/minddata/dataset/util/task.cc @@ -44,8 +44,8 @@ void Task::operator()() { #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__) native_handle_ = pthread_self(); + thread_id_ = syscall(SYS_gettid); #endif - try { // Previously there is a timing hole where the thread is spawn but hit error immediately before we can set // the TaskGroup pointer and register. 
We move the registration logic to here (after we spawn) so we can @@ -105,8 +105,9 @@ Status Task::GetTaskErrorIfAny() const { } } -Task::Task(const std::string &myName, const std::function &f) +Task::Task(const std::string &myName, const std::function &f, int32_t operator_id) : my_name_(myName), + operator_id_(operator_id), rc_(), fnc_obj_(f), task_group_(nullptr), diff --git a/mindspore/ccsrc/minddata/dataset/util/task.h b/mindspore/ccsrc/minddata/dataset/util/task.h index 5a1d4c6e96..72294d1bd4 100644 --- a/mindspore/ccsrc/minddata/dataset/util/task.h +++ b/mindspore/ccsrc/minddata/dataset/util/task.h @@ -18,6 +18,7 @@ #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__) #include +#include #endif #include #include @@ -48,7 +49,7 @@ class Task : public IntrpResource { enum class WaitFlag : int { kBlocking, kNonBlocking }; - Task(const std::string &myName, const std::function &f); + Task(const std::string &myName, const std::function &f, int32_t operator_id = -1); // Future objects are not copyable. Task(const Task &) = delete; @@ -87,8 +88,12 @@ class Task : public IntrpResource { std::thread::id get_id() { return id_; } + pid_t get_linux_id() { return thread_id_; } + std::string MyName() const { return my_name_; } + int32_t get_operator_id() { return operator_id_; } + // An operator used by std::find bool operator==(const Task &other) const { return (this == &other); } @@ -107,6 +112,8 @@ class Task : public IntrpResource { private: mutable std::mutex mux_; std::string my_name_; + int32_t operator_id_; + pid_t thread_id_; Status rc_; WaitPost wp_; // Task need to provide definition for this function. 
It diff --git a/mindspore/ccsrc/minddata/dataset/util/task_manager.cc b/mindspore/ccsrc/minddata/dataset/util/task_manager.cc index f994ab6de1..f1d172c84c 100644 --- a/mindspore/ccsrc/minddata/dataset/util/task_manager.cc +++ b/mindspore/ccsrc/minddata/dataset/util/task_manager.cc @@ -25,7 +25,7 @@ TaskManager *TaskManager::instance_ = nullptr; std::once_flag TaskManager::init_instance_flag_; // This takes the same parameter as Task constructor. Status TaskManager::CreateAsyncTask(const std::string &my_name, const std::function &f, TaskGroup *vg, - Task **task) { + Task **task, int32_t operator_id) { // We need to block destructor coming otherwise we will deadlock. We will grab the // stateLock in shared allowing CreateAsyncTask to run concurrently. SharedLock stateLck(&state_lock_); @@ -33,7 +33,7 @@ Status TaskManager::CreateAsyncTask(const std::string &my_name, const std::funct if (ServiceState() == STATE::kStopInProg || ServiceState() == STATE::kStopped) { return Status(StatusCode::kInterrupted, __LINE__, __FILE__, "TaskManager is shutting down"); } - RETURN_IF_NOT_OK(GetFreeTask(my_name, f, task)); + RETURN_IF_NOT_OK(GetFreeTask(my_name, f, task, operator_id)); if (vg == nullptr) { RETURN_STATUS_UNEXPECTED("TaskGroup is null"); } @@ -248,7 +248,8 @@ void TaskManager::ReturnFreeTask(Task *p) noexcept { } } -Status TaskManager::GetFreeTask(const std::string &my_name, const std::function &f, Task **p) { +Status TaskManager::GetFreeTask(const std::string &my_name, const std::function &f, Task **p, + int32_t operator_id) { if (p == nullptr) { RETURN_STATUS_UNEXPECTED("p is null"); } @@ -262,18 +263,19 @@ Status TaskManager::GetFreeTask(const std::string &my_name, const std::function< } } if (q) { - new (q) Task(my_name, f); + new (q) Task(my_name, f, operator_id); } else { std::shared_ptr mp = Services::GetInstance().GetServiceMemPool(); Status rc; - q = new (&rc, mp) Task(my_name, f); + q = new (&rc, mp) Task(my_name, f, operator_id); RETURN_IF_NOT_OK(rc); } *p = 
q; return Status::OK(); } -Status TaskGroup::CreateAsyncTask(const std::string &my_name, const std::function &f, Task **ppTask) { +Status TaskGroup::CreateAsyncTask(const std::string &my_name, const std::function &f, Task **ppTask, + int32_t operator_id) { auto pMytask = TaskManager::FindMe(); // We need to block ~TaskGroup coming otherwise we will deadlock. We will grab the // stateLock in shared allowing CreateAsyncTask to run concurrently. @@ -293,7 +295,7 @@ Status TaskGroup::CreateAsyncTask(const std::string &my_name, const std::functio return pMytask->IsMasterThread() ? rc_ : Status(StatusCode::kInterrupted); } } - RETURN_IF_NOT_OK(dm.CreateAsyncTask(my_name, f, this, &pTask)); + RETURN_IF_NOT_OK(dm.CreateAsyncTask(my_name, f, this, &pTask, operator_id)); if (ppTask) { *ppTask = pTask; } diff --git a/mindspore/ccsrc/minddata/dataset/util/task_manager.h b/mindspore/ccsrc/minddata/dataset/util/task_manager.h index 1d20171b61..188ed9e93c 100644 --- a/mindspore/ccsrc/minddata/dataset/util/task_manager.h +++ b/mindspore/ccsrc/minddata/dataset/util/task_manager.h @@ -75,7 +75,8 @@ class TaskManager : public Service { // API // This takes the same parameter as Task constructor. Take a look // of the test-thread.cc for usage. 
- Status CreateAsyncTask(const std::string &my_name, const std::function &f, TaskGroup *vg, Task **); + Status CreateAsyncTask(const std::string &my_name, const std::function &f, TaskGroup *vg, Task **, + int32_t operator_id = -1); // Same usage as boot thread group Status join_all(); @@ -100,7 +101,7 @@ class TaskManager : public Service { void ReturnFreeTask(Task *p) noexcept; - Status GetFreeTask(const std::string &my_name, const std::function &f, Task **p); + Status GetFreeTask(const std::string &my_name, const std::function &f, Task **p, int32_t operator_id = -1); Status WatchDog(); @@ -129,7 +130,8 @@ class TaskGroup : public Service { friend class Task; friend class TaskManager; - Status CreateAsyncTask(const std::string &my_name, const std::function &f, Task **pTask = nullptr); + Status CreateAsyncTask(const std::string &my_name, const std::function &f, Task **pTask = nullptr, + int32_t operator_id = -1); void interrupt_all() noexcept; @@ -137,6 +139,8 @@ class TaskGroup : public Service { int size() const noexcept { return grp_list_.count; } + List GetTask() const noexcept { return grp_list_; } + Status DoServiceStart() override { return Status::OK(); } Status DoServiceStop() override; diff --git a/mindspore/dataset/engine/datasets.py b/mindspore/dataset/engine/datasets.py index b95dbb77b1..2718a31309 100644 --- a/mindspore/dataset/engine/datasets.py +++ b/mindspore/dataset/engine/datasets.py @@ -107,6 +107,25 @@ def zip(datasets): return ZipDataset(datasets) +def _get_operator_process(): + """ + Inner implemented method, mainly for passing sub-process id in C layer + + Returns: + tuple, (dict mapping each operator id to the list of its recorded process ids, bool that is True only when every operator has recorded its expected number of process ids).
+ """ + global _OP_PROCESS + process_info = _OP_PROCESS + op_process = dict() + keys = process_info.keys() + fetched_all = True + for key in keys: + op_process[key] = list(process_info[key][1]) + item_full = (len(process_info[key][1]) == process_info[key][0]) + fetched_all = fetched_all and item_full + return op_process, fetched_all + + class Dataset: """ Abstract class to represent a dataset in DataEngine's data pipeline. @@ -156,11 +175,47 @@ class Dataset: parent = self.parent self.parent = [] dataset = copy.deepcopy(self) + global _OP_NAME + _OP_NAME = Dataset._get_operator_id(dataset) ir_tree = dataset.parse_tree() self.parent = parent _init_device_info() return ir_tree, dataset + @staticmethod + def _get_operator_id(dataset): + """ + Internal method to iterate the tree and obtain op_id of each operator. + + Returns: + dict, mapping from the string representation of each operator in the tree to its op_id. + """ + op_name = dict() + generator_process = dict() + op_name[str(dataset)] = 0 + op_id = 1 + + def process_name(datasets, operator_id): + if not datasets: + return 0 + temp = [] + for item in datasets: + for d in item.children: + temp.append(d) + op_name[str(d)] = operator_id + if isinstance(d, GeneratorDataset) and d.sample_fn: + if d.sample_fn.pid: + generator_process[operator_id] = [d.num_parallel_workers, set(d.sample_fn.pid)] + + operator_id = operator_id + 1 + return process_name(temp, operator_id) + + process_name([dataset], op_id) + if generator_process: + global _OP_PROCESS + _OP_PROCESS.update(generator_process) + return op_name + def parse_tree(self): """ Internal method to parse the API tree into an IR tree. @@ -2202,7 +2257,8 @@ class ShuffleDataset(Dataset): # Pyfunc collection for multiprocess pyfunc # This global variable will only be used within subprocesses _GLOBAL_PYFUNC_LIST = [] - +_OP_NAME = dict() +_OP_PROCESS = dict() # Pyfunc worker init function # Python multiprocessing library forbid sending lambda function through pipe.
@@ -2215,6 +2271,9 @@ def _pyfunc_worker_init(pyfunc_list): # Pyfunc worker execution function # All exceptions will be raised to main processes def _pyfunc_worker_exec(index, *args): + """ + Internal function for call certain pyfunc in python process. + """ try: return _GLOBAL_PYFUNC_LIST[index](*args) except KeyboardInterrupt: @@ -3509,6 +3568,7 @@ class SamplerFn: self.multi_process = multi_process self.joined = False self.ppid = os.getpid() + self.pid = [] # Event for end of epoch if multi_process is True: self.eof = multiprocessing.Event() @@ -3523,6 +3583,7 @@ class SamplerFn: # which may cause deadlock. Therefore, the subprocess startup is performed in che initialization phase. # In this phase, the main process is not locked. worker.start() + self.pid.append(worker.pid) else: worker = _GeneratorWorkerMt(dataset, self.eof) worker.daemon = True @@ -3847,6 +3908,7 @@ class GeneratorDataset(MappableDataset): new_op.source_len = self.source_len new_op.saved_output_types = self.saved_output_types new_op.saved_output_shapes = self.saved_output_shapes + sample_fn = None if hasattr(self, "__total_batch__"): new_op.__total_batch__ = self.__total_batch__ if new_op.sampler is not None and hasattr(self.source, "__getitem__"): @@ -3870,9 +3932,11 @@ class GeneratorDataset(MappableDataset): new_op.source = (lambda: _py_sampler_fn_mp(new_op.sample_ids, new_op.num_samples, sample_fn)) else: new_op.source = (lambda: _py_sampler_fn(new_op.sample_ids, new_op.num_samples, self.source)) + new_op.sample_fn = sample_fn else: try: new_op.sampler = None + new_op.sample_fn = sample_fn iter(self.source) except TypeError: # Use generator function if input callable