Browse Source

Compute column map before launch the tree.

tags/v0.6.0-beta
Lixia Chen 5 years ago
parent
commit
4fa6d5621c
49 changed files with 419 additions and 268 deletions
  1. +2
    -5
      mindspore/ccsrc/dataset/engine/dataset_iterator.cc
  2. +0
    -1
      mindspore/ccsrc/dataset/engine/dataset_iterator.h
  3. +0
    -3
      mindspore/ccsrc/dataset/engine/datasetops/barrier_op.cc
  4. +0
    -1
      mindspore/ccsrc/dataset/engine/datasetops/batch_op.cc
  5. +0
    -1
      mindspore/ccsrc/dataset/engine/datasetops/bucket_batch_by_length_op.cc
  6. +0
    -1
      mindspore/ccsrc/dataset/engine/datasetops/build_vocab_op.cc
  7. +22
    -12
      mindspore/ccsrc/dataset/engine/datasetops/concat_op.cc
  8. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/concat_op.h
  9. +15
    -21
      mindspore/ccsrc/dataset/engine/datasetops/dataset_op.cc
  10. +5
    -5
      mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h
  11. +0
    -3
      mindspore/ccsrc/dataset/engine/datasetops/filter_op.cc
  12. +54
    -62
      mindspore/ccsrc/dataset/engine/datasetops/map_op.cc
  13. +8
    -5
      mindspore/ccsrc/dataset/engine/datasetops/map_op.h
  14. +22
    -18
      mindspore/ccsrc/dataset/engine/datasetops/project_op.cc
  15. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/project_op.h
  16. +44
    -43
      mindspore/ccsrc/dataset/engine/datasetops/rename_op.cc
  17. +3
    -1
      mindspore/ccsrc/dataset/engine/datasetops/rename_op.h
  18. +0
    -2
      mindspore/ccsrc/dataset/engine/datasetops/repeat_op.cc
  19. +0
    -3
      mindspore/ccsrc/dataset/engine/datasetops/shuffle_op.cc
  20. +0
    -3
      mindspore/ccsrc/dataset/engine/datasetops/skip_op.cc
  21. +12
    -5
      mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.cc
  22. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.h
  23. +12
    -4
      mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.cc
  24. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.h
  25. +14
    -7
      mindspore/ccsrc/dataset/engine/datasetops/source/clue_op.cc
  26. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/clue_op.h
  27. +12
    -4
      mindspore/ccsrc/dataset/engine/datasetops/source/coco_op.cc
  28. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/coco_op.h
  29. +12
    -6
      mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.cc
  30. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.h
  31. +12
    -4
      mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.cc
  32. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.h
  33. +12
    -4
      mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.cc
  34. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.h
  35. +11
    -4
      mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc
  36. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.h
  37. +12
    -4
      mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.cc
  38. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.h
  39. +10
    -3
      mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.cc
  40. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.h
  41. +12
    -5
      mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.cc
  42. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.h
  43. +12
    -5
      mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.cc
  44. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.h
  45. +12
    -4
      mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.cc
  46. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.h
  47. +0
    -1
      mindspore/ccsrc/dataset/engine/datasetops/take_op.cc
  48. +25
    -18
      mindspore/ccsrc/dataset/engine/datasetops/zip_op.cc
  49. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/zip_op.h

+ 2
- 5
mindspore/ccsrc/dataset/engine/dataset_iterator.cc View File

@@ -27,7 +27,7 @@
namespace mindspore { namespace mindspore {
namespace dataset { namespace dataset {
// Constructor of the IteratorBase // Constructor of the IteratorBase
IteratorBase::IteratorBase() : curr_buffer_(nullptr), eof_handled_(false), first_row_(true) {}
IteratorBase::IteratorBase() : curr_buffer_(nullptr), eof_handled_(false) {}


IteratorBase::~IteratorBase() = default; IteratorBase::~IteratorBase() = default;


@@ -51,13 +51,10 @@ Status IteratorBase::GetNextAsMap(TensorMap *out_map) {
// The column name mapping comes from the source operator that is producing the data into the iterator. // The column name mapping comes from the source operator that is producing the data into the iterator.
// To avoid having to fetch this for every time, we'll take a local copy of the column name id mapping // To avoid having to fetch this for every time, we'll take a local copy of the column name id mapping
// and save in the iterator. We only have to do this once. All subsequent iterations use the same mapping. // and save in the iterator. We only have to do this once. All subsequent iterations use the same mapping.
// Note: This can only be done after the first row has been produced, as this guarantees the the child has
// it's column mapping set up.
if (first_row_) {
if (col_name_id_map_.empty()) {
// Determine the column name map by calling the derived class method to retrieve the column // Determine the column name map by calling the derived class method to retrieve the column
// name map // name map
col_name_id_map_ = this->GetColumnNameMap(); col_name_id_map_ = this->GetColumnNameMap();
first_row_ = false;
} }


// Populate the out map from the row and return it // Populate the out map from the row and return it


+ 0
- 1
mindspore/ccsrc/dataset/engine/dataset_iterator.h View File

@@ -72,7 +72,6 @@ class IteratorBase {
protected: protected:
std::unique_ptr<DataBuffer> curr_buffer_; // holds the current buffer std::unique_ptr<DataBuffer> curr_buffer_; // holds the current buffer
bool eof_handled_; // T/F if this op got an eof bool eof_handled_; // T/F if this op got an eof
bool first_row_; // internal tracking for first row case
std::unordered_map<std::string, int32_t> col_name_id_map_; std::unordered_map<std::string, int32_t> col_name_id_map_;
}; };




+ 0
- 3
mindspore/ccsrc/dataset/engine/datasetops/barrier_op.cc View File

@@ -144,9 +144,6 @@ Status BarrierOp::prepare(TensorQTable *const table) {


table->push_back(std::move(new_row)); table->push_back(std::move(new_row));


// Assign the column name id map
RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild());

// the update code below shouldn't do anything bad if the column name already exists. // the update code below shouldn't do anything bad if the column name already exists.
return Status::OK(); return Status::OK();
} }


+ 0
- 1
mindspore/ccsrc/dataset/engine/datasetops/batch_op.cc View File

@@ -76,7 +76,6 @@ Status BatchOp::operator()() {
std::unique_ptr<TensorQTable> table = std::make_unique<TensorQTable>(); std::unique_ptr<TensorQTable> table = std::make_unique<TensorQTable>();
child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0); child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);
RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row)); RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild()); // must come after the first fetch above
int32_t cur_batch_size = 0; int32_t cur_batch_size = 0;
RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(0, 0, 0))); RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(0, 0, 0)));
while (child_iterator_->eof_handled() == false) { while (child_iterator_->eof_handled() == false) {


+ 0
- 1
mindspore/ccsrc/dataset/engine/datasetops/bucket_batch_by_length_op.cc View File

@@ -115,7 +115,6 @@ Status BucketBatchByLengthOp::operator()() {
TensorRow current_row; TensorRow current_row;
child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0); child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);
RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row)); RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
RETURN_IF_NOT_OK(AssignColMapFromChild());
while (!child_iterator_->eof_handled()) { while (!child_iterator_->eof_handled()) {
while (!current_row.empty()) { while (!current_row.empty()) {
int32_t element_length; int32_t element_length;


+ 0
- 1
mindspore/ccsrc/dataset/engine/datasetops/build_vocab_op.cc View File

@@ -86,7 +86,6 @@ Status BuildVocabOp::operator()() {
child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0); child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);
TensorRow new_row; TensorRow new_row;
RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row)); RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
RETURN_IF_NOT_OK(AssignColMapFromChild());
if (!col_names_.empty()) { if (!col_names_.empty()) {
col_ids_.reserve(col_names_.size()); col_ids_.reserve(col_names_.size());
for (std::string col : col_names_) { for (std::string col : col_names_) {


+ 22
- 12
mindspore/ccsrc/dataset/engine/datasetops/concat_op.cc View File

@@ -66,12 +66,6 @@ Status ConcatOp::operator()() {
std::unique_ptr<DataBuffer> buf; std::unique_ptr<DataBuffer> buf;
RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(&buf)); RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(&buf));


// Obtain columns_name_id_map from child_[0]
column_name_id_map_ = child_[0]->column_name_id_map();
if (column_name_id_map_.empty()) {
RETURN_STATUS_UNEXPECTED("Child column name map cannot be empty!");
}

int eof_count = 0; int eof_count = 0;
while (eof_count != children_num_) { while (eof_count != children_num_) {
for (int i = 0; i < children_num_; i++) { for (int i = 0; i < children_num_; i++) {
@@ -115,17 +109,13 @@ Status ConcatOp::Verify(int32_t id, const std::unique_ptr<DataBuffer> &buf) {
buf->GetRow(0, &new_row); buf->GetRow(0, &new_row);


if (id == 0) { if (id == 0) {
// Obtain the column name, data type and data rank in child[0]
column_name_id_ = child_[id]->column_name_id_map();
// Obtain the data type and data rank in child[0]
for (auto item : new_row) { for (auto item : new_row) {
data_type_.push_back(item->type()); data_type_.push_back(item->type());
data_rank_.push_back(item->Rank()); data_rank_.push_back(item->Rank());
} }
} else { } else {
// Compare the column name, data type and data rank with these in child[0]
if (child_[id]->column_name_id_map() != column_name_id_) {
RETURN_STATUS_UNEXPECTED("The column name or column order is not the same with previous dataset.");
}
// Compare the data type and data rank with these in child[0]
int32_t index = 0; int32_t index = 0;
for (auto item : new_row) { for (auto item : new_row) {
if ((item->type() != data_type_[index]) || item->Rank() != data_rank_[index++]) { if ((item->type() != data_type_[index]) || item->Rank() != data_rank_[index++]) {
@@ -141,5 +131,25 @@ Status ConcatOp::PrepareNodePostAction() {
tree_->AddToRepeatStack(shared_from_this()); tree_->AddToRepeatStack(shared_from_this());
return Status::OK(); return Status::OK();
} }

// We need to overwrite the super class ComputeColMap here because the number of children is more than 1.
Status ConcatOp::ComputeColMap() {
if (column_name_id_map_.empty()) {
// Obtain columns_name_id_map from child_[0]
column_name_id_map_ = child_[0]->column_name_id_map();
if (column_name_id_map_.empty()) {
RETURN_STATUS_UNEXPECTED("Child column name map cannot be empty!");
}
// Verify all children have the same column name map
for (int32_t i = 0; i < child_.size(); ++i) {
if (child_[i]->column_name_id_map() != column_name_id_map_) {
RETURN_STATUS_UNEXPECTED("The column name or column order is not the same with previous dataset.");
}
}
} else {
MS_LOG(WARNING) << "Column name map is already set!";
}
return Status::OK();
}
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore

+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/concat_op.h View File

@@ -85,6 +85,10 @@ class ConcatOp : public PipelineOp {
// @return Name of the current Op // @return Name of the current Op
std::string Name() const override { return "ConcatOp"; } std::string Name() const override { return "ConcatOp"; }


// Private function for computing the assignment of the column name map.
// @return - Status
Status ComputeColMap() override;

private: private:
Status Verify(int32_t id, const std::unique_ptr<DataBuffer> &buf); Status Verify(int32_t id, const std::unique_ptr<DataBuffer> &buf);




+ 15
- 21
mindspore/ccsrc/dataset/engine/datasetops/dataset_op.cc View File

@@ -39,8 +39,7 @@ DatasetOp::DatasetOp(int32_t op_connector_size)
tree_(nullptr), tree_(nullptr),
state_(OpState::kDeOpIdle), state_(OpState::kDeOpIdle),
op_ctrl_flags_(kDeOpNone), op_ctrl_flags_(kDeOpNone),
out_connector_(nullptr),
first_fetch_(true) {
out_connector_(nullptr) {
// The operator starts out with an invalid operator id. The only way to // The operator starts out with an invalid operator id. The only way to
// get it out of invalid state is to assign the operator to an execution tree. // get it out of invalid state is to assign the operator to an execution tree.
} }
@@ -240,6 +239,10 @@ Status DatasetOp::PrepareNodePostAction() {
RETURN_IF_NOT_OK(out_connector_->Register(tree_->AllTasks())); RETURN_IF_NOT_OK(out_connector_->Register(tree_->AllTasks()));
} }
RETURN_IF_NOT_OK(this->RegisterWorkerConnectors()); RETURN_IF_NOT_OK(this->RegisterWorkerConnectors());

// Generate the column name map for the current op.
RETURN_IF_NOT_OK(this->ComputeColMap());

return Status::OK(); return Status::OK();
} }


@@ -262,30 +265,21 @@ std::string DatasetOp::ColumnNameMapAsString() const {
return outStr; return outStr;
} }


// A helper function for providing assignment of the column name map.
// This grabs the map from child 0 and assigns it into this op.
// Can only be used if number of children is 1.
Status DatasetOp::AssignColMapFromChild() {
// Computing the assignment of the column name map.
// This just inherits the column map from its first child, can only be used if the number of children is 1.
// Operations changing the column map must overwrite this function.
Status DatasetOp::ComputeColMap() {
if (child_.size() > 1) { if (child_.size() > 1) {
RETURN_STATUS_UNEXPECTED("Assigning column name map from child only works for single-child operators."); RETURN_STATUS_UNEXPECTED("Assigning column name map from child only works for single-child operators.");
} }
// Assign the correct column name map to this op by taking it from the input child.
// This must be done AFTER the first fetch, but only needs to be done once by the first worker to
// do the first fetch.
if (first_fetch_) {
// If there was a single worker, or this is being called from a master thread in a parallel op,
// then the mutex is not really needed here, although it's harmless.
std::unique_lock<std::mutex> lock(column_name_map_mutex_);
// If the map has not been set up yet, then we are the first one in to set it up. The first_fetch_ (dirty read)
// bool allows us to avoid acquiring the lock if the map has already been set.
if (column_name_id_map_.empty()) {
column_name_id_map_ = child_[0]->column_name_id_map();
if (column_name_id_map_.empty()) { if (column_name_id_map_.empty()) {
column_name_id_map_ = child_[0]->column_name_id_map();
first_fetch_ = false;
if (column_name_id_map_.empty()) {
RETURN_STATUS_UNEXPECTED("Child column name map cannot be empty!");
}
RETURN_STATUS_UNEXPECTED("Child column name map cannot be empty!");
} }
MS_LOG(DEBUG) << "Setting column map after first fetch:\n" << DatasetOp::ColumnNameMapAsString();
MS_LOG(DEBUG) << "Setting column map:\n" << DatasetOp::ColumnNameMapAsString();
} else {
MS_LOG(WARNING) << "Column name map is already set!";
} }
return Status::OK(); return Status::OK();
} }


+ 5
- 5
mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h View File

@@ -277,11 +277,12 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> {
// @param parent - The parent node to remove // @param parent - The parent node to remove
void RemoveParent(DatasetOp *parent); void RemoveParent(DatasetOp *parent);


// A helper function for providing an assignment of the column name map.
// This grabs the map from child 0 and assigns it into this op.
// Can only be used if number of children is 1.
// Compute the current op's column map using its child's column map.
// Get called during the tree post-prepare phase in PrepareNodePostAction.
// This base implementation just inherits the map from child 0, and can only be used if the number of children is 1.
// Operations changing the column map it inherits from the child must overwrite this function.
// @return - Status // @return - Status
Status AssignColMapFromChild();
virtual Status ComputeColMap();


std::vector<std::shared_ptr<DatasetOp>> child_; // Child nodes std::vector<std::shared_ptr<DatasetOp>> child_; // Child nodes
std::vector<DatasetOp *> parent_; // Parent nodes. No ownership std::vector<DatasetOp *> parent_; // Parent nodes. No ownership
@@ -292,7 +293,6 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> {
uint32_t op_ctrl_flags_; // Flags for the operator uint32_t op_ctrl_flags_; // Flags for the operator
std::unique_ptr<DbConnector> out_connector_; // Output Connector std::unique_ptr<DbConnector> out_connector_; // Output Connector
std::unordered_map<std::string, int32_t> column_name_id_map_; // Mapping between col index and col name std::unordered_map<std::string, int32_t> column_name_id_map_; // Mapping between col index and col name
bool first_fetch_; // For use when setting column map
std::mutex column_name_map_mutex_; // For protecting shared access to the column map std::mutex column_name_map_mutex_; // For protecting shared access to the column map


private: private:


+ 0
- 3
mindspore/ccsrc/dataset/engine/datasetops/filter_op.cc View File

@@ -126,9 +126,6 @@ Status FilterOp::WorkerEntry(int32_t worker_id) {
continue; continue;
} }


// Now that the first fetch is in, use the helper function to assign the column name map to this op.
RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild());

RETURN_IF_NOT_OK(CheckColumns(in_buffer.get(), &in_columns_)); RETURN_IF_NOT_OK(CheckColumns(in_buffer.get(), &in_columns_));


// if the databuffer was all filtered, it is marked as kFilterEmpty. // if the databuffer was all filtered, it is marked as kFilterEmpty.


+ 54
- 62
mindspore/ccsrc/dataset/engine/datasetops/map_op.cc View File

@@ -156,14 +156,15 @@ Status MapOp::WorkerEntry(int32_t worker_id) {
// initializations that happen after the first fetch. // initializations that happen after the first fetch.
RETURN_IF_NOT_OK(FetchNextBuffer(&in_buffer, worker_id)); RETURN_IF_NOT_OK(FetchNextBuffer(&in_buffer, worker_id));


// Initialize details related to column selections and column map by calling WorkerEntryInit.
// WorkerEntryInit contains thread-safe lock to ensure that this init work is only performed once
// by the first worker to enter the codepath. All other threads will share the const info that
// gets set up here going forward.
// Sanity check the databuffer.
// Special case: if there's more threads than buffers, some threads simply get the final control // Special case: if there's more threads than buffers, some threads simply get the final control
// messages (eoe/eof), and so they will not perform the init work.
// messages (eoe/eof), and so they will not perform the check.
if (!in_buffer->eoe() && !in_buffer->eof()) { if (!in_buffer->eoe() && !in_buffer->eof()) {
RETURN_IF_NOT_OK(WorkerEntryInit(in_buffer.get()));
int32_t num_rows = in_buffer->NumRows();
int32_t num_cols = in_buffer->NumCols();
if (num_rows == 0 || num_cols == 0) {
RETURN_STATUS_UNEXPECTED("MapOp is getting an empty DataBuffer.");
}
} }


// Now that init work is done, drop into the main fetching loop. // Now that init work is done, drop into the main fetching loop.
@@ -258,63 +259,18 @@ Status MapOp::WorkerCompute(DataBuffer *in_buffer, TensorQTable *new_tensor_tabl
return Status::OK(); return Status::OK();
} }


// initialize some internal data structure used by WorkerEntry()
Status MapOp::WorkerEntryInit(const DataBuffer *in_buf) {
int32_t num_rows = in_buf->NumRows();
int32_t num_cols = in_buf->NumCols();
if (num_rows == 0 || num_cols == 0) {
RETURN_STATUS_UNEXPECTED("MapOp is getting an empty DataBuffer.");
Status MapOp::ComputeColMap() {
// If the map has not been set up yet in the base class, then set it up
if (column_name_id_map_.empty()) {
std::unordered_map<std::string, int32_t> current_name_id_map = child_[0]->column_name_id_map();
// Initialize private variables
RETURN_IF_NOT_OK(InitPrivateVariable(&current_name_id_map));
// Create the final column name to index mapping in the base class field
CreateFinalColMap(&current_name_id_map);
MS_LOG(DEBUG) << "Column name map for map op set: " << this->ColumnNameMapAsString();
} else {
MS_LOG(WARNING) << "Column name map is already set!";
} }

// We can't use AssignColMapFromChild() here since we need to modify the column map. We need to be threadsafe
// though for saving the final map in the op, so use the lock here.
if (first_fetch_) {
std::unique_lock<std::mutex> lock(column_name_map_mutex_);
// If the map has not been set up yet in the base class, then we are the first one in to set it up
// (and we are under protection of the mutex lock)
if (column_name_id_map_.empty()) {
std::unordered_map<std::string, int32_t> current_name_id_map = child_[0]->column_name_id_map();

// If input_columns is empty(), The col at index-0 will be picked.
if (in_columns_.empty()) {
for (const auto &pair : current_name_id_map) {
if (pair.second == 0) {
MS_LOG(INFO) << "Input columns empty for map op, will apply to the first column in the current table.";
in_columns_.push_back(pair.first);
break;
}
}

// If caller didn't specify the out_col_names, assume they are same as the input_columns.
// This was done in the constructor, but if input columns was empty to start we have to redo it here.
if (out_columns_.empty() || out_columns_[0].empty()) {
out_columns_ = in_columns_;
}
}

// Before we continue, issue a sanity check to make sure the input columns from user and the incoming
// columns from child are correct
RETURN_IF_NOT_OK(this->ValidateInColumns(current_name_id_map));

// initialize keep_input_columns, true means to keep the column.
keep_input_columns_.resize(num_cols, true);
for (const auto &col_name : in_columns_) {
int32_t missed = current_name_id_map[col_name];
keep_input_columns_[missed] = false;
}

// initialize to_process_indices.
for (const auto &col_name : in_columns_) {
to_process_indices_.push_back(current_name_id_map[col_name]);
}

// Create the final column name to index mapping in the base class field
CreateFinalColMap(&current_name_id_map);
first_fetch_ = false;
}
} // mutex lock will release here

MS_LOG(DEBUG) << "Column name map for map op set: " << this->ColumnNameMapAsString();
return Status::OK(); return Status::OK();
} }


@@ -330,6 +286,42 @@ Status MapOp::ValidateInColumns(const std::unordered_map<std::string, int32_t> &
return Status::OK(); return Status::OK();
} }


Status MapOp::InitPrivateVariable(std::unordered_map<std::string, int32_t> *col_name_id_map) {
// If input_columns is empty(), The col at index-0 will be picked.
if (in_columns_.empty()) {
for (const auto &pair : *col_name_id_map) {
if (pair.second == 0) {
MS_LOG(INFO) << "Input columns empty for map op, will apply to the first column in the current table.";
in_columns_.push_back(pair.first);
break;
}
}

// If caller didn't specify the out_col_names, assume they are same as the input_columns.
// This was done in the constructor, but if input columns was empty to start we have to redo it here.
if (out_columns_.empty() || out_columns_[0].empty()) {
out_columns_ = in_columns_;
}
}

// Before we continue, issue a sanity check to make sure the input columns from user and the incoming
// columns from child are correct
RETURN_IF_NOT_OK(this->ValidateInColumns(*col_name_id_map));

// initialize keep_input_columns, true means to keep the column.
keep_input_columns_.resize(col_name_id_map->size(), true);
for (const auto &col_name : in_columns_) {
int32_t missed = (*col_name_id_map)[col_name];
keep_input_columns_[missed] = false;
}

// initialize to_process_indices.
for (const auto &col_name : in_columns_) {
to_process_indices_.push_back((*col_name_id_map)[col_name]);
}
return Status::OK();
}

// Create the final column name to index mapping and get indices of the columns this mapop does not use. // Create the final column name to index mapping and get indices of the columns this mapop does not use.
void MapOp::CreateFinalColMap(std::unordered_map<std::string, int32_t> *col_name_id_map) { void MapOp::CreateFinalColMap(std::unordered_map<std::string, int32_t> *col_name_id_map) {
std::unordered_map<std::string, int32_t> final_col_name_id_map; std::unordered_map<std::string, int32_t> final_col_name_id_map;


+ 8
- 5
mindspore/ccsrc/dataset/engine/datasetops/map_op.h View File

@@ -258,15 +258,18 @@ class MapOp : public ParallelOp {
// @param col_name_id_map The column name to index mapping obtained from child operator // @param col_name_id_map The column name to index mapping obtained from child operator
void CreateFinalColMap(std::unordered_map<std::string, int32_t> *col_name_id_map); void CreateFinalColMap(std::unordered_map<std::string, int32_t> *col_name_id_map);


// Private function that initialize some internal data structure used by WorkerEntry()
// @param in_buf A raw pointer to the DataBuffer. A raw pointer is fine because this function does not manage memory
// and is not shared with other threads.
Status WorkerEntryInit(const DataBuffer *in_buf);

// Validating if each of the input_columns exists in the DataBuffer. // Validating if each of the input_columns exists in the DataBuffer.
// @param - the column map to check // @param - the column map to check
// @return - status return code // @return - status return code
Status ValidateInColumns(const std::unordered_map<std::string, int32_t> &col_name_id_map); Status ValidateInColumns(const std::unordered_map<std::string, int32_t> &col_name_id_map);

// Private function for computing the assignment of the column name map.
// @return - Status
Status ComputeColMap() override;

// Private function for initializing private variables such as in_columns_, out_columns_.
// @return - Status
Status InitPrivateVariable(std::unordered_map<std::string, int32_t> *col_name_id_map);
}; };
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore


+ 22
- 18
mindspore/ccsrc/dataset/engine/datasetops/project_op.cc View File

@@ -74,24 +74,6 @@ void ProjectOp::Print(std::ostream &out, bool show_all) const {
Status ProjectOp::GetNextBuffer(std::unique_ptr<DataBuffer> *p_buffer, int32_t worker_id, bool retry_if_eoe) { Status ProjectOp::GetNextBuffer(std::unique_ptr<DataBuffer> *p_buffer, int32_t worker_id, bool retry_if_eoe) {
RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(p_buffer, worker_id, retry_if_eoe)); RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(p_buffer, worker_id, retry_if_eoe));
if (!((*p_buffer)->eoe()) && !((*p_buffer)->eof())) { if (!((*p_buffer)->eoe()) && !((*p_buffer)->eof())) {
// Only for the first buffer fetched, get the column map of the incoming data and save it
// into our own column name map after making the appropriate mods
// We cannot use the super class AssignColMapFromChild here because we're making a modification of the
// map from the child map.
if (first_fetch_) {
std::unordered_map<std::string, int32_t> child_column_name_mapping = child_[0]->column_name_id_map();
for (size_t i = 0; i < columns_to_project_.size(); i++) {
std::string &current_column = columns_to_project_[i];
if (child_column_name_mapping.find(current_column) == child_column_name_mapping.end()) {
std::string err_msg = "ProjectOp: column " + current_column + " does not exist in child operator.";
RETURN_STATUS_UNEXPECTED(err_msg);
}
// Setup the new column name mapping for ourself (base class field)
column_name_id_map_[current_column] = i;
projected_column_indices_.push_back(child_column_name_mapping[current_column]);
}
first_fetch_ = false; // we only need to do this path once
}
RETURN_IF_NOT_OK(Project(p_buffer)); RETURN_IF_NOT_OK(Project(p_buffer));
} }
return Status::OK(); return Status::OK();
@@ -151,5 +133,27 @@ Status ProjectOp::Accept(NodePass *p, bool *modified) {
// Downcast shared pointer then call visitor // Downcast shared pointer then call visitor
return p->RunOnNode(std::static_pointer_cast<ProjectOp>(shared_from_this()), modified); return p->RunOnNode(std::static_pointer_cast<ProjectOp>(shared_from_this()), modified);
} }

// Compute the column map and save it into our own column name map
// We cannot use the super class ComputeColMap here because we're making a modification of the
// map from the child map.
Status ProjectOp::ComputeColMap() {
if (column_name_id_map_.empty()) {
std::unordered_map<std::string, int32_t> child_column_name_mapping = child_[0]->column_name_id_map();
for (size_t i = 0; i < columns_to_project_.size(); i++) {
std::string &current_column = columns_to_project_[i];
if (child_column_name_mapping.find(current_column) == child_column_name_mapping.end()) {
std::string err_msg = "ProjectOp: column " + current_column + " does not exist in child operator.";
RETURN_STATUS_UNEXPECTED(err_msg);
}
// Setup the new column name mapping for ourself (base class field)
column_name_id_map_[current_column] = i;
projected_column_indices_.push_back(child_column_name_mapping[current_column]);
}
} else {
MS_LOG(WARNING) << "Column name map is already set!";
}
return Status::OK();
}
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore

+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/project_op.h View File

@@ -116,6 +116,10 @@ class ProjectOp : public PipelineOp {
std::vector<int32_t> projected_column_indices_; std::vector<int32_t> projected_column_indices_;


Status Project(std::unique_ptr<DataBuffer> *data_buffer); Status Project(std::unique_ptr<DataBuffer> *data_buffer);

// Computing the assignment of the column name map.
// @return - Status
Status ComputeColMap() override;
}; };
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore


+ 44
- 43
mindspore/ccsrc/dataset/engine/datasetops/rename_op.cc View File

@@ -69,12 +69,6 @@ Status RenameOp::operator()() {
RETURN_STATUS_UNEXPECTED(err_msg); RETURN_STATUS_UNEXPECTED(err_msg);
} }


// First, populate the column map from the input child.
// This will not be the final map for output from this op.
RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild());
// core rename functionality only needs to happen once, to identify the new column names/indexes
RETURN_IF_NOT_OK(RenameColumns());

while (curr_buffer->eof() == false) { while (curr_buffer->eof() == false) {
while (curr_buffer->eoe() == false) { while (curr_buffer->eoe() == false) {
// push the renamed input buffer // push the renamed input buffer
@@ -95,45 +89,52 @@ Status RenameOp::operator()() {
return Status::OK(); return Status::OK();
} }


// renames the columns
Status RenameOp::RenameColumns() {
// iterate over my index in input vector, find the corresponding position
std::unordered_map<std::string, int32_t> new_col_name_id_map = {};
// parameter for input check
size_t found = 0;

// iterate over all the pairs and if there is a name match with rename, rename the column and add it to new map
// by doing it this way we recreate a new ColNameIdMap and allow for switching
for (const auto &pair : column_name_id_map_) {
std::string name = pair.first;
int32_t id = pair.second;
// find name
std::vector<std::string>::iterator it;
it = std::find(in_columns_.begin(), in_columns_.end(), name);
// for c input checks here we have to count the number of times we find the stuff in in_columns_
// because we iterate over the mInputList n times
if (it != in_columns_.end()) {
// found
found += 1;
int index = std::distance(in_columns_.begin(), it);
MS_LOG(DEBUG) << "Rename operator index found " << index << " value " << id << ".";

new_col_name_id_map[out_columns_[index]] = id;
} else {
// not found
MS_LOG(DEBUG) << "Rename operator index not found: " << id << " is the column id.";
new_col_name_id_map[name] = id;
// Rename core functionality to compute the new column name id map.
// We need to overwrite the super class ComputeColMap here because we're making a modification of the
// map from the child map.
Status RenameOp::ComputeColMap() {
if (column_name_id_map_.empty()) {
column_name_id_map_ = child_[0]->column_name_id_map();
// iterate over my index in input vector, find the corresponding position
std::unordered_map<std::string, int32_t> new_col_name_id_map = {};
// parameter for input check
size_t found = 0;

// iterate over all the pairs and if there is a name match with rename, rename the column and add it to new map
// by doing it this way we recreate a new ColNameIdMap and allow for switching
for (const auto &pair : column_name_id_map_) {
std::string name = pair.first;
int32_t id = pair.second;
// find name
std::vector<std::string>::iterator it;
it = std::find(in_columns_.begin(), in_columns_.end(), name);
// for c input checks here we have to count the number of times we find the stuff in in_columns_
// because we iterate over the mInputList n times
if (it != in_columns_.end()) {
// found
found += 1;
int index = std::distance(in_columns_.begin(), it);
MS_LOG(DEBUG) << "Rename operator index found " << index << " value " << id << ".";

new_col_name_id_map[out_columns_[index]] = id;
} else {
// not found
MS_LOG(DEBUG) << "Rename operator index not found: " << id << " is the column id.";
new_col_name_id_map[name] = id;
}
}
// only checks number of renamed columns have been found, this input check doesn't check everything
if (found != in_columns_.size()) {
MS_LOG(DEBUG) << "Rename operator column names found: " << found << " out of " << in_columns_.size() << ".";
std::string err_msg = "Renamed column doesn't exist in dataset";
RETURN_STATUS_UNEXPECTED(err_msg);
} }
}
// only checks number of renamed columns have been found, this input check doesn't check everything
if (found != in_columns_.size()) {
MS_LOG(DEBUG) << "Rename operator column names found: " << found << " out of " << in_columns_.size() << ".";
std::string err_msg = "Renamed column doesn't exist in dataset";
RETURN_STATUS_UNEXPECTED(err_msg);
}


// Now, overwrite our column map with the new renamed columns/id's
column_name_id_map_ = new_col_name_id_map;
// Now, overwrite our column map with the new renamed columns/id's
column_name_id_map_ = new_col_name_id_map;
} else {
MS_LOG(WARNING) << "Column name map is already set!";
}
return Status::OK(); return Status::OK();
} }




+ 3
- 1
mindspore/ccsrc/dataset/engine/datasetops/rename_op.h View File

@@ -122,7 +122,9 @@ class RenameOp : public PipelineOp {


protected: protected:
// Rename core functionality // Rename core functionality
Status RenameColumns();
// Computing the assignment of the new column name map.
// @return - Status
Status ComputeColMap() override;


// Variable to store the input column names // Variable to store the input column names
std::vector<std::string> in_columns_; std::vector<std::string> in_columns_;


+ 0
- 2
mindspore/ccsrc/dataset/engine/datasetops/repeat_op.cc View File

@@ -123,8 +123,6 @@ Status RepeatOp::GetNextBuffer(std::unique_ptr<DataBuffer> *p_buffer, int32_t wo
if (buf->eof()) { if (buf->eof()) {
RETURN_IF_NOT_OK(EofReceived(worker_id)); RETURN_IF_NOT_OK(EofReceived(worker_id));
} }
// Update the column name map if needed
RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild());
*p_buffer = std::move(buf); *p_buffer = std::move(buf);
return Status::OK(); return Status::OK();
} }


+ 0
- 3
mindspore/ccsrc/dataset/engine/datasetops/shuffle_op.cc View File

@@ -266,9 +266,6 @@ Status ShuffleOp::InitShuffleBuffer() {
RETURN_STATUS_UNEXPECTED("Unable to fetch a single row for shuffle buffer."); RETURN_STATUS_UNEXPECTED("Unable to fetch a single row for shuffle buffer.");
} }


// Now that a first fetch is done, assign the column map for this operator
RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild());

// Now fill the rest of the shuffle buffer until we are unable to get the next row or we reached // Now fill the rest of the shuffle buffer until we are unable to get the next row or we reached
// the desired shuffle buffer size. // the desired shuffle buffer size.
while (!new_row.empty() && shuffle_buffer_->size() < static_cast<size_t>(shuffle_size_ - 1)) { while (!new_row.empty() && shuffle_buffer_->size() < static_cast<size_t>(shuffle_size_ - 1)) {


+ 0
- 3
mindspore/ccsrc/dataset/engine/datasetops/skip_op.cc View File

@@ -86,9 +86,6 @@ Status SkipOp::operator()() {
std::unique_ptr<DataBuffer> curr_buffer; std::unique_ptr<DataBuffer> curr_buffer;
RETURN_IF_NOT_OK(GetNextInput(&curr_buffer)); RETURN_IF_NOT_OK(GetNextInput(&curr_buffer));


// After the first buffer fetch above we can do the one-time assign of the column name map
RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild());

while (curr_buffer->eof() == false) { while (curr_buffer->eof() == false) {
// Reset count // Reset count
skip_count_ = 0; skip_count_ = 0;


+ 12
- 5
mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.cc View File

@@ -79,11 +79,6 @@ CelebAOp::CelebAOp(int32_t num_workers, int32_t rows_per_buffer, const std::stri
sampler_(std::move(sampler)), sampler_(std::move(sampler)),
num_rows_in_attr_file_(0), num_rows_in_attr_file_(0),
dataset_type_(dataset_type) { dataset_type_(dataset_type) {
// Set the column name map (base class field)
for (int32_t index = 0; index < data_schema_->NumColumns(); index++) {
column_name_id_map_[data_schema_->column(index).name()] = index;
}

attr_info_queue_ = std::make_unique<Queue<std::vector<std::string>>>(queue_size); attr_info_queue_ = std::make_unique<Queue<std::vector<std::string>>>(queue_size);
io_block_queues_.Init(num_workers_, queue_size); io_block_queues_.Init(num_workers_, queue_size);
} }
@@ -413,5 +408,17 @@ Status CelebAOp::Reset() {
wp_.Set(); // wake up master thread after reset is done wp_.Set(); // wake up master thread after reset is done
return Status::OK(); return Status::OK();
} }

Status CelebAOp::ComputeColMap() {
// Set the column name map (base class field)
if (column_name_id_map_.empty()) {
for (int32_t index = 0; index < data_schema_->NumColumns(); index++) {
column_name_id_map_[data_schema_->column(index).name()] = index;
}
} else {
MS_LOG(WARNING) << "Column name map is already set!";
}
return Status::OK();
}
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore

+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.h View File

@@ -212,6 +212,10 @@ class CelebAOp : public ParallelOp, RandomAccessOp {
// @return Status - The error code return // @return Status - The error code return
Status Reset() override; Status Reset() override;


// Private function for computing the assignment of the column name map.
// @return - Status
Status ComputeColMap() override;

int32_t rows_per_buffer_; int32_t rows_per_buffer_;
std::string folder_path_; // directory of celeba folder std::string folder_path_; // directory of celeba folder
bool decode_; bool decode_;


+ 12
- 4
mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.cc View File

@@ -87,10 +87,6 @@ CifarOp::CifarOp(CifarType type, int32_t num_works, int32_t rows_per_buf, const
sampler_(std::move(sampler)), sampler_(std::move(sampler)),
row_cnt_(0), row_cnt_(0),
buf_cnt_(0) { buf_cnt_(0) {
// set the column name map (base class field)
for (uint32_t i = 0; i < data_schema_->NumColumns(); ++i) {
column_name_id_map_[data_schema_->column(i).name()] = i;
}
constexpr uint64_t kUtilQueueSize = 512; constexpr uint64_t kUtilQueueSize = 512;
cifar_raw_data_block_ = std::make_unique<Queue<std::vector<unsigned char>>>(kUtilQueueSize); cifar_raw_data_block_ = std::make_unique<Queue<std::vector<unsigned char>>>(kUtilQueueSize);
io_block_queues_.Init(num_workers_, queue_size); io_block_queues_.Init(num_workers_, queue_size);
@@ -454,5 +450,17 @@ Status CifarOp::CountTotalRows(const std::string &dir, bool isCIFAR10, int64_t *
return Status::OK(); return Status::OK();
} }
} }

Status CifarOp::ComputeColMap() {
// set the column name map (base class field)
if (column_name_id_map_.empty()) {
for (uint32_t i = 0; i < data_schema_->NumColumns(); ++i) {
column_name_id_map_[data_schema_->column(i).name()] = i;
}
} else {
MS_LOG(WARNING) << "Column name map is already set!";
}
return Status::OK();
}
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore

+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.h View File

@@ -208,6 +208,10 @@ class CifarOp : public ParallelOp, public RandomAccessOp {
// @return Status - The error code return // @return Status - The error code return
Status GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const override; Status GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const override;


// Private function for computing the assignment of the column name map.
// @return - Status
Status ComputeColMap() override;

CifarType cifar_type_; CifarType cifar_type_;
int32_t rows_per_buffer_; int32_t rows_per_buffer_;
std::string folder_path_; std::string folder_path_;


+ 14
- 7
mindspore/ccsrc/dataset/engine/datasetops/source/clue_op.cc View File

@@ -112,13 +112,6 @@ Status ClueOp::Init() {
int32_t safe_queue_size = static_cast<int32_t>(std::ceil(clue_files_list_.size() / num_workers_) + 1); int32_t safe_queue_size = static_cast<int32_t>(std::ceil(clue_files_list_.size() / num_workers_) + 1);
io_block_queues_.Init(num_workers_, safe_queue_size); io_block_queues_.Init(num_workers_, safe_queue_size);


// Set the column name mapping (base class field)
int count = 0;
for (auto &p : cols_to_keyword_) {
column_name_id_map_[p.first] = count;
count++;
}

RETURN_IF_NOT_OK(ParallelOp::CreateWorkerConnector(worker_connector_size_)); RETURN_IF_NOT_OK(ParallelOp::CreateWorkerConnector(worker_connector_size_));
jagged_buffer_connector_ = std::make_unique<JaggedConnector>(num_workers_, 1, worker_connector_size_); jagged_buffer_connector_ = std::make_unique<JaggedConnector>(num_workers_, 1, worker_connector_size_);


@@ -549,5 +542,19 @@ Status ClueOp::CountAllFileRows(const std::vector<std::string> &files, int64_t *
} }
return Status::OK(); return Status::OK();
} }

Status ClueOp::ComputeColMap() {
// Set the column name mapping (base class field)
if (column_name_id_map_.empty()) {
int count = 0;
for (auto &p : cols_to_keyword_) {
column_name_id_map_[p.first] = count;
count++;
}
} else {
MS_LOG(WARNING) << "Column name map is already set!";
}
return Status::OK();
}
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore

+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/clue_op.h View File

@@ -263,6 +263,10 @@ class ClueOp : public ParallelOp {
// @return Status - the error code returned. // @return Status - the error code returned.
Status GetValue(const nlohmann::json &js, std::vector<std::string> key_chain, std::shared_ptr<Tensor> *t); Status GetValue(const nlohmann::json &js, std::vector<std::string> key_chain, std::shared_ptr<Tensor> *t);


// Private function for computing the assignment of the column name map.
// @return - Status
Status ComputeColMap() override;

int32_t device_id_; int32_t device_id_;
bool shuffle_files_; bool shuffle_files_;
bool shuffle_global_; bool shuffle_global_;


+ 12
- 4
mindspore/ccsrc/dataset/engine/datasetops/source/coco_op.cc View File

@@ -129,10 +129,6 @@ CocoOp::CocoOp(const TaskType &task_type, const std::string &image_folder_path,
rows_per_buffer_(rows_per_buffer), rows_per_buffer_(rows_per_buffer),
sampler_(std::move(sampler)), sampler_(std::move(sampler)),
data_schema_(std::move(data_schema)) { data_schema_(std::move(data_schema)) {
// Set the column name map (base class field)
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
column_name_id_map_[data_schema_->column(i).name()] = i;
}
io_block_queues_.Init(num_workers_, queue_size); io_block_queues_.Init(num_workers_, queue_size);
} }


@@ -627,5 +623,17 @@ Status CocoOp::GetClassIndexing(const std::string &dir, const std::string &file,
*output_class_indexing = op->label_index_; *output_class_indexing = op->label_index_;
return Status::OK(); return Status::OK();
} }

Status CocoOp::ComputeColMap() {
// Set the column name map (base class field)
if (column_name_id_map_.empty()) {
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
column_name_id_map_[data_schema_->column(i).name()] = i;
}
} else {
MS_LOG(WARNING) << "Column name map is already set!";
}
return Status::OK();
}
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore

+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/coco_op.h View File

@@ -306,6 +306,10 @@ class CocoOp : public ParallelOp, public RandomAccessOp {
template <typename T> template <typename T>
Status SearchNodeInJson(nlohmann::json input_tree, std::string node_name, T *output_node); Status SearchNodeInJson(nlohmann::json input_tree, std::string node_name, T *output_node);


// Private function for computing the assignment of the column name map.
// @return - Status
Status ComputeColMap() override;

bool decode_; bool decode_;
int64_t row_cnt_; int64_t row_cnt_;
int64_t buf_cnt_; int64_t buf_cnt_;


+ 12
- 6
mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.cc View File

@@ -94,12 +94,6 @@ void GeneratorOp::Dealloc() noexcept {
Status GeneratorOp::Init() { Status GeneratorOp::Init() {
// Reset BufferID // Reset BufferID
buffer_id_ = 0; buffer_id_ = 0;
// Setup column names map (base class field)
if (column_name_id_map_.empty()) {
for (int i = 0; i < column_names_.size(); ++i) {
column_name_id_map_[column_names_[i]] = i;
}
}
Status ret; Status ret;
{ {
// Acquire Python GIL // Acquire Python GIL
@@ -257,5 +251,17 @@ Status GeneratorOp::Accept(NodePass *p, bool *modified) {
// Downcast shared pointer then call visitor // Downcast shared pointer then call visitor
return p->RunOnNode(std::static_pointer_cast<GeneratorOp>(shared_from_this()), modified); return p->RunOnNode(std::static_pointer_cast<GeneratorOp>(shared_from_this()), modified);
} }

Status GeneratorOp::ComputeColMap() {
// Setup column names map (base class field)
if (column_name_id_map_.empty()) {
for (int i = 0; i < column_names_.size(); ++i) {
column_name_id_map_[column_names_[i]] = i;
}
} else {
MS_LOG(WARNING) << "Column name map is already set!";
}
return Status::OK();
}
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore

+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.h View File

@@ -150,6 +150,10 @@ class GeneratorOp : public PipelineOp {
Status PyRowToTensorRow(py::object py_data, TensorRow *tensor_row); Status PyRowToTensorRow(py::object py_data, TensorRow *tensor_row);


Status FillBuffer(TensorQTable *tt); Status FillBuffer(TensorQTable *tt);

// Private function for computing the assignment of the column name map.
// @return - Status
Status ComputeColMap() override;
}; };


#pragma GCC visibility pop #pragma GCC visibility pop


+ 12
- 4
mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.cc View File

@@ -78,10 +78,6 @@ ImageFolderOp::ImageFolderOp(int32_t num_wkrs, int32_t rows_per_buffer, std::str
buf_cnt_(0), buf_cnt_(0),
sampler_ind_(0), sampler_ind_(0),
dirname_offset_(0) { dirname_offset_(0) {
// Set the column name map (base class field)
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
column_name_id_map_[data_schema_->column(i).name()] = i;
}
folder_name_queue_ = std::make_unique<Queue<std::string>>(num_wkrs * queue_size); folder_name_queue_ = std::make_unique<Queue<std::string>>(num_wkrs * queue_size);
image_name_queue_ = std::make_unique<Queue<FolderImagesPair>>(num_wkrs * queue_size); image_name_queue_ = std::make_unique<Queue<FolderImagesPair>>(num_wkrs * queue_size);
io_block_queues_.Init(num_workers_, queue_size); io_block_queues_.Init(num_workers_, queue_size);
@@ -418,5 +414,17 @@ Status ImageFolderOp::Accept(NodePass *p, bool *modified) {
// Downcast shared pointer then call visitor // Downcast shared pointer then call visitor
return p->RunOnNode(std::static_pointer_cast<ImageFolderOp>(shared_from_this()), modified); return p->RunOnNode(std::static_pointer_cast<ImageFolderOp>(shared_from_this()), modified);
} }

Status ImageFolderOp::ComputeColMap() {
// Set the column name map (base class field)
if (column_name_id_map_.empty()) {
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
column_name_id_map_[data_schema_->column(i).name()] = i;
}
} else {
MS_LOG(WARNING) << "Column name map is already set!";
}
return Status::OK();
}
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore

+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.h View File

@@ -248,6 +248,10 @@ class ImageFolderOp : public ParallelOp, public RandomAccessOp {
// @return Status - The error code return // @return Status - The error code return
Status Reset() override; Status Reset() override;


// Private function for computing the assignment of the column name map.
// @return - Status
Status ComputeColMap() override;

int32_t rows_per_buffer_; int32_t rows_per_buffer_;
std::string folder_path_; // directory of image folder std::string folder_path_; // directory of image folder
bool recursive_; bool recursive_;


+ 12
- 4
mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.cc View File

@@ -76,10 +76,6 @@ ManifestOp::ManifestOp(int32_t num_works, int32_t rows_per_buffer, std::string f
decode_(decode), decode_(decode),
usage_(usage), usage_(usage),
buf_cnt_(0) { buf_cnt_(0) {
// Set the column name map (base class field)
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
column_name_id_map_[data_schema_->column(i).name()] = i;
}
io_block_queues_.Init(num_workers_, queue_size); io_block_queues_.Init(num_workers_, queue_size);
(void)std::transform(usage_.begin(), usage_.end(), usage_.begin(), ::tolower); (void)std::transform(usage_.begin(), usage_.end(), usage_.begin(), ::tolower);
} }
@@ -420,5 +416,17 @@ Status ManifestOp::GetClassIndexing(const std::string &file, const py::dict &dic


return Status::OK(); return Status::OK();
} }

Status ManifestOp::ComputeColMap() {
// Set the column name map (base class field)
if (column_name_id_map_.empty()) {
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
column_name_id_map_[data_schema_->column(i).name()] = i;
}
} else {
MS_LOG(WARNING) << "Column name map is already set!";
}
return Status::OK();
}
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore

+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.h View File

@@ -219,6 +219,10 @@ class ManifestOp : public ParallelOp, public RandomAccessOp {
// @return Status - The error code return // @return Status - The error code return
Status CountDatasetInfo(); Status CountDatasetInfo();


// Private function for computing the assignment of the column name map.
// @return - Status
Status ComputeColMap() override;

int32_t rows_per_buffer_; int32_t rows_per_buffer_;
int64_t io_block_pushed_; int64_t io_block_pushed_;
int64_t row_cnt_; int64_t row_cnt_;


+ 11
- 4
mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc View File

@@ -196,10 +196,6 @@ Status MindRecordOp::Init() {
data_schema_ = std::move(tmp_schema); data_schema_ = std::move(tmp_schema);
} }


for (int i = 0; i < static_cast<int>(columns_to_load_.size()); i++) {
column_name_id_map_[columns_to_load_[i]] = i;
}

return Status::OK(); return Status::OK();
} }


@@ -502,5 +498,16 @@ Status MindRecordOp::Accept(NodePass *p, bool *modified) {
// Downcast shared pointer then call visitor // Downcast shared pointer then call visitor
return p->RunOnNode(std::static_pointer_cast<MindRecordOp>(shared_from_this()), modified); return p->RunOnNode(std::static_pointer_cast<MindRecordOp>(shared_from_this()), modified);
} }

Status MindRecordOp::ComputeColMap() {
if (column_name_id_map_.empty()) {
for (int i = 0; i < static_cast<int>(columns_to_load_.size()); i++) {
column_name_id_map_[columns_to_load_[i]] = i;
}
} else {
MS_LOG(WARNING) << "Column name map is already set!";
}
return Status::OK();
}
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore

+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.h View File

@@ -234,6 +234,10 @@ class MindRecordOp : public ParallelOp {


Status FetchBlockBuffer(const int32_t &buffer_id); Status FetchBlockBuffer(const int32_t &buffer_id);


// Private function for computing the assignment of the column name map.
// @return - Status
Status ComputeColMap() override;

int32_t rows_per_buffer_; // The number of requested rows per buffer. int32_t rows_per_buffer_; // The number of requested rows per buffer.
std::vector<std::string> dataset_file_; // dataset files std::vector<std::string> dataset_file_; // dataset files
bool load_dataset_; // load dataset from single file or not bool load_dataset_; // load dataset from single file or not


+ 12
- 4
mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.cc View File

@@ -73,10 +73,6 @@ MnistOp::MnistOp(int32_t num_workers, int32_t rows_per_buffer, std::string folde
rows_per_buffer_(rows_per_buffer), rows_per_buffer_(rows_per_buffer),
sampler_(std::move(sampler)), sampler_(std::move(sampler)),
data_schema_(std::move(data_schema)) { data_schema_(std::move(data_schema)) {
// set the column name map (base class field)
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
column_name_id_map_[data_schema_->column(i).name()] = i;
}
io_block_queues_.Init(num_workers, queue_size); io_block_queues_.Init(num_workers, queue_size);
} }


@@ -432,5 +428,17 @@ Status MnistOp::CountTotalRows(const std::string &dir, int64_t *count) {


return Status::OK(); return Status::OK();
} }

Status MnistOp::ComputeColMap() {
// set the column name map (base class field)
if (column_name_id_map_.empty()) {
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
column_name_id_map_[data_schema_->column(i).name()] = i;
}
} else {
MS_LOG(WARNING) << "Column name map is already set!";
}
return Status::OK();
}
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore

+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.h View File

@@ -226,6 +226,10 @@ class MnistOp : public ParallelOp, public RandomAccessOp {
// @return Status - The error code return // @return Status - The error code return
Status Reset() override; Status Reset() override;


// Private function for computing the assignment of the column name map.
// @return - Status
Status ComputeColMap() override;

int64_t buf_cnt_; int64_t buf_cnt_;
int64_t row_cnt_; int64_t row_cnt_;
WaitPost wp_; WaitPost wp_;


+ 10
- 3
mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.cc View File

@@ -53,9 +53,6 @@ Status RandomDataOp::Builder::Build(std::shared_ptr<RandomDataOp> *out_op) {
RETURN_IF_NOT_OK((*out_op)->GenerateSchema()); RETURN_IF_NOT_OK((*out_op)->GenerateSchema());
} }


// Extract the column name mapping from the schema and save it in the class.
RETURN_IF_NOT_OK((*out_op)->data_schema_->GetColumnNameMap(&((*out_op)->column_name_id_map_)));

return Status::OK(); return Status::OK();
} }


@@ -405,5 +402,15 @@ Status RandomDataOp::Reset() {


return Status::OK(); return Status::OK();
} }

Status RandomDataOp::ComputeColMap() {
// Extract the column name mapping from the schema and save it in the class.
if (column_name_id_map_.empty()) {
RETURN_IF_NOT_OK(data_schema_->GetColumnNameMap(&(column_name_id_map_)));
} else {
MS_LOG(WARNING) << "Column name map is already set!";
}
return Status::OK();
}
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore

+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.h View File

@@ -250,6 +250,10 @@ class RandomDataOp : public ParallelOp {
return ++buffer_id_; return ++buffer_id_;
} }


// Private function for computing the assignment of the column name map.
// @return - Status
Status ComputeColMap() override;

int32_t buffer_id_; int32_t buffer_id_;
int64_t rows_per_buffer_; int64_t rows_per_buffer_;
int64_t total_rows_; int64_t total_rows_;


+ 12
- 5
mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.cc View File

@@ -127,11 +127,6 @@ Status TextFileOp::Init() {
int32_t safe_queue_size = static_cast<int32_t>(std::ceil(text_files_list_.size() / num_workers_) + 1); int32_t safe_queue_size = static_cast<int32_t>(std::ceil(text_files_list_.size() / num_workers_) + 1);
io_block_queues_.Init(num_workers_, safe_queue_size); io_block_queues_.Init(num_workers_, safe_queue_size);


// Set the column name mapping (base class field)
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
column_name_id_map_[data_schema_->column(i).name()] = i;
}

RETURN_IF_NOT_OK(ParallelOp::CreateWorkerConnector(worker_connector_size_)); RETURN_IF_NOT_OK(ParallelOp::CreateWorkerConnector(worker_connector_size_));


jagged_buffer_connector_ = std::make_unique<JaggedConnector>(num_workers_, 1, worker_connector_size_); jagged_buffer_connector_ = std::make_unique<JaggedConnector>(num_workers_, 1, worker_connector_size_);
@@ -488,5 +483,17 @@ Status TextFileOp::CountAllFileRows(const std::vector<std::string> &files, int64
} }
return Status::OK(); return Status::OK();
} }

Status TextFileOp::ComputeColMap() {
// Set the column name mapping (base class field)
if (column_name_id_map_.empty()) {
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
column_name_id_map_[data_schema_->column(i).name()] = i;
}
} else {
MS_LOG(WARNING) << "Column name map is already set!";
}
return Status::OK();
}
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore

+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.h View File

@@ -264,6 +264,10 @@ class TextFileOp : public ParallelOp {
// @return Status - the error code returned. // @return Status - the error code returned.
Status PostEndOfEpoch(int32_t queue_index); Status PostEndOfEpoch(int32_t queue_index);


// Private function for computing the assignment of the column name map.
// @return - Status
Status ComputeColMap() override;

int32_t device_id_; int32_t device_id_;
int32_t num_devices_; int32_t num_devices_;
int64_t rows_per_buffer_; int64_t rows_per_buffer_;


+ 12
- 5
mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.cc View File

@@ -195,11 +195,6 @@ Status TFReaderOp::Init() {
RETURN_IF_NOT_OK(CreateSchema(dataset_files_list_[0], columns_to_load_)); RETURN_IF_NOT_OK(CreateSchema(dataset_files_list_[0], columns_to_load_));
} }


// Construct the column name map for this operator (base class field)
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
column_name_id_map_[data_schema_->column(i).name()] = i;
}

if (total_rows_ == 0) { if (total_rows_ == 0) {
total_rows_ = data_schema_->num_rows(); total_rows_ = data_schema_->num_rows();
} }
@@ -1015,5 +1010,17 @@ Status TFReaderOp::Accept(NodePass *p, bool *modified) {
// Downcast shared pointer then call visitor // Downcast shared pointer then call visitor
return p->RunOnNode(std::static_pointer_cast<TFReaderOp>(shared_from_this()), modified); return p->RunOnNode(std::static_pointer_cast<TFReaderOp>(shared_from_this()), modified);
} }

Status TFReaderOp::ComputeColMap() {
// Construct the column name map for this operator (base class field)
if (column_name_id_map_.empty()) {
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
column_name_id_map_[data_schema_->column(i).name()] = i;
}
} else {
MS_LOG(WARNING) << "Column name map is already set!";
}
return Status::OK();
}
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore

+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.h View File

@@ -381,6 +381,10 @@ class TFReaderOp : public ParallelOp {
// @return Status - the error code returned. // @return Status - the error code returned.
Status CalculateNumRowsPerShard(); Status CalculateNumRowsPerShard();


// Private function for computing the assignment of the column name map.
// @return - Status
Status ComputeColMap() override;

int32_t device_id_; int32_t device_id_;
int32_t num_devices_; int32_t num_devices_;
int64_t rows_per_buffer_; int64_t rows_per_buffer_;


+ 12
- 4
mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.cc View File

@@ -99,10 +99,6 @@ VOCOp::VOCOp(const TaskType &task_type, const std::string &task_mode, const std:
rows_per_buffer_(rows_per_buffer), rows_per_buffer_(rows_per_buffer),
sampler_(std::move(sampler)), sampler_(std::move(sampler)),
data_schema_(std::move(data_schema)) { data_schema_(std::move(data_schema)) {
// Set the column name map (base class field)
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
column_name_id_map_[data_schema_->column(i).name()] = i;
}
io_block_queues_.Init(num_workers_, queue_size); io_block_queues_.Init(num_workers_, queue_size);
} }


@@ -454,5 +450,17 @@ Status VOCOp::GetClassIndexing(const std::string &dir, const std::string &task_t


return Status::OK(); return Status::OK();
} }

Status VOCOp::ComputeColMap() {
// Set the column name map (base class field)
if (column_name_id_map_.empty()) {
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
column_name_id_map_[data_schema_->column(i).name()] = i;
}
} else {
MS_LOG(WARNING) << "Column name map is already set!";
}
return Status::OK();
}
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore

+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.h View File

@@ -263,6 +263,10 @@ class VOCOp : public ParallelOp, public RandomAccessOp {
// @return Status - The error code return // @return Status - The error code return
Status Reset() override; Status Reset() override;


// Private function for computing the assignment of the column name map.
// @return - Status
Status ComputeColMap() override;

bool decode_; bool decode_;
int64_t row_cnt_; int64_t row_cnt_;
int64_t buf_cnt_; int64_t buf_cnt_;


+ 0
- 1
mindspore/ccsrc/dataset/engine/datasetops/take_op.cc View File

@@ -73,7 +73,6 @@ Status TakeOp::operator()() {
TaskManager::FindMe()->Post(); TaskManager::FindMe()->Post();
std::unique_ptr<DataBuffer> buf; std::unique_ptr<DataBuffer> buf;
RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(&buf)); RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(&buf));
RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild());


while (buf->eof() == false) { while (buf->eof() == false) {
if (take_count_ == max_takes_) { if (take_count_ == max_takes_) {


+ 25
- 18
mindspore/ccsrc/dataset/engine/datasetops/zip_op.cc View File

@@ -139,24 +139,6 @@ Status ZipOp::prepare(TensorQTable *const table) {
// Pack this first row into our tensor table // Pack this first row into our tensor table
table->push_back(std::move(new_row)); table->push_back(std::move(new_row));


// At this point we have at least 1 row produced, so all child iterators have their column names such that we
// can produce our column name map now.
column_name_id_map_ = {};
for (int32_t i = 0; i < children_num_; ++i) {
// Initializing col_name_id_map_ from the first data buffer.
const std::unordered_map<std::string, int32_t> col_name_id_map = child_iterators_[i]->GetColumnNameMap();
int32_t colsCurrent = column_name_id_map_.size();
// the update code below shouldn't do anything bad if the column name already exists.
for (const auto &pair : col_name_id_map) {
std::string name = pair.first;
int32_t old_id = pair.second;
// check if name already exists in column name descriptor
if (column_name_id_map_.count(name) == 1) {
RETURN_STATUS_UNEXPECTED("key already exists when zipping datasets");
}
column_name_id_map_[name] = old_id + colsCurrent;
}
}
return Status::OK(); return Status::OK();
} }


@@ -257,5 +239,30 @@ Status ZipOp::Accept(NodePass *p, bool *modified) {
// Downcast shared pointer then call visitor // Downcast shared pointer then call visitor
return p->RunOnNode(std::static_pointer_cast<ZipOp>(shared_from_this()), modified); return p->RunOnNode(std::static_pointer_cast<ZipOp>(shared_from_this()), modified);
} }

Status ZipOp::ComputeColMap() {
if (column_name_id_map_.empty()) {
column_name_id_map_ = {};
for (int32_t i = 0; i < child_.size(); ++i) {
// Initializing col_name_id_map from the child.
const std::unordered_map<std::string, int32_t> col_name_id_map = child_[i]->column_name_id_map();
int32_t colsCurrent = column_name_id_map_.size();
// the update code below shouldn't do anything bad if the column name already exists.
for (const auto &pair : col_name_id_map) {
std::string name = pair.first;
int32_t old_id = pair.second;
// check if name already exists in column name descriptor
if (column_name_id_map_.count(name) == 1) {
RETURN_STATUS_UNEXPECTED("key already exists when zipping datasets");
}
column_name_id_map_[name] = old_id + colsCurrent;
}
}
MS_LOG(DEBUG) << "Setting column map:\n" << this->ColumnNameMapAsString();
} else {
MS_LOG(WARNING) << "Column name map is already set!";
}
return Status::OK();
}
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore

+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/zip_op.h View File

@@ -141,6 +141,10 @@ class ZipOp : public PipelineOp {
// 1, a, T // 1, a, T
Status getNextTensorRow(TensorRow *const new_zip_row); Status getNextTensorRow(TensorRow *const new_zip_row);


// Computing the assignment of the column name map.
// @return - Status
Status ComputeColMap() override;

int32_t children_num_; int32_t children_num_;
int32_t rows_per_buffer_; int32_t rows_per_buffer_;
int32_t buffer_id_; int32_t buffer_id_;


Loading…
Cancel
Save