diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/CMakeLists.txt b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/CMakeLists.txt index c2394c1a42..a49fcfbb75 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/CMakeLists.txt +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/CMakeLists.txt @@ -14,6 +14,7 @@ set(DATASET_ENGINE_DATASETOPS_SOURCE_SRC_FILES clue_op.cc csv_op.cc album_op.cc + mappable_leaf_op.cc ) set(DATASET_ENGINE_DATASETOPS_SOURCE_SRC_FILES diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc index afafdf3b2f..f6a8d3ad7f 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc @@ -72,17 +72,15 @@ Status AlbumOp::Builder::SanityCheck() { AlbumOp::AlbumOp(int32_t num_wkrs, int32_t rows_per_buffer, std::string file_dir, int32_t queue_size, bool do_decode, const std::set &exts, std::unique_ptr data_schema, std::shared_ptr sampler) - : ParallelOp(num_wkrs, queue_size, std::move(sampler)), - rows_per_buffer_(rows_per_buffer), + : MappableLeafOp(num_wkrs, queue_size, std::move(sampler), rows_per_buffer), folder_path_(file_dir), decode_(do_decode), extensions_(exts), data_schema_(std::move(data_schema)), - row_cnt_(0), - buf_cnt_(0), sampler_ind_(0), dirname_offset_(0), - sample_ids_(nullptr) { + sample_ids_(nullptr), + curr_row_(0) { // Set the column name map (base class field) for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { column_name_id_map_[data_schema_->column(i).name()] = i; @@ -131,97 +129,6 @@ Status AlbumOp::PrescanEntry() { return Status::OK(); } -// Main logic, Register Queue with TaskGroup, launch all threads and do the functor's work -Status AlbumOp::operator()() { - RETURN_IF_NOT_OK(this->PrescanEntry()); - RETURN_IF_NOT_OK(LaunchThreadsAndInitOp()); - std::unique_ptr sampler_buffer; - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); - while (true) { // each iterator is 1 epoch - std::vector keys; - keys.reserve(rows_per_buffer_); - while (sampler_buffer->eoe() == false) { - TensorRow sample_row; - RETURN_IF_NOT_OK(sampler_buffer->PopRow(&sample_row)); - TensorPtr sample_ids = sample_row[0]; - for (auto itr = sample_ids->begin(); itr != sample_ids->end(); ++itr) { - if ((*itr) >= num_rows_) continue; // index out of bound, skipping - keys.push_back(*itr); - row_cnt_++; - if (row_cnt_ % rows_per_buffer_ == 0) { - RETURN_IF_NOT_OK( - io_block_queues_[buf_cnt_++ % num_workers_]->Add(std::make_unique(keys, IOBlock::kDeIoBlockNone))); - keys.clear(); - } - } - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); - } - if (keys.empty() == false) { - RETURN_IF_NOT_OK( - io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique(keys, IOBlock::kDeIoBlockNone))); - } - if (IsLastIteration()) { - std::unique_ptr eoe_block = std::make_unique(IOBlock::kDeIoBlockFlagEoe); - std::unique_ptr eof_block = std::make_unique(IOBlock::kDeIoBlockFlagEof); - RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::move(eoe_block))); - RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::move(eof_block))); - for (int32_t i = 0; i < num_workers_; ++i) { - RETURN_IF_NOT_OK( - io_block_queues_[i]->Add(std::make_unique(std::vector(), IOBlock::kDeIoBlockNone))); - } - return Status::OK(); - } else { // not the last repeat. - RETURN_IF_NOT_OK( - io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique(IOBlock::kDeIoBlockFlagEoe))); - } - - if (epoch_sync_flag_) { - // If epoch_sync_flag_ is set, then master thread sleeps until all the worker threads have finished their job for - // the current epoch. - RETURN_IF_NOT_OK(WaitForWorkers()); - } - // If not the last repeat, self-reset and go to loop again. - if (!IsLastIteration()) { - RETURN_IF_NOT_OK(Reset()); - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); - } - UpdateRepeatAndEpochCounter(); - } -} - -// contains the main logic of pulling a IOBlock from IOBlockQueue, load a buffer and push the buffer to out_connector_ -// IMPORTANT: 1 IOBlock produces 1 DataBuffer -Status AlbumOp::WorkerEntry(int32_t worker_id) { - TaskManager::FindMe()->Post(); - int64_t buffer_id = worker_id; - std::unique_ptr io_block; - RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block)); - while (io_block != nullptr) { - if (io_block->wait() == true) { - // Sync io_block is a signal that master thread wants us to pause and sync with other workers. - // The last guy who comes to this sync point should reset the counter and wake up the master thread. - if (++num_workers_paused_ == num_workers_) { - wait_for_workers_post_.Set(); - } - } else if (io_block->eoe() == true) { - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique(0, DataBuffer::kDeBFlagEOE))); - buffer_id = worker_id; - } else if (io_block->eof() == true) { - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique(0, DataBuffer::kDeBFlagEOF))); - } else { - std::vector keys; - RETURN_IF_NOT_OK(io_block->GetKeys(&keys)); - if (keys.empty() == true) return Status::OK(); // empty key is a quit signal for workers - std::unique_ptr db = std::make_unique(buffer_id, DataBuffer::kDeBFlagNone); - RETURN_IF_NOT_OK(LoadBuffer(keys, &db)); - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(db))); - buffer_id += num_workers_; - } - RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block)); - } - RETURN_STATUS_UNEXPECTED("Unexpected nullptr received in worker."); -} - // Only support JPEG/PNG/GIF/BMP // Optimization: Could take in a tensor // This function does not return status because we want to just skip bad input, not crash @@ -443,7 +350,8 @@ Status AlbumOp::LoadIntTensor(const nlohmann::json &json_obj, uint32_t col_num, // to take a reference to a column descriptor? // the design of this class is to make the code more readable, forgoing minor performance gain like // getting rid of duplicated checks -Status AlbumOp::LoadTensorRow(row_id_type row_id, const std::string &file, TensorRow *row) { +Status AlbumOp::LoadTensorRow(row_id_type row_id, TensorRow *row) { + std::string file = image_rows_[row_id]; // testing here is to just print out file path (*row) = TensorRow(row_id, {}); MS_LOG(INFO) << "Image row file: " << file << "."; @@ -531,19 +439,6 @@ Status AlbumOp::loadColumnData(const std::string &file, int32_t index, nlohmann: } } -// Looping over LoadTensorRow to make 1 DataBuffer. 1 function call produces 1 buffer -Status AlbumOp::LoadBuffer(const std::vector &keys, std::unique_ptr *db) { - std::unique_ptr deq = std::make_unique(); - TensorRow trow; - - for (const int64_t &key : keys) { - RETURN_IF_NOT_OK(this->LoadTensorRow(key, image_rows_[key], &trow)); - deq->push_back(std::move(trow)); - } - (*db)->set_tensor_table(std::move(deq)); - return Status::OK(); -} - void AlbumOp::Print(std::ostream &out, bool show_all) const { // Always show the id and name as first line regardless if this summary or detailed print out << "(" << std::setw(2) << operator_id_ << ") :"; @@ -561,24 +456,12 @@ void AlbumOp::Print(std::ostream &out, bool show_all) const { } } -// Reset Sampler and wakeup Master thread (functor) -Status AlbumOp::Reset() { - MS_LOG(DEBUG) << Name() << " performing a self-reset."; - RETURN_IF_NOT_OK(sampler_->ResetSampler()); - row_cnt_ = 0; - return Status::OK(); -} - -// hand shake with Sampler, allow Sampler to call RandomAccessOp's functions to get NumRows -Status AlbumOp::InitSampler() { - RETURN_IF_NOT_OK(sampler_->HandshakeRandomAccessOp(this)); - return Status::OK(); -} - Status AlbumOp::LaunchThreadsAndInitOp() { if (tree_ == nullptr) { return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "Pipeline init failed, Execution tree not set."); } + RETURN_IF_NOT_OK(this->PrescanEntry()); + // registers QueueList and individual Queues for interrupt services RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks())); @@ -612,13 +495,13 @@ Status AlbumOp::GetNextRow(TensorRow *row) { RETURN_IF_NOT_OK(sample_buffer->PopRow(&sample_row)); sample_ids_ = sample_row[0]; } - if (row_cnt_ + 1 > sample_ids_->Size()) { + if (curr_row_ + 1 > sample_ids_->Size()) { return Status::OK(); } int64_t key; - sample_ids_->GetItemAt(&key, {row_cnt_}); - RETURN_IF_NOT_OK(LoadTensorRow(key, image_rows_[key], row)); - row_cnt_++; + RETURN_IF_NOT_OK(sample_ids_->GetItemAt(&key, {curr_row_})); + RETURN_IF_NOT_OK(LoadTensorRow(key, row)); + curr_row_++; return Status::OK(); } } // namespace dataset diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h index 63491ad9f1..8174c749f3 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h @@ -30,6 +30,7 @@ #include "minddata/dataset/engine/data_buffer.h" #include "minddata/dataset/engine/data_schema.h" #include "minddata/dataset/engine/datasetops/parallel_op.h" +#include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h" #include "minddata/dataset/engine/datasetops/source/sampler/sampler.h" #include "minddata/dataset/util/path.h" #include "minddata/dataset/util/queue.h" @@ -47,7 +48,7 @@ class Queue; using FolderImages = std::shared_ptr>>; /// \class AlbumOp album_op.h -class AlbumOp : public ParallelOp, public RandomAccessOp { +class AlbumOp : public MappableLeafOp { public: class Builder { public: @@ -171,17 +172,6 @@ class AlbumOp : public ParallelOp, public RandomAccessOp { /// \return Status The status code returned Status PrescanEntry(); - /// \brief Worker thread pulls a number of IOBlock from IOBlock Queue, make a buffer and push it to Connector - /// \param[in] int32_t workerId - id of each worker - /// \return Status The status code returned - Status WorkerEntry(int32_t worker_id) override; - - /// \brief Main Loop of AlbumOp - /// Master thread: Fill IOBlockQueue, then goes to sleep - /// Worker thread: pulls IOBlock from IOBlockQueue, work on it then put buffer to mOutConnector - /// \return Status The status code returned - Status operator()() override; - /// \brief A print method typically used for debugging /// \param[in] out /// \param[in] show_all @@ -197,10 +187,6 @@ class AlbumOp : public ParallelOp, public RandomAccessOp { std::string Name() const override { return "AlbumOp"; } private: - /// \brief Initialize Sampler, calls sampler->Init() within - /// \return Status The status code returned - Status InitSampler(); - /// \brief Load image to tensor row /// \param[in] image_file Image name of file /// \param[in] col_num Column num in schema @@ -265,10 +251,9 @@ class AlbumOp : public ParallelOp, public RandomAccessOp { /// \brief Load a tensor row according to a json file /// \param[in] row_id_type row_id - id for this tensor row - /// \param[in] ImageColumns file Json file location /// \param[in, out] TensorRow row Json content stored into a tensor row /// \return Status The status code returned - Status LoadTensorRow(row_id_type row_id, const std::string &file, TensorRow *row); + Status LoadTensorRow(row_id_type row_id, TensorRow *row) override; /// \brief Load a tensor column according to a json file /// \param[in] ImageColumns file Json file location @@ -278,23 +263,14 @@ class AlbumOp : public ParallelOp, public RandomAccessOp { /// \return Status The status code returned Status loadColumnData(const std::string &file, int32_t index, nlohmann::json js, TensorRow *row); - /// \param[in] const std::vector &keys Keys in ioblock - /// \param[in, out] std::unique_ptr db Databuffer to push to - /// \return Status The status code returned - Status LoadBuffer(const std::vector &keys, std::unique_ptr *db); - /// \brief Called first when function is called /// \return Status The status code returned - Status LaunchThreadsAndInitOp(); - - /// \brief reset Op - /// \return Status The status code returned - Status Reset() override; + Status LaunchThreadsAndInitOp() override; Status GetNextRow(TensorRow *row) override; - // Private function for computing the assignment of the column name map. - // @return Status The status code returned + /// Private function for computing the assignment of the column name map. + /// \return Status The status code returned Status ComputeColMap() override; int32_t rows_per_buffer_; @@ -303,12 +279,12 @@ class AlbumOp : public ParallelOp, public RandomAccessOp { std::set extensions_; // extensions allowed std::unordered_map col_name_map_; std::unique_ptr data_schema_; - int64_t row_cnt_; - int64_t buf_cnt_; int64_t sampler_ind_; int64_t dirname_offset_; std::vector image_rows_; TensorPtr sample_ids_; + + int32_t curr_row_; }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/celeba_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/celeba_op.cc index cca9584137..33ed0f1241 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/celeba_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/celeba_op.cc @@ -79,8 +79,7 @@ Status CelebAOp::Builder::SanityCheck() { CelebAOp::CelebAOp(int32_t num_workers, int32_t rows_per_buffer, const std::string &dir, int32_t queue_size, bool decode, const std::string &usage, const std::set &exts, std::unique_ptr schema, std::shared_ptr sampler) - : ParallelOp(num_workers, queue_size, std::move(sampler)), - rows_per_buffer_(rows_per_buffer), + : MappableLeafOp(num_workers, queue_size, std::move(sampler), rows_per_buffer), folder_path_(dir), decode_(decode), extensions_(exts), @@ -269,121 +268,8 @@ std::vector CelebAOp::Split(const std::string &line) { return split; } -// Main logic, Register Queue with TaskGroup, launch all threads and do the functor's work -Status CelebAOp::operator()() { - RETURN_IF_NOT_OK(LaunchThreadsAndInitOp()); - std::unique_ptr data_buffer; - RETURN_IF_NOT_OK(sampler_->GetNextSample(&data_buffer)); - RETURN_IF_NOT_OK(AddIOBlock(&data_buffer)); - return Status::OK(); -} - -Status CelebAOp::AddIOBlock(std::unique_ptr *data_buffer) { - int64_t buff_count = 0; - while (true) { - std::vector keys; - keys.reserve(rows_per_buffer_); - int64_t row_count = 0; - while (!(*data_buffer)->eoe()) { - TensorRow sample_row; - RETURN_IF_NOT_OK((*data_buffer)->PopRow(&sample_row)); - std::shared_ptr sample_ids = sample_row[0]; - for (auto itr = sample_ids->begin(); itr != sample_ids->end(); ++itr) { - if ((*itr) >= num_rows_) { - MS_LOG(WARNING) << "Sample Id (" << *itr << ") is out of bounds, skipping. Max id is " << num_rows_ << "."; - continue; - } - keys.push_back(*itr); - row_count++; - if (row_count % rows_per_buffer_ == 0) { - RETURN_IF_NOT_OK(io_block_queues_[buff_count++ % num_workers_]->Add( - std::make_unique(IOBlock(keys, IOBlock::kDeIoBlockNone)))); - keys.clear(); - } - } - RETURN_IF_NOT_OK(sampler_->GetNextSample(data_buffer)); - } - - if (!keys.empty()) { - RETURN_IF_NOT_OK(io_block_queues_[(buff_count++) % num_workers_]->Add( - std::make_unique(IOBlock(keys, IOBlock::kDeIoBlockNone)))); - } - if (IsLastIteration()) { - RETURN_IF_NOT_OK( - io_block_queues_[(buff_count++) % num_workers_]->Add(std::make_unique(IOBlock::kDeIoBlockFlagEoe))); - RETURN_IF_NOT_OK( - io_block_queues_[(buff_count++) % num_workers_]->Add(std::make_unique(IOBlock::kDeIoBlockFlagEof))); - for (int32_t i = 0; i < num_workers_; i++) { - RETURN_IF_NOT_OK( - io_block_queues_[i]->Add(std::make_unique(std::vector(), IOBlock::kDeIoBlockNone))); - } - return Status::OK(); - } else { // not the last repeat. - RETURN_IF_NOT_OK( - io_block_queues_[(buff_count++) % num_workers_]->Add(std::make_unique(IOBlock::kDeIoBlockFlagEoe))); - } - - if (epoch_sync_flag_) { - // If epoch_sync_flag_ is set, then master thread sleeps until all the worker threads have finished their job for - // the current epoch. - RETURN_IF_NOT_OK(WaitForWorkers()); - } - // If not the last repeat, self-reset and go to loop again. - if (!IsLastIteration()) { - RETURN_IF_NOT_OK(Reset()); - RETURN_IF_NOT_OK(sampler_->GetNextSample(data_buffer)); - } - UpdateRepeatAndEpochCounter(); - } -} - -Status CelebAOp::WorkerEntry(int32_t worker_id) { - TaskManager::FindMe()->Post(); - int64_t buffer_id = worker_id; - std::unique_ptr io_block; - RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block)); - while (io_block != nullptr) { - if (io_block->wait() == true) { - // Sync io_block is a signal that master thread wants us to pause and sync with other workers. - // The last guy who comes to this sync point should reset the counter and wake up the master thread. - if (++num_workers_paused_ == num_workers_) { - wait_for_workers_post_.Set(); - } - } else if (io_block->eoe() == true) { - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique(0, DataBuffer::kDeBFlagEOE))); - buffer_id = worker_id; - } else if (io_block->eof() == true) { - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique(0, DataBuffer::kDeBFlagEOF))); - } else { - std::vector keys; - RETURN_IF_NOT_OK(io_block->GetKeys(&keys)); - if (keys.empty()) { - return Status::OK(); // empty key is a quit signal for workers - } - std::unique_ptr db = std::make_unique(buffer_id, DataBuffer::kDeBFlagNone); - RETURN_IF_NOT_OK(LoadBuffer(keys, &db)); - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(db))); - buffer_id += num_workers_; - } - RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block)); - } - return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "Unexpected nullptr received in worker."); -} - -Status CelebAOp::LoadBuffer(const std::vector &keys, std::unique_ptr *db) { - std::unique_ptr deq = std::make_unique(); - for (const auto &key : keys) { - TensorRow row; - RETURN_IF_NOT_OK(LoadTensorRow(key, image_labels_vec_[key], &row)); - deq->push_back(std::move(row)); - } - - (*db)->set_tensor_table(std::move(deq)); - return Status::OK(); -} - -Status CelebAOp::LoadTensorRow(row_id_type row_id, const std::pair> &image_label, - TensorRow *row) { +Status CelebAOp::LoadTensorRow(row_id_type row_id, TensorRow *row) { + std::pair> &image_label = image_labels_vec_[row_id]; std::shared_ptr image; std::shared_ptr label; @@ -432,13 +318,6 @@ void CelebAOp::Print(std::ostream &out, bool show_all) const { } } -// Reset Sampler and wakeup Master thread (functor) -Status CelebAOp::Reset() { - MS_LOG(DEBUG) << Name() << " performing a self-reset."; - RETURN_IF_NOT_OK(sampler_->ResetSampler()); - return Status::OK(); -} - Status CelebAOp::ComputeColMap() { // Set the column name map (base class field) if (column_name_id_map_.empty()) { diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/celeba_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/celeba_op.h index e1e0a644ed..c2375fc62a 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/celeba_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/celeba_op.h @@ -27,6 +27,7 @@ #include "minddata/dataset/util/status.h" #include "minddata/dataset/engine/data_schema.h" #include "minddata/dataset/engine/datasetops/parallel_op.h" +#include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h" #include "minddata/dataset/engine/datasetops/source/sampler/sampler.h" #include "minddata/dataset/util/queue.h" #include "minddata/dataset/engine/datasetops/source/io_block.h" @@ -41,7 +42,7 @@ namespace mindspore { namespace dataset { -class CelebAOp : public ParallelOp, RandomAccessOp { +class CelebAOp : public MappableLeafOp { public: class Builder { public: @@ -148,27 +149,11 @@ class CelebAOp : public ParallelOp, RandomAccessOp { ~CelebAOp() override = default; - // Main Loop of CelebAOp - // Master thread: Fill IOBlockQueue, then goes to sleep - // Worker thread: pulls IOBlock from IOBlockQueue, work on it then put buffer to mOutConnector - // @return Status The status code returned - Status operator()() override; - - // Worker thread pulls a number of IOBlock from IOBlock Queue, make a buffer and push it to Connector - // @param int32_t worker_id - id of each worker - // @return Status The status code returned - Status WorkerEntry(int32_t worker_id) override; - // A print method typically used for debugging // @param out // @param show_all void Print(std::ostream &out, bool show_all) const override; - // Method in operator(), to fill IOBlockQueue - // @param std::unique_ptr sampler_buffer - to fill IOBlockQueue - // @return Status The status code returned - Status AddIOBlock(std::unique_ptr *data_buffer); - // Op name getter // @return Name of the current Op std::string Name() const override { return "CelebAOp"; } @@ -176,7 +161,7 @@ class CelebAOp : public ParallelOp, RandomAccessOp { private: // Called first when function is called // @return - Status LaunchThreadsAndInitOp(); + Status LaunchThreadsAndInitOp() override; // Parse attribute file // @return @@ -191,32 +176,21 @@ class CelebAOp : public ParallelOp, RandomAccessOp { // @return std::vector - string after split std::vector Split(const std::string &line); - // @param const std::vector &keys - keys in ioblock - // @param std::unique_ptr db - // @return Status The status code returned - Status LoadBuffer(const std::vector &keys, std::unique_ptr *db); - // Load a tensor row according to a pair // @param row_id_type row_id - id for this tensor row // @param std::pair - > // @param TensorRow row - image & label read into this tensor row // @return Status The status code returned - Status LoadTensorRow(row_id_type row_id, const std::pair> &image_label, - TensorRow *row); + Status LoadTensorRow(row_id_type row_id, TensorRow *row) override; // Check if need read according to dataset type // @return bool - if need read bool CheckDatasetTypeValid(); - // reset Op - // @return Status The status code returned - Status Reset() override; - // Private function for computing the assignment of the column name map. // @return - Status Status ComputeColMap() override; - int32_t rows_per_buffer_; std::string folder_path_; // directory of celeba folder bool decode_; std::set extensions_; // extensions allowed diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/cifar_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/cifar_op.cc index 9c56a90d4b..585b22547b 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/cifar_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/cifar_op.cc @@ -88,76 +88,16 @@ Status CifarOp::Builder::SanityCheck() { CifarOp::CifarOp(CifarType type, const std::string &usage, int32_t num_works, int32_t rows_per_buf, const std::string &file_dir, int32_t queue_size, std::unique_ptr data_schema, std::shared_ptr sampler) - : ParallelOp(num_works, queue_size, std::move(sampler)), + : MappableLeafOp(num_works, queue_size, std::move(sampler), rows_per_buf), cifar_type_(type), usage_(usage), - rows_per_buffer_(rows_per_buf), folder_path_(file_dir), - data_schema_(std::move(data_schema)), - row_cnt_(0), - buf_cnt_(0) { + data_schema_(std::move(data_schema)) { constexpr uint64_t kUtilQueueSize = 512; cifar_raw_data_block_ = std::make_unique>>(kUtilQueueSize); io_block_queues_.Init(num_workers_, queue_size); } -// Main logic, Register Queue with TaskGroup, launch all threads and do the functor's work -Status CifarOp::operator()() { - RETURN_IF_NOT_OK(LaunchThreadsAndInitOp()); - std::unique_ptr sampler_buffer; - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); - while (true) { // each iterator is 1 epoch - std::vector keys; - keys.reserve(rows_per_buffer_); - while (sampler_buffer->eoe() == false) { - TensorRow sample_row; - RETURN_IF_NOT_OK(sampler_buffer->PopRow(&sample_row)); - std::shared_ptr sample_ids = sample_row[0]; - for (auto itr = sample_ids->begin(); itr != sample_ids->end(); itr++) { - keys.push_back(*itr); - row_cnt_++; - if ((*itr) >= num_rows_) continue; // index out of bound, skipping - if (row_cnt_ % rows_per_buffer_ == 0) { - RETURN_IF_NOT_OK(io_block_queues_[buf_cnt_++ % num_workers_]->Add( - std::make_unique(IOBlock(keys, IOBlock::kDeIoBlockNone)))); - keys.clear(); - } - } - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); - } - if (keys.empty() == false) { - RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add( - std::make_unique(IOBlock(keys, IOBlock::kDeIoBlockNone)))); - } - if (IsLastIteration()) { - RETURN_IF_NOT_OK( - io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique(IOBlock::kDeIoBlockFlagEoe))); - RETURN_IF_NOT_OK( - io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique(IOBlock::kDeIoBlockFlagEof))); - for (int32_t i = 0; i < num_workers_; i++) { - RETURN_IF_NOT_OK( - io_block_queues_[i]->Add(std::make_unique(std::vector(), IOBlock::kDeIoBlockNone))); - } - return Status::OK(); - } else { // not the last repeat. - RETURN_IF_NOT_OK( - io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique(IOBlock::kDeIoBlockFlagEoe))); - } - - if (epoch_sync_flag_) { - // If epoch_sync_flag_ is set, then master thread sleeps until all the worker threads have finished their job for - // the current epoch. - RETURN_IF_NOT_OK(WaitForWorkers()); - } - // If not the last repeat, self-reset and go to loop again. - if (!IsLastIteration()) { - RETURN_IF_NOT_OK(Reset()); - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); - } - UpdateRepeatAndEpochCounter(); - } -} - Status CifarOp::LaunchThreadsAndInitOp() { if (tree_ == nullptr) { RETURN_STATUS_UNEXPECTED("Pipeline init failed, Execution tree not set."); @@ -175,43 +115,8 @@ Status CifarOp::LaunchThreadsAndInitOp() { return Status::OK(); } -// contains the main logic of pulling a IOBlock from IOBlockQueue, load a buffer and push the buffer to out_connector_ -// IMPORTANT: 1 IOBlock produces 1 DataBuffer -Status CifarOp::WorkerEntry(int32_t worker_id) { - TaskManager::FindMe()->Post(); - int64_t buffer_id = worker_id; - std::unique_ptr io_block; - RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block)); - while (io_block != nullptr) { - if (io_block->wait() == true) { - // Sync io_block is a signal that master thread wants us to pause and sync with other workers. - // The last guy who comes to this sync point should reset the counter and wake up the master thread. - if (++num_workers_paused_ == num_workers_) { - wait_for_workers_post_.Set(); - } - } else if (io_block->eoe() == true) { - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique(0, DataBuffer::kDeBFlagEOE))); - buffer_id = worker_id; - } else if (io_block->eof() == true) { - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique(0, DataBuffer::kDeBFlagEOF))); - } else { - std::vector keys; - RETURN_IF_NOT_OK(io_block->GetKeys(&keys)); - if (keys.empty() == true) { - return Status::OK(); // empty key is a quit signal for workers - } - std::unique_ptr db = std::make_unique(buffer_id, DataBuffer::kDeBFlagNone); - RETURN_IF_NOT_OK(LoadBuffer(keys, &db)); - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(db))); - buffer_id += num_workers_; - } - RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block)); - } - RETURN_STATUS_UNEXPECTED("Unexpected nullptr received in worker."); -} - // Load 1 TensorRow (image,label). 1 function call produces 1 TensorTow in a DataBuffer -Status CifarOp::LoadTensorRow(uint64_t index, TensorRow *trow) { +Status CifarOp::LoadTensorRow(row_id_type index, TensorRow *trow) { std::shared_ptr label; std::shared_ptr fine_label; std::shared_ptr ori_image = cifar_image_label_pairs_[index].first; @@ -234,18 +139,6 @@ Status CifarOp::LoadTensorRow(uint64_t index, TensorRow *trow) { return Status::OK(); } -// Looping over LoadTensorRow to make 1 DataBuffer. 1 function call produces 1 buffer -Status CifarOp::LoadBuffer(const std::vector &keys, std::unique_ptr *db) { - std::unique_ptr deq = std::make_unique(); - for (const int64_t &key : keys) { - TensorRow trow; - RETURN_IF_NOT_OK(LoadTensorRow(key, &trow)); - deq->push_back(std::move(trow)); - } - (*db)->set_tensor_table(std::move(deq)); - return Status::OK(); -} - void CifarOp::Print(std::ostream &out, bool show_all) const { if (!show_all) { // Call the super class for displaying any common 1-liner info @@ -260,20 +153,6 @@ void CifarOp::Print(std::ostream &out, bool show_all) const { } } -// Reset Sampler and wakeup Master thread (functor) -Status CifarOp::Reset() { - MS_LOG(DEBUG) << Name() << " performing a self-reset."; - RETURN_IF_NOT_OK(sampler_->ResetSampler()); - row_cnt_ = 0; - return Status::OK(); -} - -// hand shake with Sampler, allow Sampler to call RandomAccessOp's functions to get NumRows -Status CifarOp::InitSampler() { - RETURN_IF_NOT_OK(sampler_->HandshakeRandomAccessOp(this)); - return Status::OK(); -} - Status CifarOp::ReadCifarBlockDataAsync() { TaskManager::FindMe()->Post(); RETURN_IF_NOT_OK(GetCifarFiles()); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/cifar_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/cifar_op.h index abbb5da7c2..ff80d6c104 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/cifar_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/cifar_op.h @@ -26,6 +26,7 @@ #include "minddata/dataset/engine/data_buffer.h" #include "minddata/dataset/engine/data_schema.h" #include "minddata/dataset/engine/datasetops/parallel_op.h" +#include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h" #include "minddata/dataset/engine/datasetops/source/sampler/sampler.h" #include "minddata/dataset/util/path.h" #include "minddata/dataset/util/queue.h" @@ -35,7 +36,7 @@ namespace mindspore { namespace dataset { -class CifarOp : public ParallelOp, public RandomAccessOp { +class CifarOp : public MappableLeafOp { public: enum CifarType { kCifar10, kCifar100 }; @@ -142,17 +143,6 @@ class CifarOp : public ParallelOp, public RandomAccessOp { // Destructor. ~CifarOp() = default; - // Worker thread pulls a number of IOBlock from IOBlock Queue, make a buffer and push it to Connector - // @param uint32_t workerId - id of each worker - // @return Status The status code returned - Status WorkerEntry(int32_t worker_id) override; - - // Main Loop of CifarOp - // Master thread: Fill IOBlockQueue, then goes to sleep - // Worker thread: pulls IOBlock from IOBlockQueue, work on it then put buffer to mOutConnector - // @return Status The status code returned - Status operator()() override; - // A print method typically used for debugging // @param out // @param show_all @@ -170,32 +160,20 @@ class CifarOp : public ParallelOp, public RandomAccessOp { std::string Name() const override { return "CifarOp"; } private: - // Initialize Sampler, calls sampler->Init() within - // @return Status The status code returned - Status InitSampler(); - // Load a tensor row according to a pair // @param uint64_t index - index need to load // @param TensorRow row - image & label read into this tensor row // @return Status The status code returned - Status LoadTensorRow(uint64_t index, TensorRow *row); - - // @param const std::vector &keys - keys in ioblock - // @param std::unique_ptr db - // @return Status The status code returned - Status LoadBuffer(const std::vector &keys, std::unique_ptr *db); + Status LoadTensorRow(row_id_type index, TensorRow *trow) override; + private: // Read block data from cifar file // @return Status ReadCifarBlockDataAsync(); // Called first when function is called // @return - Status LaunchThreadsAndInitOp(); - - // reset Op - // @return Status The status code returned - Status Reset() override; + Status LaunchThreadsAndInitOp() override; // Get cifar files in dir // @return @@ -223,12 +201,9 @@ class CifarOp : public ParallelOp, public RandomAccessOp { Status ComputeColMap() override; CifarType cifar_type_; - int32_t rows_per_buffer_; std::string folder_path_; std::unique_ptr data_schema_; - int64_t row_cnt_; - int64_t buf_cnt_; const std::string usage_; // can only be either "train" or "test" std::unique_ptr>> cifar_raw_data_block_; std::vector cifar_files_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/coco_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/coco_op.cc index 91892b49ee..c2dfaf6ec8 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/coco_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/coco_op.cc @@ -124,82 +124,15 @@ Status CocoOp::Builder::SanityCheck() { CocoOp::CocoOp(const TaskType &task_type, const std::string &image_folder_path, const std::string &annotation_path, int32_t num_workers, int32_t rows_per_buffer, int32_t queue_size, bool decode, std::unique_ptr data_schema, std::shared_ptr sampler) - : ParallelOp(num_workers, queue_size, std::move(sampler)), + : MappableLeafOp(num_workers, queue_size, std::move(sampler), rows_per_buffer), decode_(decode), - row_cnt_(0), - buf_cnt_(0), task_type_(task_type), image_folder_path_(image_folder_path), annotation_path_(annotation_path), - rows_per_buffer_(rows_per_buffer), data_schema_(std::move(data_schema)) { io_block_queues_.Init(num_workers_, queue_size); } -Status CocoOp::TraverseSampleIds(const std::shared_ptr &sample_ids, std::vector *keys) { - for (auto itr = sample_ids->begin(); itr != sample_ids->end(); ++itr) { - if ((*itr) > num_rows_) continue; - keys->push_back(*itr); - row_cnt_++; - if (row_cnt_ % rows_per_buffer_ == 0) { - RETURN_IF_NOT_OK(io_block_queues_[buf_cnt_++ % num_workers_]->Add( - std::make_unique(IOBlock(*keys, IOBlock::kDeIoBlockNone)))); - keys->clear(); - } - } - return Status::OK(); -} - -Status CocoOp::operator()() { - RETURN_IF_NOT_OK(LaunchThreadsAndInitOp()); - std::unique_ptr sampler_buffer; - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); - while (true) { - std::vector keys; - keys.reserve(rows_per_buffer_); - while (sampler_buffer->eoe() == false) { - std::shared_ptr sample_ids; - RETURN_IF_NOT_OK(sampler_buffer->GetTensor(&sample_ids, 0, 0)); - if (sample_ids->type() != DataType(DataType::DE_INT64)) { - RETURN_STATUS_UNEXPECTED("Invalid parameter, data type of Sampler Tensor isn't int64, got " + - sample_ids->type().ToString()); - } - RETURN_IF_NOT_OK(TraverseSampleIds(sample_ids, &keys)); - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); - } - if (keys.empty() == false) { - RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add( - std::make_unique(IOBlock(keys, IOBlock::kDeIoBlockNone)))); - } - if (IsLastIteration()) { - std::unique_ptr eoe_block = std::make_unique(IOBlock::kDeIoBlockFlagEoe); - std::unique_ptr eof_block = std::make_unique(IOBlock::kDeIoBlockFlagEof); - RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::move(eoe_block))); - RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::move(eof_block))); - for (int32_t i = 0; i < num_workers_; i++) { - RETURN_IF_NOT_OK( - io_block_queues_[i]->Add(std::make_unique(std::vector(), IOBlock::kDeIoBlockNone))); - } - return Status::OK(); - } else { - RETURN_IF_NOT_OK( - io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique(IOBlock::kDeIoBlockFlagEoe))); - } - - if (epoch_sync_flag_) { - // If epoch_sync_flag_ is set, then master thread sleeps until all the worker threads have finished their job for - // the current epoch. - RETURN_IF_NOT_OK(WaitForWorkers()); - } - // If not the last repeat, self-reset and go to loop again. - if (!IsLastIteration()) { - RETURN_IF_NOT_OK(Reset()); - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); - } - UpdateRepeatAndEpochCounter(); - } -} - void CocoOp::Print(std::ostream &out, bool show_all) const { if (!show_all) { // Call the super class for displaying any common 1-liner info @@ -215,14 +148,8 @@ void CocoOp::Print(std::ostream &out, bool show_all) const { } } -Status CocoOp::Reset() { - MS_LOG(DEBUG) << Name() << " performing a self-reset."; - RETURN_IF_NOT_OK(sampler_->ResetSampler()); - row_cnt_ = 0; - return Status::OK(); -} - -Status CocoOp::LoadTensorRow(row_id_type row_id, const std::string &image_id, TensorRow *trow) { +Status CocoOp::LoadTensorRow(row_id_type row_id, TensorRow *trow) { + std::string image_id = image_ids_[row_id]; std::shared_ptr image, coordinate; auto itr = coordinate_map_.find(image_id); if (itr == coordinate_map_.end()) { @@ -374,48 +301,6 @@ Status CocoOp::LoadMixTensorRow(row_id_type row_id, const std::string &image_id, return Status::OK(); } -Status CocoOp::LoadBuffer(const std::vector &keys, std::unique_ptr *db) { - std::unique_ptr deq = std::make_unique(); - TensorRow trow; - for (const int64_t &key : keys) { - RETURN_IF_NOT_OK(this->LoadTensorRow(key, image_ids_[key], &trow)); - deq->push_back(std::move(trow)); - } - (*db)->set_tensor_table(std::move(deq)); - return Status::OK(); -} - -Status CocoOp::WorkerEntry(int32_t worker_id) { - TaskManager::FindMe()->Post(); - int64_t buffer_id = worker_id; - std::unique_ptr io_block; - RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block)); - while (io_block != nullptr) { - if (io_block->wait() == true) { - // Sync io_block is a signal that master thread wants us to pause and sync with other workers. - // The last guy who comes to this sync point should reset the counter and wake up the master thread. - if (++num_workers_paused_ == num_workers_) { - wait_for_workers_post_.Set(); - } - } else if (io_block->eoe() == true) { - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique(0, DataBuffer::kDeBFlagEOE))); - buffer_id = worker_id; - } else if (io_block->eof() == true) { - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, (std::make_unique(0, DataBuffer::kDeBFlagEOF)))); - } else { - std::vector keys; - RETURN_IF_NOT_OK(io_block->GetKeys(&keys)); - if (keys.empty() == true) return Status::OK(); - std::unique_ptr db = std::make_unique(buffer_id, DataBuffer::kDeBFlagNone); - RETURN_IF_NOT_OK(LoadBuffer(keys, &db)); - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(db))); - buffer_id += num_workers_; - } - RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block)); - } - RETURN_STATUS_UNEXPECTED("Unexpected nullptr received in worker"); -} - template Status CocoOp::SearchNodeInJson(const nlohmann::json &input_tree, std::string node_name, T *output_node) { auto node = input_tree.find(node_name); @@ -627,11 +512,6 @@ Status CocoOp::CategoriesColumnLoad(const nlohmann::json &categories_tree) { return Status::OK(); } -Status CocoOp::InitSampler() { - RETURN_IF_NOT_OK(sampler_->HandshakeRandomAccessOp(this)); - return Status::OK(); -} - Status CocoOp::LaunchThreadsAndInitOp() { if (tree_ == nullptr) { RETURN_STATUS_UNEXPECTED("Pipeline init failed, Execution tree not set."); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/coco_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/coco_op.h index e152cbef8f..bd6cd99adb 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/coco_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/coco_op.h @@ -27,6 +27,7 @@ #include "minddata/dataset/engine/data_buffer.h" #include "minddata/dataset/engine/data_schema.h" #include "minddata/dataset/engine/datasetops/parallel_op.h" +#include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h" #include "minddata/dataset/engine/datasetops/source/sampler/sampler.h" #ifndef ENABLE_ANDROID #include "minddata/dataset/kernels/image/image_utils.h" @@ -46,7 +47,7 @@ class Queue; using CoordinateRow = std::vector>; -class CocoOp : public ParallelOp, public RandomAccessOp { +class CocoOp : public MappableLeafOp { public: enum class TaskType { Detection = 0, Stuff = 1, Panoptic = 2, Keypoint = 3 }; @@ -171,17 +172,6 @@ class CocoOp : public ParallelOp, public RandomAccessOp { // Destructor ~CocoOp() = default; - // Worker thread pulls a number of IOBlock from IOBlock Queue, make a buffer and push it to Connector - // @param int32_t workerId - id of each worker - // @return Status The status code returned - Status WorkerEntry(int32_t worker_id) override; - - // Main Loop of CocoOp - // Master thread: Fill IOBlockQueue, then goes to sleep - // Worker thread: pulls IOBlock from IOBlockQueue, work on it the put buffer to mOutConnector - // @return Status The status code returned - Status operator()() override; - // A print method typically used for debugging // @param out // @param show_all @@ -212,16 +202,12 @@ class CocoOp : public ParallelOp, public RandomAccessOp { Status GetClassIndexing(std::vector>> *output_class_indexing) override; private: - // Initialize Sampler, calls sampler->Init() within - // @return Status The status code returned - Status InitSampler(); - // Load a tensor row according to image id // @param row_id_type row_id - id for this tensor row // @param std::string image_id - image id // @param TensorRow row - image & target read into this tensor row // @return Status The status code returned - Status LoadTensorRow(row_id_type row_id, const std::string &image_id, TensorRow *row); + Status LoadTensorRow(row_id_type row_id, TensorRow *row) override; // Load a tensor row with vector which a vector to a tensor // @param row_id_type row_id - id for this tensor row @@ -259,27 +245,13 @@ class CocoOp : public ParallelOp, public RandomAccessOp { // @return Status The status code returned Status ReadImageToTensor(const std::string &path, const ColDescriptor &col, std::shared_ptr *tensor); - // @param const std::vector &keys - keys in ioblock - // @param std::unique_ptr db - // @return Status The status code returned - Status LoadBuffer(const std::vector &keys, std::unique_ptr *db); - // Read annotation from Annotation folder // @return Status The status code returned Status ParseAnnotationIds(); - // @param const std::shared_ptr &sample_ids - sample ids of tensor - // @param std::vector *keys - image id - // @return Status The status code returned - Status TraverseSampleIds(const std::shared_ptr &sample_ids, std::vector *keys); - // Called first when function is called // @return Status The status code returned - Status LaunchThreadsAndInitOp(); - - // Reset dataset state - // @return Status The status code returned - Status Reset() override; + Status LaunchThreadsAndInitOp() override; // @param nlohmann::json image_tree - image tree of json // @param std::vector *image_vec - image id list of json @@ -323,12 +295,9 @@ class CocoOp : public ParallelOp, public RandomAccessOp { Status ComputeColMap() override; bool decode_; - int64_t row_cnt_; - int64_t buf_cnt_; std::string image_folder_path_; std::string annotation_path_; TaskType task_type_; - int32_t rows_per_buffer_; std::unique_ptr data_schema_; std::vector image_ids_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.cc index 2a969c1f49..81b56c82da 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.cc @@ -68,16 +68,13 @@ ImageFolderOp::ImageFolderOp(int32_t num_wkrs, int32_t rows_per_buffer, std::str bool recursive, bool do_decode, const std::set &exts, const std::map &map, std::unique_ptr data_schema, std::shared_ptr sampler) - : ParallelOp(num_wkrs, queue_size, std::move(sampler)), - rows_per_buffer_(rows_per_buffer), + : MappableLeafOp(num_wkrs, queue_size, std::move(sampler), rows_per_buffer), folder_path_(file_dir), recursive_(recursive), decode_(do_decode), extensions_(exts), class_index_(map), data_schema_(std::move(data_schema)), - row_cnt_(0), - buf_cnt_(0), sampler_ind_(0), dirname_offset_(0) { folder_name_queue_ = std::make_unique>(num_wkrs * queue_size); @@ -125,98 +122,9 @@ Status ImageFolderOp::PrescanMasterEntry(const std::string &filedir) { return Status::OK(); } -// Main logic, Register Queue with TaskGroup, launch all threads and do the functor's work -Status ImageFolderOp::operator()() { - RETURN_IF_NOT_OK(LaunchThreadsAndInitOp()); - std::unique_ptr sampler_buffer; - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); - while (true) { // each iterator is 1 epoch - std::vector keys; - keys.reserve(rows_per_buffer_); - while (sampler_buffer->eoe() == false) { - TensorRow sample_row; - RETURN_IF_NOT_OK(sampler_buffer->PopRow(&sample_row)); - std::shared_ptr sample_ids = sample_row[0]; - for (auto itr = sample_ids->begin(); itr != sample_ids->end(); ++itr) { - if ((*itr) >= num_rows_) continue; // index out of bound, skipping - keys.push_back(*itr); - row_cnt_++; - if (row_cnt_ % rows_per_buffer_ == 0) { - RETURN_IF_NOT_OK( - io_block_queues_[buf_cnt_++ % num_workers_]->Add(std::make_unique(keys, IOBlock::kDeIoBlockNone))); - keys.clear(); - } - } - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); - } - if (keys.empty() == false) { - RETURN_IF_NOT_OK( - io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique(keys, IOBlock::kDeIoBlockNone))); - } - if (IsLastIteration()) { - std::unique_ptr eoe_block = std::make_unique(IOBlock::kDeIoBlockFlagEoe); - std::unique_ptr eof_block = std::make_unique(IOBlock::kDeIoBlockFlagEof); - RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::move(eoe_block))); - RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::move(eof_block))); - for (int32_t i = 0; i < num_workers_; ++i) { - RETURN_IF_NOT_OK( - io_block_queues_[i]->Add(std::make_unique(std::vector(), IOBlock::kDeIoBlockNone))); - } - return Status::OK(); - } else { // not the last repeat. - RETURN_IF_NOT_OK( - io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique(IOBlock::kDeIoBlockFlagEoe))); - } - - if (epoch_sync_flag_) { - // If epoch_sync_flag_ is set, then master thread sleeps until all the worker threads have finished their job for - // the current epoch. - RETURN_IF_NOT_OK(WaitForWorkers()); - } - // If not the last repeat, self-reset and go to loop again. - if (!IsLastIteration()) { - RETURN_IF_NOT_OK(Reset()); - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); - } - UpdateRepeatAndEpochCounter(); - } -} - -// contains the main logic of pulling a IOBlock from IOBlockQueue, load a buffer and push the buffer to out_connector_ -// IMPORTANT: 1 IOBlock produces 1 DataBuffer -Status ImageFolderOp::WorkerEntry(int32_t worker_id) { - TaskManager::FindMe()->Post(); - int64_t buffer_id = worker_id; - std::unique_ptr io_block; - RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block)); - while (io_block != nullptr) { - if (io_block->wait() == true) { - // Sync io_block is a signal that master thread wants us to pause and sync with other workers. - // The last guy who comes to this sync point should reset the counter and wake up the master thread. - if (++num_workers_paused_ == num_workers_) { - wait_for_workers_post_.Set(); - } - } else if (io_block->eoe() == true) { - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique(0, DataBuffer::kDeBFlagEOE))); - buffer_id = worker_id; - } else if (io_block->eof() == true) { - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique(0, DataBuffer::kDeBFlagEOF))); - } else { - std::vector keys; - RETURN_IF_NOT_OK(io_block->GetKeys(&keys)); - if (keys.empty() == true) return Status::OK(); // empty key is a quit signal for workers - std::unique_ptr db = std::make_unique(buffer_id, DataBuffer::kDeBFlagNone); - RETURN_IF_NOT_OK(LoadBuffer(keys, &db)); - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(db))); - buffer_id += num_workers_; - } - RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block)); - } - RETURN_STATUS_UNEXPECTED("Unexpected nullptr received in worker"); -} - // Load 1 TensorRow (image,label) using 1 ImageLabelPair. 1 function call produces 1 TensorTow in a DataBuffer -Status ImageFolderOp::LoadTensorRow(row_id_type row_id, ImageLabelPair pairPtr, TensorRow *trow) { +Status ImageFolderOp::LoadTensorRow(row_id_type row_id, TensorRow *trow) { + ImageLabelPair pairPtr = image_label_pairs_[row_id]; std::shared_ptr image, label; RETURN_IF_NOT_OK(Tensor::CreateScalar(pairPtr->second, &label)); RETURN_IF_NOT_OK(Tensor::CreateFromFile(folder_path_ + (pairPtr->first), &image)); @@ -233,18 +141,6 @@ Status ImageFolderOp::LoadTensorRow(row_id_type row_id, ImageLabelPair pairPtr, return Status::OK(); } -// Looping over LoadTensorRow to make 1 DataBuffer. 1 function call produces 1 buffer -Status ImageFolderOp::LoadBuffer(const std::vector &keys, std::unique_ptr *db) { - std::unique_ptr deq = std::make_unique(); - TensorRow trow; - for (const int64_t &key : keys) { - RETURN_IF_NOT_OK(this->LoadTensorRow(key, image_label_pairs_[key], &trow)); - deq->push_back(std::move(trow)); - } - (*db)->set_tensor_table(std::move(deq)); - return Status::OK(); -} - void ImageFolderOp::Print(std::ostream &out, bool show_all) const { if (!show_all) { // Call the super class for displaying any common 1-liner info @@ -260,20 +156,6 @@ void ImageFolderOp::Print(std::ostream &out, bool show_all) const { } } -// Reset Sampler and wakeup Master thread (functor) -Status ImageFolderOp::Reset() { - MS_LOG(DEBUG) << Name() << " performing a self-reset."; - RETURN_IF_NOT_OK(sampler_->ResetSampler()); - row_cnt_ = 0; - return Status::OK(); -} - -// hand shake with Sampler, allow Sampler to call RandomAccessOp's functions to get NumRows -Status ImageFolderOp::InitSampler() { - RETURN_IF_NOT_OK(sampler_->HandshakeRandomAccessOp(this)); - return Status::OK(); -} - // Derived from RandomAccessOp Status ImageFolderOp::GetClassIds(std::map> *cls_ids) const { if (cls_ids == nullptr || !cls_ids->empty() || image_label_pairs_.empty()) { diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.h index 2b4e090263..1cf6b366a5 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.h @@ -29,6 +29,7 @@ #include "minddata/dataset/engine/data_buffer.h" #include "minddata/dataset/engine/data_schema.h" #include "minddata/dataset/engine/datasetops/parallel_op.h" +#include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h" #include "minddata/dataset/engine/datasetops/source/sampler/sampler.h" #ifndef ENABLE_ANDROID #include "minddata/dataset/kernels/image/image_utils.h" @@ -50,7 +51,7 @@ class Queue; using ImageLabelPair = std::shared_ptr>; using FolderImagesPair = std::shared_ptr>>; -class ImageFolderOp : public ParallelOp, public RandomAccessOp { +class ImageFolderOp : public MappableLeafOp { public: class Builder { public: @@ -175,22 +176,11 @@ class ImageFolderOp : public ParallelOp, public RandomAccessOp { // @return Status The status code returned Status PrescanMasterEntry(const std::string &dir); - // Worker thread pulls a number of IOBlock from IOBlock Queue, make a buffer and push it to Connector - // @param int32_t workerId - id of each worker - // @return Status The status code returned - Status WorkerEntry(int32_t worker_id) override; - // Worker thread pulls a number of IOBlock from IOBlock Queue, make a buffer and push it to Connector // @param int32_t workerId - id of each worker // @return Status The status code returned Status PrescanWorkerEntry(int32_t worker_id); - // Main Loop of ImageFolderOp - // Master thread: Fill IOBlockQueue, then goes to sleep - // Worker thread: pulls IOBlock from IOBlockQueue, work on it then put buffer to mOutConnector - // @return Status The status code returned - Status operator()() override; - // Method derived from RandomAccess Op, enable Sampler to get all ids for each class // @param (std::map> * map - key label, val all ids for this class // @return Status The status code returned @@ -217,21 +207,12 @@ class ImageFolderOp : public ParallelOp, public RandomAccessOp { Status GetNumClasses(int64_t *num_classes) override; private: - // Initialize Sampler, calls sampler->Init() within - // @return Status The status code returned - Status InitSampler(); - // Load a tensor row according to a pair // @param row_id_type row_id - id for this tensor row // @param ImageLabelPair pair - // @param TensorRow row - image & label read into this tensor row // @return Status The status code returned - Status LoadTensorRow(row_id_type row_id, ImageLabelPair pair, TensorRow *row); - - // @param const std::vector &keys - keys in ioblock - // @param std::unique_ptr db - // @return Status The status code returned - Status LoadBuffer(const std::vector &keys, std::unique_ptr *db); + Status LoadTensorRow(row_id_type row_id, TensorRow *row) override; // @param std::string & dir - dir to walk all images // @param int64_t * cnt - number of non folder files under the current dir @@ -244,25 +225,18 @@ class ImageFolderOp : public ParallelOp, public RandomAccessOp { // Called first when function is called // @return - Status LaunchThreadsAndInitOp(); - - // reset Op - // @return Status The status code returned - Status Reset() override; + Status LaunchThreadsAndInitOp() override; // Private function for computing the assignment of the column name map. // @return - Status Status ComputeColMap() override; - int32_t rows_per_buffer_; std::string folder_path_; // directory of image folder bool recursive_; bool decode_; std::set extensions_; // extensions allowed std::map class_index_; std::unique_ptr data_schema_; - int64_t row_cnt_; - int64_t buf_cnt_; int64_t sampler_ind_; int64_t dirname_offset_; std::vector image_label_pairs_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/manifest_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/manifest_op.cc index a2d39ae854..3c57e9eff5 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/manifest_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/manifest_op.cc @@ -67,82 +67,18 @@ Status ManifestOp::Builder::SanityCheck() { ManifestOp::ManifestOp(int32_t num_works, int32_t rows_per_buffer, std::string file, int32_t queue_size, bool decode, const std::map &class_index, std::unique_ptr data_schema, std::shared_ptr sampler, std::string usage) - : ParallelOp(num_works, queue_size, std::move(sampler)), - rows_per_buffer_(rows_per_buffer), + : MappableLeafOp(num_works, queue_size, std::move(sampler), rows_per_buffer), io_block_pushed_(0), - row_cnt_(0), sampler_ind_(0), data_schema_(std::move(data_schema)), file_(file), class_index_(class_index), decode_(decode), - usage_(usage), - buf_cnt_(0) { + usage_(usage) { io_block_queues_.Init(num_workers_, queue_size); (void)std::transform(usage_.begin(), usage_.end(), usage_.begin(), ::tolower); } -// Main logic, Register Queue with TaskGroup, launch all threads and do the functor's work -Status ManifestOp::operator()() { - RETURN_IF_NOT_OK(LaunchThreadsAndInitOp()); - std::unique_ptr sampler_buffer; - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); - return AddIoBlock(&sampler_buffer); -} - -Status ManifestOp::AddIoBlock(std::unique_ptr *sampler_buffer) { - while (true) { // each iterator is 1 epoch - std::vector keys; - keys.reserve(rows_per_buffer_); - while (!(*sampler_buffer)->eoe()) { - TensorRow sample_row; - RETURN_IF_NOT_OK((*sampler_buffer)->PopRow(&sample_row)); - std::shared_ptr sample_ids = sample_row[0]; - for (auto itr = sample_ids->begin(); itr != sample_ids->end(); ++itr) { - if ((*itr) >= num_rows_) continue; // index out of bound, skipping - keys.push_back(*itr); - row_cnt_++; - if (row_cnt_ % rows_per_buffer_ == 0) { - RETURN_IF_NOT_OK(io_block_queues_[buf_cnt_++ % num_workers_]->Add( - std::make_unique(IOBlock(keys, IOBlock::kDeIoBlockNone)))); - keys.clear(); - } - } - RETURN_IF_NOT_OK(sampler_->GetNextSample(sampler_buffer)); - } - if (keys.empty() == false) { - RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add( - std::make_unique(IOBlock(keys, IOBlock::kDeIoBlockNone)))); - } - if (IsLastIteration()) { - RETURN_IF_NOT_OK( - io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique(IOBlock::kDeIoBlockFlagEoe))); - RETURN_IF_NOT_OK( - io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique(IOBlock::kDeIoBlockFlagEof))); - for (int32_t i = 0; i < num_workers_; i++) { - RETURN_IF_NOT_OK( - io_block_queues_[i]->Add(std::make_unique(std::vector(), IOBlock::kDeIoBlockNone))); - } - return Status::OK(); - } else { - RETURN_IF_NOT_OK( - io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique(IOBlock::kDeIoBlockFlagEoe))); - } - - if (epoch_sync_flag_) { - // If epoch_sync_flag_ is set, then master thread sleeps until all the worker threads have finished their job for - // the current epoch. - RETURN_IF_NOT_OK(WaitForWorkers()); - } - // If not the last repeat, self-reset and go to loop again. - if (!IsLastIteration()) { - RETURN_IF_NOT_OK(Reset()); - RETURN_IF_NOT_OK(sampler_->GetNextSample(sampler_buffer)); - } - UpdateRepeatAndEpochCounter(); - } -} - Status ManifestOp::LaunchThreadsAndInitOp() { if (tree_ == nullptr) { RETURN_STATUS_UNEXPECTED("Pipeline init failed, Execution tree not set."); @@ -159,44 +95,9 @@ Status ManifestOp::LaunchThreadsAndInitOp() { return Status::OK(); } -// contains the main logic of pulling a IOBlock from IOBlockQueue, load a buffer and push the buffer to out_connector_ -// IMPORTANT: 1 IOBlock produces 1 DataBuffer -Status ManifestOp::WorkerEntry(int32_t worker_id) { - TaskManager::FindMe()->Post(); - int64_t buffer_id = worker_id; - std::unique_ptr io_block; - RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block)); - while (io_block != nullptr) { - if (io_block->wait() == true) { - // Sync io_block is a signal that master thread wants us to pause and sync with other workers. - // The last guy who comes to this sync point should reset the counter and wake up the master thread. - if (++num_workers_paused_ == num_workers_) { - wait_for_workers_post_.Set(); - } - } else if (io_block->eoe() == true) { - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique(0, DataBuffer::kDeBFlagEOE))); - buffer_id = worker_id; - } else if (io_block->eof() == true) { - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique(0, DataBuffer::kDeBFlagEOF))); - } else { - std::vector keys; - RETURN_IF_NOT_OK(io_block->GetKeys(&keys)); - if (keys.empty()) { - return Status::OK(); // empty key is a quit signal for workers - } - std::unique_ptr db = std::make_unique(buffer_id, DataBuffer::kDeBFlagNone); - RETURN_IF_NOT_OK(LoadBuffer(keys, &db)); - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(db))); - buffer_id += num_workers_; - } - RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block)); - } - RETURN_STATUS_UNEXPECTED("Unexpected nullptr received in worker."); -} - // Load 1 TensorRow (image,label) using 1 ImageLabelPair. 1 function call produces 1 TensorTow in a DataBuffer -Status ManifestOp::LoadTensorRow(row_id_type row_id, const std::pair> &data, - TensorRow *trow) { +Status ManifestOp::LoadTensorRow(row_id_type row_id, TensorRow *trow) { + std::pair> data = image_labelname_[static_cast(row_id)]; std::shared_ptr image; std::shared_ptr label; std::vector label_index(data.second.size()); @@ -222,18 +123,6 @@ Status ManifestOp::LoadTensorRow(row_id_type row_id, const std::pair &keys, std::unique_ptr *db) { - std::unique_ptr deq = std::make_unique(); - for (const auto &key : keys) { - TensorRow trow; - RETURN_IF_NOT_OK(LoadTensorRow(key, image_labelname_[static_cast(key)], &trow)); - deq->push_back(std::move(trow)); - } - (*db)->set_tensor_table(std::move(deq)); - return Status::OK(); -} - void ManifestOp::Print(std::ostream &out, bool show_all) const { if (!show_all) { // Call the super class for displaying any common 1-liner info @@ -249,20 +138,6 @@ void ManifestOp::Print(std::ostream &out, bool show_all) const { } } -// Reset Sampler and wakeup Master thread (functor) -Status ManifestOp::Reset() { - MS_LOG(DEBUG) << Name() << " performing a self-reset."; - RETURN_IF_NOT_OK(sampler_->ResetSampler()); - row_cnt_ = 0; - return Status::OK(); -} - -// hand shake with Sampler, allow Sampler to call RandomAccessOp's functions to get NumRows -Status ManifestOp::InitSampler() { - RETURN_IF_NOT_OK(sampler_->HandshakeRandomAccessOp(this)); - return Status::OK(); -} - // Derived from RandomAccessOp Status ManifestOp::GetClassIds(std::map> *cls_ids) const { if (cls_ids == nullptr || !cls_ids->empty() || image_labelname_.empty()) { diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/manifest_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/manifest_op.h index b8417fa554..2ccb0fda8f 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/manifest_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/manifest_op.h @@ -26,6 +26,7 @@ #include "minddata/dataset/engine/data_buffer.h" #include "minddata/dataset/engine/data_schema.h" #include "minddata/dataset/engine/datasetops/parallel_op.h" +#include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h" #include "minddata/dataset/engine/datasetops/source/sampler/sampler.h" #include "minddata/dataset/kernels/image/image_utils.h" #include "minddata/dataset/util/queue.h" @@ -35,7 +36,7 @@ namespace mindspore { namespace dataset { -class ManifestOp : public ParallelOp, public RandomAccessOp { +class ManifestOp : public MappableLeafOp { public: class Builder { public: @@ -143,17 +144,6 @@ class ManifestOp : public ParallelOp, public RandomAccessOp { // Destructor. ~ManifestOp() = default; - // Worker thread pulls a number of IOBlock from IOBlock Queue, make a buffer and push it to Connector - // @param int32_t worker_id - id of each worker - // @return Status The status code returned - Status WorkerEntry(int32_t worker_id) override; - - // Main Loop of ManifestOp - // Master thread: Fill IOBlockQueue, then goes to sleep - // Worker thread: pulls IOBlock from IOBlockQueue, work on it then put buffer to mOutConnector - // @return Status The status code returned - Status operator()() override; - // Method derived from RandomAccess Op, enable Sampler to get all ids for each class // @param (std::map> * map - key label, val all ids for this class // @return Status The status code returned @@ -194,27 +184,12 @@ class ManifestOp : public ParallelOp, public RandomAccessOp { Status GetClassIndexing(std::vector>> *output_class_indexing) override; private: - // Initialize Sampler, calls sampler->Init() within - // @return Status The status code returned - Status InitSampler(); - - // Method in operator(), to fill IOBlockQueue - // @param std::unique_ptr sampler_buffer - to fill IOBlockQueue - // @return Status The status code returned - Status AddIoBlock(std::unique_ptr *sampler_buffer); - // Load a tensor row according to a pair // @param row_id_type row_id - id for this tensor row // @param std::pair> - > // @param TensorRow row - image & label read into this tensor row // @return Status The status code returned - Status LoadTensorRow(row_id_type row_id, const std::pair> &data, - TensorRow *row); - - // @param const std::vector &keys - keys in ioblock - // @param std::unique_ptr db - // @return Status The status code returned - Status LoadBuffer(const std::vector &keys, std::unique_ptr *db); + Status LoadTensorRow(row_id_type row_id, TensorRow *row) override; // Parse manifest file to get image path and label and so on. // @return Status The status code returned @@ -222,11 +197,7 @@ class ManifestOp : public ParallelOp, public RandomAccessOp { // Called first when function is called // @return Status The status code returned - Status LaunchThreadsAndInitOp(); - - // reset Op - // @return Status The status code returned - Status Reset() override; + Status LaunchThreadsAndInitOp() override; // Check if image ia valid.Only support JPEG/PNG/GIF/BMP // @return @@ -240,16 +211,13 @@ class ManifestOp : public ParallelOp, public RandomAccessOp { // @return - Status Status ComputeColMap() override; - int32_t rows_per_buffer_; int64_t io_block_pushed_; - int64_t row_cnt_; int64_t sampler_ind_; std::unique_ptr data_schema_; std::string file_; // file that store the information of images std::map class_index_; bool decode_; std::string usage_; - int64_t buf_cnt_; std::map label_index_; std::vector>> image_labelname_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mappable_leaf_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mappable_leaf_op.cc new file mode 100644 index 0000000000..b17d885bec --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mappable_leaf_op.cc @@ -0,0 +1,152 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h" +#include +#include +#include "utils/ms_utils.h" +#include "minddata/dataset/core/config_manager.h" +#include "minddata/dataset/core/tensor_shape.h" +#include "minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.h" +#include "minddata/dataset/engine/db_connector.h" +#include "minddata/dataset/engine/execution_tree.h" + +namespace mindspore { +namespace dataset { + +MappableLeafOp::MappableLeafOp(int32_t num_wkrs, int32_t queue_size, std::shared_ptr sampler, + int32_t rows_per_buffer) + : ParallelOp(num_wkrs, queue_size, std::move(sampler)), + row_cnt_(0), + buf_cnt_(0), + rows_per_buffer_(rows_per_buffer) {} + +// Main logic, Register Queue with TaskGroup, launch all threads and do the functor's work +Status MappableLeafOp::operator()() { + RETURN_IF_NOT_OK(LaunchThreadsAndInitOp()); + std::unique_ptr sampler_buffer; + RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); + while (true) { // each iterator is 1 epoch + std::vector keys; + keys.reserve(rows_per_buffer_); + while (sampler_buffer->eoe() == false) { + TensorRow sample_row; + RETURN_IF_NOT_OK(sampler_buffer->PopRow(&sample_row)); + std::shared_ptr sample_ids = sample_row[0]; + for (auto itr = sample_ids->begin(); itr != sample_ids->end(); ++itr) { + if ((*itr) >= num_rows_) continue; // index out of bound, skipping + keys.push_back(*itr); + row_cnt_++; + if (row_cnt_ % rows_per_buffer_ == 0) { + RETURN_IF_NOT_OK( + io_block_queues_[buf_cnt_++ % num_workers_]->Add(std::make_unique(keys, IOBlock::kDeIoBlockNone))); + keys.clear(); + } + } + RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); + } + if (keys.empty() == false) { + RETURN_IF_NOT_OK( + io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique(keys, IOBlock::kDeIoBlockNone))); + } + if (IsLastIteration()) { + std::unique_ptr eoe_block = std::make_unique(IOBlock::kDeIoBlockFlagEoe); + std::unique_ptr eof_block = std::make_unique(IOBlock::kDeIoBlockFlagEof); + RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::move(eoe_block))); + RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::move(eof_block))); + for (int32_t i = 0; i < num_workers_; ++i) { + RETURN_IF_NOT_OK( + io_block_queues_[i]->Add(std::make_unique(std::vector(), IOBlock::kDeIoBlockNone))); + } + return Status::OK(); + } else { // not the last repeat. + RETURN_IF_NOT_OK( + io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique(IOBlock::kDeIoBlockFlagEoe))); + } + + if (epoch_sync_flag_) { + // If epoch_sync_flag_ is set, then master thread sleeps until all the worker threads have finished their job for + // the current epoch. + RETURN_IF_NOT_OK(WaitForWorkers()); + } + // If not the last repeat, self-reset and go to loop again. + if (!IsLastIteration()) { + RETURN_IF_NOT_OK(Reset()); + RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); + } + UpdateRepeatAndEpochCounter(); + } +} + +// Reset Sampler and wakeup Master thread (functor) +Status MappableLeafOp::Reset() { + MS_LOG(DEBUG) << Name() << " performing a self-reset."; + RETURN_IF_NOT_OK(sampler_->ResetSampler()); + return Status::OK(); +} + +// hand shake with Sampler, allow Sampler to call RandomAccessOp's functions to get NumRows +Status MappableLeafOp::InitSampler() { + RETURN_IF_NOT_OK(sampler_->HandshakeRandomAccessOp(this)); + return Status::OK(); +} + +// contains the main logic of pulling a IOBlock from IOBlockQueue, load a buffer and push the buffer to out_connector_ +// IMPORTANT: 1 IOBlock produces 1 DataBuffer +Status MappableLeafOp::WorkerEntry(int32_t worker_id) { + TaskManager::FindMe()->Post(); + int64_t buffer_id = worker_id; + std::unique_ptr io_block; + RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block)); + while (io_block != nullptr) { + if (io_block->wait() == true) { + // Sync io_block is a signal that master thread wants us to pause and sync with other workers. + // The last guy who comes to this sync point should reset the counter and wake up the master thread. + if (++num_workers_paused_ == num_workers_) { + wait_for_workers_post_.Set(); + } + } else if (io_block->eoe() == true) { + RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique(0, DataBuffer::kDeBFlagEOE))); + buffer_id = worker_id; + } else if (io_block->eof() == true) { + RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique(0, DataBuffer::kDeBFlagEOF))); + } else { + std::vector keys; + RETURN_IF_NOT_OK(io_block->GetKeys(&keys)); + if (keys.empty() == true) return Status::OK(); // empty key is a quit signal for workers + std::unique_ptr db = std::make_unique(buffer_id, DataBuffer::kDeBFlagNone); + RETURN_IF_NOT_OK(LoadBuffer(keys, &db)); + RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(db))); + buffer_id += num_workers_; + } + RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block)); + } + RETURN_STATUS_UNEXPECTED("Unexpected nullptr received in worker"); +} + +// Looping over LoadTensorRow to make 1 DataBuffer. 1 function call produces 1 buffer +Status MappableLeafOp::LoadBuffer(const std::vector &keys, std::unique_ptr *db) { + std::unique_ptr deq = std::make_unique(); + TensorRow trow; + for (const int64_t &key : keys) { + RETURN_IF_NOT_OK(this->LoadTensorRow(key, &trow)); + deq->push_back(std::move(trow)); + } + (*db)->set_tensor_table(std::move(deq)); + return Status::OK(); +} + +} // namespace dataset +} // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mappable_leaf_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mappable_leaf_op.h new file mode 100644 index 0000000000..897fb76123 --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mappable_leaf_op.h @@ -0,0 +1,110 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_SOURCE_MAPPABLE_LEAF_OP_H_ +#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_SOURCE_MAPPABLE_LEAF_OP_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "minddata/dataset/core/tensor.h" +#include "minddata/dataset/engine/data_buffer.h" +#include "minddata/dataset/engine/data_schema.h" +#include "minddata/dataset/engine/datasetops/parallel_op.h" +#include "minddata/dataset/engine/datasetops/source/sampler/sampler.h" +#ifndef ENABLE_ANDROID +#include "minddata/dataset/kernels/image/image_utils.h" +#else +#include "minddata/dataset/kernels/image/lite_image_utils.h" +#endif +#include "minddata/dataset/util/path.h" +#include "minddata/dataset/util/queue.h" +#include "minddata/dataset/util/services.h" +#include "minddata/dataset/util/status.h" +#include "minddata/dataset/util/wait_post.h" + +namespace mindspore { +namespace dataset { +// Forward declares +template +class Queue; + +using ImageLabelPair = std::shared_ptr>; +using FolderImagesPair = std::shared_ptr>>; + +class MappableLeafOp : public ParallelOp, public RandomAccessOp { + public: + // Constructor + // @param int32_t num_wkrs - Num of workers reading images in parallel + // @param int32_t - rows_per_buffer Number of images (rows) in each buffer + // @param std::string - dir directory of ImageNetFolder + // @param int32_t queue_size - connector queue size + // @param std::set exts - set of file extensions to read, if empty, read everything under the dir + // @param td::unique_ptr sampler - sampler tells the source what to read + MappableLeafOp(int32_t num_wkrs, int32_t queue_size, std::shared_ptr sampler, int32_t rows_per_buffer); + + // Destructor. + ~MappableLeafOp() = default; + + // Main Loop of MappableLeaf + // Master thread: Fill IOBlockQueue, then goes to sleep + // Worker thread: pulls IOBlock from IOBlockQueue, work on it then put buffer to mOutConnector + // @return Status The status code returned + Status operator()() override; + + // Op name getter + // @return Name of the current Op + std::string Name() const override { return "MappableLeafPp"; } + + protected: + // Initialize Sampler, calls sampler->Init() within + // @return Status The status code returned + Status InitSampler(); + + // // Called first when function is called + // // @return + virtual Status LaunchThreadsAndInitOp() = 0; + + Status WorkerEntry(int32_t workerId) override; + + // @param const std::vector &keys - keys in ioblock + // @param std::unique_ptr db + // @return Status The status code returned + Status LoadBuffer(const std::vector &keys, std::unique_ptr *db); + + // Load a tensor row according to a pair + // @param row_id_type row_id - id for this tensor row + // @param ImageLabelPair pair - + // @param TensorRow row - loaded row + // @return Status The status code returned + virtual Status LoadTensorRow(row_id_type row_id, TensorRow *row) = 0; + + // reset Op + // @return Status The status code returned + Status Reset() override; + + int32_t rows_per_buffer_; + int64_t row_cnt_; + int64_t buf_cnt_; +}; +} // namespace dataset +} // namespace mindspore +#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_SOURCE_MAPPABLE_LEAF_OP_H_ diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc index 9c8f2e04ed..470b505c73 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc @@ -27,6 +27,7 @@ #include "minddata/dataset/core/global_context.h" #include "minddata/dataset/engine/data_buffer.h" #include "minddata/dataset/engine/datasetops/dataset_op.h" +#include "minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.h" #include "minddata/dataset/engine/db_connector.h" #include "minddata/dataset/engine/execution_tree.h" #include "minddata/dataset/util/log_adapter.h" @@ -115,16 +116,14 @@ MindRecordOp::MindRecordOp(int32_t num_mind_record_workers, int32_t rows_per_buf const std::vector &columns_to_load, const std::vector> &operators, int64_t num_padded, const mindrecord::json &sample_json, const std::map &sample_bytes) - : ParallelOp(num_mind_record_workers, op_connector_queue_size), - rows_per_buffer_(rows_per_buffer), + : MappableLeafOp(num_mind_record_workers, op_connector_queue_size, std::make_shared(0, 0), + rows_per_buffer), dataset_file_(dataset_file), load_dataset_(load_dataset), columns_to_load_(columns_to_load), operators_(operators), num_mind_record_workers_(num_mind_record_workers), - num_rows_(0), buffers_needed_(0), - buf_cnt_(0), ended_worker_(0), num_padded_(num_padded), sample_json_(sample_json), @@ -379,61 +378,19 @@ Status MindRecordOp::LoadTensorRow(TensorRow *tensor_row, const std::vectorGetNumRows(); - // Compute how many buffers we would need to accomplish rowsPerBuffer - buffers_needed_ = (num_rows_ + rows_per_buffer_ - 1) / rows_per_buffer_; - - while (true) { // each iterator is 1 epoch - for (int32_t i = 0; i < buffers_needed_; ++i) { - std::vector keys(1, i); - RETURN_IF_NOT_OK(io_block_queues_[buf_cnt_++ % num_workers_]->Add( - std::make_unique(IOBlock(keys, IOBlock::kDeIoBlockNone)))); - } - if (IsLastIteration()) { - RETURN_IF_NOT_OK( - io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique(IOBlock::kDeIoBlockFlagEoe))); - RETURN_IF_NOT_OK( - io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique(IOBlock::kDeIoBlockFlagEof))); - for (int32_t i = 0; i < num_workers_; i++) { - RETURN_IF_NOT_OK(io_block_queues_[i]->Add( - std::move(std::make_unique(std::vector(), IOBlock::kDeIoBlockNone)))); - } - return Status::OK(); - } else { - RETURN_IF_NOT_OK( - io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique(IOBlock::kDeIoBlockFlagEoe))); - } - - if (epoch_sync_flag_) { - // If epoch_sync_flag_ is set, then master thread sleeps until all the worker threads have finished their job for - // the current epoch. - RETURN_IF_NOT_OK(WaitForWorkers()); - } - // If not the last repeat, self-reset and go to loop again. - if (!IsLastIteration()) RETURN_IF_NOT_OK(Reset()); - UpdateRepeatAndEpochCounter(); - } -} - // Overrides base class reset method. When an operator does a reset, it cleans up any state // info from it's previous execution and then initializes itself so that it can be executed // again. Status MindRecordOp::Reset() { MS_LOG(DEBUG) << Name() << " performing a self-reset."; - RETURN_IF_NOT_OK(ParallelOp::Reset()); // Call our super class reset first. + RETURN_IF_NOT_OK(MappableLeafOp::Reset()); // Call our super class reset first. shard_reader_->ShuffleTask(); return Status::OK(); } -Status MindRecordOp::LaunchThreadAndInitOp() { +Status MindRecordOp::LaunchThreadsAndInitOp() { if (tree_ == nullptr) { RETURN_STATUS_UNEXPECTED("Pipeline init failed, Execution tree not set."); } @@ -446,6 +403,8 @@ Status MindRecordOp::LaunchThreadAndInitOp() { // Launch main workers that load DataBuffers by reading all images RETURN_IF_NOT_OK( tree_->LaunchWorkers(num_workers_, std::bind(&MindRecordOp::WorkerEntry, this, std::placeholders::_1), "", id())); + num_rows_ = shard_reader_->GetNumRows(); + RETURN_IF_NOT_OK(this->InitSampler()); // pass numRows to Sampler TaskManager::FindMe()->Post(); return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.h index acc90fd879..89804eee21 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.h @@ -28,7 +28,7 @@ #include #include "minddata/dataset/engine/data_schema.h" -#include "minddata/dataset/engine/datasetops/parallel_op.h" +#include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h" #include "minddata/dataset/util/queue.h" #include "minddata/dataset/util/status.h" #include "minddata/mindrecord/include/shard_column.h" @@ -50,7 +50,7 @@ using ShardTuple = std::vector, mindrecord::json const int32_t LOG_INTERVAL = 19; -class MindRecordOp : public ParallelOp { +class MindRecordOp : public MappableLeafOp { public: // The nested builder class inside of the MindRecordOp is used to help manage all of the arguments // for constructing it. Use the builder by setting each argument with the provided set methods, @@ -167,15 +167,9 @@ class MindRecordOp : public ParallelOp { // @return Status The status code returned Status WorkerEntry(int32_t worker_id) override; - // Class functor operator () override. - // All DatasetOps operate by launching a thread (see ExecutionTree). This class functor will - // provide the master loop that drives the logic for performing the work. - // @return Status The status code returned - Status operator()() override; - // Called first when function is called // @return - Status LaunchThreadAndInitOp(); + Status LaunchThreadsAndInitOp() override; // Overrides base class reset method. When an operator does a reset, it cleans up any state // info from it's previous execution and then initializes itself so that it can be executed @@ -183,15 +177,9 @@ class MindRecordOp : public ParallelOp { // @return Status The status code returned Status Reset() override; - // Getter method - int32_t num_rows() const { return num_rows_; } - static Status CountTotalRows(const std::vector dataset_path, bool load_dataset, const std::shared_ptr &op, int64_t *count, int64_t num_padded); - // Getter method - int32_t rows_per_buffer() const { return rows_per_buffer_; } - // Getter method std::vector dataset_file() const { return dataset_file_; } @@ -216,19 +204,19 @@ class MindRecordOp : public ParallelOp { Status LoadTensorRow(TensorRow *tensor_row, const std::vector &columns_blob, const mindrecord::json &columns_json, const mindrecord::TaskType task_type); + Status LoadTensorRow(row_id_type row_id, TensorRow *row) override { + return Status(StatusCode::kMDSyntaxError, "Cannot call this method."); + } // Private function for computing the assignment of the column name map. // @return - Status Status ComputeColMap() override; - int32_t rows_per_buffer_; // The number of requested rows per buffer. std::vector dataset_file_; // dataset files bool load_dataset_; // load dataset from single file or not std::vector columns_to_load_; // Columns to load from dataset std::vector> operators_; // ShardOperators to use int32_t num_mind_record_workers_; // number of workers to be spawned by ShardReader int32_t buffers_needed_; // Counter for the buffers that were fetched - int64_t buf_cnt_; // Buffer counter - int32_t num_rows_; // One more than the last row id in the range for this cache std::atomic ended_worker_; int64_t num_padded_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mnist_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mnist_op.cc index 397f1cee23..c7498deb06 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mnist_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mnist_op.cc @@ -75,117 +75,18 @@ Status MnistOp::Builder::SanityCheck() { MnistOp::MnistOp(const std::string &usage, int32_t num_workers, int32_t rows_per_buffer, std::string folder_path, int32_t queue_size, std::unique_ptr data_schema, std::shared_ptr sampler) - : ParallelOp(num_workers, queue_size, std::move(sampler)), + : MappableLeafOp(num_workers, queue_size, std::move(sampler), rows_per_buffer), usage_(usage), - buf_cnt_(0), - row_cnt_(0), folder_path_(folder_path), - rows_per_buffer_(rows_per_buffer), image_path_({}), label_path_({}), data_schema_(std::move(data_schema)) { io_block_queues_.Init(num_workers, queue_size); } -Status MnistOp::TraversalSampleIds(const std::shared_ptr &sample_ids, std::vector *keys) { - for (auto itr = sample_ids->begin(); itr != sample_ids->end(); ++itr) { - if ((*itr) >= num_rows_) continue; // index out of bound, skipping - keys->push_back(*itr); - row_cnt_++; - if (row_cnt_ % rows_per_buffer_ == 0) { - RETURN_IF_NOT_OK(io_block_queues_[buf_cnt_++ % num_workers_]->Add( - std::make_unique(IOBlock(*keys, IOBlock::kDeIoBlockNone)))); - keys->clear(); - } - } - return Status::OK(); -} - -// functor that contains the main logic of MNIST op -Status MnistOp::operator()() { - RETURN_IF_NOT_OK(LaunchThreadsAndInitOp()); - std::unique_ptr sampler_buffer; - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); - while (true) { // each iterator is 1 epoch - std::vector keys; - keys.reserve(rows_per_buffer_); - while (sampler_buffer->eoe() == false) { - std::shared_ptr sample_ids; - RETURN_IF_NOT_OK(sampler_buffer->GetTensor(&sample_ids, 0, 0)); - if (sample_ids->type() != DataType(DataType::DE_INT64)) { - RETURN_STATUS_UNEXPECTED("Invalid parameter, data type of Sampler Tensor isn't int64, got " + - sample_ids->type().ToString()); - } - RETURN_IF_NOT_OK(TraversalSampleIds(sample_ids, &keys)); - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); - } - if (keys.empty() == false) { - RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add( - std::make_unique(IOBlock(keys, IOBlock::kDeIoBlockNone)))); - } - if (IsLastIteration()) { - RETURN_IF_NOT_OK( - io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique(IOBlock::kDeIoBlockFlagEoe))); - RETURN_IF_NOT_OK( - io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique(IOBlock::kDeIoBlockFlagEof))); - for (int32_t i = 0; i < num_workers_; ++i) { - RETURN_IF_NOT_OK( - io_block_queues_[i]->Add(std::make_unique(std::vector(), IOBlock::kDeIoBlockNone))); - } - return Status::OK(); - } else { - RETURN_IF_NOT_OK( - io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique(IOBlock::kDeIoBlockFlagEoe))); - } - - if (epoch_sync_flag_) { - // If epoch_sync_flag_ is set, then master thread sleeps until all the worker threads have finished their job for - // the current epoch. - RETURN_IF_NOT_OK(WaitForWorkers()); - } - // If not the last repeat, self-reset and go to loop again. - if (!IsLastIteration()) { - RETURN_IF_NOT_OK(Reset()); - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); - } - UpdateRepeatAndEpochCounter(); - } -} - -// contains the logic of pulling a IOBlock from IOBlockQueue, load a buffer and push the buffer to out_connector_ -Status MnistOp::WorkerEntry(int32_t worker_id) { - TaskManager::FindMe()->Post(); - int64_t buffer_id = worker_id; - std::unique_ptr iOBlock; - RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&iOBlock)); - while (iOBlock != nullptr) { - if (iOBlock->wait() == true) { - // Sync io_block is a signal that master thread wants us to pause and sync with other workers. - // The last guy who comes to this sync point should reset the counter and wake up the master thread. - if (++num_workers_paused_ == num_workers_) { - wait_for_workers_post_.Set(); - } - } else if (iOBlock->eoe() == true) { - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique(0, DataBuffer::kDeBFlagEOE))); - buffer_id = worker_id; - } else if (iOBlock->eof() == true) { - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique(0, DataBuffer::kDeBFlagEOF))); - } else { - std::vector keys; - RETURN_IF_NOT_OK(iOBlock->GetKeys(&keys)); - if (keys.empty() == true) return Status::OK(); // empty key is a quit signal for workers - std::unique_ptr db = std::make_unique(buffer_id, DataBuffer::kDeBFlagNone); - RETURN_IF_NOT_OK(LoadBuffer(keys, &db)); - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(db))); - buffer_id += num_workers_; - } - RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&iOBlock)); - } - RETURN_STATUS_UNEXPECTED("Unexpected nullptr received in worker."); -} - // Load 1 TensorRow (image,label) using 1 MnistLabelPair. -Status MnistOp::LoadTensorRow(row_id_type row_id, const MnistLabelPair &mnist_pair, TensorRow *trow) { +Status MnistOp::LoadTensorRow(row_id_type row_id, TensorRow *trow) { + MnistLabelPair mnist_pair = image_label_pairs_[row_id]; std::shared_ptr image, label; // make a copy of cached tensor RETURN_IF_NOT_OK(Tensor::CreateFromTensor(mnist_pair.first, &image)); @@ -196,18 +97,6 @@ Status MnistOp::LoadTensorRow(row_id_type row_id, const MnistLabelPair &mnist_pa return Status::OK(); } -// Looping over LoadTensorRow to make 1 DataBuffer. 1 function call produces 1 buffer -Status MnistOp::LoadBuffer(const std::vector &keys, std::unique_ptr *db) { - std::unique_ptr deq = std::make_unique(); - TensorRow trow; - for (const int64_t &key : keys) { - RETURN_IF_NOT_OK(this->LoadTensorRow(key, image_label_pairs_[key], &trow)); - deq->push_back(std::move(trow)); - } - (*db)->set_tensor_table(std::move(deq)); - return Status::OK(); -} - void MnistOp::Print(std::ostream &out, bool show_all) const { if (!show_all) { // Call the super class for displaying any common 1-liner info @@ -222,20 +111,6 @@ void MnistOp::Print(std::ostream &out, bool show_all) const { } } -// Reset Sampler and wakeup Master thread (functor) -Status MnistOp::Reset() { - MS_LOG(DEBUG) << Name() << " performing a self-reset."; - RETURN_IF_NOT_OK(sampler_->ResetSampler()); - row_cnt_ = 0; - return Status::OK(); -} - -// hand shake with Sampler, allow Sampler to call RandomAccessOp's functions to get NumRows -Status MnistOp::InitSampler() { - RETURN_IF_NOT_OK(sampler_->HandshakeRandomAccessOp(this)); - return Status::OK(); -} - // Derived from RandomAccessOp Status MnistOp::GetClassIds(std::map> *cls_ids) const { if (cls_ids == nullptr || !cls_ids->empty() || image_label_pairs_.empty()) { diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mnist_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mnist_op.h index 1791adfadd..c95b305e66 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mnist_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mnist_op.h @@ -27,6 +27,7 @@ #include "minddata/dataset/engine/data_buffer.h" #include "minddata/dataset/engine/data_schema.h" #include "minddata/dataset/engine/datasetops/parallel_op.h" +#include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h" #include "minddata/dataset/engine/datasetops/source/sampler/sampler.h" #include "minddata/dataset/util/path.h" #include "minddata/dataset/util/queue.h" @@ -41,7 +42,7 @@ class Queue; using MnistLabelPair = std::pair, uint32_t>; -class MnistOp : public ParallelOp, public RandomAccessOp { +class MnistOp : public MappableLeafOp { public: class Builder { public: @@ -131,17 +132,6 @@ class MnistOp : public ParallelOp, public RandomAccessOp { // Destructor. ~MnistOp() = default; - // Worker thread pulls a number of IOBlock from IOBlock Queue, make a buffer and push it to Connector - // @param int32_t worker_id - id of each worker - // @return Status The status code returned - Status WorkerEntry(int32_t worker_id) override; - - // Main Loop of MnistOp - // Master thread: Fill IOBlockQueue, then goes to sleep - // Worker thread: pulls IOBlock from IOBlockQueue, work on it then put buffer to mOutConnector - // @return Status The status code returned - Status operator()() override; - // Method derived from RandomAccess Op, enable Sampler to get all ids for each class // @param (std::map> * map - key label, val all ids for this class // @return Status The status code returned @@ -163,27 +153,12 @@ class MnistOp : public ParallelOp, public RandomAccessOp { std::string Name() const override { return "MnistOp"; } private: - // Initialize Sampler, calls sampler->Init() within - // @return Status The status code returned - Status InitSampler(); - // Load a tensor row according to a pair // @param row_id_type row_id - id for this tensor row // @param ImageLabelPair pair - // @param TensorRow row - image & label read into this tensor row // @return Status The status code returned - Status LoadTensorRow(row_id_type row_id, const MnistLabelPair &mnist_pair, TensorRow *row); - - // @param const std::vector &keys - keys in ioblock - // @param std::unique_ptr db - // @return Status The status code returned - Status LoadBuffer(const std::vector &keys, std::unique_ptr *db); - - // Iterate through all members in sampleIds and fill them into IOBlock. - // @param std::shared_ptr sample_ids - - // @param std::vector *keys - keys in ioblock - // @return Status The status code returned - Status TraversalSampleIds(const std::shared_ptr &sample_ids, std::vector *keys); + Status LoadTensorRow(row_id_type row_id, TensorRow *row) override; // Check image file stream. // @param const std::string *file_name - image file name @@ -226,20 +201,13 @@ class MnistOp : public ParallelOp, public RandomAccessOp { // Called first when function is called // @return Status The status code returned - Status LaunchThreadsAndInitOp(); - - // reset Op - // @return Status The status code returned - Status Reset() override; + Status LaunchThreadsAndInitOp() override; // Private function for computing the assignment of the column name map. // @return - Status Status ComputeColMap() override; - int64_t buf_cnt_; - int64_t row_cnt_; std::string folder_path_; // directory of image folder - int32_t rows_per_buffer_; const std::string usage_; // can only be either "train" or "test" std::unique_ptr data_schema_; std::vector image_label_pairs_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sampler.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sampler.cc index 95d1bf4320..ab810511d5 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sampler.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sampler.cc @@ -1,5 +1,5 @@ /** - * Copyright 2019 Huawei Technologies Co., Ltd + * Copyright 2019-2021 Huawei Technologies Co., Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,7 @@ Status RandomAccessOp::GetNumRowsInDataset(int64_t *num) const { // after it has interacted with it's storage layers. // Here, it is just a getter method to return the value. However, it is invalid if there is // not a value set for this count, so generate a failure if that is the case. - if (num == nullptr || num_rows_ == 0) { + if (num == nullptr || num_rows_ == -1) { RETURN_STATUS_UNEXPECTED("RandomAccessOp has not computed its num rows yet."); } (*num) = num_rows_; @@ -70,9 +70,6 @@ Status SamplerRT::HandshakeRandomAccessOp(const RandomAccessOp *op) { } Status SamplerRT::CreateSamplerTensor(std::shared_ptr *sample_ids, int64_t num_elements) { - if (num_elements == 0) { - RETURN_STATUS_UNEXPECTED("Invalid data, num of elements cannot be 0."); - } if (col_desc_ == nullptr) { // a ColDescriptor for Tensor that holds SampleIds col_desc_ = std::make_unique("sampleIds", DataType(DataType::DE_INT64), TensorImpl::kFlexible, 1); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.cc index 5968cd29c4..7668130007 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.cc @@ -1,5 +1,5 @@ /** - * Copyright 2019 Huawei Technologies Co., Ltd + * Copyright 2019-2021 Huawei Technologies Co., Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -70,7 +70,7 @@ Status SequentialSamplerRT::InitSampler() { CHECK_FAIL_RETURN_UNEXPECTED(start_index_ >= 0, "Invalid parameter, start_index must be greater than or equal to 0, but got " + std::to_string(start_index_) + ".\n"); - CHECK_FAIL_RETURN_UNEXPECTED(start_index_ < num_rows_, + CHECK_FAIL_RETURN_UNEXPECTED(start_index_ < num_rows_ || (num_rows_ == 0 && start_index_ == 0), "Invalid parameter, start_index must be less than num_rows, but got start_index: " + std::to_string(start_index_) + ", num_rows: " + std::to_string(num_rows_) + ".\n"); CHECK_FAIL_RETURN_UNEXPECTED(num_samples_ >= 0, @@ -83,7 +83,7 @@ Status SequentialSamplerRT::InitSampler() { num_samples_ = available_row_count; } CHECK_FAIL_RETURN_UNEXPECTED( - num_samples_ > 0 && samples_per_buffer_ > 0, + (num_samples_ > 0 && samples_per_buffer_ > 0) || num_samples_ == 0, "Invalid parameter, samples_per_buffer must be greater than 0, but got " + std::to_string(samples_per_buffer_)); samples_per_buffer_ = samples_per_buffer_ > num_samples_ ? num_samples_ : samples_per_buffer_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/voc_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/voc_op.cc index 11d6b24ea2..56c6ee2833 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/voc_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/voc_op.cc @@ -99,83 +99,16 @@ VOCOp::VOCOp(const TaskType &task_type, const std::string &task_mode, const std: const std::map &class_index, int32_t num_workers, int32_t rows_per_buffer, int32_t queue_size, bool decode, std::unique_ptr data_schema, std::shared_ptr sampler) - : ParallelOp(num_workers, queue_size, std::move(sampler)), + : MappableLeafOp(num_workers, queue_size, std::move(sampler), rows_per_buffer), decode_(decode), - row_cnt_(0), - buf_cnt_(0), task_type_(task_type), usage_(task_mode), folder_path_(folder_path), class_index_(class_index), - rows_per_buffer_(rows_per_buffer), data_schema_(std::move(data_schema)) { io_block_queues_.Init(num_workers_, queue_size); } -Status VOCOp::TraverseSampleIds(const std::shared_ptr &sample_ids, std::vector *keys) { - for (auto itr = sample_ids->begin(); itr != sample_ids->end(); ++itr) { - if ((*itr) > num_rows_) continue; - keys->push_back(*itr); - row_cnt_++; - if (row_cnt_ % rows_per_buffer_ == 0) { - RETURN_IF_NOT_OK(io_block_queues_[buf_cnt_++ % num_workers_]->Add( - std::make_unique(IOBlock(*keys, IOBlock::kDeIoBlockNone)))); - keys->clear(); - } - } - return Status::OK(); -} - -Status VOCOp::operator()() { - RETURN_IF_NOT_OK(LaunchThreadsAndInitOp()); - std::unique_ptr sampler_buffer; - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); - while (true) { - std::vector keys; - keys.reserve(rows_per_buffer_); - while (sampler_buffer->eoe() == false) { - std::shared_ptr sample_ids; - RETURN_IF_NOT_OK(sampler_buffer->GetTensor(&sample_ids, 0, 0)); - if (sample_ids->type() != DataType(DataType::DE_INT64)) { - RETURN_STATUS_UNEXPECTED("Invalid parameter, data type of Sampler Tensor isn't int64, got " + - sample_ids->type().ToString()); - } - RETURN_IF_NOT_OK(TraverseSampleIds(sample_ids, &keys)); - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); - } - if (keys.empty() == false) { - RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add( - std::make_unique(IOBlock(keys, IOBlock::kDeIoBlockNone)))); - } - if (IsLastIteration()) { - std::unique_ptr eoe_block = std::make_unique(IOBlock::kDeIoBlockFlagEoe); - std::unique_ptr eof_block = std::make_unique(IOBlock::kDeIoBlockFlagEof); - RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::move(eoe_block))); - RETURN_IF_NOT_OK(io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::move(eof_block))); - for (int32_t i = 0; i < num_workers_; i++) { - RETURN_IF_NOT_OK( - io_block_queues_[i]->Add(std::make_unique(std::vector(), IOBlock::kDeIoBlockNone))); - } - return Status::OK(); - } else { - RETURN_IF_NOT_OK( - io_block_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique(IOBlock::kDeIoBlockFlagEoe))); - } - - if (epoch_sync_flag_) { - // If epoch_sync_flag_ is set, then master thread sleeps until all the worker threads have finished their job for - // the current epoch. - RETURN_IF_NOT_OK(WaitForWorkers()); - } - // If not the last repeat, self-reset and go to loop again. - if (!IsLastIteration()) { - RETURN_IF_NOT_OK(Reset()); - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); - } - UpdateRepeatAndEpochCounter(); - } -} - void VOCOp::Print(std::ostream &out, bool show_all) const { if (!show_all) { // Call the super class for displaying any common 1-liner info @@ -191,14 +124,8 @@ void VOCOp::Print(std::ostream &out, bool show_all) const { } } -Status VOCOp::Reset() { - MS_LOG(DEBUG) << Name() << " performing a self-reset."; - RETURN_IF_NOT_OK(sampler_->ResetSampler()); - row_cnt_ = 0; - return Status::OK(); -} - -Status VOCOp::LoadTensorRow(row_id_type row_id, const std::string &image_id, TensorRow *trow) { +Status VOCOp::LoadTensorRow(row_id_type row_id, TensorRow *trow) { + std::string image_id = image_ids_[row_id]; if (task_type_ == TaskType::Segmentation) { std::shared_ptr image, target; const std::string kImageFile = @@ -226,48 +153,6 @@ Status VOCOp::LoadTensorRow(row_id_type row_id, const std::string &image_id, Ten return Status::OK(); } -Status VOCOp::LoadBuffer(const std::vector &keys, std::unique_ptr *db) { - std::unique_ptr deq = std::make_unique(); - TensorRow trow; - for (const uint64_t &key : keys) { - RETURN_IF_NOT_OK(this->LoadTensorRow(key, image_ids_[key], &trow)); - deq->push_back(std::move(trow)); - } - (*db)->set_tensor_table(std::move(deq)); - return Status::OK(); -} - -Status VOCOp::WorkerEntry(int32_t worker_id) { - TaskManager::FindMe()->Post(); - int64_t buffer_id = worker_id; - std::unique_ptr io_block; - RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block)); - while (io_block != nullptr) { - if (io_block->wait() == true) { - // Sync io_block is a signal that master thread wants us to pause and sync with other workers. - // The last guy who comes to this sync point should reset the counter and wake up the master thread. - if (++num_workers_paused_ == num_workers_) { - wait_for_workers_post_.Set(); - } - } else if (io_block->eoe() == true) { - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique(0, DataBuffer::kDeBFlagEOE))); - buffer_id = worker_id; - } else if (io_block->eof() == true) { - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, (std::make_unique(0, DataBuffer::kDeBFlagEOF)))); - } else { - std::vector keys; - RETURN_IF_NOT_OK(io_block->GetKeys(&keys)); - if (keys.empty() == true) return Status::OK(); - std::unique_ptr db = std::make_unique(buffer_id, DataBuffer::kDeBFlagNone); - RETURN_IF_NOT_OK(LoadBuffer(keys, &db)); - RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(db))); - buffer_id += num_workers_; - } - RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block)); - } - RETURN_STATUS_UNEXPECTED("Unexpected nullptr received in worker"); -} - Status VOCOp::ParseImageIds() { std::string image_sets_file; if (task_type_ == TaskType::Segmentation) { @@ -378,11 +263,6 @@ Status VOCOp::ParseAnnotationBbox(const std::string &path) { return Status::OK(); } -Status VOCOp::InitSampler() { - RETURN_IF_NOT_OK(sampler_->HandshakeRandomAccessOp(this)); - return Status::OK(); -} - Status VOCOp::LaunchThreadsAndInitOp() { if (tree_ == nullptr) { RETURN_STATUS_UNEXPECTED("Pipeline init failed, Execution tree not set."); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/voc_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/voc_op.h index 355de2266c..7ba853449e 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/voc_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/voc_op.h @@ -27,6 +27,7 @@ #include "minddata/dataset/engine/data_buffer.h" #include "minddata/dataset/engine/data_schema.h" #include "minddata/dataset/engine/datasetops/parallel_op.h" +#include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h" #include "minddata/dataset/engine/datasetops/source/sampler/sampler.h" #include "minddata/dataset/kernels/image/image_utils.h" #include "minddata/dataset/util/path.h" @@ -45,7 +46,7 @@ class Queue; using Annotation = std::vector>>; -class VOCOp : public ParallelOp, public RandomAccessOp { +class VOCOp : public MappableLeafOp { public: enum class TaskType { Segmentation = 0, Detection = 1 }; @@ -175,17 +176,6 @@ class VOCOp : public ParallelOp, public RandomAccessOp { // Destructor ~VOCOp() = default; - // Worker thread pulls a number of IOBlock from IOBlock Queue, make a buffer and push it to Connector - // @param int32_t workerId - id of each worker - // @return Status The status code returned - Status WorkerEntry(int32_t worker_id) override; - - // Main Loop of VOCOp - // Master thread: Fill IOBlockQueue, then goes to sleep - // Worker thread: pulls IOBlock from IOBlockQueue, work on it the put buffer to mOutConnector - // @return Status The status code returned - Status operator()() override; - // A print method typically used for debugging // @param out // @param show_all @@ -219,16 +209,12 @@ class VOCOp : public ParallelOp, public RandomAccessOp { Status GetClassIndexing(std::vector>> *output_class_indexing) override; private: - // Initialize Sampler, calls sampler->Init() within - // @return Status The status code returned - Status InitSampler(); - // Load a tensor row according to image id // @param row_id_type row_id - id for this tensor row // @param std::string image_id - image id // @param TensorRow row - image & target read into this tensor row // @return Status The status code returned - Status LoadTensorRow(row_id_type row_id, const std::string &image_id, TensorRow *row); + Status LoadTensorRow(row_id_type row_id, TensorRow *row) override; // @param const std::string &path - path to the image file // @param const ColDescriptor &col - contains tensor implementation and datatype @@ -241,11 +227,6 @@ class VOCOp : public ParallelOp, public RandomAccessOp { // @return Status The status code returned Status ReadAnnotationToTensor(const std::string &path, TensorRow *row); - // @param const std::vector &keys - keys in ioblock - // @param std::unique_ptr db - // @return Status The status code returned - Status LoadBuffer(const std::vector &keys, std::unique_ptr *db); - // Read image list from ImageSets // @return Status The status code returned Status ParseImageIds(); @@ -264,18 +245,9 @@ class VOCOp : public ParallelOp, public RandomAccessOp { // @return Status The status code returned void ParseNodeValue(XMLElement *bbox_node, const char *name, float *value); - // @param const std::shared_ptr &sample_ids - sample ids of tensor - // @param std::vector *keys - image id - // @return Status The status code returned - Status TraverseSampleIds(const std::shared_ptr &sample_ids, std::vector *keys); - // Called first when function is called // @return Status The status code returned - Status LaunchThreadsAndInitOp(); - - // Reset dataset state - // @return Status The status code returned - Status Reset() override; + Status LaunchThreadsAndInitOp() override; // Private function for computing the assignment of the column name map. // @return - Status diff --git a/mindspore/lite/minddata/CMakeLists.txt b/mindspore/lite/minddata/CMakeLists.txt index 5d898572cf..bc3c573782 100644 --- a/mindspore/lite/minddata/CMakeLists.txt +++ b/mindspore/lite/minddata/CMakeLists.txt @@ -154,6 +154,7 @@ if(BUILD_MINDDATA STREQUAL "full") ${MINDDATA_DIR}/engine/datasetops/map_op/cpu_map_job.cc ${MINDDATA_DIR}/engine/datasetops/source/album_op.cc ${MINDDATA_DIR}/engine/datasetops/source/mnist_op.cc + ${MINDDATA_DIR}/engine/datasetops/source/mappable_leaf_op.cc ${MINDDATA_DIR}/engine/datasetops/source/io_block.cc ${MINDDATA_DIR}/engine/opt/pre/getter_pass.cc