|
|
|
@@ -16,6 +16,7 @@ |
|
|
|
|
|
|
|
#include "minddata/dataset/engine/datasetops/device_queue_op.h" |
|
|
|
|
|
|
|
#include <algorithm> |
|
|
|
#include <iostream> |
|
|
|
#include <memory> |
|
|
|
#include "minddata/dataset/engine/data_buffer.h" |
|
|
|
@@ -34,6 +35,7 @@ DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, i |
|
|
|
prefetch_size_(prefetch_size), |
|
|
|
send_epoch_end_(send_epoch_end), |
|
|
|
stop_send_(false), |
|
|
|
send_finished_(false), |
|
|
|
total_batch_(total_batch), |
|
|
|
create_data_info_queue_(create_data_info_queue), |
|
|
|
data_info_queue_ptr_(nullptr) { |
|
|
|
@@ -56,9 +58,19 @@ DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, i |
|
|
|
#ifdef ENABLE_TDTQUE |
|
|
|
ascend_keep_waiting_ = true; |
|
|
|
#endif |
|
|
|
#ifdef ENABLE_DUMP_IR |
|
|
|
md_channel_info_ = std::make_shared<MDChannelInfo>(channel_name_); |
|
|
|
#endif |
|
|
|
} |
|
|
|
|
|
|
|
DeviceQueueOp::~DeviceQueueOp() {} |
|
|
|
DeviceQueueOp::~DeviceQueueOp() { |
|
|
|
#ifdef ENABLE_DUMP_IR |
|
|
|
std::string rdr_msg = md_channel_info_->ToString(); |
|
|
|
if (!send_finished_ && !rdr_msg.empty()) { |
|
|
|
MS_LOG(INFO) << rdr_msg; |
|
|
|
} |
|
|
|
#endif |
|
|
|
} |
|
|
|
|
|
|
|
#ifdef ENABLE_GPUQUE |
|
|
|
void DeviceQueueOp::ReleaseData(void *addr, int32_t worker_id) { |
|
|
|
@@ -96,6 +108,11 @@ Status DeviceQueueOp::CheckExceptions(const std::unique_ptr<DataBuffer> &buffer) |
|
|
|
Status DeviceQueueOp::operator()() { |
|
|
|
TaskManager::FindMe()->Post(); |
|
|
|
|
|
|
|
#ifdef ENABLE_DUMP_IR |
|
|
|
if (md_channel_info_ == nullptr) { |
|
|
|
return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "RDR module init failed."); |
|
|
|
} |
|
|
|
#endif |
|
|
|
if (device_type_ == DeviceType::Ascend) { |
|
|
|
#ifdef ENABLE_TDTQUE |
|
|
|
if (create_data_info_queue_) { |
|
|
|
@@ -125,8 +142,8 @@ Status DeviceQueueOp::operator()() { |
|
|
|
#ifdef ENABLE_TDTQUE |
|
|
|
Status DeviceQueueOp::SendDataToAscend() { |
|
|
|
MS_LOG(INFO) << "Device queue, sending data to Ascend."; |
|
|
|
int64_t send_batch = 0; |
|
|
|
uint64_t batch_start_time, end_time; |
|
|
|
int64_t send_batch = 0; |
|
|
|
int32_t tdt_cost; |
|
|
|
int32_t connector_size = 0; |
|
|
|
int32_t connector_capacity; |
|
|
|
@@ -141,6 +158,10 @@ Status DeviceQueueOp::SendDataToAscend() { |
|
|
|
batch_start_time = ProfilingTime::GetCurMilliSecond(); |
|
|
|
connector_capacity = ChildOpConnectorCapacity(); |
|
|
|
} |
|
|
|
#ifdef ENABLE_DUMP_IR |
|
|
|
md_channel_info_->RecordBatchQueue(ChildOpConnectorSize()); |
|
|
|
md_channel_info_->RecordPreprocessBatch(0); |
|
|
|
#endif |
|
|
|
std::unique_ptr<DataBuffer> current_buffer; |
|
|
|
RETURN_IF_NOT_OK(GetNextInput(¤t_buffer)); |
|
|
|
|
|
|
|
@@ -151,24 +172,20 @@ Status DeviceQueueOp::SendDataToAscend() { |
|
|
|
for (int row_id = 0; row_id < current_buffer->NumRows(); row_id++) { |
|
|
|
RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &currRow)); |
|
|
|
WaitContinueSignal(); |
|
|
|
auto status = tdtInstancePtr->hostPush(currRow, true, channel_name_, isProfilingEnable, tdt_cost); |
|
|
|
if (status == TdtStatus::FAILED) { |
|
|
|
if (stop_send_) { |
|
|
|
MS_LOG(INFO) << "stop_send received"; |
|
|
|
return Status::OK(); |
|
|
|
} |
|
|
|
return Status(StatusCode::kMDTDTPushFailure, "TDT Push Failed"); |
|
|
|
} |
|
|
|
if (create_data_info_queue_) { |
|
|
|
DATA_INFO data_info; |
|
|
|
(void)std::transform( |
|
|
|
currRow.begin(), currRow.end(), std::back_inserter(data_info), |
|
|
|
[](const std::shared_ptr<Tensor> &ts) { return std::make_pair(ts->type(), ts->shape()); }); |
|
|
|
RETURN_IF_NOT_OK(data_info_queue_ptr_->Add(data_info)); |
|
|
|
} |
|
|
|
#ifdef ENABLE_DUMP_IR |
|
|
|
md_channel_info_->RecordBatchQueue(ChildOpConnectorSize()); |
|
|
|
md_channel_info_->RecordPreprocessBatch(send_batch); |
|
|
|
md_channel_info_->RecordPushStartTime(); |
|
|
|
#endif |
|
|
|
RETURN_IF_NOT_OK(SendRowToTdt(currRow, isProfilingEnable, &tdt_cost)); |
|
|
|
ProfilingRecorder(isProfilingEnable, profiling_node, send_batch, tdt_cost, &batch_start_time, &end_time, |
|
|
|
connector_capacity, connector_size); |
|
|
|
send_batch++; |
|
|
|
#ifdef ENABLE_DUMP_IR |
|
|
|
md_channel_info_->RecordBatchQueue(ChildOpConnectorSize()); |
|
|
|
md_channel_info_->RecordPreprocessBatch(send_batch); |
|
|
|
md_channel_info_->RecordPushEndTime(); |
|
|
|
#endif |
|
|
|
|
|
|
|
if (total_batch_ > 0 && send_batch >= total_batch_) { |
|
|
|
is_break_loop = true; |
|
|
|
@@ -187,6 +204,7 @@ Status DeviceQueueOp::SendDataToAscend() { |
|
|
|
tdtInstancePtr->hostPush(currRow, true, channel_name_, isProfilingEnable, tdt_cost, tdt::TDT_END_OF_SEQUENCE); |
|
|
|
if (status == TdtStatus::FAILED) { |
|
|
|
if (stop_send_) { |
|
|
|
send_finished_ = true; |
|
|
|
MS_LOG(INFO) << "stop_send received"; |
|
|
|
return Status::OK(); |
|
|
|
} |
|
|
|
@@ -203,6 +221,10 @@ Status DeviceQueueOp::SendDataToAscend() { |
|
|
|
RETURN_IF_NOT_OK(GetNextInput(¤t_buffer)); |
|
|
|
} |
|
|
|
|
|
|
|
// now we use this flag to judge whether exception raised. |
|
|
|
if (stop_send_ || !TaskManager::FindMe()->Interrupted()) { |
|
|
|
send_finished_ = true; |
|
|
|
} |
|
|
|
tree_->SetFinished(); |
|
|
|
|
|
|
|
return Status::OK(); |
|
|
|
@@ -214,6 +236,23 @@ void DeviceQueueOp::WaitContinueSignal() const { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
Status DeviceQueueOp::SendRowToTdt(TensorRow currRow, bool isProfilingEnable, int32_t *tdt_cost) { |
|
|
|
auto status = tdtInstancePtr->hostPush(currRow, true, channel_name_, isProfilingEnable, *tdt_cost); |
|
|
|
if (status == TdtStatus::FAILED) { |
|
|
|
if (stop_send_) { |
|
|
|
MS_LOG(INFO) << "stop_send received"; |
|
|
|
return Status::OK(); |
|
|
|
} |
|
|
|
return Status(StatusCode::kMDTDTPushFailure, "TDT Push Failed"); |
|
|
|
} |
|
|
|
if (create_data_info_queue_) { |
|
|
|
DATA_INFO data_info; |
|
|
|
(void)std::transform(currRow.begin(), currRow.end(), std::back_inserter(data_info), |
|
|
|
[](const std::shared_ptr<Tensor> &ts) { return std::make_pair(ts->type(), ts->shape()); }); |
|
|
|
RETURN_IF_NOT_OK(data_info_queue_ptr_->Add(data_info)); |
|
|
|
} |
|
|
|
return Status::OK(); |
|
|
|
} |
|
|
|
#endif |
|
|
|
|
|
|
|
#ifdef ENABLE_TDTQUE |
|
|
|
@@ -280,8 +319,6 @@ Status DeviceQueueOp::PushDataToGPU() { |
|
|
|
RETURN_IF_NOT_OK(SetThreadDevice()); |
|
|
|
TaskManager::FindMe()->Post(); |
|
|
|
uint64_t batch_start_time = 0; |
|
|
|
uint64_t end_time = 0; |
|
|
|
int32_t batch_cost = 0; |
|
|
|
int32_t push_cost = 0; |
|
|
|
int32_t connector_size = 0; |
|
|
|
int32_t connector_capacity = 0; |
|
|
|
@@ -294,13 +331,22 @@ Status DeviceQueueOp::PushDataToGPU() { |
|
|
|
batch_start_time = ProfilingTime::GetCurMilliSecond(); |
|
|
|
connector_capacity = gpu_item_connector_->capacity(); |
|
|
|
} |
|
|
|
#ifdef ENABLE_DUMP_IR |
|
|
|
md_channel_info_->RecordBatchQueue(gpu_item_connector_->size()); |
|
|
|
md_channel_info_->RecordPreprocessBatch(0); |
|
|
|
#endif |
|
|
|
std::vector<device::DataItemGpu> items; |
|
|
|
RETURN_IF_NOT_OK(gpu_item_connector_->Pop(0, &items)); |
|
|
|
int64_t send_batch = 0; |
|
|
|
bool is_open = false; |
|
|
|
uint32_t handle = INVALID_HANDLE; |
|
|
|
int64_t send_batch = 0; |
|
|
|
auto release_function = std::bind(&DeviceQueueOp::ReleaseData, this, std::placeholders::_1, std::placeholders::_2); |
|
|
|
while (!items.empty() && !GpuBufferMgr::GetInstance().IsClosed()) { |
|
|
|
#ifdef ENABLE_DUMP_IR |
|
|
|
md_channel_info_->RecordBatchQueue(gpu_item_connector_->size()); |
|
|
|
md_channel_info_->RecordPreprocessBatch(send_batch); |
|
|
|
md_channel_info_->RecordPushStartTime(); |
|
|
|
#endif |
|
|
|
if (!is_open) { |
|
|
|
std::vector<size_t> data_size; |
|
|
|
for (int32_t index = 0; index < items.size(); index++) { |
|
|
|
@@ -318,28 +364,13 @@ Status DeviceQueueOp::PushDataToGPU() { |
|
|
|
items[0].data_type_)) { |
|
|
|
return Status(StatusCode::kMDTimeOut, __LINE__, __FILE__, "Failed to prefetch data."); |
|
|
|
} |
|
|
|
while (!GpuBufferMgr::GetInstance().IsClosed() && !TaskManager::FindMe()->Interrupted()) { |
|
|
|
BlockQueueStatus_T ret = GpuBufferMgr::GetInstance().Push(handle, items, WAIT_TIME); |
|
|
|
if (ret) { |
|
|
|
if (ret == BlockQueueStatus_T::ERROR_INPUT) { |
|
|
|
return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "Invalid input data, please check it."); |
|
|
|
} else { |
|
|
|
if (!stop_send_) { |
|
|
|
MS_LOG(DEBUG) << "Retry pushing data..."; |
|
|
|
continue; |
|
|
|
} |
|
|
|
break; |
|
|
|
} |
|
|
|
} else { |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
RETURN_IF_NOT_OK(RetryPushData(handle, items)); |
|
|
|
send_batch++; |
|
|
|
if (isProfilingEnable) { |
|
|
|
end_time = ProfilingTime::GetCurMilliSecond(); |
|
|
|
uint64_t end_time = ProfilingTime::GetCurMilliSecond(); |
|
|
|
// record push data time |
|
|
|
profiling_node->Record(TIME, TDT_PUSH_TIME, send_batch, push_cost, end_time); |
|
|
|
batch_cost = (int32_t)(end_time - batch_start_time); |
|
|
|
int32_t batch_cost = (int32_t)(end_time - batch_start_time); |
|
|
|
// record batch time |
|
|
|
profiling_node->Record(TIME, BATCH_TIME, send_batch, batch_cost, end_time); |
|
|
|
// record pipeline time |
|
|
|
@@ -350,6 +381,11 @@ Status DeviceQueueOp::PushDataToGPU() { |
|
|
|
connector_size = gpu_item_connector_->size(); |
|
|
|
connector_capacity = gpu_item_connector_->capacity(); |
|
|
|
} |
|
|
|
#ifdef ENABLE_DUMP_IR |
|
|
|
md_channel_info_->RecordBatchQueue(gpu_item_connector_->size()); |
|
|
|
md_channel_info_->RecordPreprocessBatch(send_batch); |
|
|
|
md_channel_info_->RecordPushEndTime(); |
|
|
|
#endif |
|
|
|
if (total_batch_ > 0 && send_batch >= total_batch_) { |
|
|
|
break; |
|
|
|
} |
|
|
|
@@ -360,6 +396,10 @@ Status DeviceQueueOp::PushDataToGPU() { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// now we use this flag to judge whether exception raised. |
|
|
|
if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) { |
|
|
|
send_finished_ = true; |
|
|
|
} |
|
|
|
tree_->SetFinished(); |
|
|
|
MS_LOG(INFO) << "Device queue send " << send_batch << " batch."; |
|
|
|
|
|
|
|
@@ -368,6 +408,26 @@ Status DeviceQueueOp::PushDataToGPU() { |
|
|
|
return Status::OK(); |
|
|
|
} |
|
|
|
|
|
|
|
Status DeviceQueueOp::RetryPushData(unsigned int handle, const std::vector<DataItemGpu> &items) { |
|
|
|
while (!GpuBufferMgr::GetInstance().IsClosed() && !TaskManager::FindMe()->Interrupted()) { |
|
|
|
BlockQueueStatus_T ret = GpuBufferMgr::GetInstance().Push(handle, items, WAIT_TIME); |
|
|
|
if (ret) { |
|
|
|
if (ret == BlockQueueStatus_T::ERROR_INPUT) { |
|
|
|
return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "Invalid input data, please check it."); |
|
|
|
} else { |
|
|
|
if (!stop_send_) { |
|
|
|
MS_LOG(DEBUG) << "Retry pushing data..."; |
|
|
|
continue; |
|
|
|
} |
|
|
|
break; |
|
|
|
} |
|
|
|
} else { |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
return Status::OK(); |
|
|
|
} |
|
|
|
|
|
|
|
// WorkEntry of DeviceQueueOp just do multi_threads memcpy for performance optimization. |
|
|
|
Status DeviceQueueOp::WorkerEntry(int32_t worker_id) { |
|
|
|
// Every thread use cuda api should SetThreadDevice |
|
|
|
|