From 4fa6d5621c6c81202a00ce03535908950fe7faf7 Mon Sep 17 00:00:00 2001 From: Lixia Chen Date: Fri, 19 Jun 2020 14:34:54 -0400 Subject: [PATCH] Compute column map before launch the tree. --- .../ccsrc/dataset/engine/dataset_iterator.cc | 7 +- .../ccsrc/dataset/engine/dataset_iterator.h | 1 - .../dataset/engine/datasetops/barrier_op.cc | 3 - .../dataset/engine/datasetops/batch_op.cc | 1 - .../datasetops/bucket_batch_by_length_op.cc | 1 - .../engine/datasetops/build_vocab_op.cc | 1 - .../dataset/engine/datasetops/concat_op.cc | 34 +++-- .../dataset/engine/datasetops/concat_op.h | 4 + .../dataset/engine/datasetops/dataset_op.cc | 36 +++--- .../dataset/engine/datasetops/dataset_op.h | 10 +- .../dataset/engine/datasetops/filter_op.cc | 3 - .../ccsrc/dataset/engine/datasetops/map_op.cc | 116 ++++++++---------- .../ccsrc/dataset/engine/datasetops/map_op.h | 13 +- .../dataset/engine/datasetops/project_op.cc | 40 +++--- .../dataset/engine/datasetops/project_op.h | 4 + .../dataset/engine/datasetops/rename_op.cc | 87 ++++++------- .../dataset/engine/datasetops/rename_op.h | 4 +- .../dataset/engine/datasetops/repeat_op.cc | 2 - .../dataset/engine/datasetops/shuffle_op.cc | 3 - .../dataset/engine/datasetops/skip_op.cc | 3 - .../engine/datasetops/source/celeba_op.cc | 17 ++- .../engine/datasetops/source/celeba_op.h | 4 + .../engine/datasetops/source/cifar_op.cc | 16 ++- .../engine/datasetops/source/cifar_op.h | 4 + .../engine/datasetops/source/clue_op.cc | 21 ++-- .../engine/datasetops/source/clue_op.h | 4 + .../engine/datasetops/source/coco_op.cc | 16 ++- .../engine/datasetops/source/coco_op.h | 4 + .../engine/datasetops/source/generator_op.cc | 18 ++- .../engine/datasetops/source/generator_op.h | 4 + .../datasetops/source/image_folder_op.cc | 16 ++- .../datasetops/source/image_folder_op.h | 4 + .../engine/datasetops/source/manifest_op.cc | 16 ++- .../engine/datasetops/source/manifest_op.h | 4 + .../engine/datasetops/source/mindrecord_op.cc | 15 ++- .../engine/datasetops/source/mindrecord_op.h | 4 + .../engine/datasetops/source/mnist_op.cc | 16 ++- .../engine/datasetops/source/mnist_op.h | 4 + .../datasetops/source/random_data_op.cc | 13 +- .../engine/datasetops/source/random_data_op.h | 4 + .../engine/datasetops/source/text_file_op.cc | 17 ++- .../engine/datasetops/source/text_file_op.h | 4 + .../engine/datasetops/source/tf_reader_op.cc | 17 ++- .../engine/datasetops/source/tf_reader_op.h | 4 + .../engine/datasetops/source/voc_op.cc | 16 ++- .../dataset/engine/datasetops/source/voc_op.h | 4 + .../dataset/engine/datasetops/take_op.cc | 1 - .../ccsrc/dataset/engine/datasetops/zip_op.cc | 43 ++++--- .../ccsrc/dataset/engine/datasetops/zip_op.h | 4 + 49 files changed, 419 insertions(+), 268 deletions(-) diff --git a/mindspore/ccsrc/dataset/engine/dataset_iterator.cc b/mindspore/ccsrc/dataset/engine/dataset_iterator.cc index 7eb38785aa..be333741b1 100644 --- a/mindspore/ccsrc/dataset/engine/dataset_iterator.cc +++ b/mindspore/ccsrc/dataset/engine/dataset_iterator.cc @@ -27,7 +27,7 @@ namespace mindspore { namespace dataset { // Constructor of the IteratorBase -IteratorBase::IteratorBase() : curr_buffer_(nullptr), eof_handled_(false), first_row_(true) {} +IteratorBase::IteratorBase() : curr_buffer_(nullptr), eof_handled_(false) {} IteratorBase::~IteratorBase() = default; @@ -51,13 +51,10 @@ Status IteratorBase::GetNextAsMap(TensorMap *out_map) { // The column name mapping comes from the source operator that is producing the data into the iterator. // To avoid having to fetch this for every time, we'll take a local copy of the column name id mapping // and save in the iterator. We only have to do this once. All subsequent iterations use the same mapping. - // Note: This can only be done after the first row has been produced, as this guarantees the the child has - // it's column mapping set up. - if (first_row_) { + if (col_name_id_map_.empty()) { // Determine the column name map by calling the derived class method to retrieve the column // name map col_name_id_map_ = this->GetColumnNameMap(); - first_row_ = false; } // Populate the out map from the row and return it diff --git a/mindspore/ccsrc/dataset/engine/dataset_iterator.h b/mindspore/ccsrc/dataset/engine/dataset_iterator.h index ada2b0ffb6..4e40e77c74 100644 --- a/mindspore/ccsrc/dataset/engine/dataset_iterator.h +++ b/mindspore/ccsrc/dataset/engine/dataset_iterator.h @@ -72,7 +72,6 @@ class IteratorBase { protected: std::unique_ptr curr_buffer_; // holds the current buffer bool eof_handled_; // T/F if this op got an eof - bool first_row_; // internal tracking for first row case std::unordered_map col_name_id_map_; }; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/barrier_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/barrier_op.cc index 5ab2df4ac4..6fc276a75e 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/barrier_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/barrier_op.cc @@ -144,9 +144,6 @@ Status BarrierOp::prepare(TensorQTable *const table) { table->push_back(std::move(new_row)); - // Assign the column name id map - RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild()); - // the update code below shouldn't do anything bad if the column name already exists. return Status::OK(); } diff --git a/mindspore/ccsrc/dataset/engine/datasetops/batch_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/batch_op.cc index 60643c90ba..f311c90c33 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/batch_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/batch_op.cc @@ -76,7 +76,6 @@ Status BatchOp::operator()() { std::unique_ptr table = std::make_unique(); child_iterator_ = std::make_unique(this, 0, 0); RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row)); - RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild()); // must come after the first fetch above int32_t cur_batch_size = 0; RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(0, 0, 0))); while (child_iterator_->eof_handled() == false) { diff --git a/mindspore/ccsrc/dataset/engine/datasetops/bucket_batch_by_length_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/bucket_batch_by_length_op.cc index def2ea0fee..5e143b700f 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/bucket_batch_by_length_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/bucket_batch_by_length_op.cc @@ -115,7 +115,6 @@ Status BucketBatchByLengthOp::operator()() { TensorRow current_row; child_iterator_ = std::make_unique(this, 0, 0); RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(¤t_row)); - RETURN_IF_NOT_OK(AssignColMapFromChild()); while (!child_iterator_->eof_handled()) { while (!current_row.empty()) { int32_t element_length; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/build_vocab_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/build_vocab_op.cc index f99804ec9b..ceb5058593 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/build_vocab_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/build_vocab_op.cc @@ -86,7 +86,6 @@ Status BuildVocabOp::operator()() { child_iterator_ = std::make_unique(this, 0, 0); TensorRow new_row; RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row)); - RETURN_IF_NOT_OK(AssignColMapFromChild()); if (!col_names_.empty()) { col_ids_.reserve(col_names_.size()); for (std::string col : col_names_) { diff --git a/mindspore/ccsrc/dataset/engine/datasetops/concat_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/concat_op.cc index 7162dc0b47..c5aac523d2 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/concat_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/concat_op.cc @@ -66,12 +66,6 @@ Status ConcatOp::operator()() { std::unique_ptr buf; RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(&buf)); - // Obtain columns_name_id_map from child_[0] - column_name_id_map_ = child_[0]->column_name_id_map(); - if (column_name_id_map_.empty()) { - RETURN_STATUS_UNEXPECTED("Child column name map cannot be empty!"); - } - int eof_count = 0; while (eof_count != children_num_) { for (int i = 0; i < children_num_; i++) { @@ -115,17 +109,13 @@ Status ConcatOp::Verify(int32_t id, const std::unique_ptr &buf) { buf->GetRow(0, &new_row); if (id == 0) { - // Obtain the column name, data type and data rank in child[0] - column_name_id_ = child_[id]->column_name_id_map(); + // Obtain the data type and data rank in child[0] for (auto item : new_row) { data_type_.push_back(item->type()); data_rank_.push_back(item->Rank()); } } else { - // Compare the column name, data type and data rank with these in child[0] - if (child_[id]->column_name_id_map() != column_name_id_) { - RETURN_STATUS_UNEXPECTED("The column name or column order is not the same with previous dataset."); - } + // Compare the data type and data rank with these in child[0] int32_t index = 0; for (auto item : new_row) { if ((item->type() != data_type_[index]) || item->Rank() != data_rank_[index++]) { @@ -141,5 +131,25 @@ Status ConcatOp::PrepareNodePostAction() { tree_->AddToRepeatStack(shared_from_this()); return Status::OK(); } + +// We need to overwrite the super class ComputeColMap here because the number of children is more than 1. +Status ConcatOp::ComputeColMap() { + if (column_name_id_map_.empty()) { + // Obtain columns_name_id_map from child_[0] + column_name_id_map_ = child_[0]->column_name_id_map(); + if (column_name_id_map_.empty()) { + RETURN_STATUS_UNEXPECTED("Child column name map cannot be empty!"); + } + // Verify all children have the same column name map + for (int32_t i = 0; i < child_.size(); ++i) { + if (child_[i]->column_name_id_map() != column_name_id_map_) { + RETURN_STATUS_UNEXPECTED("The column name or column order is not the same with previous dataset."); + } + } + } else { + MS_LOG(WARNING) << "Column name map is already set!"; + } + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/datasetops/concat_op.h b/mindspore/ccsrc/dataset/engine/datasetops/concat_op.h index 0fb8ec8362..4bcfdbf6c6 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/concat_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/concat_op.h @@ -85,6 +85,10 @@ class ConcatOp : public PipelineOp { // @return Name of the current Op std::string Name() const override { return "ConcatOp"; } + // Private function for computing the assignment of the column name map. + // @return - Status + Status ComputeColMap() override; + private: Status Verify(int32_t id, const std::unique_ptr &buf); diff --git a/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.cc index bf991ea7d9..d5625c8d06 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.cc @@ -39,8 +39,7 @@ DatasetOp::DatasetOp(int32_t op_connector_size) tree_(nullptr), state_(OpState::kDeOpIdle), op_ctrl_flags_(kDeOpNone), - out_connector_(nullptr), - first_fetch_(true) { + out_connector_(nullptr) { // The operator starts out with an invalid operator id. The only way to // get it out of invalid state is to assign the operator to an execution tree. } @@ -240,6 +239,10 @@ Status DatasetOp::PrepareNodePostAction() { RETURN_IF_NOT_OK(out_connector_->Register(tree_->AllTasks())); } RETURN_IF_NOT_OK(this->RegisterWorkerConnectors()); + + // Generate the column name map for the current op. + RETURN_IF_NOT_OK(this->ComputeColMap()); + return Status::OK(); } @@ -262,30 +265,21 @@ std::string DatasetOp::ColumnNameMapAsString() const { return outStr; } -// A helper function for providing assignment of the column name map. -// This grabs the map from child 0 and assigns it into this op. -// Can only be used if number of children is 1. -Status DatasetOp::AssignColMapFromChild() { +// Computing the assignment of the column name map. +// This just inherits the column map from its first child, can only be used if the number of children is 1. +// Operations changing the column map must overwrite this function. +Status DatasetOp::ComputeColMap() { if (child_.size() > 1) { RETURN_STATUS_UNEXPECTED("Assigning column name map from child only works for single-child operators."); } - // Assign the correct column name map to this op by taking it from the input child. - // This must be done AFTER the first fetch, but only needs to be done once by the first worker to - // do the first fetch. - if (first_fetch_) { - // If there was a single worker, or this is being called from a master thread in a parallel op, - // then the mutex is not really needed here, although it's harmless. - std::unique_lock lock(column_name_map_mutex_); - // If the map has not been set up yet, then we are the first one in to set it up. The first_fetch_ (dirty read) - // bool allows us to avoid acquiring the lock if the map has already been set. + if (column_name_id_map_.empty()) { + column_name_id_map_ = child_[0]->column_name_id_map(); if (column_name_id_map_.empty()) { - column_name_id_map_ = child_[0]->column_name_id_map(); - first_fetch_ = false; - if (column_name_id_map_.empty()) { - RETURN_STATUS_UNEXPECTED("Child column name map cannot be empty!"); - } + RETURN_STATUS_UNEXPECTED("Child column name map cannot be empty!"); } - MS_LOG(DEBUG) << "Setting column map after first fetch:\n" << DatasetOp::ColumnNameMapAsString(); + MS_LOG(DEBUG) << "Setting column map:\n" << DatasetOp::ColumnNameMapAsString(); + } else { + MS_LOG(WARNING) << "Column name map is already set!"; } return Status::OK(); } diff --git a/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h b/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h index 973b5be962..c444004b79 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h @@ -277,11 +277,12 @@ class DatasetOp : public std::enable_shared_from_this { // @param parent - The parent node to remove void RemoveParent(DatasetOp *parent); - // A helper function for providing an assignment of the column name map. - // This grabs the map from child 0 and assigns it into this op. - // Can only be used if number of children is 1. + // Compute the current op's column map using its child's column map. + // Get called during the tree post-prepare phase in PrepareNodePostAction. + // This base implementation just inherits the map from child 0, and can only be used if the number of children is 1. + // Operations changing the column map it inherits from the child must overwrite this function. // @return - Status - Status AssignColMapFromChild(); + virtual Status ComputeColMap(); std::vector> child_; // Child nodes std::vector parent_; // Parent nodes. No ownership @@ -292,7 +293,6 @@ class DatasetOp : public std::enable_shared_from_this { uint32_t op_ctrl_flags_; // Flags for the operator std::unique_ptr out_connector_; // Output Connector std::unordered_map column_name_id_map_; // Mapping between col index and col name - bool first_fetch_; // For use when setting column map std::mutex column_name_map_mutex_; // For protecting shared access to the column map private: diff --git a/mindspore/ccsrc/dataset/engine/datasetops/filter_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/filter_op.cc index 26b99080c8..a1c5ed0070 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/filter_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/filter_op.cc @@ -126,9 +126,6 @@ Status FilterOp::WorkerEntry(int32_t worker_id) { continue; } - // Now that the first fetch is in, use the helper function to assign the column name map to this op. - RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild()); - RETURN_IF_NOT_OK(CheckColumns(in_buffer.get(), &in_columns_)); // if the databuffer was all filtered, it is marked as kFilterEmpty. diff --git a/mindspore/ccsrc/dataset/engine/datasetops/map_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/map_op.cc index 9918260201..053559f88b 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/map_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/map_op.cc @@ -156,14 +156,15 @@ Status MapOp::WorkerEntry(int32_t worker_id) { // initializations that happen after the first fetch. RETURN_IF_NOT_OK(FetchNextBuffer(&in_buffer, worker_id)); - // Initialize details related to column selections and column map by calling WorkerEntryInit. - // WorkerEntryInit contains thread-safe lock to ensure that this init work is only performed once - // by the first worker to enter the codepath. All other threads will share the const info that - // gets set up here going forward. + // Sanity check the databuffer. // Special case: if there's more threads than buffers, some threads simply get the final control - // messages (eoe/eof), and so they will not perform the init work. + // messages (eoe/eof), and so they will not perform the check. if (!in_buffer->eoe() && !in_buffer->eof()) { - RETURN_IF_NOT_OK(WorkerEntryInit(in_buffer.get())); + int32_t num_rows = in_buffer->NumRows(); + int32_t num_cols = in_buffer->NumCols(); + if (num_rows == 0 || num_cols == 0) { + RETURN_STATUS_UNEXPECTED("MapOp is getting an empty DataBuffer."); + } } // Now that init work is done, drop into the main fetching loop. @@ -258,63 +259,18 @@ Status MapOp::WorkerCompute(DataBuffer *in_buffer, TensorQTable *new_tensor_tabl return Status::OK(); } -// initialize some internal data structure used by WorkerEntry() -Status MapOp::WorkerEntryInit(const DataBuffer *in_buf) { - int32_t num_rows = in_buf->NumRows(); - int32_t num_cols = in_buf->NumCols(); - if (num_rows == 0 || num_cols == 0) { - RETURN_STATUS_UNEXPECTED("MapOp is getting an empty DataBuffer."); +Status MapOp::ComputeColMap() { + // If the map has not been set up yet in the base class, then set it up + if (column_name_id_map_.empty()) { + std::unordered_map current_name_id_map = child_[0]->column_name_id_map(); + // Initialize private variables + RETURN_IF_NOT_OK(InitPrivateVariable(¤t_name_id_map)); + // Create the final column name to index mapping in the base class field + CreateFinalColMap(¤t_name_id_map); + MS_LOG(DEBUG) << "Column name map for map op set: " << this->ColumnNameMapAsString(); + } else { + MS_LOG(WARNING) << "Column name map is already set!"; } - - // We can't use AssignColMapFromChild() here since we need to modify the column map. We need to be threadsafe - // though for saving the final map in the op, so use the lock here. - if (first_fetch_) { - std::unique_lock lock(column_name_map_mutex_); - // If the map has not been set up yet in the base class, then we are the first one in to set it up - // (and we are under protection of the mutex lock) - if (column_name_id_map_.empty()) { - std::unordered_map current_name_id_map = child_[0]->column_name_id_map(); - - // If input_columns is empty(), The col at index-0 will be picked. - if (in_columns_.empty()) { - for (const auto &pair : current_name_id_map) { - if (pair.second == 0) { - MS_LOG(INFO) << "Input columns empty for map op, will apply to the first column in the current table."; - in_columns_.push_back(pair.first); - break; - } - } - - // If caller didn't specify the out_col_names, assume they are same as the input_columns. - // This was done in the constructor, but if input columns was empty to start we have to redo it here. - if (out_columns_.empty() || out_columns_[0].empty()) { - out_columns_ = in_columns_; - } - } - - // Before we continue, issue a sanity check to make sure the input columns from user and the incoming - // columns from child are correct - RETURN_IF_NOT_OK(this->ValidateInColumns(current_name_id_map)); - - // initialize keep_input_columns, true means to keep the column. - keep_input_columns_.resize(num_cols, true); - for (const auto &col_name : in_columns_) { - int32_t missed = current_name_id_map[col_name]; - keep_input_columns_[missed] = false; - } - - // initialize to_process_indices. - for (const auto &col_name : in_columns_) { - to_process_indices_.push_back(current_name_id_map[col_name]); - } - - // Create the final column name to index mapping in the base class field - CreateFinalColMap(¤t_name_id_map); - first_fetch_ = false; - } - } // mutex lock will release here - - MS_LOG(DEBUG) << "Column name map for map op set: " << this->ColumnNameMapAsString(); return Status::OK(); } @@ -330,6 +286,42 @@ Status MapOp::ValidateInColumns(const std::unordered_map & return Status::OK(); } +Status MapOp::InitPrivateVariable(std::unordered_map *col_name_id_map) { + // If input_columns is empty(), The col at index-0 will be picked. + if (in_columns_.empty()) { + for (const auto &pair : *col_name_id_map) { + if (pair.second == 0) { + MS_LOG(INFO) << "Input columns empty for map op, will apply to the first column in the current table."; + in_columns_.push_back(pair.first); + break; + } + } + + // If caller didn't specify the out_col_names, assume they are same as the input_columns. + // This was done in the constructor, but if input columns was empty to start we have to redo it here. + if (out_columns_.empty() || out_columns_[0].empty()) { + out_columns_ = in_columns_; + } + } + + // Before we continue, issue a sanity check to make sure the input columns from user and the incoming + // columns from child are correct + RETURN_IF_NOT_OK(this->ValidateInColumns(*col_name_id_map)); + + // initialize keep_input_columns, true means to keep the column. + keep_input_columns_.resize(col_name_id_map->size(), true); + for (const auto &col_name : in_columns_) { + int32_t missed = (*col_name_id_map)[col_name]; + keep_input_columns_[missed] = false; + } + + // initialize to_process_indices. + for (const auto &col_name : in_columns_) { + to_process_indices_.push_back((*col_name_id_map)[col_name]); + } + return Status::OK(); +} + // Create the final column name to index mapping and get indices of the columns this mapop does not use. void MapOp::CreateFinalColMap(std::unordered_map *col_name_id_map) { std::unordered_map final_col_name_id_map; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/map_op.h b/mindspore/ccsrc/dataset/engine/datasetops/map_op.h index 4d7ffd1204..94569bd41f 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/map_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/map_op.h @@ -258,15 +258,18 @@ class MapOp : public ParallelOp { // @param col_name_id_map The column name to index mapping obtained from child operator void CreateFinalColMap(std::unordered_map *col_name_id_map); - // Private function that initialize some internal data structure used by WorkerEntry() - // @param in_buf A raw pointer to the DataBuffer. A raw pointer is fine because this function does not manage memory - // and is not shared with other threads. - Status WorkerEntryInit(const DataBuffer *in_buf); - // Validating if each of the input_columns exists in the DataBuffer. // @param - the column map to check // @return - status return code Status ValidateInColumns(const std::unordered_map &col_name_id_map); + + // Private function for computing the assignment of the column name map. + // @return - Status + Status ComputeColMap() override; + + // Private function for initializing private variables such as in_columns_, out_columns_. + // @return - Status + Status InitPrivateVariable(std::unordered_map *col_name_id_map); }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/datasetops/project_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/project_op.cc index 063c618aeb..14b064bab9 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/project_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/project_op.cc @@ -74,24 +74,6 @@ void ProjectOp::Print(std::ostream &out, bool show_all) const { Status ProjectOp::GetNextBuffer(std::unique_ptr *p_buffer, int32_t worker_id, bool retry_if_eoe) { RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(p_buffer, worker_id, retry_if_eoe)); if (!((*p_buffer)->eoe()) && !((*p_buffer)->eof())) { - // Only for the first buffer fetched, get the column map of the incoming data and save it - // into our own column name map after making the appropriate mods - // We cannot use the super class AssignColMapFromChild here because we're making a modification of the - // map from the child map. - if (first_fetch_) { - std::unordered_map child_column_name_mapping = child_[0]->column_name_id_map(); - for (size_t i = 0; i < columns_to_project_.size(); i++) { - std::string ¤t_column = columns_to_project_[i]; - if (child_column_name_mapping.find(current_column) == child_column_name_mapping.end()) { - std::string err_msg = "ProjectOp: column " + current_column + " does not exist in child operator."; - RETURN_STATUS_UNEXPECTED(err_msg); - } - // Setup the new column name mapping for ourself (base class field) - column_name_id_map_[current_column] = i; - projected_column_indices_.push_back(child_column_name_mapping[current_column]); - } - first_fetch_ = false; // we only need to do this path once - } RETURN_IF_NOT_OK(Project(p_buffer)); } return Status::OK(); @@ -151,5 +133,27 @@ Status ProjectOp::Accept(NodePass *p, bool *modified) { // Downcast shared pointer then call visitor return p->RunOnNode(std::static_pointer_cast(shared_from_this()), modified); } + +// Compute the column map and save it into our own column name map +// We cannot use the super class ComputeColMap here because we're making a modification of the +// map from the child map. +Status ProjectOp::ComputeColMap() { + if (column_name_id_map_.empty()) { + std::unordered_map child_column_name_mapping = child_[0]->column_name_id_map(); + for (size_t i = 0; i < columns_to_project_.size(); i++) { + std::string ¤t_column = columns_to_project_[i]; + if (child_column_name_mapping.find(current_column) == child_column_name_mapping.end()) { + std::string err_msg = "ProjectOp: column " + current_column + " does not exist in child operator."; + RETURN_STATUS_UNEXPECTED(err_msg); + } + // Setup the new column name mapping for ourself (base class field) + column_name_id_map_[current_column] = i; + projected_column_indices_.push_back(child_column_name_mapping[current_column]); + } + } else { + MS_LOG(WARNING) << "Column name map is already set!"; + } + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/datasetops/project_op.h b/mindspore/ccsrc/dataset/engine/datasetops/project_op.h index ced0f9e5a9..628c1342ba 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/project_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/project_op.h @@ -116,6 +116,10 @@ class ProjectOp : public PipelineOp { std::vector projected_column_indices_; Status Project(std::unique_ptr *data_buffer); + + // Computing the assignment of the column name map. + // @return - Status + Status ComputeColMap() override; }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/datasetops/rename_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/rename_op.cc index c026aac4fa..bebca780ff 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/rename_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/rename_op.cc @@ -69,12 +69,6 @@ Status RenameOp::operator()() { RETURN_STATUS_UNEXPECTED(err_msg); } - // First, populate the column map from the input child. - // This will not be the final map for output from this op. - RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild()); - // core rename functionality only needs to happen once, to identify the new column names/indexes - RETURN_IF_NOT_OK(RenameColumns()); - while (curr_buffer->eof() == false) { while (curr_buffer->eoe() == false) { // push the renamed input buffer @@ -95,45 +89,52 @@ Status RenameOp::operator()() { return Status::OK(); } -// renames the columns -Status RenameOp::RenameColumns() { - // iterate over my index in input vector, find the corresponding position - std::unordered_map new_col_name_id_map = {}; - // parameter for input check - size_t found = 0; - - // iterate over all the pairs and if there is a name match with rename, rename the column and add it to new map - // by doing it this way we recreate a new ColNameIdMap and allow for switching - for (const auto &pair : column_name_id_map_) { - std::string name = pair.first; - int32_t id = pair.second; - // find name - std::vector::iterator it; - it = std::find(in_columns_.begin(), in_columns_.end(), name); - // for c input checks here we have to count the number of times we find the stuff in in_columns_ - // because we iterate over the mInputList n times - if (it != in_columns_.end()) { - // found - found += 1; - int index = std::distance(in_columns_.begin(), it); - MS_LOG(DEBUG) << "Rename operator index found " << index << " value " << id << "."; - - new_col_name_id_map[out_columns_[index]] = id; - } else { - // not found - MS_LOG(DEBUG) << "Rename operator index not found: " << id << " is the column id."; - new_col_name_id_map[name] = id; +// Rename core functionality to compute the new column name id map. +// We need to overwrite the super class ComputeColMap here because we're making a modification of the +// map from the child map. +Status RenameOp::ComputeColMap() { + if (column_name_id_map_.empty()) { + column_name_id_map_ = child_[0]->column_name_id_map(); + // iterate over my index in input vector, find the corresponding position + std::unordered_map new_col_name_id_map = {}; + // parameter for input check + size_t found = 0; + + // iterate over all the pairs and if there is a name match with rename, rename the column and add it to new map + // by doing it this way we recreate a new ColNameIdMap and allow for switching + for (const auto &pair : column_name_id_map_) { + std::string name = pair.first; + int32_t id = pair.second; + // find name + std::vector::iterator it; + it = std::find(in_columns_.begin(), in_columns_.end(), name); + // for c input checks here we have to count the number of times we find the stuff in in_columns_ + // because we iterate over the mInputList n times + if (it != in_columns_.end()) { + // found + found += 1; + int index = std::distance(in_columns_.begin(), it); + MS_LOG(DEBUG) << "Rename operator index found " << index << " value " << id << "."; + + new_col_name_id_map[out_columns_[index]] = id; + } else { + // not found + MS_LOG(DEBUG) << "Rename operator index not found: " << id << " is the column id."; + new_col_name_id_map[name] = id; + } + } + // only checks number of renamed columns have been found, this input check doesn't check everything + if (found != in_columns_.size()) { + MS_LOG(DEBUG) << "Rename operator column names found: " << found << " out of " << in_columns_.size() << "."; + std::string err_msg = "Renamed column doesn't exist in dataset"; + RETURN_STATUS_UNEXPECTED(err_msg); } - } - // only checks number of renamed columns have been found, this input check doesn't check everything - if (found != in_columns_.size()) { - MS_LOG(DEBUG) << "Rename operator column names found: " << found << " out of " << in_columns_.size() << "."; - std::string err_msg = "Renamed column doesn't exist in dataset"; - RETURN_STATUS_UNEXPECTED(err_msg); - } - // Now, overwrite our column map with the new renamed columns/id's - column_name_id_map_ = new_col_name_id_map; + // Now, overwrite our column map with the new renamed columns/id's + column_name_id_map_ = new_col_name_id_map; + } else { + MS_LOG(WARNING) << "Column name map is already set!"; + } return Status::OK(); } diff --git a/mindspore/ccsrc/dataset/engine/datasetops/rename_op.h b/mindspore/ccsrc/dataset/engine/datasetops/rename_op.h index eaca20ccc8..e209c075d6 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/rename_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/rename_op.h @@ -122,7 +122,9 @@ class RenameOp : public PipelineOp { protected: // Rename core functionality - Status RenameColumns(); + // Computing the assignment of the new column name map. + // @return - Status + Status ComputeColMap() override; // Variable to store the input column names std::vector in_columns_; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/repeat_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/repeat_op.cc index 524603fd94..66e2177636 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/repeat_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/repeat_op.cc @@ -123,8 +123,6 @@ Status RepeatOp::GetNextBuffer(std::unique_ptr *p_buffer, int32_t wo if (buf->eof()) { RETURN_IF_NOT_OK(EofReceived(worker_id)); } - // Update the column name map if needed - RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild()); *p_buffer = std::move(buf); return Status::OK(); } diff --git a/mindspore/ccsrc/dataset/engine/datasetops/shuffle_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/shuffle_op.cc index 69f7e09986..c16f3f9625 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/shuffle_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/shuffle_op.cc @@ -266,9 +266,6 @@ Status ShuffleOp::InitShuffleBuffer() { RETURN_STATUS_UNEXPECTED("Unable to fetch a single row for shuffle buffer."); } - // Now that a first fetch is done, assign the column map for this operator - RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild()); - // Now fill the rest of the shuffle buffer until we are unable to get the next row or we reached // the desired shuffle buffer size. while (!new_row.empty() && shuffle_buffer_->size() < static_cast(shuffle_size_ - 1)) { diff --git a/mindspore/ccsrc/dataset/engine/datasetops/skip_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/skip_op.cc index 35fae8d091..c00fd486b7 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/skip_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/skip_op.cc @@ -86,9 +86,6 @@ Status SkipOp::operator()() { std::unique_ptr curr_buffer; RETURN_IF_NOT_OK(GetNextInput(&curr_buffer)); - // After the first buffer fetch above we can do the one-time assign of the column name map - RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild()); - while (curr_buffer->eof() == false) { // Reset count skip_count_ = 0; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.cc index 4b32201d6d..7889362555 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.cc @@ -79,11 +79,6 @@ CelebAOp::CelebAOp(int32_t num_workers, int32_t rows_per_buffer, const std::stri sampler_(std::move(sampler)), num_rows_in_attr_file_(0), dataset_type_(dataset_type) { - // Set the column name map (base class field) - for (int32_t index = 0; index < data_schema_->NumColumns(); index++) { - column_name_id_map_[data_schema_->column(index).name()] = index; - } - attr_info_queue_ = std::make_unique>>(queue_size); io_block_queues_.Init(num_workers_, queue_size); } @@ -413,5 +408,17 @@ Status CelebAOp::Reset() { wp_.Set(); // wake up master thread after reset is done return Status::OK(); } + +Status CelebAOp::ComputeColMap() { + // Set the column name map (base class field) + if (column_name_id_map_.empty()) { + for (int32_t index = 0; index < data_schema_->NumColumns(); index++) { + column_name_id_map_[data_schema_->column(index).name()] = index; + } + } else { + MS_LOG(WARNING) << "Column name map is already set!"; + } + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.h index f4b5d040ca..f8a49dabb2 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.h @@ -212,6 +212,10 @@ class CelebAOp : public ParallelOp, RandomAccessOp { // @return Status - The error code return Status Reset() override; + // Private function for computing the assignment of the column name map. + // @return - Status + Status ComputeColMap() override; + int32_t rows_per_buffer_; std::string folder_path_; // directory of celeba folder bool decode_; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.cc index ad87e394eb..e7c418b146 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.cc @@ -87,10 +87,6 @@ CifarOp::CifarOp(CifarType type, int32_t num_works, int32_t rows_per_buf, const sampler_(std::move(sampler)), row_cnt_(0), buf_cnt_(0) { - // set the column name map (base class field) - for (uint32_t i = 0; i < data_schema_->NumColumns(); ++i) { - column_name_id_map_[data_schema_->column(i).name()] = i; - } constexpr uint64_t kUtilQueueSize = 512; cifar_raw_data_block_ = std::make_unique>>(kUtilQueueSize); io_block_queues_.Init(num_workers_, queue_size); @@ -454,5 +450,17 @@ Status CifarOp::CountTotalRows(const std::string &dir, bool isCIFAR10, int64_t * return Status::OK(); } } + +Status CifarOp::ComputeColMap() { + // set the column name map (base class field) + if (column_name_id_map_.empty()) { + for (uint32_t i = 0; i < data_schema_->NumColumns(); ++i) { + column_name_id_map_[data_schema_->column(i).name()] = i; + } + } else { + MS_LOG(WARNING) << "Column name map is already set!"; + } + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.h index 62c20ac401..21ed80ceab 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.h @@ -208,6 +208,10 @@ class CifarOp : public ParallelOp, public RandomAccessOp { // @return Status - The error code return Status GetClassIds(std::map> *cls_ids) const override; + // Private function for computing the assignment of the column name map. + // @return - Status + Status ComputeColMap() override; + CifarType cifar_type_; int32_t rows_per_buffer_; std::string folder_path_; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/clue_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/clue_op.cc index e92ca0d26c..d863de15ad 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/clue_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/clue_op.cc @@ -112,13 +112,6 @@ Status ClueOp::Init() { int32_t safe_queue_size = static_cast(std::ceil(clue_files_list_.size() / num_workers_) + 1); io_block_queues_.Init(num_workers_, safe_queue_size); - // Set the column name mapping (base class field) - int count = 0; - for (auto &p : cols_to_keyword_) { - column_name_id_map_[p.first] = count; - count++; - } - RETURN_IF_NOT_OK(ParallelOp::CreateWorkerConnector(worker_connector_size_)); jagged_buffer_connector_ = std::make_unique(num_workers_, 1, worker_connector_size_); @@ -549,5 +542,19 @@ Status ClueOp::CountAllFileRows(const std::vector &files, int64_t * } return Status::OK(); } + +Status ClueOp::ComputeColMap() { + // Set the column name mapping (base class field) + if (column_name_id_map_.empty()) { + int count = 0; + for (auto &p : cols_to_keyword_) { + column_name_id_map_[p.first] = count; + count++; + } + } else { + MS_LOG(WARNING) << "Column name map is already set!"; + } + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/clue_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/clue_op.h index b6a797d3f4..f41abd020c 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/clue_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/clue_op.h @@ -263,6 +263,10 @@ class ClueOp : public ParallelOp { // @return Status - the error code returned. Status GetValue(const nlohmann::json &js, std::vector key_chain, std::shared_ptr *t); + // Private function for computing the assignment of the column name map. + // @return - Status + Status ComputeColMap() override; + int32_t device_id_; bool shuffle_files_; bool shuffle_global_; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/coco_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/coco_op.cc index 8d352bbd6c..92f6794769 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/coco_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/coco_op.cc @@ -129,10 +129,6 @@ CocoOp::CocoOp(const TaskType &task_type, const std::string &image_folder_path, rows_per_buffer_(rows_per_buffer), sampler_(std::move(sampler)), data_schema_(std::move(data_schema)) { - // Set the column name map (base class field) - for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { - column_name_id_map_[data_schema_->column(i).name()] = i; - } io_block_queues_.Init(num_workers_, queue_size); } @@ -627,5 +623,17 @@ Status CocoOp::GetClassIndexing(const std::string &dir, const std::string &file, *output_class_indexing = op->label_index_; return Status::OK(); } + +Status CocoOp::ComputeColMap() { + // Set the column name map (base class field) + if (column_name_id_map_.empty()) { + for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { + column_name_id_map_[data_schema_->column(i).name()] = i; + } + } else { + MS_LOG(WARNING) << "Column name map is already set!"; + } + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/coco_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/coco_op.h index f5abeed72e..3791853798 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/coco_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/coco_op.h @@ -306,6 +306,10 @@ class CocoOp : public ParallelOp, public RandomAccessOp { template Status SearchNodeInJson(nlohmann::json input_tree, std::string node_name, T *output_node); + // Private function for computing the assignment of the column name map. + // @return - Status + Status ComputeColMap() override; + bool decode_; int64_t row_cnt_; int64_t buf_cnt_; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.cc index d316524c04..eb5ba32642 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.cc @@ -94,12 +94,6 @@ void GeneratorOp::Dealloc() noexcept { Status GeneratorOp::Init() { // Reset BufferID buffer_id_ = 0; - // Setup column names map (base class field) - if (column_name_id_map_.empty()) { - for (int i = 0; i < column_names_.size(); ++i) { - column_name_id_map_[column_names_[i]] = i; - } - } Status ret; { // Acquire Python GIL @@ -257,5 +251,17 @@ Status GeneratorOp::Accept(NodePass *p, bool *modified) { // Downcast shared pointer then call visitor return p->RunOnNode(std::static_pointer_cast(shared_from_this()), modified); } + +Status GeneratorOp::ComputeColMap() { + // Setup column names map (base class field) + if (column_name_id_map_.empty()) { + for (int i = 0; i < column_names_.size(); ++i) { + column_name_id_map_[column_names_[i]] = i; + } + } else { + MS_LOG(WARNING) << "Column name map is already set!"; + } + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.h index 82b395d6de..98dd2d70a1 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.h @@ -150,6 +150,10 @@ class GeneratorOp : public PipelineOp { Status PyRowToTensorRow(py::object py_data, TensorRow *tensor_row); Status FillBuffer(TensorQTable *tt); + + // Private function for computing the assignment of the column name map. + // @return - Status + Status ComputeColMap() override; }; #pragma GCC visibility pop diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.cc index 5cdfa8bb76..b2611a67fc 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.cc @@ -78,10 +78,6 @@ ImageFolderOp::ImageFolderOp(int32_t num_wkrs, int32_t rows_per_buffer, std::str buf_cnt_(0), sampler_ind_(0), dirname_offset_(0) { - // Set the column name map (base class field) - for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { - column_name_id_map_[data_schema_->column(i).name()] = i; - } folder_name_queue_ = std::make_unique>(num_wkrs * queue_size); image_name_queue_ = std::make_unique>(num_wkrs * queue_size); io_block_queues_.Init(num_workers_, queue_size); @@ -418,5 +414,17 @@ Status ImageFolderOp::Accept(NodePass *p, bool *modified) { // Downcast shared pointer then call visitor return p->RunOnNode(std::static_pointer_cast(shared_from_this()), modified); } + +Status ImageFolderOp::ComputeColMap() { + // Set the column name map (base class field) + if (column_name_id_map_.empty()) { + for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { + column_name_id_map_[data_schema_->column(i).name()] = i; + } + } else { + MS_LOG(WARNING) << "Column name map is already set!"; + } + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.h index e1d578e034..06f39deee0 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.h @@ -248,6 +248,10 @@ class ImageFolderOp : public ParallelOp, public RandomAccessOp { // @return Status - The error code return Status Reset() override; + // Private function for computing the assignment of the column name map. + // @return - Status + Status ComputeColMap() override; + int32_t rows_per_buffer_; std::string folder_path_; // directory of image folder bool recursive_; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.cc index 0762f36d5a..e26bb7de65 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.cc @@ -76,10 +76,6 @@ ManifestOp::ManifestOp(int32_t num_works, int32_t rows_per_buffer, std::string f decode_(decode), usage_(usage), buf_cnt_(0) { - // Set the column name map (base class field) - for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { - column_name_id_map_[data_schema_->column(i).name()] = i; - } io_block_queues_.Init(num_workers_, queue_size); (void)std::transform(usage_.begin(), usage_.end(), usage_.begin(), ::tolower); } @@ -420,5 +416,17 @@ Status ManifestOp::GetClassIndexing(const std::string &file, const py::dict &dic return Status::OK(); } + +Status ManifestOp::ComputeColMap() { + // Set the column name map (base class field) + if (column_name_id_map_.empty()) { + for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { + column_name_id_map_[data_schema_->column(i).name()] = i; + } + } else { + MS_LOG(WARNING) << "Column name map is already set!"; + } + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.h index edfdbb51ae..1bdf683084 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.h @@ -219,6 +219,10 @@ class ManifestOp : public ParallelOp, public RandomAccessOp { // @return Status - The error code return Status CountDatasetInfo(); + // Private function for computing the assignment of the column name map. + // @return - Status + Status ComputeColMap() override; + int32_t rows_per_buffer_; int64_t io_block_pushed_; int64_t row_cnt_; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc index 0f762386af..3c95b9b054 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc @@ -196,10 +196,6 @@ Status MindRecordOp::Init() { data_schema_ = std::move(tmp_schema); } - for (int i = 0; i < static_cast(columns_to_load_.size()); i++) { - column_name_id_map_[columns_to_load_[i]] = i; - } - return Status::OK(); } @@ -502,5 +498,16 @@ Status MindRecordOp::Accept(NodePass *p, bool *modified) { // Downcast shared pointer then call visitor return p->RunOnNode(std::static_pointer_cast(shared_from_this()), modified); } + +Status MindRecordOp::ComputeColMap() { + if (column_name_id_map_.empty()) { + for (int i = 0; i < static_cast(columns_to_load_.size()); i++) { + column_name_id_map_[columns_to_load_[i]] = i; + } + } else { + MS_LOG(WARNING) << "Column name map is already set!"; + } + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.h index b704240aaa..af405a8f5b 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.h @@ -234,6 +234,10 @@ class MindRecordOp : public ParallelOp { Status FetchBlockBuffer(const int32_t &buffer_id); + // Private function for computing the assignment of the column name map. + // @return - Status + Status ComputeColMap() override; + int32_t rows_per_buffer_; // The number of requested rows per buffer. std::vector dataset_file_; // dataset files bool load_dataset_; // load dataset from single file or not diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.cc index eacd9daf75..67e7757da5 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.cc @@ -73,10 +73,6 @@ MnistOp::MnistOp(int32_t num_workers, int32_t rows_per_buffer, std::string folde rows_per_buffer_(rows_per_buffer), sampler_(std::move(sampler)), data_schema_(std::move(data_schema)) { - // set the column name map (base class field) - for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { - column_name_id_map_[data_schema_->column(i).name()] = i; - } io_block_queues_.Init(num_workers, queue_size); } @@ -432,5 +428,17 @@ Status MnistOp::CountTotalRows(const std::string &dir, int64_t *count) { return Status::OK(); } + +Status MnistOp::ComputeColMap() { + // set the column name map (base class field) + if (column_name_id_map_.empty()) { + for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { + column_name_id_map_[data_schema_->column(i).name()] = i; + } + } else { + MS_LOG(WARNING) << "Column name map is already set!"; + } + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.h index 909ac22124..c22ee24acd 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.h @@ -226,6 +226,10 @@ class MnistOp : public ParallelOp, public RandomAccessOp { // @return Status - The error code return Status Reset() override; + // Private function for computing the assignment of the column name map. + // @return - Status + Status ComputeColMap() override; + int64_t buf_cnt_; int64_t row_cnt_; WaitPost wp_; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.cc index 9e3d1140a7..afd7ebcc55 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.cc @@ -53,9 +53,6 @@ Status RandomDataOp::Builder::Build(std::shared_ptr *out_op) { RETURN_IF_NOT_OK((*out_op)->GenerateSchema()); } - // Extract the column name mapping from the schema and save it in the class. - RETURN_IF_NOT_OK((*out_op)->data_schema_->GetColumnNameMap(&((*out_op)->column_name_id_map_))); - return Status::OK(); } @@ -405,5 +402,15 @@ Status RandomDataOp::Reset() { return Status::OK(); } + +Status RandomDataOp::ComputeColMap() { + // Extract the column name mapping from the schema and save it in the class. + if (column_name_id_map_.empty()) { + RETURN_IF_NOT_OK(data_schema_->GetColumnNameMap(&(column_name_id_map_))); + } else { + MS_LOG(WARNING) << "Column name map is already set!"; + } + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.h index 48cfb0be51..020c9a6e09 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.h @@ -250,6 +250,10 @@ class RandomDataOp : public ParallelOp { return ++buffer_id_; } + // Private function for computing the assignment of the column name map. + // @return - Status + Status ComputeColMap() override; + int32_t buffer_id_; int64_t rows_per_buffer_; int64_t total_rows_; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.cc index 26058cc8b8..5ae950b803 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.cc @@ -127,11 +127,6 @@ Status TextFileOp::Init() { int32_t safe_queue_size = static_cast(std::ceil(text_files_list_.size() / num_workers_) + 1); io_block_queues_.Init(num_workers_, safe_queue_size); - // Set the column name mapping (base class field) - for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { - column_name_id_map_[data_schema_->column(i).name()] = i; - } - RETURN_IF_NOT_OK(ParallelOp::CreateWorkerConnector(worker_connector_size_)); jagged_buffer_connector_ = std::make_unique(num_workers_, 1, worker_connector_size_); @@ -488,5 +483,17 @@ Status TextFileOp::CountAllFileRows(const std::vector &files, int64 } return Status::OK(); } + +Status TextFileOp::ComputeColMap() { + // Set the column name mapping (base class field) + if (column_name_id_map_.empty()) { + for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { + column_name_id_map_[data_schema_->column(i).name()] = i; + } + } else { + MS_LOG(WARNING) << "Column name map is already set!"; + } + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.h index dd258d914e..31224cb299 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.h @@ -264,6 +264,10 @@ class TextFileOp : public ParallelOp { // @return Status - the error code returned. Status PostEndOfEpoch(int32_t queue_index); + // Private function for computing the assignment of the column name map. + // @return - Status + Status ComputeColMap() override; + int32_t device_id_; int32_t num_devices_; int64_t rows_per_buffer_; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.cc index 23dce8dc10..8b92d19249 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.cc @@ -195,11 +195,6 @@ Status TFReaderOp::Init() { RETURN_IF_NOT_OK(CreateSchema(dataset_files_list_[0], columns_to_load_)); } - // Construct the column name map for this operator (base class field) - for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { - column_name_id_map_[data_schema_->column(i).name()] = i; - } - if (total_rows_ == 0) { total_rows_ = data_schema_->num_rows(); } @@ -1015,5 +1010,17 @@ Status TFReaderOp::Accept(NodePass *p, bool *modified) { // Downcast shared pointer then call visitor return p->RunOnNode(std::static_pointer_cast(shared_from_this()), modified); } + +Status TFReaderOp::ComputeColMap() { + // Construct the column name map for this operator (base class field) + if (column_name_id_map_.empty()) { + for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { + column_name_id_map_[data_schema_->column(i).name()] = i; + } + } else { + MS_LOG(WARNING) << "Column name map is already set!"; + } + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.h index 9c92d6d4be..417cd8bef0 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.h @@ -381,6 +381,10 @@ class TFReaderOp : public ParallelOp { // @return Status - the error code returned. Status CalculateNumRowsPerShard(); + // Private function for computing the assignment of the column name map. + // @return - Status + Status ComputeColMap() override; + int32_t device_id_; int32_t num_devices_; int64_t rows_per_buffer_; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.cc index d3c7ff397f..5d9f0ee92c 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.cc @@ -99,10 +99,6 @@ VOCOp::VOCOp(const TaskType &task_type, const std::string &task_mode, const std: rows_per_buffer_(rows_per_buffer), sampler_(std::move(sampler)), data_schema_(std::move(data_schema)) { - // Set the column name map (base class field) - for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { - column_name_id_map_[data_schema_->column(i).name()] = i; - } io_block_queues_.Init(num_workers_, queue_size); } @@ -454,5 +450,17 @@ Status VOCOp::GetClassIndexing(const std::string &dir, const std::string &task_t return Status::OK(); } + +Status VOCOp::ComputeColMap() { + // Set the column name map (base class field) + if (column_name_id_map_.empty()) { + for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { + column_name_id_map_[data_schema_->column(i).name()] = i; + } + } else { + MS_LOG(WARNING) << "Column name map is already set!"; + } + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.h index bce82a43c9..a0f5eba4d6 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.h @@ -263,6 +263,10 @@ class VOCOp : public ParallelOp, public RandomAccessOp { // @return Status - The error code return Status Reset() override; + // Private function for computing the assignment of the column name map. + // @return - Status + Status ComputeColMap() override; + bool decode_; int64_t row_cnt_; int64_t buf_cnt_; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/take_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/take_op.cc index f9ff49a5b7..05c224ee2e 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/take_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/take_op.cc @@ -73,7 +73,6 @@ Status TakeOp::operator()() { TaskManager::FindMe()->Post(); std::unique_ptr buf; RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(&buf)); - RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild()); while (buf->eof() == false) { if (take_count_ == max_takes_) { diff --git a/mindspore/ccsrc/dataset/engine/datasetops/zip_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/zip_op.cc index bb438552f3..55734324fc 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/zip_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/zip_op.cc @@ -139,24 +139,6 @@ Status ZipOp::prepare(TensorQTable *const table) { // Pack this first row into our tensor table table->push_back(std::move(new_row)); - // At this point we have at least 1 row produced, so all child iterators have their column names such that we - // can produce our column name map now. - column_name_id_map_ = {}; - for (int32_t i = 0; i < children_num_; ++i) { - // Initializing col_name_id_map_ from the first data buffer. - const std::unordered_map col_name_id_map = child_iterators_[i]->GetColumnNameMap(); - int32_t colsCurrent = column_name_id_map_.size(); - // the update code below shouldn't do anything bad if the column name already exists. - for (const auto &pair : col_name_id_map) { - std::string name = pair.first; - int32_t old_id = pair.second; - // check if name already exists in column name descriptor - if (column_name_id_map_.count(name) == 1) { - RETURN_STATUS_UNEXPECTED("key already exists when zipping datasets"); - } - column_name_id_map_[name] = old_id + colsCurrent; - } - } return Status::OK(); } @@ -257,5 +239,30 @@ Status ZipOp::Accept(NodePass *p, bool *modified) { // Downcast shared pointer then call visitor return p->RunOnNode(std::static_pointer_cast(shared_from_this()), modified); } + +Status ZipOp::ComputeColMap() { + if (column_name_id_map_.empty()) { + column_name_id_map_ = {}; + for (int32_t i = 0; i < child_.size(); ++i) { + // Initializing col_name_id_map from the child. + const std::unordered_map col_name_id_map = child_[i]->column_name_id_map(); + int32_t colsCurrent = column_name_id_map_.size(); + // the update code below shouldn't do anything bad if the column name already exists. + for (const auto &pair : col_name_id_map) { + std::string name = pair.first; + int32_t old_id = pair.second; + // check if name already exists in column name descriptor + if (column_name_id_map_.count(name) == 1) { + RETURN_STATUS_UNEXPECTED("key already exists when zipping datasets"); + } + column_name_id_map_[name] = old_id + colsCurrent; + } + } + MS_LOG(DEBUG) << "Setting column map:\n" << this->ColumnNameMapAsString(); + } else { + MS_LOG(WARNING) << "Column name map is already set!"; + } + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/datasetops/zip_op.h b/mindspore/ccsrc/dataset/engine/datasetops/zip_op.h index 08b93c18b5..fad3c22eaa 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/zip_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/zip_op.h @@ -141,6 +141,10 @@ class ZipOp : public PipelineOp { // 1, a, T Status getNextTensorRow(TensorRow *const new_zip_row); + // Computing the assignment of the column name map. + // @return - Status + Status ComputeColMap() override; + int32_t children_num_; int32_t rows_per_buffer_; int32_t buffer_id_;