From 7b7b2fa7a023157a4565879581a9462cdcc93301 Mon Sep 17 00:00:00 2001 From: Jamie Nisbet Date: Thu, 7 May 2020 14:15:48 -0400 Subject: [PATCH] remove column name id map from data buffer clang format updates self-review cleanups cpplint fix review updates ci fixes --- mindspore/ccsrc/dataset/engine/data_buffer.h | 24 +-- .../ccsrc/dataset/engine/dataset_iterator.cc | 32 ++- .../ccsrc/dataset/engine/dataset_iterator.h | 16 +- .../dataset/engine/datasetops/barrier_op.cc | 10 +- .../dataset/engine/datasetops/barrier_op.h | 3 - .../dataset/engine/datasetops/batch_op.cc | 22 +- .../dataset/engine/datasetops/batch_op.h | 1 - .../dataset/engine/datasetops/dataset_op.cc | 40 +++- .../dataset/engine/datasetops/dataset_op.h | 42 +++- .../dataset/engine/datasetops/filter_op.cc | 14 +- .../dataset/engine/datasetops/filter_op.h | 5 +- .../ccsrc/dataset/engine/datasetops/map_op.cc | 200 +++++++++--------- .../ccsrc/dataset/engine/datasetops/map_op.h | 63 +++--- .../dataset/engine/datasetops/parallel_op.cc | 5 - .../dataset/engine/datasetops/pipeline_op.cc | 2 +- .../dataset/engine/datasetops/project_op.cc | 35 +-- .../dataset/engine/datasetops/project_op.h | 1 + .../dataset/engine/datasetops/rename_op.cc | 20 +- .../dataset/engine/datasetops/rename_op.h | 3 +- .../dataset/engine/datasetops/repeat_op.cc | 2 + .../dataset/engine/datasetops/repeat_op.h | 1 + .../dataset/engine/datasetops/shuffle_op.cc | 5 +- .../dataset/engine/datasetops/shuffle_op.h | 1 - .../dataset/engine/datasetops/skip_op.cc | 4 + .../engine/datasetops/source/celeba_op.cc | 4 +- .../engine/datasetops/source/celeba_op.h | 2 - .../engine/datasetops/source/cifar_op.cc | 4 +- .../engine/datasetops/source/cifar_op.h | 4 +- .../engine/datasetops/source/generator_op.cc | 7 +- .../engine/datasetops/source/generator_op.h | 2 - .../datasetops/source/image_folder_op.cc | 4 +- .../datasetops/source/image_folder_op.h | 4 +- .../engine/datasetops/source/manifest_op.cc | 4 +- .../engine/datasetops/source/manifest_op.h | 4 +- 
.../engine/datasetops/source/mindrecord_op.cc | 5 +- .../engine/datasetops/source/mindrecord_op.h | 2 - .../engine/datasetops/source/mnist_op.cc | 4 +- .../engine/datasetops/source/mnist_op.h | 4 +- .../datasetops/source/random_data_op.cc | 4 +- .../engine/datasetops/source/random_data_op.h | 3 - .../datasetops/source/sampler/sampler.h | 3 +- .../engine/datasetops/source/storage_op.cc | 8 +- .../engine/datasetops/source/text_file_op.cc | 5 +- .../engine/datasetops/source/text_file_op.h | 2 - .../engine/datasetops/source/tf_buffer.cc | 8 +- .../engine/datasetops/source/tf_reader_op.cc | 11 +- .../engine/datasetops/source/voc_op.cc | 4 +- .../dataset/engine/datasetops/source/voc_op.h | 2 - .../dataset/engine/datasetops/take_op.cc | 1 + .../ccsrc/dataset/engine/datasetops/zip_op.cc | 13 +- .../ccsrc/dataset/engine/datasetops/zip_op.h | 1 - 51 files changed, 353 insertions(+), 317 deletions(-) diff --git a/mindspore/ccsrc/dataset/engine/data_buffer.h b/mindspore/ccsrc/dataset/engine/data_buffer.h index c624ce4339..0053d8894d 100644 --- a/mindspore/ccsrc/dataset/engine/data_buffer.h +++ b/mindspore/ccsrc/dataset/engine/data_buffer.h @@ -17,10 +17,8 @@ #define DATASET_ENGINE_DATA_BUFFER_H_ #include -#include #include #include -#include #include #include #include "dataset/util/allocator.h" @@ -106,14 +104,6 @@ class DataBuffer { Status SliceOff(int64_t number_of_rows); - // Return a mapping from col names to col id. - std::unordered_map column_name_map() const { return column_name_map_; } - - // Update the column name to index mapping. - void set_column_name_map(const std::unordered_map &new_col_name_map) { - column_name_map_ = new_col_name_map; - } - // Replacing mTensorTable, the unique_ptr assignment will release the old TensorTable. void set_tensor_table(std::unique_ptr new_table) { tensor_table_ = std::move(new_table); } @@ -123,18 +113,10 @@ class DataBuffer { void Shuffle() {} // does nothing right now. 
possibly remove later - // ***** column_name_map_ manipulation methods ***** - - // Append Column to mColumnNameMap - Status AppendColumn(const std::string &name, const int32_t &old_id) const { // does nothing right now - return Status::OK(); - } - protected: - int32_t buffer_id_; // An id for the buffer. - std::unique_ptr tensor_table_; // A table (row major) of Tensors - BufferFlags buffer_flags_; // bit mask for various buffer properties - std::unordered_map column_name_map_; // A mapping between column index to column name. + int32_t buffer_id_; // An id for the buffer. + std::unique_ptr tensor_table_; // A table (row major) of Tensors + BufferFlags buffer_flags_; // bit mask for various buffer properties }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/dataset_iterator.cc b/mindspore/ccsrc/dataset/engine/dataset_iterator.cc index a9cbf06978..dac281aeae 100644 --- a/mindspore/ccsrc/dataset/engine/dataset_iterator.cc +++ b/mindspore/ccsrc/dataset/engine/dataset_iterator.cc @@ -14,6 +14,7 @@ * limitations under the License. */ #include "dataset/engine/dataset_iterator.h" +#include #include #include "dataset/core/data_type.h" #include "dataset/core/tensor.h" @@ -26,7 +27,7 @@ namespace mindspore { namespace dataset { // Constructor of the IteratorBase -IteratorBase::IteratorBase() : curr_buffer_(nullptr), eof_handled_(false) {} +IteratorBase::IteratorBase() : curr_buffer_(nullptr), eof_handled_(false), first_row_(true) {} IteratorBase::~IteratorBase() = default; @@ -46,6 +47,19 @@ Status IteratorBase::GetNextAsMap(TensorMap *out_map) { return Status::OK(); } + // The column name mapping is needed to be able to produce the tensor map output. + // The column name mapping comes from the source operator that is producing the data into the iterator. + // To avoid having to fetch this for every time, we'll take a local copy of the column name id mapping + // and save in the iterator. We only have to do this once. 
All subsequent iterations use the same mapping. + // Note: This can only be done after the first row has been produced, as this guarantees that the child has + // its column mapping set up. + if (first_row_) { + // Determine the column name map by calling the derived class method to retrieve the column + // name map + col_name_id_map_ = this->GetColumnNameMap(); + first_row_ = false; + } + // Populate the out map from the row and return it for (auto colMap : col_name_id_map_) { (*out_map)[colMap.first] = std::move(curr_row[colMap.second]); @@ -87,7 +101,6 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) { // Check if we need to get a new DataBuffer to iterate. if (curr_buffer_ == nullptr || curr_buffer_->NumRows() == 0) { - col_name_id_map_.clear(); RETURN_IF_NOT_OK(root_->GetNextBuffer(&curr_buffer_)); // Since GetNextBuffer was used rather than GetNextInput(), it means we need to manually @@ -120,8 +133,6 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) { curr_buffer_.reset(); // explicitly free the eof buffer return Status::OK(); } - - col_name_id_map_ = curr_buffer_->column_name_map(); } // If we got this far, now it's time to pop that next row for return to caller @@ -157,6 +168,11 @@ Status DatasetIterator::GetOutputTypes(std::vector *out_types) { return Status::OK(); } +// Getter +std::unordered_map DatasetIterator::GetColumnNameMap() const { + return root_->column_name_id_map(); +} + // Constructor of the ChildIterator ChildIterator::ChildIterator(DatasetOp *current_op, int32_t worker_id, int32_t child_idx) : IteratorBase(), current_op_(current_op), child_idx_(child_idx), worker_id_(worker_id), end_epoch_(false) {} @@ -177,7 +193,6 @@ Status ChildIterator::FetchNextTensorRow(TensorRow *out_row) { // Check if we need to get a new DataBuffer to iterate.
if (curr_buffer_ == nullptr || curr_buffer_->NumRows() == 0) { - col_name_id_map_.clear(); RETURN_IF_NOT_OK(current_op_->GetNextInput(&curr_buffer_, worker_id_, child_idx_)); // Unlike the DatasetIterator, this child iterator does not quit after eoe. @@ -194,8 +209,6 @@ Status ChildIterator::FetchNextTensorRow(TensorRow *out_row) { eof_handled_ = true; return Status::OK(); } - - col_name_id_map_ = curr_buffer_->column_name_map(); } // If we got this far, now it's time to pop that next row for return to caller @@ -226,5 +239,10 @@ Status ChildIterator::Drain() { } return Status::OK(); } + +// Getter +std::unordered_map ChildIterator::GetColumnNameMap() const { + return current_op_->child(child_idx_)->column_name_id_map(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/dataset_iterator.h b/mindspore/ccsrc/dataset/engine/dataset_iterator.h index 032fc8e9a8..ddd4883a86 100644 --- a/mindspore/ccsrc/dataset/engine/dataset_iterator.h +++ b/mindspore/ccsrc/dataset/engine/dataset_iterator.h @@ -66,15 +66,13 @@ class IteratorBase { // Getter // @return The string to column id mapping. - std::unordered_map col_name_id_map() const { return col_name_id_map_; } + virtual std::unordered_map GetColumnNameMap() const = 0; protected: std::unique_ptr curr_buffer_; // holds the current buffer - - // The column name-id mapping for the current data buffer. + bool eof_handled_; // T/F if this op got an eof + bool first_row_; // internal tracking for first row case std::unordered_map col_name_id_map_; - - bool eof_handled_; // T/F if this op got an eof }; // The DatasetIterator derived class is for fetching rows off the end/root of the execution tree. @@ -104,6 +102,10 @@ class DatasetIterator : public IteratorBase { // @return Status - The error code return Status GetOutputTypes(std::vector *out_types); + // Getter + // @return The string to column id mapping. 
+ std::unordered_map GetColumnNameMap() const override; + private: std::shared_ptr root_; // saves the root of the executionTree TensorRow device_queue_row_; @@ -134,6 +136,10 @@ class ChildIterator : public IteratorBase { // @return Status - The error code return Status Drain(); + // Getter + // @return The string to column id mapping. + std::unordered_map GetColumnNameMap() const override; + private: DatasetOp *current_op_; // The parent operator. We consume from it's children. int32_t child_idx_; // The specific child this iterator will fetch from. diff --git a/mindspore/ccsrc/dataset/engine/datasetops/barrier_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/barrier_op.cc index 3d5b682155..5ab2df4ac4 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/barrier_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/barrier_op.cc @@ -96,9 +96,8 @@ Status BarrierOp::operator()() { if (!curr_table->empty()) { std::unique_ptr curr_buffer = std::make_unique(buffer_id_, DataBuffer::kDeBFlagNone); curr_buffer->set_tensor_table(std::move(curr_table)); - curr_buffer->set_column_name_map(col_name_id_map_); MS_LOG(DEBUG) << "Barrier operator finished one buffer, pushing, rows " << curr_buffer->NumRows() << ", cols " - << curr_buffer->NumCols() << ", map " << col_name_id_map_.size() << "."; + << curr_buffer->NumCols() << ", map " << column_name_id_map_.size() << "."; RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(curr_buffer))); buffer_id_++; } @@ -144,9 +143,10 @@ Status BarrierOp::prepare(TensorQTable *const table) { RETURN_IF_NOT_OK(blockCond()); table->push_back(std::move(new_row)); - // At this point we have 1 row produced, we take the old column map id and use it in the new table - // Initializing col_name_id_map_ from the first data buffer. 
- col_name_id_map_ = child_iterator_->col_name_id_map(); + + // Assign the column name id map + RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild()); + // the update code below shouldn't do anything bad if the column name already exists. return Status::OK(); } diff --git a/mindspore/ccsrc/dataset/engine/datasetops/barrier_op.h b/mindspore/ccsrc/dataset/engine/datasetops/barrier_op.h index 8be55fba7e..379b8f146b 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/barrier_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/barrier_op.h @@ -18,7 +18,6 @@ #include #include -#include #include #include #include "dataset/core/tensor.h" @@ -157,8 +156,6 @@ class BarrierOp : public PipelineOp { int32_t rows_per_buffer_; // buffer_id int32_t buffer_id_; - // local variable to keep track of the buffer information - std::unordered_map col_name_id_map_; // iterator to pull new rows, we only have one child std::unique_ptr child_iterator_; // condition name, to support multiple barriers diff --git a/mindspore/ccsrc/dataset/engine/datasetops/batch_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/batch_op.cc index 018ff99e52..5080a719b4 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/batch_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/batch_op.cc @@ -74,7 +74,7 @@ Status BatchOp::operator()() { std::unique_ptr table = std::make_unique(); child_iterator_ = std::make_unique(this, 0, 0); RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row)); - column_name_map_ = child_iterator_->col_name_id_map(); + RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild()); // must come after the first fetch above int32_t cur_batch_size = 0; RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(0, 0, 0))); while (child_iterator_->eof_handled() == false) { @@ -196,7 +196,6 @@ Status BatchOp::MakeBatchedBuffer(std::pair, CBatc std::unique_ptr dest_table = std::make_unique(); RETURN_IF_NOT_OK(BatchRows(&table_pair.first, &dest_table, table_pair.first->size())); 
(*db)->set_tensor_table(std::move(dest_table)); - (*db)->set_column_name_map(column_name_map_); return Status::OK(); } @@ -218,12 +217,12 @@ Status BatchOp::MapColumns(std::pair, CBatchInfo> TensorBatchTable input_table; input_table.reserve(pyfunc_column_names_.size()); for (std::string col_name : pyfunc_column_names_) { - if (column_name_map_.find(col_name) == column_name_map_.end()) { + if (column_name_id_map_.find(col_name) == column_name_id_map_.end()) { RETURN_STATUS_UNEXPECTED("column : '" + col_name + "' does not exist\n"); } TensorBatch tensor_batch; tensor_batch.reserve(table_pair->first->size()); - size_t col_idx = static_cast(column_name_map_[col_name]); + size_t col_idx = static_cast(column_name_id_map_[col_name]); for (size_t row_idx = 0; row_idx < table_pair->first->size(); row_idx++) { tensor_batch.push_back(std::move(table_pair->first->at(row_idx)[col_idx])); } @@ -236,7 +235,7 @@ Status BatchOp::MapColumns(std::pair, CBatchInfo> // Write back to TensorQTable for (size_t input_idx = 0; input_idx < pyfunc_column_names_.size(); input_idx++) { - size_t col_idx = static_cast(column_name_map_[pyfunc_column_names_[input_idx]]); + size_t col_idx = static_cast(column_name_id_map_[pyfunc_column_names_[input_idx]]); size_t row_id = 0; for (TensorRow &row : *(table_pair->first)) { row[col_idx] = std::move(output_table[input_idx][row_id++]); @@ -372,11 +371,12 @@ Status BatchOp::PadTensor(std::shared_ptr src, std::shared_ptr * Status BatchOp::PadColumns(std::pair, CBatchInfo> *table_pair) { RETURN_UNEXPECTED_IF_NULL(table_pair); // placeholder for now, might need this in the future - CHECK_FAIL_RETURN_UNEXPECTED(table_pair->first->front().size() == column_name_map_.size(), "col_name_map mismatch"); - std::vector pad_vals(column_name_map_.size(), 0); // value to pad each column's tensor with, default 0 + CHECK_FAIL_RETURN_UNEXPECTED(table_pair->first->front().size() == column_name_id_map_.size(), + "col_name_map mismatch"); + std::vector 
pad_vals(column_name_id_map_.size(), 0); // value to pad each column's tensor with, default 0 std::set pad_cols; // padded_shape provided by user, maximum shapes of current batch of tensors - std::vector> pad_shapes(column_name_map_.size()), max_shapes(column_name_map_.size()); + std::vector> pad_shapes(column_name_id_map_.size()), max_shapes(column_name_id_map_.size()); RETURN_IF_NOT_OK(UnpackPadInfo(&pad_cols, &pad_vals, &pad_shapes)); // init each shape in max_shape to {-1,-1...} init each unspecified shape in pad_shape to -1 as well @@ -418,14 +418,14 @@ Status BatchOp::PadColumns(std::pair, CBatchInfo> Status BatchOp::UnpackPadInfo(std::set *pad_cols, std::vector *pad_vals, std::vector> *pad_shapes) { if (pad_info_.empty()) { // if pad_info empty, pad every columns automatically - for (dsize_t col_id = 0; col_id < column_name_map_.size(); col_id++) { + for (dsize_t col_id = 0; col_id < column_name_id_map_.size(); col_id++) { pad_cols->insert(col_id); } } else { for (auto p : pad_info_) { - CHECK_FAIL_RETURN_UNEXPECTED(column_name_map_.find(p.first) != column_name_map_.end(), + CHECK_FAIL_RETURN_UNEXPECTED(column_name_id_map_.find(p.first) != column_name_id_map_.end(), "no column exists with name:" + p.first); - dsize_t col_id = static_cast(column_name_map_[p.first]); + dsize_t col_id = static_cast(column_name_id_map_[p.first]); CHECK_FAIL_RETURN_UNEXPECTED(col_id < pad_vals->size() && col_id < pad_shapes->size(), "col_id out of bound"); pad_cols->insert(col_id); (*pad_vals)[col_id] = p.second.second; // set pad values diff --git a/mindspore/ccsrc/dataset/engine/datasetops/batch_op.h b/mindspore/ccsrc/dataset/engine/datasetops/batch_op.h index f17239e378..f8faa9562e 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/batch_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/batch_op.h @@ -263,7 +263,6 @@ class BatchOp : public ParallelOp { std::vector pyfunc_column_names_; // Name of the columns to perform map op on std::map> pad_info_; // column names to 
perform padding on std::unique_ptr child_iterator_; // child iterator for fetching TensorRows 1 by 1 - std::unordered_map column_name_map_; // Map of column_name: column_index QueueList, CBatchInfo>> worker_queues_; // internal queue for syncing worker py::function batch_size_func_; // Function pointer of batch size function py::function batch_map_func_; // Function pointer of per batch map function diff --git a/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.cc index adbf42487e..62a9ede587 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.cc @@ -36,7 +36,8 @@ DatasetOp::DatasetOp(int32_t op_connector_size) operator_id_(kInvalidOperatorId), tree_(nullptr), state_(OpState::kDeOpIdle), - op_ctrl_flags_(kDeOpNone) { + op_ctrl_flags_(kDeOpNone), + first_fetch_(true) { // The operator starts out with an invalid operator id. The only way to // get it out of invalid state is to assign the operator to an execution tree. } @@ -211,5 +212,42 @@ Status DatasetOp::Reset() { state_ = OpState::kDeOpRunning; return Status::OK(); } + +// gives a string output for the column map for handy debug printing +std::string DatasetOp::ColumnNameMapAsString() const { + std::string outStr = "Column name id map: "; + for (auto &it : column_name_id_map_) { + outStr += (" " + it.first + ":" + std::to_string(it.second)); + } + return outStr; +} + +// A helper function for providing assignment of the column name map. +// This grabs the map from child 0 and assigns it into this op. +// Can only be used if number of children is 1. +Status DatasetOp::AssignColMapFromChild() { + if (child_.size() > 1) { + RETURN_STATUS_UNEXPECTED("Assigning column name map from child only works for single-child operators."); + } + // Assign the correct column name map to this op by taking it from the input child. 
+ // This must be done AFTER the first fetch, but only needs to be done once by the first worker to + // do the first fetch. + if (first_fetch_) { + // If there was a single worker, or this is being called from a master thread in a parallel op, + // then the mutex is not really needed here, although it's harmless. + std::unique_lock lock(column_name_map_mutex_); + // If the map has not been set up yet, then we are the first one in to set it up. The first_fetch_ (dirty read) + // bool allows us to avoid acquiring the lock if the map has already been set. + if (column_name_id_map_.empty()) { + column_name_id_map_ = child_[0]->column_name_id_map(); + first_fetch_ = false; + if (column_name_id_map_.empty()) { + RETURN_STATUS_UNEXPECTED("Child column name map cannot be empty!"); + } + } + MS_LOG(INFO) << "Setting column map after first fetch:\n" << DatasetOp::ColumnNameMapAsString(); + } + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h b/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h index 0111f5239a..cbd3115074 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h @@ -17,6 +17,9 @@ #define DATASET_ENGINE_DATASETOPS_DATASET_OP_H_ #include +#include +#include +#include #include #include "dataset/core/constants.h" #include "dataset/engine/db_connector.h" @@ -59,7 +62,7 @@ class DatasetOp : public std::enable_shared_from_this { // @param child - shared pointer to the child to add. Status AddChild(std::shared_ptr child); - // Getter function to get a shared pointer to our childAdds a operator to become our child. + // Getter function to get a shared pointer to our child // @param child_index - An operator can have n children. Indicates choose which child to return. 
std::shared_ptr child(int32_t child_index) const; @@ -194,20 +197,41 @@ class DatasetOp : public std::enable_shared_from_this { // @return Status virtual Status RegisterWorkerConnectors() { return Status::OK(); } + // Getter for the column name mapping + // @return The returned map + std::unordered_map column_name_id_map() const { return column_name_id_map_; } + + // Checks if the column name map has been set up yet for this op + // @return - true if the map has been set up (is non-empty), false otherwise + bool HasColumnNameMap() const { return (!column_name_id_map_.empty()); } + + // gives a string output for the column map for handy debug printing + // @return - the column name map as a string + std::string ColumnNameMapAsString() const; + protected: // Adds a parent operator to this operator // @notes External callers do not have access to this function. // @param parent - The parent node to add void AddParent(const DatasetOp *parent); - std::vector> child_; // Child nodes - std::vector parent_; // Parent nodes. No ownership and read-only - int32_t oc_queue_size_; // Capacity for each out_connector_ - int32_t operator_id_; // Generated id for the node - ExecutionTree *tree_; // Back pointer to our tree. - OpState state_; // The state of the operator, Running, Idle, Terminated - uint32_t op_ctrl_flags_; // Flags for the operator - std::unique_ptr out_connector_; // Output Connector + // A helper function for providing an assignment of the column name map. + // This grabs the map from child 0 and assigns it into this op. + // Can only be used if number of children is 1. + // @return - Status + Status AssignColMapFromChild(); + + std::vector> child_; // Child nodes + std::vector parent_; // Parent nodes. No ownership and read-only + int32_t oc_queue_size_; // Capacity for each out_connector_ + int32_t operator_id_; // Generated id for the node + ExecutionTree *tree_; // Back pointer to our tree.
+ OpState state_; // The state of the operator, Running, Idle, Terminated + uint32_t op_ctrl_flags_; // Flags for the operator + std::unique_ptr out_connector_; // Output Connector + std::unordered_map column_name_id_map_; // Mapping between col index and col name + bool first_fetch_; // For use when setting column map + std::mutex column_name_map_mutex_; // For protecting shared access to the column map private: // Sets the operator id. diff --git a/mindspore/ccsrc/dataset/engine/datasetops/filter_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/filter_op.cc index 322942d56e..b97d35345e 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/filter_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/filter_op.cc @@ -75,10 +75,9 @@ Status FilterOp::EofReceived(int32_t) { return Status::OK(); } Status FilterOp::EoeReceived(int32_t) { return Status::OK(); } // Validating if each of the input_columns exists in the DataBuffer. -Status FilterOp::ValidateInColumns(const std::unordered_map &col_name_id_map, - const std::vector *input_columns) { +Status FilterOp::ValidateInColumns(const std::vector *input_columns) { for (const auto &inCol : *input_columns) { - bool found = col_name_id_map.find(inCol) != col_name_id_map.end() ? true : false; + bool found = column_name_id_map_.find(inCol) != column_name_id_map_.end() ? true : false; if (!found) { std::string err_msg = "input column name: " + inCol + " doesn't exist in the dataset columns."; RETURN_STATUS_UNEXPECTED(err_msg); @@ -125,6 +124,9 @@ Status FilterOp::WorkerEntry(int32_t worker_id) { continue; } + // Now that the first fetch is in, use the helper function to assign the column name map to this op. + RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild()); + RETURN_IF_NOT_OK(CheckColumns(in_buffer.get(), &in_columns_)); // if the databuffer was all filtered, it is marked as kFilterEmpty. 
@@ -161,10 +163,9 @@ Status FilterOp::WorkerCompute(DataBuffer *in_buffer, std::unique_ptr col_map = in_buffer->column_name_map(); (void)std::transform( in_columns_.begin(), in_columns_.end(), std::back_inserter(to_process), - [&cur_row, &col_map](const auto &it) -> std::shared_ptr { return cur_row[col_map[it]]; }); + [&cur_row, this](const auto &it) -> std::shared_ptr { return cur_row[column_name_id_map_[it]]; }); } bool predicate = true; RETURN_IF_NOT_OK(InvokePredicateFunc(to_process, &predicate)); @@ -217,9 +218,8 @@ Status FilterOp::CheckColumns(const DataBuffer *in_buf, const std::vector col_name_id_map = in_buf->column_name_map(); // Check if there is invalid column name in the inColumns. - RETURN_IF_NOT_OK(ValidateInColumns(col_name_id_map, input_columns)); + RETURN_IF_NOT_OK(ValidateInColumns(input_columns)); return Status::OK(); } diff --git a/mindspore/ccsrc/dataset/engine/datasetops/filter_op.h b/mindspore/ccsrc/dataset/engine/datasetops/filter_op.h index 92312e0843..a2b5bfa541 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/filter_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/filter_op.h @@ -19,7 +19,6 @@ #include #include #include -#include #include #include #include "dataset/engine/datasetops/parallel_op.h" @@ -162,11 +161,9 @@ class FilterOp : public ParallelOp { // Private function for validating if each of the user specified input column names // exist in the DataBuffer. - // @param col_name_id_map The column name to index mapping obtained from DataBuffer. // @param input_columns The vector of input column names used in the current thread. // @return Status The error code return. - Status ValidateInColumns(const std::unordered_map &col_name_id_map, - const std::vector *input_columns); + Status ValidateInColumns(const std::vector *input_columns); // Private function for checking the column legality // @param in_buf A raw pointer to the DataBuffer. 
A raw pointer is fine because this function does not manage memory diff --git a/mindspore/ccsrc/dataset/engine/datasetops/map_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/map_op.cc index 9c5ecddf72..cff15ab573 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/map_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/map_op.cc @@ -147,28 +147,33 @@ Status MapOp::operator()() { Status MapOp::WorkerEntry(int32_t worker_id) { // Handshake with TaskManager that thread creation is successful. TaskManager::FindMe()->Post(); - - // MapOp is not using the ChildIterator class to fetch rows because it needs to track - // rows at a per-buffer level. ChildIterator abstracts the concept of buffers making it - // less convenient to use that interface for fetching. std::unique_ptr in_buffer; - // Loop until eof buffer is encountered - while (true) { - // Getting a databuffer to work on. - // When PerformanceMode is enabled, workers pop from the local queue. - // Otherwise, workers pop from the first child output Connector. - if (perf_mode_) { - RETURN_IF_NOT_OK(local_queues_[worker_id]->PopFront(&in_buffer)); - } else { - RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(&in_buffer, worker_id)); - } + // Getting a databuffer to work on. + // Perform the first fetch here outside of the loop. This allows us to execute one-time only + // initializations that happen after the first fetch. + RETURN_IF_NOT_OK(FetchNextBuffer(&in_buffer, worker_id)); + + // Initialize details related to column selections and column map by calling WorkerEntryInit. + // WorkerEntryInit contains thread-safe lock to ensure that this init work is only performed once + // by the first worker to enter the codepath. All other threads will share the const info that + // gets set up here going forward. + // Special case: if there's more threads than buffers, some threads simply get the final control + // messages (eoe/eof), and so they will not perform the init work. 
+ if (!in_buffer->eoe() && !in_buffer->eof()) { + RETURN_IF_NOT_OK(WorkerEntryInit(in_buffer.get())); + } + // Now that init work is done, drop into the main fetching loop. + // Map op does not use child iterator, and it needs to manually handle eoe and eof's itself + // rather than use the base-class defaults. + while (true) { // Handle EOE and EOF ourselves. Implicit eoe/eof handling in GetNextInput does not work // with Performance Mode design. if (in_buffer->eoe()) { // Calling base class EoeReceived to forward eoe buffer. RETURN_IF_NOT_OK(EoeReceived(worker_id)); + RETURN_IF_NOT_OK(FetchNextBuffer(&in_buffer, worker_id)); continue; } else if (in_buffer->eof()) { // Calling base class EofReceived to forward eof buffer. @@ -176,42 +181,24 @@ Status MapOp::WorkerEntry(int32_t worker_id) { break; } - // Boolean mapping, true means to keep the column. - std::vector keep_input_columns; - // Indices of the columns to process. - std::vector to_process_indices; - // The final column mapping after performing this map - std::unordered_map final_col_name_id_map; - - // Thread local variables to avoid lock. When in_columns_ is empty and workers will write - // the name of the first column into input_columns (thread local) instead of in_columns_ (thread global). - std::vector input_columns = in_columns_; - std::vector output_columns = out_columns_; - - // Initialize the above data structures - RETURN_IF_NOT_OK(WorkerEntryInit(in_buffer.get(), &keep_input_columns, &to_process_indices, &final_col_name_id_map, - &input_columns, &output_columns)); - std::unique_ptr new_tensor_table(std::make_unique()); // Perform the compute function of TensorOp(s) and store the result in new_tensor_table. 
- RETURN_IF_NOT_OK(WorkerCompute(in_buffer.get(), to_process_indices, new_tensor_table.get(), keep_input_columns, - &input_columns, &output_columns)); + RETURN_IF_NOT_OK(WorkerCompute(in_buffer.get(), new_tensor_table.get())); - // Update column name to index mapping because tensorOp might add/remove column. - in_buffer->set_column_name_map(final_col_name_id_map); // Replace the TensorTable in DataBuffer with the new one. in_buffer->set_tensor_table(std::move(new_tensor_table)); // Push the buffer onto the connector for next operator to consume. RETURN_IF_NOT_OK(out_connector_->Add(static_cast(worker_id), std::move(in_buffer))); + + // Fetch the next buffer and loop back to the top. + RETURN_IF_NOT_OK(FetchNextBuffer(&in_buffer, worker_id)); } return Status::OK(); } -Status MapOp::WorkerCompute(DataBuffer *in_buffer, const std::vector &to_process_indices, - TensorQTable *new_tensor_table, const std::vector &keep_input_columns, - std::vector *input_columns, std::vector *output_columns) { +Status MapOp::WorkerCompute(DataBuffer *in_buffer, TensorQTable *new_tensor_table) { // Getting number of rows and cols in this buffer. 
int32_t num_rows = in_buffer->NumRows(); int32_t num_cols = in_buffer->NumCols(); @@ -224,7 +211,7 @@ Status MapOp::WorkerCompute(DataBuffer *in_buffer, const std::vector &to RETURN_IF_NOT_OK(in_buffer->PopRow(&cur_row)); // Populate the Tensor from the current row to be processed by TensorOp - for (const auto &idx : to_process_indices) { + for (const auto &idx : to_process_indices_) { to_process.push_back(std::move(cur_row[idx])); } @@ -243,20 +230,20 @@ Status MapOp::WorkerCompute(DataBuffer *in_buffer, const std::vector &to } } - if (output_columns->size() != result_row.size()) { + if (out_columns_.size() != result_row.size()) { return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Result of a tensorOp doesn't match output column names"); } - if (input_columns->size() == output_columns->size()) { + if (in_columns_.size() == out_columns_.size()) { for (size_t i = 0; i < result_row.size(); i++) { - cur_row[to_process_indices[i]] = std::move(result_row[i]); + cur_row[to_process_indices_[i]] = std::move(result_row[i]); } new_tensor_table->push_back(std::move(cur_row)); } else { // Add the columns we did not touch to the result_row. for (int32_t i = 0; i < num_cols; i++) { - if (keep_input_columns[i]) { + if (keep_input_columns_[i]) { result_row.push_back(std::move(cur_row[i])); } } @@ -269,92 +256,102 @@ Status MapOp::WorkerCompute(DataBuffer *in_buffer, const std::vector &to return Status::OK(); } -// Validating if each of the input_columns exists in the DataBuffer. -Status MapOp::ValidateInColumns(const std::unordered_map &col_name_id_map, - std::vector *input_columns) { - for (const auto &inCol : *input_columns) { - bool found = col_name_id_map.find(inCol) != col_name_id_map.end() ? 
true : false; - if (!found) { - std::string err_msg = "input column name: " + inCol + " doesn't exist in the dataset columns."; - RETURN_STATUS_UNEXPECTED(err_msg); - } - } - return Status::OK(); -} - // initialize some internal data structure used by WorkerEntry() -Status MapOp::WorkerEntryInit(const DataBuffer *in_buf, std::vector *keep_input_columns, - std::vector *to_process_indices, - std::unordered_map *final_col_name_id_map, - std::vector *input_columns, std::vector *output_columns) { +Status MapOp::WorkerEntryInit(const DataBuffer *in_buf) { int32_t num_rows = in_buf->NumRows(); int32_t num_cols = in_buf->NumCols(); if (num_rows == 0 || num_cols == 0) { RETURN_STATUS_UNEXPECTED("MapOp is getting an empty DataBuffer."); } - std::unordered_map col_name_id_map = in_buf->column_name_map(); - // Check if there is invalid column name in the inColumns. - RETURN_IF_NOT_OK(ValidateInColumns(col_name_id_map, input_columns)); - - // If input_columns is empty(), The col at index-0 will be picked. - if (input_columns->empty()) { - for (const auto &pair : col_name_id_map) { - if (pair.second == 0) { - MS_LOG(INFO) << "Input columns in map operator is empty, will apply to the first column in the current table."; - input_columns->push_back(pair.first); - break; + + // We can't use AssignColMapFromChild() here since we need to modify the column map. We need to be threadsafe + // though for saving the final map in the op, so use the lock here. + if (first_fetch_) { + std::unique_lock lock(column_name_map_mutex_); + // If the map has not been set up yet in the base class, then we are the first one in to set it up + // (and we are under protection of the mutex lock) + if (column_name_id_map_.empty()) { + std::unordered_map current_name_id_map = child_[0]->column_name_id_map(); + + // If input_columns is empty(), The col at index-0 will be picked. 
+ if (in_columns_.empty()) { + for (const auto &pair : current_name_id_map) { + if (pair.second == 0) { + MS_LOG(INFO) << "Input columns empty for map op, will apply to the first column in the current table."; + in_columns_.push_back(pair.first); + break; + } + } + + // If caller didn't specify the out_col_names, assume they are same as the input_columns. + // This was done in the constructor, but if input columns was empty to start we have to redo it here. + if (out_columns_.empty() || out_columns_[0].empty()) { + out_columns_ = in_columns_; + } } - } - // If caller didn't specify the out_col_names, assume they are same as the input_columns. - if (output_columns->empty() || (*output_columns)[0].empty()) { - *output_columns = *input_columns; - } - } + // Before we continue, issue a sanity check to make sure the input columns from user and the incoming + // columns from child are correct + this->ValidateInColumns(current_name_id_map); - // initialize keep_input_columns, true means to keep the column. - keep_input_columns->resize(num_cols, true); - for (const auto &col_name : *input_columns) { - int32_t missed = col_name_id_map[col_name]; - (*keep_input_columns)[missed] = false; - } + // initialize keep_input_columns, true means to keep the column. + keep_input_columns_.resize(num_cols, true); + for (const auto &col_name : in_columns_) { + int32_t missed = current_name_id_map[col_name]; + keep_input_columns_[missed] = false; + } - // initialize to_process_indices. - for (const auto &col_name : *input_columns) { - to_process_indices->push_back(col_name_id_map[col_name]); - } + // initialize to_process_indices. + for (const auto &col_name : in_columns_) { + to_process_indices_.push_back(current_name_id_map[col_name]); + } - // Create the final column name to index mapping. 
- *final_col_name_id_map = CreateFinalColMap(&col_name_id_map, *keep_input_columns, input_columns, output_columns); + // Create the final column name to index mapping in the base class field + CreateFinalColMap(&current_name_id_map); + first_fetch_ = false; + } + } // mutex lock will release here + MS_LOG(INFO) << "Column name map for map op set: " << this->ColumnNameMapAsString(); + return Status::OK(); +} + +// Validating if each of the input_columns exists in the DataBuffer. +Status MapOp::ValidateInColumns(const std::unordered_map &col_name_id_map) { + for (const auto &inCol : in_columns_) { + bool found = col_name_id_map.find(inCol) != col_name_id_map.end() ? true : false; + if (!found) { + std::string err_msg = "input column name: " + inCol + " doesn't exist in the dataset columns."; + RETURN_STATUS_UNEXPECTED(err_msg); + } + } return Status::OK(); } // Create the final column name to index mapping and get indices of the columns this mapop does not use. -std::unordered_map MapOp::CreateFinalColMap( - std::unordered_map *col_name_id_map, const std::vector &keep_input_columns, - std::vector *input_columns, std::vector *output_columns) { +void MapOp::CreateFinalColMap(std::unordered_map *col_name_id_map) { std::unordered_map final_col_name_id_map; size_t num_cols = col_name_id_map->size(); std::vector new_ids(num_cols); - if (input_columns->size() == output_columns->size()) { - for (size_t i = 0; i < input_columns->size(); i++) { - int32_t loc = (*col_name_id_map)[(*input_columns)[i]]; - (void)col_name_id_map->erase((*input_columns)[i]); - (*col_name_id_map)[(*output_columns)[i]] = loc; + if (in_columns_.size() == out_columns_.size()) { + for (size_t i = 0; i < in_columns_.size(); i++) { + int32_t loc = (*col_name_id_map)[in_columns_[i]]; + (void)col_name_id_map->erase(in_columns_[i]); + (*col_name_id_map)[out_columns_[i]] = loc; } - return *col_name_id_map; + // Set the base class final column id map result + column_name_id_map_ = *col_name_id_map; } else { int32_t
fill_idx = 0; // First columns of the tables are occupied by the output columns from tensorOp. - for (const auto &col_name : *output_columns) { + for (const auto &col_name : out_columns_) { final_col_name_id_map[col_name] = fill_idx++; } // Creating new_ids mapping for the columns we keep. for (size_t i = 0; i < num_cols; i++) { - if (keep_input_columns[i]) { + if (keep_input_columns_[i]) { new_ids[i] = fill_idx++; } } @@ -364,12 +361,13 @@ std::unordered_map MapOp::CreateFinalColMap( for (const auto &pair : *col_name_id_map) { name = pair.first; int32_t old_id = pair.second; - if (keep_input_columns[old_id]) { + if (keep_input_columns_[old_id]) { final_col_name_id_map[name] = new_ids[old_id]; } } - return final_col_name_id_map; + // Set the base class final column id map result + column_name_id_map_ = final_col_name_id_map; } } } // namespace dataset diff --git a/mindspore/ccsrc/dataset/engine/datasetops/map_op.h b/mindspore/ccsrc/dataset/engine/datasetops/map_op.h index 4c9d27f9c7..a6a573146e 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/map_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/map_op.h @@ -186,6 +186,12 @@ class MapOp : public ParallelOp { // Variable to store the column name that the tensorOps are producing std::vector out_columns_; + // Boolean mapping, true means to keep the column. + std::vector keep_input_columns_; + + // Indices of the columns to process. + std::vector to_process_indices_; + // Performance mode is when the main thread creates local queues, pulls databuffers from the previous // op's Connector and distributes them to the local queues. Workers pull from the local queues. // If this flag is false, each worker pulls directly from the Connector. 
This use less resources @@ -201,51 +207,40 @@ class MapOp : public ParallelOp { // @return Status The error code return Status WorkerEntry(int32_t worker_id) override; // In: workerId assigned by tree_ + // Private helper function for getting the next buffer + // When PerformanceMode is enabled, workers pop from the local queue. + // Otherwise, workers pop from the first child output Connector. + // @param p_buffer - the buffer to return + // @return Status return code + Status FetchNextBuffer(std::unique_ptr *p_buffer, int32_t worker_id) { + if (perf_mode_) { + RETURN_IF_NOT_OK(local_queues_[worker_id]->PopFront(p_buffer)); + } else { + RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(p_buffer, worker_id)); + } + return Status::OK(); + } + // Private function for worker thread to perform TensorOp's compute function and get the result. // @param in_buffer A raw pointer to the DataBuffer. A raw pointer is fine because this function doesn't manage memory // and is not shared with other threads. - // @param to_process_indices Indices of columns to be processed by the TensorOp. // @param[out] new_tensor_table A new Tensor Table to be populated in this function. - // @param keep_input_columns Keeping track of which columns to keep (not used by TensorOp). - // @param input_columns The vector of input column names used in the current thread. - // @param output_columns The vector of output column names used in the current thread. - Status WorkerCompute(DataBuffer *in_buffer, const std::vector &to_process_indices, - TensorQTable *new_tensor_table, const std::vector &keep_input_columns, - std::vector *input_columns, std::vector *output_columns); - - // Private function for validating if each of the user specified input column names - // exist in the DataBuffer. - // @param col_name_id_map The column name to index mapping obtained from DataBuffer. - // @param input_columns The vector of input column names used in the current thread. 
- // @return Status The error code return - Status ValidateInColumns(const std::unordered_map &col_name_id_map, - std::vector *input_columns); + Status WorkerCompute(DataBuffer *in_buffer, TensorQTable *new_tensor_table); // Private function that create the final column name to index mapping and // get indices of the columns this mapop does not use. - // @param col_name_id_map The column name to index mapping obtained from DataBuffer. - // @param keep_input_columns To mark which columns are to be kept (not used in mapOp). - // @param input_columns The vector of input column names used in the current thread. - // @param output_columns The vector of output column names used in the current thread. - // @return finalColNameIdMap The final column name to index mapping. - std::unordered_map CreateFinalColMap(std::unordered_map *col_name_id_map, - const std::vector &keep_input_columns, - std::vector *input_columns, - std::vector *output_columns); + // @param col_name_id_map The column name to index mapping obtained from child operator + void CreateFinalColMap(std::unordered_map *col_name_id_map); // Private function that initialize some internal data structure used by WorkerEntry() // @param in_buf A raw pointer to the DataBuffer. A raw pointer is fine because this function does not manage memory // and is not shared with other threads. - // @param[out] keep_input_columns Keeping track of which columns to keep (not used by TensorOp) - // @param[out] to_process_indices Indices of columns to be processed by the TensorOp - // @param[out] final_col_name_id_map Create the final column name id map. This final mapping will replace the old one - // if the TensorOp Compute() is successful. - // @param input_columns The vector of input column names used in the current thread. - // @param output_columns The vector of output column names used in the current thread. 
- Status WorkerEntryInit(const DataBuffer *in_buf, std::vector *keep_input_columns, - std::vector *to_process_indices, - std::unordered_map *final_col_name_id_map, - std::vector *input_columns, std::vector *output_columns); + Status WorkerEntryInit(const DataBuffer *in_buf); + + // Validating if each of the input_columns exists in the DataBuffer. + // @param - the column map to check + // @return - status return code + Status ValidateInColumns(const std::unordered_map &col_name_id_map); }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/datasetops/parallel_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/parallel_op.cc index 2eeb931554..c0a8d95f15 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/parallel_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/parallel_op.cc @@ -15,17 +15,12 @@ */ #include "dataset/engine/datasetops/parallel_op.h" -#include #include -#include #include -#include "dataset/engine/data_schema.h" #include "dataset/engine/datasetops/dataset_op.h" #include "dataset/engine/execution_tree.h" #include "dataset/core/config_manager.h" #include "dataset/engine/db_connector.h" - -#include "dataset/engine/datasetops/source/storage_client.h" #include "dataset/util/task_manager.h" namespace mindspore { diff --git a/mindspore/ccsrc/dataset/engine/datasetops/pipeline_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/pipeline_op.cc index 69ace1ed9a..46eded8ea1 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/pipeline_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/pipeline_op.cc @@ -13,9 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ +#include "dataset/engine/datasetops/pipeline_op.h" #include #include -#include "dataset/engine/datasetops/pipeline_op.h" namespace mindspore { namespace dataset { diff --git a/mindspore/ccsrc/dataset/engine/datasetops/project_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/project_op.cc index 128d3e68e5..f99319396f 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/project_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/project_op.cc @@ -73,35 +73,40 @@ void ProjectOp::Print(std::ostream &out, bool show_all) const { Status ProjectOp::GetNextBuffer(std::unique_ptr *p_buffer, int32_t worker_id, bool retry_if_eoe) { RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(p_buffer, worker_id, retry_if_eoe)); if (!((*p_buffer)->eoe()) && !((*p_buffer)->eof())) { + // Only for the first buffer fetched, get the column map of the incoming data and save it + // into our own column name map after making the appropriate mods + // We cannot use the super class AssignColMapFromChild here because we're making a modification of the + // map from the child map. 
+ if (first_fetch_) { + std::unordered_map child_column_name_mapping = child_[0]->column_name_id_map(); + for (size_t i = 0; i < columns_to_project_.size(); i++) { + std::string &current_column = columns_to_project_[i]; + if (child_column_name_mapping.find(current_column) == child_column_name_mapping.end()) { + std::string err_msg = "ProjectOp: column " + current_column + " does not exist in child operator."; + RETURN_STATUS_UNEXPECTED(err_msg); + } + // Setup the new column name mapping for ourself (base class field) + column_name_id_map_[current_column] = i; + projected_column_indices_.push_back(child_column_name_mapping[current_column]); + } + first_fetch_ = false; // we only need to do this path once + } RETURN_IF_NOT_OK(Project(p_buffer)); } return Status::OK(); } Status ProjectOp::Project(std::unique_ptr *data_buffer) { - std::unordered_map column_name_mapping = (*data_buffer)->column_name_map(); - std::unordered_map new_column_name_mapping; - std::vector projected_column_indices; - for (size_t i = 0; i < columns_to_project_.size(); i++) { - std::string &current_column = columns_to_project_[i]; - if (column_name_mapping.find(current_column) == column_name_mapping.end()) { - std::string err_msg = "ProjectOp: column " + current_column + " does not exist in this buffer."; - RETURN_STATUS_UNEXPECTED(err_msg); - } - new_column_name_mapping[current_column] = i; - projected_column_indices.push_back(column_name_mapping[current_column]); - } std::unique_ptr new_tensor_table = std::make_unique(); while ((*data_buffer)->NumRows() > 0) { TensorRow current_row; RETURN_IF_NOT_OK((*data_buffer)->PopRow(&current_row)); TensorRow new_row; - (void)std::transform(projected_column_indices.begin(), projected_column_indices.end(), std::back_inserter(new_row), - [&current_row](uint32_t x) { return current_row[x]; }); + (void)std::transform(projected_column_indices_.begin(), projected_column_indices_.end(), + std::back_inserter(new_row), [&current_row](uint32_t x) { return current_row[x]; });
new_tensor_table->push_back(new_row); } (*data_buffer)->set_tensor_table(std::move(new_tensor_table)); - (*data_buffer)->set_column_name_map(new_column_name_mapping); return Status::OK(); } diff --git a/mindspore/ccsrc/dataset/engine/datasetops/project_op.h b/mindspore/ccsrc/dataset/engine/datasetops/project_op.h index 09d022da86..25b6cc691e 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/project_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/project_op.h @@ -103,6 +103,7 @@ class ProjectOp : public PipelineOp { private: std::vector columns_to_project_; + std::vector projected_column_indices_; Status Project(std::unique_ptr *data_buffer); }; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/rename_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/rename_op.cc index 92639b8809..a940bef0b8 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/rename_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/rename_op.cc @@ -67,10 +67,15 @@ Status RenameOp::operator()() { // if 1st eoe or eof, pass it on then return RETURN_STATUS_UNEXPECTED(err_msg); } + + // First, populate the column map from the input child. + // This will not be the final map for output from this op. 
+ RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild()); + // core rename functionality only needs to happen once, to identify the new column names/indexes + RETURN_IF_NOT_OK(RenameColumns()); + while (curr_buffer->eof() == false) { while (curr_buffer->eoe() == false) { - // core rename functionality - RETURN_IF_NOT_OK(RenameBuffer(&curr_buffer)); // push the renamed input buffer MS_LOG(DEBUG) << "Rename operator pushing next buffer."; RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(curr_buffer))); @@ -89,17 +94,16 @@ Status RenameOp::operator()() { return Status::OK(); } -// renames buffer -Status RenameOp::RenameBuffer(std::unique_ptr *input_buffer) { +// renames the columns +Status RenameOp::RenameColumns() { // iterate over my index in input vector, find the corresponding position - const std::unordered_map col_name_id_map = (*input_buffer)->column_name_map(); std::unordered_map new_col_name_id_map = {}; // parameter for input check size_t found = 0; // iterate over all the pairs and if there is a name match with rename, rename the column and add it to new map // by doing it this way we recreate a new ColNameIdMap and allow for switching - for (const auto &pair : col_name_id_map) { + for (const auto &pair : column_name_id_map_) { std::string name = pair.first; int32_t id = pair.second; // find name @@ -126,7 +130,9 @@ Status RenameOp::RenameBuffer(std::unique_ptr *input_buffer) { std::string err_msg = "Renamed column doesn't exist in dataset"; RETURN_STATUS_UNEXPECTED(err_msg); } - (*input_buffer)->set_column_name_map(new_col_name_id_map); + + // Now, overwrite our column map with the new renamed columns/id's + column_name_id_map_ = new_col_name_id_map; return Status::OK(); } diff --git a/mindspore/ccsrc/dataset/engine/datasetops/rename_op.h b/mindspore/ccsrc/dataset/engine/datasetops/rename_op.h index 5d820a3d0b..f91b6af931 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/rename_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/rename_op.h @@ -112,8 
+112,7 @@ class RenameOp : public PipelineOp { protected: // Rename core functionality - // @param input_buffer buffer to run rename on - Status RenameBuffer(std::unique_ptr *input_buffer); + Status RenameColumns(); // Variable to store the input column names std::vector in_columns_; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/repeat_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/repeat_op.cc index 065631eb31..bcda2dab45 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/repeat_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/repeat_op.cc @@ -122,6 +122,8 @@ Status RepeatOp::GetNextBuffer(std::unique_ptr *p_buffer, int32_t wo if (buf->eof()) { RETURN_IF_NOT_OK(EofReceived(worker_id)); } + // Update the column name map if needed + RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild()); *p_buffer = std::move(buf); return Status::OK(); } diff --git a/mindspore/ccsrc/dataset/engine/datasetops/repeat_op.h b/mindspore/ccsrc/dataset/engine/datasetops/repeat_op.h index 8497b4cf3c..3a857a3f89 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/repeat_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/repeat_op.h @@ -122,6 +122,7 @@ class RepeatOp : public PipelineOp { int32_t max_repeats_; // The number of repeats that the user requested int32_t repeat_count_; // A counter for the current number of executed repeats std::vector> eoe_ops_; // List of operators that can generate EOE underneath this repeat. 
+ bool first_fetch_; // Track the first fetch from this op }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/datasetops/shuffle_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/shuffle_op.cc index 9867945e36..0f10d3106a 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/shuffle_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/shuffle_op.cc @@ -185,7 +185,6 @@ Status ShuffleOp::operator()() { if (new_buffer_table->size() == rows_per_buffer_ || shuffle_last_row_idx_ == 0) { auto new_buffer = std::make_unique(buffer_counter_, DataBuffer::kDeBFlagNone); new_buffer->set_tensor_table(std::move(new_buffer_table)); - new_buffer->set_column_name_map(column_name_map_); buffer_counter_++; MS_LOG(DEBUG) << "Shuffle operator sending a buffer to output."; RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(new_buffer))); @@ -266,8 +265,8 @@ Status ShuffleOp::InitShuffleBuffer() { RETURN_STATUS_UNEXPECTED("Unable to fetch a single row for shuffle buffer."); } - // Take a copy of the column name mapping. We'll use this when constructing output buffers later. - column_name_map_ = child_iterator_->col_name_id_map(); + // Now that a first fetch is done, assign the column map for this operator + RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild()); // Now fill the rest of the shuffle buffer until we are unable to get the next row or we reached // the desired shuffle buffer size. 
diff --git a/mindspore/ccsrc/dataset/engine/datasetops/shuffle_op.h b/mindspore/ccsrc/dataset/engine/datasetops/shuffle_op.h index 0294744ebd..23f6f15c41 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/shuffle_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/shuffle_op.h @@ -185,7 +185,6 @@ class ShuffleOp : public PipelineOp { std::unique_ptr shuffle_buffer_; int32_t shuffle_last_row_idx_; // Internal tracking of the last slot of our shuffle buffer int32_t shuffle_buffer_state_; // State tracking for the shuffle buffer phases of work - std::unordered_map column_name_map_; // A mapping between column index to column name. std::unique_ptr child_iterator_; // An iterator for fetching. }; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/skip_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/skip_op.cc index f6eec1cf94..d3edd98909 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/skip_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/skip_op.cc @@ -84,6 +84,10 @@ Status SkipOp::operator()() { TaskManager::FindMe()->Post(); std::unique_ptr curr_buffer; RETURN_IF_NOT_OK(GetNextInput(&curr_buffer)); + + // After the first buffer fetch above we can do the one-time assign of the column name map + RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild()); + while (curr_buffer->eof() == false) { // Reset count skip_count_ = 0; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.cc index 896cf94044..06c2b06727 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.cc @@ -80,8 +80,9 @@ CelebAOp::CelebAOp(int32_t num_workers, int32_t rows_per_buffer, const std::stri num_rows_exact_(0), num_samples_(num_samples), dataset_type_(dataset_type) { + // Set the column name map (base class field) for (int32_t index = 0; index < data_schema_->NumColumns(); index++) { - 
col_name_map_[data_schema_->column(index).name()] = index; + column_name_id_map_[data_schema_->column(index).name()] = index; } attr_info_queue_ = std::make_unique>>(queue_size); @@ -385,7 +386,6 @@ Status CelebAOp::LoadBuffer(const std::vector &keys, std::unique_ptrset_tensor_table(std::move(deq)); - (*db)->set_column_name_map(col_name_map_); return Status::OK(); } diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.h index 55056c2a49..e0055441ef 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.h @@ -21,7 +21,6 @@ #include #include #include -#include #include #include @@ -232,7 +231,6 @@ class CelebAOp : public ParallelOp, RandomAccessOp { std::set extensions_; // extensions allowed std::unique_ptr data_schema_; std::shared_ptr sampler_; - std::unordered_map col_name_map_; std::unique_ptr>> attr_info_queue_; int64_t num_rows_in_attr_file_; // rows number specified in attr file int64_t num_rows_exact_; // exact rows number,maybe is less than rows_num_in_attr_file_ diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.cc index 7e880dd51c..4a0a882103 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.cc @@ -88,8 +88,9 @@ CifarOp::CifarOp(CifarType type, int32_t num_works, int32_t rows_per_buf, const num_rows_(0), row_cnt_(0), buf_cnt_(0) { + // set the column name map (base class field) for (uint32_t i = 0; i < data_schema_->NumColumns(); ++i) { - col_name_map_[data_schema_->column(i).name()] = i; + column_name_id_map_[data_schema_->column(i).name()] = i; } constexpr uint64_t kUtilQueueSize = 512; cifar_raw_data_block_ = std::make_unique>>(kUtilQueueSize); @@ -221,7 +222,6 @@ Status CifarOp::LoadBuffer(const std::vector &keys, 
std::unique_ptrpush_back(std::move(trow)); } (*db)->set_tensor_table(std::move(deq)); - (*db)->set_column_name_map(col_name_map_); return Status::OK(); } diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.h index 3cb26e5679..ade0998c30 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.h @@ -19,7 +19,6 @@ #include #include #include -#include #include #include @@ -221,7 +220,7 @@ class CifarOp : public ParallelOp, public RandomAccessOp { Status ParseCifarData(); // Method derived from RandomAccess Op, enable Sampler to get all ids for each calss - // @param (std::unordered_map> * map - key label, val all ids for this class + // @param (std::map> * map - key label, val all ids for this class // @return Status - The error code return Status GetClassIds(std::map> *cls_ids) const override; @@ -236,7 +235,6 @@ class CifarOp : public ParallelOp, public RandomAccessOp { int64_t buf_cnt_; WaitPost wp_; - std::unordered_map col_name_map_; QueueList> io_block_queues_; std::unique_ptr>> cifar_raw_data_block_; std::vector cifar_files_; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.cc index a3d3eb5cce..bff7a7580e 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.cc @@ -93,10 +93,10 @@ void GeneratorOp::Dealloc() noexcept { Status GeneratorOp::Init() { // Reset BufferID buffer_id_ = 0; - // Setup column names map. 
- if (column_names_map_.empty()) { + // Setup column names map (base class field) + if (column_name_id_map_.empty()) { for (int i = 0; i < column_names_.size(); ++i) { - (void)column_names_map_.insert(std::make_pair(column_names_[i], i)); + column_name_id_map_[column_names_[i]] = i; } } Status ret; @@ -195,7 +195,6 @@ Status GeneratorOp::operator()() { while (!eof) { // Create new buffer each iteration fetched_buffer = std::make_unique(buffer_id_++, DataBuffer::kDeBFlagNone); - fetched_buffer->set_column_name_map(column_names_map_); std::unique_ptr fetched_table = std::make_unique(); bool eoe = false; { diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.h index 8165fed970..f8227dafa5 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.h @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include "dataset/core/data_type.h" @@ -130,7 +129,6 @@ class GeneratorOp : public PipelineOp { int32_t buffer_size_; py::object generator_; - std::unordered_map column_names_map_; int32_t buffer_id_; WaitPost wp_; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.cc index 81bac3aee7..4503855b34 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.cc @@ -78,8 +78,9 @@ ImageFolderOp::ImageFolderOp(int32_t num_wkrs, int32_t rows_per_buffer, std::str buf_cnt_(0), sampler_ind_(0), dirname_offset_(0) { + // Set the column name map (base class field) for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { - col_name_map_[data_schema_->column(i).name()] = i; + column_name_id_map_[data_schema_->column(i).name()] = i; } folder_name_queue_ = std::make_unique>(num_wkrs * queue_size); 
image_name_queue_ = std::make_unique>(num_wkrs * queue_size); @@ -237,7 +238,6 @@ Status ImageFolderOp::LoadBuffer(const std::vector &keys, std::unique_p deq->push_back(std::move(trow)); } (*db)->set_tensor_table(std::move(deq)); - (*db)->set_column_name_map(col_name_map_); return Status::OK(); } diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.h index 63fcfa483c..d27d220cb5 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.h @@ -23,7 +23,6 @@ #include #include #include -#include #include #include #include "dataset/core/tensor.h" @@ -210,7 +209,7 @@ class ImageFolderOp : public ParallelOp, public RandomAccessOp { Status GetNumRowsInDataset(int64_t *num) const override; // Method derived from RandomAccess Op, enable Sampler to get all ids for each class - // @param (std::unordered_map> * map - key label, val all ids for this class + // @param (std::map> * map - key label, val all ids for this class // @return Status - The error code return Status GetClassIds(std::map> *cls_ids) const override; @@ -275,7 +274,6 @@ class ImageFolderOp : public ParallelOp, public RandomAccessOp { int64_t dirname_offset_; WaitPost wp_; std::vector image_label_pairs_; - std::unordered_map col_name_map_; QueueList> io_block_queues_; // queues of IOBlocks std::unique_ptr> folder_name_queue_; std::unique_ptr> image_name_queue_; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.cc index 065162c095..349ca7cf6e 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.cc @@ -76,8 +76,9 @@ ManifestOp::ManifestOp(int32_t num_works, int32_t rows_per_buffer, std::string f decode_(decode), usage_(usage), buf_cnt_(0) { + // Set the 
column name map (base class field) for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { - col_name_map_[data_schema_->column(i).name()] = i; + column_name_id_map_[data_schema_->column(i).name()] = i; } io_block_queues_.Init(num_workers_, queue_size); (void)std::transform(usage_.begin(), usage_.end(), usage_.begin(), ::tolower); @@ -235,7 +236,6 @@ Status ManifestOp::LoadBuffer(const std::vector &keys, std::unique_ptr< deq->push_back(std::move(trow)); } (*db)->set_tensor_table(std::move(deq)); - (*db)->set_column_name_map(col_name_map_); return Status::OK(); } diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.h index 0f58993000..e015496acc 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.h @@ -19,7 +19,6 @@ #include #include #include -#include #include #include @@ -176,7 +175,7 @@ class ManifestOp : public ParallelOp, public RandomAccessOp { Status GetNumRowsInDataset(int64_t *num) const override; // Method derived from RandomAccess Op, enable Sampler to get all ids for each class - // @param (std::unordered_map> * map - key label, val all ids for this class + // @param (std::map> * map - key label, val all ids for this class // @return Status - The error code return Status GetClassIds(std::map> *cls_ids) const override; @@ -248,7 +247,6 @@ class ManifestOp : public ParallelOp, public RandomAccessOp { int64_t buf_cnt_; WaitPost wp_; - std::unordered_map col_name_map_; QueueList> io_block_queues_; std::map label_index_; std::vector>> image_labelname_; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc index 1f34a1d373..bd6f038288 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc @@ -152,7 
+152,7 @@ Status MindRecordOp::Init() { } for (int i = 0; i < static_cast(columns_to_load_.size()); i++) { - column_name_mapping_[columns_to_load_[i]] = i; + column_name_id_map_[columns_to_load_[i]] = i; } num_rows_ = shard_reader_->GetNumRows(); @@ -180,7 +180,7 @@ Status MindRecordOp::SetColumnsBlob() { columns_blob_index_ = std::vector(columns_to_load_.size(), -1); int32_t iBlob = 0; for (auto &blob_exact : columns_blob_exact) { - columns_blob_index_[column_name_mapping_[blob_exact]] = iBlob++; + columns_blob_index_[column_name_id_map_[blob_exact]] = iBlob++; } return Status::OK(); } @@ -501,7 +501,6 @@ Status MindRecordOp::WorkerEntry(int32_t worker_id) { Status MindRecordOp::GetBufferFromReader(std::unique_ptr *fetched_buffer, int64_t buffer_id, int32_t worker_id) { *fetched_buffer = std::make_unique(buffer_id, DataBuffer::kDeBFlagNone); - (*fetched_buffer)->set_column_name_map(column_name_mapping_); std::unique_ptr tensor_table = std::make_unique(); for (int32_t i = 0; i < rows_per_buffer_; ++i) { ShardTuple tupled_buffer; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.h index 899919e529..fbf3d8f98e 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.h @@ -23,7 +23,6 @@ #include #include #include -#include #include #include @@ -262,7 +261,6 @@ class MindRecordOp : public ParallelOp { std::vector columns_blob_; // Blob Columns to load from dataset std::vector columns_blob_index_; // Blob Columns to load from dataset - std::unordered_map column_name_mapping_; std::unique_ptr shard_reader_; WaitPost shard_reader_wait_post_; QueueList> io_blk_queues_; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.cc index c2abc129c8..8ca65fe20f 100644 --- 
a/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.cc @@ -75,8 +75,9 @@ MnistOp::MnistOp(int32_t num_workers, int32_t rows_per_buffer, std::string folde rows_per_buffer_(rows_per_buffer), sampler_(std::move(sampler)), data_schema_(std::move(data_schema)) { + // set the column name map (base class field) for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { - col_name_map_[data_schema_->column(i).name()] = i; + column_name_id_map_[data_schema_->column(i).name()] = i; } io_block_queues_.Init(num_workers, queue_size); } @@ -185,7 +186,6 @@ Status MnistOp::LoadBuffer(const std::vector &keys, std::unique_ptrpush_back(std::move(trow)); } (*db)->set_tensor_table(std::move(deq)); - (*db)->set_column_name_map(col_name_map_); return Status::OK(); } diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.h index e558dca473..397a51710e 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.h @@ -20,7 +20,6 @@ #include #include #include -#include #include #include @@ -158,7 +157,7 @@ class MnistOp : public ParallelOp, public RandomAccessOp { Status GetNumRowsInDataset(int64_t *num) const override; // Method derived from RandomAccess Op, enable Sampler to get all ids for each class - // @param (std::unordered_map> * map - key label, val all ids for this class + // @param (std::map> * map - key label, val all ids for this class // @return Status - The error code return Status GetClassIds(std::map> *cls_ids) const override; @@ -253,7 +252,6 @@ class MnistOp : public ParallelOp, public RandomAccessOp { std::shared_ptr sampler_; std::unique_ptr data_schema_; std::vector image_label_pairs_; - std::unordered_map col_name_map_; std::vector image_names_; std::vector label_names_; QueueList> io_block_queues_; diff --git 
a/mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.cc index 306f74ad6f..3d0da63544 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.cc @@ -54,8 +54,7 @@ Status RandomDataOp::Builder::Build(std::shared_ptr *out_op) { } // Extract the column name mapping from the schema and save it in the class. - // This will be needed when constructing buffers. - RETURN_IF_NOT_OK((*out_op)->data_schema_->GetColumnNameMap(&((*out_op)->column_name_map_))); + RETURN_IF_NOT_OK((*out_op)->data_schema_->GetColumnNameMap(&((*out_op)->column_name_id_map_))); return Status::OK(); } @@ -320,7 +319,6 @@ Status RandomDataOp::WorkerEntry(int32_t worker_id) { Status RandomDataOp::PackAndSend(int32_t worker_id, std::unique_ptr in_table) { auto new_buffer = std::make_unique(GetNextBufferId(), DataBuffer::kDeBFlagNone); new_buffer->set_tensor_table(std::move(in_table)); - new_buffer->set_column_name_map(column_name_map_); RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(new_buffer))); return Status::OK(); } diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.h index 84e4c42702..a9566b9c9f 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.h @@ -17,13 +17,11 @@ #define DATASET_ENGINE_DATASETOPS_SOURCE_RANDOM_DATA_OP_ #include -#include #include #include #include #include #include -#include #include #include "dataset/util/status.h" #include "dataset/core/tensor.h" @@ -259,7 +257,6 @@ class RandomDataOp : public ParallelOp { std::unique_ptr data_schema_; std::vector worker_max_rows_; std::vector worker_rows_packed_; - std::unordered_map column_name_map_; std::mt19937 rand_gen_; WaitPost epoch_sync_wait_post_; WaitPost 
all_out_; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/sampler/sampler.h b/mindspore/ccsrc/dataset/engine/datasetops/source/sampler/sampler.h index 4ea221027a..13570323f1 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/sampler/sampler.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/sampler/sampler.h @@ -21,7 +21,6 @@ #include #include #include -#include #include "dataset/core/tensor.h" #include "dataset/engine/data_buffer.h" @@ -53,7 +52,7 @@ class RandomAccessOp { } // sampler gets label , imageIds from storageOp, this function is unique to PK - // @param std::unordered_map> * map + // @param std::map> * map // @return - The error code return virtual Status GetClassIds(std::map> *map) const { RETURN_STATUS_UNEXPECTED("GetClassIds needs to be override to support PK"); diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/storage_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/storage_op.cc index f310a097ee..60572b2012 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/storage_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/storage_op.cc @@ -247,11 +247,17 @@ Status StorageOp::InitOp(const std::vector &files_list, const std:: Status StorageOp::init() { // First a sanity check to make sure the StorageClient initialization has done the proper // handshake and initialized both the schema and the number of rows for the dataset. 
- if (store_client_->schema()->NumColumns() == 0 || num_rows_ == 0) { + const DataSchema *the_schema = store_client_->schema(); + if (the_schema->NumColumns() == 0 || num_rows_ == 0) { return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Storage client did not run handshake to init schema and number of rows."); } + // Now that we have schema, generate the column name map (base class field) + for (int32_t i = 0; i < the_schema->NumColumns(); ++i) { + column_name_id_map_[the_schema->column(i).name()] = i; + } + // If the data buffer vector is not empty, then we may be redoing a scan again after a repeat. // In such a case, we have vector of nullptrs that used to hold the buffers. get rid of this // so we can reuse the vector. diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.cc index 16f2e29824..9336446852 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.cc @@ -121,8 +121,9 @@ Status TextFileOp::Init() { int32_t safe_queue_size = static_cast(std::ceil(text_files_list_.size() / num_workers_) + 1); io_block_queues_.Init(num_workers_, safe_queue_size); + // Set the column name mapping (base class field) for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { - col_name_map_[data_schema_->column(i).name()] = i; + column_name_id_map_[data_schema_->column(i).name()] = i; } RETURN_IF_NOT_OK(ParallelOp::CreateWorkerConnector(worker_connector_size_)); @@ -164,7 +165,6 @@ Status TextFileOp::LoadFile(const std::string &file, const int64_t start_offset, int64_t rows_total = 0; std::string line; std::unique_ptr cur_buffer = std::make_unique(0, DataBuffer::BufferFlags::kDeBFlagNone); - cur_buffer->set_column_name_map(col_name_map_); std::unique_ptr tensor_table = std::make_unique(); while (getline(handle, line)) { @@ -189,7 +189,6 @@ Status TextFileOp::LoadFile(const std::string 
&file, const int64_t start_offset, RETURN_IF_NOT_OK(jagged_buffer_connector_->Add(worker_id, std::move(cur_buffer))); cur_buffer = std::make_unique(0, DataBuffer::BufferFlags::kDeBFlagNone); - cur_buffer->set_column_name_map(col_name_map_); tensor_table = std::make_unique(); rows_each_buffer = 0; } diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.h index 305b2596fa..8b8eda00fe 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.h @@ -20,7 +20,6 @@ #include #include #include -#include #include #include "dataset/util/status.h" @@ -260,7 +259,6 @@ class TextFileOp : public ParallelOp { bool finished_reading_dataset_; bool load_io_block_queue_; bool load_jagged_connector_; - std::unordered_map col_name_map_; std::unique_ptr jagged_buffer_connector_; }; } // namespace dataset diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/tf_buffer.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/tf_buffer.cc index 372dcd2c1c..9a07974615 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/tf_buffer.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/tf_buffer.cc @@ -35,13 +35,7 @@ TFBuffer::TFBuffer( uint32_t id, // In: The id for this buffer BufferFlags flags, // In: The flags for this buffer const std::shared_ptr &storage_client) // In: Storage client that is related to this buffer type - : DataBuffer(id, flags), storage_client_(storage_client) { - // Initializing mColumnNameMap from the schema file - const DataSchema *the_schema = storage_client_->schema(); - for (int32_t i = 0; i < the_schema->NumColumns(); ++i) { - column_name_map_[the_schema->column(i).name()] = i; - } -} + : DataBuffer(id, flags), storage_client_(storage_client) {} // destructor TFBuffer::~TFBuffer() {} diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.cc 
b/mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.cc index a2985b7656..aac2d08997 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.cc @@ -191,6 +191,11 @@ Status TFReaderOp::Init() { RETURN_IF_NOT_OK(CreateSchema(dataset_files_list_[0], columns_to_load_)); } + // Construct the column name map for this operator (base class field) + for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { + column_name_id_map_[data_schema_->column(i).name()] = i; + } + if (total_rows_ == 0) { total_rows_ = data_schema_->num_rows(); } @@ -571,11 +576,6 @@ Status TFReaderOp::LoadFile(const std::string &filename, const int64_t start_off int64_t rows_read = 0; int64_t rows_total = 0; std::unique_ptr current_buffer = std::make_unique(0, DataBuffer::BufferFlags::kDeBFlagNone); - std::unordered_map column_name_map; - for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { - column_name_map[data_schema_->column(i).name()] = i; - } - current_buffer->set_column_name_map(column_name_map); std::unique_ptr new_tensor_table = std::make_unique(); while (reader.peek() != EOF) { @@ -613,7 +613,6 @@ Status TFReaderOp::LoadFile(const std::string &filename, const int64_t start_off RETURN_IF_NOT_OK(jagged_buffer_connector_->Add(worker_id, std::move(current_buffer))); current_buffer = std::make_unique(0, DataBuffer::BufferFlags::kDeBFlagNone); - current_buffer->set_column_name_map(column_name_map); new_tensor_table = std::make_unique(); rows_read = 0; } diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.cc index c8fc6edbf9..d339c41a14 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.cc @@ -71,8 +71,9 @@ VOCOp::VOCOp(int32_t num_workers, int32_t rows_per_buffer, const std::string &fo rows_per_buffer_(rows_per_buffer), 
sampler_(std::move(sampler)), data_schema_(std::move(data_schema)) { + // Set the column name map (base class field) for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { - col_name_map_[data_schema_->column(i).name()] = i; + column_name_id_map_[data_schema_->column(i).name()] = i; } io_block_queues_.Init(num_workers_, queue_size); } @@ -183,7 +184,6 @@ Status VOCOp::LoadBuffer(const std::vector &keys, std::unique_ptrpush_back(std::move(trow)); } (*db)->set_tensor_table(std::move(deq)); - (*db)->set_column_name_map(col_name_map_); return Status::OK(); } diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.h b/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.h index 5751388519..9ceb3210b8 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.h @@ -18,7 +18,6 @@ #include #include -#include #include #include @@ -212,7 +211,6 @@ class VOCOp : public ParallelOp, public RandomAccessOp { WaitPost wp_; std::vector image_ids_; - std::unordered_map col_name_map_; QueueList> io_block_queues_; }; } // namespace dataset diff --git a/mindspore/ccsrc/dataset/engine/datasetops/take_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/take_op.cc index 7e6055027e..23bd1b08d3 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/take_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/take_op.cc @@ -72,6 +72,7 @@ Status TakeOp::operator()() { TaskManager::FindMe()->Post(); std::unique_ptr buf; RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(&buf)); + RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild()); while (buf->eof() == false) { if (take_count_ == max_takes_) { diff --git a/mindspore/ccsrc/dataset/engine/datasetops/zip_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/zip_op.cc index bb8bddcc09..fc99be8d88 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/zip_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/zip_op.cc @@ -92,9 +92,8 @@ Status ZipOp::operator()() { if 
(!curr_table->empty()) { std::unique_ptr curr_buffer = std::make_unique(buffer_id_, DataBuffer::kDeBFlagNone); curr_buffer->set_tensor_table(std::move(curr_table)); - curr_buffer->set_column_name_map(col_name_id_map_); MS_LOG(DEBUG) << "Zip operator finished one buffer, pushing, rows " << curr_buffer->NumRows() << ", cols " - << curr_buffer->NumCols() << ", map " << col_name_id_map_.size() << "."; + << curr_buffer->NumCols() << ", map " << column_name_id_map_.size() << "."; RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(curr_buffer))); buffer_id_++; } @@ -141,20 +140,20 @@ Status ZipOp::prepare(TensorQTable *const table) { // At this point we have at least 1 row produced, so all child iterators have their column names such that we // can produce our column name map now. - col_name_id_map_ = {}; + column_name_id_map_ = {}; for (int32_t i = 0; i < children_num_; ++i) { // Initializing col_name_id_map_ from the first data buffer. - const std::unordered_map col_name_id_map = child_iterators_[i]->col_name_id_map(); - int32_t colsCurrent = col_name_id_map_.size(); + const std::unordered_map col_name_id_map = child_iterators_[i]->GetColumnNameMap(); + int32_t colsCurrent = column_name_id_map_.size(); // the update code below shouldn't do anything bad if the column name already exists. 
for (const auto &pair : col_name_id_map) { std::string name = pair.first; int32_t old_id = pair.second; // check if name already exists in column name descriptor - if (col_name_id_map_.count(name) == 1) { + if (column_name_id_map_.count(name) == 1) { RETURN_STATUS_UNEXPECTED("key already exists when zipping datasets"); } - col_name_id_map_[name] = old_id + colsCurrent; + column_name_id_map_[name] = old_id + colsCurrent; } } return Status::OK(); diff --git a/mindspore/ccsrc/dataset/engine/datasetops/zip_op.h b/mindspore/ccsrc/dataset/engine/datasetops/zip_op.h index 04d8ab0121..fa7f97f387 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/zip_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/zip_op.h @@ -136,7 +136,6 @@ class ZipOp : public PipelineOp { int32_t buffer_id_; bool draining_; bool eof_; - std::unordered_map col_name_id_map_; std::vector> child_iterators_; }; } // namespace dataset