
!8378 [MD][GPU] minddata device_que push optimizer

From: @xiefangqi
Reviewed-by: 
Signed-off-by:
tags/v1.1.0
mindspore-ci-bot
commit bccf04d36c
16 changed files with 385 additions and 144 deletions
  1. +28  -4    mindspore/ccsrc/minddata/dataset/CMakeLists.txt
  2. +1   -0    mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/core/bindings.cc
  3. +2   -0    mindspore/ccsrc/minddata/dataset/core/config_manager.cc
  4. +15  -0    mindspore/ccsrc/minddata/dataset/core/config_manager.h
  5. +1   -19   mindspore/ccsrc/minddata/dataset/engine/cache/CMakeLists.txt
  6. +164 -107  mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc
  7. +16  -6    mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h
  8. +29  -0    mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc
  9. +6   -0    mindspore/ccsrc/minddata/dataset/engine/execution_tree.h
  10. +84  -0   mindspore/ccsrc/minddata/dataset/engine/gpu_item_connector.h
  11. +2   -2   mindspore/ccsrc/runtime/device/gpu/blocking_queue.cc
  12. +4   -3   mindspore/ccsrc/runtime/device/gpu/blocking_queue.h
  13. +1   -1   mindspore/ccsrc/runtime/device/gpu/gpu_buffer_mgr.cc
  14. +1   -1   mindspore/ccsrc/runtime/device/gpu/gpu_buffer_mgr.h
  15. +22  -0   mindspore/dataset/core/config.py
  16. +9   -1   mindspore/dataset/engine/datasets.py

+28 -4  mindspore/ccsrc/minddata/dataset/CMakeLists.txt

@@ -31,6 +31,26 @@ if (MS_BUILD_GRPC)
message(STATUS "Cache is enabled")
endif()

if (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
# Try to find numa header file and its library
FIND_PATH(NUMA_INCLUDE_DIR numa.h)
MESSAGE("Numa include dir is: ${NUMA_INCLUDE_DIR}")

FIND_LIBRARY(NUMA_LIBRARY NAMES libnuma.so)
MESSAGE("Numa library is: ${NUMA_LIBRARY}")

FIND_PACKAGE_HANDLE_STANDARD_ARGS(NUMA DEFAULT_MSG
NUMA_INCLUDE_DIR
NUMA_LIBRARY
)
if (NUMA_FOUND)
ADD_DEFINITIONS(-DNUMA_ENABLED)
MESSAGE("Numa package found")
else()
MESSAGE(FATAL_ERROR "Numa package not found, try 'sudo yum install numactl-devel' or 'sudo apt-get install libnuma-dev'")
endif()
endif ()

# code coverage
# option(ENABLE_COVERAGE "Enable code coverage report" OFF)
# if (ENABLE_COVERAGE)
@@ -180,14 +200,18 @@ else ()
target_link_libraries(_c_dataengine PRIVATE -ldl ${SECUREC_LIBRARY})
endif ()
target_link_libraries(_c_dataengine PUBLIC mindspore::sentencepiece)
endif ()
if (NUMA_FOUND)
target_link_libraries(_c_dataengine PUBLIC numa)
endif()
endif()

target_link_libraries(_c_dataengine PUBLIC mindspore::jpeg_turbo mindspore::turbojpeg mindspore::opencv_core mindspore::opencv_imgcodecs
mindspore::opencv_imgproc mindspore::tinyxml2 mindspore::sentencepiece_train ${ICU_LIB})
if (ENABLE_GPUQUE)
target_link_libraries(_c_dataengine PRIVATE gpu_queue
${CUDNN_LIBRARY_PATH}
${CUDA_PATH}/lib64/libcudart.so
${CUDA_PATH}/lib64/stubs/libcuda.so)
${CUDNN_LIBRARY_PATH}
${CUDA_PATH}/lib64/libcudart.so
${CUDA_PATH}/lib64/stubs/libcuda.so)
endif ()

if (ENABLE_TDTQUE)


+1 -0  mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/core/bindings.cc

@@ -42,6 +42,7 @@ PYBIND_REGISTER(ConfigManager, 0, ([](const py::module *m) {
.def("get_op_connector_size", &ConfigManager::op_connector_size)
.def("get_rows_per_buffer", &ConfigManager::rows_per_buffer)
.def("get_seed", &ConfigManager::seed)
.def("set_rank_id", &ConfigManager::set_rank_id)
.def("get_worker_connector_size", &ConfigManager::worker_connector_size)
.def("set_auto_num_workers", &ConfigManager::set_auto_num_workers)
.def("set_auto_worker_config", &ConfigManager::set_auto_worker_config_)


+2 -0  mindspore/ccsrc/minddata/dataset/core/config_manager.cc

@@ -128,6 +128,8 @@ void ConfigManager::set_op_connector_size(int32_t connector_size) { op_connector

uint32_t ConfigManager::seed() const { return seed_; }

void ConfigManager::set_rank_id(uint32_t rank_id) { rank_id_ = rank_id; }

void ConfigManager::set_seed(uint32_t seed) { seed_ = seed; }

void ConfigManager::set_monitor_sampling_interval(uint32_t interval) { monitor_sampling_interval_ = interval; }


+15 -0  mindspore/ccsrc/minddata/dataset/core/config_manager.h

@@ -143,6 +143,17 @@ class ConfigManager {
/// \param prefetch_size
void set_prefetch_size(int32_t prefetch_size);

// getter function
// This rank_id is for numa and device_queue; one process works with only one rank_id.
// In the standalone scenario the rank_id may come from the env 'CUDA_VISIBLE_DEVICES',
// but in the distributed scenario it comes from _get_global_rank() in Python.
// @return The current device id; each process has only one rank_id.
uint32_t rank_id() const { return rank_id_; }

// setter function
// @param rank_id - Set the current device id
void set_rank_id(uint32_t rank_id);

uint32_t seed() const;

// setter function
@@ -196,6 +207,10 @@ class ConfigManager {
int32_t num_parallel_workers_;
int32_t worker_connector_size_;
int32_t op_connector_size_;
// This rank_id is for numa and device_queue; one process works with only one rank_id.
// In the standalone scenario the rank_id may come from the env 'CUDA_VISIBLE_DEVICES',
// but in the distributed scenario it comes from _get_global_rank() in Python.
uint32_t rank_id_;
uint32_t seed_;
uint32_t monitor_sampling_interval_;
uint32_t callback_timout_;


+1 -19  mindspore/ccsrc/minddata/dataset/engine/cache/CMakeLists.txt

@@ -6,26 +6,8 @@ ms_build_flatbuffers("de_tensor.fbs" ${CMAKE_CURRENT_SOURCE_DIR} generated_engin
file(GLOB_RECURSE _CURRENT_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc")
set_property(SOURCE ${_CURRENT_SRC_FILES} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_MD)

if (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
if (NUMA_FOUND)
ADD_DEFINITIONS(-DCACHE_LOCAL_CLIENT)

# Try to find numa header file and its library
FIND_PATH( NUMA_INCLUDE_DIR numa.h )
MESSAGE( "Numa include dir is: ${NUMA_INCLUDE_DIR}" )

FIND_LIBRARY( NUMA_LIBRARY NAMES libnuma.so )
MESSAGE( "Numa library is: ${NUMA_LIBRARY}" )

FIND_PACKAGE_HANDLE_STANDARD_ARGS( NUMA DEFAULT_MSG
NUMA_INCLUDE_DIR
NUMA_LIBRARY
)
if ( NUMA_FOUND )
ADD_DEFINITIONS(-DNUMA_ENABLED)
MESSAGE("Numa package found")
else()
MESSAGE(FATAL_ERROR "Numa package not found, try 'sudo yum install numactl-devel' or 'sudo apt-get install libnuma-dev'")
endif()
endif ()

add_library(engine-cache-client OBJECT


+164 -107  mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc

@@ -19,6 +19,7 @@
#include <iomanip>
#include <iostream>
#include <memory>
#include <utility>
#include "minddata/dataset/core/config_manager.h"
#include "minddata/dataset/core/global_context.h"
#include "minddata/dataset/engine/data_buffer.h"
@@ -43,6 +44,16 @@ DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, i
stop_send_(false),
total_batch_(total_batch),
create_data_info_queue_(create_data_info_queue) {
#ifdef ENABLE_GPUQUE
std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
rank_id_ = cfg->rank_id(); // Get the current rank_id
// Be careful when modifying num_workers_ and queue_capacity_:
// we suggest keeping num_workers_ * queue_capacity_ no greater than 16, because
// each worker owns a circular pool with 1G of pinned memory, so the product
// must be limited to avoid memory overload.
num_workers_ = 2;
queue_capacity_ = 8;
#endif
#ifdef ENABLE_TDTQUE
ascend_keep_waiting_ = true;
#endif
@@ -51,9 +62,9 @@ DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, i
DeviceQueueOp::~DeviceQueueOp() {}

#ifdef ENABLE_GPUQUE
void DeviceQueueOp::ReleaseData(void *addr) {
void DeviceQueueOp::ReleaseData(void *addr, int32_t worker_id) {
if (addr != nullptr) {
pool_->Deallocate(addr);
pool_[worker_id]->Deallocate(addr);
}
}
#endif
@@ -96,7 +107,6 @@ Status DeviceQueueOp::operator()() {
#endif
} else if (device_type_ == DeviceType::GPU) {
#ifdef ENABLE_GPUQUE
RETURN_IF_NOT_OK(CircularPool::CreateCircularPool(&pool_, -1, 1024, false, true));
RETURN_IF_NOT_OK(SendDataToGPU());
#endif
} else if (device_type_ == DeviceType::CPU) {
@@ -226,17 +236,38 @@ Status DeviceQueueOp::GetDataInfo(DATA_INFO *data_info) {
#endif

#ifdef ENABLE_GPUQUE
Status DeviceQueueOp::SendDataToGPU() {
MS_LOG(INFO) << "Device queue, sending data to GPU.";
int64_t send_batch = 0;
bool is_break_loop = false;
bool is_open = false;
uint32_t handle = INVALID_HANDLE;
auto release_function = std::bind(&DeviceQueueOp::ReleaseData, this, std::placeholders::_1);
double batch_start_time, end_time;
int32_t batch_cost, push_cost;
Status DeviceQueueOp::LaunchParallelCopyThread() {
// Without cudaSetDevice, CUDA memory is allocated on GPU:0 by default
// and will overload it in the distributed scenario, so don't remove this line
cudaSetDevice(rank_id_);
// CircularPool may not be safe in a multi-threaded scenario, so each worker gets its own pool
for (int i = 0; i < num_workers_; i++) {
std::shared_ptr<MemoryPool> pool;
RETURN_IF_NOT_OK(CircularPool::CreateCircularPool(&pool, -1, 1024, false, true));
pool_.push_back(pool);
}
gpu_item_connector_ = std::make_unique<GpuItemConnector>(num_workers_, 1, queue_capacity_);
receive_queues_.Init(num_workers_, queue_capacity_);
RETURN_IF_NOT_OK(receive_queues_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(
tree_->LaunchWorkers(num_workers_, std::bind(&DeviceQueueOp::WorkerEntry, this, std::placeholders::_1)));
RETURN_IF_NOT_OK(
tree_->AllTasks()->CreateAsyncTask("Push data to GPU queue", std::bind(&DeviceQueueOp::PushDataToGPU, this)));

return Status::OK();
}

Status DeviceQueueOp::PushDataToGPU() {
// Without cudaSetDevice, CUDA memory is allocated on GPU:0 by default
// and will overload it in the distributed scenario, so don't remove this line
cudaSetDevice(rank_id_);
TaskManager::FindMe()->Post();
double batch_start_time = 0.0;
double end_time = 0.0;
int32_t batch_cost = 0;
int32_t push_cost = 0;
int32_t connector_size = 0;
int32_t connector_capacity;
int32_t connector_capacity = 0;
std::shared_ptr<DeviceQueueTracing> profiling_node;
bool isProfilingEnable = tree_->GetProfilingManager()->IsProfilingEnable();
if (isProfilingEnable) {
@@ -244,135 +275,161 @@ Status DeviceQueueOp::SendDataToGPU() {
RETURN_IF_NOT_OK(tree_->GetProfilingManager()->GetTracingNode(kDeviceQueueTracingName, &node));
profiling_node = std::dynamic_pointer_cast<DeviceQueueTracing>(node);
batch_start_time = ProfilingTime::GetCurMilliSecond();
connector_capacity = ChildOpConnectorCapacity();
connector_capacity = gpu_item_connector_->capacity();
}
std::vector<device::DataItemGpu> items;
RETURN_IF_NOT_OK(gpu_item_connector_->Pop(0, &items));
bool is_open = false;
uint32_t handle = INVALID_HANDLE;
int64_t send_batch = 0;
bool ps_data_prefetch = false;
auto release_function = std::bind(&DeviceQueueOp::ReleaseData, this, std::placeholders::_1, std::placeholders::_2);
while (!items.empty() && !GpuBufferMgr::GetInstance().IsClosed()) {
if (!is_open) {
std::vector<size_t> data_size;
for (int32_t index = 0; index < items.size(); index++) {
data_size.push_back(items[index].data_len_);
}
handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, data_size, release_function);
if (handle == INVALID_HANDLE) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Failed to open channel for sending data.");
}
is_open = true;
}

std::unique_ptr<DataBuffer> current_buffer;
RETURN_IF_NOT_OK(GetNextInput(&current_buffer));

while (!current_buffer->eof() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) {
while (!current_buffer->eoe() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) {
RETURN_IF_NOT_OK(CheckExceptions(current_buffer));
TensorRow curr_row; // batch data
for (int row_id = 0;
row_id < current_buffer->NumRows() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed(); row_id++) {
RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &curr_row));

std::vector<size_t> data_size;
for (int i = 0; i < curr_row.size(); i++) {
data_size.push_back(static_cast<size_t>(curr_row[i]->SizeInBytes()));
}
if (!is_open) {
handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, data_size, release_function);
if (handle == INVALID_HANDLE) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Failed to open channel for sending data.");
// Data prefetch only when PS mode enables cache.
if ((!ps_data_prefetch) && (items.size() > 0)) {
ps::PsDataPrefetch::GetInstance().PrefetchData(channel_name_, items[0].data_ptr_, items[0].data_len_);
ps_data_prefetch = true;
}
while (!GpuBufferMgr::GetInstance().IsClosed() && !TaskManager::FindMe()->Interrupted()) {
BlockQueueStatus_T ret = GpuBufferMgr::GetInstance().Push(handle, items, WAIT_TIME);
if (ret) {
if (ret == BlockQueueStatus_T::ERROR_INPUT) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Invalid input data, please check it.");
} else {
if (!stop_send_) {
MS_LOG(DEBUG) << "Retry pushing data...";
continue;
}
is_open = true;
}
RETURN_IF_NOT_OK(RetryPushGPUData(data_size, curr_row, handle, isProfilingEnable, &push_cost));
send_batch++;
if (isProfilingEnable) {
end_time = ProfilingTime::GetCurMilliSecond();
// record push data time
profiling_node->Record(TIME, TDT_PUSH_TIME, send_batch, push_cost);
batch_cost = (int32_t)(end_time - batch_start_time);
// record batch time
profiling_node->Record(TIME, BATCH_TIME, send_batch, batch_cost);
// record pipeline time
profiling_node->Record(TIME, PIPELINE_TIME, send_batch, batch_cost - push_cost);
batch_start_time = end_time;
// record connector depth
profiling_node->Record(CONNECTOR_DEPTH, connector_capacity, send_batch, connector_size);
}
if (total_batch_ > 0 && send_batch >= total_batch_) {
is_break_loop = true;
break;
}
}
if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) {
if (isProfilingEnable) {
connector_size = ChildOpConnectorSize();
connector_capacity = ChildOpConnectorCapacity();
}
RETURN_IF_NOT_OK(GetNextInput(&current_buffer));
} else {
is_break_loop = true;
break;
}
}
send_batch++;
if (isProfilingEnable) {
end_time = ProfilingTime::GetCurMilliSecond();
// record push data time
profiling_node->Record(TIME, TDT_PUSH_TIME, send_batch, push_cost);
batch_cost = (int32_t)(end_time - batch_start_time);
// record batch time
profiling_node->Record(TIME, BATCH_TIME, send_batch, batch_cost);
// record pipeline time
profiling_node->Record(TIME, PIPELINE_TIME, send_batch, batch_cost - push_cost);
batch_start_time = end_time;
// record connector depth
profiling_node->Record(CONNECTOR_DEPTH, connector_capacity, send_batch, connector_size);
connector_size = gpu_item_connector_->size();
connector_capacity = gpu_item_connector_->capacity();
}
if (total_batch_ > 0 && send_batch >= total_batch_) {
break;
}
if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) {
if (isProfilingEnable) {
connector_size = ChildOpConnectorSize();
connector_capacity = ChildOpConnectorCapacity();
}
RETURN_IF_NOT_OK(GetNextInput(&current_buffer));
RETURN_IF_NOT_OK(gpu_item_connector_->Pop(0, &items));
} else {
is_break_loop = true;
break;
}
}

tree_->SetFinished();
MS_LOG(INFO) << "Device queue total batch is " << send_batch << ".";
MS_LOG(INFO) << "Device queue send " << send_batch << " batch.";

GpuBufferMgr::GetInstance().Close(handle);
GpuBufferMgr::GetInstance().CloseConfirm();
return Status::OK();
}

Status DeviceQueueOp::RetryPushGPUData(const std::vector<size_t> &data_size, const TensorRow &curr_row, uint32_t handle,
bool profiling, int32_t *push_time) {
std::vector<device::DataItemGpu> items;
double start_time;
bool ps_data_prefetch = false;
for (int i = 0; i < data_size.size(); i++) {
device::DataItemGpu data_item;
data_item.data_len_ = data_size[i];
data_item.data_ptr_ = nullptr;
items.push_back(data_item);
// WorkerEntry of DeviceQueueOp only does a multi-threaded memcpy, as a performance optimization.
Status DeviceQueueOp::WorkerEntry(int32_t worker_id) {
// Without cudaSetDevice, CUDA memory is allocated on GPU:0 by default
// and will overload it in the distributed scenario, so don't remove this line
cudaSetDevice(rank_id_);
TaskManager::FindMe()->Post();
std::unique_ptr<DataBuffer> current_buffer;
uint32_t batch_num = 0;
RETURN_IF_NOT_OK(receive_queues_[worker_id]->PopFront(&current_buffer));
while (!current_buffer->quit() && !GpuBufferMgr::GetInstance().IsClosed()) {
TensorRow curr_row;
for (int row_id = 0; row_id < current_buffer->NumRows() && !GpuBufferMgr::GetInstance().IsClosed(); row_id++) {
RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &curr_row));
std::vector<device::DataItemGpu> items;
for (int i = 0; i < curr_row.size(); i++) {
device::DataItemGpu data_item;
data_item.data_len_ = static_cast<size_t>(curr_row[i]->SizeInBytes());
data_item.data_ptr_ = nullptr;
data_item.worker_id_ = worker_id;
items.push_back(data_item);
}
RETURN_IF_NOT_OK(MallocForGPUData(&items, curr_row, worker_id));
RETURN_IF_NOT_OK(gpu_item_connector_->Add(worker_id, std::move(items)));
batch_num++;
}

RETURN_IF_NOT_OK(receive_queues_[worker_id]->PopFront(&current_buffer));
}

while (!GpuBufferMgr::GetInstance().IsClosed() && !TaskManager::FindMe()->Interrupted()) {
RETURN_IF_NOT_OK(MallocForGPUData(&items, curr_row));
if (profiling) {
start_time = ProfilingTime::GetCurMilliSecond();
}
// Data prefetch only when PS mode enables cache.
if ((!ps_data_prefetch) && (items.size() > 0)) {
ps::PsDataPrefetch::GetInstance().PrefetchData(channel_name_, items[0].data_ptr_, items[0].data_len_);
ps_data_prefetch = true;
}
BlockQueueStatus_T ret = GpuBufferMgr::GetInstance().Push(handle, items, WAIT_TIME);
if (profiling) {
double end_time = ProfilingTime::GetCurMilliSecond();
*push_time = (int32_t)(end_time - start_time);
}
if (ret) {
for (int i = 0; i < items.size(); i++) {
ReleaseData(items[i].data_ptr_);
}
if (ret == BlockQueueStatus_T::ERROR_INPUT) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Invalid input data, please check it.");
MS_LOG(INFO) << "Device queue worker id " << worker_id << "proc " << batch_num << "batch.";
// Add empty vector as quit flag.
std::vector<device::DataItemGpu> items;
RETURN_IF_NOT_OK(gpu_item_connector_->Add(worker_id, std::move(items)));
return Status::OK();
}

Status DeviceQueueOp::SendDataToGPU() {
RETURN_IF_NOT_OK(LaunchParallelCopyThread());
MS_LOG(INFO) << "Device queue, sending data to GPU.";
std::unique_ptr<DataBuffer> current_buffer;
RETURN_IF_NOT_OK(GetNextInput(&current_buffer));
int64_t num_buf = 0;
bool is_break_loop = false;
while (!current_buffer->eof() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) {
while (!current_buffer->eoe() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) {
RETURN_IF_NOT_OK(CheckExceptions(current_buffer));
RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(current_buffer)));
if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) {
RETURN_IF_NOT_OK(GetNextInput(&current_buffer));
} else {
if (!stop_send_) {
MS_LOG(DEBUG) << "Retry pushing data...";
continue;
}
break;
is_break_loop = true;
}
}

if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) {
RETURN_IF_NOT_OK(GetNextInput(&current_buffer));
} else {
break;
is_break_loop = true;
}
}

for (uint32_t index = 0; index < num_workers_; index++) {
auto quit = std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagQuit);
RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(quit)));
}

MS_LOG(INFO) << "Device queue receive " << num_buf - num_workers_ << " batch.";
return Status::OK();
}

Status DeviceQueueOp::MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row) {
Status DeviceQueueOp::MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row,
const int32_t &worker_id) {
int i = 0;
for (auto &sub_item : *items) {
RETURN_IF_NOT_OK(pool_->Allocate(sub_item.data_len_, &sub_item.data_ptr_));
RETURN_IF_NOT_OK(pool_[worker_id]->Allocate(sub_item.data_len_, &sub_item.data_ptr_));
if (sub_item.data_ptr_ == nullptr) {
return Status(StatusCode::kOutOfMemory, __LINE__, __FILE__, "Memory malloc failed.");
}
(void)memset_s(sub_item.data_ptr_, sub_item.data_len_, 0, sub_item.data_len_);
const unsigned char *column_data = curr_row[i]->GetBuffer();
if (memcpy_s(sub_item.data_ptr_, sub_item.data_len_, column_data,
static_cast<uint32_t>(curr_row[i++]->SizeInBytes())) != 0) {
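
The net effect of this file's change set: the old single-threaded SendDataToGPU/RetryPushGPUData path becomes a small pipeline. SendDataToGPU now only pulls DataBuffers from the child op and deals them round-robin into per-worker receive queues; each WorkerEntry thread copies rows into DataItemGpu items using its own CircularPool and feeds the GpuItemConnector; a single PushDataToGPU thread drains the connector and pushes to GpuBufferMgr. The sketch below shows that shape with plain C++ threads and queues; BlockingQueue, Batch and the rest are illustrative stand-ins rather than MindSpore types, and the single shared output queue glosses over the fact that the real GpuItemConnector pops its producer queues in round-robin order to preserve batch order.

```cpp
// Minimal sketch of the new push pipeline (illustrative names, not the
// MindSpore API). Empty batches act as quit sentinels, mirroring the quit
// DataBuffer flag and the empty-vector flag used by GpuItemConnector.
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

template <typename T>
class BlockingQueue {  // stand-in for QueueList / GpuItemConnector
 public:
  void Push(T item) {
    { std::lock_guard<std::mutex> lk(m_); q_.push_back(std::move(item)); }
    cv_.notify_one();
  }
  T Pop() {
    std::unique_lock<std::mutex> lk(m_);
    cv_.wait(lk, [this] { return !q_.empty(); });
    T item = std::move(q_.front());
    q_.pop_front();
    return item;
  }
 private:
  std::mutex m_;
  std::condition_variable cv_;
  std::deque<T> q_;
};

using Batch = std::vector<uint8_t>;  // stand-in for a copied TensorRow

int main() {
  const int num_workers = 2;   // mirrors num_workers_ = 2
  const int num_batches = 8;
  std::vector<BlockingQueue<Batch>> receive_queues(num_workers);
  BlockingQueue<Batch> connector;  // fan-in, like gpu_item_connector_

  // Copy workers: take a batch from their own queue, copy it via a
  // per-worker pool, then hand it to the connector (WorkerEntry's job).
  std::vector<std::thread> workers;
  for (int id = 0; id < num_workers; ++id) {
    workers.emplace_back([&, id] {
      std::vector<uint8_t> pool;  // stand-in for this worker's CircularPool
      while (true) {
        Batch b = receive_queues[id].Pop();
        if (b.empty()) break;              // quit flag
        pool.assign(b.begin(), b.end());   // copy into this worker's pool
        connector.Push(std::move(b));
      }
      connector.Push(Batch{});  // empty batch == this worker is finished
    });
  }

  // Single pusher: stops once every worker has signalled finish
  // (PushDataToGPU's job; the real op calls GpuBufferMgr::Push here).
  std::thread pusher([&] {
    int finished = 0, sent = 0;
    while (finished < num_workers) {
      Batch b = connector.Pop();
      if (b.empty()) { ++finished; continue; }
      ++sent;
    }
    std::cout << "pushed " << sent << " batches\n";
  });

  // Dispatcher: round-robin the batches, then one quit flag per worker
  // (SendDataToGPU's job).
  for (int i = 0; i < num_batches; ++i) {
    receive_queues[i % num_workers].Push(Batch(16, static_cast<uint8_t>(i)));
  }
  for (int id = 0; id < num_workers; ++id) {
    receive_queues[id].Push(Batch{});
  }

  for (auto &w : workers) w.join();
  pusher.join();
  return 0;
}
```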


+16 -6  mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h

@@ -31,6 +31,7 @@
#endif

#ifdef ENABLE_GPUQUE
#include "minddata/dataset/engine/gpu_item_connector.h"
#include "minddata/dataset/util/circular_pool.h"
#include "runtime/device/gpu/gpu_buffer_mgr.h"
#include "ps/ps_cache/ps_data/ps_data_prefetch.h"
@@ -189,12 +190,21 @@ class DeviceQueueOp : public PipelineOp {

#ifdef ENABLE_GPUQUE
Status SendDataToGPU();
Status RetryPushGPUData(const std::vector<size_t> &data_size, const TensorRow &curr_row, uint32_t handle,
bool profiling, int32_t *push_time);
Status MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row);
void ReleaseData(void *addr);

std::shared_ptr<MemoryPool> pool_;
Status MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row, const int32_t &worker_id);
void ReleaseData(void *addr, int32_t worker_id);
Status LaunchParallelCopyThread();
Status PushDataToGPU();
Status WorkerEntry(int32_t worker_id);

QueueList<std::unique_ptr<DataBuffer>> receive_queues_;
std::vector<std::shared_ptr<MemoryPool>> pool_;
std::unique_ptr<GpuItemConnector> gpu_item_connector_;
uint32_t num_workers_;
uint32_t queue_capacity_;
// This rank_id is for device_queue; one process works with only one rank_id.
// In the standalone scenario the rank_id may come from the env 'CUDA_VISIBLE_DEVICES',
// but in the distributed scenario it comes from _get_global_rank() in Python.
uint32_t rank_id_;
#endif

Status SendDataToCPU();


+29 -0  mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc

@@ -17,6 +17,10 @@
#include <iostream>
#include <string>
#include <utility>
#include <limits>
#if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE)
#include <numa.h>
#endif
#include "minddata/dataset/engine/datasetops/dataset_op.h"
#include "minddata/dataset/engine/datasetops/shuffle_op.h"
#include "minddata/dataset/engine/datasetops/device_queue_op.h"
@@ -42,6 +46,10 @@ ExecutionTree::ExecutionTree() : id_count_(0), pre_pass_override_(nullptr) {
prepare_flags_ = kDePrepNone;
profiling_manager_ = std::make_unique<ProfilingManager>(this);
optimize_ = common::GetEnv("OPTIMIZE") == "true" ? true : false;
#if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE)
std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
rank_id_ = cfg->rank_id();
#endif
}

// Destructor
@@ -137,6 +145,27 @@ Status ExecutionTree::Launch() {
// opencv limit too many threads
#ifndef ENABLE_ANDROID
#if !defined(_WIN32) && !defined(_WIN64) && !defined(__APPLE__)
#if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE)
// Here we do numa bind for performance optimization. Our test results show that
// doing the numa bind when get_dataset_size launches a tree gives better
// performance than binding only when _To_Device launches a tree. The numa bind
// is process level, binding both cpu and memory, and the numa node is chosen
// with a polling rule:
// numa_bind_id = rank_id_ % (numa_max_node() + 1)
// This has only been verified in the GPU scenario; the D scenario has not been
// tested, so without enough testing we don't suggest enabling numa there.
int numa_node_max_id = numa_max_node();
if (numa_node_max_id >= 0 && rank_id_ >= 0) {
uint32_t numa_bind_id = static_cast<uint32_t>(rank_id_ % (numa_node_max_id + 1));
auto bm = numa_allocate_nodemask();
numa_bitmask_clearall(bm);
numa_bitmask_setbit(bm, numa_bind_id);
numa_bind(bm);
numa_bitmask_free(bm);
} else {
RETURN_STATUS_UNEXPECTED("Get numa max node failed.");
}
#endif
int32_t thread_num = get_nprocs();
if (thread_num == 0) {
std::string err_msg = "Invalid thread number.";
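
To make the polling rule in the comment above concrete: assuming a host where numa_max_node() returns 1 (two NUMA nodes, 0 and 1), ranks map to nodes as in this small, purely illustrative program; the real code queries libnuma at runtime and treats a negative result as an error.

```cpp
#include <cstdint>
#include <iostream>

int main() {
  const int numa_node_max_id = 1;  // assumed result of numa_max_node()
  for (uint32_t rank_id = 0; rank_id < 8; ++rank_id) {
    uint32_t numa_bind_id = rank_id % static_cast<uint32_t>(numa_node_max_id + 1);
    std::cout << "rank " << rank_id << " -> numa node " << numa_bind_id << "\n";
  }
  // ranks 0,2,4,6 bind to node 0; ranks 1,3,5,7 bind to node 1.
  return 0;
}
```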


+6 -0  mindspore/ccsrc/minddata/dataset/engine/execution_tree.h

@@ -282,6 +282,12 @@ class ExecutionTree {
bool optimize_; // Flag to enable optional optimizations
std::function<OptPass(OptPass)> pre_pass_override_; // function ptr that overrides pre pass, called in PrePrepare()
bool partially_prepare_; // Temp: during migration to IR, if true, run remaining passes.
#if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE)
// This rank_id is for numa and device_queue; one process works with only one rank_id.
// In the standalone scenario the rank_id may come from the env 'CUDA_VISIBLE_DEVICES',
// but in the distributed scenario it comes from _get_global_rank() in Python.
uint32_t rank_id_;
#endif
};
} // namespace dataset
} // namespace mindspore


+84 -0  mindspore/ccsrc/minddata/dataset/engine/gpu_item_connector.h

@@ -0,0 +1,84 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_GPU_ITEM_CONNECTOR_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_GPU_ITEM_CONNECTOR_H_

#ifdef ENABLE_GPUQUE
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "minddata/dataset/engine/connector.h"
#include "minddata/dataset/util/status.h"
#include "minddata/dataset/core/constants.h"
#include "runtime/device/gpu/blocking_queue.h"

using mindspore::device::DataItemGpu;

namespace mindspore {
namespace dataset {
class GpuItemConnector : public Connector<std::vector<device::DataItemGpu>> {
public:
GpuItemConnector(int32_t num_producers, int32_t num_consumers, int32_t queue_capacity)
: Connector<std::vector<device::DataItemGpu>>(num_producers, num_consumers, queue_capacity) {
for (int i = 0; i < num_producers; i++) {
is_queue_finished_.push_back(false);
}
}

~GpuItemConnector() = default;

Status Add(int32_t worker_id, std::vector<device::DataItemGpu> &&element) noexcept {
return Connector<std::vector<device::DataItemGpu>>::Push(worker_id, std::move(element));
}

Status Pop(int32_t worker_id, std::vector<device::DataItemGpu> *result) noexcept override {
{
MS_ASSERT(worker_id < num_consumers_);
std::unique_lock<std::mutex> lock(m_);
RETURN_IF_NOT_OK(cv_.Wait(&lock, [this, worker_id]() { return expect_consumer_ == worker_id; }));
if (is_queue_finished_[pop_from_]) {
std::string errMsg = "ERROR: popping from a finished queue in GpuItemConnector";
RETURN_STATUS_UNEXPECTED(errMsg);
}

RETURN_IF_NOT_OK(queues_[pop_from_]->PopFront(result));
if ((*result).empty()) {
is_queue_finished_[pop_from_] = true;
}

for (int offset = 1; offset <= num_producers_; offset++) {
int32_t nextQueueIndex = (pop_from_ + offset) % num_producers_;
if (is_queue_finished_[nextQueueIndex] == false) {
pop_from_ = nextQueueIndex;
break;
}
}

expect_consumer_ = (expect_consumer_ + 1) % num_consumers_;
}

cv_.NotifyAll();
return Status::OK();
}

private:
std::vector<bool> is_queue_finished_;
};
} // namespace dataset
} // namespace mindspore
#endif // ENABLE_GPUQUE
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_GPU_ITEM_CONNECTOR_H_
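
The interesting part of Pop() above is how pop_from_ advances: after each pop it moves to the next producer queue that has not yet delivered its empty "finished" sentinel, so batches come out in the same round-robin order the dispatcher used when handing work to the workers. A standalone sketch of just that advance rule (NextQueue is a made-up helper, not part of the class):

```cpp
#include <iostream>
#include <vector>

// Return the next producer queue to pop from, skipping finished queues.
int NextQueue(int pop_from, const std::vector<bool> &is_queue_finished) {
  const int num_producers = static_cast<int>(is_queue_finished.size());
  for (int offset = 1; offset <= num_producers; ++offset) {
    int next = (pop_from + offset) % num_producers;
    if (!is_queue_finished[next]) return next;
  }
  return pop_from;  // every queue finished; the caller stops popping
}

int main() {
  std::vector<bool> finished = {false, true, false, false};  // queue 1 is done
  std::cout << NextQueue(0, finished) << "\n";  // 2: queue 1 is skipped
  std::cout << NextQueue(2, finished) << "\n";  // 3
  std::cout << NextQueue(3, finished) << "\n";  // 0: wraps around
  return 0;
}
```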

+2 -2  mindspore/ccsrc/runtime/device/gpu/blocking_queue.cc

@@ -72,7 +72,7 @@ BlockQueueStatus_T GpuQueue::Front(void **addr, size_t *len) const {
*len = len_;

for (auto item : node_info_[head_].data_) {
host_release_(item.data_ptr_);
host_release_(item.data_ptr_, item.worker_id_);
}
return SUCCESS;
}
@@ -105,7 +105,7 @@ BlockQueueStatus_T BlockingQueue::Create(void *addr, const std::vector<size_t> &
return SUCCESS;
}

void BlockingQueue::RegisterRelease(const std::function<void(void *)> &func) { queue_->RegisterRelease(func); }
void BlockingQueue::RegisterRelease(const std::function<void(void *, int32_t)> &func) { queue_->RegisterRelease(func); }

BlockQueueStatus_T BlockingQueue::Push(const std::vector<DataItemGpu> &data, unsigned int) {
std::unique_lock<std::mutex> locker(mutex_);


+4 -3  mindspore/ccsrc/runtime/device/gpu/blocking_queue.h

@@ -33,6 +33,7 @@ namespace device {
enum BlockQueueStatus_T : int { SUCCESS = 0, QUEUE_NOT_EXIST, HANDLE_NOT_EXIST, ERROR_INPUT, INTERNAL_ERROR, TIMEOUT };

struct DataItemGpu {
int32_t worker_id_;
size_t data_len_;
void *data_ptr_;
};
@@ -42,7 +43,7 @@ class GpuQueue {
GpuQueue(void *addr, const std::vector<size_t> &shape, const size_t &capacity);
virtual ~GpuQueue();

void RegisterRelease(const std::function<void(void *)> &func) { host_release_ = func; }
void RegisterRelease(const std::function<void(void *, int32_t)> &func) { host_release_ = func; }

inline bool IsEmpty() const { return size_ == 0; }
inline bool IsFull() const { return size_ == capacity_; }
@@ -69,7 +70,7 @@ class GpuQueue {
size_t capacity_;
cudaStream_t stream_;
std::unique_ptr<NodeInfo[]> node_info_;
std::function<void(void *)> host_release_;
std::function<void(void *, int32_t)> host_release_;

GpuQueue(const GpuQueue &) = delete;
GpuQueue &operator=(const GpuQueue &) = delete;
@@ -81,7 +82,7 @@ class BlockingQueue {
~BlockingQueue() = default;

BlockQueueStatus_T Create(void *addr, const std::vector<size_t> &shape, const size_t &capacity);
void RegisterRelease(const std::function<void(void *)> &func);
void RegisterRelease(const std::function<void(void *, int32_t)> &func);
BlockQueueStatus_T Push(const std::vector<DataItemGpu> &data, unsigned int timeout_in_sec);
BlockQueueStatus_T Front(void **ptr, size_t *len);
BlockQueueStatus_T Pop();
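
This signature change is what lets buffers travel back to the pool that allocated them: DataItemGpu now records the worker_id_ of the copy worker, and GpuQueue::Front passes it to the release callback (see blocking_queue.cc above). A minimal sketch of that routing, with a made-up FakePool standing in for CircularPool:

```cpp
#include <cstdint>
#include <functional>
#include <iostream>
#include <vector>

struct FakePool {                       // stand-in for CircularPool
  int freed = 0;
  void Deallocate(void *) { ++freed; }
};

int main() {
  std::vector<FakePool> pools(2);       // one pool per copy worker

  // Matches the new callback type: std::function<void(void *, int32_t)>
  std::function<void(void *, int32_t)> release = [&](void *addr, int32_t worker_id) {
    pools[worker_id].Deallocate(addr);  // route back to the owning pool
  };

  int dummy = 0;
  release(&dummy, 0);
  release(&dummy, 1);
  release(&dummy, 1);
  std::cout << pools[0].freed << " " << pools[1].freed << "\n";  // prints: 1 2
  return 0;
}
```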


+1 -1  mindspore/ccsrc/runtime/device/gpu/gpu_buffer_mgr.cc

@@ -66,7 +66,7 @@ BlockQueueStatus_T GpuBufferMgr::Create(unsigned int device_id, const std::strin
}

unsigned int GpuBufferMgr::Open(unsigned int device_id, const std::string &channel_name,
const std::vector<size_t> &shape, const std::function<void(void *)> func) {
const std::vector<size_t> &shape, const std::function<void(void *, int32_t)> func) {
set_device();
std::string name = std::to_string(device_id) + std::string("_") + channel_name;
if (!name_queue_map_.count(name)) {


+1 -1  mindspore/ccsrc/runtime/device/gpu/gpu_buffer_mgr.h

@@ -85,7 +85,7 @@ class GpuBufferMgr {

// call for Push thread
EXPORT unsigned int Open(unsigned int device_id, const std::string &channel_name, const std::vector<size_t> &shape,
std::function<void(void *)> func);
std::function<void(void *, int32_t)> func);

// call for Front/Pop thread
EXPORT unsigned int Open(unsigned int device_id, const std::string &channel_name, const std::vector<size_t> &shape);


+22 -0  mindspore/dataset/core/config.py

@@ -16,6 +16,7 @@
The configuration module provides various functions to set and get the supported
configuration parameters, and read a configuration file.
"""
import os
import random
import numpy
import mindspore._c_dataengine as cde
@@ -29,6 +30,27 @@ UINT32_MAX = 4294967295

_config = cde.GlobalContext.config_manager()

def _init_device_info():
"""
INTERNAL USE ONLY!
The rank_id needs to be passed into the deeper layers for numa and device_queue.
One process works with only one rank_id. In the standalone scenario,
rank_id may come from the env 'CUDA_VISIBLE_DEVICES'; in the distributed
scenario, rank_id comes from _get_global_rank().
"""
from mindspore import context
from mindspore.parallel._auto_parallel_context import auto_parallel_context
from mindspore.parallel._utils import _get_global_rank
if context.get_context("device_target") == "GPU":
rank_id = _get_global_rank()
parallel_mode = auto_parallel_context().get_parallel_mode()
if parallel_mode == "stand_alone":
cuda_device_info = os.getenv("CUDA_VISIBLE_DEVICES")
if cuda_device_info:
cuda_id = int(cuda_device_info.split(",")[0].strip())
if cuda_id != rank_id:
rank_id = cuda_id
_config.set_rank_id(rank_id)

def set_seed(seed):
"""


+9 -1  mindspore/dataset/engine/datasets.py

@@ -52,7 +52,7 @@ from .validators import check_batch, check_shuffle, check_map, check_filter, che
check_generatordataset, check_sync_wait, check_zip_dataset, check_add_column, check_textfiledataset, check_concat, \
check_random_dataset, check_split, check_bucket_batch_by_length, check_cluedataset, check_save, check_csvdataset, \
check_paddeddataset, check_tuple_iterator, check_dict_iterator, check_schema, check_to_device_send, replace_none
from ..core.config import get_callback_timeout
from ..core.config import get_callback_timeout, _init_device_info
from ..core.datatypes import mstype_to_detype, mstypelist_to_detypelist

try:
@@ -141,11 +141,19 @@ class Dataset:
self._sync = False

def create_ir_tree(self):
"""
Internal method to create an IR tree.

Returns:
ir_tree, the object of the IR tree.
dataset, the root dataset of the IR tree.
"""
parent = self.parent
self.parent = []
dataset = copy.deepcopy(self)
ir_tree = dataset.parse_tree()
self.parent = parent
_init_device_info()
return ir_tree, dataset

def parse_tree(self):

