diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc index b13e56b7a7..3176d1a445 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc @@ -16,6 +16,7 @@ #include "minddata/dataset/engine/datasetops/device_queue_op.h" +#include #include #include #include "minddata/dataset/engine/data_buffer.h" @@ -34,6 +35,7 @@ DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, i prefetch_size_(prefetch_size), send_epoch_end_(send_epoch_end), stop_send_(false), + send_finished_(false), total_batch_(total_batch), create_data_info_queue_(create_data_info_queue), data_info_queue_ptr_(nullptr) { @@ -56,9 +58,19 @@ DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, i #ifdef ENABLE_TDTQUE ascend_keep_waiting_ = true; #endif +#ifdef ENABLE_DUMP_IR + md_channel_info_ = std::make_shared(channel_name_); +#endif } -DeviceQueueOp::~DeviceQueueOp() {} +DeviceQueueOp::~DeviceQueueOp() { +#ifdef ENABLE_DUMP_IR + std::string rdr_msg = md_channel_info_->ToString(); + if (!send_finished_ && !rdr_msg.empty()) { + MS_LOG(INFO) << rdr_msg; + } +#endif +} #ifdef ENABLE_GPUQUE void DeviceQueueOp::ReleaseData(void *addr, int32_t worker_id) { @@ -96,6 +108,11 @@ Status DeviceQueueOp::CheckExceptions(const std::unique_ptr &buffer) Status DeviceQueueOp::operator()() { TaskManager::FindMe()->Post(); +#ifdef ENABLE_DUMP_IR + if (md_channel_info_ == nullptr) { + return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "RDR module init failed."); + } +#endif if (device_type_ == DeviceType::Ascend) { #ifdef ENABLE_TDTQUE if (create_data_info_queue_) { @@ -125,8 +142,8 @@ Status DeviceQueueOp::operator()() { #ifdef ENABLE_TDTQUE Status DeviceQueueOp::SendDataToAscend() { MS_LOG(INFO) << "Device queue, sending data to Ascend."; - int64_t send_batch = 0; uint64_t batch_start_time, end_time; + int64_t send_batch = 0; int32_t tdt_cost; int32_t connector_size = 0; int32_t connector_capacity; @@ -141,6 +158,10 @@ Status DeviceQueueOp::SendDataToAscend() { batch_start_time = ProfilingTime::GetCurMilliSecond(); connector_capacity = ChildOpConnectorCapacity(); } +#ifdef ENABLE_DUMP_IR + md_channel_info_->RecordBatchQueue(ChildOpConnectorSize()); + md_channel_info_->RecordPreprocessBatch(0); +#endif std::unique_ptr current_buffer; RETURN_IF_NOT_OK(GetNextInput(¤t_buffer)); @@ -151,24 +172,20 @@ Status DeviceQueueOp::SendDataToAscend() { for (int row_id = 0; row_id < current_buffer->NumRows(); row_id++) { RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &currRow)); WaitContinueSignal(); - auto status = tdtInstancePtr->hostPush(currRow, true, channel_name_, isProfilingEnable, tdt_cost); - if (status == TdtStatus::FAILED) { - if (stop_send_) { - MS_LOG(INFO) << "stop_send received"; - return Status::OK(); - } - return Status(StatusCode::kMDTDTPushFailure, "TDT Push Failed"); - } - if (create_data_info_queue_) { - DATA_INFO data_info; - (void)std::transform( - currRow.begin(), currRow.end(), std::back_inserter(data_info), - [](const std::shared_ptr &ts) { return std::make_pair(ts->type(), ts->shape()); }); - RETURN_IF_NOT_OK(data_info_queue_ptr_->Add(data_info)); - } +#ifdef ENABLE_DUMP_IR + md_channel_info_->RecordBatchQueue(ChildOpConnectorSize()); + md_channel_info_->RecordPreprocessBatch(send_batch); + md_channel_info_->RecordPushStartTime(); +#endif + RETURN_IF_NOT_OK(SendRowToTdt(currRow, isProfilingEnable, &tdt_cost)); ProfilingRecorder(isProfilingEnable, profiling_node, send_batch, tdt_cost, &batch_start_time, &end_time, connector_capacity, connector_size); send_batch++; +#ifdef ENABLE_DUMP_IR + md_channel_info_->RecordBatchQueue(ChildOpConnectorSize()); + md_channel_info_->RecordPreprocessBatch(send_batch); + md_channel_info_->RecordPushEndTime(); +#endif if (total_batch_ > 0 && send_batch >= total_batch_) { is_break_loop = true; @@ -187,6 +204,7 @@ Status DeviceQueueOp::SendDataToAscend() { tdtInstancePtr->hostPush(currRow, true, channel_name_, isProfilingEnable, tdt_cost, tdt::TDT_END_OF_SEQUENCE); if (status == TdtStatus::FAILED) { if (stop_send_) { + send_finished_ = true; MS_LOG(INFO) << "stop_send received"; return Status::OK(); } @@ -203,6 +221,10 @@ Status DeviceQueueOp::SendDataToAscend() { RETURN_IF_NOT_OK(GetNextInput(¤t_buffer)); } + // now we use this flag to judge whether exception raised. + if (stop_send_ || !TaskManager::FindMe()->Interrupted()) { + send_finished_ = true; + } tree_->SetFinished(); return Status::OK(); @@ -214,6 +236,23 @@ void DeviceQueueOp::WaitContinueSignal() const { } } +Status DeviceQueueOp::SendRowToTdt(TensorRow currRow, bool isProfilingEnable, int32_t *tdt_cost) { + auto status = tdtInstancePtr->hostPush(currRow, true, channel_name_, isProfilingEnable, *tdt_cost); + if (status == TdtStatus::FAILED) { + if (stop_send_) { + MS_LOG(INFO) << "stop_send received"; + return Status::OK(); + } + return Status(StatusCode::kMDTDTPushFailure, "TDT Push Failed"); + } + if (create_data_info_queue_) { + DATA_INFO data_info; + (void)std::transform(currRow.begin(), currRow.end(), std::back_inserter(data_info), + [](const std::shared_ptr &ts) { return std::make_pair(ts->type(), ts->shape()); }); + RETURN_IF_NOT_OK(data_info_queue_ptr_->Add(data_info)); + } + return Status::OK(); +} #endif #ifdef ENABLE_TDTQUE @@ -280,8 +319,6 @@ Status DeviceQueueOp::PushDataToGPU() { RETURN_IF_NOT_OK(SetThreadDevice()); TaskManager::FindMe()->Post(); uint64_t batch_start_time = 0; - uint64_t end_time = 0; - int32_t batch_cost = 0; int32_t push_cost = 0; int32_t connector_size = 0; int32_t connector_capacity = 0; @@ -294,13 +331,22 @@ Status DeviceQueueOp::PushDataToGPU() { batch_start_time = ProfilingTime::GetCurMilliSecond(); connector_capacity = gpu_item_connector_->capacity(); } +#ifdef ENABLE_DUMP_IR + md_channel_info_->RecordBatchQueue(gpu_item_connector_->size()); + md_channel_info_->RecordPreprocessBatch(0); +#endif std::vector items; RETURN_IF_NOT_OK(gpu_item_connector_->Pop(0, &items)); + int64_t send_batch = 0; bool is_open = false; uint32_t handle = INVALID_HANDLE; - int64_t send_batch = 0; auto release_function = std::bind(&DeviceQueueOp::ReleaseData, this, std::placeholders::_1, std::placeholders::_2); while (!items.empty() && !GpuBufferMgr::GetInstance().IsClosed()) { +#ifdef ENABLE_DUMP_IR + md_channel_info_->RecordBatchQueue(gpu_item_connector_->size()); + md_channel_info_->RecordPreprocessBatch(send_batch); + md_channel_info_->RecordPushStartTime(); +#endif if (!is_open) { std::vector data_size; for (int32_t index = 0; index < items.size(); index++) { @@ -318,28 +364,13 @@ Status DeviceQueueOp::PushDataToGPU() { items[0].data_type_)) { return Status(StatusCode::kMDTimeOut, __LINE__, __FILE__, "Failed to prefetch data."); } - while (!GpuBufferMgr::GetInstance().IsClosed() && !TaskManager::FindMe()->Interrupted()) { - BlockQueueStatus_T ret = GpuBufferMgr::GetInstance().Push(handle, items, WAIT_TIME); - if (ret) { - if (ret == BlockQueueStatus_T::ERROR_INPUT) { - return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "Invalid input data, please check it."); - } else { - if (!stop_send_) { - MS_LOG(DEBUG) << "Retry pushing data..."; - continue; - } - break; - } - } else { - break; - } - } + RETURN_IF_NOT_OK(RetryPushData(handle, items)); send_batch++; if (isProfilingEnable) { - end_time = ProfilingTime::GetCurMilliSecond(); + uint64_t end_time = ProfilingTime::GetCurMilliSecond(); // record push data time profiling_node->Record(TIME, TDT_PUSH_TIME, send_batch, push_cost, end_time); - batch_cost = (int32_t)(end_time - batch_start_time); + int32_t batch_cost = (int32_t)(end_time - batch_start_time); // record batch time profiling_node->Record(TIME, BATCH_TIME, send_batch, batch_cost, end_time); // record pipeline time @@ -350,6 +381,11 @@ Status DeviceQueueOp::PushDataToGPU() { connector_size = gpu_item_connector_->size(); connector_capacity = gpu_item_connector_->capacity(); } +#ifdef ENABLE_DUMP_IR + md_channel_info_->RecordBatchQueue(gpu_item_connector_->size()); + md_channel_info_->RecordPreprocessBatch(send_batch); + md_channel_info_->RecordPushEndTime(); +#endif if (total_batch_ > 0 && send_batch >= total_batch_) { break; } @@ -360,6 +396,10 @@ Status DeviceQueueOp::PushDataToGPU() { } } + // now we use this flag to judge whether exception raised. + if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) { + send_finished_ = true; + } tree_->SetFinished(); MS_LOG(INFO) << "Device queue send " << send_batch << " batch."; @@ -368,6 +408,26 @@ Status DeviceQueueOp::PushDataToGPU() { return Status::OK(); } +Status DeviceQueueOp::RetryPushData(unsigned int handle, const std::vector &items) { + while (!GpuBufferMgr::GetInstance().IsClosed() && !TaskManager::FindMe()->Interrupted()) { + BlockQueueStatus_T ret = GpuBufferMgr::GetInstance().Push(handle, items, WAIT_TIME); + if (ret) { + if (ret == BlockQueueStatus_T::ERROR_INPUT) { + return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "Invalid input data, please check it."); + } else { + if (!stop_send_) { + MS_LOG(DEBUG) << "Retry pushing data..."; + continue; + } + break; + } + } else { + break; + } + } + return Status::OK(); +} + // WorkEntry of DeviceQueueOp just do multi_threads memcpy for performance optimization. Status DeviceQueueOp::WorkerEntry(int32_t worker_id) { // Every thread use cuda api should SetThreadDevice diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h index 6c6df4c4b1..3135fc57df 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h @@ -25,6 +25,10 @@ #include "minddata/dataset/engine/datasetops/repeat_op.h" #include "minddata/dataset/engine/perf/device_queue_tracing.h" #include "minddata/dataset/util/status.h" +#ifdef ENABLE_DUMP_IR +#include "debug/rdr/running_data_recorder.h" +#include "minddata/dataset/util/rdr.h" +#endif #ifdef ENABLE_TDTQUE #include "minddata/dataset/util/queue.h" @@ -186,12 +190,14 @@ class DeviceQueueOp : public PipelineOp { #ifdef ENABLE_TDTQUE void WaitContinueSignal() const; Status SendDataToAscend(); + Status SendRowToTdt(TensorRow currRow, bool isProfilingEnable, int32_t *tdt_cost); bool ascend_keep_waiting_; #endif #ifdef ENABLE_GPUQUE Status SendDataToGPU(); Status MallocForGPUData(std::vector *items, const TensorRow &curr_row, const int32_t &worker_id); + Status RetryPushData(unsigned int handle, const std::vector &data); void ReleaseData(void *addr, int32_t worker_id); Status LaunchParallelCopyThread(); Status PushDataToGPU(); @@ -220,6 +226,10 @@ class DeviceQueueOp : public PipelineOp { bool create_data_info_queue_; std::unique_ptr data_info_queue_ptr_; std::mutex data_info_mutex_; + bool send_finished_; +#ifdef ENABLE_DUMP_IR + std::shared_ptr md_channel_info_; +#endif #ifdef ENABLE_TDTQUE std::shared_ptr tdtInstancePtr; diff --git a/mindspore/ccsrc/minddata/dataset/util/rdr.cc b/mindspore/ccsrc/minddata/dataset/util/rdr.cc new file mode 100644 index 0000000000..0f80461efc --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/util/rdr.cc @@ -0,0 +1,97 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "minddata/dataset/util/rdr.h" +#include "minddata/dataset/util/log_adapter.h" + +namespace mindspore { +namespace dataset { +const int32_t kMdRdrRecordLimit = 10; + +std::string MDChannelInfo::ToString() { + std::ostringstream ss; + { + std::unique_lock lock(mutex_); + ss << "preprocess_batch: " << preprocess_batch_ << "; "; + ss << "batch_queue: "; + for (uint32_t i = 0; i < batch_queue_.size(); i++) { + ss << batch_queue_.at(i); + if (i < batch_queue_.size() - 1) { + ss << ", "; + } + } + + ss << "; push_start_time: "; + for (uint32_t i = 0; i < push_start_time_.size(); i++) { + ss << push_start_time_.at(i); + if (i < push_start_time_.size() - 1) { + ss << ", "; + } + } + + ss << "; push_end_time: "; + for (uint32_t i = 0; i < push_end_time_.size(); i++) { + ss << push_end_time_.at(i); + if (i < push_end_time_.size() - 1) { + ss << ", "; + } + } + ss << "."; + } + return ss.str(); +} + +Status MDChannelInfo::RecordBatchQueue(int64_t batch_queue_size) { + { + std::unique_lock lock(mutex_); + if (batch_queue_.size() == kMdRdrRecordLimit) { + batch_queue_.pop_front(); + } + batch_queue_.push_back(batch_queue_size); + } + return Status::OK(); +} + +Status MDChannelInfo::RecordPreprocessBatch(int64_t preprocess_batch) { + { + std::unique_lock lock(mutex_); + preprocess_batch_ = preprocess_batch; + } + return Status::OK(); +} + +Status MDChannelInfo::RecordPushStartTime() { + { + std::unique_lock lock(mutex_); + if (push_start_time_.size() == kMdRdrRecordLimit) { + push_start_time_.pop_front(); + } + push_start_time_.push_back(GetTimeString()); + } + return Status::OK(); +} + +Status MDChannelInfo::RecordPushEndTime() { + { + std::unique_lock lock(mutex_); + if (push_end_time_.size() == kMdRdrRecordLimit) { + push_end_time_.pop_front(); + } + push_end_time_.push_back(GetTimeString()); + } + return Status::OK(); +} +} // namespace dataset +} // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/util/rdr.h b/mindspore/ccsrc/minddata/dataset/util/rdr.h new file mode 100644 index 0000000000..cb4b7f9422 --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/util/rdr.h @@ -0,0 +1,52 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_RDR_H_ +#define MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_RDR_H_ + +#include +#include +#include "minddata/dataset/util/status.h" + +namespace mindspore { +namespace dataset { +class MDChannelInfo { + public: + explicit MDChannelInfo(std::string channel_name) : channel_name_(channel_name) {} + + ~MDChannelInfo() = default; + + std::string ToString(); + + Status RecordBatchQueue(int64_t batch_queue_size); + + Status RecordPreprocessBatch(int64_t preprocess_batch); + + Status RecordPushStartTime(); + + Status RecordPushEndTime(); + + private: + std::string channel_name_; + std::deque batch_queue_; + int64_t preprocess_batch_; + std::deque push_start_time_; + std::deque push_end_time_; + std::mutex mutex_; +}; +} // namespace dataset +} // namespace mindspore + +#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_RDR_H_ diff --git a/mindspore/core/gvar/get_time.cc b/mindspore/core/gvar/get_time.cc new file mode 100644 index 0000000000..7122e084da --- /dev/null +++ b/mindspore/core/gvar/get_time.cc @@ -0,0 +1,53 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include "utils/log_adapter.h" + +namespace mindspore { +// export GetTimeString for all sub modules +std::string GetTimeString() { +#define BUFLEN 80 + char buf[BUFLEN] = {'\0'}; +#if defined(_WIN32) || defined(_WIN64) + time_t time_seconds = time(0); + struct tm now_time; + localtime_s(&now_time, &time_seconds); + sprintf_s(buf, BUFLEN, "%d-%d-%d %d:%d:%d", now_time.tm_year + 1900, now_time.tm_mon + 1, now_time.tm_mday, + now_time.tm_hour, now_time.tm_min, now_time.tm_sec); +#else + struct timeval cur_time; + (void)gettimeofday(&cur_time, nullptr); + + struct tm now; + (void)localtime_r(&cur_time.tv_sec, &now); + (void)strftime(buf, BUFLEN, "%Y-%m-%d-%H:%M:%S", &now); // format date and time + // set micro-second + buf[27] = '\0'; + int idx = 26; + auto num = cur_time.tv_usec; + for (int i = 5; i >= 0; i--) { + buf[idx--] = static_cast(num % 10 + '0'); + num /= 10; + if (i % 3 == 0) { + buf[idx--] = '.'; + } + } +#endif + return std::string(buf); +} +} // namespace mindspore diff --git a/mindspore/core/utils/log_adapter.cc b/mindspore/core/utils/log_adapter.cc index dcd863e788..11708284f5 100644 --- a/mindspore/core/utils/log_adapter.cc +++ b/mindspore/core/utils/log_adapter.cc @@ -23,37 +23,6 @@ // namespace to support utils module definition namespace mindspore { #ifdef USE_GLOG -static std::string GetTime() { -#define BUFLEN 80 - static char buf[BUFLEN]; -#if defined(_WIN32) || defined(_WIN64) - time_t time_seconds = time(0); - struct tm now_time; - localtime_s(&now_time, &time_seconds); - sprintf_s(buf, BUFLEN, "%d-%d-%d %d:%d:%d", now_time.tm_year + 1900, now_time.tm_mon + 1, now_time.tm_mday, - now_time.tm_hour, now_time.tm_min, now_time.tm_sec); -#else - struct timeval cur_time; - (void)gettimeofday(&cur_time, nullptr); - - struct tm now; - (void)localtime_r(&cur_time.tv_sec, &now); - (void)strftime(buf, BUFLEN, "%Y-%m-%d-%H:%M:%S", &now); // format date and time - // set micro-second - buf[27] = '\0'; - int idx = 26; - auto num = cur_time.tv_usec; - for (int i = 5; i >= 0; i--) { - buf[idx--] = static_cast(num % 10 + '0'); - num /= 10; - if (i % 3 == 0) { - buf[idx--] = '.'; - } - } -#endif - return std::string(buf); -} - static std::string GetProcName() { #if defined(__APPLE__) || defined(__FreeBSD__) const char *appname = getprogname(); @@ -193,7 +162,7 @@ void LogWriter::OutputLog(const std::ostringstream &msg) const { auto submodule_name = GetSubModuleName(submodule_); google::LogMessage("", 0, GetGlogLevel(log_level_)).stream() << "[" << GetLogLevel(log_level_) << "] " << submodule_name << "(" << getpid() << "," << GetProcName() - << "):" << GetTime() << " " + << "):" << GetTimeString() << " " << "[" << location_.file_ << ":" << location_.line_ << "] " << location_.func_ << "] " << msg.str() << std::endl; #else auto str_msg = msg.str(); diff --git a/mindspore/core/utils/log_adapter.h b/mindspore/core/utils/log_adapter.h index af88190a03..63329fda0f 100644 --- a/mindspore/core/utils/log_adapter.h +++ b/mindspore/core/utils/log_adapter.h @@ -136,6 +136,12 @@ enum SubModuleId : int { const char *EnumStrForMsLogLevel(MsLogLevel level); +#if defined(_WIN32) || defined(_WIN64) +extern std::string GetTimeString() __attribute__((dllexport)); +#else +extern std::string GetTimeString() __attribute__((visibility("default"))); +#endif + #if defined(_WIN32) || defined(_WIN64) extern int g_ms_submodule_log_levels[] __attribute__((dllexport)); #else