@@ -13,14 +13,19 @@
# limitations under the License.
# ==============================================================================
"""
Testing configuration manager
Testing configuration manager
"""
import filecmp
import glob
import numpy as np
import os
from mindspore import log as logger
import mindspore.dataset as ds
import mindspore.dataset.transforms.vision.c_transforms as vision
import mindspore.dataset.transforms.vision.py_transforms as py_vision
DATA_DIR = ["../data/dataset/test_tf_file_3_images/train-0000-of-0001.data"]
SCHEMA_DIR = "../data/dataset/test_tf_file_3_images/datasetSchema.json"
@@ -46,9 +51,17 @@ def test_basic():
assert ds.config.get_prefetch_size() == 4
assert ds.config.get_seed() == 5
def test_get_seed():
    """
    Read the seed from the config without setting one in this test; the value must be an int.
    """
    current_seed = ds.config.get_seed()
    assert isinstance(current_seed, int)
def test_pipeline():
"""
Test that our configuration pipeline works when we set parameters at dataset interval
"""
Test that our configuration pipeline works when we set parameters at different locations in dataset code
"""
data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
ds.config.set_num_parallel_workers(2)
@@ -60,12 +73,12 @@ def test_pipeline():
data2 = data2.map(input_columns=["image"], operations=[vision.Decode(True)])
ds.serialize(data2, "testpipeline2.json")
# check that the generated output is different
# check that the generated output is different
assert (filecmp.cmp('testpipeline.json', 'testpipeline2.json'))
# this test passes currently because our num_parallel_workers don't get updated.
# this test passes currently because our num_parallel_workers don't get updated.
# remove generated json files
# remove generated json files
file_list = glob.glob('*.json')
for f in file_list:
try:
@@ -74,6 +87,209 @@ def test_pipeline():
logger.info("Error while deleting: {}".format(f))
def test_deterministic_run_fail():
    """
    Test RandomCrop with seed, expected to fail

    A single RandomCrop op instance is mapped onto two separate pipelines and
    their images are compared pairwise. A comparison mismatch is tolerated as
    long as it is numpy's array-mismatch error; any other error propagates.
    """
    logger.info("test_deterministic_run_fail")

    # when we set the seed all operations within our dataset should be deterministic
    ds.config.set_seed(0)
    ds.config.set_num_parallel_workers(1)

    # First dataset
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    # Assuming we get the same seed on calling constructor, if this op is re-used then result won't be
    # the same in between the two datasets. For example, RandomCrop constructor takes seed (0)
    # outputs a deterministic series of numbers, e.g. "a" = [1, 2, 3, 4, 5, 6] <- pretend these are random
    random_crop_op = vision.RandomCrop([512, 512], [200, 200, 200, 200])
    decode_op = vision.Decode()
    data1 = data1.map(input_columns=["image"], operations=decode_op)
    data1 = data1.map(input_columns=["image"], operations=random_crop_op)

    # Second dataset
    data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    data2 = data2.map(input_columns=["image"], operations=decode_op)
    # If seed is set up on constructor
    data2 = data2.map(input_columns=["image"], operations=random_crop_op)

    try:
        for item1, item2 in zip(data1.create_dict_iterator(), data2.create_dict_iterator()):
            np.testing.assert_equal(item1["image"], item2["image"])
    except AssertionError as e:
        # Only the comparison failure is handled here; interrupts and pipeline
        # errors are no longer swallowed by this handler.
        # two datasets split the number out of the sequence a
        logger.info("Got an exception in DE: {}".format(str(e)))
        assert "Array" in str(e)
    # NOTE(review): the test also passes when no mismatch is raised at all -
    # confirm whether a mismatch should be mandatory here.
def test_deterministic_run_pass():
    """
    Test deterministic run with setting the seed

    Each pipeline gets its own RandomCrop op, both constructed after the seed
    is set, and the resulting images are compared pairwise.
    """
    logger.info("test_deterministic_run_pass")

    ds.config.set_seed(0)
    ds.config.set_num_parallel_workers(1)

    # First dataset
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    # We get the seed when constructor is called
    random_crop_op = vision.RandomCrop([512, 512], [200, 200, 200, 200])
    decode_op = vision.Decode()
    data1 = data1.map(input_columns=["image"], operations=decode_op)
    data1 = data1.map(input_columns=["image"], operations=random_crop_op)

    # Second dataset
    data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    data2 = data2.map(input_columns=["image"], operations=decode_op)
    # Since seed is set up on constructor, so the two ops output deterministic sequence.
    # Assume the generated random sequence "a" = [1, 2, 3, 4, 5, 6] <- pretend these are random
    random_crop_op2 = vision.RandomCrop([512, 512], [200, 200, 200, 200])
    data2 = data2.map(input_columns=["image"], operations=random_crop_op2)

    try:
        for item1, item2 in zip(data1.create_dict_iterator(), data2.create_dict_iterator()):
            np.testing.assert_equal(item1["image"], item2["image"])
    except AssertionError as e:
        # Only the comparison failure is handled here; interrupts and pipeline
        # errors are no longer swallowed by this handler.
        # NOTE(review): if numpy's mismatch message contains "Array", this
        # handler lets the test pass even when the two outputs differ -
        # confirm whether that leniency is intended for a "pass" test.
        # two datasets both use numbers from the generated sequence "a"
        logger.info("Got an exception in DE: {}".format(str(e)))
        assert "Array" in str(e)
def test_seed_undeterministic():
    """
    Test seed with num parallel workers in c, this test is expected to fail some of the time

    Unlike the sibling tests, this one does not set num_parallel_workers itself.
    """
    logger.info("test_seed_undeterministic")

    ds.config.set_seed(0)

    # First pipeline: decode, then crop with an op built after the seed was set
    first_pipeline = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    # seed will be read in during constructor call
    first_crop = vision.RandomCrop([512, 512], [200, 200, 200, 200])
    decode = vision.Decode()
    first_pipeline = first_pipeline.map(input_columns=["image"], operations=decode)
    first_pipeline = first_pipeline.map(input_columns=["image"], operations=first_crop)

    # Second pipeline: same decode op, but its own freshly constructed crop op
    second_pipeline = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    second_pipeline = second_pipeline.map(input_columns=["image"], operations=decode)
    # If seed is set up on constructor, so the two ops output deterministic sequence
    second_crop = vision.RandomCrop([512, 512], [200, 200, 200, 200])
    second_pipeline = second_pipeline.map(input_columns=["image"], operations=second_crop)

    first_rows = first_pipeline.create_dict_iterator()
    second_rows = second_pipeline.create_dict_iterator()
    for first_row, second_row in zip(first_rows, second_rows):
        np.testing.assert_equal(first_row["image"], second_row["image"])
def test_deterministic_run_distribution():
    """
    Test deterministic run with setting the seed being used in a distribution

    Two pipelines each apply their own RandomHorizontalFlip(0.1) op and the
    resulting images are compared pairwise.
    """
    logger.info("test_deterministic_run_distribution")

    # when we set the seed all operations within our dataset should be deterministic
    ds.config.set_seed(0)
    ds.config.set_num_parallel_workers(1)

    # First pipeline
    first_pipeline = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    first_flip = vision.RandomHorizontalFlip(0.1)
    decode = vision.Decode()
    first_pipeline = first_pipeline.map(input_columns=["image"], operations=decode)
    first_pipeline = first_pipeline.map(input_columns=["image"], operations=first_flip)

    # Second pipeline
    second_pipeline = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    second_pipeline = second_pipeline.map(input_columns=["image"], operations=decode)
    # If seed is set up on constructor, so the two ops output deterministic sequence
    second_flip = vision.RandomHorizontalFlip(0.1)
    second_pipeline = second_pipeline.map(input_columns=["image"], operations=second_flip)

    first_rows = first_pipeline.create_dict_iterator()
    second_rows = second_pipeline.create_dict_iterator()
    for first_row, second_row in zip(first_rows, second_rows):
        np.testing.assert_equal(first_row["image"], second_row["image"])
def test_deterministic_python_seed():
    """
    Test deterministic execution with seed in python

    The first pipeline is fully iterated, the seed is reset to the same value,
    and the second pipeline is then iterated; both outputs must be equal.
    """
    # Log this test's own name, matching the other tests in this file, so log
    # lines can be attributed to the right test.
    logger.info("test_deterministic_python_seed")

    ds.config.set_seed(0)
    ds.config.set_num_parallel_workers(1)

    # First dataset
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)

    transforms = [
        py_vision.Decode(),
        py_vision.RandomCrop([512, 512], [200, 200, 200, 200]),
        py_vision.ToTensor(),
    ]
    transform = py_vision.ComposeOp(transforms)
    data1 = data1.map(input_columns=["image"], operations=transform())
    # config.set_seed() calls random.seed()
    data1_output = [data_one["image"] for data_one in data1.create_dict_iterator()]

    # Second dataset
    data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    data2 = data2.map(input_columns=["image"], operations=transform())
    # config.set_seed() calls random.seed(), resets seed for next dataset iterator
    ds.config.set_seed(0)

    data2_output = [data_two["image"] for data_two in data2.create_dict_iterator()]

    np.testing.assert_equal(data1_output, data2_output)
def test_deterministic_python_seed_multi_thread():
    """
    Test deterministic execution with seed in python, this fails with multi-thread pyfunc run

    Same flow as the single-worker python seed test, but both map calls pass
    python_multiprocessing=True and num_parallel_workers is not set here.
    A comparison mismatch is tolerated as long as it is numpy's array-mismatch
    error; any other error propagates.
    """
    # Log this test's own name, matching the other tests in this file, so log
    # lines can be attributed to the right test.
    logger.info("test_deterministic_python_seed_multi_thread")

    ds.config.set_seed(0)
    # when we set the seed all operations within our dataset should be deterministic

    # First dataset
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    transforms = [
        py_vision.Decode(),
        py_vision.RandomCrop([512, 512], [200, 200, 200, 200]),
        py_vision.ToTensor(),
    ]
    transform = py_vision.ComposeOp(transforms)
    data1 = data1.map(input_columns=["image"], operations=transform(), python_multiprocessing=True)
    # config.set_seed() calls random.seed()
    data1_output = [data_one["image"] for data_one in data1.create_dict_iterator()]

    # Second dataset
    data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    # If seed is set up on constructor
    data2 = data2.map(input_columns=["image"], operations=transform(), python_multiprocessing=True)
    # config.set_seed() calls random.seed()
    ds.config.set_seed(0)

    data2_output = [data_two["image"] for data_two in data2.create_dict_iterator()]

    try:
        np.testing.assert_equal(data1_output, data2_output)
    except AssertionError as e:
        # Only the comparison failure is handled here; interrupts and other
        # errors are no longer swallowed by this handler.
        # expect output to not match during multi-threaded execution
        logger.info("Got an exception in DE: {}".format(str(e)))
        assert "Array" in str(e)
if __name__ == '__main__':
    # Script entry point: run the selected tests one after another, in this order.
    for run_test in (
            test_basic,
            test_pipeline,
            test_deterministic_run_pass,
            test_deterministic_run_distribution,
            test_deterministic_run_fail,
            test_deterministic_python_seed,
            test_seed_undeterministic,
            test_get_seed,
    ):
        run_test()