|
|
@@ -43,7 +43,8 @@ DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, i |
|
|
send_epoch_end_(send_epoch_end), |
|
|
send_epoch_end_(send_epoch_end), |
|
|
stop_send_(false), |
|
|
stop_send_(false), |
|
|
total_batch_(total_batch), |
|
|
total_batch_(total_batch), |
|
|
create_data_info_queue_(create_data_info_queue) { |
|
|
|
|
|
|
|
|
create_data_info_queue_(create_data_info_queue), |
|
|
|
|
|
data_info_queue_ptr_(nullptr) { |
|
|
#ifdef ENABLE_GPUQUE |
|
|
#ifdef ENABLE_GPUQUE |
|
|
// Get the total device num of current machine |
|
|
// Get the total device num of current machine |
|
|
int32_t device_count = 0; |
|
|
int32_t device_count = 0; |
|
|
@@ -106,8 +107,15 @@ Status DeviceQueueOp::operator()() { |
|
|
if (device_type_ == DeviceType::Ascend) { |
|
|
if (device_type_ == DeviceType::Ascend) { |
|
|
#ifdef ENABLE_TDTQUE |
|
|
#ifdef ENABLE_TDTQUE |
|
|
if (create_data_info_queue_) { |
|
|
if (create_data_info_queue_) { |
|
|
data_info_queue_ptr_ = std::make_unique<DATA_INFO_QUEUE>(kDataInfoQueueCapacity); |
|
|
|
|
|
RETURN_IF_NOT_OK(data_info_queue_ptr_->Register(tree_->AllTasks())); |
|
|
|
|
|
|
|
|
// This place has a race condition with GetDataInfo, so the first one |
|
|
|
|
|
// arrive here will do the initialize work. |
|
|
|
|
|
{ |
|
|
|
|
|
std::unique_lock<std::mutex> lock(data_info_mutex_); |
|
|
|
|
|
if (data_info_queue_ptr_ == nullptr) { |
|
|
|
|
|
data_info_queue_ptr_ = std::make_unique<DATA_INFO_QUEUE>(kDataInfoQueueCapacity); |
|
|
|
|
|
RETURN_IF_NOT_OK(data_info_queue_ptr_->Register(tree_->AllTasks())); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
RETURN_IF_NOT_OK(SendDataToAscend()); |
|
|
RETURN_IF_NOT_OK(SendDataToAscend()); |
|
|
#endif |
|
|
#endif |
|
|
@@ -232,6 +240,15 @@ Status DeviceQueueOp::GetDataInfo(DATA_INFO *data_info) { |
|
|
if (!create_data_info_queue_) { |
|
|
if (!create_data_info_queue_) { |
|
|
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "DataInfo queue is not created."); |
|
|
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "DataInfo queue is not created."); |
|
|
} |
|
|
} |
|
|
|
|
|
// This place has a race condition with operator(), so the first one |
|
|
|
|
|
// arrive here will do the initialize work. |
|
|
|
|
|
{ |
|
|
|
|
|
std::unique_lock<std::mutex> lock(data_info_mutex_); |
|
|
|
|
|
if (data_info_queue_ptr_ == nullptr) { |
|
|
|
|
|
data_info_queue_ptr_ = std::make_unique<DATA_INFO_QUEUE>(kDataInfoQueueCapacity); |
|
|
|
|
|
RETURN_IF_NOT_OK(data_info_queue_ptr_->Register(tree_->AllTasks())); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
RETURN_IF_NOT_OK(data_info_queue_ptr_->PopFront(data_info)); |
|
|
RETURN_IF_NOT_OK(data_info_queue_ptr_->PopFront(data_info)); |
|
|
return Status::OK(); |
|
|
return Status::OK(); |
|
|
} |
|
|
} |
|
|
|