diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc index 981b89ba29..d553eda16a 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc @@ -43,7 +43,8 @@ DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, i send_epoch_end_(send_epoch_end), stop_send_(false), total_batch_(total_batch), - create_data_info_queue_(create_data_info_queue) { + create_data_info_queue_(create_data_info_queue), + data_info_queue_ptr_(nullptr) { #ifdef ENABLE_GPUQUE // Get the total device num of current machine int32_t device_count = 0; @@ -106,8 +107,15 @@ Status DeviceQueueOp::operator()() { if (device_type_ == DeviceType::Ascend) { #ifdef ENABLE_TDTQUE if (create_data_info_queue_) { - data_info_queue_ptr_ = std::make_unique(kDataInfoQueueCapacity); - RETURN_IF_NOT_OK(data_info_queue_ptr_->Register(tree_->AllTasks())); + // Initialization of the data info queue races with GetDataInfo, so + // whichever caller arrives here first performs the initialization. + { + std::unique_lock lock(data_info_mutex_); + if (data_info_queue_ptr_ == nullptr) { + data_info_queue_ptr_ = std::make_unique(kDataInfoQueueCapacity); + RETURN_IF_NOT_OK(data_info_queue_ptr_->Register(tree_->AllTasks())); + } + } } RETURN_IF_NOT_OK(SendDataToAscend()); #endif @@ -232,6 +240,15 @@ Status DeviceQueueOp::GetDataInfo(DATA_INFO *data_info) { if (!create_data_info_queue_) { return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "DataInfo queue is not created."); } + // Initialization of the data info queue races with operator(), so + // whichever caller arrives here first performs the initialization. 
+ { + std::unique_lock lock(data_info_mutex_); + if (data_info_queue_ptr_ == nullptr) { + data_info_queue_ptr_ = std::make_unique(kDataInfoQueueCapacity); + RETURN_IF_NOT_OK(data_info_queue_ptr_->Register(tree_->AllTasks())); + } + } RETURN_IF_NOT_OK(data_info_queue_ptr_->PopFront(data_info)); return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h index 2d259b134e..846d03c396 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h @@ -217,6 +217,7 @@ class DeviceQueueOp : public PipelineOp { int32_t total_batch_; bool create_data_info_queue_; std::unique_ptr data_info_queue_ptr_; + std::mutex data_info_mutex_; #ifdef ENABLE_TDTQUE std::shared_ptr tdtInstancePtr;