From 587247e7932156a0701216d05ee57e2bd3212525 Mon Sep 17 00:00:00 2001
From: liyong
Date: Tue, 27 Apr 2021 19:03:46 +0800
Subject: [PATCH] add support for saving dynamic shape tensors

---
 .../dataset/engine/consumers/tree_consumer.cc |  47 ++++-
 .../dataset/engine/consumers/tree_consumer.h  |   6 +
 mindspore/dataset/engine/datasets.py          |  22 +-
 tests/ut/python/dataset/test_save_op.py       | 188 ++++++++++++------
 4 files changed, 203 insertions(+), 60 deletions(-)

diff --git a/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc b/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc
index 541e5c7875..c79ae6407a 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc
@@ -228,6 +228,8 @@ Status SaveToDisk::Save() {
   TensorRow row;
   uint64_t mr_schema_id = 0;
   bool first_loop = true;  // build schema in first loop
+  auto PreTensorRowShapes = std::map<std::string, std::vector<int>>();
+
   do {
     nlohmann::json row_raw_data;
     std::map<std::string, std::unique_ptr<std::vector<uint8_t>>> row_bin_data;
@@ -235,11 +237,12 @@
     if (row.empty()) {
       break;
     }
+    RETURN_IF_NOT_OK(CheckTensorRowShapes(column_name_id_map, row, &PreTensorRowShapes));
     if (first_loop) {
       nlohmann::json mr_json;
       std::vector<std::string> index_fields;
       RETURN_IF_NOT_OK(FetchMetaFromTensorRow(column_name_id_map, row, &mr_json, &index_fields));
-      MS_LOG(DEBUG) << "Schema of saved mindrecord: " << mr_json.dump();
+      MS_LOG(INFO) << "Schema of saved mindrecord: " << mr_json.dump();
       if (mindrecord::SUCCESS !=
           mindrecord::ShardHeader::initialize(&mr_header, mr_json, index_fields, blob_fields, mr_schema_id)) {
         RETURN_STATUS_UNEXPECTED("Error: failed to initialize ShardHeader.");
@@ -270,6 +273,47 @@ Status SaveToDisk::Save() {
   return Status::OK();
 }
 
+template <typename T>
+bool SaveToDisk::map_compare(T const &lhs, T const &rhs) {
+  return lhs.size() == rhs.size() && std::equal(lhs.begin(), lhs.end(), rhs.begin());
+}
+
+Status SaveToDisk::CheckTensorRowShapes(const std::unordered_map<std::string, int32_t> &column_name_id_map,
+                                        const TensorRow &row,
+                                        std::map<std::string, std::vector<int>> *PreTensorRowShapes_ptr) {
+  std::map<std::string, std::vector<int>> CurrTensorRowShapes;
+  for (auto &col : column_name_id_map) {
+    auto idx = col.second;
+    auto column_name = col.first;
+    auto &tensor = row[idx];
+    auto column_type = tensor->type();
+    auto column_shape = tensor->shape();
+
+    auto shapes = column_shape.AsVector();
+    std::vector<int> mr_shape(shapes.begin(), shapes.end());
+
+    if (mr_shape.empty() || mr_shape.size() == 1) continue;  // ignore scalars and one-dimensional tensors
+    std::string mr_type;
+    std::string el = column_type.ToString();
+    if (mindrecord::kTypesMap.find(el) == mindrecord::kTypesMap.end()) {
+      std::string err_msg("Error: can not support data type: " + el);
+      RETURN_STATUS_UNEXPECTED(err_msg);
+    } else {
+      mr_type = mindrecord::kTypesMap.at(el);
+    }
+    if (mr_type == "bytes" || mr_type == "string") continue;
+    mr_shape.erase(mr_shape.begin());  // ignore the first dimension
+    CurrTensorRowShapes[column_name] = mr_shape;
+  }
+  if (PreTensorRowShapes_ptr->empty()) {
+    *PreTensorRowShapes_ptr = CurrTensorRowShapes;
+    return Status::OK();
+  }
+  auto res = map_compare(*PreTensorRowShapes_ptr, CurrTensorRowShapes);
+  CHECK_FAIL_RETURN_UNEXPECTED(res, "Error: current tensor shape is different from the previous one's.");
+  return Status::OK();
+}
+
 Status SaveToDisk::FetchMetaFromTensorRow(const std::unordered_map<std::string, int32_t> &column_name_id_map,
                                           const TensorRow &row, nlohmann::json *schema,
                                           std::vector<std::string> *index_fields) {
@@ -314,6 +358,7 @@ Status SaveToDisk::FetchMetaFromTensorRow(const std::unordered_map
diff --git a/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.h b/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.h
                                tensor, std::string column_name, nlohmann::json *row_raw_data,
                                std::map<std::string, std::unique_ptr<std::vector<uint8_t>>> *row_bin_data);
+  template <typename T>
+  bool map_compare(T const &lhs, T const &rhs);
+
+  Status CheckTensorRowShapes(const std::unordered_map<std::string, int32_t> &column_name_id_map, const TensorRow &row,
+                              std::map<std::string, std::vector<int>> *PreTensorRowShapes_ptr);
+
   std::string dataset_path_;
   int32_t num_files_;
   std::string dataset_type_;
diff --git a/mindspore/dataset/engine/datasets.py b/mindspore/dataset/engine/datasets.py
index 92d2fb50aa..d098745fb8 100644
--- a/mindspore/dataset/engine/datasets.py
+++ b/mindspore/dataset/engine/datasets.py
@@ -24,6 +24,7 @@ import json
 import math
 import os
 import signal
+import stat
 import time
 import uuid
 import multiprocessing
@@ -119,6 +120,23 @@ def _get_operator_process():
         fetched_all = fetched_all and item_full
     return op_process, fetched_all
 
+def _set_dataset_permissions(file_name, num_files):
+    """
+    Set the saved dataset files' permissions to 600.
+    The file naming rule must stay consistent with the one used in the C++ layer.
+    """
+    num_digits = len(str(num_files - 1))
+    if num_files == 1:
+        paths = [file_name]
+    else:
+        paths = ["{}{}".format(file_name, str(x).rjust(num_digits, '0')) for x in range(num_files)]
+
+    for item in paths:
+        if os.path.exists(item):
+            os.chmod(item, stat.S_IRUSR | stat.S_IWUSR)
+        index_file = item + ".db"
+        if os.path.exists(index_file):
+            os.chmod(index_file, stat.S_IRUSR | stat.S_IWUSR)
 
 class Dataset:
     """
@@ -1290,7 +1308,8 @@ class Dataset:
             1. To save the samples in order, set dataset's shuffle to False and num_files to 1.
             2. Before calling the function, do not use batch operator, repeat operator or data augmentation operators
               with random attribute in map operator.
-            3. Can not save number type tensor whose shape is dynamic.
+            3. A tensor with a dynamic shape can be saved only if it is one-dimensional, or if it is
+              multi-dimensional and only dimension 0 (the first dimension) varies.
             4. Mindrecord does not support DE_UINT64, multi-dimensional DE_UINT8(drop dimension) nor
               multi-dimensional DE_STRING.
@@ -1309,6 +1328,7 @@ class Dataset:
         runtime_context.AssignConsumer(consumer)
         consumer.Save()
+        _set_dataset_permissions(file_name, num_files)
         del api_tree
 
     @check_tuple_iterator
diff --git a/tests/ut/python/dataset/test_save_op.py b/tests/ut/python/dataset/test_save_op.py
index c0484a2b63..909cac3762 100644
--- a/tests/ut/python/dataset/test_save_op.py
+++ b/tests/ut/python/dataset/test_save_op.py
@@ -23,38 +23,38 @@ import mindspore.dataset as ds
 from mindspore import log as logger
 from mindspore.mindrecord import FileWriter
 
-CV_FILE_NAME1 = "../data/mindrecord/testMindDataSet/temp.mindrecord"
-CV_FILE_NAME2 = "../data/mindrecord/testMindDataSet/auto.mindrecord"
+TEMP_FILE = "../data/mindrecord/testMindDataSet/temp.mindrecord"
+AUTO_FILE = "../data/mindrecord/testMindDataSet/auto.mindrecord"
 TFRECORD_FILES = "../data/mindrecord/testTFRecordData/dummy.tfrecord"
 FILES_NUM = 1
 num_readers = 1
 
 
-@pytest.fixture(name="add_and_remove_cv_file")
+@pytest.fixture(name="add_remove_file")
 def fixture_remove():
     """add/remove cv file"""
-    if os.path.exists("{}".format(CV_FILE_NAME1)):
-        os.remove("{}".format(CV_FILE_NAME1))
-    if os.path.exists("{}.db".format(CV_FILE_NAME1)):
-        os.remove("{}.db".format(CV_FILE_NAME1))
-
-    if os.path.exists("{}".format(CV_FILE_NAME2)):
-        os.remove("{}".format(CV_FILE_NAME2))
-    if os.path.exists("{}.db".format(CV_FILE_NAME2)):
-        os.remove("{}.db".format(CV_FILE_NAME2))
+    if os.path.exists("{}".format(TEMP_FILE)):
+        os.remove("{}".format(TEMP_FILE))
+    if os.path.exists("{}.db".format(TEMP_FILE)):
+        os.remove("{}.db".format(TEMP_FILE))
+
+    if os.path.exists("{}".format(AUTO_FILE)):
+        os.remove("{}".format(AUTO_FILE))
+    if os.path.exists("{}.db".format(AUTO_FILE)):
+        os.remove("{}.db".format(AUTO_FILE))
 
     yield "yield_cv_data"
-    if os.path.exists("{}".format(CV_FILE_NAME1)):
-        os.remove("{}".format(CV_FILE_NAME1))
-    if os.path.exists("{}.db".format(CV_FILE_NAME1)):
-        os.remove("{}.db".format(CV_FILE_NAME1))
+    if os.path.exists("{}".format(TEMP_FILE)):
+        os.remove("{}".format(TEMP_FILE))
+    if os.path.exists("{}.db".format(TEMP_FILE)):
+        os.remove("{}.db".format(TEMP_FILE))
 
-    if os.path.exists("{}".format(CV_FILE_NAME2)):
-        os.remove("{}".format(CV_FILE_NAME2))
-    if os.path.exists("{}.db".format(CV_FILE_NAME2)):
-        os.remove("{}.db".format(CV_FILE_NAME2))
+    if os.path.exists("{}".format(AUTO_FILE)):
+        os.remove("{}".format(AUTO_FILE))
+    if os.path.exists("{}.db".format(AUTO_FILE)):
+        os.remove("{}.db".format(AUTO_FILE))
 
 
-def test_case_00(add_and_remove_cv_file):  # only bin data
+def test_case_00(add_remove_file):  # only bin data
     data = [{"image1": bytes("image1 bytes abc", encoding='UTF-8'),
              "image2": bytes("image1 bytes def", encoding='UTF-8'),
              "image3": bytes("image1 bytes ghi", encoding='UTF-8'),
@@ -86,13 +86,13 @@ def test_case_00(add_and_remove_cv_file):  # only bin data
               "image3": {"type": "bytes"},
               "image4": {"type": "bytes"},
               "image5": {"type": "bytes"}}
-    writer = FileWriter(CV_FILE_NAME1, FILES_NUM)
+    writer = FileWriter(TEMP_FILE, FILES_NUM)
     writer.add_schema(schema, "schema")
     writer.write_raw_data(data)
     writer.commit()
 
-    d1 = ds.MindDataset(CV_FILE_NAME1, None, num_readers, shuffle=False)
-    d1.save(CV_FILE_NAME2, FILES_NUM)
+    d1 = ds.MindDataset(TEMP_FILE, None, num_readers, shuffle=False)
+    d1.save(AUTO_FILE, FILES_NUM)
 
     data_value_to_list = []
     for item in data:
@@ -104,7 +104,7 @@ def test_case_00(add_and_remove_cv_file):  # only bin data
         new_data['image5'] = np.asarray(list(item["image5"]), dtype=np.uint8)
         data_value_to_list.append(new_data)
 
-    d2 = ds.MindDataset(dataset_file=CV_FILE_NAME2,
+    d2 = ds.MindDataset(dataset_file=AUTO_FILE,
                         num_parallel_workers=num_readers,
                         shuffle=False)
     assert d2.get_dataset_size() == 5
@@ -121,7 +121,7 @@ def test_case_00(add_and_remove_cv_file):  # only bin data
     assert num_iter == 5
 
 
-def test_case_01(add_and_remove_cv_file):  # only raw data
+def test_case_01(add_remove_file):  # only raw data
     data = [{"file_name": "001.jpg", "label": 43},
             {"file_name": "002.jpg", "label": 91},
            {"file_name": "003.jpg", "label": 61},
@@ -132,13 +132,13 @@ def test_case_01(add_and_remove_cv_file):  # only raw data
              "label": {"type": "int32"}
              }
-    writer = FileWriter(CV_FILE_NAME1, FILES_NUM)
+    writer = FileWriter(TEMP_FILE, FILES_NUM)
     writer.add_schema(schema, "schema")
     writer.write_raw_data(data)
     writer.commit()
 
-    d1 = ds.MindDataset(CV_FILE_NAME1, None, num_readers, shuffle=False)
-    d1.save(CV_FILE_NAME2, FILES_NUM)
+    d1 = ds.MindDataset(TEMP_FILE, None, num_readers, shuffle=False)
+    d1.save(AUTO_FILE, FILES_NUM)
 
     data_value_to_list = []
     for item in data:
@@ -147,7 +147,7 @@ def test_case_01(add_and_remove_cv_file):  # only raw data
         new_data['label'] = np.asarray(list([item["label"]]), dtype=np.int32)
         data_value_to_list.append(new_data)
 
-    d2 = ds.MindDataset(dataset_file=CV_FILE_NAME2,
+    d2 = ds.MindDataset(dataset_file=AUTO_FILE,
                         num_parallel_workers=num_readers,
                         shuffle=False)
     assert d2.get_dataset_size() == 6
@@ -165,7 +165,7 @@ def test_case_01(add_and_remove_cv_file):  # only raw data
     assert num_iter == 6
 
 
-def test_case_02(add_and_remove_cv_file):  # muti-bytes
+def test_case_02(add_remove_file):  # multi-bytes
     data = [{"file_name": "001.jpg", "label": 43,
              "float32_array": np.array([1.2, 2.78, 3.1234, 4.9871, 5.12341], dtype=np.float32),
              "float64_array": np.array([48.1234556789, 49.3251241431, 50.13514312414, 51.8971298471,
@@ -258,13 +258,13 @@ def test_case_02(add_and_remove_cv_file):  # muti-bytes
               "label": {"type": "int32"},
               "image4": {"type": "bytes"},
               "image5": {"type": "bytes"}}
-    writer = FileWriter(CV_FILE_NAME1, FILES_NUM)
+    writer = FileWriter(TEMP_FILE, FILES_NUM)
     writer.add_schema(schema, "schema")
     writer.write_raw_data(data)
     writer.commit()
 
-    d1 = ds.MindDataset(CV_FILE_NAME1, None, num_readers, shuffle=False)
-    d1.save(CV_FILE_NAME2, FILES_NUM)
+    d1 = ds.MindDataset(TEMP_FILE, None, num_readers, shuffle=False)
+    d1.save(AUTO_FILE, FILES_NUM)
 
     data_value_to_list = []
     for item in data:
@@ -284,7 +284,7 @@ def test_case_02(add_and_remove_cv_file):  # muti-bytes
         new_data['image5'] = np.asarray(list(item["image5"]), dtype=np.uint8)
         data_value_to_list.append(new_data)
 
-    d2 = ds.MindDataset(dataset_file=CV_FILE_NAME2,
+    d2 = ds.MindDataset(dataset_file=AUTO_FILE,
                         num_parallel_workers=num_readers,
                         shuffle=False)
     assert d2.get_dataset_size() == 6
@@ -310,14 +310,14 @@ def generator_1d():
         yield (np.array([i]),)
 
 
-def test_case_03(add_and_remove_cv_file):
+def test_case_03(add_remove_file):
 
     # apply dataset operations
     d1 = ds.GeneratorDataset(generator_1d, ["data"], shuffle=False)
 
-    d1.save(CV_FILE_NAME2)
+    d1.save(AUTO_FILE)
 
-    d2 = ds.MindDataset(dataset_file=CV_FILE_NAME2,
+    d2 = ds.MindDataset(dataset_file=AUTO_FILE,
                         num_parallel_workers=num_readers,
                         shuffle=False)
 
@@ -343,9 +343,9 @@ def type_tester(t):
 
     data1 = data1.repeat(3)
 
-    data1.save(CV_FILE_NAME2)
+    data1.save(AUTO_FILE)
 
-    d2 = ds.MindDataset(dataset_file=CV_FILE_NAME2,
+    d2 = ds.MindDataset(dataset_file=AUTO_FILE,
                         num_parallel_workers=num_readers,
                         shuffle=False)
 
@@ -360,10 +360,10 @@ def type_tester(t):
             i = 0
             num_repeat += 1
     assert num_repeat == 3
-    if os.path.exists("{}".format(CV_FILE_NAME2)):
-        os.remove("{}".format(CV_FILE_NAME2))
-    if os.path.exists("{}.db".format(CV_FILE_NAME2)):
-        os.remove("{}.db".format(CV_FILE_NAME2))
+    if os.path.exists("{}".format(AUTO_FILE)):
+        os.remove("{}".format(AUTO_FILE))
+    if os.path.exists("{}.db".format(AUTO_FILE)):
+        os.remove("{}.db".format(AUTO_FILE))
 
 
 def test_case_04():
@@ -375,20 +375,20 @@ def test_case_04():
         type_tester(t)
 
 
-def test_case_05(add_and_remove_cv_file):
+def test_case_05(add_remove_file):
 
     d1 = ds.GeneratorDataset(generator_1d, ["data"], shuffle=False)
 
     with pytest.raises(Exception, match="num_files should between 1 and 1000."):
-        d1.save(CV_FILE_NAME2, 0)
+        d1.save(AUTO_FILE, 0)
 
 
-def test_case_06(add_and_remove_cv_file):
+def test_case_06(add_remove_file):
 
     d1 = ds.GeneratorDataset(generator_1d, ["data"], shuffle=False)
 
     with pytest.raises(Exception, match="tfrecord dataset format is not supported."):
-        d1.save(CV_FILE_NAME2, 1, "tfrecord")
+        d1.save(AUTO_FILE, 1, "tfrecord")
 
 
 def cast_name(key):
@@ -403,16 +403,16 @@ def cast_name(key):
 
 
 def test_case_07():
-    if os.path.exists("{}".format(CV_FILE_NAME2)):
-        os.remove("{}".format(CV_FILE_NAME2))
-    if os.path.exists("{}.db".format(CV_FILE_NAME2)):
-        os.remove("{}.db".format(CV_FILE_NAME2))
+    if os.path.exists("{}".format(AUTO_FILE)):
+        os.remove("{}".format(AUTO_FILE))
+    if os.path.exists("{}.db".format(AUTO_FILE)):
+        os.remove("{}.db".format(AUTO_FILE))
     d1 = ds.TFRecordDataset(TFRECORD_FILES, shuffle=False)
     tf_data = []
     for x in d1.create_dict_iterator(num_epochs=1, output_numpy=True):
         tf_data.append(x)
-    d1.save(CV_FILE_NAME2, FILES_NUM)
-    d2 = ds.MindDataset(dataset_file=CV_FILE_NAME2,
+    d1.save(AUTO_FILE, FILES_NUM)
+    d2 = ds.MindDataset(dataset_file=AUTO_FILE,
                         num_parallel_workers=num_readers,
                         shuffle=False)
     mr_data = []
@@ -428,7 +428,79 @@ def test_case_07():
         count += 1
     assert count == 10
 
-    if os.path.exists("{}".format(CV_FILE_NAME2)):
-        os.remove("{}".format(CV_FILE_NAME2))
-    if os.path.exists("{}.db".format(CV_FILE_NAME2)):
-        os.remove("{}.db".format(CV_FILE_NAME2))
+    if os.path.exists("{}".format(AUTO_FILE)):
+        os.remove("{}".format(AUTO_FILE))
+    if os.path.exists("{}.db".format(AUTO_FILE)):
+        os.remove("{}.db".format(AUTO_FILE))
+
+def generator_dynamic_1d():
+    arr = []
+    for i in range(10):
+        if i % 5 == 0:
+            arr = []
+        arr += [i]
+        yield (np.array(arr),)
+
+def generator_dynamic_2d_0():
+    for i in range(10):
+        if i < 5:
+            yield (np.arange(5).reshape([1, 5]),)
+        else:
+            yield (np.arange(10).reshape([2, 5]),)
+
+
+def generator_dynamic_2d_1():
+    for i in range(10):
+        if i < 5:
+            yield (np.arange(5).reshape([5, 1]),)
+        else:
+            yield (np.arange(10).reshape([5, 2]),)
+
+def test_case_08(add_remove_file):
+
+    # apply dataset operations
+    d1 = ds.GeneratorDataset(generator_dynamic_1d, ["data"], shuffle=False)
+
+    d1.save(AUTO_FILE)
+
+    d2 = ds.MindDataset(dataset_file=AUTO_FILE,
+                        num_parallel_workers=num_readers,
+                        shuffle=False)
+
+    i = 0
+    arr = []
+    for item in d2.create_dict_iterator(num_epochs=1, output_numpy=True):
+        if i % 5 == 0:
+            arr = []
+        arr += [i]
+        golden = np.array(arr)
+        np.testing.assert_array_equal(item["data"], golden)
+        i = i + 1
+
+def test_case_09(add_remove_file):
+
+    # apply dataset operations
+    d1 = ds.GeneratorDataset(generator_dynamic_2d_0, ["data"], shuffle=False)
+
+    d1.save(AUTO_FILE)
+
+    d2 = ds.MindDataset(dataset_file=AUTO_FILE,
+                        num_parallel_workers=num_readers,
+                        shuffle=False)
+
+    i = 0
+    for item in d2.create_dict_iterator(num_epochs=1, output_numpy=True):
+        if i < 5:
+            golden = np.arange(5).reshape([1, 5])
+        else:
+            golden = np.arange(10).reshape([2, 5])
+        np.testing.assert_array_equal(item["data"], golden)
+        i = i + 1
+
+def test_case_10(add_remove_file):
+
+    # apply dataset operations
+    d1 = ds.GeneratorDataset(generator_dynamic_2d_1, ["data"], shuffle=False)
+
+    with pytest.raises(Exception, match="Error: current tensor shape is different from the previous one's"):
+        d1.save(AUTO_FILE)
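
Usage sketch (illustrative only, distilled from test_case_09 and test_case_10 above; the output path below is hypothetical):

    import numpy as np
    import mindspore.dataset as ds

    def gen_dim0_varies():
        # Dimension 0 differs across rows while the trailing dimensions stay fixed,
        # which is the dynamic-shape case this patch allows save() to accept.
        for i in range(10):
            rows = 1 if i < 5 else 2
            yield (np.arange(rows * 5).reshape([rows, 5]),)

    d1 = ds.GeneratorDataset(gen_dim0_varies, ["data"], shuffle=False)
    d1.save("/tmp/dynamic_dim0.mindrecord")

    # If a non-leading dimension varied instead (e.g. shape [5, 1] followed by [5, 2]),
    # save() would reject the data with:
    # "Error: current tensor shape is different from the previous one's."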