You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

tfReader_op_test.cc 23 kB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735
  1. /**
  2. * Copyright 2019-2021 Huawei Technologies Co., Ltd
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <iostream>
  17. #include <memory>
  18. #include <vector>
  19. #include "minddata/dataset/core/client.h"
  20. #include "minddata/dataset/engine/data_schema.h"
  21. #include "common/common.h"
  22. #include "gtest/gtest.h"
  23. #include "utils/log_adapter.h"
  24. namespace common = mindspore::common;
  25. using namespace mindspore::dataset;
  26. using mindspore::MsLogLevel::INFO;
  27. using mindspore::ExceptionType::NoExceptionType;
  28. using mindspore::LogStream;
  29. class MindDataTestTFReaderOp : public UT::DatasetOpTesting {
  30. };
  31. TEST_F(MindDataTestTFReaderOp, TestTFReaderBasic1) {
  32. // Start with an empty execution tree
  33. auto my_tree = std::make_shared<ExecutionTree>();
  34. std::string dataset_path;
  35. dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data";
  36. std::shared_ptr<TFReaderOp> my_tfreader_op;
  37. TFReaderOp::Builder builder;
  38. builder.SetDatasetFilesList({dataset_path})
  39. .SetRowsPerBuffer(16)
  40. .SetNumWorkers(16);
  41. std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  42. schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {});
  43. builder.SetDataSchema(std::move(schema));
  44. Status rc = builder.Build(&my_tfreader_op);
  45. ASSERT_TRUE(rc.IsOk());
  46. rc = my_tree->AssociateNode(my_tfreader_op);
  47. ASSERT_TRUE(rc.IsOk());
  48. rc = my_tree->AssignRoot(my_tfreader_op);
  49. ASSERT_TRUE(rc.IsOk());
  50. MS_LOG(INFO) << "Launching tree and begin iteration.";
  51. rc = my_tree->Prepare();
  52. ASSERT_TRUE(rc.IsOk());
  53. rc = my_tree->Launch();
  54. ASSERT_TRUE(rc.IsOk());
  55. // Start the loop of reading tensors from our pipeline
  56. DatasetIterator di(my_tree);
  57. TensorRow tensor_list;
  58. rc = di.FetchNextTensorRow(&tensor_list);
  59. ASSERT_TRUE(rc.IsOk());
  60. int row_count = 0;
  61. while (!tensor_list.empty()) {
  62. // Display the tensor by calling the printer on it
  63. for (int i = 0; i < tensor_list.size(); i++) {
  64. std::ostringstream ss;
  65. ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
  66. MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
  67. }
  68. rc = di.FetchNextTensorRow(&tensor_list);
  69. ASSERT_TRUE(rc.IsOk());
  70. row_count++;
  71. }
  72. ASSERT_EQ(row_count, 12);
  73. }
  74. TEST_F(MindDataTestTFReaderOp, TestTFReaderLargeRowsPerBuffer) {
  75. // Start with an empty execution tree
  76. auto my_tree = std::make_shared<ExecutionTree>();
  77. std::string dataset_path;
  78. dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data";
  79. std::shared_ptr<TFReaderOp> my_tfreader_op;
  80. TFReaderOp::Builder builder;
  81. builder.SetDatasetFilesList({dataset_path})
  82. .SetRowsPerBuffer(500)
  83. .SetNumWorkers(16);
  84. std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  85. schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {});
  86. builder.SetDataSchema(std::move(schema));
  87. Status rc = builder.Build(&my_tfreader_op);
  88. ASSERT_TRUE(rc.IsOk());
  89. rc = my_tree->AssociateNode(my_tfreader_op);
  90. ASSERT_TRUE(rc.IsOk());
  91. rc = my_tree->AssignRoot(my_tfreader_op);
  92. ASSERT_TRUE(rc.IsOk());
  93. MS_LOG(INFO) << "Launching tree and begin iteration.";
  94. rc = my_tree->Prepare();
  95. ASSERT_TRUE(rc.IsOk());
  96. rc = my_tree->Launch();
  97. ASSERT_TRUE(rc.IsOk());
  98. // Start the loop of reading tensors from our pipeline
  99. DatasetIterator di(my_tree);
  100. TensorRow tensor_list;
  101. rc = di.FetchNextTensorRow(&tensor_list);
  102. ASSERT_TRUE(rc.IsOk());
  103. int row_count = 0;
  104. while (!tensor_list.empty()) {
  105. // Display the tensor by calling the printer on it
  106. for (int i = 0; i < tensor_list.size(); i++) {
  107. std::ostringstream ss;
  108. ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
  109. MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
  110. }
  111. rc = di.FetchNextTensorRow(&tensor_list);
  112. ASSERT_TRUE(rc.IsOk());
  113. row_count++;
  114. }
  115. ASSERT_EQ(row_count, 12);
  116. }
  117. TEST_F(MindDataTestTFReaderOp, TestTFReaderSmallRowsPerBuffer) {
  118. // Start with an empty execution tree
  119. auto my_tree = std::make_shared<ExecutionTree>();
  120. std::string dataset_path;
  121. dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data";
  122. std::shared_ptr<TFReaderOp> my_tfreader_op;
  123. TFReaderOp::Builder builder;
  124. builder.SetDatasetFilesList({dataset_path})
  125. .SetRowsPerBuffer(1)
  126. .SetNumWorkers(16);
  127. std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  128. schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {});
  129. builder.SetDataSchema(std::move(schema));
  130. Status rc = builder.Build(&my_tfreader_op);
  131. ASSERT_TRUE(rc.IsOk());
  132. rc = my_tree->AssociateNode(my_tfreader_op);
  133. ASSERT_TRUE(rc.IsOk());
  134. rc = my_tree->AssignRoot(my_tfreader_op);
  135. ASSERT_TRUE(rc.IsOk());
  136. MS_LOG(INFO) << "Launching tree and begin iteration.";
  137. rc = my_tree->Prepare();
  138. ASSERT_TRUE(rc.IsOk());
  139. rc = my_tree->Launch();
  140. ASSERT_TRUE(rc.IsOk());
  141. // Start the loop of reading tensors from our pipeline
  142. DatasetIterator di(my_tree);
  143. TensorRow tensor_list;
  144. rc = di.FetchNextTensorRow(&tensor_list);
  145. ASSERT_TRUE(rc.IsOk());
  146. int row_count = 0;
  147. while (!tensor_list.empty()) {
  148. // Display the tensor by calling the printer on it
  149. for (int i = 0; i < tensor_list.size(); i++) {
  150. std::ostringstream ss;
  151. ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
  152. MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
  153. }
  154. rc = di.FetchNextTensorRow(&tensor_list);
  155. ASSERT_TRUE(rc.IsOk());
  156. row_count++;
  157. }
  158. ASSERT_EQ(row_count, 12);
  159. }
  160. TEST_F(MindDataTestTFReaderOp, TestTFReaderLargeQueueSize) {
  161. // Start with an empty execution tree
  162. auto my_tree = std::make_shared<ExecutionTree>();
  163. std::string dataset_path;
  164. dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data";
  165. std::shared_ptr<TFReaderOp> my_tfreader_op;
  166. TFReaderOp::Builder builder;
  167. builder.SetDatasetFilesList({dataset_path})
  168. .SetWorkerConnectorSize(1)
  169. .SetRowsPerBuffer(16)
  170. .SetNumWorkers(16);
  171. std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  172. schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {});
  173. builder.SetDataSchema(std::move(schema));
  174. Status rc = builder.Build(&my_tfreader_op);
  175. ASSERT_TRUE(rc.IsOk());
  176. rc = my_tree->AssociateNode(my_tfreader_op);
  177. ASSERT_TRUE(rc.IsOk());
  178. rc = my_tree->AssignRoot(my_tfreader_op);
  179. ASSERT_TRUE(rc.IsOk());
  180. MS_LOG(INFO) << "Launching tree and begin iteration.";
  181. rc = my_tree->Prepare();
  182. ASSERT_TRUE(rc.IsOk());
  183. rc = my_tree->Launch();
  184. ASSERT_TRUE(rc.IsOk());
  185. // Start the loop of reading tensors from our pipeline
  186. DatasetIterator di(my_tree);
  187. TensorRow tensor_list;
  188. rc = di.FetchNextTensorRow(&tensor_list);
  189. ASSERT_TRUE(rc.IsOk());
  190. int row_count = 0;
  191. while (!tensor_list.empty()) {
  192. // Display the tensor by calling the printer on it
  193. for (int i = 0; i < tensor_list.size(); i++) {
  194. std::ostringstream ss;
  195. ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
  196. MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
  197. }
  198. rc = di.FetchNextTensorRow(&tensor_list);
  199. ASSERT_TRUE(rc.IsOk());
  200. row_count++;
  201. }
  202. ASSERT_EQ(row_count, 12);
  203. }
  204. TEST_F(MindDataTestTFReaderOp, TestTFReaderOneThread) {
  205. // Start with an empty execution tree
  206. auto my_tree = std::make_shared<ExecutionTree>();
  207. std::string dataset_path;
  208. dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data";
  209. std::shared_ptr<TFReaderOp> my_tfreader_op;
  210. TFReaderOp::Builder builder;
  211. builder.SetDatasetFilesList({dataset_path})
  212. .SetRowsPerBuffer(16)
  213. .SetNumWorkers(1);
  214. std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  215. schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {});
  216. builder.SetDataSchema(std::move(schema));
  217. Status rc = builder.Build(&my_tfreader_op);
  218. ASSERT_TRUE(rc.IsOk());
  219. rc = my_tree->AssociateNode(my_tfreader_op);
  220. ASSERT_TRUE(rc.IsOk());
  221. rc = my_tree->AssignRoot(my_tfreader_op);
  222. ASSERT_TRUE(rc.IsOk());
  223. MS_LOG(INFO) << "Launching tree and begin iteration.";
  224. rc = my_tree->Prepare();
  225. ASSERT_TRUE(rc.IsOk());
  226. rc = my_tree->Launch();
  227. ASSERT_TRUE(rc.IsOk());
  228. // Start the loop of reading tensors from our pipeline
  229. DatasetIterator di(my_tree);
  230. TensorRow tensor_list;
  231. rc = di.FetchNextTensorRow(&tensor_list);
  232. ASSERT_TRUE(rc.IsOk());
  233. int row_count = 0;
  234. while (!tensor_list.empty()) {
  235. // Display the tensor by calling the printer on it
  236. for (int i = 0; i < tensor_list.size(); i++) {
  237. std::ostringstream ss;
  238. ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
  239. MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
  240. }
  241. rc = di.FetchNextTensorRow(&tensor_list);
  242. ASSERT_TRUE(rc.IsOk());
  243. row_count++;
  244. }
  245. ASSERT_EQ(row_count, 12);
  246. }
  247. TEST_F(MindDataTestTFReaderOp, TestTFReaderRepeat) {
  248. // Start with an empty execution tree
  249. auto my_tree = std::make_shared<ExecutionTree>();
  250. std::string dataset_path;
  251. dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data";
  252. // TFReaderOp
  253. std::shared_ptr<TFReaderOp> my_tfreader_op;
  254. TFReaderOp::Builder builder;
  255. builder.SetDatasetFilesList({dataset_path})
  256. .SetRowsPerBuffer(16)
  257. .SetWorkerConnectorSize(16)
  258. .SetNumWorkers(16);
  259. std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  260. schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {});
  261. builder.SetDataSchema(std::move(schema));
  262. Status rc= builder.Build(&my_tfreader_op);
  263. ASSERT_TRUE(rc.IsOk());
  264. rc = my_tree->AssociateNode(my_tfreader_op);
  265. ASSERT_TRUE(rc.IsOk());
  266. // RepeatOp
  267. uint32_t num_repeats = 3;
  268. std::shared_ptr<RepeatOp> my_repeat_op = std::make_shared<RepeatOp>(num_repeats);
  269. rc = my_tree->AssociateNode(my_repeat_op);
  270. ASSERT_TRUE(rc.IsOk());
  271. // Set children/root layout.
  272. my_tfreader_op->set_total_repeats(num_repeats);
  273. my_tfreader_op->set_num_repeats_per_epoch(num_repeats);
  274. rc = my_repeat_op->AddChild(my_tfreader_op);
  275. ASSERT_TRUE(rc.IsOk());
  276. rc = my_tree->AssignRoot(my_repeat_op);
  277. ASSERT_TRUE(rc.IsOk());
  278. MS_LOG(INFO) << "Launching tree and begin iteration.";
  279. rc = my_tree->Prepare();
  280. ASSERT_TRUE(rc.IsOk());
  281. rc = my_tree->Launch();
  282. ASSERT_TRUE(rc.IsOk());
  283. // Start the loop of reading tensors from our pipeline
  284. DatasetIterator di(my_tree);
  285. TensorRow tensor_list;
  286. rc = di.FetchNextTensorRow(&tensor_list);
  287. ASSERT_TRUE(rc.IsOk());
  288. int row_count = 0;
  289. while (!tensor_list.empty()) {
  290. MS_LOG(INFO) << "Row display for row #: " << row_count << ".";
  291. // Display the tensor by calling the printer on it
  292. for (int i = 0; i < tensor_list.size(); i++) {
  293. std::ostringstream ss;
  294. ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
  295. MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
  296. }
  297. rc = di.FetchNextTensorRow(&tensor_list);
  298. ASSERT_TRUE(rc.IsOk());
  299. row_count++;
  300. }
  301. ASSERT_EQ(row_count, 12 * 3);
  302. }
  303. TEST_F(MindDataTestTFReaderOp, TestTFReaderSchemaConstructor) {
  304. // Start with an empty execution tree
  305. auto my_tree = std::make_shared<ExecutionTree>();
  306. std::string dataset_path;
  307. dataset_path = datasets_root_path_ + "/testTFTestAllTypes";
  308. std::unique_ptr<DataSchema> data_schema = std::make_unique<DataSchema>();
  309. std::vector<std::string> columns_to_load;
  310. columns_to_load.push_back("col_sint32");
  311. columns_to_load.push_back("col_binary");
  312. data_schema->LoadSchemaFile(dataset_path + "/datasetSchema.json", columns_to_load);
  313. std::shared_ptr<TFReaderOp> my_tfreader_op;
  314. TFReaderOp::Builder builder;
  315. builder.SetDatasetFilesList({dataset_path+"/test.data"})
  316. .SetRowsPerBuffer(16)
  317. .SetNumWorkers(16)
  318. .SetDataSchema(std::move(data_schema));
  319. Status rc = builder.Build(&my_tfreader_op);
  320. ASSERT_TRUE(rc.IsOk());
  321. rc = my_tree->AssociateNode(my_tfreader_op);
  322. ASSERT_TRUE(rc.IsOk());
  323. rc = my_tree->AssignRoot(my_tfreader_op);
  324. ASSERT_TRUE(rc.IsOk());
  325. MS_LOG(INFO) << "Launching tree and begin iteration.";
  326. rc = my_tree->Prepare();
  327. ASSERT_TRUE(rc.IsOk());
  328. rc = my_tree->Launch();
  329. ASSERT_TRUE(rc.IsOk());
  330. // Start the loop of reading tensors from our pipeline
  331. DatasetIterator di(my_tree);
  332. TensorRow tensor_list;
  333. rc = di.FetchNextTensorRow(&tensor_list);
  334. ASSERT_TRUE(rc.IsOk());
  335. int row_count = 0;
  336. while (!tensor_list.empty()) {
  337. // Display the tensor by calling the printer on it
  338. ASSERT_EQ(tensor_list.size(), columns_to_load.size());
  339. for (int i = 0; i < tensor_list.size(); i++) {
  340. std::ostringstream ss;
  341. ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
  342. MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
  343. }
  344. rc = di.FetchNextTensorRow(&tensor_list);
  345. ASSERT_TRUE(rc.IsOk());
  346. row_count++;
  347. }
  348. ASSERT_EQ(row_count, 12);
  349. }
  350. TEST_F(MindDataTestTFReaderOp, TestTFReaderTake1Row) {
  351. // Start with an empty execution tree
  352. auto my_tree = std::make_shared<ExecutionTree>();
  353. std::string dataset_path;
  354. dataset_path = datasets_root_path_ + "/testTFTestAllTypes";
  355. std::string data_schema_filepath = dataset_path + "/datasetSchema1Row.json";
  356. // TFReaderOp
  357. std::shared_ptr<TFReaderOp> my_tfreader_op;
  358. TFReaderOp::Builder builder;
  359. builder.SetDatasetFilesList({dataset_path + "/test.data"}).SetRowsPerBuffer(5).SetNumWorkers(16);
  360. std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  361. schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema1Row.json", {});
  362. builder.SetDataSchema(std::move(schema));
  363. Status rc= builder.Build(&my_tfreader_op);
  364. ASSERT_TRUE(rc.IsOk());
  365. rc = my_tree->AssociateNode(my_tfreader_op);
  366. ASSERT_TRUE(rc.IsOk());
  367. rc = my_tree->AssignRoot(my_tfreader_op);
  368. ASSERT_TRUE(rc.IsOk());
  369. MS_LOG(INFO) << "Launching tree and begin iteration.";
  370. rc = my_tree->Prepare();
  371. ASSERT_TRUE(rc.IsOk());
  372. rc = my_tree->Launch();
  373. ASSERT_TRUE(rc.IsOk());
  374. // Start the loop of reading tensors from our pipeline
  375. DatasetIterator di(my_tree);
  376. TensorRow tensor_list;
  377. rc = di.FetchNextTensorRow(&tensor_list);
  378. ASSERT_TRUE(rc.IsOk());
  379. int row_count = 0;
  380. while (!tensor_list.empty()) {
  381. MS_LOG(INFO) << "Row display for row #: " << row_count << ".";
  382. // Display the tensor by calling the printer on it
  383. for (int i = 0; i < tensor_list.size(); i++) {
  384. std::ostringstream ss;
  385. ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
  386. MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
  387. }
  388. rc = di.FetchNextTensorRow(&tensor_list);
  389. ASSERT_TRUE(rc.IsOk());
  390. row_count++;
  391. }
  392. ASSERT_EQ(row_count, 1);
  393. }
  394. TEST_F(MindDataTestTFReaderOp, TestTFReaderTake1Buffer) {
  395. // Start with an empty execution tree
  396. auto my_tree = std::make_shared<ExecutionTree>();
  397. std::string dataset_path;
  398. dataset_path = datasets_root_path_ + "/testTFTestAllTypes";
  399. std::string data_schema_filepath = dataset_path + "/datasetSchema5Rows.json";
  400. // TFReaderOp
  401. std::shared_ptr<TFReaderOp> my_tfreader_op;
  402. TFReaderOp::Builder builder;
  403. builder.SetDatasetFilesList({dataset_path + "/test.data"}).SetRowsPerBuffer(5).SetNumWorkers(16);
  404. std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  405. schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema5Rows.json", {});
  406. builder.SetDataSchema(std::move(schema));
  407. Status rc= builder.Build(&my_tfreader_op);
  408. ASSERT_TRUE(rc.IsOk());
  409. rc = my_tree->AssociateNode(my_tfreader_op);
  410. ASSERT_TRUE(rc.IsOk());
  411. rc = my_tree->AssignRoot(my_tfreader_op);
  412. ASSERT_TRUE(rc.IsOk());
  413. MS_LOG(INFO) << "Launching tree and begin iteration.";
  414. rc = my_tree->Prepare();
  415. ASSERT_TRUE(rc.IsOk());
  416. rc = my_tree->Launch();
  417. ASSERT_TRUE(rc.IsOk());
  418. // Start the loop of reading tensors from our pipeline
  419. DatasetIterator di(my_tree);
  420. TensorRow tensor_list;
  421. rc = di.FetchNextTensorRow(&tensor_list);
  422. ASSERT_TRUE(rc.IsOk());
  423. int row_count = 0;
  424. while (!tensor_list.empty()) {
  425. MS_LOG(INFO) << "Row display for row #: " << row_count << ".";
  426. // Display the tensor by calling the printer on it
  427. for (int i = 0; i < tensor_list.size(); i++) {
  428. std::ostringstream ss;
  429. ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
  430. MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
  431. }
  432. rc = di.FetchNextTensorRow(&tensor_list);
  433. ASSERT_TRUE(rc.IsOk());
  434. row_count++;
  435. }
  436. ASSERT_EQ(row_count, 5);
  437. }
  438. TEST_F(MindDataTestTFReaderOp, TestTFReaderTake7Rows) {
  439. // Start with an empty execution tree
  440. auto my_tree = std::make_shared<ExecutionTree>();
  441. std::string dataset_path;
  442. dataset_path = datasets_root_path_ + "/testTFTestAllTypes";
  443. std::string data_schema_filepath = dataset_path + "/datasetSchema7Rows.json";
  444. // TFReaderOp
  445. std::shared_ptr<TFReaderOp> my_tfreader_op;
  446. TFReaderOp::Builder builder;
  447. builder.SetDatasetFilesList({dataset_path + "/test.data"}).SetRowsPerBuffer(5).SetNumWorkers(16);
  448. std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  449. schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema7Rows.json", {});
  450. builder.SetDataSchema(std::move(schema));
  451. Status rc= builder.Build(&my_tfreader_op);
  452. ASSERT_TRUE(rc.IsOk());
  453. rc = my_tree->AssociateNode(my_tfreader_op);
  454. ASSERT_TRUE(rc.IsOk());
  455. rc = my_tree->AssignRoot(my_tfreader_op);
  456. ASSERT_TRUE(rc.IsOk());
  457. MS_LOG(INFO) << "Launching tree and begin iteration.";
  458. rc = my_tree->Prepare();
  459. ASSERT_TRUE(rc.IsOk());
  460. rc = my_tree->Launch();
  461. ASSERT_TRUE(rc.IsOk());
  462. // Start the loop of reading tensors from our pipeline
  463. DatasetIterator di(my_tree);
  464. TensorRow tensor_list;
  465. rc = di.FetchNextTensorRow(&tensor_list);
  466. ASSERT_TRUE(rc.IsOk());
  467. int row_count = 0;
  468. while (!tensor_list.empty()) {
  469. MS_LOG(INFO) << "Row display for row #: " << row_count << ".";
  470. // Display the tensor by calling the printer on it
  471. for (int i = 0; i < tensor_list.size(); i++) {
  472. std::ostringstream ss;
  473. ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
  474. MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
  475. }
  476. rc = di.FetchNextTensorRow(&tensor_list);
  477. ASSERT_TRUE(rc.IsOk());
  478. row_count++;
  479. }
  480. ASSERT_EQ(row_count, 7);
  481. }
  482. TEST_F(MindDataTestTFReaderOp, TestTFReaderBasicNoSchema) {
  483. // Start with an empty execution tree
  484. auto my_tree = std::make_shared<ExecutionTree>();
  485. std::string dataset_path;
  486. dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data";
  487. std::shared_ptr<TFReaderOp> my_tfreader_op;
  488. TFReaderOp::Builder builder;
  489. builder.SetDatasetFilesList({dataset_path})
  490. .SetRowsPerBuffer(16)
  491. .SetNumWorkers(16);
  492. Status rc = builder.Build(&my_tfreader_op);
  493. ASSERT_TRUE(rc.IsOk());
  494. rc = my_tree->AssociateNode(my_tfreader_op);
  495. ASSERT_TRUE(rc.IsOk());
  496. rc = my_tree->AssignRoot(my_tfreader_op);
  497. ASSERT_TRUE(rc.IsOk());
  498. MS_LOG(INFO) << "Launching tree and begin iteration.";
  499. rc = my_tree->Prepare();
  500. ASSERT_TRUE(rc.IsOk());
  501. rc = my_tree->Launch();
  502. ASSERT_TRUE(rc.IsOk());
  503. // Start the loop of reading tensors from our pipeline
  504. DatasetIterator di(my_tree);
  505. TensorRow tensor_list;
  506. rc = di.FetchNextTensorRow(&tensor_list);
  507. ASSERT_TRUE(rc.IsOk());
  508. int row_count = 0;
  509. while (!tensor_list.empty()) {
  510. // Display the tensor by calling the printer on it
  511. ASSERT_EQ(tensor_list.size(), 9);
  512. for (int i = 0; i < tensor_list.size(); i++) {
  513. std::ostringstream ss;
  514. ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
  515. MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
  516. }
  517. rc = di.FetchNextTensorRow(&tensor_list);
  518. ASSERT_TRUE(rc.IsOk());
  519. row_count++;
  520. }
  521. ASSERT_EQ(row_count, 12);
  522. }
  523. TEST_F(MindDataTestTFReaderOp, TestTotalRowsBasic) {
  524. std::string tf_file = datasets_root_path_ + "/testTFTestAllTypes/test.data";
  525. std::vector<std::string> filenames;
  526. for (int i = 0; i < 5; i++) {
  527. filenames.push_back(tf_file);
  528. }
  529. int64_t total_rows = 0;
  530. TFReaderOp::CountTotalRows(&total_rows, filenames, 1);
  531. ASSERT_EQ(total_rows, 60);
  532. TFReaderOp::CountTotalRows(&total_rows, filenames, 2);
  533. ASSERT_EQ(total_rows, 60);
  534. TFReaderOp::CountTotalRows(&total_rows, filenames, 3);
  535. ASSERT_EQ(total_rows, 60);
  536. TFReaderOp::CountTotalRows(&total_rows, filenames, 4);
  537. ASSERT_EQ(total_rows, 60);
  538. TFReaderOp::CountTotalRows(&total_rows, filenames, 5);
  539. ASSERT_EQ(total_rows, 60);
  540. TFReaderOp::CountTotalRows(&total_rows, filenames, 6);
  541. ASSERT_EQ(total_rows, 60);
  542. TFReaderOp::CountTotalRows(&total_rows, filenames, 729);
  543. ASSERT_EQ(total_rows, 60);
  544. TFReaderOp::CountTotalRows(&total_rows, filenames, 1, true);
  545. ASSERT_EQ(total_rows, 60);
  546. TFReaderOp::CountTotalRows(&total_rows, filenames, 2, true);
  547. ASSERT_EQ(total_rows, 60);
  548. TFReaderOp::CountTotalRows(&total_rows, filenames, 3, true);
  549. ASSERT_EQ(total_rows, 60);
  550. TFReaderOp::CountTotalRows(&total_rows, filenames, 4, true);
  551. ASSERT_EQ(total_rows, 60);
  552. TFReaderOp::CountTotalRows(&total_rows, filenames, 5, true);
  553. ASSERT_EQ(total_rows, 60);
  554. TFReaderOp::CountTotalRows(&total_rows, filenames, 6, true);
  555. ASSERT_EQ(total_rows, 60);
  556. TFReaderOp::CountTotalRows(&total_rows, filenames, 729, true);
  557. ASSERT_EQ(total_rows, 60);
  558. }
  559. TEST_F(MindDataTestTFReaderOp, TestTFReaderInvalidFiles) {
  560. // Start with an empty execution tree
  561. auto my_tree = std::make_shared<ExecutionTree>();
  562. std::string valid_file = datasets_root_path_ + "/testTFTestAllTypes/test.data";
  563. std::string schema_file = datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json";
  564. std::string invalid_file = datasets_root_path_ + "/testTFTestAllTypes/invalidFile.txt";
  565. std::string nonexistent_file = "this/file/not/exist";
  566. std::shared_ptr<TFReaderOp> my_tfreader_op;
  567. TFReaderOp::Builder builder;
  568. builder.SetDatasetFilesList({invalid_file, valid_file, schema_file})
  569. .SetRowsPerBuffer(16)
  570. .SetNumWorkers(16);
  571. std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  572. schema->LoadSchemaFile(schema_file, {});
  573. builder.SetDataSchema(std::move(schema));
  574. Status rc = builder.Build(&my_tfreader_op);
  575. ASSERT_TRUE(!rc.IsOk());
  576. builder.SetDatasetFilesList({invalid_file, valid_file, schema_file, nonexistent_file})
  577. .SetRowsPerBuffer(16)
  578. .SetNumWorkers(16);
  579. schema = std::make_unique<DataSchema>();
  580. schema->LoadSchemaFile(schema_file, {});
  581. builder.SetDataSchema(std::move(schema));
  582. rc = builder.Build(&my_tfreader_op);
  583. ASSERT_TRUE(!rc.IsOk());
  584. }