!15597 [MD] save op support variable dimension 0

From: @liyong126
Reviewed-by: @liucunwei,@jonyguo
Signed-off-by: @liucunwei
pull/15597/MERGE
mindspore-ci-bot committed on Gitee, 4 years ago
commit b663bbd9ca
4 changed files with 203 additions and 60 deletions
  1. +46  -1   mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc
  2. +6   -0   mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.h
  3. +21  -1   mindspore/dataset/engine/datasets.py
  4. +130 -58  tests/ut/python/dataset/test_save_op.py

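This merge lets Dataset.save() write MindRecord files from tensors whose first dimension varies between rows; every other dimension must stay fixed (previously any dynamic shape was rejected). A minimal sketch of the newly supported case, assuming a working MindSpore install; the generator mirrors generator_dynamic_2d_0 from the added tests:

    import numpy as np
    import mindspore.dataset as ds

    def gen_variable_dim0():
        # trailing shape (5,) is fixed; dimension 0 alternates between 1 and 2
        for i in range(10):
            if i < 5:
                yield (np.arange(5).reshape([1, 5]),)
            else:
                yield (np.arange(10).reshape([2, 5]),)

    d1 = ds.GeneratorDataset(gen_variable_dim0, ["data"], shuffle=False)
    d1.save("./auto.mindrecord")  # accepted after this change

Rows that differ in any dimension other than 0 (see generator_dynamic_2d_1 below) are still rejected with "Error: current tensor shape is different from the previous's", which test_case_10 asserts.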
+46 -1  mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc

@@ -228,6 +228,8 @@ Status SaveToDisk::Save() {
TensorRow row;
uint64_t mr_schema_id = 0;
bool first_loop = true; // build schema in first loop
auto PreTensorRowShapes = std::map<std::string, std::vector<int>>();

do {
nlohmann::json row_raw_data;
std::map<std::string, std::unique_ptr<std::vector<uint8_t>>> row_bin_data;
@@ -235,11 +237,12 @@ Status SaveToDisk::Save() {
if (row.empty()) {
break;
}
RETURN_IF_NOT_OK(CheckTensorRowShapes(column_name_id_map, row, &PreTensorRowShapes));
if (first_loop) {
nlohmann::json mr_json;
std::vector<std::string> index_fields;
RETURN_IF_NOT_OK(FetchMetaFromTensorRow(column_name_id_map, row, &mr_json, &index_fields));
MS_LOG(DEBUG) << "Schema of saved mindrecord: " << mr_json.dump();
MS_LOG(INFO) << "Schema of saved mindrecord: " << mr_json.dump();
if (mindrecord::SUCCESS !=
mindrecord::ShardHeader::initialize(&mr_header, mr_json, index_fields, blob_fields, mr_schema_id)) {
RETURN_STATUS_UNEXPECTED("Error: failed to initialize ShardHeader.");
@@ -270,6 +273,47 @@ Status SaveToDisk::Save() {
return Status::OK();
}

template <typename T>
bool SaveToDisk::map_compare(T const &lhs, T const &rhs) {
return lhs.size() == rhs.size() && std::equal(lhs.begin(), lhs.end(), rhs.begin());
}

Status SaveToDisk::CheckTensorRowShapes(const std::unordered_map<std::string, int32_t> &column_name_id_map,
const TensorRow &row,
std::map<std::string, std::vector<int>> *PreTensorRowShapes_ptr) {
std::map<std::string, std::vector<int>> CurrTensorRowShapes;
for (auto &col : column_name_id_map) {
auto idx = col.second;
auto column_name = col.first;
auto &tensor = row[idx];
auto column_type = tensor->type();
auto column_shape = tensor->shape();

auto shapes = column_shape.AsVector();
std::vector<int> mr_shape(shapes.begin(), shapes.end());

if (mr_shape.empty() || mr_shape.size() == 1) continue; // ignore scalar and one dimension tensor
std::string mr_type;
std::string el = column_type.ToString();
if (mindrecord::kTypesMap.find(el) == mindrecord::kTypesMap.end()) {
std::string err_msg("Error: can not support data type: " + el);
RETURN_STATUS_UNEXPECTED(err_msg);
} else {
mr_type = mindrecord::kTypesMap.at(el);
}
if (mr_type == "bytes" || mr_type == "string") continue;
mr_shape.erase(mr_shape.begin()); // ignore the first dimension
CurrTensorRowShapes[column_name] = mr_shape;
}
if (PreTensorRowShapes_ptr->empty()) {
*PreTensorRowShapes_ptr = CurrTensorRowShapes;
return Status::OK();
}
auto res = map_compare(*PreTensorRowShapes_ptr, CurrTensorRowShapes);
CHECK_FAIL_RETURN_UNEXPECTED(res, "Error: current tensor shape is different from the previous's.");
return Status::OK();
}

Status SaveToDisk::FetchMetaFromTensorRow(const std::unordered_map<std::string, int32_t> &column_name_id_map,
const TensorRow &row, nlohmann::json *schema,
std::vector<std::string> *index_fields) {
@@ -314,6 +358,7 @@ Status SaveToDisk::FetchMetaFromTensorRow(const std::unordered_map<std::string,
if (mr_type == "bytes") { // ignore shape of bytes in mindrecord
(*schema)[column_name] = {{"type", mr_type}};
} else {
mr_shape[0] = -1; // make first dimension -1
(*schema)[column_name] = {{"type", mr_type}, {"shape", mr_shape}};
}
}

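The new CheckTensorRowShapes pass enforces this rule row by row: for each column it drops dimension 0, skips scalars, one-dimensional tensors and bytes/string columns, and requires the remaining trailing shape to match what earlier rows produced. A rough Python re-expression of that check, illustrative only and not the actual C++ implementation:

    import numpy as np

    def check_row_shapes(prev_shapes, row):
        """row: column name -> numpy array; prev_shapes: trailing shapes recorded so far."""
        curr = {}
        for name, arr in row.items():
            shape = list(arr.shape)
            if len(shape) <= 1:               # ignore scalars and one-dimensional tensors
                continue
            if arr.dtype.kind in ("S", "U"):  # bytes/string columns are not shape-checked
                continue
            curr[name] = shape[1:]            # dimension 0 is allowed to vary
        if not prev_shapes:                   # first row: just record the shapes
            prev_shapes.update(curr)
        elif prev_shapes != curr:
            raise RuntimeError("current tensor shape is different from the previous one")

    shapes = {}
    check_row_shapes(shapes, {"data": np.zeros((1, 5))})
    check_row_shapes(shapes, {"data": np.zeros((2, 5))})    # ok: only dimension 0 differs
    # check_row_shapes(shapes, {"data": np.zeros((5, 2))})  # would raise: trailing shape changed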

+6 -0  mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.h

@@ -136,6 +136,12 @@ class SaveToDisk : public TreeConsumer {
Status FetchItemData(std::shared_ptr<Tensor> tensor, std::string column_name, nlohmann::json *row_raw_data,
std::map<std::string, std::unique_ptr<std::vector<uint8_t>>> *row_bin_data);

template <typename T>
bool map_compare(T const &lhs, T const &rhs);

Status CheckTensorRowShapes(const std::unordered_map<std::string, int32_t> &column_name_id_map, const TensorRow &row,
std::map<std::string, std::vector<int>> *PreTensorRowShapes_ptr);

std::string dataset_path_;
int32_t num_files_;
std::string dataset_type_;


+21 -1  mindspore/dataset/engine/datasets.py

@@ -24,6 +24,7 @@ import json
import math
import os
import signal
import stat
import time
import uuid
import multiprocessing
@@ -119,6 +120,23 @@ def _get_operator_process():
fetched_all = fetched_all and item_full
return op_process, fetched_all

def _set_dataset_permissions(file_name, num_files):
"""
Set saved dataset files' permissions to 600.
The naming rule of the dataset files must match the one used in the C++ code.
"""
num_digits = len(str(num_files - 1))
if num_files == 1:
paths = [file_name]
else:
paths = ["{}{}".format(file_name, str(x).rjust(num_digits, '0')) for x in range(num_files)]

for item in paths:
if os.path.exists(item):
os.chmod(item, stat.S_IRUSR | stat.S_IWUSR)
index_file = item + ".db"
if os.path.exists(index_file):
os.chmod(index_file, stat.S_IRUSR | stat.S_IWUSR)

class Dataset:
"""
@@ -1290,7 +1308,8 @@ class Dataset:
1. To save the samples in order, set dataset's shuffle to False and num_files to 1.
2. Before calling the function, do not use batch operator, repeat operator or data augmentation operators
with random attribute in map operator.
3. Can not save number type tensor whose shape is dynamic.
3. When the array dimension is variable, only one-dimensional arrays or
multi-dimensional arrays whose dimension 0 is variable are supported.
4. Mindrecord does not support DE_UINT64, multi-dimensional DE_UINT8(drop dimension) nor
multi-dimensional DE_STRING.

@@ -1309,6 +1328,7 @@ class Dataset:
runtime_context.AssignConsumer(consumer)

consumer.Save()
_set_dataset_permissions(file_name, num_files)
del api_tree

@check_tuple_iterator

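The new _set_dataset_permissions helper tightens the saved files to mode 600 and mirrors the C++ file-naming rule: with num_files == 1 the output keeps the given name, otherwise a zero-padded index of width len(str(num_files - 1)) is appended, and each shard has a matching ".db" index file. A small illustration of the names it walks over (the file name is only an example):

    num_files = 12
    file_name = "auto.mindrecord"
    num_digits = len(str(num_files - 1))   # 2 digits for 12 shards (indices 0..11)
    paths = [file_name] if num_files == 1 else [
        "{}{}".format(file_name, str(x).rjust(num_digits, '0')) for x in range(num_files)]
    print(paths[0], paths[-1])              # auto.mindrecord00 auto.mindrecord11
    print(paths[0] + ".db")                 # the index file that also gets chmod 600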

+130 -58  tests/ut/python/dataset/test_save_op.py

@@ -23,38 +23,38 @@ import mindspore.dataset as ds
from mindspore import log as logger
from mindspore.mindrecord import FileWriter

CV_FILE_NAME1 = "../data/mindrecord/testMindDataSet/temp.mindrecord"
CV_FILE_NAME2 = "../data/mindrecord/testMindDataSet/auto.mindrecord"
TEMP_FILE = "../data/mindrecord/testMindDataSet/temp.mindrecord"
AUTO_FILE = "../data/mindrecord/testMindDataSet/auto.mindrecord"
TFRECORD_FILES = "../data/mindrecord/testTFRecordData/dummy.tfrecord"
FILES_NUM = 1
num_readers = 1


@pytest.fixture(name="add_and_remove_cv_file")
@pytest.fixture(name="add_remove_file")
def fixture_remove():
"""add/remove cv file"""
if os.path.exists("{}".format(CV_FILE_NAME1)):
os.remove("{}".format(CV_FILE_NAME1))
if os.path.exists("{}.db".format(CV_FILE_NAME1)):
os.remove("{}.db".format(CV_FILE_NAME1))
if os.path.exists("{}".format(CV_FILE_NAME2)):
os.remove("{}".format(CV_FILE_NAME2))
if os.path.exists("{}.db".format(CV_FILE_NAME2)):
os.remove("{}.db".format(CV_FILE_NAME2))
if os.path.exists("{}".format(TEMP_FILE)):
os.remove("{}".format(TEMP_FILE))
if os.path.exists("{}.db".format(TEMP_FILE)):
os.remove("{}.db".format(TEMP_FILE))
if os.path.exists("{}".format(AUTO_FILE)):
os.remove("{}".format(AUTO_FILE))
if os.path.exists("{}.db".format(AUTO_FILE)):
os.remove("{}.db".format(AUTO_FILE))
yield "yield_cv_data"
if os.path.exists("{}".format(CV_FILE_NAME1)):
os.remove("{}".format(CV_FILE_NAME1))
if os.path.exists("{}.db".format(CV_FILE_NAME1)):
os.remove("{}.db".format(CV_FILE_NAME1))
if os.path.exists("{}".format(TEMP_FILE)):
os.remove("{}".format(TEMP_FILE))
if os.path.exists("{}.db".format(TEMP_FILE)):
os.remove("{}.db".format(TEMP_FILE))

if os.path.exists("{}".format(CV_FILE_NAME2)):
os.remove("{}".format(CV_FILE_NAME2))
if os.path.exists("{}.db".format(CV_FILE_NAME2)):
os.remove("{}.db".format(CV_FILE_NAME2))
if os.path.exists("{}".format(AUTO_FILE)):
os.remove("{}".format(AUTO_FILE))
if os.path.exists("{}.db".format(AUTO_FILE)):
os.remove("{}.db".format(AUTO_FILE))


def test_case_00(add_and_remove_cv_file): # only bin data
def test_case_00(add_remove_file): # only bin data
data = [{"image1": bytes("image1 bytes abc", encoding='UTF-8'),
"image2": bytes("image1 bytes def", encoding='UTF-8'),
"image3": bytes("image1 bytes ghi", encoding='UTF-8'),
@@ -86,13 +86,13 @@ def test_case_00(add_and_remove_cv_file): # only bin data
"image3": {"type": "bytes"},
"image4": {"type": "bytes"},
"image5": {"type": "bytes"}}
writer = FileWriter(CV_FILE_NAME1, FILES_NUM)
writer = FileWriter(TEMP_FILE, FILES_NUM)
writer.add_schema(schema, "schema")
writer.write_raw_data(data)
writer.commit()

d1 = ds.MindDataset(CV_FILE_NAME1, None, num_readers, shuffle=False)
d1.save(CV_FILE_NAME2, FILES_NUM)
d1 = ds.MindDataset(TEMP_FILE, None, num_readers, shuffle=False)
d1.save(AUTO_FILE, FILES_NUM)
data_value_to_list = []

for item in data:
@@ -104,7 +104,7 @@ def test_case_00(add_and_remove_cv_file): # only bin data
new_data['image5'] = np.asarray(list(item["image5"]), dtype=np.uint8)
data_value_to_list.append(new_data)

d2 = ds.MindDataset(dataset_file=CV_FILE_NAME2,
d2 = ds.MindDataset(dataset_file=AUTO_FILE,
num_parallel_workers=num_readers,
shuffle=False)
assert d2.get_dataset_size() == 5
@@ -121,7 +121,7 @@ def test_case_00(add_and_remove_cv_file): # only bin data
assert num_iter == 5


def test_case_01(add_and_remove_cv_file): # only raw data
def test_case_01(add_remove_file): # only raw data
data = [{"file_name": "001.jpg", "label": 43},
{"file_name": "002.jpg", "label": 91},
{"file_name": "003.jpg", "label": 61},
@@ -132,13 +132,13 @@ def test_case_01(add_and_remove_cv_file): # only raw data
"label": {"type": "int32"}
}

writer = FileWriter(CV_FILE_NAME1, FILES_NUM)
writer = FileWriter(TEMP_FILE, FILES_NUM)
writer.add_schema(schema, "schema")
writer.write_raw_data(data)
writer.commit()

d1 = ds.MindDataset(CV_FILE_NAME1, None, num_readers, shuffle=False)
d1.save(CV_FILE_NAME2, FILES_NUM)
d1 = ds.MindDataset(TEMP_FILE, None, num_readers, shuffle=False)
d1.save(AUTO_FILE, FILES_NUM)

data_value_to_list = []
for item in data:
@@ -147,7 +147,7 @@ def test_case_01(add_and_remove_cv_file): # only raw data
new_data['label'] = np.asarray(list([item["label"]]), dtype=np.int32)
data_value_to_list.append(new_data)

d2 = ds.MindDataset(dataset_file=CV_FILE_NAME2,
d2 = ds.MindDataset(dataset_file=AUTO_FILE,
num_parallel_workers=num_readers,
shuffle=False)
assert d2.get_dataset_size() == 6
@@ -165,7 +165,7 @@ def test_case_01(add_and_remove_cv_file): # only raw data
assert num_iter == 6


def test_case_02(add_and_remove_cv_file): # muti-bytes
def test_case_02(add_remove_file): # muti-bytes
data = [{"file_name": "001.jpg", "label": 43,
"float32_array": np.array([1.2, 2.78, 3.1234, 4.9871, 5.12341], dtype=np.float32),
"float64_array": np.array([48.1234556789, 49.3251241431, 50.13514312414, 51.8971298471,
@@ -258,13 +258,13 @@ def test_case_02(add_and_remove_cv_file): # muti-bytes
"label": {"type": "int32"},
"image4": {"type": "bytes"},
"image5": {"type": "bytes"}}
writer = FileWriter(CV_FILE_NAME1, FILES_NUM)
writer = FileWriter(TEMP_FILE, FILES_NUM)
writer.add_schema(schema, "schema")
writer.write_raw_data(data)
writer.commit()

d1 = ds.MindDataset(CV_FILE_NAME1, None, num_readers, shuffle=False)
d1.save(CV_FILE_NAME2, FILES_NUM)
d1 = ds.MindDataset(TEMP_FILE, None, num_readers, shuffle=False)
d1.save(AUTO_FILE, FILES_NUM)
data_value_to_list = []

for item in data:
@@ -284,7 +284,7 @@ def test_case_02(add_and_remove_cv_file): # muti-bytes
new_data['image5'] = np.asarray(list(item["image5"]), dtype=np.uint8)
data_value_to_list.append(new_data)

d2 = ds.MindDataset(dataset_file=CV_FILE_NAME2,
d2 = ds.MindDataset(dataset_file=AUTO_FILE,
num_parallel_workers=num_readers,
shuffle=False)
assert d2.get_dataset_size() == 6
@@ -310,14 +310,14 @@ def generator_1d():
yield (np.array([i]),)


def test_case_03(add_and_remove_cv_file):
def test_case_03(add_remove_file):

# apply dataset operations
d1 = ds.GeneratorDataset(generator_1d, ["data"], shuffle=False)

d1.save(CV_FILE_NAME2)
d1.save(AUTO_FILE)

d2 = ds.MindDataset(dataset_file=CV_FILE_NAME2,
d2 = ds.MindDataset(dataset_file=AUTO_FILE,
num_parallel_workers=num_readers,
shuffle=False)

@@ -343,9 +343,9 @@ def type_tester(t):

data1 = data1.repeat(3)

data1.save(CV_FILE_NAME2)
data1.save(AUTO_FILE)

d2 = ds.MindDataset(dataset_file=CV_FILE_NAME2,
d2 = ds.MindDataset(dataset_file=AUTO_FILE,
num_parallel_workers=num_readers,
shuffle=False)

@@ -360,10 +360,10 @@ def type_tester(t):
i = 0
num_repeat += 1
assert num_repeat == 3
if os.path.exists("{}".format(CV_FILE_NAME2)):
os.remove("{}".format(CV_FILE_NAME2))
if os.path.exists("{}.db".format(CV_FILE_NAME2)):
os.remove("{}.db".format(CV_FILE_NAME2))
if os.path.exists("{}".format(AUTO_FILE)):
os.remove("{}".format(AUTO_FILE))
if os.path.exists("{}.db".format(AUTO_FILE)):
os.remove("{}.db".format(AUTO_FILE))


def test_case_04():
@@ -375,20 +375,20 @@ def test_case_04():
type_tester(t)


def test_case_05(add_and_remove_cv_file):
def test_case_05(add_remove_file):

d1 = ds.GeneratorDataset(generator_1d, ["data"], shuffle=False)

with pytest.raises(Exception, match="num_files should between 1 and 1000."):
d1.save(CV_FILE_NAME2, 0)
d1.save(AUTO_FILE, 0)


def test_case_06(add_and_remove_cv_file):
def test_case_06(add_remove_file):

d1 = ds.GeneratorDataset(generator_1d, ["data"], shuffle=False)

with pytest.raises(Exception, match="tfrecord dataset format is not supported."):
d1.save(CV_FILE_NAME2, 1, "tfrecord")
d1.save(AUTO_FILE, 1, "tfrecord")


def cast_name(key):
@@ -403,16 +403,16 @@ def cast_name(key):


def test_case_07():
if os.path.exists("{}".format(CV_FILE_NAME2)):
os.remove("{}".format(CV_FILE_NAME2))
if os.path.exists("{}.db".format(CV_FILE_NAME2)):
os.remove("{}.db".format(CV_FILE_NAME2))
if os.path.exists("{}".format(AUTO_FILE)):
os.remove("{}".format(AUTO_FILE))
if os.path.exists("{}.db".format(AUTO_FILE)):
os.remove("{}.db".format(AUTO_FILE))
d1 = ds.TFRecordDataset(TFRECORD_FILES, shuffle=False)
tf_data = []
for x in d1.create_dict_iterator(num_epochs=1, output_numpy=True):
tf_data.append(x)
d1.save(CV_FILE_NAME2, FILES_NUM)
d2 = ds.MindDataset(dataset_file=CV_FILE_NAME2,
d1.save(AUTO_FILE, FILES_NUM)
d2 = ds.MindDataset(dataset_file=AUTO_FILE,
num_parallel_workers=num_readers,
shuffle=False)
mr_data = []
@@ -428,7 +428,79 @@ def test_case_07():
count += 1
assert count == 10

if os.path.exists("{}".format(CV_FILE_NAME2)):
os.remove("{}".format(CV_FILE_NAME2))
if os.path.exists("{}.db".format(CV_FILE_NAME2)):
os.remove("{}.db".format(CV_FILE_NAME2))
if os.path.exists("{}".format(AUTO_FILE)):
os.remove("{}".format(AUTO_FILE))
if os.path.exists("{}.db".format(AUTO_FILE)):
os.remove("{}.db".format(AUTO_FILE))

def generator_dynamic_1d():
arr = []
for i in range(10):
if i % 5 == 0:
arr = []
arr += [i]
yield (np.array(arr),)

def generator_dynamic_2d_0():
for i in range(10):
if i < 5:
yield (np.arange(5).reshape([1, 5]),)
else:
yield (np.arange(10).reshape([2, 5]),)


def generator_dynamic_2d_1():
for i in range(10):
if i < 5:
yield (np.arange(5).reshape([5, 1]),)
else:
yield (np.arange(10).reshape([5, 2]),)

def test_case_08(add_remove_file):

# apply dataset operations
d1 = ds.GeneratorDataset(generator_dynamic_1d, ["data"], shuffle=False)

d1.save(AUTO_FILE)

d2 = ds.MindDataset(dataset_file=AUTO_FILE,
num_parallel_workers=num_readers,
shuffle=False)

i = 0
arr = []
for item in d2.create_dict_iterator(num_epochs=1, output_numpy=True):
if i % 5 == 0:
arr = []
arr += [i]
golden = np.array(arr)
np.testing.assert_array_equal(item["data"], golden)
i = i + 1

def test_case_09(add_remove_file):

# apply dataset operations
d1 = ds.GeneratorDataset(generator_dynamic_2d_0, ["data"], shuffle=False)

d1.save(AUTO_FILE)

d2 = ds.MindDataset(dataset_file=AUTO_FILE,
num_parallel_workers=num_readers,
shuffle=False)

i = 0
for item in d2.create_dict_iterator(num_epochs=1, output_numpy=True):
if i < 5:
golden = np.arange(5).reshape([1, 5])
else:
golden = np.arange(10).reshape([2, 5])
np.testing.assert_array_equal(item["data"], golden)
i = i + 1

def test_case_10(add_remove_file):

# apply dataset operations
d1 = ds.GeneratorDataset(generator_dynamic_2d_1, ["data"], shuffle=False)

with pytest.raises(Exception, match="Error: current tensor shape is different from the previous's"):
d1.save(AUTO_FILE)

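Because FetchMetaFromTensorRow now rewrites dimension 0 of the schema shape to -1 (the mr_shape[0] = -1 line in tree_consumer.cc), the MindRecord schema produced for test_case_09 would look roughly like the entry below; the exact integer type string is an assumption, since it depends on the platform default of np.arange:

    # illustrative schema entry: shape [-1, 5] means dimension 0 varies across rows
    expected_entry = {"data": {"type": "int64", "shape": [-1, 5]}}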