Merge pull request !3549 from anthonyaje/device_augment_mapop_refactor1tags/v0.7.0-beta
| @@ -66,6 +66,7 @@ add_dependencies(kernels core) | |||
| add_dependencies(engine-datasetops-source core) | |||
| add_dependencies(engine-datasetops-source-sampler core) | |||
| add_dependencies(engine-datasetops core) | |||
| add_dependencies(engine-datasetops-mapop core) | |||
| add_dependencies(engine-opt core) | |||
| add_dependencies(engine-perf core) | |||
| add_dependencies(engine-gnn core) | |||
| @@ -89,6 +90,7 @@ set(submodules | |||
| $<TARGET_OBJECTS:cpp-API> | |||
| $<TARGET_OBJECTS:engine-datasetops-source> | |||
| $<TARGET_OBJECTS:engine-datasetops-source-sampler> | |||
| $<TARGET_OBJECTS:engine-datasetops-mapop> | |||
| $<TARGET_OBJECTS:engine-gnn> | |||
| $<TARGET_OBJECTS:engine-perf> | |||
| $<TARGET_OBJECTS:engine-datasetops> | |||
| @@ -27,7 +27,7 @@ | |||
| #include "minddata/dataset/engine/datasetops/source/voc_op.h" | |||
| // Dataset operator headers (in alphabetical order) | |||
| #include "minddata/dataset/engine/datasetops/batch_op.h" | |||
| #include "minddata/dataset/engine/datasetops/map_op.h" | |||
| #include "minddata/dataset/engine/datasetops/map_op/map_op.h" | |||
| #include "minddata/dataset/engine/datasetops/project_op.h" | |||
| #include "minddata/dataset/engine/datasetops/rename_op.h" | |||
| #include "minddata/dataset/engine/datasetops/repeat_op.h" | |||
| @@ -537,9 +537,6 @@ std::vector<std::shared_ptr<DatasetOp>> MapDataset::Build() { | |||
| // A vector containing shared pointer to the Dataset Ops that this object will create | |||
| std::vector<std::shared_ptr<DatasetOp>> node_ops; | |||
| // Currently default is true, and this is not exposed to user. | |||
| bool perf_mode = true; | |||
| std::vector<std::shared_ptr<TensorOp>> tensor_ops; | |||
| // Build tensorOp from tensorOperation vector | |||
| @@ -550,8 +547,7 @@ std::vector<std::shared_ptr<DatasetOp>> MapDataset::Build() { | |||
| // This parameter will be removed with next rebase | |||
| std::vector<std::string> col_orders; | |||
| auto map_op = | |||
| std::make_shared<MapOp>(input_columns_, output_columns_, tensor_ops, num_workers_, connector_que_size_, perf_mode); | |||
| auto map_op = std::make_shared<MapOp>(input_columns_, output_columns_, tensor_ops, num_workers_, connector_que_size_); | |||
| if (!project_columns_.empty()) { | |||
| auto project_op = std::make_shared<ProjectOp>(project_columns_); | |||
| node_ops.push_back(project_op); | |||
| @@ -39,7 +39,7 @@ | |||
| #include "minddata/dataset/engine/datasetops/batch_op.h" | |||
| #include "minddata/dataset/engine/datasetops/dataset_op.h" | |||
| #include "minddata/dataset/engine/datasetops/device_queue_op.h" | |||
| #include "minddata/dataset/engine/datasetops/map_op.h" | |||
| #include "minddata/dataset/engine/datasetops/map_op/map_op.h" | |||
| #include "minddata/dataset/engine/datasetops/project_op.h" | |||
| #include "minddata/dataset/engine/datasetops/rename_op.h" | |||
| #include "minddata/dataset/engine/datasetops/repeat_op.h" | |||
| @@ -26,6 +26,9 @@ namespace dataset { | |||
| using uchar = unsigned char; | |||
| using dsize_t = int64_t; | |||
| // Target devices to perform map operation | |||
| enum class MapTargetDevice { kCpu, kGpu, kDvpp }; | |||
| // Possible dataset types for holding the data and client type | |||
| enum class DatasetType { kUnknown, kArrow, kTf }; | |||
| @@ -22,8 +22,8 @@ endif() | |||
| if (ENABLE_TDTQUE) | |||
| add_dependencies(engine engine-datasetops engine-datasetops-source engine-tdt engine-opt engine-gnn engine-perf | |||
| engine-cache-client engine-cache-server) | |||
| engine-cache-client engine-cache-server engine-datasetops-mapop) | |||
| else () | |||
| add_dependencies(engine engine-datasetops engine-datasetops-source engine-opt engine-gnn engine-perf | |||
| engine-cache-client engine-cache-server) | |||
| engine-cache-client engine-cache-server engine-datasetops-mapop) | |||
| endif () | |||
| @@ -1,4 +1,5 @@ | |||
| add_subdirectory(source) | |||
| add_subdirectory(map_op) | |||
| file(GLOB_RECURSE _CURRENT_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc") | |||
| set_property(SOURCE ${_CURRENT_SRC_FILES} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_MD) | |||
| @@ -9,7 +10,6 @@ set(DATASET_ENGINE_DATASETOPS_SRC_FILES | |||
| pipeline_op.cc | |||
| batch_op.cc | |||
| device_queue_op.cc | |||
| map_op.cc | |||
| project_op.cc | |||
| rename_op.cc | |||
| repeat_op.cc | |||
| @@ -37,4 +37,3 @@ if (ENABLE_PYTHON) | |||
| endif() | |||
| add_library(engine-datasetops OBJECT ${DATASET_ENGINE_DATASETOPS_SRC_FILES}) | |||
| @@ -0,0 +1,10 @@ | |||
# Collect every .cc file below this directory (paths relative to it) and define
# SUBMODULE_ID=mindspore::SubModuleId::SM_MD when compiling each of them.
file(GLOB_RECURSE _CURRENT_SRC_FILES
     RELATIVE ${CMAKE_CURRENT_SOURCE_DIR}
     "*.cc")
set_property(SOURCE ${_CURRENT_SRC_FILES}
             PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_MD)

# Sources that make up the map operator object library.
set(DATASET_ENGINE_DATASETOPS_MAPOP_SRC_FILES
    map_op.cc
    cpu_map_job.cc
    gpu_map_job.cc)

# Built as an OBJECT library; its objects are consumed by other targets.
add_library(engine-datasetops-mapop OBJECT ${DATASET_ENGINE_DATASETOPS_MAPOP_SRC_FILES})
| @@ -0,0 +1,56 @@ | |||
| /** | |||
| * Copyright 2020 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #include <memory> | |||
| #include <vector> | |||
| #include <utility> | |||
| #include "minddata/dataset/engine/datasetops/map_op/cpu_map_job.h" | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| // Constructor | |||
| CpuMapJob::CpuMapJob() = default; | |||
| // Constructor | |||
| CpuMapJob::CpuMapJob(std::vector<std::shared_ptr<TensorOp>> operations) : MapJob(operations) {} | |||
| // Destructor | |||
| CpuMapJob::~CpuMapJob() = default; | |||
| // A function to execute a cpu map job | |||
| Status CpuMapJob::Run(std::vector<TensorRow> in, std::vector<TensorRow> *out) { | |||
| int32_t num_rows = in.size(); | |||
| for (int32_t row = 0; row < num_rows; row++) { | |||
| TensorRow input_row = in[row]; | |||
| TensorRow result_row; | |||
| for (size_t i = 0; i < ops_.size(); i++) { | |||
| // Call compute function for cpu | |||
| RETURN_IF_NOT_OK(ops_[i]->Compute(input_row, &result_row)); | |||
| // Assign result_row to to_process for the next TensorOp processing, except for the last TensorOp in the list. | |||
| if (i + 1 < ops_.size()) { | |||
| input_row = std::move(result_row); | |||
| } | |||
| } | |||
| out->push_back(std::move(result_row)); | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -0,0 +1,43 @@ | |||
| /** | |||
| * Copyright 2020 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #ifndef DATASET_ENGINE_DATASETOPS_MAP_OP_CPU_MAP_JOB_H_ | |||
| #define DATASET_ENGINE_DATASETOPS_MAP_OP_CPU_MAP_JOB_H_ | |||
| #include <memory> | |||
| #include <vector> | |||
| #include "minddata/dataset/engine/datasetops/map_op/map_job.h" | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| class CpuMapJob : public MapJob { | |||
| public: | |||
| // Constructor | |||
| CpuMapJob(); | |||
| // Constructor | |||
| explicit CpuMapJob(std::vector<std::shared_ptr<TensorOp>> operations); | |||
| // Destructor | |||
| ~CpuMapJob(); | |||
| // A pure virtual run function to execute a cpu map job | |||
| Status Run(std::vector<TensorRow> in, std::vector<TensorRow> *out) override; | |||
| }; | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| #endif // DATASET_ENGINE_DATASETOPS_MAP_OP_CPU_MAP_JOB_H_ | |||
| @@ -0,0 +1,37 @@ | |||
| /** | |||
| * Copyright 2020 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #include <memory> | |||
| #include <vector> | |||
| #include "minddata/dataset/engine/datasetops/map_op/gpu_map_job.h" | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| // Constructor | |||
| GpuMapJob::GpuMapJob(std::vector<std::shared_ptr<TensorOp>> operations) : MapJob(operations) {} | |||
| // Destructor | |||
| GpuMapJob::~GpuMapJob() = default; | |||
| // A function to execute a cpu map job | |||
| Status GpuMapJob::Run(std::vector<TensorRow> in, std::vector<TensorRow> *out) { | |||
| // Do nothing for now | |||
| return Status::OK(); | |||
| } | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -0,0 +1,40 @@ | |||
| /** | |||
| * Copyright 2020 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #ifndef DATASET_ENGINE_DATASETOPS_MAP_OP_GPU_MAP_JOB_H_ | |||
| #define DATASET_ENGINE_DATASETOPS_MAP_OP_GPU_MAP_JOB_H_ | |||
| #include <memory> | |||
| #include <vector> | |||
| #include "minddata/dataset/engine/datasetops/map_op/map_job.h" | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| class GpuMapJob : public MapJob { | |||
| public: | |||
| // Constructor | |||
| explicit GpuMapJob(std::vector<std::shared_ptr<TensorOp>> operations); | |||
| // Destructor | |||
| ~GpuMapJob(); | |||
| // A pure virtual run function to execute a cpu map job | |||
| Status Run(std::vector<TensorRow> in, std::vector<TensorRow> *out) override; | |||
| }; | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| #endif // DATASET_ENGINE_DATASETOPS_MAP_OP_GPU_MAP_JOB_H_ | |||
| @@ -0,0 +1,55 @@ | |||
| /** | |||
| * Copyright 2020 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #ifndef DATASET_ENGINE_DATASETOPS_MAP_OP_MAP_JOB_H_ | |||
| #define DATASET_ENGINE_DATASETOPS_MAP_OP_MAP_JOB_H_ | |||
| #include <memory> | |||
| #include <vector> | |||
| #include "minddata/dataset/kernels/tensor_op.h" | |||
| #include "minddata/dataset/core/tensor.h" | |||
| #include "minddata/dataset/core/tensor_row.h" | |||
| #include "minddata/dataset/util/status.h" | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| class MapJob { | |||
| public: | |||
| // Constructor | |||
| explicit MapJob(std::vector<std::shared_ptr<TensorOp>> operations) : ops_(operations) {} | |||
| // Constructor | |||
| MapJob() = default; | |||
| // Destructor | |||
| ~MapJob() = default; | |||
| Status AddOperation(std::shared_ptr<TensorOp> operation) { | |||
| ops_.push_back(operation); | |||
| return Status::OK(); | |||
| } | |||
| // A pure virtual run function to execute a particular map job | |||
| virtual Status Run(std::vector<TensorRow> in, std::vector<TensorRow> *out) = 0; | |||
| protected: | |||
| std::vector<std::shared_ptr<TensorOp>> ops_; | |||
| }; | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| #endif // DATASET_ENGINE_DATASETOPS_MAP_OP_MAP_JOB_H_ | |||
| @@ -13,7 +13,7 @@ | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #include "minddata/dataset/engine/datasetops/map_op.h" | |||
| #include <algorithm> | |||
| #include <cstring> | |||
| #include <iomanip> | |||
| #include <iostream> | |||
| @@ -28,6 +28,9 @@ | |||
| #include "minddata/dataset/engine/db_connector.h" | |||
| #include "minddata/dataset/engine/execution_tree.h" | |||
| #include "minddata/dataset/engine/opt/pass.h" | |||
| #include "minddata/dataset/engine/datasetops/map_op/map_op.h" | |||
| #include "minddata/dataset/engine/datasetops/map_op/cpu_map_job.h" | |||
| #include "minddata/dataset/engine/datasetops/map_op/gpu_map_job.h" | |||
| #include "minddata/dataset/kernels/tensor_op.h" | |||
| #include "utils/log_adapter.h" | |||
| #include "minddata/dataset/util/task_manager.h" | |||
| @@ -35,7 +38,7 @@ | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| // Builder constructor. Creates the builder object. | |||
| MapOp::Builder::Builder() : build_perf_mode_(true) { | |||
| MapOp::Builder::Builder() { | |||
| std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager(); | |||
| build_num_workers_ = cfg->num_parallel_workers(); | |||
| build_op_connector_size_ = cfg->op_connector_size(); | |||
| @@ -54,31 +57,27 @@ Status MapOp::Builder::sanityCheck() const { | |||
| Status MapOp::Builder::Build(std::shared_ptr<MapOp> *ptr) { | |||
| RETURN_IF_NOT_OK(sanityCheck()); | |||
| *ptr = std::make_shared<MapOp>(std::move(build_in_col_names_), std::move(build_out_col_names_), | |||
| std::move(build_tensor_funcs_), build_num_workers_, build_op_connector_size_, | |||
| build_perf_mode_); | |||
| std::move(build_tensor_funcs_), build_num_workers_, build_op_connector_size_); | |||
| return Status::OK(); | |||
| } | |||
| // Constructor of MapOp | |||
| MapOp::MapOp(const std::vector<std::string> &in_col_names, const std::vector<std::string> &out_col_names, | |||
| std::vector<std::shared_ptr<TensorOp>> tensor_funcs, int32_t num_workers, int32_t op_connector_size, | |||
| bool perf_mode) | |||
| std::vector<std::shared_ptr<TensorOp>> tensor_funcs, int32_t num_workers, int32_t op_connector_size) | |||
| : ParallelOp(num_workers, op_connector_size), | |||
| tfuncs_(std::move(tensor_funcs)), | |||
| in_columns_(in_col_names), | |||
| out_columns_(out_col_names), | |||
| perf_mode_(perf_mode) { | |||
| out_columns_(out_col_names) { | |||
| // If caller didn't specify the out_col_names, assume they are same as the in_columns. | |||
| if (out_columns_.empty() || out_columns_[0].empty()) { | |||
| out_columns_ = in_columns_; | |||
| } | |||
| MS_LOG(DEBUG) << "Performance Mode in map operator is " << perf_mode_ << "."; | |||
| } | |||
| // The number of threads consuming data from previous op's output Connector. | |||
| int32_t MapOp::num_consumers() const { | |||
| // When Performance Mode is on, there is only one thread consuming from the previous Connector. | |||
| return perf_mode_ == true ? 1 : num_workers_; | |||
| return 1; | |||
| } | |||
| // A print method typically used for debugging | |||
| @@ -106,36 +105,98 @@ void MapOp::Print(std::ostream &out, bool show_all) const { | |||
| } | |||
| } | |||
| // A helper function that fetch worker map job from local queues and extract the data and map job list | |||
| Status MapOp::FetchNextWork(uint32_t worker_id, std::unique_ptr<DataBuffer> *db, | |||
| std::vector<std::shared_ptr<MapJob>> *job_list) { | |||
| std::unique_ptr<MapWorkerJob> worker_job; | |||
| // Fetch the next worker job and data buffer | |||
| RETURN_IF_NOT_OK(local_queues_[worker_id]->PopFront(&worker_job)); | |||
| // Extract the databuffer and job list from the map worker job. | |||
| *db = std::move(worker_job->databuffer); | |||
| *job_list = std::move(worker_job->jobs); | |||
| return Status::OK(); | |||
| } | |||
| Status MapOp::GenerateWorkerJob(const std::unique_ptr<MapWorkerJob> *worker_job) { | |||
| std::shared_ptr<MapJob> map_job = nullptr; | |||
| MapTargetDevice prev_target; | |||
| for (size_t i = 0; i < tfuncs_.size(); i++) { | |||
| // Currently we only have CPU as the device target | |||
| // In the future, we will have heuristic or control from user to select target device | |||
| // MapTargetDevice target_device; | |||
| // RETURN_IF_NOT_OK(SelectTarget(tfuncs_[i], &target_device)); | |||
| MapTargetDevice target_device = MapTargetDevice::kCpu; | |||
| switch (target_device) { | |||
| case MapTargetDevice::kCpu: | |||
| // If there is no existing map_job, we will create one. | |||
| // map_job could be nullptr when we are at the first tensor op or when the target device of the prev op | |||
| // is different with that of the current op. | |||
| if (map_job == nullptr) { | |||
| map_job = std::make_shared<CpuMapJob>(); | |||
| } | |||
| map_job->AddOperation(tfuncs_[i]); | |||
| break; | |||
| case MapTargetDevice::kGpu: | |||
| break; | |||
| case MapTargetDevice::kDvpp: | |||
| break; | |||
| default: | |||
| break; | |||
| } | |||
| // Push map_job into worker_job if one of the two conditions is true: | |||
| // 1) It is the last tensor operation in tfuncs_ | |||
| // 2) The the target device of the current tensor operation is different with previous one | |||
| if ((i + 1 == tfuncs_.size()) || ((i != 0) && (prev_target != target_device))) { | |||
| (*worker_job)->jobs.push_back(std::move(map_job)); | |||
| } | |||
| prev_target = target_device; | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| // This class functor will provide the master loop that drives the logic for performing the work | |||
| Status MapOp::operator()() { | |||
| if (perf_mode_) { | |||
| // Create and register the local queues. | |||
| local_queues_.Init(num_workers_, oc_queue_size_); | |||
| Status rc = local_queues_.Register(tree_->AllTasks()); | |||
| if (rc.IsError()) { | |||
| TaskManager::FindMe()->Post(); | |||
| return rc; | |||
| } | |||
| // Create and register the local queues. | |||
| local_queues_.Init(num_workers_, oc_queue_size_); | |||
| Status rc = local_queues_.Register(tree_->AllTasks()); | |||
| if (rc.IsError()) { | |||
| TaskManager::FindMe()->Post(); | |||
| return rc; | |||
| } | |||
| // The operator class just starts off threads by calling the tree_ function | |||
| Status rc = tree_->LaunchWorkers(num_workers_, std::bind(&MapOp::WorkerEntry, this, std::placeholders::_1)); | |||
| rc = tree_->LaunchWorkers(num_workers_, std::bind(&MapOp::WorkerEntry, this, std::placeholders::_1)); | |||
| // Synchronize with TaskManager | |||
| TaskManager::FindMe()->Post(); | |||
| RETURN_IF_NOT_OK(rc); | |||
| if (perf_mode_) { | |||
| int64_t que_id = 0; | |||
| std::unique_ptr<DataBuffer> buff; | |||
| bool is_eof = false; | |||
| // Draining output connector of the previous op and distribute it to local queues. | |||
| // Stop when all worker threads are finished (received EOF). | |||
| while (!is_eof) { | |||
| RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(&buff, 0)); | |||
| is_eof = buff->eof(); | |||
| RETURN_IF_NOT_OK(local_queues_[que_id]->Add(std::move(buff))); | |||
| que_id = (que_id + 1) % num_workers_; | |||
| } | |||
| int64_t que_id = 0; | |||
| std::unique_ptr<DataBuffer> buff; | |||
| bool is_eof = false; | |||
| // Drain output connector of the previous op, generate jobs for worker threads, and distribute them via local queues | |||
| // Stop when all worker threads are finished (received EOF) | |||
| while (!is_eof) { | |||
| RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(&buff, 0)); | |||
| is_eof = buff->eof(); | |||
| // Create an empty map worker job to be populated by a databuffer and map jobs | |||
| std::unique_ptr<MapWorkerJob> worker_job = std::make_unique<MapWorkerJob>(); | |||
| worker_job->databuffer = std::move(buff); | |||
| // Populate map worker job for a worker to execute | |||
| RETURN_IF_NOT_OK(GenerateWorkerJob(&worker_job)); | |||
| // Push map worker job to the corresponding worker's queue | |||
| RETURN_IF_NOT_OK(local_queues_[que_id]->Add(std::move(worker_job))); | |||
| que_id = (que_id + 1) % num_workers_; | |||
| } | |||
| return Status::OK(); | |||
| @@ -148,12 +209,11 @@ Status MapOp::operator()() { | |||
| Status MapOp::WorkerEntry(int32_t worker_id) { | |||
| // Handshake with TaskManager that thread creation is successful. | |||
| TaskManager::FindMe()->Post(); | |||
| std::unique_ptr<DataBuffer> in_buffer; | |||
| // Getting a databuffer to work on. | |||
| // Perform the first fetch here outside of the loop. This allows us to execute one-time only | |||
| // initializations that happen after the first fetch. | |||
| RETURN_IF_NOT_OK(FetchNextBuffer(&in_buffer, worker_id)); | |||
| std::unique_ptr<DataBuffer> in_buffer; | |||
| std::vector<std::shared_ptr<MapJob>> job_list; | |||
| // Fetch next data buffer and map job list | |||
| RETURN_IF_NOT_OK(FetchNextWork(worker_id, &in_buffer, &job_list)); | |||
| // Sanity check the databuffer. | |||
| // Special case: if there's more threads than buffers, some threads simply get the final control | |||
| @@ -175,7 +235,8 @@ Status MapOp::WorkerEntry(int32_t worker_id) { | |||
| if (in_buffer->eoe()) { | |||
| // Calling base class EoeReceived to forward eoe buffer. | |||
| RETURN_IF_NOT_OK(EoeReceived(worker_id)); | |||
| RETURN_IF_NOT_OK(FetchNextBuffer(&in_buffer, worker_id)); | |||
| // Fetch next data buffer and map job list | |||
| RETURN_IF_NOT_OK(FetchNextWork(worker_id, &in_buffer, &job_list)); | |||
| continue; | |||
| } else if (in_buffer->eof()) { | |||
| // Calling base class EofReceived to forward eof buffer. | |||
| @@ -185,76 +246,77 @@ Status MapOp::WorkerEntry(int32_t worker_id) { | |||
| std::unique_ptr<TensorQTable> new_tensor_table(std::make_unique<TensorQTable>()); | |||
| // Perform the compute function of TensorOp(s) and store the result in new_tensor_table. | |||
| RETURN_IF_NOT_OK(WorkerCompute(in_buffer.get(), new_tensor_table.get())); | |||
| RETURN_IF_NOT_OK(WorkerCompute(in_buffer.get(), new_tensor_table.get(), job_list)); | |||
| // Replace the TensorTable in DataBuffer with the new one. | |||
| in_buffer->set_tensor_table(std::move(new_tensor_table)); | |||
| // Push the buffer onto the connector for next operator to consume. | |||
| RETURN_IF_NOT_OK(out_connector_->Add(static_cast<int>(worker_id), std::move(in_buffer))); | |||
| // Fetch the next buffer and loop back to the top. | |||
| RETURN_IF_NOT_OK(FetchNextBuffer(&in_buffer, worker_id)); | |||
| // Fetch next data buffer and map job list | |||
| RETURN_IF_NOT_OK(FetchNextWork(worker_id, &in_buffer, &job_list)); | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| Status MapOp::WorkerCompute(DataBuffer *in_buffer, TensorQTable *new_tensor_table) { | |||
| // Getting number of rows and cols in this buffer. | |||
| Status MapOp::WorkerCompute(DataBuffer *in_buffer, TensorQTable *new_tensor_table, | |||
| const std::vector<std::shared_ptr<MapJob>> &job_list) { | |||
| int32_t num_rows = in_buffer->NumRows(); | |||
| int32_t num_cols = in_buffer->NumCols(); | |||
| std::vector<TensorRow> job_input_table; | |||
| std::vector<TensorRow> original_table; | |||
| // Prepare the data that we need from in_buffer | |||
| for (int32_t r = 0; r < num_rows; r++) { | |||
| // to_process : A vector of Tensors only holding cols in input_columns. | |||
| // result_row; : A vector of Tensors to hold the result after Compute(). | |||
| // cur_row : A vector of Tensors holding all the columns from DataBuffer. | |||
| TensorRow to_process, result_row, cur_row; | |||
| // cur_row : A vector of Tensors holding all the cols from DataBuffer. | |||
| TensorRow to_process, cur_row; | |||
| RETURN_IF_NOT_OK(in_buffer->PopRow(&cur_row)); | |||
| // From the current row, select the Tensor that need to be passed to TensorOp | |||
| (void)std::transform(to_process_indices_.begin(), to_process_indices_.end(), std::back_inserter(to_process), | |||
| [&cur_row](const auto &it) { return std::move(cur_row[it]); }); | |||
| job_input_table.push_back(std::move(to_process)); | |||
| original_table.push_back(std::move(cur_row)); | |||
| } | |||
| // Populate the Tensor from the current row to be processed by TensorOp | |||
| for (const auto &idx : to_process_indices_) { | |||
| to_process.push_back(std::move(cur_row[idx])); | |||
| } | |||
| // Looping over multiple TensorOps supplied in to MapOp. | |||
| // The assumption is that the result of one TensorOp matches the required input to the next TensorOp. | |||
| for (size_t i = 0; i < tfuncs_.size(); i++) { | |||
| // TensorOp can operate on single col or multiple cols. MapOp always call compute for multiple cols. | |||
| // TensorOp base class will call the single column Compute() depending on the ops. | |||
| // Note: The columns of the result_row is not preallocated, the compute function of each tensor op are | |||
| // required to resize/push back the result_row | |||
| RETURN_IF_NOT_OK(tfuncs_[i]->Compute(to_process, &result_row)); | |||
| // Assign result_row to to_process for the next TensorOp processing, except for the last TensorOp in the list. | |||
| if (i + 1 < tfuncs_.size()) { | |||
| to_process = std::move(result_row); | |||
| } | |||
| // Variable to keep the result after executing the job. | |||
| std::vector<TensorRow> result_table; | |||
| // Executing the list of jobs | |||
| for (size_t i = 0; i < job_list.size(); i++) { | |||
| // Execute MapJob. | |||
| RETURN_IF_NOT_OK(job_list[i]->Run(job_input_table, &result_table)); | |||
| // Assign the processed data as an input for the next job processing, except for the last job in the list. | |||
| if (i + 1 < job_list.size()) { | |||
| job_input_table = std::move(result_table); | |||
| } | |||
| } | |||
| if (out_columns_.size() != result_row.size()) { | |||
| return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, | |||
| "Result of a tensorOp doesn't match output column names"); | |||
| } | |||
| // Sanity check a row in result_table | |||
| if (!result_table.empty() && out_columns_.size() != result_table[0].size()) { | |||
| return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, | |||
| "Result of a tensorOp doesn't match output column names"); | |||
| } | |||
| // Merging the data processed by job (result_table) with the data that are not used. | |||
| for (int32_t r = 0; r < num_rows; r++) { | |||
| TensorRow out_row; | |||
| if (in_columns_.size() == out_columns_.size()) { | |||
| for (size_t i = 0; i < result_row.size(); i++) { | |||
| cur_row[to_process_indices_[i]] = std::move(result_row[i]); | |||
| // Place the processed tensor back into the original index of the input tensor | |||
| for (size_t i = 0; i < result_table[r].size(); i++) { | |||
| original_table[r][to_process_indices_[i]] = std::move(result_table[r][i]); | |||
| } | |||
| new_tensor_table->push_back(std::move(cur_row)); | |||
| out_row = std::move(original_table[r]); | |||
| } else { | |||
| // Add the columns we did not touch to the result_row. | |||
| // Append the data in the original table that we did not use to the end of each row in result_table. | |||
| for (int32_t i = 0; i < num_cols; i++) { | |||
| if (keep_input_columns_[i]) { | |||
| result_row.push_back(std::move(cur_row[i])); | |||
| result_table[r].push_back(std::move(original_table[r][i])); | |||
| } | |||
| } | |||
| // Add this final result_row to our new TensorTable. | |||
| new_tensor_table->push_back(std::move(result_row)); | |||
| out_row = std::move(result_table[r]); | |||
| } | |||
| // Add this final out_row to our new TensorTable. | |||
| new_tensor_table->push_back(std::move(out_row)); | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| @@ -288,13 +350,11 @@ Status MapOp::ValidateInColumns(const std::unordered_map<std::string, int32_t> & | |||
| Status MapOp::InitPrivateVariable(std::unordered_map<std::string, int32_t> *col_name_id_map) { | |||
| // If input_columns is empty(), The col at index-0 will be picked. | |||
| if (in_columns_.empty()) { | |||
| for (const auto &pair : *col_name_id_map) { | |||
| if (pair.second == 0) { | |||
| MS_LOG(INFO) << "Input columns empty for map op, will apply to the first column in the current table."; | |||
| in_columns_.push_back(pair.first); | |||
| break; | |||
| } | |||
| } | |||
| auto itr = | |||
| std::find_if(col_name_id_map->begin(), col_name_id_map->end(), [](const auto &it) { return it.second == 0; }); | |||
| CHECK_FAIL_RETURN_UNEXPECTED(itr != col_name_id_map->end(), "Column name id map doesn't have id 0"); | |||
| MS_LOG(INFO) << "Input columns empty for map op, will apply to the first column in the current table."; | |||
| in_columns_.push_back(itr->first); | |||
| // If caller didn't specify the out_col_names, assume they are same as the input_columns. | |||
| // This was done in the constructor, but if input columns was empty to start we have to redo it here. | |||
| @@ -24,6 +24,7 @@ | |||
| #include "minddata/dataset/engine/datasetops/parallel_op.h" | |||
| #include "minddata/dataset/kernels/tensor_op.h" | |||
| #include "minddata/dataset/util/queue.h" | |||
| #include "minddata/dataset/engine/datasetops/map_op/map_job.h" | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| @@ -107,13 +108,6 @@ class MapOp : public ParallelOp { | |||
| return *this; | |||
| } | |||
| // Setter method. | |||
| // @return Builder setter method returns reference to the builder. | |||
| Builder &SetPerformanceMode(bool perf_mode) { | |||
| build_perf_mode_ = perf_mode; | |||
| return *this; | |||
| } | |||
| // The builder "build" method creates the final object. | |||
| // @param ptr The shared_ptr to the new MapOp object | |||
| // @return Status | |||
| @@ -125,7 +119,6 @@ class MapOp : public ParallelOp { | |||
| std::vector<std::shared_ptr<TensorOp>> build_tensor_funcs_; | |||
| int32_t build_num_workers_; | |||
| int32_t build_op_connector_size_; | |||
| bool build_perf_mode_; // Default true. | |||
| // Check if the required parameters are set by the builder. | |||
| // @return Status The error code return | |||
| @@ -140,8 +133,7 @@ class MapOp : public ParallelOp { | |||
| // @param num_workers The number of worker threads. | |||
| // @param op_connector_size The size of each queue in the connector. | |||
| MapOp(const std::vector<std::string> &in_col_names, const std::vector<std::string> &out_col_names, | |||
| std::vector<std::shared_ptr<TensorOp>> tensor_funcs, int32_t num_workers, int32_t op_connector_size, | |||
| bool perf_mode); | |||
| std::vector<std::shared_ptr<TensorOp>> tensor_funcs, int32_t num_workers, int32_t op_connector_size); | |||
| // Destructor | |||
| ~MapOp() = default; | |||
| @@ -164,6 +156,8 @@ class MapOp : public ParallelOp { | |||
| // Class functor operator () override. | |||
| // All dataset ops operate by launching a thread (see ExecutionTree). This class functor will | |||
| // provide the master loop that drives the logic for performing the work | |||
| // This main thread creates local queues, pulls databuffers from the previous | |||
| // op's Connector and distributes them to the local queues. Workers pull from the local queues. | |||
| // @return Status The error code return | |||
| Status operator()() override; | |||
| @@ -189,12 +183,24 @@ class MapOp : public ParallelOp { | |||
| const auto &TFuncs() const { return tfuncs_; } | |||
| private: | |||
| // Local queues where worker threads can pop from. | |||
| // Popping directly from the Connector can block if the previous designated threads haven't popped yet. | |||
| // Setting the size of these queues to 0 is essentially the same as pulling directly from Connector. | |||
| QueueList<std::unique_ptr<DataBuffer>> local_queues_; | |||
| // A unit of job for map worker thread. | |||
| // MapWorkerJob holds a list of MapJob where each MapJob can be a CpuMapJob, GpuMapJob or DvppMapJob. | |||
| struct MapWorkerJob { | |||
| std::vector<std::shared_ptr<MapJob>> jobs; | |||
| std::unique_ptr<DataBuffer> databuffer; | |||
| }; | |||
| // A helper function to create jobs for workers. | |||
| Status GenerateWorkerJob(const std::unique_ptr<MapWorkerJob> *worker_job); | |||
| // Static variables to be read by worker threads; they are read-only and must not be modified. | |||
| // A helper function that fetches a worker map job from the local queues and extracts the data and map job list | |||
| Status FetchNextWork(uint32_t worker_id, std::unique_ptr<DataBuffer> *db, | |||
| std::vector<std::shared_ptr<MapJob>> *job_list); | |||
| // Local queues where worker threads get a job from | |||
| QueueList<std::unique_ptr<MapWorkerJob>> local_queues_; | |||
| // Tensorops to be read and applied by worker threads | |||
| std::vector<std::shared_ptr<TensorOp>> tfuncs_; | |||
| // Variable to store the column name that the tensorOps are consuming | |||
| @@ -209,13 +215,6 @@ class MapOp : public ParallelOp { | |||
| // Indices of the columns to process. | |||
| std::vector<size_t> to_process_indices_; | |||
| // Performance mode is when the main thread creates local queues, pulls databuffers from the previous | |||
| // op's Connector and distributes them to the local queues. Workers pull from the local queues. | |||
| // If this flag is false, each worker pulls directly from the Connector. This uses fewer resources | |||
| // (thread and memory), but when the computation cost is heavy (e.g. DecodeOp) and fluctuating, it can | |||
| // cause additional blocking because pop calls to Connector from the threads are synchronized to enforce the order. | |||
| bool perf_mode_; | |||
| // Private function for worker/thread to loop continuously. It comprises the main | |||
| // logic of MapOp: getting the data from previous Op, validating user specified column names, | |||
| // applying a list of TensorOps to each of the data, process the results and then | |||
| @@ -224,25 +223,12 @@ class MapOp : public ParallelOp { | |||
| // @return Status The error code return | |||
| Status WorkerEntry(int32_t worker_id) override; // In: workerId assigned by tree_ | |||
| // Private helper function for getting the next buffer | |||
| // When PerformanceMode is enabled, workers pop from the local queue. | |||
| // Otherwise, workers pop from the first child output Connector. | |||
| // @param p_buffer - the buffer to return | |||
| // @return Status return code | |||
| Status FetchNextBuffer(std::unique_ptr<DataBuffer> *p_buffer, int32_t worker_id) { | |||
| if (perf_mode_) { | |||
| RETURN_IF_NOT_OK(local_queues_[worker_id]->PopFront(p_buffer)); | |||
| } else { | |||
| RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(p_buffer, worker_id)); | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| // Private function for worker thread to perform TensorOp's compute function and get the result. | |||
| // @param in_buffer A raw pointer to the DataBuffer. A raw pointer is fine because this function doesn't manage memory | |||
| // and is not shared with other threads. | |||
| // @param[out] new_tensor_table A new Tensor Table to be populated in this function. | |||
| Status WorkerCompute(DataBuffer *in_buffer, TensorQTable *new_tensor_table); | |||
| Status WorkerCompute(DataBuffer *in_buffer, TensorQTable *new_tensor_table, | |||
| const std::vector<std::shared_ptr<MapJob>> &job_list); | |||
| // Private function that create the final column name to index mapping and | |||
| // get indices of the columns this mapop does not use. | |||
| @@ -17,7 +17,7 @@ | |||
| #include <memory> | |||
| #include "minddata/dataset/engine/opt/optional/tensor_op_fusion_pass.h" | |||
| #include "minddata/dataset/kernels/image/decode_op.h" | |||
| #include "minddata/dataset/engine/datasetops/map_op.h" | |||
| #include "minddata/dataset/engine/datasetops/map_op/map_op.h" | |||
| #include "minddata/dataset/kernels/image/random_crop_decode_resize_op.h" | |||
| namespace mindspore { | |||
| @@ -24,7 +24,7 @@ | |||
| #include "minddata/dataset/engine/datasetops/dataset_op.h" | |||
| #include "minddata/dataset/engine/datasetops/device_queue_op.h" | |||
| #include "minddata/dataset/engine/datasetops/epoch_ctrl_op.h" | |||
| #include "minddata/dataset/engine/datasetops/map_op.h" | |||
| #include "minddata/dataset/engine/datasetops/map_op/map_op.h" | |||
| #include "minddata/dataset/engine/datasetops/project_op.h" | |||
| #include "minddata/dataset/engine/datasetops/rename_op.h" | |||
| #include "minddata/dataset/engine/datasetops/repeat_op.h" | |||
| @@ -645,16 +645,14 @@ TEST_F(MindDataTestMapOp, ImageFolder_Decode_Repeat_Resize) { | |||
| map_decode_builder.SetInColNames({"image"}) | |||
| .SetOutColNames({}) | |||
| .SetTensorFuncs(func_list) | |||
| .SetNumWorkers(14) | |||
| .SetPerformanceMode(false); | |||
| .SetNumWorkers(14); | |||
| rc = map_decode_builder.Build(&map_decode_map); | |||
| EXPECT_TRUE(rc.IsOk()); | |||
| map_resize_builder.SetInColNames({"image"}) | |||
| .SetOutColNames({}) | |||
| .SetTensorFuncs(func_list2) | |||
| .SetNumWorkers(15) | |||
| .SetPerformanceMode(false); | |||
| .SetNumWorkers(15); | |||
| rc = map_resize_builder.Build(&map_resize_op); | |||
| EXPECT_TRUE(rc.IsOk()); | |||
| @@ -739,5 +737,3 @@ TEST_F(MindDataTestMapOp, ImageFolder_Decode_Repeat_Resize_NoInputColumns) { | |||
| } | |||
| EXPECT_TRUE(i == 88); | |||
| } | |||
| @@ -19,7 +19,6 @@ | |||
| #include <string> | |||
| #include "minddata/dataset/core/client.h" | |||
| #include "minddata/dataset/core/constants.h" | |||
| #include "minddata/dataset/engine/datasetops/map_op.h" | |||
| #include "minddata/dataset/engine/datasetops/rename_op.h" | |||
| #include "common/common.h" | |||
| #include "common/utils.h" | |||
| @@ -23,7 +23,6 @@ | |||
| #include <thread> | |||
| #include "minddata/dataset/core/client.h" | |||
| #include "minddata/dataset/core/constants.h" | |||
| #include "minddata/dataset/engine/datasetops/map_op.h" | |||
| #include "minddata/dataset/engine/datasetops/zip_op.h" | |||
| #include "minddata/dataset/core/tensor.h" | |||
| #include "minddata/dataset/core/config_manager.h" | |||