Merge pull request !6441 from anzhengqi/fix-stack-while-exception (tags/v1.0.0)
| @@ -39,7 +39,11 @@ DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, i | |||||
| device_id_(device_id), | device_id_(device_id), | ||||
| prefetch_size_(prefetch_size), | prefetch_size_(prefetch_size), | ||||
| send_epoch_end_(send_epoch_end), | send_epoch_end_(send_epoch_end), | ||||
| stop_send_(false) {} | |||||
| stop_send_(false) { | |||||
| #ifdef ENABLE_TDTQUE | |||||
| ascend_keep_waiting_ = true; | |||||
| #endif | |||||
| } | |||||
| DeviceQueueOp::~DeviceQueueOp() {} | DeviceQueueOp::~DeviceQueueOp() {} | ||||
| @@ -120,7 +124,7 @@ Status DeviceQueueOp::SendDataToAscend() { | |||||
| TensorRow currRow; | TensorRow currRow; | ||||
| for (int row_id = 0; row_id < current_buffer->NumRows(); row_id++) { | for (int row_id = 0; row_id < current_buffer->NumRows(); row_id++) { | ||||
| RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &currRow)); | RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &currRow)); | ||||
| while (stop_send_) { | |||||
| while (stop_send_ && ascend_keep_waiting_) { | |||||
| MS_LOG(DEBUG) << "stop_send flag is set, waiting for continue signal..."; | MS_LOG(DEBUG) << "stop_send flag is set, waiting for continue signal..."; | ||||
| std::this_thread::sleep_for(std::chrono::microseconds(100)); | std::this_thread::sleep_for(std::chrono::microseconds(100)); | ||||
| } | } | ||||
| @@ -128,6 +128,10 @@ class DeviceQueueOp : public PipelineOp { | |||||
| stop_send_ = false; | stop_send_ = false; | ||||
| } | } | ||||
| #ifdef ENABLE_TDTQUE | |||||
| void StopWaiting() { ascend_keep_waiting_ = false; } | |||||
| #endif | |||||
| // Name: Print() | // Name: Print() | ||||
| // Description: A function that prints info about the node | // Description: A function that prints info about the node | ||||
| void Print(std::ostream &out, // In: The output stream to print to | void Print(std::ostream &out, // In: The output stream to print to | ||||
| @@ -159,6 +163,7 @@ class DeviceQueueOp : public PipelineOp { | |||||
| private: | private: | ||||
| #ifdef ENABLE_TDTQUE | #ifdef ENABLE_TDTQUE | ||||
| Status SendDataToAscend(); | Status SendDataToAscend(); | ||||
| bool ascend_keep_waiting_; | |||||
| #endif | #endif | ||||
| #ifdef ENABLE_GPUQUE | #ifdef ENABLE_GPUQUE | ||||
| @@ -18,6 +18,7 @@ | |||||
| #include <string> | #include <string> | ||||
| #include "minddata/dataset/engine/datasetops/dataset_op.h" | #include "minddata/dataset/engine/datasetops/dataset_op.h" | ||||
| #include "minddata/dataset/engine/datasetops/shuffle_op.h" | #include "minddata/dataset/engine/datasetops/shuffle_op.h" | ||||
| #include "minddata/dataset/engine/datasetops/device_queue_op.h" | |||||
| #include "minddata/dataset/util/task_manager.h" | #include "minddata/dataset/util/task_manager.h" | ||||
| #include "minddata/dataset/engine/opt/pass.h" | #include "minddata/dataset/engine/opt/pass.h" | ||||
| #include "minddata/dataset/engine/opt/pre/removal_pass.h" | #include "minddata/dataset/engine/opt/pre/removal_pass.h" | ||||
| @@ -42,7 +43,15 @@ ExecutionTree::ExecutionTree() : id_count_(0) { | |||||
| } | } | ||||
| // Destructor | // Destructor | ||||
| ExecutionTree::~ExecutionTree() { (void)tg_->ServiceStop(); } | |||||
| ExecutionTree::~ExecutionTree() { | |||||
| #ifdef ENABLE_TDTQUE | |||||
| DeviceQueueOp *op = dynamic_cast<DeviceQueueOp *>(root_.get()); | |||||
| if (op != nullptr) { | |||||
| op->StopWaiting(); | |||||
| } | |||||
| #endif | |||||
| (void)tg_->ServiceStop(); | |||||
| } | |||||
| // Associates a DatasetOp with this tree. This assigns a valid node id to the operator and | // Associates a DatasetOp with this tree. This assigns a valid node id to the operator and | ||||
| // provides it with a link to the tree. A node cannot form any relationships (parent/child) with | // provides it with a link to the tree. A node cannot form any relationships (parent/child) with | ||||