
!12862 Increase default level of parallelism in minddata pipeline

From: @robingrosman
tags/v1.2.0-rc1
mindspore-ci-bot committed 5 years ago
commit 236c874e64
7 changed files with 37 additions and 10 deletions
1. mindspore/ccsrc/minddata/dataset/core/constants.h (+1, -1)
2. mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc (+19, -3)
3. mindspore/ccsrc/minddata/dataset/engine/datasetops/parallel_op.cc (+7, -1)
4. mindspore/dataset/core/config.py (+6, -1)
5. tests/ut/cpp/dataset/c_api_dataset_config_test.cc (+1, -1)
6. tests/ut/data/dataset/declient.cfg (+1, -1)
7. tests/ut/python/dataset/test_config.py (+2, -2)
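
The user-visible effect of the change can be checked through the dataset config API. A minimal sketch, assuming a MindSpore build that includes this commit:

import mindspore.dataset as ds

# The default number of parallel workers changes from 4 to 8 with this commit.
print(ds.config.get_num_parallel_workers())  # expected: 8

# The default can still be overridden globally, as before.
ds.config.set_num_parallel_workers(4)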

mindspore/ccsrc/minddata/dataset/core/constants.h (+1, -1)

@@ -87,7 +87,7 @@ constexpr int64_t kDeMaxFreq = std::numeric_limits<int64_t>::max(); // 92233720
constexpr int64_t kDeMaxTopk = std::numeric_limits<int64_t>::max();

constexpr uint32_t kCfgRowsPerBuffer = 1;
-constexpr uint32_t kCfgParallelWorkers = 4;
+constexpr uint32_t kCfgParallelWorkers = 8;
constexpr uint32_t kCfgWorkerConnectorSize = 16;
constexpr uint32_t kCfgOpConnectorSize = 16;
constexpr int32_t kCfgDefaultRankId = -1;


mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc (+19, -3)

@@ -77,7 +77,15 @@ BatchOp::BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size,
pad_info_(pad_map),
batch_num_(0),
batch_cnt_(0) {
-worker_queues_.Init(num_workers, op_queue_size);
+// Adjust connector queue size. After batch, each row is batch_size times larger.
+int32_t queue_size;
+queue_size = std::max(1, op_queue_size / start_batch_size_);
+if (num_workers == 1) {
+  // Ensure there are at least 2 queue slots for the whole operation. If there is only 1 worker, increase it to 2.
+  queue_size = std::max(2, queue_size);
+}
+
+worker_queues_.Init(num_workers, queue_size);
}
// If PYTHON is disabled, per_batch_map can't be used
#else
@@ -88,8 +96,16 @@ BatchOp::BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size,
drop_(drop),
pad_(pad),
in_col_names_(cols_to_map),
-pad_info_(pad_map) {
-worker_queues_.Init(num_workers, op_queue_size);
+pad_info_(pad_map),
+batch_num_(0),
+batch_cnt_(0) {
+int32_t queue_size;
+queue_size = std::max(1, op_queue_size / start_batch_size_);
+if (num_workers == 1) {
+  // Ensure there are at least 2 queue slots for the whole operation. If there is only 1 worker, increase it to 2.
+  queue_size = std::max(2, queue_size);
+}
+worker_queues_.Init(num_workers, queue_size);
}
#endif
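
The queue sizing rule added to both BatchOp constructors can be sketched in plain Python; adjusted_queue_size is an illustrative helper, not a function in the codebase:

def adjusted_queue_size(op_queue_size, start_batch_size, num_workers):
    # After batching, each output row is batch_size times larger,
    # so the per-worker queue shrinks proportionally.
    queue_size = max(1, op_queue_size // start_batch_size)
    if num_workers == 1:
        # Keep at least 2 slots when a single worker handles the whole operation.
        queue_size = max(2, queue_size)
    return queue_size

print(adjusted_queue_size(16, 32, 1))  # 2 (floor applied)
print(adjusted_queue_size(16, 2, 8))   # 8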



mindspore/ccsrc/minddata/dataset/engine/datasetops/parallel_op.cc (+7, -1)

@@ -33,7 +33,13 @@ ParallelOp::ParallelOp(int32_t num_workers, int32_t op_connector_size, std::shar
worker_connector_size_(1),
worker_connector_(nullptr),
num_workers_paused_(0),
-epoch_sync_flag_(false) {}
+epoch_sync_flag_(false) {
+  // Reduce excessive memory usage with high parallelism: when num_workers > 4,
+  // shrink op_connector_size so the total is similar to what 4 workers would use.
+  if (num_workers_ > 4) {
+    oc_queue_size_ = std::max(1, op_connector_size * 4 / num_workers_);
+  }
+}

// Creates the internal worker connector for the parallel op if the derived class wants to use it
Status ParallelOp::CreateWorkerConnector(int32_t worker_connector_size) {
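
The connector cap works the same way in isolation. A sketch with an illustrative helper name (capped_connector_size is not part of the codebase):

def capped_connector_size(op_connector_size, num_workers):
    # Above 4 workers, scale the per-worker connector size down so that
    # the total (size * workers) stays close to the 4-worker total.
    if num_workers > 4:
        return max(1, op_connector_size * 4 // num_workers)
    return op_connector_size

# With the default op_connector_size of 16:
print(capped_connector_size(16, 8))   # 8  -> total 64, same as 16 * 4
print(capped_connector_size(16, 16))  # 4  -> total 64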


mindspore/dataset/core/config.py (+6, -1)

@@ -112,11 +112,16 @@ def set_prefetch_size(size):
Set the number of rows to be prefetched.

Args:
-size (int): Total number of rows to be prefetched.
+size (int): Total number of rows to be prefetched per operator per parallel worker.

Raises:
ValueError: If prefetch_size is invalid (<= 0 or > MAX_INT_32).

+Note:
+Since the total memory used for prefetch can grow very large with a high number of workers,
+the per-worker prefetch size is reduced when the number of workers is greater than 4. The
+actual runtime prefetch size per worker will be prefetch_size * (4 / num_parallel_workers).
+
Examples:
>>> # Set a new global configuration value for the prefetch size.
>>> ds.config.set_prefetch_size(1000)
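
A worked example of the formula in the new Note (effective_prefetch is illustrative, not a library function):

def effective_prefetch(prefetch_size, num_parallel_workers):
    # Per the note: above 4 workers, the per-worker prefetch size
    # is scaled by 4 / num_parallel_workers.
    if num_parallel_workers > 4:
        return prefetch_size * 4 // num_parallel_workers
    return prefetch_size

print(effective_prefetch(1000, 8))  # 500 rows per worker
print(effective_prefetch(1000, 4))  # 1000 rows per worker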


tests/ut/cpp/dataset/c_api_dataset_config_test.cc (+1, -1)

@@ -42,7 +42,7 @@ TEST_F(MindDataTestPipeline, TestConfigSetting) {
EXPECT_EQ(load_status, true);

// Test configuration loaded
-EXPECT_EQ(config::get_num_parallel_workers(), 4);
+EXPECT_EQ(config::get_num_parallel_workers(), 8);
EXPECT_EQ(config::get_prefetch_size(), 16);
EXPECT_EQ(config::get_seed(), 5489);
EXPECT_EQ(config::get_monitor_sampling_interval(), 15);


tests/ut/data/dataset/declient.cfg (+1, -1)

@@ -1,7 +1,7 @@
{
"logFilePath": "/tmp",
"rowsPerBuffer": 1,
"numParallelWorkers": 4,
"numParallelWorkers": 8,
"workerConnectorSize": 16,
"opConnectorSize": 16,
"seed": 5489,


tests/ut/python/dataset/test_config.py (+2, -2)

@@ -44,7 +44,7 @@ def test_basic():
ds.config.load('../data/dataset/declient.cfg')

# assert ds.config.get_rows_per_buffer() == 32
-assert ds.config.get_num_parallel_workers() == 4
+assert ds.config.get_num_parallel_workers() == 8
# assert ds.config.get_worker_connector_size() == 16
assert ds.config.get_prefetch_size() == 16
assert ds.config.get_seed() == 5489
@@ -348,7 +348,7 @@ def test_deterministic_python_seed_multi_thread():
try:
np.testing.assert_equal(data1_output, data2_output)
except Exception as e:
-# expect output to not match during multi-threaded excution
+# expect output to not match during multi-threaded execution
logger.info("Got an exception in DE: {}".format(str(e)))
assert "Array" in str(e)


