Browse Source

!16384 Fix python multiprocessing with shared memory for edge case

From: @robingrosman
Reviewed-by: @mikef
Signed-off-by: @mikef
tags/v1.3.0
mindspore-ci-bot Gitee 4 years ago
parent
commit
ec9fc5fb3b
5 changed files with 35 additions and 4 deletions
  1. +1
    -1
      mindspore/ccsrc/minddata/dataset/core/config_manager.cc
  2. +1
    -1
      mindspore/dataset/core/config.py
  3. +2
    -2
      mindspore/dataset/engine/queue.py
  4. +30
    -0
      tests/ut/python/dataset/test_datasets_generator.py
  5. +1
    -0
      tests/ut/python/dataset/test_pyfunc.py

+ 1
- 1
mindspore/ccsrc/minddata/dataset/core/config_manager.cc View File

@@ -50,7 +50,7 @@ ConfigManager::ConfigManager()
num_cpu_threads_(std::thread::hardware_concurrency()),
auto_num_workers_num_shards_(1),
auto_worker_config_(0),
enable_shared_mem_(false) {
enable_shared_mem_(true) {
num_cpu_threads_ = num_cpu_threads_ > 0 ? num_cpu_threads_ : std::numeric_limits<uint16_t>::max();
num_parallel_workers_ = num_parallel_workers_ < num_cpu_threads_ ? num_parallel_workers_ : num_cpu_threads_;
auto env_cache_host = std::getenv("MS_CACHE_HOST");


+ 1
- 1
mindspore/dataset/core/config.py View File

@@ -381,7 +381,7 @@ def get_enable_shared_mem():


Returns:
bool, the state of shared mem enabled variable (default: True).
bool, the state of shared mem enabled variable (default=True).
"""
return _config.get_enable_shared_mem()



+ 2
- 2
mindspore/dataset/engine/queue.py View File

@@ -66,7 +66,7 @@ class _SharedQueue(multiprocessing.queues.Queue):
if (isinstance(r, np.ndarray) and r.size > self.min_shared_mem and
start_bytes + r.nbytes < self.seg_size):
##need to convert start_bytes to offset in array
start_offset = start_bytes // r.dtype.itemsize
start_offset = start_bytes
dest = np.ndarray(r.shape, r.dtype, buffer=self.shm_list[self.seg_pos].get_obj(), offset=start_offset)
np.copyto(dest, r)
byte = r.nbytes
@@ -101,7 +101,7 @@ class _SharedQueue(multiprocessing.queues.Queue):
byte = x[2]
dtype = x[3]
shape = x[4]
start_offset = start_bytes // dtype.itemsize
start_offset = start_bytes
b = self.shm_list[seg_pos]
data = np.ndarray(shape, dtype, buffer=b.get_obj(), offset=start_offset)
start_bytes += byte


+ 30
- 0
tests/ut/python/dataset/test_datasets_generator.py View File

@@ -36,6 +36,15 @@ class DatasetGenerator:

def __len__(self):
return 10
class DatasetGeneratorLarge:
def __init__(self):
self.data = np.array(range(4000))

def __getitem__(self, item):
return (self.data + item, self.data *10)

def __len__(self):
return 10


def test_generator_0():
@@ -503,6 +512,26 @@ def test_generator_18():
golden = np.array([i * 5])
np.testing.assert_array_equal(item["out0"], golden)

def test_generator_19():
"""
Test multiprocessing flag with 2 different large columns
"""
logger.info("Test map column order when input_columns is None.")

# apply dataset operations
data1 = ds.GeneratorDataset(DatasetGeneratorLarge(), ["col0", "col1"], python_multiprocessing=True, shuffle=False)

# Expected column order is |out0|col1|
i = 0
for item in data1.create_tuple_iterator(num_epochs=1, output_numpy=True):
assert len(item) == 2
golden = np.array(range(4000)) + i
np.testing.assert_array_equal(item[0], golden)
golden = np.array(range(4000)) * 10
np.testing.assert_array_equal(item[1], golden)
i = i + 1


def test_generator_error_1():
def generator_np():
for i in range(64):
@@ -804,6 +833,7 @@ if __name__ == "__main__":
test_generator_16()
test_generator_17()
test_generator_18()
test_generator_19()
test_generator_error_1()
test_generator_error_2()
test_generator_error_3()


+ 1
- 0
tests/ut/python/dataset/test_pyfunc.py View File

@@ -341,6 +341,7 @@ if __name__ == "__main__":
test_case_7()
test_case_8()
test_case_9()
test_case_10()
test_pyfunc_implicit_compose()
test_pyfunc_exception()
skip_test_pyfunc_exception_multiprocess()

Loading…
Cancel
Save