Adjust memory usage to not increase as parallelism increases. It will stay at the same level it would be with parallelism of 4. (tag: v1.2.0-rc1)
| @@ -87,7 +87,7 @@ constexpr int64_t kDeMaxFreq = std::numeric_limits<int64_t>::max(); // 92233720 | |||||
| constexpr int64_t kDeMaxTopk = std::numeric_limits<int64_t>::max(); | constexpr int64_t kDeMaxTopk = std::numeric_limits<int64_t>::max(); | ||||
| constexpr uint32_t kCfgRowsPerBuffer = 1; | constexpr uint32_t kCfgRowsPerBuffer = 1; | ||||
| constexpr uint32_t kCfgParallelWorkers = 4; | |||||
| constexpr uint32_t kCfgParallelWorkers = 8; | |||||
| constexpr uint32_t kCfgWorkerConnectorSize = 16; | constexpr uint32_t kCfgWorkerConnectorSize = 16; | ||||
| constexpr uint32_t kCfgOpConnectorSize = 16; | constexpr uint32_t kCfgOpConnectorSize = 16; | ||||
| constexpr int32_t kCfgDefaultRankId = -1; | constexpr int32_t kCfgDefaultRankId = -1; | ||||
| @@ -77,7 +77,15 @@ BatchOp::BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, | |||||
| pad_info_(pad_map), | pad_info_(pad_map), | ||||
| batch_num_(0), | batch_num_(0), | ||||
| batch_cnt_(0) { | batch_cnt_(0) { | ||||
| worker_queues_.Init(num_workers, op_queue_size); | |||||
| // Adjust connector queue size. After batch each row is batch_size times larger | |||||
| int32_t queue_size; | |||||
| queue_size = std::max(1, op_queue_size / start_batch_size_); | |||||
| if (num_workers == 1) { | |||||
| // ensure there are at least 2 queue slots for the whole operation. If only 1 worker, increase it to 2 | |||||
| queue_size = std::max(2, queue_size); | |||||
| } | |||||
| worker_queues_.Init(num_workers, queue_size); | |||||
| } | } | ||||
| // if PYTHON is disabled. per_batch_map can't be used | // if PYTHON is disabled. per_batch_map can't be used | ||||
| #else | #else | ||||
| @@ -88,8 +96,16 @@ BatchOp::BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, | |||||
| drop_(drop), | drop_(drop), | ||||
| pad_(pad), | pad_(pad), | ||||
| in_col_names_(cols_to_map), | in_col_names_(cols_to_map), | ||||
| pad_info_(pad_map) { | |||||
| worker_queues_.Init(num_workers, op_queue_size); | |||||
| pad_info_(pad_map), | |||||
| batch_num_(0), | |||||
| batch_cnt_(0) { | |||||
| int32_t queue_size; | |||||
| queue_size = std::max(1, op_queue_size / start_batch_size_); | |||||
| if (num_workers == 1) { | |||||
| // ensure there are at least 2 queue slots for the whole operation. If only 1 worker, increase it to 2 | |||||
| queue_size = std::max(2, queue_size); | |||||
| } | |||||
| worker_queues_.Init(num_workers, queue_size); | |||||
| } | } | ||||
| #endif | #endif | ||||
| @@ -33,7 +33,13 @@ ParallelOp::ParallelOp(int32_t num_workers, int32_t op_connector_size, std::shar | |||||
| worker_connector_size_(1), | worker_connector_size_(1), | ||||
| worker_connector_(nullptr), | worker_connector_(nullptr), | ||||
| num_workers_paused_(0), | num_workers_paused_(0), | ||||
| epoch_sync_flag_(false) {} | |||||
| epoch_sync_flag_(false) { | |||||
| // reduce excessive memory usage with high parallelism | |||||
| // when num_workers > 4, reduce op_connector_size to have similar total size if there were only 4 workers | |||||
| if (num_workers_ > 4) { | |||||
| oc_queue_size_ = std::max(1, op_connector_size * 4 / num_workers_); | |||||
| } | |||||
| } | |||||
| // Creates the internal worker connector for the parallel op if the derived class wants to use it | // Creates the internal worker connector for the parallel op if the derived class wants to use it | ||||
| Status ParallelOp::CreateWorkerConnector(int32_t worker_connector_size) { | Status ParallelOp::CreateWorkerConnector(int32_t worker_connector_size) { | ||||
| @@ -112,11 +112,16 @@ def set_prefetch_size(size): | |||||
| Set the number of rows to be prefetched. | Set the number of rows to be prefetched. | ||||
| Args: | Args: | ||||
| size (int): Total number of rows to be prefetched. | |||||
| size (int): Total number of rows to be prefetched per operator per parallel worker. | |||||
| Raises: | Raises: | ||||
| ValueError: If prefetch_size is invalid (<= 0 or > MAX_INT_32). | ValueError: If prefetch_size is invalid (<= 0 or > MAX_INT_32). | ||||
| Note: | |||||
| Since total memory used for prefetch can grow very large with high number of workers, | |||||
| when number of workers is > 4, the per worker prefetch size will be reduced. The actual | |||||
| prefetch size at runtime per worker will be prefetch_size * (4 / num_parallel_workers). | |||||
| Examples: | Examples: | ||||
| >>> # Set a new global configuration value for the prefetch size. | >>> # Set a new global configuration value for the prefetch size. | ||||
| >>> ds.config.set_prefetch_size(1000) | >>> ds.config.set_prefetch_size(1000) | ||||
| @@ -42,7 +42,7 @@ TEST_F(MindDataTestPipeline, TestConfigSetting) { | |||||
| EXPECT_EQ(load_status, true); | EXPECT_EQ(load_status, true); | ||||
| // Test configuration loaded | // Test configuration loaded | ||||
| EXPECT_EQ(config::get_num_parallel_workers(), 4); | |||||
| EXPECT_EQ(config::get_num_parallel_workers(), 8); | |||||
| EXPECT_EQ(config::get_prefetch_size(), 16); | EXPECT_EQ(config::get_prefetch_size(), 16); | ||||
| EXPECT_EQ(config::get_seed(), 5489); | EXPECT_EQ(config::get_seed(), 5489); | ||||
| EXPECT_EQ(config::get_monitor_sampling_interval(), 15); | EXPECT_EQ(config::get_monitor_sampling_interval(), 15); | ||||
| @@ -1,7 +1,7 @@ | |||||
| { | { | ||||
| "logFilePath": "/tmp", | "logFilePath": "/tmp", | ||||
| "rowsPerBuffer": 1, | "rowsPerBuffer": 1, | ||||
| "numParallelWorkers": 4, | |||||
| "numParallelWorkers": 8, | |||||
| "workerConnectorSize": 16, | "workerConnectorSize": 16, | ||||
| "opConnectorSize": 16, | "opConnectorSize": 16, | ||||
| "seed": 5489, | "seed": 5489, | ||||
| @@ -44,7 +44,7 @@ def test_basic(): | |||||
| ds.config.load('../data/dataset/declient.cfg') | ds.config.load('../data/dataset/declient.cfg') | ||||
| # assert ds.config.get_rows_per_buffer() == 32 | # assert ds.config.get_rows_per_buffer() == 32 | ||||
| assert ds.config.get_num_parallel_workers() == 4 | |||||
| assert ds.config.get_num_parallel_workers() == 8 | |||||
| # assert ds.config.get_worker_connector_size() == 16 | # assert ds.config.get_worker_connector_size() == 16 | ||||
| assert ds.config.get_prefetch_size() == 16 | assert ds.config.get_prefetch_size() == 16 | ||||
| assert ds.config.get_seed() == 5489 | assert ds.config.get_seed() == 5489 | ||||
| @@ -348,7 +348,7 @@ def test_deterministic_python_seed_multi_thread(): | |||||
| try: | try: | ||||
| np.testing.assert_equal(data1_output, data2_output) | np.testing.assert_equal(data1_output, data2_output) | ||||
| except Exception as e: | except Exception as e: | ||||
| # expect output to not match during multi-threaded excution | |||||
| # expect output to not match during multi-threaded execution | |||||
| logger.info("Got an exception in DE: {}".format(str(e))) | logger.info("Got an exception in DE: {}".format(str(e))) | ||||
| assert "Array" in str(e) | assert "Array" in str(e) | ||||