You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

batch_op.cc 19 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. /**
  2. * Copyright 2019 Huawei Technologies Co., Ltd
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "dataset/engine/datasetops/batch_op.h"
  17. #include <utility>
  18. #include <iomanip>
  19. #include "common/utils.h"
  20. #include "dataset/core/pybind_support.h"
  21. #include "dataset/engine/data_buffer.h"
  22. #include "dataset/engine/db_connector.h"
  23. #include "dataset/engine/opt/pass.h"
  24. #include "dataset/kernels/data/data_utils.h"
  25. using float16 = Eigen::half;
  26. namespace mindspore {
  27. namespace dataset {
  28. BatchOp::Builder::Builder(int32_t batch_size) : builder_drop_(false), builder_pad_(false), builder_pad_map_({}) {
  29. builder_batch_size_ = batch_size;
  30. std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
  31. builder_num_workers_ = cfg->num_parallel_workers();
  32. builder_op_connector_size_ = cfg->op_connector_size();
  33. }
  34. Status BatchOp::Builder::Build(std::shared_ptr<BatchOp> *ptr) {
  35. RETURN_IF_NOT_OK(SanityCheck());
  36. *ptr = std::make_shared<BatchOp>(builder_batch_size_, builder_drop_, builder_pad_, builder_op_connector_size_,
  37. builder_num_workers_, builder_cols_to_map_, builder_batch_size_func_,
  38. builder_batch_map_func_, builder_pad_map_);
  39. return Status::OK();
  40. }
  41. Status BatchOp::Builder::SanityCheck() {
  42. std::string err;
  43. err += builder_op_connector_size_ <= 0 ? "connector size <= 0\n" : "";
  44. err += builder_batch_size_ <= 0 ? "batch size <= 0\n" : "";
  45. err += builder_num_workers_ <= 0 ? "batch num_parallel_workers <= 0\n" : "";
  46. return err.empty() ? Status::OK() : Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, common::SafeCStr(err));
  47. }
  48. BatchOp::BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, int32_t num_workers,
  49. const std::vector<std::string> &cols_to_map, py::function batch_size_func, py::function batch_map_func,
  50. PadInfo pad_map)
  51. : ParallelOp(num_workers, op_queue_size),
  52. start_batch_size_(batch_size),
  53. drop_(drop),
  54. pad_(pad),
  55. pyfunc_column_names_(cols_to_map),
  56. batch_size_func_(batch_size_func),
  57. batch_map_func_(batch_map_func),
  58. pad_info_(pad_map) {
  59. worker_queues_.Init(num_workers, op_queue_size);
  60. }
  61. Status BatchOp::operator()() {
  62. Status rc = LaunchThreadsAndInitOp();
  63. // Synchronize with TaskManager
  64. TaskManager::FindMe()->Post();
  65. RETURN_IF_NOT_OK(rc);
  66. int64_t epoch_num = 0, batch_num = 0, cnt = 0;
  67. TensorRow new_row;
  68. std::unique_ptr<TensorQTable> table = std::make_unique<TensorQTable>();
  69. child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);
  70. RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
  71. RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild()); // must come after the first fetch above
  72. int32_t cur_batch_size = 0;
  73. RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(0, 0, 0)));
  74. while (child_iterator_->eof_handled() == false) {
  75. while (new_row.empty() == false) {
  76. table->emplace_back(new_row);
  77. // if # of rows is enough to make 1 batch (1 batch is buffer), send it to worker_queue
  78. if (table->size() == static_cast<size_t>(cur_batch_size)) {
  79. RETURN_IF_NOT_OK(worker_queues_[cnt++ % num_workers_]->EmplaceBack(
  80. std::make_pair(std::move(table), CBatchInfo(epoch_num, batch_num++, cnt - epoch_num))));
  81. table = std::make_unique<TensorQTable>();
  82. RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(epoch_num, batch_num, cnt - epoch_num)));
  83. }
  84. RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
  85. }
  86. // Reminder logic, execute only when there is a remainder (table is non empty) and don't drop
  87. if (drop_ == false && table->empty() == false) {
  88. RETURN_IF_NOT_OK(worker_queues_[cnt++ % num_workers_]->EmplaceBack(
  89. std::make_pair(std::move(table), CBatchInfo(epoch_num, batch_num++, cnt - epoch_num))));
  90. }
  91. table = std::make_unique<TensorQTable>(); // this drops when drop == true
  92. // end of the current epoch, batch_num should start from 0 again
  93. batch_num = 0;
  94. epoch_num++;
  95. RETURN_IF_NOT_OK(
  96. worker_queues_[cnt++ % num_workers_]->EmplaceBack(std::make_pair(nullptr, CBatchInfo(batchCtrl::kEOE))));
  97. RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(epoch_num, batch_num, cnt - epoch_num)));
  98. RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
  99. } // end of eof_handled() == false
  100. RETURN_IF_NOT_OK(
  101. worker_queues_[cnt++ % num_workers_]->EmplaceBack(std::make_pair(nullptr, CBatchInfo(batchCtrl::kEOF))));
  102. // EOF received, send quit signal (an empty buffer) to all workers
  103. for (int32_t ind = 0; ind < num_workers_; ind++) {
  104. RETURN_IF_NOT_OK(
  105. worker_queues_[cnt++ % num_workers_]->EmplaceBack(std::make_pair(nullptr, CBatchInfo(batchCtrl::kQuit))));
  106. }
  107. return Status::OK();
  108. }
  109. void BatchOp::Print(std::ostream &out, bool show_all) const {
  110. // Always show the id and name as first line regardless if this summary or detailed print
  111. out << "(" << std::setw(2) << operator_id_ << ") <BatchOp>:";
  112. if (!show_all) {
  113. // Call the super class for displaying any common 1-liner info
  114. ParallelOp::Print(out, show_all);
  115. // Then show any custom derived-internal 1-liner info for this op
  116. out << " [batch size: " << start_batch_size_ << "]\n";
  117. } else {
  118. // Call the super class for displaying any common detailed info
  119. ParallelOp::Print(out, show_all);
  120. // Then show any custom derived-internal stuff
  121. out << "\nStart batch size: " << start_batch_size_ << "\nDrop remainder: " << (drop_ ? "yes" : "no") << "\n\n";
  122. }
  123. }
  124. Status BatchOp::BatchRows(const std::unique_ptr<TensorQTable> *src, const std::unique_ptr<TensorQTable> *dest,
  125. dsize_t batch_size) {
  126. if ((*src)->size() != batch_size) {
  127. RETURN_STATUS_UNEXPECTED("[Internal Batch ERROR] Source table size does not match the batch_size");
  128. }
  129. if (batch_size == 1) {
  130. TensorRow row = std::move((*src)->front());
  131. (*src)->pop_front();
  132. (*dest)->push_back(row);
  133. for (const auto &tensor : (*dest)->front()) {
  134. RETURN_IF_NOT_OK(tensor->ExpandDim(0));
  135. }
  136. return Status::OK();
  137. }
  138. TensorRow batched_row;
  139. auto num_columns = (*src)->front().size();
  140. for (size_t i = 0; i < num_columns; i++) {
  141. std::shared_ptr<Tensor> first_tensor = (*src)->at(0).at(i); // first row, column i
  142. TensorShape first_shape = first_tensor->shape();
  143. DataType first_type = first_tensor->type();
  144. TensorShape new_shape = first_shape.PrependDim(static_cast<int64_t>(batch_size));
  145. std::shared_ptr<Tensor> new_tensor;
  146. if (first_type.IsNumeric()) { // numeric tensor
  147. RETURN_IF_NOT_OK(Tensor::CreateTensor(&new_tensor, TensorImpl::kFlexible, new_shape, first_type));
  148. dsize_t j = 0;
  149. for (auto row : **src) {
  150. std::shared_ptr<Tensor> old_tensor = row.at(i); // row j, column i
  151. if (old_tensor->shape() == first_shape) { // check the newly popped rows have the same dim as the first
  152. RETURN_IF_NOT_OK(new_tensor->InsertTensor({j++}, old_tensor));
  153. } else {
  154. RETURN_STATUS_UNEXPECTED("[Batch ERROR] Inconsistent TensorShapes of Column " + std::to_string(i));
  155. }
  156. }
  157. } else { // handle string column differently
  158. std::vector<std::string> strings;
  159. for (dsize_t j = 0; j < batch_size; j++) {
  160. std::shared_ptr<Tensor> old_tensor = (*src)->at(j).at(i);
  161. for (auto itr = old_tensor->begin<std::string_view>(); itr != old_tensor->end<std::string_view>(); itr++) {
  162. strings.emplace_back(*itr);
  163. }
  164. }
  165. RETURN_IF_NOT_OK(Tensor::CreateTensor(&new_tensor, strings, new_shape));
  166. }
  167. batched_row.emplace_back(new_tensor);
  168. }
  169. (*dest)->emplace_back(batched_row);
  170. return Status::OK();
  171. }
  172. Status BatchOp::WorkerEntry(int32_t workerId) {
  173. TaskManager::FindMe()->Post();
  174. std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> table_pair;
  175. RETURN_IF_NOT_OK(worker_queues_[workerId]->PopFront(&table_pair));
  176. while (table_pair.second.ctrl_ != batchCtrl::kQuit) {
  177. if (table_pair.second.ctrl_ == batchCtrl::kEOE) {
  178. RETURN_IF_NOT_OK(out_connector_->Add(workerId, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE)));
  179. } else if (table_pair.second.ctrl_ == batchCtrl::kEOF) {
  180. RETURN_IF_NOT_OK(out_connector_->Add(workerId, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF)));
  181. } else if (table_pair.second.ctrl_ == batchCtrl::kNoCtrl) {
  182. std::unique_ptr<DataBuffer> db = nullptr;
  183. RETURN_IF_NOT_OK(MakeBatchedBuffer(std::move(table_pair), &db));
  184. RETURN_IF_NOT_OK(out_connector_->Add(workerId, std::move(db)));
  185. }
  186. RETURN_IF_NOT_OK(worker_queues_[workerId]->PopFront(&table_pair));
  187. }
  188. return Status::OK();
  189. }
  190. Status BatchOp::MakeBatchedBuffer(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> table_pair,
  191. std::unique_ptr<DataBuffer> *db) {
  192. RETURN_UNEXPECTED_IF_NULL(table_pair.first);
  193. if (!pyfunc_column_names_.empty()) RETURN_IF_NOT_OK(MapColumns(&table_pair)); // pass it through pyfunc
  194. if (pad_) RETURN_IF_NOT_OK(PadColumns(&table_pair.first, pad_info_, column_name_id_map_)); // do padding if needed
  195. (*db) = std::make_unique<DataBuffer>(table_pair.second.batch_num_, DataBuffer::kDeBFlagNone);
  196. std::unique_ptr<TensorQTable> dest_table = std::make_unique<TensorQTable>();
  197. RETURN_IF_NOT_OK(BatchRows(&table_pair.first, &dest_table, table_pair.first->size()));
  198. (*db)->set_tensor_table(std::move(dest_table));
  199. return Status::OK();
  200. }
  201. Status BatchOp::LaunchThreadsAndInitOp() {
  202. RETURN_UNEXPECTED_IF_NULL(tree_);
  203. RETURN_IF_NOT_OK(worker_queues_.Register(tree_->AllTasks()));
  204. RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&BatchOp::WorkerEntry, this, std::placeholders::_1)));
  205. return Status::OK();
  206. }
  207. Status BatchOp::EofReceived(int32_t) { return Status::OK(); }
  208. Status BatchOp::EoeReceived(int32_t) {
  209. state_ = OpState::kDeOpIdle;
  210. return Status::OK();
  211. }
  212. Status BatchOp::MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> *table_pair) {
  213. TensorBatchTable input_table;
  214. input_table.reserve(pyfunc_column_names_.size());
  215. for (std::string col_name : pyfunc_column_names_) {
  216. if (column_name_id_map_.find(col_name) == column_name_id_map_.end()) {
  217. RETURN_STATUS_UNEXPECTED("column : '" + col_name + "' does not exist\n");
  218. }
  219. TensorBatch tensor_batch;
  220. tensor_batch.reserve(table_pair->first->size());
  221. size_t col_idx = static_cast<size_t>(column_name_id_map_[col_name]);
  222. for (size_t row_idx = 0; row_idx < table_pair->first->size(); row_idx++) {
  223. tensor_batch.push_back(std::move(table_pair->first->at(row_idx)[col_idx]));
  224. }
  225. input_table.push_back(std::move(tensor_batch));
  226. }
  227. // Perform batch map
  228. TensorBatchTable output_table;
  229. RETURN_IF_NOT_OK(InvokeBatchMapFunc(&input_table, &output_table, table_pair->second));
  230. // Write back to TensorQTable
  231. for (size_t input_idx = 0; input_idx < pyfunc_column_names_.size(); input_idx++) {
  232. size_t col_idx = static_cast<size_t>(column_name_id_map_[pyfunc_column_names_[input_idx]]);
  233. size_t row_id = 0;
  234. for (TensorRow &row : *(table_pair->first)) {
  235. row[col_idx] = std::move(output_table[input_idx][row_id++]);
  236. }
  237. }
  238. return Status::OK();
  239. }
  240. Status BatchOp::GetBatchSize(int32_t *batch_size, CBatchInfo info) {
  241. if (batch_size_func_ != nullptr) {
  242. RETURN_IF_NOT_OK(InvokeBatchSizeFunc(batch_size, info));
  243. } else {
  244. (*batch_size) = start_batch_size_;
  245. }
  246. return Status::OK();
  247. }
  248. Status BatchOp::InvokeBatchSizeFunc(int32_t *batch_size, CBatchInfo info) {
  249. {
  250. // Acquire Python GIL
  251. py::gil_scoped_acquire gil_acquire;
  252. if (Py_IsInitialized() == 0) {
  253. return Status(StatusCode::kPythonInterpreterFailure, "Python Interpreter is finalized");
  254. }
  255. try {
  256. py::object size = batch_size_func_(info);
  257. *batch_size = size.cast<int32_t>();
  258. if (*batch_size <= 0) {
  259. return Status(StatusCode::kPyFuncException, "Batch size function should return an integer > 0");
  260. }
  261. } catch (const py::error_already_set &e) {
  262. return Status(StatusCode::kPyFuncException, e.what());
  263. } catch (const py::cast_error &e) {
  264. return Status(StatusCode::kPyFuncException, "Batch size function should return an integer > 0");
  265. }
  266. }
  267. return Status(StatusCode::kOK, "Batch size func call succeed");
  268. }
  269. Status BatchOp::InvokeBatchMapFunc(TensorBatchTable *input, TensorBatchTable *output, CBatchInfo info) {
  270. {
  271. // Acquire Python GIL
  272. py::gil_scoped_acquire gil_acquire;
  273. if (Py_IsInitialized() == 0) {
  274. return Status(StatusCode::kPythonInterpreterFailure, "Python Interpreter is finalized");
  275. }
  276. try {
  277. // Prepare batch map call back parameters
  278. py::tuple input_args(input->size() + 1);
  279. for (size_t i = 0; i < input->size(); i++) {
  280. std::vector<py::array> np_batch;
  281. for (std::shared_ptr<Tensor> t : input->at(i)) {
  282. py::array np_array;
  283. RETURN_IF_NOT_OK(t->GetDataAsNumpy(&np_array));
  284. np_batch.push_back(std::move(np_array));
  285. }
  286. input_args[i] = np_batch;
  287. }
  288. input_args[input->size()] = info;
  289. // Invoke batch map func
  290. py::object ret_py_obj = batch_map_func_(*input_args);
  291. // Parse batch map return value
  292. py::tuple ret_tuple = py::cast<py::tuple>(ret_py_obj);
  293. if (ret_tuple.size() != pyfunc_column_names_.size() || !py::isinstance<py::tuple>(ret_tuple)) {
  294. return Status(StatusCode::kPyFuncException, "Batch map function should return a tuple");
  295. }
  296. for (size_t i = 0; i < ret_tuple.size(); i++) {
  297. TensorBatch output_batch;
  298. py::list output_list = py::cast<py::list>(ret_tuple[i]);
  299. for (size_t j = 0; j < output_list.size(); j++) {
  300. std::shared_ptr<Tensor> out;
  301. RETURN_IF_NOT_OK(Tensor::CreateTensor(&out, py::cast<py::array>(output_list[j])));
  302. output_batch.push_back(std::move(out));
  303. }
  304. output->push_back(std::move(output_batch));
  305. }
  306. } catch (const py::error_already_set &e) {
  307. return Status(StatusCode::kPyFuncException, e.what());
  308. } catch (const py::cast_error &e) {
  309. return Status(StatusCode::kPyFuncException, "Batch map function should return an tuple of list of numpy array");
  310. }
  311. }
  312. return Status(StatusCode::kOK);
  313. }
  314. Status BatchOp::PadColumns(std::unique_ptr<TensorQTable> *table, const PadInfo &pad_info,
  315. const std::unordered_map<std::string, int32_t> &column_name_id_map) {
  316. RETURN_UNEXPECTED_IF_NULL(table); // placeholder for now, might need this in the future
  317. CHECK_FAIL_RETURN_UNEXPECTED((*table)->front().size() == column_name_id_map.size(), "col_name_map mismatch");
  318. std::vector<std::shared_ptr<Tensor>> pad_vals(column_name_id_map.size(),
  319. 0); // value to pad each column's tensor with, default 0
  320. std::set<int32_t> pad_cols;
  321. // padded_shape provided by user, maximum shapes of current batch of tensors
  322. std::vector<std::vector<dsize_t>> pad_shapes(column_name_id_map.size()), max_shapes(column_name_id_map.size());
  323. RETURN_IF_NOT_OK(UnpackPadInfo(pad_info, column_name_id_map, &pad_cols, &pad_vals, &pad_shapes));
  324. // init each shape in max_shape to {-1,-1...} init each unspecified shape in pad_shape to -1 as well
  325. for (size_t col_id : pad_cols) {
  326. max_shapes[col_id] = std::vector<dsize_t>((*table)->front()[col_id]->Rank(), -1);
  327. if (pad_shapes[col_id].empty()) pad_shapes[col_id] = max_shapes[col_id]; // fill pad shape with -1
  328. CHECK_FAIL_RETURN_UNEXPECTED(pad_shapes[col_id].size() == max_shapes[col_id].size(), "wrong rank in pad_shape");
  329. }
  330. // calculate maximum shape for each column that needs to be padded
  331. for (const TensorRow &row : **table) { // iterator each row in a batch
  332. for (size_t col_id : pad_cols) { // iterator each tensor in a row
  333. CHECK_FAIL_RETURN_UNEXPECTED(row[col_id]->Rank() == max_shapes[col_id].size(),
  334. "Tensor to be padded together need to have the same rank");
  335. for (size_t dim = 0; dim < row[col_id]->Rank(); dim++) { // pick the largest number in each dimension
  336. max_shapes[col_id][dim] = std::max(max_shapes[col_id][dim], row[col_id]->shape()[dim]);
  337. }
  338. }
  339. }
  340. // if user sets a dimension to -1 (None in python), use the max value for current dimension
  341. for (size_t col_id : pad_cols) {
  342. for (size_t dim = 0; dim < pad_shapes[col_id].size(); dim++) {
  343. if (pad_shapes[col_id][dim] < 0) pad_shapes[col_id][dim] = max_shapes[col_id][dim];
  344. }
  345. }
  346. // call pad on each tensor that needs to be padded
  347. for (TensorRow &row : **table) {
  348. for (size_t col_id : pad_cols) {
  349. std::shared_ptr<Tensor> pad_tensor;
  350. RETURN_IF_NOT_OK(PadEnd(row[col_id], &pad_tensor, pad_shapes[col_id], pad_vals[col_id]));
  351. row[col_id] = pad_tensor;
  352. }
  353. }
  354. return Status::OK();
  355. }
  356. Status BatchOp::UnpackPadInfo(const PadInfo &pad_info,
  357. const std::unordered_map<std::string, int32_t> &column_name_id_map,
  358. std::set<int32_t> *pad_cols, std::vector<std::shared_ptr<Tensor>> *pad_vals,
  359. std::vector<std::vector<dsize_t>> *pad_shapes) {
  360. if (pad_info.empty()) { // if pad_info empty, pad every columns automatically
  361. for (dsize_t col_id = 0; col_id < column_name_id_map.size(); col_id++) {
  362. pad_cols->insert(col_id);
  363. }
  364. } else {
  365. for (const auto &p : pad_info) {
  366. auto location = column_name_id_map.find(p.first);
  367. CHECK_FAIL_RETURN_UNEXPECTED(location != column_name_id_map.end(), "no column exists with name:" + p.first);
  368. auto col_id = static_cast<dsize_t>(location->second);
  369. CHECK_FAIL_RETURN_UNEXPECTED(col_id < pad_vals->size() && col_id < pad_shapes->size(), "col_id out of bound");
  370. pad_cols->insert(col_id);
  371. (*pad_vals)[col_id] = p.second.second; // set pad values
  372. (*pad_shapes)[col_id] = p.second.first.AsVector(); // empty vector if shape is unknown
  373. }
  374. }
  375. return Status::OK();
  376. }
  377. // Visitor accept method for NodePass
  378. Status BatchOp::Accept(NodePass *p, bool *modified) {
  379. // Downcast shared pointer then call visitor
  380. return p->RunOnNode(std::static_pointer_cast<BatchOp>(shared_from_this()), modified);
  381. }
  382. } // namespace dataset
  383. } // namespace mindspore