Browse Source

!32231 fix: support returning a single numpy array in GeneratorDataset

Merge pull request !32231 from guozhijian/fix_generator_with_numpy
pull/1/head
i-robot Gitee 4 years ago
parent
commit
8ea9ceaecd
No known key found for this signature in database GPG Key ID: 173E9B9CA92EEF8F
3 changed files with 688 additions and 31 deletions
  1. +5
    -0
      mindspore/python/mindspore/dataset/engine/datasets_user_defined.py
  2. +34
    -31
      mindspore/python/mindspore/dataset/engine/queue.py
  3. +649
    -0
      tests/ut/python/dataset/test_datasets_generator.py

+ 5
- 0
mindspore/python/mindspore/dataset/engine/datasets_user_defined.py View File

@@ -137,6 +137,11 @@ def _convert_row(row):
if isinstance(row, dict):
raise ValueError("Return value in user defined python function should be numpy array, but got dict.")

# just return a numpy value from generator
if isinstance(row, np.ndarray):
value.append(np.array(row))
return tuple(value,)

# convert each column in row into numpy array
for x in row:
if isinstance(x, bytes): # got image bytes from a file


+ 34
- 31
mindspore/python/mindspore/dataset/engine/queue.py View File

@@ -70,7 +70,7 @@ class _SharedQueue(multiprocessing.queues.Queue):
)

def put(self, data, timeout=None):
if isinstance(data, ExceptionHandler):
if isinstance(data, ExceptionHandler): # pylint: disable=too-many-nested-blocks
super().put(data, timeout=timeout)
else:
name_list = []
@@ -79,36 +79,39 @@ class _SharedQueue(multiprocessing.queues.Queue):
if not isinstance(data, tuple) and not isinstance(data, np.ndarray):
raise TypeError("return value of user defined python function in GeneratorDataset or"
" map should be numpy array or tuple of numpy array.")
for r in data:
# the map:pyfunc is a yield generator which can't be serialize
if isinstance(r, types.GeneratorType):
raise TypeError("Can not pickle {} object, please verify pyfunc return with numpy array"
.format(type(r)))
if (isinstance(r, np.ndarray) and r.size > self.min_shared_mem
and start_bytes + r.nbytes < self.seg_size):
# need to convert start_bytes to offset in array
start_offset = start_bytes
dest = np.ndarray(r.shape, r.dtype, buffer=self.shm_list[self.seg_pos].get_obj(),
offset=start_offset)
np.copyto(dest, r)
byte = r.nbytes
byte = 8 * ((byte + 7) // 8)
start_bytes += byte
name_list.append((self.data_shared, self.seg_pos, byte, r.dtype, r.shape))
count += 1
else:
if isinstance(r, np.ndarray) and r.size >= self.min_shared_mem:
# Only print out error the first time it happens
if self.print_error:
logger.warning(
"Using shared memory queue, but rowsize is larger than allocated memory "
+ "max_rowsize "
+ str(self.seg_size)
+ " current rowsize "
+ str(start_bytes + r.nbytes)
)
self.print_error = False
name_list.append((self.data_immediate, r))
if isinstance(data, np.ndarray):
name_list.append((self.data_immediate, np.array(data)))
else:
for r in data:
# the map:pyfunc is a yield generator which can't be serialize
if isinstance(r, types.GeneratorType):
raise TypeError("Can not pickle {} object, please verify pyfunc return with numpy array"
.format(type(r)))
if (isinstance(r, np.ndarray) and r.size > self.min_shared_mem
and start_bytes + r.nbytes < self.seg_size):
# need to convert start_bytes to offset in array
start_offset = start_bytes
dest = np.ndarray(r.shape, r.dtype, buffer=self.shm_list[self.seg_pos].get_obj(),
offset=start_offset)
np.copyto(dest, r)
byte = r.nbytes
byte = 8 * ((byte + 7) // 8)
start_bytes += byte
name_list.append((self.data_shared, self.seg_pos, byte, r.dtype, r.shape))
count += 1
else:
if isinstance(r, np.ndarray) and r.size >= self.min_shared_mem:
# Only print out error the first time it happens
if self.print_error:
logger.warning(
"Using shared memory queue, but rowsize is larger than allocated memory "
+ "max_rowsize "
+ str(self.seg_size)
+ " current rowsize "
+ str(start_bytes + r.nbytes)
)
self.print_error = False
name_list.append((self.data_immediate, r))
super().put(name_list, timeout=timeout)
# note above could generate a queue full exception. It will be handled by the caller
# only increment seg_pos after successfully adding to metadata queue


+ 649
- 0
tests/ut/python/dataset/test_datasets_generator.py View File

@@ -990,6 +990,652 @@ def test_generator_mixed_operator():
pass


def test_generator_with_single_numpy():
    """
    Feature: Test GeneratorDataset with single numpy and multi columns when use __getitem__
    Description: single numpy, tuple numpy with single columns and multi columns
    Expectation: success
    """
    class get_dataset_generator:
        # Random-access source: every index returns the same constant value.
        def __init__(self, value):
            np.random.seed(58)
            self.__value = value

        def __getitem__(self, index):
            return self.__value

        def __len__(self):
            return 20

    # Sample arrays covering 0-d scalars, 1-D vectors and 2-D matrices.
    # Defined once and shared by the one/two/three-column sub-cases below
    # (previously these identical definitions were triplicated).
    numpy_1 = np.array(1)
    numpy_2 = np.array([1])
    numpy_3 = np.array([1, 2])
    numpy_4 = np.array([1, 2, 3])
    numpy_5 = np.array([[1], [2]])
    numpy_6 = np.array([[1, 2], [2, 3]])
    numpy_7 = np.array([[1, 2, 3], [2, 3, 4]])
    numpy_8 = np.array([[1], [2], [3]])
    numpy_9 = np.array([[1, 2], [2, 3], [3, 4]])
    numpy_10 = np.array([[1, 2, 3], [2, 3, 4], [3, 4, 5]])

    def test_generator_one_column(value):
        # Pick a random worker count; enable multiprocessing only when it is even and > 1.
        number = np.random.randint(1, 4)
        process_flag = False
        if number > 1 and number % 2 == 0:
            process_flag = True
        dataset_generator = get_dataset_generator(value)
        dataset = ds.GeneratorDataset(dataset_generator, ["data"], shuffle=False, num_parallel_workers=number,
                                      python_multiprocessing=process_flag)
        count = 0
        for data in dataset.create_dict_iterator(output_numpy=True):
            assert (data["data"] == value).all()
            count += 1
        assert count == 20

    # test user define one column
    test_generator_one_column(numpy_1)
    test_generator_one_column(numpy_2)
    test_generator_one_column(numpy_3)
    test_generator_one_column(numpy_4)
    test_generator_one_column(numpy_5)
    test_generator_one_column(numpy_6)
    test_generator_one_column(numpy_7)
    test_generator_one_column(numpy_8)
    test_generator_one_column(numpy_9)
    test_generator_one_column(numpy_10)

    # A 1-tuple of one array maps onto the single "data" column.
    tuple_1 = (numpy_7,)
    dataset_generator = get_dataset_generator(tuple_1)
    dataset = ds.GeneratorDataset(dataset_generator, ["data"], shuffle=False)
    count = 0
    for data in dataset.create_dict_iterator(output_numpy=True):
        assert (data["data"] == tuple_1[0]).all()
        count += 1
    assert count == 20

    # Returning more arrays than declared columns must raise at iteration time.
    tuple_2 = (numpy_6, numpy_7)
    with pytest.raises(RuntimeError) as info:
        dataset_generator = get_dataset_generator(tuple_2)
        dataset = ds.GeneratorDataset(dataset_generator, ["data"], shuffle=False)
        for data in dataset.create_dict_iterator(output_numpy=True):
            print(data["data"])
    assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
           "column_names," in str(info.value)
    assert "the size of column_names is:1 and number of returned NumPy array is:2" in str(info.value)

    tuple_4 = (numpy_4, numpy_5, numpy_6, numpy_7)
    with pytest.raises(RuntimeError) as info:
        dataset_generator = get_dataset_generator(tuple_4)
        dataset = ds.GeneratorDataset(dataset_generator, ["data"], shuffle=False)
        for data in dataset.create_dict_iterator(output_numpy=True):
            print(data["data"])
    assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
           "column_names," in str(info.value)
    assert "the size of column_names is:1 and number of returned NumPy array is:4" in str(info.value)

    # test user define two column
    def test_generator_two_column(value):
        number = np.random.randint(1, 4)
        process_flag = False
        if number > 1 and number % 2 == 0:
            process_flag = True
        dataset_generator = get_dataset_generator(value)
        dataset = ds.GeneratorDataset(dataset_generator, ["data", "label"], shuffle=False, num_parallel_workers=number,
                                      python_multiprocessing=process_flag)
        count = 0
        # A single array cannot populate two columns: expect a RuntimeError.
        with pytest.raises(RuntimeError) as info:
            for data in dataset.create_dict_iterator(output_numpy=True):
                print(data)
                count += 1
            assert count == 20
        assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
               "column_names," in str(info.value)
        assert "the size of column_names is:2 and number of returned NumPy array is:1" in str(info.value)

    test_generator_two_column(numpy_1)
    test_generator_two_column(numpy_2)
    test_generator_two_column(numpy_3)
    test_generator_two_column(numpy_4)
    test_generator_two_column(numpy_5)
    test_generator_two_column(numpy_6)
    test_generator_two_column(numpy_7)
    test_generator_two_column(numpy_8)
    test_generator_two_column(numpy_9)
    test_generator_two_column(numpy_10)
    tuple_1 = (numpy_7,)
    test_generator_two_column(tuple_1)

    # Two arrays map onto "data" and "label" respectively.
    tuple_2 = (numpy_2, numpy_3)
    dataset_generator = get_dataset_generator(tuple_2)
    dataset = ds.GeneratorDataset(dataset_generator, ["data", "label"], shuffle=False)
    count = 0
    for data in dataset.create_dict_iterator(output_numpy=True):
        assert (data["data"] == numpy_2).all()
        assert (data["label"] == numpy_3).all()
        count += 1
    assert count == 20

    tuple_3 = (numpy_4, numpy_5, numpy_6)
    with pytest.raises(RuntimeError) as info:
        dataset_generator = get_dataset_generator(tuple_3)
        dataset = ds.GeneratorDataset(dataset_generator, ["data", "label"], shuffle=False)
        for data in dataset.create_dict_iterator(output_numpy=True):
            print(data["data"])
    assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
           "column_names," in str(info.value)
    assert "the size of column_names is:2 and number of returned NumPy array is:3" in str(info.value)

    # test user define three column
    def test_generator_three_column(value):
        number = np.random.randint(1, 4)
        process_flag = False
        if number > 1 and number % 2 == 0:
            process_flag = True
        dataset_generator = get_dataset_generator(value)
        dataset = ds.GeneratorDataset(dataset_generator, ["data", "label", "label2"], shuffle=False,
                                      num_parallel_workers=number, python_multiprocessing=process_flag)
        count = 0
        with pytest.raises(RuntimeError) as info:
            for data in dataset.create_dict_iterator(output_numpy=True):
                print(data)
                count += 1
            assert count == 20
        assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
               "column_names," in str(info.value)
        assert "the size of column_names is:3 and number of returned NumPy array is:1" in str(info.value)

    test_generator_three_column(numpy_1)
    test_generator_three_column(numpy_2)
    test_generator_three_column(numpy_3)
    test_generator_three_column(numpy_4)
    test_generator_three_column(numpy_5)
    test_generator_three_column(numpy_6)
    test_generator_three_column(numpy_7)
    test_generator_three_column(numpy_8)
    test_generator_three_column(numpy_9)
    test_generator_three_column(numpy_10)
    tuple_1 = (numpy_7,)
    test_generator_three_column(tuple_1)

    tuple_2 = (numpy_2, numpy_3)
    with pytest.raises(RuntimeError) as info:
        dataset_generator = get_dataset_generator(tuple_2)
        dataset = ds.GeneratorDataset(dataset_generator, ["data", "label", "label2"], shuffle=False)
        for data in dataset.create_dict_iterator(output_numpy=True):
            print(data["data"])
    assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
           "column_names," in str(info.value)
    assert "the size of column_names is:3 and number of returned NumPy array is:2" in str(info.value)

    # Three arrays map onto the three declared columns.
    tuple_3 = (numpy_4, numpy_5, numpy_6)
    dataset_generator = get_dataset_generator(tuple_3)
    dataset = ds.GeneratorDataset(dataset_generator, ["data", "label", "label2"], shuffle=False)
    count = 0
    for data in dataset.create_dict_iterator(output_numpy=True):
        assert (data["data"] == numpy_4).all()
        assert (data["label"] == numpy_5).all()
        assert (data["label2"] == numpy_6).all()
        count += 1
    assert count == 20


def test_generator_with_single_numpy_with_next():
    """
    Feature: Test GeneratorDataset with single numpy and multi columns when use __next__
    Description: single numpy, tuple numpy with single columns and multi columns
    Expectation: success
    """
    class get_dataset_generator:
        # Iterable source: yields the same constant value 20 times via __next__.
        def __init__(self, value):
            np.random.seed(58)
            self.__value = value
            self.__index = 0

        def __next__(self):
            if self.__index >= 20:
                raise StopIteration

            self.__index += 1
            return self.__value

        def __iter__(self):
            # Reset so the source can be iterated over multiple epochs.
            self.__index = 0
            return self

        def __len__(self):
            return 20

    # Sample arrays covering 0-d scalars, 1-D vectors and 2-D matrices.
    # Defined once and shared by all sub-cases below (previously triplicated).
    numpy_1 = np.array(1)
    numpy_2 = np.array([1])
    numpy_3 = np.array([1, 2])
    numpy_4 = np.array([1, 2, 3])
    numpy_5 = np.array([[1], [2]])
    numpy_6 = np.array([[1, 2], [2, 3]])
    numpy_7 = np.array([[1, 2, 3], [2, 3, 4]])
    numpy_8 = np.array([[1], [2], [3]])
    numpy_9 = np.array([[1, 2], [2, 3], [3, 4]])
    numpy_10 = np.array([[1, 2, 3], [2, 3, 4], [3, 4, 5]])

    def test_generator_one_column(value):
        # Pick a random worker count; enable multiprocessing only when it is even and > 1.
        number = np.random.randint(1, 4)
        process_flag = False
        if number > 1 and number % 2 == 0:
            process_flag = True
        dataset_generator = get_dataset_generator(value)
        dataset = ds.GeneratorDataset(dataset_generator, ["data"], shuffle=False, num_parallel_workers=number,
                                      python_multiprocessing=process_flag)
        count = 0
        for data in dataset.create_dict_iterator(output_numpy=True):
            assert (data["data"] == value).all()
            count += 1
        assert count == 20

    # test user define one column
    test_generator_one_column(numpy_1)
    test_generator_one_column(numpy_2)
    test_generator_one_column(numpy_3)
    test_generator_one_column(numpy_4)
    test_generator_one_column(numpy_5)
    test_generator_one_column(numpy_6)
    test_generator_one_column(numpy_7)
    test_generator_one_column(numpy_8)
    test_generator_one_column(numpy_9)
    test_generator_one_column(numpy_10)

    # A 1-tuple of one array maps onto the single "data" column.
    tuple_1 = (numpy_7,)
    dataset_generator = get_dataset_generator(tuple_1)
    dataset = ds.GeneratorDataset(dataset_generator, ["data"], shuffle=False)
    count = 0
    for data in dataset.create_dict_iterator(output_numpy=True):
        assert (data["data"] == tuple_1[0]).all()
        count += 1
    assert count == 20

    # Returning more arrays than declared columns must raise at iteration time.
    tuple_2 = (numpy_6, numpy_7)
    with pytest.raises(RuntimeError) as info:
        dataset_generator = get_dataset_generator(tuple_2)
        dataset = ds.GeneratorDataset(dataset_generator, ["data"], shuffle=False)
        for data in dataset.create_dict_iterator(output_numpy=True):
            print(data["data"])
    assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
           "column_names," in str(info.value)
    assert "the size of column_names is:1 and number of returned NumPy array is:2" in str(info.value)

    tuple_3 = (numpy_1, numpy_2)
    with pytest.raises(RuntimeError) as info:
        dataset_generator = get_dataset_generator(tuple_3)
        dataset = ds.GeneratorDataset(dataset_generator, ["data"], shuffle=False)
        for data in dataset.create_dict_iterator(output_numpy=True):
            print(data["data"])
    assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
           "column_names," in str(info.value)
    assert "the size of column_names is:1 and number of returned NumPy array is:2" in str(info.value)

    tuple_4 = (numpy_4, numpy_5, numpy_6, numpy_7)
    with pytest.raises(RuntimeError) as info:
        dataset_generator = get_dataset_generator(tuple_4)
        dataset = ds.GeneratorDataset(dataset_generator, ["data"], shuffle=False)
        for data in dataset.create_dict_iterator(output_numpy=True):
            print(data["data"])
    assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
           "column_names," in str(info.value)
    assert "the size of column_names is:1 and number of returned NumPy array is:4" in str(info.value)

    # test user define two column
    def test_generator_two_column(value):
        number = np.random.randint(1, 4)
        process_flag = False
        if number > 1 and number % 2 == 0:
            process_flag = True
        dataset_generator = get_dataset_generator(value)
        dataset = ds.GeneratorDataset(dataset_generator, ["data", "label"], shuffle=False, num_parallel_workers=number,
                                      python_multiprocessing=process_flag)
        count = 0
        # A single array cannot populate two columns: expect a RuntimeError.
        with pytest.raises(RuntimeError) as info:
            for data in dataset.create_dict_iterator(output_numpy=True):
                print(data)
                count += 1
            assert count == 20
        assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
               "column_names," in str(info.value)
        assert "the size of column_names is:2 and number of returned NumPy array is:1" in str(info.value)

    test_generator_two_column(numpy_1)
    test_generator_two_column(numpy_2)
    test_generator_two_column(numpy_3)
    test_generator_two_column(numpy_4)
    test_generator_two_column(numpy_5)
    test_generator_two_column(numpy_6)
    test_generator_two_column(numpy_7)
    test_generator_two_column(numpy_8)
    test_generator_two_column(numpy_9)
    test_generator_two_column(numpy_10)
    tuple_1 = (numpy_7,)
    test_generator_two_column(tuple_1)

    # Two arrays map onto "data" and "label" respectively.
    tuple_2 = (numpy_2, numpy_3)
    dataset_generator = get_dataset_generator(tuple_2)
    dataset = ds.GeneratorDataset(dataset_generator, ["data", "label"], shuffle=False)
    count = 0
    for data in dataset.create_dict_iterator(output_numpy=True):
        assert (data["data"] == numpy_2).all()
        assert (data["label"] == numpy_3).all()
        count += 1
    assert count == 20

    tuple_3 = (numpy_4, numpy_5, numpy_6)
    with pytest.raises(RuntimeError) as info:
        dataset_generator = get_dataset_generator(tuple_3)
        dataset = ds.GeneratorDataset(dataset_generator, ["data", "label"], shuffle=False)
        for data in dataset.create_dict_iterator(output_numpy=True):
            print(data["data"])
    assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
           "column_names," in str(info.value)
    assert "the size of column_names is:2 and number of returned NumPy array is:3" in str(info.value)

    # test user define three column
    def test_generator_three_column(value):
        number = np.random.randint(1, 4)
        process_flag = False
        if number > 1 and number % 2 == 0:
            process_flag = True
        dataset_generator = get_dataset_generator(value)
        dataset = ds.GeneratorDataset(dataset_generator, ["data", "label", "label2"], shuffle=False,
                                      num_parallel_workers=number, python_multiprocessing=process_flag)
        count = 0
        with pytest.raises(RuntimeError) as info:
            for data in dataset.create_dict_iterator(output_numpy=True):
                print(data)
                count += 1
            assert count == 20
        assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
               "column_names," in str(info.value)
        assert "the size of column_names is:3 and number of returned NumPy array is:1" in str(info.value)

    test_generator_three_column(numpy_1)
    test_generator_three_column(numpy_2)
    test_generator_three_column(numpy_3)
    test_generator_three_column(numpy_4)
    test_generator_three_column(numpy_5)
    test_generator_three_column(numpy_6)
    test_generator_three_column(numpy_7)
    test_generator_three_column(numpy_8)
    test_generator_three_column(numpy_9)
    test_generator_three_column(numpy_10)
    tuple_1 = (numpy_7,)
    test_generator_three_column(tuple_1)

    tuple_2 = (numpy_2, numpy_3)
    with pytest.raises(RuntimeError) as info:
        dataset_generator = get_dataset_generator(tuple_2)
        dataset = ds.GeneratorDataset(dataset_generator, ["data", "label", "label2"], shuffle=False)
        for data in dataset.create_dict_iterator(output_numpy=True):
            print(data["data"])
    assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
           "column_names," in str(info.value)
    assert "the size of column_names is:3 and number of returned NumPy array is:2" in str(info.value)

    # Three arrays map onto the three declared columns.
    tuple_3 = (numpy_4, numpy_5, numpy_6)
    dataset_generator = get_dataset_generator(tuple_3)
    dataset = ds.GeneratorDataset(dataset_generator, ["data", "label", "label2"], shuffle=False)
    count = 0
    for data in dataset.create_dict_iterator(output_numpy=True):
        assert (data["data"] == numpy_4).all()
        assert (data["label"] == numpy_5).all()
        assert (data["label2"] == numpy_6).all()
        count += 1
    assert count == 20


def test_generator_with_single_numpy_with_yield():
    """
    Feature: Test GeneratorDataset with single numpy and multi columns when use yield
    Description: single numpy, tuple numpy with single columns and multi columns
    Expectation: success
    """
    def get_dataset_generator(value):
        # Generator source: yields the same constant value 20 times.
        for _ in range(20):
            yield value

    # Sample arrays covering 0-d scalars, 1-D vectors and 2-D matrices.
    # Defined once and shared by all sub-cases below (previously triplicated).
    numpy_1 = np.array(1)
    numpy_2 = np.array([1])
    numpy_3 = np.array([1, 2])
    numpy_4 = np.array([1, 2, 3])
    numpy_5 = np.array([[1], [2]])
    numpy_6 = np.array([[1, 2], [2, 3]])
    numpy_7 = np.array([[1, 2, 3], [2, 3, 4]])
    numpy_8 = np.array([[1], [2], [3]])
    numpy_9 = np.array([[1, 2], [2, 3], [3, 4]])
    numpy_10 = np.array([[1, 2, 3], [2, 3, 4], [3, 4, 5]])

    def test_generator_one_column(value):
        # Pick a random worker count; enable multiprocessing only when it is even and > 1.
        number = np.random.randint(1, 4)
        process_flag = False
        if number > 1 and number % 2 == 0:
            process_flag = True
        dataset_generator = get_dataset_generator(value)
        dataset = ds.GeneratorDataset(dataset_generator, ["data"], shuffle=False, num_parallel_workers=number,
                                      python_multiprocessing=process_flag)
        count = 0
        for data in dataset.create_dict_iterator(output_numpy=True):
            assert (data["data"] == value).all()
            count += 1
        assert count == 20

    # test user define one column
    test_generator_one_column(numpy_1)
    test_generator_one_column(numpy_2)
    test_generator_one_column(numpy_3)
    test_generator_one_column(numpy_4)
    test_generator_one_column(numpy_5)
    test_generator_one_column(numpy_6)
    test_generator_one_column(numpy_7)
    test_generator_one_column(numpy_8)
    test_generator_one_column(numpy_9)
    test_generator_one_column(numpy_10)

    # A 1-tuple of one array maps onto the single "data" column.
    tuple_1 = (numpy_7,)
    dataset_generator = get_dataset_generator(tuple_1)
    dataset = ds.GeneratorDataset(dataset_generator, ["data"], shuffle=False)
    count = 0
    for data in dataset.create_dict_iterator(output_numpy=True):
        assert (data["data"] == tuple_1[0]).all()
        count += 1
    assert count == 20

    # Returning more arrays than declared columns must raise at iteration time.
    tuple_2 = (numpy_6, numpy_7)
    with pytest.raises(RuntimeError) as info:
        dataset_generator = get_dataset_generator(tuple_2)
        dataset = ds.GeneratorDataset(dataset_generator, ["data"], shuffle=False)
        for data in dataset.create_dict_iterator(output_numpy=True):
            print(data["data"])
    assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
           "column_names," in str(info.value)
    assert "the size of column_names is:1 and number of returned NumPy array is:2" in str(info.value)

    tuple_3 = (numpy_1, numpy_2)
    with pytest.raises(RuntimeError) as info:
        dataset_generator = get_dataset_generator(tuple_3)
        dataset = ds.GeneratorDataset(dataset_generator, ["data"], shuffle=False)
        for data in dataset.create_dict_iterator(output_numpy=True):
            print(data["data"])
    assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
           "column_names," in str(info.value)
    assert "the size of column_names is:1 and number of returned NumPy array is:2" in str(info.value)

    tuple_4 = (numpy_4, numpy_5, numpy_6, numpy_7)
    with pytest.raises(RuntimeError) as info:
        dataset_generator = get_dataset_generator(tuple_4)
        dataset = ds.GeneratorDataset(dataset_generator, ["data"], shuffle=False)
        for data in dataset.create_dict_iterator(output_numpy=True):
            print(data["data"])
    assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
           "column_names," in str(info.value)
    assert "the size of column_names is:1 and number of returned NumPy array is:4" in str(info.value)

    # test user define two column
    def test_generator_two_column(value):
        number = np.random.randint(1, 4)
        process_flag = False
        if number > 1 and number % 2 == 0:
            process_flag = True
        dataset_generator = get_dataset_generator(value)
        dataset = ds.GeneratorDataset(dataset_generator, ["data", "label"], shuffle=False, num_parallel_workers=number,
                                      python_multiprocessing=process_flag)
        count = 0
        # A single array cannot populate two columns: expect a RuntimeError.
        with pytest.raises(RuntimeError) as info:
            for data in dataset.create_dict_iterator(output_numpy=True):
                print(data)
                count += 1
            assert count == 20
        assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
               "column_names," in str(info.value)
        assert "the size of column_names is:2 and number of returned NumPy array is:1" in str(info.value)

    test_generator_two_column(numpy_1)
    test_generator_two_column(numpy_2)
    test_generator_two_column(numpy_3)
    test_generator_two_column(numpy_4)
    test_generator_two_column(numpy_5)
    test_generator_two_column(numpy_6)
    test_generator_two_column(numpy_7)
    test_generator_two_column(numpy_8)
    test_generator_two_column(numpy_9)
    test_generator_two_column(numpy_10)
    tuple_1 = (numpy_7,)
    test_generator_two_column(tuple_1)

    # Two arrays map onto "data" and "label" respectively.
    tuple_2 = (numpy_2, numpy_3)
    dataset_generator = get_dataset_generator(tuple_2)
    dataset = ds.GeneratorDataset(dataset_generator, ["data", "label"], shuffle=False)
    count = 0
    for data in dataset.create_dict_iterator(output_numpy=True):
        assert (data["data"] == numpy_2).all()
        assert (data["label"] == numpy_3).all()
        count += 1
    assert count == 20

    tuple_3 = (numpy_4, numpy_5, numpy_6)
    with pytest.raises(RuntimeError) as info:
        dataset_generator = get_dataset_generator(tuple_3)
        dataset = ds.GeneratorDataset(dataset_generator, ["data", "label"], shuffle=False)
        for data in dataset.create_dict_iterator(output_numpy=True):
            print(data["data"])
    assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
           "column_names," in str(info.value)
    assert "the size of column_names is:2 and number of returned NumPy array is:3" in str(info.value)

    # test user define three column
    def test_generator_three_column(value):
        number = np.random.randint(1, 4)
        process_flag = False
        if number > 1 and number % 2 == 0:
            process_flag = True
        dataset_generator = get_dataset_generator(value)
        dataset = ds.GeneratorDataset(dataset_generator, ["data", "label", "label2"], shuffle=False,
                                      num_parallel_workers=number, python_multiprocessing=process_flag)
        count = 0
        with pytest.raises(RuntimeError) as info:
            for data in dataset.create_dict_iterator(output_numpy=True):
                print(data)
                count += 1
            assert count == 20
        assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
               "column_names," in str(info.value)
        assert "the size of column_names is:3 and number of returned NumPy array is:1" in str(info.value)

    test_generator_three_column(numpy_1)
    test_generator_three_column(numpy_2)
    test_generator_three_column(numpy_3)
    test_generator_three_column(numpy_4)
    test_generator_three_column(numpy_5)
    test_generator_three_column(numpy_6)
    test_generator_three_column(numpy_7)
    test_generator_three_column(numpy_8)
    test_generator_three_column(numpy_9)
    test_generator_three_column(numpy_10)
    tuple_1 = (numpy_7,)
    test_generator_three_column(tuple_1)

    tuple_2 = (numpy_2, numpy_3)
    with pytest.raises(RuntimeError) as info:
        dataset_generator = get_dataset_generator(tuple_2)
        dataset = ds.GeneratorDataset(dataset_generator, ["data", "label", "label2"], shuffle=False)
        for data in dataset.create_dict_iterator(output_numpy=True):
            print(data["data"])
    assert "the 'source' of 'GeneratorDataset' should return same number of NumPy arrays as specified in " \
           "column_names," in str(info.value)
    assert "the size of column_names is:3 and number of returned NumPy array is:2" in str(info.value)

    # Three arrays map onto the three declared columns.
    tuple_3 = (numpy_4, numpy_5, numpy_6)
    dataset_generator = get_dataset_generator(tuple_3)
    dataset = ds.GeneratorDataset(dataset_generator, ["data", "label", "label2"], shuffle=False)
    count = 0
    for data in dataset.create_dict_iterator(output_numpy=True):
        assert (data["data"] == numpy_4).all()
        assert (data["label"] == numpy_5).all()
        assert (data["label2"] == numpy_6).all()
        count += 1
    assert count == 20


if __name__ == "__main__":
test_generator_0()
test_generator_1()
@@ -1031,3 +1677,6 @@ if __name__ == "__main__":
test_func_generator_dataset_005()
test_func_generator_dataset_with_zip_source()
test_generator_mixed_operator()
test_generator_with_single_numpy()
test_generator_with_single_numpy_with_next()
test_generator_with_single_numpy_with_yield()

Loading…
Cancel
Save