Browse Source

!14929 Cleanup stale comments about DataBuffers

From: @hfarahat
Reviewed-by: @pandoublefeng,@robingrosman
Signed-off-by: @pandoublefeng
pull/14929/MERGE
mindspore-ci-bot Gitee 5 years ago
parent
commit
1d89109d19
71 changed files with 215 additions and 250 deletions
  1. +2
    -2
      mindspore/ccsrc/minddata/dataset/api/iterator.cc
  2. +1
    -1
      mindspore/ccsrc/minddata/dataset/core/config_manager.h
  3. +4
    -4
      mindspore/ccsrc/minddata/dataset/core/tensor_row.h
  4. +2
    -2
      mindspore/ccsrc/minddata/dataset/engine/connector.h
  5. +3
    -5
      mindspore/ccsrc/minddata/dataset/engine/dataset_iterator.cc
  6. +1
    -2
      mindspore/ccsrc/minddata/dataset/engine/datasetops/barrier_op.cc
  7. +4
    -11
      mindspore/ccsrc/minddata/dataset/engine/datasetops/barrier_op.h
  8. +2
    -2
      mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc
  9. +1
    -1
      mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.h
  10. +1
    -2
      mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_base_op.cc
  11. +3
    -3
      mindspore/ccsrc/minddata/dataset/engine/datasetops/concat_op.cc
  12. +1
    -1
      mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.cc
  13. +5
    -5
      mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.h
  14. +2
    -2
      mindspore/ccsrc/minddata/dataset/engine/datasetops/epoch_ctrl_op.cc
  15. +4
    -4
      mindspore/ccsrc/minddata/dataset/engine/datasetops/epoch_ctrl_op.h
  16. +3
    -3
      mindspore/ccsrc/minddata/dataset/engine/datasetops/filter_op.cc
  17. +2
    -2
      mindspore/ccsrc/minddata/dataset/engine/datasetops/filter_op.h
  18. +18
    -20
      mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.cc
  19. +4
    -5
      mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.h
  20. +1
    -1
      mindspore/ccsrc/minddata/dataset/engine/datasetops/project_op.cc
  21. +2
    -2
      mindspore/ccsrc/minddata/dataset/engine/datasetops/project_op.h
  22. +2
    -2
      mindspore/ccsrc/minddata/dataset/engine/datasetops/rename_op.cc
  23. +5
    -5
      mindspore/ccsrc/minddata/dataset/engine/datasetops/repeat_op.cc
  24. +6
    -6
      mindspore/ccsrc/minddata/dataset/engine/datasetops/repeat_op.h
  25. +2
    -2
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc
  26. +1
    -1
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/cifar_op.cc
  27. +2
    -2
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.cc
  28. +1
    -1
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.h
  29. +5
    -5
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.cc
  30. +2
    -2
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.h
  31. +1
    -5
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/generator_op.cc
  32. +0
    -3
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/generator_op.h
  33. +2
    -2
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.cc
  34. +1
    -1
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.h
  35. +1
    -1
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/manifest_op.cc
  36. +5
    -6
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc
  37. +1
    -1
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.h
  38. +4
    -4
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/nonmappable_leaf_op.cc
  39. +2
    -2
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/nonmappable_leaf_op.h
  40. +5
    -5
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/distributed_sampler.cc
  41. +2
    -2
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/distributed_sampler.h
  42. +2
    -2
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/pk_sampler.cc
  43. +3
    -3
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/pk_sampler.h
  44. +2
    -2
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/python_sampler.cc
  45. +5
    -5
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/python_sampler.h
  46. +2
    -2
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/random_sampler.cc
  47. +4
    -4
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/random_sampler.h
  48. +4
    -4
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sampler.cc
  49. +3
    -3
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sampler.h
  50. +5
    -5
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.cc
  51. +4
    -4
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.h
  52. +2
    -2
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/subset_random_sampler.cc
  53. +3
    -3
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/subset_random_sampler.h
  54. +2
    -3
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/subset_sampler.cc
  55. +5
    -8
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/subset_sampler.h
  56. +3
    -8
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/weighted_random_sampler.cc
  57. +5
    -8
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/weighted_random_sampler.h
  58. +2
    -2
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/text_file_op.cc
  59. +1
    -1
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/text_file_op.h
  60. +3
    -3
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/tf_reader_op.cc
  61. +1
    -1
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/tf_reader_op.h
  62. +1
    -1
      mindspore/ccsrc/minddata/dataset/engine/datasetops/take_op.cc
  63. +1
    -1
      mindspore/ccsrc/minddata/dataset/engine/datasetops/zip_op.cc
  64. +0
    -1
      mindspore/ccsrc/minddata/dataset/engine/datasetops/zip_op.h
  65. +5
    -5
      mindspore/ccsrc/minddata/dataset/engine/db_connector.h
  66. +6
    -6
      mindspore/ccsrc/minddata/dataset/engine/perf/connector_throughput.cc
  67. +5
    -5
      mindspore/ccsrc/minddata/dataset/engine/perf/connector_throughput.h
  68. +2
    -2
      tests/ut/cpp/dataset/image_folder_op_test.cc
  69. +4
    -4
      tests/ut/cpp/dataset/subset_random_sampler_test.cc
  70. +4
    -4
      tests/ut/cpp/dataset/subset_sampler_test.cc
  71. +10
    -10
      tests/ut/cpp/dataset/weighted_random_sampler_test.cc

+ 2
- 2
mindspore/ccsrc/minddata/dataset/api/iterator.cc View File

@@ -28,7 +28,7 @@ Iterator::~Iterator() { Stop(); }

// Get the next row from the data pipeline.
Status Iterator::GetNextRowCharIF(MSTensorMapChar *row) {
// Clean data buffer
// Clean data row
row->clear();
std::unordered_map<std::string, std::shared_ptr<dataset::Tensor>> md_map;
Status rc = consumer_->GetNextAsMap(&md_map);
@@ -47,7 +47,7 @@ Status Iterator::GetNextRowCharIF(MSTensorMapChar *row) {

// Get the next row from the data pipeline.
Status Iterator::GetNextRow(MSTensorVec *row) {
// Clean data buffer
// Clean data row
row->clear();
// create a dataset tensor row and fetch. Then we convert the output to MSTensor
std::vector<std::shared_ptr<dataset::Tensor>> md_row;


+ 1
- 1
mindspore/ccsrc/minddata/dataset/core/config_manager.h View File

@@ -30,7 +30,7 @@
// Config settings for the client-side
// example config file:
// {
// "rowsPerBuffer": 3
// "numParallelWorkers": 3
// }
//



+ 4
- 4
mindspore/ccsrc/minddata/dataset/core/tensor_row.h View File

@@ -37,10 +37,10 @@ class TensorRow {

enum TensorRowFlags : uint32_t {
kFlagNone = 0,
kFlagEOF = 1, // The buffer is an eof end-of-data msg
kFlagEOE = 1u << 1, // The buffer is an eoe end-of-epoch msg
kFlagWait = 1u << 2, // The buffer is an control signal for workers to suspend operations
kFlagQuit = 1u << 3 // The buffer is a control signal for workers to quit
kFlagEOF = 1, // The row is an eof end-of-data msg
kFlagEOE = 1u << 1, // The row is an eoe end-of-epoch msg
kFlagWait = 1u << 2, // The row is a control signal for workers to suspend operations
kFlagQuit = 1u << 3 // The row is a control signal for workers to quit
};

// Type definitions


+ 2
- 2
mindspore/ccsrc/minddata/dataset/engine/connector.h View File

@@ -70,7 +70,7 @@ class Connector {
// any sync overhead.
// @param n_producers The number of threads producing data into this DbConnector.
// @param n_consumers The number of thread consuming data from this DbConnector.
// @param queue_capacity The number of element (DataBuffer) for each queue.
// @param queue_capacity The number of element for each queue.
Connector(int32_t n_producers, int32_t n_consumers, int32_t queue_capacity)
: num_producers_(n_producers), num_consumers_(n_consumers) {
MS_LOG(DEBUG) << "A connector is created with " << n_producers << " producers and " << n_consumers << " consumers.";
@@ -121,7 +121,7 @@ class Connector {
return (queues_[worker_id]->Add(el));
}

auto out_buffers_count() const { return out_buffers_count_.load(); }
auto out_rows_count() const { return out_buffers_count_.load(); }

// Add an element into the DbConnector without the overhead of synchronization.
// It may block when the internal queue is full.


+ 3
- 5
mindspore/ccsrc/minddata/dataset/engine/dataset_iterator.cc View File

@@ -98,19 +98,17 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) {
RETURN_STATUS_UNEXPECTED(err);
}

// Check if we need to get a new DataBuffer to iterate.

if (tracing_ != nullptr) {
cur_connector_size_ = root_->ConnectorSize();
cur_connector_capacity_ = root_->ConnectorCapacity();
}
RETURN_IF_NOT_OK(root_->GetNextRow(out_row));

// Since GetNextBuffer was used rather than GetNextInput(), it means we need to manually
// Since GetNextRow was used rather than GetNextInput(), it means we need to manually
// handle eoe and eof messages here.
//
// An eoe buffer means we have iterated an epoch.
// The next buffer in the pipeline might be an EOF or a databuffer for next epoch
// An eoe row means we have iterated an epoch.
// The next row in the pipeline might be an EOF or a TensorRow for next epoch
if (out_row->eoe()) {
MS_LOG(INFO) << "End of data iteration.";
if (isProfilingEnable) {


+ 1
- 2
mindspore/ccsrc/minddata/dataset/engine/datasetops/barrier_op.cc View File

@@ -76,7 +76,6 @@ Status BarrierOp::operator()() {
break;
}

// we have to output new buffer with possibly different buffer size, possibly one row
while (!clean_up_) {
// 2 Block
RETURN_IF_NOT_OK(blockCond());
@@ -131,7 +130,7 @@ Status BarrierOp::blockCond() {
return Status::OK();
}

// fetches next Barrier buffer row
// fetches next Barrier row
Status BarrierOp::getNextTensorRow(TensorRow *new_row) {
// iterate over all iterators and generate a row
RETURN_IF_NOT_OK((child_iterator_)->FetchNextTensorRow(new_row));


+ 4
- 11
mindspore/ccsrc/minddata/dataset/engine/datasetops/barrier_op.h View File

@@ -31,9 +31,7 @@ namespace dataset {
class ExecutionTree;

// BarrierOp class implements the Barrier operator. It will block sending of rows until a signal has
// been received. This signal is given from python layer. The current barrier design respects the
// rows per buffer design and will only output a buffer with rows once it has received rows per buffer
// signals from python.
// been received. This signal is given from python layer.

class BarrierOp : public PipelineOp {
public:
@@ -101,8 +99,7 @@ class BarrierOp : public PipelineOp {
// @param condition_name - the condition name associated with this operator
// @param condition_func - the blocking condition check per row
// The reason for this is having other values would complicate how the pipeline behaves with other operators
// One example of such case is having batch after barrier. Batch would be waiting for data and having
// rows per buffer in this case can result in hanging
// One example of such case is having batch after barrier.
BarrierOp(int32_t op_connector_size, const std::string &condition_name, py::function condition_func);

// Destructor
@@ -134,13 +131,9 @@ class BarrierOp : public PipelineOp {
Status operator()() override;

// Handles preprocessing of the main loop, used when starting new epoch
// @param table - a table of tensors to be moved into a buffer
// @param table - a table of tensors to be moved into a row
Status prepare();

// This function calls takes a table repeatedly adds rows to it.
// @param table - a table of tensors to be moved into a buffer
Status fillBuffer(TensorQTable *const table);

// Gets next tensor row and sets control signals
Status getNextTensorRow(TensorRow *new_row);

@@ -148,7 +141,7 @@ class BarrierOp : public PipelineOp {
Status blockCond();

private:
// clean up variable to return incomplete buffer
// clean up variable
bool clean_up_;
// end of file state, we stop reading data and shut down
bool eof_;


+ 2
- 2
mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc View File

@@ -124,7 +124,7 @@ Status BatchOp::operator()() {
while (child_iterator_->eof_handled() == false) {
while (new_row.empty() == false) {
table->emplace_back(new_row);
// if # of rows is enough to make 1 batch (1 batch is buffer), send it to worker_queue
// if # of rows is enough to make 1 batch, send it to worker_queue
if (table->size() == static_cast<size_t>(cur_batch_size)) {
RETURN_IF_NOT_OK(worker_queues_[cnt % num_workers_]->EmplaceBack(
std::make_pair(std::move(table), CBatchInfo(epoch_num, batch_num++, cnt + 1 - epoch_num))));
@@ -160,7 +160,7 @@ Status BatchOp::operator()() {
} // end of eof_handled() == false
RETURN_IF_NOT_OK(
worker_queues_[cnt++ % num_workers_]->EmplaceBack(std::make_pair(nullptr, CBatchInfo(batchCtrl::kEOF))));
// EOF received, send quit signal (an empty buffer) to all workers
// EOF received, send quit signal to all workers
for (int32_t ind = 0; ind < num_workers_; ind++) {
RETURN_IF_NOT_OK(
worker_queues_[cnt++ % num_workers_]->EmplaceBack(std::make_pair(nullptr, CBatchInfo(batchCtrl::kQuit))));


+ 1
- 1
mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.h View File

@@ -222,7 +222,7 @@ class BatchOp : public ParallelOp {
// @return Status The status code returned
Status WorkerEntry(int32_t worker_id) override;

// Generate buffer with batched tensors
// Generate row with batched tensors
// @return Status The status code returned
Status MakeBatchedRow(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> table_pair, TensorRow *new_row);



+ 1
- 2
mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_base_op.cc View File

@@ -70,8 +70,7 @@ Status CacheBase::FetchSamplesToWorkers() {
int64_t buf_cnt = 0;
int64_t wait_cnt = 0;
int64_t prefetch_cnt = 0;
// Kick off several threads which will prefetch prefetch_size_ rows in advance. The rows_per_buffers_
// is too small (1 by default) and won't help performance.
// Kick off several threads which will prefetch prefetch_size_ rows in advance.
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_prefetchers_, std::bind(&CacheBase::Prefetcher, this, std::placeholders::_1), Name()));
auto send_to_que = [](QueueList<std::unique_ptr<IOBlock>> &qList, int32_t worker_id,


+ 3
- 3
mindspore/ccsrc/minddata/dataset/engine/datasetops/concat_op.cc View File

@@ -144,7 +144,7 @@ Status ConcatOp::operator()() {
}
}

// 4. Add eoe buffer after get buffer from all child
// 4. Add eoe row after get rows from all child
if (eof_count == 0) {
RETURN_IF_NOT_OK(out_connector_->SendEOE());
}
@@ -152,8 +152,8 @@ Status ConcatOp::operator()() {
}
CHECK_FAIL_RETURN_UNEXPECTED(eof_count == children_num_,
"Something went wrong, eof count does not match the number of children.");
// 5. Add eof buffer in the end manually
MS_LOG(DEBUG) << "Add the eof buffer manually in the end.";
// 5. Add eof row in the end manually
MS_LOG(DEBUG) << "Add the eof row manually in the end.";
RETURN_IF_NOT_OK(out_connector_->SendEOF());
return Status::OK();
}


+ 1
- 1
mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.cc View File

@@ -257,7 +257,7 @@ Status DatasetOp::GetNextRowPullMode(TensorRow *const row) {
return child_[0]->GetNextRowPullMode(row);
}

// Gets the next buffer from the given child
// Gets the next row from the given child
Status DatasetOp::GetNextRow(TensorRow *row, int32_t worker_id, bool retry_if_eoe) {
// pop is a blocked call and will throw an interruption if the whole group shuts down.
RETURN_IF_NOT_OK(out_connector_->PopWithRetry(static_cast<int>(worker_id), row, retry_if_eoe));


+ 5
- 5
mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.h View File

@@ -149,13 +149,13 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> {
/// \return Status The status code returned
virtual Status operator()() = 0;

/// \brief Gets the next buffer from the given child
/// \brief Gets the next row from the given child
/// \param row[out] - Fetched TensorRow
/// \param worker_id[in] - The worker id, default to 0.
/// \return Status The status code returned
virtual Status GetNextRow(TensorRow *row, int32_t worker_id = 0) { return GetNextRow(row, worker_id, false); }

/// \brief Gets the next buffer from the given child
/// \brief Gets the next row from the given child
/// \param row[out] - Fetched TensorRow
/// \param worker_id[in] - The worker id, default to 0.
/// \param retry_if_eoe Set this flag to true to allow calling pop() again after the first pop() returns EOE.
@@ -263,9 +263,9 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> {
return ChildOpConnectorSize();
}

/// \brief Counting number of buffer sent out by a connector
int64_t ConnectorOutBufferCount() const {
return out_connector_ == nullptr ? int64_t(-1) : static_cast<int64_t>(out_connector_->out_buffers_count());
/// \brief Counting number of rows sent out by a connector
int64_t ConnectorOutRowsCount() const {
return out_connector_ == nullptr ? int64_t(-1) : static_cast<int64_t>(out_connector_->out_rows_count());
}

/// \brief Getter function


+ 2
- 2
mindspore/ccsrc/minddata/dataset/engine/datasetops/epoch_ctrl_op.cc View File

@@ -70,7 +70,7 @@ Status EpochCtrlOp::GetNextRow(TensorRow *row, int32_t worker_id, bool retry_if_
RETURN_IF_NOT_OK(child_[0]->GetNextRow(row, worker_id, false));

// Only intercept EOE for EoeReceived processing, after that the EOE is forwarded to next op.
// Other databuffers containing data or EOF will simply be forwarded.
// Other TensorRows containing data or EOF will simply be forwarded.
// EOF can simply be forwarded because this op does not spawn any thread, thus does not require clean up.
if (row->eoe()) {
RETURN_IF_NOT_OK(EoeReceived(worker_id));
@@ -85,7 +85,7 @@ Status EpochCtrlOp::EoeReceived(int32_t worker_id) {
MS_LOG(DEBUG) << "Epoch Control operator received end of epoch. Epoch count is now: " << repeat_count_
<< ". Max epochs: " << num_repeats_;

// This will allow GetNextInput in DatasetOp class to pass EOE buffer instead of eating it.
// This will allow GetNextInput in DatasetOp class to pass EOE row instead of eating it.
state_ = OpState::kDeOpIdle;

if (repeat_count_ != num_repeats_) {


+ 4
- 4
mindspore/ccsrc/minddata/dataset/engine/datasetops/epoch_ctrl_op.h View File

@@ -54,10 +54,10 @@ class EpochCtrlOp : public RepeatOp {
void Print(std::ostream &out, bool show_all) const override;
std::string Name() const override { return kEpochCtrlOp; }

// This function returns the buffer that is at the top of our output connector. The caller is
// typically our parent node, when the parent is asking us to provide the next buffer of data.
// Since EpochCtrlOp is derived from RepeatOp which is an inlined op, getting a buffer from us
// will simply bounce you to get a buffer from our child.
// This function returns the row that is at the top of our output connector. The caller is
// typically our parent node, when the parent is asking us to provide the next row of data.
// Since EpochCtrlOp is derived from RepeatOp which is an inlined op, getting a row from us
// will simply bounce you to get a row from our child.
// Epoch Control Op does not eat the EOE, it will pass the EOE to the next op.
Status GetNextRow(TensorRow *row, int32_t worker_id, bool retry_if_eoe) override;



+ 3
- 3
mindspore/ccsrc/minddata/dataset/engine/datasetops/filter_op.cc View File

@@ -78,7 +78,7 @@ Status FilterOp::EofReceived(int32_t) { return Status::OK(); }

Status FilterOp::EoeReceived(int32_t) { return Status::OK(); }

// Validating if each of the input_columns exists in the DataBuffer.
// Validating if each of the input_columns exists in the column_name_id_map_.
Status FilterOp::ValidateInColumns(const std::vector<std::string> &input_columns) {
for (const auto &inCol : input_columns) {
bool found = column_name_id_map_.find(inCol) != column_name_id_map_.end() ? true : false;
@@ -158,9 +158,9 @@ Status FilterOp::WorkerCompute(const TensorRow &in_row, bool *out_predicate) {
return Status::OK();
}

// if the filtered DataBuffer is written directly to out_connector_,
// if the filtered TensorRow is written directly to out_connector_,
// the thread fetching data will block in a queue.
// Collector function will reorder the DataBuffer in order.
// Collector function will reorder the TensorRow in order.
// for example in two work queues:
// int filter_queues_:
// queue1: DB(data1 kFilterEmpty) DB(eoe) DB(data4) DB(eof)


+ 2
- 2
mindspore/ccsrc/minddata/dataset/engine/datasetops/filter_op.h View File

@@ -149,7 +149,7 @@ class FilterOp : public ParallelOp {
// @return Status The status code returned
Status WorkerCompute(const TensorRow &in_row, bool *out_predicate);

// Collector databuffer.
// Collector TensorRows.
// @return Status The status code returned
Status Collector();

@@ -164,7 +164,7 @@ class FilterOp : public ParallelOp {
Status InvokePredicateFunc(const TensorRow &input, bool *out_predicate);

// Private function for validating if each of the user specified input column names
// exist in the DataBuffer.
// exist in column_name_id_map_.
// @param input_columns The vector of input column names used in the current thread.
// @return Status The status code returned
Status ValidateInColumns(const std::vector<std::string> &input_columns);


+ 18
- 20
mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.cc View File

@@ -103,9 +103,9 @@ void MapOp::Print(std::ostream &out, bool show_all) const {
// A helper function that fetch worker map job from local queues and extract the data and map job list
Status MapOp::FetchNextWork(uint32_t worker_id, TensorRow *row, std::vector<std::shared_ptr<MapJob>> *job_list) {
std::unique_ptr<MapWorkerJob> worker_job;
// Fetch the next worker job and data buffer
// Fetch the next worker job and TensorRow
RETURN_IF_NOT_OK(local_queues_[worker_id]->PopFront(&worker_job));
// Extract the databuffer and job list from the map worker job.
// Extract the TensorRow and job list from the map worker job.
*row = std::move(worker_job->tensor_row);
*job_list = std::move(worker_job->jobs);

@@ -160,8 +160,8 @@ Status MapOp::operator()() {
// Synchronize with TaskManager
TaskManager::FindMe()->Post();
RETURN_IF_NOT_OK(rc);
// num_buffers received, including eoe, num_epoch, num_step of current epoch
int64_t num_buf = 0, ep_step = 0, total_step = 0;
// num_rows received, including eoe, num_epoch, num_step of current epoch
int64_t num_rows = 0, ep_step = 0, total_step = 0;

RETURN_IF_NOT_OK(callback_manager_.Begin(CallbackParam(0, ep_step, total_step)));

@@ -176,7 +176,7 @@ Status MapOp::operator()() {
while (!new_row.eoe()) {
ep_step++;
total_step++;
// Create an empty map worker job to be populated by a databuffer and map jobs
// Create an empty map worker job to be populated by a TensorRow and map jobs

RETURN_IF_NOT_OK(callback_manager_.StepBegin(CallbackParam(op_current_epochs_ + 1, ep_step, total_step)));

@@ -186,7 +186,7 @@ Status MapOp::operator()() {
RETURN_IF_NOT_OK(GenerateWorkerJob(&worker_job));

// Push map worker job to the corresponding worker's queue
RETURN_IF_NOT_OK(local_queues_[num_buf++ % num_workers_]->Add(std::move(worker_job)));
RETURN_IF_NOT_OK(local_queues_[num_rows++ % num_workers_]->Add(std::move(worker_job)));

RETURN_IF_NOT_OK(callback_manager_.StepEnd(CallbackParam(op_current_epochs_ + 1, ep_step, total_step)));

@@ -199,22 +199,22 @@ Status MapOp::operator()() {

ep_step = 0;
}
// Propagate the eoe buffer to worker
// Propagate the eoe row to worker
std::unique_ptr<MapWorkerJob> worker_job = std::make_unique<MapWorkerJob>(std::move(new_row));
RETURN_IF_NOT_OK(local_queues_[num_buf++ % num_workers_]->Add(std::move(worker_job)));
RETURN_IF_NOT_OK(local_queues_[num_rows++ % num_workers_]->Add(std::move(worker_job)));
UpdateRepeatAndEpochCounter();
RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
}
// End() is commented out because it might never be called due to the lack of EOF when EpochCtrl is -1
// Handle eof logic, this code might never be reached if epoch_ctrl = -1.
std::unique_ptr<MapWorkerJob> worker_job = std::make_unique<MapWorkerJob>(std::move(new_row));
RETURN_IF_NOT_OK(local_queues_[num_buf++ % num_workers_]->Add(std::move(worker_job)));
RETURN_IF_NOT_OK(local_queues_[num_rows++ % num_workers_]->Add(std::move(worker_job)));

// Quit all workers, this code might never be reached if EpochCtrl is -1.
for (int32_t wkr_id = 0; wkr_id < num_workers_; wkr_id++) {
TensorRow quit_flag(TensorRow::kFlagQuit);
auto quit = std::make_unique<MapWorkerJob>(quit_flag);
RETURN_IF_NOT_OK(local_queues_[num_buf++ % num_workers_]->Add(std::move(quit)));
RETURN_IF_NOT_OK(local_queues_[num_rows++ % num_workers_]->Add(std::move(quit)));
}

return Status::OK();
@@ -230,14 +230,14 @@ Status MapOp::WorkerEntry(int32_t worker_id) {

TensorRow in_row;
std::vector<std::shared_ptr<MapJob>> job_list;
// Fetch next data buffer and map job list
// Fetch next data row and map job list
RETURN_IF_NOT_OK(FetchNextWork(worker_id, &in_row, &job_list));

// Now that init work is done, drop into the main fetching loop.
// Map op does not use child iterator, and it needs to manually handle eoe and eof's itself
// rather than use the base-class defaults.
while (true) {
// Handle special logic where buffer carries a ctrl flag.
// Handle special logic where row carries a ctrl flag.
if (in_row.Flags() != TensorRow::kFlagNone) {
if (in_row.wait()) {
// When worker receives the signal from master thread, it increments a atomic int
@@ -247,10 +247,10 @@ Status MapOp::WorkerEntry(int32_t worker_id) {
}
// This will block the worker until master thread gives it a new work
} else if (in_row.eoe()) {
// Calling base class EoeReceived to forward eoe buffer.
// Calling base class EoeReceived to forward eoe row.
RETURN_IF_NOT_OK(out_connector_->SendEOE(worker_id));
} else if (in_row.eof()) {
// Calling base class EofReceived to forward eof buffer.
// Calling base class EofReceived to forward eof row.
RETURN_IF_NOT_OK(out_connector_->SendEOF(worker_id));
} else if (in_row.quit()) {
break;
@@ -262,10 +262,9 @@ Status MapOp::WorkerEntry(int32_t worker_id) {
TensorRow out_row;
// Perform the compute function of TensorOp(s) and store the result in new_tensor_table.
RETURN_IF_NOT_OK(WorkerCompute(in_row, &out_row, job_list));
// Replace the TensorTable in DataBuffer with the new one.
// Push the buffer onto the connector for next operator to consume.
// Push the row onto the connector for next operator to consume.
RETURN_IF_NOT_OK(out_connector_->Add(std::move(out_row), static_cast<int>(worker_id)));
// Fetch next data buffer and map job list
// Fetch next data row and map job list
RETURN_IF_NOT_OK(FetchNextWork(worker_id, &in_row, &job_list));
}
return Status::OK();
@@ -280,7 +279,6 @@ Status MapOp::WorkerCompute(const TensorRow &in_row, TensorRow *out_row,
TensorRow to_process;
// Prepare the data that we need from in_row
// to_process : A vector of Tensors only holding cols in input_columns.
// cur_row : A vector of Tensors holding all the cols from DataBuffer.

// From the current row, select the Tensor that need to be passed to TensorOp
(void)std::transform(to_process_indices_.begin(), to_process_indices_.end(), std::back_inserter(to_process),
@@ -349,7 +347,7 @@ Status MapOp::ComputeColMap() {
return Status::OK();
}

// Validating if each of the input_columns exists in the DataBuffer.
// Validating if each of the input_columns exists in the col_name_id_map.
Status MapOp::ValidateInColumns(const std::unordered_map<std::string, int32_t> &col_name_id_map) {
for (const auto &inCol : in_columns_) {
bool found = col_name_id_map.find(inCol) != col_name_id_map.end();
@@ -442,7 +440,7 @@ Status MapOp::WaitForWorkers() {
// reset num_paused workers to 0
num_workers_paused_ = 0;
for (int32_t wkr_id = 0; wkr_id < num_workers_; wkr_id++) {
// a special buffer (id=-1, empty, none flag) is used to signal that worker needs to pause.
// a special row (id=-1, empty, none flag) is used to signal that worker needs to pause.
TensorRow waitRow(TensorRow::kFlagWait);
RETURN_IF_NOT_OK(local_queues_[wkr_id]->Add(std::make_unique<MapWorkerJob>(waitRow)));
}


+ 4
- 5
mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.h View File

@@ -168,7 +168,7 @@ class MapOp : public ParallelOp {
// Class functor operator () override.
// All dataset ops operate by launching a thread (see ExecutionTree). This class functor will
// provide the master loop that drives the logic for performing the work
// This main thread creates local queues, pulls databuffers from the previous
// This main thread creates local queues, pulls TensorRow from the previous
// op's Connector and distributes them to the local queues. Workers pull from the local queues.
// @return Status The status code returned
Status operator()() override;
@@ -232,9 +232,8 @@ class MapOp : public ParallelOp {
Status WorkerEntry(int32_t worker_id) override; // In: workerId assigned by tree_

// Private function for worker thread to perform TensorOp's compute function and get the result.
// @param in_buffer A raw pointer to the DataBuffer. A raw pointer is fine because this function doesn't manage memory
// and is not shared with other threads.
// @param[out] new_tensor_table A new Tensor Table to be populated in this function.
// @param in_row Input TensorRow
// @param[out] out_row Generated TensorRow
Status WorkerCompute(const TensorRow &in_row, TensorRow *out_row,
const std::vector<std::shared_ptr<MapJob>> &job_list);

@@ -243,7 +242,7 @@ class MapOp : public ParallelOp {
// @param col_name_id_map The column name to index mapping obtained from child operator
void CreateFinalColMap(std::unordered_map<std::string, int32_t> *col_name_id_map);

// Validating if each of the input_columns exists in the DataBuffer.
// Validating if each of the input_columns exists in col_name_id_map.
// @param - the column map to check
// @return - status return code
Status ValidateInColumns(const std::unordered_map<std::string, int32_t> &col_name_id_map);


+ 1
- 1
mindspore/ccsrc/minddata/dataset/engine/datasetops/project_op.cc View File

@@ -66,7 +66,7 @@ void ProjectOp::Print(std::ostream &out, bool show_all) const {
}
}

// Gets a buffer from the child operator and projects the buffer.
// Gets a row from the child operator and projects the row.
Status ProjectOp::GetNextRow(TensorRow *row, int32_t worker_id, bool retry_if_eoe) {
RETURN_IF_NOT_OK(child_[0]->GetNextRow(row, worker_id, retry_if_eoe));
if (!row->eoe() && !row->eof()) {


+ 2
- 2
mindspore/ccsrc/minddata/dataset/engine/datasetops/project_op.h View File

@@ -78,8 +78,8 @@ class ProjectOp : public PipelineOp {
// @return Status The status code returned
Status operator()() override;

// Gets a buffer from the child node and projects that buffer. The caller is typically our parent node.
// @param p_buffer - output pointer to the projected buffer.
// Gets a row from the child node and projects that row. The caller is typically our parent node.
// @param row - output pointer to the projected row.
// @param worker_id - The worker id
Status GetNextRow(TensorRow *row, int32_t worker_id, bool retry_if_eoe) override;



+ 2
- 2
mindspore/ccsrc/minddata/dataset/engine/datasetops/rename_op.cc View File

@@ -66,14 +66,14 @@ Status RenameOp::operator()() {

while (!new_row.eof()) {
while (!new_row.eoe()) {
MS_LOG(DEBUG) << "Rename operator pushing next buffer.";
MS_LOG(DEBUG) << "Rename operator pushing next row.";
RETURN_IF_NOT_OK(out_connector_->Add(std::move(new_row)));
RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
}
RETURN_IF_NOT_OK(out_connector_->SendEOE());
MS_LOG(DEBUG) << "Rename operator EOE Received.";
RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
MS_LOG(DEBUG) << "Rename operator fetching buffer after EOE.";
MS_LOG(DEBUG) << "Rename operator fetching row after EOE.";
}
RETURN_IF_NOT_OK(out_connector_->SendEOF());
MS_LOG(DEBUG) << "Rename operator EOF Received.";


+ 5
- 5
mindspore/ccsrc/minddata/dataset/engine/datasetops/repeat_op.cc View File

@@ -72,12 +72,12 @@ void RepeatOp::Print(std::ostream &out, bool show_all) const {
}
}

// This function returns the buffer that is at the top of our output connector. The caller is
// typically our parent node, when the parent is asking us to provide the next buffer of data.
// Since RepeatOp is an inlined op, getting a buffer from us will simply bounce you to get
// a buffer from our child.
// This function returns the row that is at the top of our output connector. The caller is
// typically our parent node, when the parent is asking us to provide the next row of data.
// Since RepeatOp is an inlined op, getting a row from us will simply bounce you to get
// a row from our child.
// This function sets the `retryIfEoe` flag when popping from the child connector. This way,
// this function will retry to pop the connector again and will get the non-EOE buffer if any.
// this function will retry to pop the connector again and will get the non-EOE row if any.
Status RepeatOp::GetNextRow(TensorRow *row, int32_t worker_id, bool retry_if_eoe) {
if (child_.empty()) {
RETURN_STATUS_UNEXPECTED("Pipeline init failed, RepeatOp can't be the first op in pipeline.");


+ 6
- 6
mindspore/ccsrc/minddata/dataset/engine/datasetops/repeat_op.h View File

@@ -81,13 +81,13 @@ class RepeatOp : public PipelineOp {
// @return Status The status code returned
Status operator()() override;

// This function returns the buffer that is at the top of our output connector. The caller is
// typically our parent node, when the parent is asking us to provide the next buffer of data.
// Since RepeatOp is an inlined op, getting a buffer from us will simply bounce you to get
// a buffer from our child.
// This function returns the row that is at the top of our output connector. The caller is
// typically our parent node, when the parent is asking us to provide the next row of data.
// Since RepeatOp is an inlined op, getting a row from us will simply bounce you to get
// a row from our child.
// @note This function sets the `retryIfEoe` flag when popping from the child connector. This way,
// this function will retry to pop the connector again and will get the non-EOE buffer if any.
// @param p_buffer - output pointer to the buffer that it will fetch.
// this function will retry to pop the connector again and will get the non-EOE row if any.
// @param row - output pointer to the row that it will fetch.
// @param worker_id - The worker id
// @param retry_if_eoe Set this flag to true to allow calling pop() again after the first pop() returns EOE.
// @return Status The status code returned


+ 2
- 2
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc View File

@@ -343,7 +343,7 @@ Status AlbumOp::LoadIntTensor(const nlohmann::json &json_obj, uint32_t col_num,
return Status::OK();
}

// Load 1 TensorRow (image,label) using 1 ImageColumns. 1 function call produces 1 TensorRow in a DataBuffer
// Load 1 TensorRow (image,label) using 1 ImageColumns. 1 function call produces 1 TensorRow
// possible optimization: the helper functions of LoadTensorRow should be optimized
// to take a reference to a column descriptor?
// the design of this class is to make the code more readable, forgoing minor performance gain like
@@ -463,7 +463,7 @@ Status AlbumOp::LaunchThreadsAndInitOp() {
// registers QueueList and individual Queues for interrupt services
RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks()));
// launch main workers that load DataBuffers by reading all images
// launch main workers that load TensorRows by reading all images
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_workers_, std::bind(&AlbumOp::WorkerEntry, this, std::placeholders::_1), "", id()));
TaskManager::FindMe()->Post();


+ 1
- 1
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/cifar_op.cc View File

@@ -113,7 +113,7 @@ Status CifarOp::LaunchThreadsAndInitOp() {
return Status::OK();
}

// Load 1 TensorRow (image,label). 1 function call produces 1 TensorTow in a DataBuffer
// Load 1 TensorRow (image,label). 1 function call produces 1 TensorRow
Status CifarOp::LoadTensorRow(row_id_type index, TensorRow *trow) {
std::shared_ptr<Tensor> label;
std::shared_ptr<Tensor> fine_label;


+ 2
- 2
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.cc View File

@@ -100,7 +100,7 @@ Status ClueOp::Init() {
io_block_queues_.Init(num_workers_, safe_queue_size);

RETURN_IF_NOT_OK(ParallelOp::CreateWorkerConnector(worker_connector_size_));
jagged_buffer_connector_ = std::make_unique<JaggedConnector>(num_workers_, 1, worker_connector_size_);
jagged_rows_connector_ = std::make_unique<JaggedConnector>(num_workers_, 1, worker_connector_size_);

return Status::OK();
}
@@ -181,7 +181,7 @@ Status ClueOp::LoadFile(const std::string &file, int64_t start_offset, int64_t e
}

rows_total++;
RETURN_IF_NOT_OK(jagged_buffer_connector_->Add(worker_id, std::move(tRow)));
RETURN_IF_NOT_OK(jagged_rows_connector_->Add(worker_id, std::move(tRow)));
}

return Status::OK();


+ 1
- 1
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.h View File

@@ -169,7 +169,7 @@ class ClueOp : public NonMappableLeafOp {
std::string Name() const override { return "ClueOp"; }

private:
// Reads a clue file and loads the data into multiple buffers.
// Reads a clue file and loads the data into multiple TensorRows.
// @param file - the file to read.
// @param start_offset - the start offset of file.
// @param end_offset - the end offset of file.


+ 5
- 5
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.cc View File

@@ -85,7 +85,7 @@ Status CsvOp::Init() {
io_block_queues_.Init(num_workers_, safe_queue_size);

RETURN_IF_NOT_OK(ParallelOp::CreateWorkerConnector(worker_connector_size_));
jagged_buffer_connector_ = std::make_unique<JaggedConnector>(num_workers_, 1, worker_connector_size_);
jagged_rows_connector_ = std::make_unique<JaggedConnector>(num_workers_, 1, worker_connector_size_);

return Status::OK();
}
@@ -93,7 +93,7 @@ Status CsvOp::Init() {
CsvOp::CsvParser::CsvParser(int32_t worker_id, JaggedConnector *connector, char field_delim,
std::vector<std::shared_ptr<CsvOp::BaseRecord>> column_default, std::string file_path)
: worker_id_(worker_id),
buffer_connector_(connector),
rows_connector_(connector),
csv_field_delim_(field_delim),
column_default_(column_default),
file_path_(file_path),
@@ -200,7 +200,7 @@ int CsvOp::CsvParser::PutRow(int c) {
total_rows_++;
cur_col_ = 0;

buffer_connector_->Add(worker_id_, std::move(cur_row_));
rows_connector_->Add(worker_id_, std::move(cur_row_));

return 0;
}
@@ -467,7 +467,7 @@ Status CsvOp::CsvParser::InitCsvParser() {
}

Status CsvOp::LoadFile(const std::string &file, int64_t start_offset, int64_t end_offset, int32_t worker_id) {
CsvParser csv_parser(worker_id, jagged_buffer_connector_.get(), field_delim_, column_default_list_, file);
CsvParser csv_parser(worker_id, jagged_rows_connector_.get(), field_delim_, column_default_list_, file);
csv_parser.SetStartOffset(start_offset);
csv_parser.SetEndOffset(end_offset);
std::ifstream ifs;
@@ -588,7 +588,7 @@ Status CsvOp::CalculateNumRowsPerShard() {
}

int64_t CsvOp::CountTotalRows(const std::string &file) {
CsvParser csv_parser(0, jagged_buffer_connector_.get(), field_delim_, column_default_list_, file);
CsvParser csv_parser(0, jagged_rows_connector_.get(), field_delim_, column_default_list_, file);
std::ifstream ifs;
ifs.open(file, std::ifstream::in);
if (!ifs.is_open()) {


+ 2
- 2
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.h View File

@@ -127,7 +127,7 @@ class CsvOp : public NonMappableLeafOp {
int CatchException(int c);

int32_t worker_id_;
JaggedConnector *buffer_connector_;
JaggedConnector *rows_connector_;
const char csv_field_delim_;
std::vector<std::shared_ptr<CsvOp::BaseRecord>> column_default_;
State cur_state_;
@@ -298,7 +298,7 @@ class CsvOp : public NonMappableLeafOp {
// @return Status - the error code returned.
Status LoadTensor(const std::string &line, std::unique_ptr<TensorQTable> *tensor_table, int64_t row);

// Reads a csv file and loads the data into multiple buffers.
// Reads a csv file and loads the data into multiple TensorRows.
// @param file - the file to read.
// @param start_offset - the start offset of file.
// @param end_offset - the end offset of file.


+ 1
- 5
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/generator_op.cc View File

@@ -53,7 +53,6 @@ GeneratorOp::GeneratorOp(py::function generator_function, std::vector<std::strin
column_names_(column_names),
column_types_(column_types),
prefetch_size_(prefetch_size),
buffer_id_(0),
generator_counter_(0) {}

void GeneratorOp::Print(std::ostream &out, bool show_all) const {
@@ -108,7 +107,6 @@ Status GeneratorOp::CreateGeneratorObject() {

// Reentrant init method.
Status GeneratorOp::Init() {
buffer_id_ = 0;
RETURN_IF_NOT_OK(InitSampler());
return CreateGeneratorObject();
}
@@ -150,7 +148,7 @@ Status GeneratorOp::PyRowToTensorRow(py::object py_data, TensorRow *tensor_row)
//
// while !eof:
// Try:
// Prepare one data buffer GIL, Can throw
// Prepare one data row GIL, Can throw
// Catch:
// Fetch Python Exception GIL
// Check if Exception is StopIteration (EOE) GIL
@@ -248,8 +246,6 @@ Status GeneratorOp::operator()() {
Status GeneratorOp::Reset() {
// Reset Op state
MS_LOG(DEBUG) << Name() << " performing a self-reset.";
// Reset BufferID
buffer_id_ = 0;
// Create new generator object
RETURN_IF_NOT_OK(CreateGeneratorObject());
if (this->op_total_repeats() < 0) {


+ 0
- 3
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/generator_op.h View File

@@ -138,14 +138,11 @@ class GeneratorOp : public PipelineOp, public RandomAccessOp {
int64_t generator_counter_;

py::object generator_;
int32_t buffer_id_;

WaitPost wp_;

Status PyRowToTensorRow(py::object py_data, TensorRow *tensor_row);

Status FillBuffer(TensorQTable *tt);

/// Private function for computing the assignment of the column name map.
/// \return - Status
Status ComputeColMap() override;


+ 2
- 2
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.cc View File

@@ -119,7 +119,7 @@ Status ImageFolderOp::PrescanMasterEntry(const std::string &filedir) {
return Status::OK();
}

// Load 1 TensorRow (image,label) using 1 ImageLabelPair. 1 function call produces 1 TensorTow in a DataBuffer
// Load 1 TensorRow (image,label) using 1 ImageLabelPair. 1 function call produces 1 TensorRow
Status ImageFolderOp::LoadTensorRow(row_id_type row_id, TensorRow *trow) {
ImageLabelPair pairPtr = image_label_pairs_[row_id];
std::shared_ptr<Tensor> image, label;
@@ -262,7 +262,7 @@ Status ImageFolderOp::LaunchThreadsAndInitOp() {
// The following code launch 3 threads group
// 1) A thread that walks all folders and push the folder names to a util:Queue folder_name_queue_.
// 2) Workers that pull foldername from folder_name_queue_, walk it and return the sorted images to image_name_queue
// 3) Launch main workers that load DataBuffers by reading all images
// 3) Launch main workers that load TensorRows by reading all images
RETURN_IF_NOT_OK(
tree_->AllTasks()->CreateAsyncTask("walk dir", std::bind(&ImageFolderOp::StartAsyncWalk, this), nullptr, id()));
RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_,


+ 1
- 1
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.h View File

@@ -167,7 +167,7 @@ class ImageFolderOp : public MappableLeafOp {
// @return Status The status code returned
Status PrescanMasterEntry(const std::string &dir);

// Worker thread pulls a number of IOBlock from IOBlock Queue, make a buffer and push it to Connector
// Worker thread pulls a number of IOBlock from IOBlock Queue, make a TensorRow and push it to Connector
// @param int32_t workerId - id of each worker
// @return Status The status code returned
Status PrescanWorkerEntry(int32_t worker_id);


+ 1
- 1
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/manifest_op.cc View File

@@ -94,7 +94,7 @@ Status ManifestOp::LaunchThreadsAndInitOp() {
return Status::OK();
}

// Load 1 TensorRow (image,label) using 1 ImageLabelPair. 1 function call produces 1 TensorTow in a DataBuffer
// Load 1 TensorRow (image,label) using 1 ImageLabelPair. 1 function call produces 1 TensorRow
Status ManifestOp::LoadTensorRow(row_id_type row_id, TensorRow *trow) {
std::pair<std::string, std::vector<std::string>> data = image_labelname_[static_cast<size_t>(row_id)];
std::shared_ptr<Tensor> image;


+ 5
- 6
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc View File

@@ -119,7 +119,6 @@ MindRecordOp::MindRecordOp(int32_t num_mind_record_workers, std::vector<std::str
columns_to_load_(columns_to_load),
operators_(operators),
num_mind_record_workers_(num_mind_record_workers),
buffers_needed_(0),
ended_worker_(0),
num_padded_(num_padded),
sample_json_(sample_json),
@@ -207,8 +206,8 @@ void MindRecordOp::Print(std::ostream &out, bool show_all) const {
for (auto &file : dataset_file_) {
out << file << " ";
}
out << "\nNumber of rows : " << num_rows_ << "\nNumber of buffers : " << buffers_needed_
<< "\nNumber of ShardReader workers : " << num_mind_record_workers_ << "\n\n";
out << "\nNumber of rows : " << num_rows_ << "\nNumber of ShardReader workers : " << num_mind_record_workers_
<< "\n\n";
}
}

@@ -237,7 +236,7 @@ Status MindRecordOp::WorkerEntry(int32_t worker_id) {
continue;
}

// load data buffer
// load TensorRow
std::vector<int64_t> keys;
RETURN_IF_NOT_OK(io_block->GetKeys(&keys));
if (keys.empty() == true) {
@@ -252,7 +251,7 @@ Status MindRecordOp::WorkerEntry(int32_t worker_id) {
const uint64_t row_id = keys[0];
TensorRow fetched_row;

// Get the next buffer. Push it up to the output connector.
// Get the next row. Push it up to the output connector.
if (row_id % LOG_INTERVAL == 0) {
MS_LOG(DEBUG) << "MindRecord operator consumed row " << row_id << " by worker " << worker_id << ".";
}
@@ -382,7 +381,7 @@ Status MindRecordOp::LaunchThreadsAndInitOp() {
if (shard_reader_->Launch(true) == MSRStatus::FAILED) {
RETURN_STATUS_UNEXPECTED("MindRecordOp launch failed.");
}
// Launch main workers that load DataBuffers by reading all images
// Launch main workers that load TensorRows by reading all images
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_workers_, std::bind(&MindRecordOp::WorkerEntry, this, std::placeholders::_1), "", id()));
num_rows_ = shard_reader_->GetNumRows();


+ 1
- 1
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.h View File

@@ -160,7 +160,7 @@ class MindRecordOp : public MappableLeafOp {
return out;
}

// Worker thread pulls a number of IOBlock from IOBlock Queue, make a buffer and push it to Connector
// Worker thread pulls a number of IOBlock from IOBlock Queue, make a TensorRow and push it to Connector
// @param int32_t workerId - id of each worker
// @return Status The status code returned
Status WorkerEntry(int32_t worker_id) override;


+ 4
- 4
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/nonmappable_leaf_op.cc View File

@@ -66,7 +66,7 @@ Status NonMappableLeafOp::operator()() {
RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&NonMappableLeafOp::WaitToFillIOBlockQueue, this), "", id()));

// launch num_workers_ worker threads, responsible for pulling from the IOBlockQueue and reading
// data from disk into buffers
// data from disk into TensorRows
RETURN_IF_NOT_OK(tree_->LaunchWorkers(
num_workers_, std::bind(&NonMappableLeafOp::WorkerEntry, this, std::placeholders::_1), "", id()));

@@ -85,7 +85,7 @@ Status NonMappableLeafOp::operator()() {

while (workers_done < num_workers_) {
TensorRow fetched_row;
RETURN_IF_NOT_OK(jagged_buffer_connector_->Pop(0, &fetched_row));
RETURN_IF_NOT_OK(jagged_rows_connector_->Pop(0, &fetched_row));
if (fetched_row.eoe()) {
workers_done++;
} else if (total_rows_ == 0 || rows_read < total_rows_) {
@@ -122,7 +122,7 @@ Status NonMappableLeafOp::operator()() {
finished_reading_dataset_ = true;
NotifyToFillIOBlockQueue();
} else {
jagged_buffer_connector_->DoReset();
jagged_rows_connector_->DoReset();
// Self-reset to start a new iteration
RETURN_IF_NOT_OK(Reset());
}
@@ -156,7 +156,7 @@ Status NonMappableLeafOp::WorkerEntry(int32_t worker_id) {
}
} else {
TensorRow eoe = TensorRow(TensorRow::kFlagEOE);
RETURN_IF_NOT_OK(jagged_buffer_connector_->Add(worker_id, std::move(eoe)));
RETURN_IF_NOT_OK(jagged_rows_connector_->Add(worker_id, std::move(eoe)));
}

RETURN_IF_NOT_OK(PopIoBlockQueue(worker_id, &io_block));


+ 2
- 2
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/nonmappable_leaf_op.h View File

@@ -115,7 +115,7 @@ class NonMappableLeafOp : public ParallelOp {
// @return Status - the error code returned.
Status PushIoBlockQueue(int32_t index, std::unique_ptr<FilenameBlock> &&io_block);

// Reads a tf_file file and loads the data into multiple buffers.
// Reads a tf_file file and loads the data into multiple TensorRows.
// @param filename - the tf_file file to read.
// @param start_offset - the start offset of file.
// @param end_offset - the end offset of file.
@@ -156,7 +156,7 @@ class NonMappableLeafOp : public ParallelOp {
WaitPost io_block_queue_wait_post_;
bool load_io_block_queue_;
std::mutex load_io_block_queue_mutex_;
std::unique_ptr<JaggedConnector> jagged_buffer_connector_;
std::unique_ptr<JaggedConnector> jagged_rows_connector_;
bool shuffle_files_;
int64_t num_rows_per_shard_;
int64_t num_rows_;


+ 5
- 5
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/distributed_sampler.cc View File

@@ -87,17 +87,17 @@ Status DistributedSamplerRT::InitSampler() {
Status DistributedSamplerRT::GetNextSample(TensorRow *out) {
if (cnt_ > samples_per_tensor_) {
RETURN_STATUS_UNEXPECTED(
"Number of samples(cnt) that have already been filled in to buffer should be less than or "
"equal to samples_per_buffer, but got cnt: " +
std::to_string(cnt_) + ", samples_per_buffer: " + std::to_string(samples_per_tensor_));
"Number of samples(cnt) that have already been filled in to Tensor should be less than or "
"equal to samples_per_tensor, but got cnt: " +
std::to_string(cnt_) + ", samples_per_tensor: " + std::to_string(samples_per_tensor_));
} else if (cnt_ == samples_per_tensor_ && (non_empty_ || !even_dist_)) {
(*out) = TensorRow(TensorRow::kFlagEOE);
if (!samples_per_tensor_) {
non_empty_ = false;
}
} else if (!samples_per_tensor_ && !non_empty_) {
// If the buffer is empty, we add samples with subscript 0 in the current dataset.
// This step is to make up for the solution that the code default buffer is not empty before.
// If the Tensor is empty, we add samples with subscript 0 in the current dataset.
// This step is to make up for the solution that the code default Tensor is not empty before.
// We will remove this value in the concat phase
non_empty_ = true;
std::shared_ptr<Tensor> sample_ids;


+ 2
- 2
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/distributed_sampler.h View File

@@ -47,7 +47,7 @@ class DistributedSamplerRT : public SamplerRT {
/// \brief default destructor
~DistributedSamplerRT() = default;

/// \param std::unique_ptr<DataBuffer> * pBuffer
/// \param[out] out The TensorRow to be filled with sample ids
/// \param int32_t workerId
/// \return Status code
Status GetNextSample(TensorRow *out) override;
@@ -78,7 +78,7 @@ class DistributedSamplerRT : public SamplerRT {
Status to_json(nlohmann::json *out_json) override;

private:
int64_t cnt_; // number of samples that have already been filled in to buffer
int64_t cnt_; // number of samples that have already been filled in to Tensor
uint32_t seed_;
int64_t device_id_;
int64_t num_devices_;


+ 2
- 2
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/pk_sampler.cc View File

@@ -20,8 +20,8 @@

namespace mindspore {
namespace dataset {
PKSamplerRT::PKSamplerRT(int64_t num_samples, int64_t val, bool shuffle, int64_t samples_per_buffer)
: SamplerRT(num_samples, samples_per_buffer),
PKSamplerRT::PKSamplerRT(int64_t num_samples, int64_t val, bool shuffle, int64_t samples_per_tensor)
: SamplerRT(num_samples, samples_per_tensor),
shuffle_(shuffle),
seed_(GetSeed()),
next_id_(0),


+ 3
- 3
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/pk_sampler.h View File

@@ -31,14 +31,14 @@ class PKSamplerRT : public SamplerRT { // NOT YET FINISHED
// @param num_samples - the number of samples to draw. value of 0 means to take the full amount
// @param int64_t val
// @param bool shuffle - shuffle all classIds or not, if true, classes may be 5,1,4,3,2
// @param int64_t samplesPerBuffer - Num of Sampler Ids to fetch via 1 GetNextBuffer call
// @param int64_t samples_per_tensor - Num of Sampler Ids to fetch via 1 GetNextSample call
PKSamplerRT(int64_t num_samples, int64_t val, bool shuffle,
int64_t samples_per_buffer = std::numeric_limits<int64_t>::max());
int64_t samples_per_tensor = std::numeric_limits<int64_t>::max());

// default destructor
~PKSamplerRT() = default;

// @param std::unique_ptr<DataBuffer pBuffer
// @param TensorRow
// @param int32_t workerId
// @return Status The status code returned
Status GetNextSample(TensorRow *out) override;


+ 2
- 2
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/python_sampler.cc View File

@@ -20,8 +20,8 @@
namespace mindspore {
namespace dataset {

PythonSamplerRT::PythonSamplerRT(int64_t num_samples, py::object py_sampler_instance, int64_t samples_per_buffer)
: SamplerRT(num_samples, samples_per_buffer), py_sampler_instance(py_sampler_instance), need_to_reset_(false) {}
PythonSamplerRT::PythonSamplerRT(int64_t num_samples, py::object py_sampler_instance, int64_t samples_per_tensor)
: SamplerRT(num_samples, samples_per_tensor), py_sampler_instance(py_sampler_instance), need_to_reset_(false) {}

Status PythonSamplerRT::GetNextSample(TensorRow *out) {
if (need_to_reset_) {


+ 5
- 5
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/python_sampler.h View File

@@ -29,9 +29,9 @@ class PythonSamplerRT : public SamplerRT {
// @param num_samples - the number of samples to draw. Value of 0 means to sample all of the
// data from the dataset.
// @param py_sampler_instance - the python instance of the sampler
// @param int64_t samples_per_buffer - Num of Sampler Ids to fetch via 1 GetNextBuffer call
// @param int64_t samples_per_tensor - Num of Sampler Ids to fetch via 1 GetNextSample call
explicit PythonSamplerRT(int64_t num_samples, py::object py_sampler_instance,
int64_t samples_per_buffer = std::numeric_limits<int64_t>::max());
int64_t samples_per_tensor = std::numeric_limits<int64_t>::max());

// Destructor.
~PythonSamplerRT() = default;
@@ -44,8 +44,8 @@ class PythonSamplerRT : public SamplerRT {
// @return Status The status code returned
Status ResetSampler() override;

// Op calls this to get next Buffer that contains all the sampleIds
// @param std::unique_ptr<DataBuffer> pBuffer - Buffer to be returned to corresponding Dataset Op
// Op calls this to get next Sample that contains all the sampleIds
// @param TensorRow to be returned to corresponding Dataset Op
// @param int32_t workerId - not meant to be used
// @return Status The status code returned
Status GetNextSample(TensorRow *out) override;
@@ -56,7 +56,7 @@ class PythonSamplerRT : public SamplerRT {
void SamplerPrint(std::ostream &out, bool show_all) const override;

private:
bool need_to_reset_; // Whether Reset() should be called before calling GetNextBuffer()
bool need_to_reset_; // Whether Reset() should be called before calling GetNextSample()

py::object py_sampler_instance; // The handle to the py_sampler python object
};


+ 2
- 2
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/random_sampler.cc View File

@@ -23,8 +23,8 @@
namespace mindspore {
namespace dataset {
RandomSamplerRT::RandomSamplerRT(int64_t num_samples, bool replacement, bool reshuffle_each_epoch,
int64_t samples_per_buffer)
: SamplerRT(num_samples, samples_per_buffer),
int64_t samples_per_tensor)
: SamplerRT(num_samples, samples_per_tensor),
seed_(GetSeed()),
replacement_(replacement),
next_id_(0),


+ 4
- 4
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/random_sampler.h View File

@@ -30,15 +30,15 @@ class RandomSamplerRT : public SamplerRT {
// @param int64_t num_samples - number samples to draw
// @param bool replacement - put he id back / or not after a sample
// @param reshuffle_each_epoch - T/F to reshuffle after epoch
// @param int64_t samples_per_buffer - Num of Sampler Ids to fetch via 1 GetNextBuffer call
// @param int64_t samples_per_tensor - Num of Sampler Ids to fetch via 1 GetNextSample call
RandomSamplerRT(int64_t num_samples, bool replacement, bool reshuffle_each_epoch,
int64_t samples_per_buffer = std::numeric_limits<int64_t>::max());
int64_t samples_per_tensor = std::numeric_limits<int64_t>::max());

// Destructor.
~RandomSamplerRT() = default;

// Op calls this to get next Buffer that contains all the sampleIds
// @param std::unique_ptr<DataBuffer> pBuffer - Buffer to be returned to StorageOp
// Op calls this to get next Sample that contains all the sampleIds
// @param TensorRow to be returned to StorageOp
// @param int32_t workerId - not meant to be used
// @return Status The status code returned
Status GetNextSample(TensorRow *out) override;


+ 4
- 4
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sampler.cc View File

@@ -33,10 +33,10 @@ Status RandomAccessOp::GetNumRowsInDataset(int64_t *num) const {
return Status::OK();
}

SamplerRT::SamplerRT(int64_t num_samples, int64_t samples_per_buffer)
SamplerRT::SamplerRT(int64_t num_samples, int64_t samples_per_tensor)
: num_rows_(0),
num_samples_(num_samples),
samples_per_tensor_(samples_per_buffer),
samples_per_tensor_(samples_per_tensor),
col_desc_(nullptr),
is_initialized(false) {}

@@ -98,10 +98,10 @@ Status SamplerRT::GetAllIdsThenReset(py::array *data) {
RETURN_IF_NOT_OK(GetNextSample(&sample_row));
sample_ids = sample_row[0];

// check this buffer is not a ctrl buffer
// check this TensorRow is not a ctrl TensorRow
CHECK_FAIL_RETURN_UNEXPECTED(sample_row.Flags() == TensorRow::kFlagNone, "ERROR ctrl row received");

// perform error checking! Next buffer supposed to be EOE since last one already contains all ids for current epoch
// perform error checking! Next TensorRow supposed to be EOE since last one already contains all ids for current epoch
RETURN_IF_NOT_OK(GetNextSample(&sample_row));
CHECK_FAIL_RETURN_UNEXPECTED(sample_row.eoe(), "ERROR Non EOE received");
// Reset Sampler since this is the end of the epoch


+ 3
- 3
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sampler.h View File

@@ -63,8 +63,8 @@ class SamplerRT {
// Constructor
// @param int64_t num_samples: the user-requested number of samples ids to generate. A value of 0
// indicates that the sampler should produce the complete set of ids.
// @param int64_t samplesPerBuffer: Num of Sampler Ids to fetch via 1 GetNextBuffer call
SamplerRT(int64_t num_samples, int64_t samples_per_buffer);
// @param int64_t samples_per_tensor: Num of Sampler Ids to fetch via 1 GetNextSample call
SamplerRT(int64_t num_samples, int64_t samples_per_tensor);

SamplerRT(const SamplerRT &s) : SamplerRT(s.num_samples_, s.samples_per_tensor_) {}

@@ -73,7 +73,7 @@ class SamplerRT {

// Get a list of sample ids.
// @note It is Sampler responsibility to make sure that the id is not out of bound.
// @param std::unique_ptr<DataBuffer> pBuffer - Buffer to be returned to StorageOp
// @param TensorRow to be returned to StorageOp
// @param int32_t workerId - not meant to be used
// @return Status The status code returned
virtual Status GetNextSample(TensorRow *out) = 0;


+ 5
- 5
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.cc View File

@@ -21,8 +21,8 @@

namespace mindspore {
namespace dataset {
SequentialSamplerRT::SequentialSamplerRT(int64_t num_samples, int64_t start_index, int64_t samples_per_buffer)
: SamplerRT(num_samples, samples_per_buffer), current_id_(start_index), start_index_(start_index), id_count_(0) {}
SequentialSamplerRT::SequentialSamplerRT(int64_t num_samples, int64_t start_index, int64_t samples_per_tensor)
: SamplerRT(num_samples, samples_per_tensor), current_id_(start_index), start_index_(start_index), id_count_(0) {}

Status SequentialSamplerRT::GetNextSample(TensorRow *out) {
if (id_count_ > num_samples_) {
@@ -36,8 +36,8 @@ Status SequentialSamplerRT::GetNextSample(TensorRow *out) {

std::shared_ptr<Tensor> sampleIds;

// Compute how many ids are left to pack, and pack this amount into a new buffer. Respect the setting for
// samples per buffer though.
// Compute how many ids are left to pack, and pack this amount into a new Tensor. Respect the setting for
// samples per Tensor though.
int64_t remaining_ids = num_samples_ - id_count_;
int64_t num_elements = std::min(remaining_ids, samples_per_tensor_);

@@ -82,7 +82,7 @@ Status SequentialSamplerRT::InitSampler() {
}
CHECK_FAIL_RETURN_UNEXPECTED(
(num_samples_ > 0 && samples_per_tensor_ > 0) || num_samples_ == 0,
"Invalid parameter, samples_per_buffer must be greater than 0, but got " + std::to_string(samples_per_tensor_));
"Invalid parameter, samples_per_tensor must be greater than 0, but got " + std::to_string(samples_per_tensor_));
samples_per_tensor_ = samples_per_tensor_ > num_samples_ ? num_samples_ : samples_per_tensor_;

is_initialized = true;


+ 4
- 4
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.h View File

@@ -29,9 +29,9 @@ class SequentialSamplerRT : public SamplerRT {
// @param num_samples - The number of samples to draw. A value of 0 indicates the sampler should produce the
// full amount of ids from the dataset
// @param start_index - The starting index value
// @param int64_t samplesPerBuffer - Num of Sampler Ids to fetch via 1 GetNextBuffer call
// @param int64_t samples_per_tensor - Num of Sampler Ids to fetch via 1 GetNextSample call
SequentialSamplerRT(int64_t num_samples, int64_t start_index,
int64_t samples_per_buffer = std::numeric_limits<int64_t>::max());
int64_t samples_per_tensor = std::numeric_limits<int64_t>::max());

// Destructor.
~SequentialSamplerRT() = default;
@@ -43,8 +43,8 @@ class SequentialSamplerRT : public SamplerRT {
// @return Status The status code returned
Status ResetSampler() override;

// Op calls this to get next Buffer that contains all the sampleIds
// @param std::unique_ptr<DataBuffer> pBuffer - Buffer to be returned to corresponding Dataset Op
// Op calls this to get next Sample that contains all the sampleIds
// @param TensorRow to be returned to corresponding Dataset Op
// @param int32_t workerId - not meant to be used
// @return Status The status code returned
Status GetNextSample(TensorRow *out) override;


+ 2
- 2
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/subset_random_sampler.cc View File

@@ -27,8 +27,8 @@ namespace mindspore {
namespace dataset {
// Constructor.
SubsetRandomSamplerRT::SubsetRandomSamplerRT(int64_t num_samples, const std::vector<int64_t> &indices,
int64_t samples_per_buffer)
: SubsetSamplerRT(num_samples, indices, samples_per_buffer) {}
int64_t samples_per_tensor)
: SubsetSamplerRT(num_samples, indices, samples_per_tensor) {}

// Initialized this Sampler.
Status SubsetRandomSamplerRT::InitSampler() {


+ 3
- 3
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/subset_random_sampler.h View File

@@ -31,10 +31,10 @@ class SubsetRandomSamplerRT : public SubsetSamplerRT {
/// Constructor.
/// \param num_samples The number of samples to draw. 0 for the full amount.
/// \param indices List of indices from where we will randomly draw samples.
/// \param samples_per_buffer The number of ids we draw on each call to GetNextBuffer().
/// When samples_per_buffer=0, GetNextBuffer() will draw all the sample ids and return them at once.
/// \param samples_per_tensor The number of ids we draw on each call to GetNextSample().
/// When samples_per_tensor=0, GetNextSample() will draw all the sample ids and return them at once.
SubsetRandomSamplerRT(int64_t num_samples, const std::vector<int64_t> &indices,
std::int64_t samples_per_buffer = std::numeric_limits<int64_t>::max());
std::int64_t samples_per_tensor = std::numeric_limits<int64_t>::max());

/// Destructor.
~SubsetRandomSamplerRT() = default;


+ 2
- 3
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/subset_sampler.cc View File

@@ -22,8 +22,8 @@
namespace mindspore {
namespace dataset {
// Constructor.
SubsetSamplerRT::SubsetSamplerRT(int64_t num_samples, const std::vector<int64_t> &indices, int64_t samples_per_buffer)
: SamplerRT(num_samples, samples_per_buffer), indices_(indices), sample_id_(0), buffer_id_(0) {}
SubsetSamplerRT::SubsetSamplerRT(int64_t num_samples, const std::vector<int64_t> &indices, int64_t samples_per_tensor)
: SamplerRT(num_samples, samples_per_tensor), indices_(indices), sample_id_(0) {}

// Initialized this Sampler.
Status SubsetSamplerRT::InitSampler() {
@@ -51,7 +51,6 @@ Status SubsetSamplerRT::InitSampler() {
Status SubsetSamplerRT::ResetSampler() {
// Reset the internal counters.
sample_id_ = 0;
buffer_id_ = 0;

if (HasChildSampler()) {
RETURN_IF_NOT_OK(child_[0]->ResetSampler());


+ 5
- 8
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/subset_sampler.h View File

@@ -30,10 +30,10 @@ class SubsetSamplerRT : public SamplerRT {
/// Constructor.
/// \param num_samples The number of elements to sample. 0 for the full amount.
/// \param indices List of indices.
/// \param samples_per_buffer The number of ids we draw on each call to GetNextBuffer().
/// When samples_per_buffer=0, GetNextBuffer() will draw all the sample ids and return them at once.
/// \param samples_per_tensor The number of ids we draw on each call to GetNextSample().
/// When samples_per_tensor=0, GetNextSample() will draw all the sample ids and return them at once.
SubsetSamplerRT(int64_t num_samples, const std::vector<int64_t> &indices,
std::int64_t samples_per_buffer = std::numeric_limits<int64_t>::max());
std::int64_t samples_per_tensor = std::numeric_limits<int64_t>::max());

/// Destructor.
~SubsetSamplerRT() = default;
@@ -47,8 +47,8 @@ class SubsetSamplerRT : public SamplerRT {
Status ResetSampler() override;

/// Get the sample ids.
/// \param[out] out The address of a unique_ptr to DataBuffer where the sample ids will be placed.
/// @note the sample ids (int64_t) will be placed in one Tensor and be placed into pBuffer.
/// \param[out] out TensorRow where the sample ids will be placed.
/// @note the sample ids (int64_t) will be placed in one Tensor
Status GetNextSample(TensorRow *out) override;

/// Printer for debugging purposes.
@@ -74,9 +74,6 @@ class SubsetSamplerRT : public SamplerRT {
private:
/// Current sample id.
int64_t sample_id_;

/// Current buffer id.
int64_t buffer_id_;
};
} // namespace dataset
} // namespace mindspore


+ 3
- 8
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/weighted_random_sampler.cc View File

@@ -28,12 +28,8 @@ namespace mindspore {
namespace dataset {
// Constructor.
WeightedRandomSamplerRT::WeightedRandomSamplerRT(int64_t num_samples, const std::vector<double> &weights,
bool replacement, int64_t samples_per_buffer)
: SamplerRT(num_samples, samples_per_buffer),
weights_(weights),
replacement_(replacement),
sample_id_(0),
buffer_id_(0) {}
bool replacement, int64_t samples_per_tensor)
: SamplerRT(num_samples, samples_per_tensor), weights_(weights), replacement_(replacement), sample_id_(0) {}

// Initialized this Sampler.
Status WeightedRandomSamplerRT::InitSampler() {
@@ -50,7 +46,7 @@ Status WeightedRandomSamplerRT::InitSampler() {
"Invalid parameter, num_samples and num_rows must be greater than 0, but got num_rows: " +
std::to_string(num_rows_) + ", num_samples: " + std::to_string(num_samples_));
CHECK_FAIL_RETURN_UNEXPECTED(samples_per_tensor_ > 0,
"Invalid parameter, samples_per_buffer must be greater than 0, but got " +
"Invalid parameter, samples_per_tensor must be greater than 0, but got " +
std::to_string(samples_per_tensor_) + ".\n");

if (weights_.size() > static_cast<size_t>(num_rows_)) {
@@ -101,7 +97,6 @@ void WeightedRandomSamplerRT::InitOnePassSampling() {
// Reset the internal variable to the initial state and reshuffle the indices.
Status WeightedRandomSamplerRT::ResetSampler() {
sample_id_ = 0;
buffer_id_ = 0;
rand_gen_.seed(GetSeed());
if (!replacement_) {
InitOnePassSampling();


+ 5
- 8
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/weighted_random_sampler.h View File

@@ -32,10 +32,10 @@ class WeightedRandomSamplerRT : public SamplerRT {
// @param num_samples Number of samples to be drawn.
// @param weights A list of sample weights.
// @param replacement Determine if samples are drawn with/without replacement.
// @param samples_per_buffer The number of ids we draw on each call to GetNextBuffer().
// When samplesPerBuffer=0, GetNextBuffer() will draw all the sample ids and return them at once.
// @param samples_per_tensor The number of ids we draw on each call to GetNextSample().
// When samples_per_tensor=0, GetNextSample() will draw all the sample ids and return them at once.
WeightedRandomSamplerRT(int64_t num_samples, const std::vector<double> &weights, bool replacement,
int64_t samples_per_buffer = std::numeric_limits<int64_t>::max());
int64_t samples_per_tensor = std::numeric_limits<int64_t>::max());

// Destructor.
~WeightedRandomSamplerRT() = default;
@@ -49,8 +49,8 @@ class WeightedRandomSamplerRT : public SamplerRT {
Status ResetSampler() override;

// Get the sample ids.
// @param[out] out_buffer The address of a unique_ptr to DataBuffer where the sample ids will be placed.
// @note the sample ids (int64_t) will be placed in one Tensor and be placed into pBuffer.
// @param[out] out TensorRow where the sample ids will be placed.
// @note the sample ids (int64_t) will be placed in one Tensor
Status GetNextSample(TensorRow *out) override;

// Printer for debugging purposes.
@@ -73,9 +73,6 @@ class WeightedRandomSamplerRT : public SamplerRT {
// Current sample id.
int64_t sample_id_;

// Current buffer id.
int64_t buffer_id_;

// Random engine and device
std::mt19937 rand_gen_;



+ 2
- 2
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/text_file_op.cc View File

@@ -111,7 +111,7 @@ Status TextFileOp::Init() {

RETURN_IF_NOT_OK(ParallelOp::CreateWorkerConnector(worker_connector_size_));

jagged_buffer_connector_ = std::make_unique<JaggedConnector>(num_workers_, 1, worker_connector_size_);
jagged_rows_connector_ = std::make_unique<JaggedConnector>(num_workers_, 1, worker_connector_size_);
return Status::OK();
}

@@ -148,7 +148,7 @@ Status TextFileOp::LoadFile(const std::string &file, int64_t start_offset, int64
TensorRow tRow(1, nullptr);
tRow.setPath({file});
RETURN_IF_NOT_OK(LoadTensor(line, &tRow));
RETURN_IF_NOT_OK(jagged_buffer_connector_->Add(worker_id, std::move(tRow)));
RETURN_IF_NOT_OK(jagged_rows_connector_->Add(worker_id, std::move(tRow)));

rows_total++;
}


+ 1
- 1
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/text_file_op.h View File

@@ -174,7 +174,7 @@ class TextFileOp : public NonMappableLeafOp {
// @return Status - the error code returned.
Status LoadTensor(const std::string &line, TensorRow *out_row);

// Reads a text file and loads the data into multiple buffers.
// Reads a text file and loads the data into multiple TensorRows.
// @param file - the file to read.
// @param start_offset - the start offset of file.
// @param end_offset - the end offset of file.


+ 3
- 3
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/tf_reader_op.cc View File

@@ -181,7 +181,7 @@ Status TFReaderOp::Init() {
// parallel op base.
RETURN_IF_NOT_OK(ParallelOp::CreateWorkerConnector(worker_connector_size_));

jagged_buffer_connector_ = std::make_unique<JaggedConnector>(num_workers_, 1, worker_connector_size_);
jagged_rows_connector_ = std::make_unique<JaggedConnector>(num_workers_, 1, worker_connector_size_);

// temporary: make size large enough to hold all files + EOE to avoid hangs
int32_t safe_queue_size = static_cast<int32_t>(std::ceil(dataset_files_list_.size() / num_workers_)) + 1;
@@ -304,7 +304,7 @@ Status TFReaderOp::FillIOBlockNoShuffle() {
return Status::OK();
}

// Reads a tf_file file and loads the data into multiple buffers.
// Reads a tf_file file and loads the data into multiple TensorRows.
Status TFReaderOp::LoadFile(const std::string &filename, int64_t start_offset, int64_t end_offset, int32_t worker_id) {
std::ifstream reader;
reader.open(filename);
@@ -348,7 +348,7 @@ Status TFReaderOp::LoadFile(const std::string &filename, int64_t start_offset, i
newRow.setPath(file_path);
RETURN_IF_NOT_OK(LoadExample(&tf_file, &newRow));
rows_read++;
RETURN_IF_NOT_OK(jagged_buffer_connector_->Add(worker_id, std::move(newRow)));
RETURN_IF_NOT_OK(jagged_rows_connector_->Add(worker_id, std::move(newRow)));
}

// ignore crc footer


+ 1
- 1
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/tf_reader_op.h View File

@@ -219,7 +219,7 @@ class TFReaderOp : public NonMappableLeafOp {
static bool ValidateFirstRowCrc(const std::string &filename);

private:
// Reads a tf_file file and loads the data into multiple buffers.
// Reads a tf_file file and loads the data into multiple TensorRows.
// @param filename - the tf_file file to read.
// @param start_offset - the start offset of file.
// @param end_offset - the end offset of file.


+ 1
- 1
mindspore/ccsrc/minddata/dataset/engine/datasetops/take_op.cc View File

@@ -94,7 +94,7 @@ Status TakeOp::operator()() {
}

take_count_ = 0;
MS_LOG(DEBUG) << "Meet the end and push-back eof buffer.";
MS_LOG(DEBUG) << "Meet the end and push-back eof row.";
RETURN_IF_NOT_OK(out_connector_->SendEOF());
return Status::OK();
}


+ 1
- 1
mindspore/ccsrc/minddata/dataset/engine/datasetops/zip_op.cc View File

@@ -106,7 +106,7 @@ Status ZipOp::prepare() {
return Status::OK();
}

// fetches next zip buffer row (merged row)
// fetches next zipped (merged) row
Status ZipOp::getNextTensorRow(TensorRow *const new_zip_row) {
// iterate over all iterators and generate a row
for (int32_t i = 0; i < children_num_; ++i) {


+ 0
- 1
mindspore/ccsrc/minddata/dataset/engine/datasetops/zip_op.h View File

@@ -36,7 +36,6 @@ class ZipOp : public PipelineOp {
// the arguments for constructing it. Use the builder by setting each argument
// with the provided set methods, and then finally call the build method to execute
// the actual construction.
// NOTE: the rows per buffer with initial value 0 means to default to the number of rows from the first child

class Builder {
public:


+ 5
- 5
mindspore/ccsrc/minddata/dataset/engine/db_connector.h View File

@@ -34,14 +34,14 @@ class DbConnector : public Connector<TensorRow> {
// See Connector.h for more details.
// @param n_producers The number of threads producing data into this DbConnector.
// @param n_consumers The number of thread consuming data from this DbConnector.
// @param queue_capacity The number of element (DataBuffer) for each internal queue.
// @param queue_capacity The number of elements (TensorRows) for each internal queue.
DbConnector(int32_t n_producers, int32_t n_consumers, int32_t queue_capacity)
: Connector<TensorRow>(n_producers, n_consumers, queue_capacity), end_of_file_(false) {}

// Destructor of DbConnector
~DbConnector() = default;

// Add a unique_ptr<DataBuffer> into the DbConnector.
// Add a TensorRow into the DbConnector.
// @note The caller of this add method should use std::move to pass the ownership to DbConnector.
// @param worker_id The id of a worker thread calling this method.
// @param el A rvalue reference to an element to be passed/added/pushed.
@@ -58,13 +58,13 @@ class DbConnector : public Connector<TensorRow> {
TensorRow eof = TensorRow(TensorRow::kFlagEOF);
return Add(std::move(eof), worker_id);
}
// Get a unique_ptr<DataBuffer> from the DbConnector.
// @note After the first EOF Buffer is encountered, subsequent pop()s will return EOF Buffer.
// Get a TensorRow from the DbConnector.
// @note After the first EOF row is encountered, subsequent pop()s will return EOF row.
// This will provide/propagate the EOF to all consumer threads of this Connector.
// Thus, When the num_consumers < num_producers, there will be extra EOF messages in some of the internal queues
// and reset() must be called before reusing DbConnector.
// @param worker_id The id of a worker thread calling this method.
// @param result The address of a unique_ptr<DataBuffer> where the popped element will be placed.
// @param result The address of a TensorRow where the popped element will be placed.
// @param retry_if_eoe A flag to allow the same thread invoke pop() again if the current pop returns eoe buffer.
Status PopWithRetry(int32_t worker_id, TensorRow *result, bool retry_if_eoe = false) noexcept {
if (result == nullptr) {


+ 6
- 6
mindspore/ccsrc/minddata/dataset/engine/perf/connector_throughput.cc View File

@@ -36,13 +36,13 @@ int ConnectorThroughput::InitNodes() {
}
// Sample action
Status ConnectorThroughput::Sample() {
std::vector<int64_t> out_buffer_count_row(n_nodes_);
std::vector<int64_t> out_row_count_row(n_nodes_);
std::vector<double> throughput_row(n_nodes_);
TimePoint cur_time; // initialised inside the loop, used outside the loop to update prev sample time.
auto col = 0;
for (const auto &node : *tree_) {
auto cur_out_buffer_count = node.ConnectorOutBufferCount();
out_buffer_count_row[col] = cur_out_buffer_count;
auto cur_out_rows_count = node.ConnectorOutRowsCount();
out_row_count_row[col] = cur_out_rows_count;
auto sz = timestamps_.size();
cur_time = std::chrono::steady_clock::now();
double dt = 0;
@@ -50,9 +50,9 @@ Status ConnectorThroughput::Sample() {
auto _dt = std::chrono::duration_cast<std::chrono::microseconds>(timestamps_[0][sz - 1] - timestamps_[0][sz - 2]);
dt = std::chrono::duration<double>(_dt).count();
}
auto prev_out_buffer_count = out_buffer_count_table_[col][out_buffer_count_table_.size() - 1];
auto prev_out_rows_count = out_row_count_table_[col][out_row_count_table_.size() - 1];
if (dt != 0) {
auto thr = (cur_out_buffer_count - prev_out_buffer_count) / (1000 * dt);
auto thr = (cur_out_rows_count - prev_out_rows_count) / (1000 * dt);
throughput_row[col] = thr;
} else {
throughput_row[col] = 0;
@@ -62,7 +62,7 @@ Status ConnectorThroughput::Sample() {
std::vector<TimePoint> v = {cur_time}; // temporary fix
timestamps_.AddSample(v);
// Push new row of sample
out_buffer_count_table_.AddSample(out_buffer_count_row);
out_row_count_table_.AddSample(out_row_count_row);
throughput_.AddSample(throughput_row);
return Status::OK();
}


+ 5
- 5
mindspore/ccsrc/minddata/dataset/engine/perf/connector_throughput.h View File

@@ -32,10 +32,10 @@ using json = nlohmann::json;
namespace mindspore {
namespace dataset {
// Connector throughput samples the output connector size of each op in the pipeline.
// For the description of the data structure see perf_buffer.h
// For the description of the data structure see perf_data.h
// It support JSON serialization for external usage.
class ConnectorThroughput : public Sampling {
using OutBufferCount = PerfData<CyclicArray<int64_t>>;
using OutRowCount = PerfData<CyclicArray<int64_t>>;
using Throughput = PerfData<CyclicArray<double>>;
using TimePoint = std::chrono::time_point<std::chrono::steady_clock>;
using TimeStamps = PerfData<CyclicArray<TimePoint>>;
@@ -45,11 +45,11 @@ class ConnectorThroughput : public Sampling {
: tree_(tree),
max_rows_(max_rows),
n_nodes_(InitNodes()),
out_buffer_count_table_(OutBufferCount(max_rows_, n_nodes_)),
out_row_count_table_(OutRowCount(max_rows_, n_nodes_)),
throughput_(Throughput(max_rows_, n_nodes_)),
timestamps_(TimeStamps(max_rows_, 1)) {
timestamps_.AddSample(std::vector<TimePoint>(1));
out_buffer_count_table_.AddSample(std::vector<int64_t>(n_nodes_));
out_row_count_table_.AddSample(std::vector<int64_t>(n_nodes_));
}

/// \brief Destructor
@@ -80,7 +80,7 @@ class ConnectorThroughput : public Sampling {
ExecutionTree *tree_ = nullptr; // ExecutionTree pointer
int64_t max_rows_;
int32_t n_nodes_;
OutBufferCount out_buffer_count_table_;
OutRowCount out_row_count_table_;
Throughput throughput_;
TimeStamps timestamps_;
std::string name_ = kConnectorThroughputSamplingName;


+ 2
- 2
tests/ut/cpp/dataset/image_folder_op_test.cc View File

@@ -239,12 +239,12 @@ TEST_F(MindDataTestImageFolderSampler, TestWeightedRandomSamplerImageFolder) {
// num samples to draw.
int64_t num_samples = 12;
int64_t total_samples = 44;
int64_t samples_per_buffer = 10;
int64_t samples_per_tensor = 10;
std::vector<double> weights(total_samples, std::rand() % 100);

// create sampler with replacement = replacement
std::shared_ptr<SamplerRT> sampler =
std::make_shared<WeightedRandomSamplerRT>(num_samples, weights, true, samples_per_buffer);
std::make_shared<WeightedRandomSamplerRT>(num_samples, weights, true, samples_per_tensor);

std::string folder_path = datasets_root_path_ + "/testPK/data";
auto tree = Build({ImageFolder(16, 2, 32, folder_path, false, std::move(sampler))});


+ 4
- 4
tests/ut/cpp/dataset/subset_random_sampler_test.cc View File

@@ -64,12 +64,12 @@ TEST_F(MindDataTestSubsetRandomSampler, TestAllAtOnce) {
ASSERT_EQ(row.eoe(), true);
}

TEST_F(MindDataTestSubsetRandomSampler, TestGetNextBuffer) {
TEST_F(MindDataTestSubsetRandomSampler, TestGetNextSample) {
int64_t total_samples = 100000 - 5;
int64_t samples_per_buffer = 10;
int64_t samples_per_tensor = 10;
int64_t num_samples = 0;
std::vector<int64_t> input(total_samples, 1);
SubsetRandomSamplerRT sampler(num_samples, input, samples_per_buffer);
SubsetRandomSamplerRT sampler(num_samples, input, samples_per_tensor);

DummyRandomAccessOp dummyRandomAccessOp(total_samples);
sampler.HandshakeRandomAccessOp(&dummyRandomAccessOp);
@@ -90,7 +90,7 @@ TEST_F(MindDataTestSubsetRandomSampler, TestGetNextBuffer) {
ASSERT_EQ(sampler.GetNextSample(&row), Status::OK());
}

ASSERT_EQ(epoch, (total_samples + samples_per_buffer - 1) / samples_per_buffer);
ASSERT_EQ(epoch, (total_samples + samples_per_tensor - 1) / samples_per_tensor);
ASSERT_EQ(input.size(), out.size());
}



+ 4
- 4
tests/ut/cpp/dataset/subset_sampler_test.cc View File

@@ -64,12 +64,12 @@ TEST_F(MindDataTestSubsetSampler, TestAllAtOnce) {
ASSERT_EQ(row.eoe(), true);
}

TEST_F(MindDataTestSubsetSampler, TestGetNextBuffer) {
TEST_F(MindDataTestSubsetSampler, TestGetNextSample) {
int64_t total_samples = 100000 - 5;
int64_t samples_per_buffer = 10;
int64_t samples_per_tensor = 10;
int64_t num_samples = 0;
std::vector<int64_t> input(total_samples, 1);
SubsetSamplerRT sampler(num_samples, input, samples_per_buffer);
SubsetSamplerRT sampler(num_samples, input, samples_per_tensor);

DummyRandomAccessOp dummyRandomAccessOp(total_samples);
sampler.HandshakeRandomAccessOp(&dummyRandomAccessOp);
@@ -91,7 +91,7 @@ TEST_F(MindDataTestSubsetSampler, TestGetNextBuffer) {
ASSERT_EQ(sampler.GetNextSample(&row), Status::OK());
}

ASSERT_EQ(epoch, (total_samples + samples_per_buffer - 1) / samples_per_buffer);
ASSERT_EQ(epoch, (total_samples + samples_per_tensor - 1) / samples_per_tensor);
ASSERT_EQ(input.size(), out.size());
}



+ 10
- 10
tests/ut/cpp/dataset/weighted_random_sampler_test.cc View File

@@ -27,9 +27,9 @@
#include <unordered_set>

using namespace mindspore::dataset;
using mindspore::MsLogLevel::INFO;
using mindspore::ExceptionType::NoExceptionType;
using mindspore::LogStream;
using mindspore::ExceptionType::NoExceptionType;
using mindspore::MsLogLevel::INFO;

class MindDataTestWeightedRandomSampler : public UT::Common {
public:
@@ -107,15 +107,15 @@ TEST_F(MindDataTestWeightedRandomSampler, TestOneshotNoReplacement) {
ASSERT_EQ(row.eoe(), true);
}

TEST_F(MindDataTestWeightedRandomSampler, TestGetNextBufferReplacement) {
TEST_F(MindDataTestWeightedRandomSampler, TestGetNextSampleReplacement) {
// num samples to draw.
uint64_t num_samples = 100;
uint64_t total_samples = 1000;
uint64_t samples_per_buffer = 10;
uint64_t samples_per_tensor = 10;
std::vector<double> weights(total_samples, std::rand() % 100);

// create sampler with replacement = replacement
WeightedRandomSamplerRT m_sampler(num_samples, weights, true, samples_per_buffer);
WeightedRandomSamplerRT m_sampler(num_samples, weights, true, samples_per_tensor);
DummyRandomAccessOp dummyRandomAccessOp(total_samples);
m_sampler.HandshakeRandomAccessOp(&dummyRandomAccessOp);

@@ -135,22 +135,22 @@ TEST_F(MindDataTestWeightedRandomSampler, TestGetNextBufferReplacement) {
ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK());
}

ASSERT_EQ(epoch, (num_samples + samples_per_buffer - 1) / samples_per_buffer);
ASSERT_EQ(epoch, (num_samples + samples_per_tensor - 1) / samples_per_tensor);
ASSERT_EQ(num_samples, out.size());
}

TEST_F(MindDataTestWeightedRandomSampler, TestGetNextBufferNoReplacement) {
TEST_F(MindDataTestWeightedRandomSampler, TestGetNextSampleNoReplacement) {
// num samples to draw.
uint64_t num_samples = 100;
uint64_t total_samples = 100;
uint64_t samples_per_buffer = 10;
uint64_t samples_per_tensor = 10;
std::vector<double> weights(total_samples, std::rand() % 100);
weights[1] = 0;
weights[2] = 0;
std::vector<uint64_t> freq(total_samples, 0);

// create sampler with replacement = replacement
WeightedRandomSamplerRT m_sampler(num_samples, weights, false, samples_per_buffer);
WeightedRandomSamplerRT m_sampler(num_samples, weights, false, samples_per_tensor);
DummyRandomAccessOp dummyRandomAccessOp(total_samples);
m_sampler.HandshakeRandomAccessOp(&dummyRandomAccessOp);

@@ -178,7 +178,7 @@ TEST_F(MindDataTestWeightedRandomSampler, TestGetNextBufferNoReplacement) {
}
}

ASSERT_EQ(epoch, (num_samples + samples_per_buffer - 1) / samples_per_buffer);
ASSERT_EQ(epoch, (num_samples + samples_per_tensor - 1) / samples_per_tensor);
ASSERT_EQ(num_samples, out.size());
}



Loading…
Cancel
Save