You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

dataset_op.cc 11 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. /**
  2. * Copyright 2019 Huawei Technologies Co., Ltd
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "dataset/engine/datasetops/dataset_op.h"
  17. #include <iomanip>
  18. #include <iostream>
  19. #include <memory>
  20. #include <utility>
  21. #include <string>
  22. #include "dataset/engine/execution_tree.h"
  23. #include "dataset/engine/datasetops/device_queue_op.h"
  24. #include "dataset/engine/data_buffer.h"
  25. #include "dataset/engine/db_connector.h"
  26. #include "utils/log_adapter.h"
  27. namespace mindspore {
  28. namespace dataset {
  29. // Constructor
  30. DatasetOp::DatasetOp(int32_t op_connector_size)
  31. : oc_queue_size_(op_connector_size),
  32. operator_id_(kInvalidOperatorId),
  33. tree_(nullptr),
  34. state_(OpState::kDeOpIdle),
  35. op_ctrl_flags_(kDeOpNone),
  36. first_fetch_(true) {
  37. // The operator starts out with an invalid operator id. The only way to
  38. // get it out of invalid state is to assign the operator to an execution tree.
  39. }
  40. // Adds a operator to become our child.
  41. Status DatasetOp::AddChild(std::shared_ptr<DatasetOp> child) {
  42. if (std::dynamic_pointer_cast<DeviceQueueOp>(child) != nullptr) {
  43. std::string err_msg("DeviceQueueOp cannot be added as a child, DeviceQueueOp must be a root node");
  44. RETURN_STATUS_UNEXPECTED(err_msg);
  45. }
  46. if (operator_id_ == kInvalidOperatorId) {
  47. std::string err_msg(
  48. "Cannot add child node. Tree node connections can only"
  49. "be made if the node belongs to a tree.");
  50. RETURN_STATUS_UNEXPECTED(err_msg);
  51. }
  52. // disallow relationships with other trees
  53. if (tree_ != child->tree_) {
  54. std::string err_msg(
  55. "Cannot add child node. Tree node connections can only be made if both nodes belong to the same tree.");
  56. RETURN_STATUS_UNEXPECTED(err_msg);
  57. }
  58. child_.push_back(child);
  59. child->AddParent(this);
  60. return Status::OK();
  61. }
  62. // Adds a parent operator to this operator
  63. void DatasetOp::AddParent(const DatasetOp *parent) { parent_.push_back(parent); }
  64. // Getter function to get a shared pointer to our childAdds a operator to become our child.
  65. std::shared_ptr<DatasetOp> DatasetOp::child(int32_t child_index) const {
  66. DS_ASSERT(child_index < static_cast<int>(child_.size()));
  67. // Return a shared pointer
  68. return child_[child_index];
  69. }
  70. // Creates the connector within this operator
  71. void DatasetOp::CreateConnector(int32_t num_producers, int32_t num_consumers) {
  72. MS_LOG(INFO) << "Creating connector in tree operator: " << operator_id_ << ". Producer: " << num_producers
  73. << ". Consumer: " << num_consumers << ".";
  74. if (oc_queue_size_ > 0) {
  75. out_connector_ = std::make_unique<DbConnector>(num_producers, // The number of producers
  76. num_consumers, // Only one consumer (the training App)
  77. oc_queue_size_);
  78. } else {
  79. // Some op's may choose not to have an output connector
  80. MS_LOG(INFO) << "Bypassed connector creation for tree operator: " << operator_id_ << ".";
  81. out_connector_ = nullptr;
  82. }
  83. }
  84. // A print method typically used for debugging. showAll of true will recursively descend to child prints
  85. void DatasetOp::Print(std::ostream &out, bool show_all) const {
  86. // When show_all is false, we display a 1 liner piece of text for the op.
  87. // When show_all is true, we display more detailed output for the op.
  88. // Derived printers should show their own header info, then call base class printer, followed by
  89. // derived-specific items.
  90. // For now, the base class doesn't have any summary info to show so it's a no-op in that case.
  91. if (show_all) {
  92. // The detailed display will show common base class info of the op. Allow the derived class to print
  93. // it's own id and name though as the first line.
  94. out << "\nNumber of children : " << child_.size();
  95. for (size_t i = 0; i < child_.size(); i++) {
  96. out << "\n Child[" << i << "] id: " << child_[i]->id();
  97. }
  98. out << "\nNumber of parents : " << parent_.size();
  99. for (size_t i = 0; i < parent_.size(); i++) {
  100. out << "\n Parent[" << i << "] id: " << parent_[i]->id();
  101. }
  102. out << "\nConnector queue size : " << oc_queue_size_ << "\nOperator control flags : 0x" << std::hex
  103. << std::setw(8) << std::setfill('0') << op_ctrl_flags_ << std::dec << std::setfill(' ');
  104. }
  105. }
  106. // Gets the next buffer from the given child
  107. Status DatasetOp::GetNextBuffer(std::unique_ptr<DataBuffer> *p_buffer, int32_t worker_id, bool retry_if_eoe) {
  108. #if defined(_WIN32) || defined(_WIN64)
  109. RETURN_IF_NOT_OK(out_connector_->PopWithRetry(static_cast<int>(worker_id), p_buffer, retry_if_eoe));
  110. #else
  111. std::unique_ptr<DataBuffer> next_buff;
  112. // pop is a blocked call and will throw an interruption if the whole group shuts down.
  113. RETURN_IF_NOT_OK(out_connector_->PopWithRetry(static_cast<int>(worker_id), &next_buff, retry_if_eoe));
  114. *p_buffer = std::move(next_buff);
  115. #endif
  116. return Status::OK();
  117. }
  118. // Gets the next buffer from the given child . This function also has built-in eoe and eof
  119. // message handling so that child classes don't have to manually code pass-through logic when
  120. // those messages are received.
  121. Status DatasetOp::GetNextInput(std::unique_ptr<DataBuffer> *p_buffer, int32_t worker_id, int32_t child_index) {
  122. if (child_.size() == 0) {
  123. return this->GetNextBuffer(p_buffer, worker_id);
  124. }
  125. CHECK_FAIL_RETURN_UNEXPECTED(child_index < child_.size(), "Child index too big : " + std::to_string(child_index));
  126. std::shared_ptr<DatasetOp> child = child_[child_index];
  127. std::unique_ptr<DataBuffer> buf;
  128. RETURN_IF_NOT_OK(child->GetNextBuffer(&buf, worker_id));
  129. // Loop until non EOE is received
  130. while (buf->eoe()) {
  131. RETURN_IF_NOT_OK(EoeReceived(worker_id));
  132. if (state_ == OpState::kDeOpIdle) {
  133. *p_buffer = std::move(buf);
  134. return Status::OK();
  135. }
  136. RETURN_IF_NOT_OK(child->GetNextBuffer(&buf, worker_id));
  137. }
  138. // Check if the last buf is next eof
  139. if (buf->eof()) {
  140. RETURN_IF_NOT_OK(EofReceived(worker_id));
  141. }
  142. *p_buffer = std::move(buf);
  143. return Status::OK();
  144. }
  145. // Performs handling for when an eoe message is received.
  146. // The base class implementation simply flows the eoe message to output. Derived classes
  147. // may override if they need to perform special eoe handling.
  148. Status DatasetOp::EoeReceived(int32_t worker_id) {
  149. std::unique_ptr<DataBuffer> eoe_buffer = std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE);
  150. return (out_connector_->Add(static_cast<int>(worker_id), std::move(eoe_buffer)));
  151. }
  152. // Performs handling for when an eof message is received.
  153. // The base class implementation simply flows the eof message to output. Derived classes
  154. // may override if they need to perform special eof handling.
  155. Status DatasetOp::EofReceived(int32_t worker_id) {
  156. std::unique_ptr<DataBuffer> eof_buffer = std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF);
  157. return (out_connector_->Add(static_cast<int>(worker_id), std::move(eof_buffer)));
  158. }
  159. // During tree prepare phase, operators may have specific pre-operations to perform depending on
  160. // their role.
  161. Status DatasetOp::PrepareNodePreAction() {
  162. if (BitTest(tree_->PrepareFlags(), ExecutionTree::kDePrepRepeat)) set_control_flag(kDeOpRepeated);
  163. return Status::OK();
  164. }
  165. // During tree prepare phase, operators may have specific post-operations to perform depending on
  166. // their role.
  167. Status DatasetOp::PrepareNodePostAction() {
  168. // If this op does not have any children and it is in a repeat path of the tree...
  169. if (child_.empty() && BitTest(op_ctrl_flags_, kDeOpRepeated)) {
  170. // push ourselves onto the tree repeat stack. Later, the repeat operator
  171. // above us will consume them.
  172. tree_->AddToRepeatStack(shared_from_this());
  173. }
  174. // Creating Connector object for each op.
  175. // The consumer of the root node is assumed to be one thread.
  176. // If multiple threads are consuming from the root node, they will get the ordered data in round robin fashion.
  177. if (parent_.empty()) {
  178. this->CreateConnector(num_producers(), 1);
  179. } else {
  180. this->CreateConnector(num_producers(), parent_[0]->num_consumers());
  181. }
  182. if (out_connector_) {
  183. RETURN_IF_NOT_OK(out_connector_->Register(tree_->AllTasks()));
  184. }
  185. RETURN_IF_NOT_OK(this->RegisterWorkerConnectors());
  186. return Status::OK();
  187. }
  188. // Getter function. Base class does not have any special flags setting.
  189. uint32_t DatasetOp::PrepareFlags() const { return ExecutionTree::kDePrepNone; }
  190. // Derived classes may implement the reset function if the operator is stateful and needs
  191. // specific reset handling that is not contained in this common code version of the reset.
  192. Status DatasetOp::Reset() {
  193. state_ = OpState::kDeOpRunning;
  194. return Status::OK();
  195. }
  196. // gives a string output for the column map for handy debug printing
  197. std::string DatasetOp::ColumnNameMapAsString() const {
  198. std::string outStr = "Column name id map: ";
  199. for (auto &it : column_name_id_map_) {
  200. outStr += (" " + it.first + ":" + std::to_string(it.second));
  201. }
  202. return outStr;
  203. }
  204. // A helper function for providing assignment of the column name map.
  205. // This grabs the map from child 0 and assigns it into this op.
  206. // Can only be used if number of children is 1.
  207. Status DatasetOp::AssignColMapFromChild() {
  208. if (child_.size() > 1) {
  209. RETURN_STATUS_UNEXPECTED("Assigning column name map from child only works for single-child operators.");
  210. }
  211. // Assign the correct column name map to this op by taking it from the input child.
  212. // This must be done AFTER the first fetch, but only needs to be done once by the first worker to
  213. // do the first fetch.
  214. if (first_fetch_) {
  215. // If there was a single worker, or this is being called from a master thread in a parallel op,
  216. // then the mutex is not really needed here, although it's harmless.
  217. std::unique_lock<std::mutex> lock(column_name_map_mutex_);
  218. // If the map has not been set up yet, then we are the first one in to set it up. The first_fetch_ (dirty read)
  219. // bool allows us to avoid acquiring the lock if the map has already been set.
  220. if (column_name_id_map_.empty()) {
  221. column_name_id_map_ = child_[0]->column_name_id_map();
  222. first_fetch_ = false;
  223. if (column_name_id_map_.empty()) {
  224. RETURN_STATUS_UNEXPECTED("Child column name map cannot be empty!");
  225. }
  226. }
  227. MS_LOG(INFO) << "Setting column map after first fetch:\n" << DatasetOp::ColumnNameMapAsString();
  228. }
  229. return Status::OK();
  230. }
  231. } // namespace dataset
  232. } // namespace mindspore