From: @xiefangqi Reviewed-by: @pandoublefeng,@liucunwei Signed-off-by: @liucunwei tags/v1.1.0
| @@ -35,6 +35,7 @@ ConfigManager::ConfigManager() | |||
| num_parallel_workers_(kCfgParallelWorkers), | |||
| worker_connector_size_(kCfgWorkerConnectorSize), | |||
| op_connector_size_(kCfgOpConnectorSize), | |||
| rank_id_(kCfgDefaultRankId), | |||
| seed_(kCfgDefaultSeed), | |||
| monitor_sampling_interval_(kCfgMonitorSamplingInterval), | |||
| callback_timout_(kCfgCallbackTimeout), | |||
| @@ -128,7 +129,7 @@ void ConfigManager::set_op_connector_size(int32_t connector_size) { op_connector | |||
| uint32_t ConfigManager::seed() const { return seed_; } | |||
| void ConfigManager::set_rank_id(uint32_t rank_id) { rank_id_ = rank_id; } | |||
| void ConfigManager::set_rank_id(int32_t rank_id) { rank_id_ = rank_id; } | |||
| void ConfigManager::set_seed(uint32_t seed) { seed_ = seed; } | |||
| @@ -148,11 +148,11 @@ class ConfigManager { | |||
| // for standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES', | |||
| // but for distribute scenario, this rank_id come from _get_global_rank() in python | |||
| // @return Get the current device id, for one process, it's only with one rank_id. | |||
| uint32_t rank_id() const { return rank_id_; } | |||
| int32_t rank_id() const { return rank_id_; } | |||
| // setter function | |||
| // @param rank_id - Set the current device id | |||
| void set_rank_id(uint32_t rank_id); | |||
| void set_rank_id(int32_t rank_id); | |||
| uint32_t seed() const; | |||
| @@ -210,7 +210,7 @@ class ConfigManager { | |||
| // This rank_id is for numa and device_queue, one process work with only one rank_id, | |||
| // for standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES', | |||
| // but for distribute scenario, this rank_id come from _get_global_rank() in python | |||
| uint32_t rank_id_; | |||
| int32_t rank_id_; | |||
| uint32_t seed_; | |||
| uint32_t monitor_sampling_interval_; | |||
| uint32_t callback_timout_; | |||
| @@ -84,6 +84,7 @@ constexpr uint32_t kCfgRowsPerBuffer = 1; | |||
| constexpr uint32_t kCfgParallelWorkers = 4; | |||
| constexpr uint32_t kCfgWorkerConnectorSize = 16; | |||
| constexpr uint32_t kCfgOpConnectorSize = 16; | |||
| constexpr int32_t kCfgDefaultRankId = -1; | |||
| constexpr uint32_t kCfgDefaultSeed = std::mt19937::default_seed; | |||
| constexpr uint32_t kCfgMonitorSamplingInterval = 10; | |||
| constexpr uint32_t kCfgCallbackTimeout = 60; // timeout value for callback in seconds | |||
| @@ -18,7 +18,7 @@ | |||
| #include <string> | |||
| #include <utility> | |||
| #include <limits> | |||
| #if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE) | |||
| #if defined(NUMA_ENABLED) && (defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE)) | |||
| #include <numa.h> | |||
| #endif | |||
| #include "minddata/dataset/engine/datasetops/dataset_op.h" | |||
| @@ -46,7 +46,7 @@ ExecutionTree::ExecutionTree() : id_count_(0), pre_pass_override_(nullptr) { | |||
| prepare_flags_ = kDePrepNone; | |||
| profiling_manager_ = std::make_unique<ProfilingManager>(this); | |||
| optimize_ = common::GetEnv("OPTIMIZE") == "true" ? true : false; | |||
| #if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE) | |||
| #if defined(NUMA_ENABLED) && (defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE)) | |||
| std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager(); | |||
| rank_id_ = cfg->rank_id(); | |||
| #endif | |||
| @@ -145,7 +145,7 @@ Status ExecutionTree::Launch() { | |||
| // opencv limit too many threads | |||
| #ifndef ENABLE_ANDROID | |||
| #if !defined(_WIN32) && !defined(_WIN64) && !defined(__APPLE__) | |||
| #if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE) | |||
| #if defined(NUMA_ENABLED) && (defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE)) | |||
| // Here we do numa bind for performance optimization, as our test result, | |||
| // if we do numa bind when get_dataset_size launch a tree, we'll get a | |||
| // better performance than only we do numa bind at the time _To_Device | |||
| @@ -155,7 +155,10 @@ Status ExecutionTree::Launch() { | |||
| // Now we only test pass in GPU scenario, we've not tested D scenario, | |||
| // without enough test we don't suggest numa feature open in D scenario | |||
| int numa_node_max_id = numa_max_node(); | |||
| if (numa_node_max_id >= 0 && rank_id_ >= 0) { | |||
| if (numa_node_max_id < 0) { | |||
| RETURN_STATUS_UNEXPECTED("Get numa max node failed."); | |||
| } | |||
| if (rank_id_ >= 0) { | |||
| uint32_t numa_bind_id = static_cast<uint32_t>(rank_id_ % (numa_node_max_id + 1)); | |||
| auto bm = numa_allocate_nodemask(); | |||
| numa_bitmask_clearall(bm); | |||
| @@ -163,7 +166,7 @@ Status ExecutionTree::Launch() { | |||
| numa_bind(bm); | |||
| numa_bitmask_free(bm); | |||
| } else { | |||
| RETURN_STATUS_UNEXPECTED("Get numa max node failed."); | |||
| MS_LOG(INFO) << "Numa bind feature doesn't work now."; | |||
| } | |||
| #endif | |||
| int32_t thread_num = get_nprocs(); | |||
| @@ -282,11 +282,11 @@ class ExecutionTree { | |||
| bool optimize_; // Flag to enable optional optimizations | |||
| std::function<OptPass(OptPass)> pre_pass_override_; // function ptr that overrides pre pass, called in PrePrepare() | |||
| bool partially_prepare_; // Temp: during migration to IR, if true, run remaining passes. | |||
| #if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE) | |||
| #if defined(NUMA_ENABLED) && (defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE)) | |||
| // This rank_id is for numa and device_queue, one process work with only one rank_id, | |||
| // for standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES', | |||
| // but for distribute scenario, this rank_id come from _get_global_rank() in python | |||
| uint32_t rank_id_; | |||
| int32_t rank_id_; | |||
| #endif | |||
| }; | |||
| } // namespace dataset | |||
| @@ -41,7 +41,7 @@ def _init_device_info(): | |||
| """ | |||
| from mindspore import context | |||
| from mindspore.parallel._auto_parallel_context import auto_parallel_context | |||
| from mindspore.parallel._utils import _get_global_rank | |||
| from mindspore.parallel._utils import _get_global_rank, _get_device_num | |||
| if context.get_context("device_target") == "GPU": | |||
| rank_id = _get_global_rank() | |||
| parallel_mode = auto_parallel_context().get_parallel_mode() | |||
| @@ -52,6 +52,12 @@ def _init_device_info(): | |||
| if cuda_id != rank_id: | |||
| rank_id = cuda_id | |||
| _config.set_rank_id(rank_id) | |||
| elif context.get_context("device_target") == "Ascend": | |||
| rank_id = _get_global_rank() | |||
| device_num = _get_device_num() | |||
| # Ascend only support multi-process scenario | |||
| if device_num > 1: | |||
| _config.set_rank_id(rank_id) | |||
| def set_seed(seed): | |||
| @@ -85,9 +85,11 @@ def create_dataset_imagenet(dataset_path, batch_size=32, repeat_num=1, training= | |||
| device_num, rank_id = _get_rank_info() | |||
| cfg = alexnet_imagenet_cfg | |||
| if num_parallel_workers is None: | |||
| num_parallel_workers = int(64 / device_num) | |||
| data_set = ds.ImageFolderDataset(dataset_path, num_parallel_workers=num_parallel_workers, | |||
| num_parallel_workers = 16 | |||
| if device_num == 1: | |||
| num_parallel_workers = 48 | |||
| ds.config.set_prefetch_size(8) | |||
| data_set = ds.ImageFolderDataset(dataset_path, num_parallel_workers=4, | |||
| shuffle=shuffle, sampler=sampler, class_indexing=class_indexing, | |||
| num_shards=device_num, shard_id=rank_id) | |||
| @@ -113,18 +115,14 @@ def create_dataset_imagenet(dataset_path, batch_size=32, repeat_num=1, training= | |||
| CV.HWC2CHW() | |||
| ] | |||
| transform_label = [C.TypeCast(mstype.int32)] | |||
| data_set = data_set.map(input_columns="image", num_parallel_workers=num_parallel_workers, | |||
| operations=transform_img) | |||
| data_set = data_set.map(input_columns="label", num_parallel_workers=num_parallel_workers, | |||
| operations=transform_label) | |||
| num_parallel_workers2 = int(16 / device_num) | |||
| data_set = data_set.batch(batch_size, num_parallel_workers=num_parallel_workers2, drop_remainder=True) | |||
| data_set = data_set.batch(batch_size, drop_remainder=True) | |||
| # apply dataset repeat operation | |||
| data_set = data_set.repeat(repeat_num) | |||
| if repeat_num > 1: | |||
| data_set = data_set.repeat(repeat_num) | |||
| return data_set | |||