Executor: surface exceptions from the worker loop to the calling thread.

An exception escaping task->Run() in Executor::WorkerLoop() is stored in
exception_ptr_ and rethrown by CheckException(), which the *Async entry points
call on entry and again after their wait. CheckException() clears the stored
pointer before rethrowing, so one failed task is reported once instead of
making every later call on the same executor throw the same exception.

NOTE(review): angle-bracket spans (template arguments and <...> header names)
are missing from this copy of the patch and are left as found. The first four
body lines of the "-202,10" hunk were lost the same way and are reconstructed
from the hunk's line counts and the parallel "-191,10" hunk. Context-line
indentation is also reconstructed. Confirm against the repository before
applying.

diff --git a/mindspore/ccsrc/backend/session/executor.cc b/mindspore/ccsrc/backend/session/executor.cc
index 9353d6931d..78ef715ed6 100644
--- a/mindspore/ccsrc/backend/session/executor.cc
+++ b/mindspore/ccsrc/backend/session/executor.cc
@@ -110,6 +110,17 @@ Executor::Executor(const std::string &device_name, uint32_t device_id) {
   worker_ = std::make_shared(&Executor::WorkerLoop, this);
 }
 
+// Rethrows the exception stored by WorkerLoop(), if there is one; otherwise does nothing.
+// The stored pointer is cleared before rethrowing so that a single failed task is reported
+// once rather than being thrown again by every later call on this executor.
+void Executor::CheckException() {
+  if (exception_ptr_ != nullptr) {
+    auto pending = exception_ptr_;
+    exception_ptr_ = nullptr;
+    std::rethrow_exception(pending);
+  }
+}
+
 void Executor::WorkerJoin() {
   StopWorker();
   worker_->join();
@@ -128,7 +139,12 @@ void Executor::WorkerLoop() {
       OnWorkerExit();
       return;
     }
-    task->Run();
+    try {
+      task->Run();
+    } catch (const std::exception &e) {
+      // Stash the failure; the notify below still wakes the caller, which rethrows it via CheckException().
+      exception_ptr_ = std::current_exception();
+    }
     if (task->type_ == kCompileNodes) {
       compile_cond_var_.notify_all();
     } else if (task->type_ == kCompileGraph) {
@@ -183,6 +199,7 @@ bool Executor::IsAllInputsReady(const std::vector &inputs) {
 
 GraphId Executor::CompileGraphAsync(const SessionPtr &session, const AnfNodePtrList &lst,
                                     const AnfNodePtrList &outputs) {
+  CheckException();
   std::unique_lock lock(task_mutex_);
   auto task = std::make_shared();
   task->session_ = session;
@@ -191,10 +208,12 @@ GraphId Executor::CompileGraphAsync(const SessionPtr &session, const AnfNodePtrL
   ready_tasks_.push(task);
   task_cond_var_.notify_all();
   compile_cond_var_.wait(lock);
+  CheckException();
   return task->graph_id_;
 }
 
 GraphId Executor::CompileGraphAsync(const SessionPtr &session, NotNull func_graph) {
+  CheckException();
   std::unique_lock lock(task_mutex_);
   auto task = std::make_shared();
   task->session_ = session;
@@ -202,10 +221,12 @@ GraphId Executor::CompileGraphAsync(const SessionPtr &session, NotNull
   ready_tasks_.push(task);
   task_cond_var_.notify_all();
   compile_cond_var_.wait(lock);
+  CheckException();
   return task->graph_id_;
 }
 
 void Executor::BuildGraphAsync(const SessionPtr &session, GraphId graphId) {
+  CheckException();
   std::unique_lock lock(task_mutex_);
   auto task = std::make_shared();
   task->session_ = session;
@@ -213,10 +234,12 @@ void Executor::BuildGraphAsync(const SessionPtr &session, GraphId graphId) {
   ready_tasks_.push(task);
   task_cond_var_.notify_all();
   build_cond_var_.wait(lock);
+  CheckException();
 }
 
 void Executor::RunGraphAsync(const SessionPtr &session, const GraphId &graph_id,
                              const std::vector &inputs, VectorRef *outputs) {
+  CheckException();
   auto task = std::make_shared();
   task->session_ = session;
   task->graph_id_ = graph_id;
@@ -237,10 +260,12 @@ void Executor::RunGraphAsync(const SessionPtr &session, const GraphId &graph_id,
   task_cond_var_.notify_all();
   py::gil_scoped_release release;
   run_cond_var_.wait(lock);
+  CheckException();
 }
 
 void Executor::BuildOpAsync(const SessionPtr &session, OpRunInfo *op_run_info, const GraphInfo &graph_info,
                             const std::vector &input_tensors, const std::vector &tensors_mask) {
+  CheckException();
   std::unique_lock lock(task_mutex_);
   auto task = std::make_shared();
   task->session_ = session;
@@ -251,10 +276,12 @@ void Executor::BuildOpAsync(const SessionPtr &session, OpRunInfo *op_run_info, c
   ready_tasks_.push(task);
   task_cond_var_.notify_all();
   build_op_cond_var_.wait(lock);
+  CheckException();
 }
 
 py::tuple Executor::RunOpAsync(const SessionPtr &session, OpRunInfo *op_run_info, const GraphInfo &graph_info,
                                const std::vector &input_tensors) {
+  CheckException();
   std::unique_lock lock(task_mutex_);
   auto task = std::make_shared();
   task->session_ = session;
@@ -264,6 +291,7 @@ py::tuple Executor::RunOpAsync(const SessionPtr &session, OpRunInfo *op_run_info
   ready_tasks_.push(task);
   task_cond_var_.notify_all();
   run_op_cond_var_.wait(lock);
+  CheckException();
 
   // Trans output to tuple
   auto output_tensors = TransformBaseRefListToTuple(task->outputs_);
diff --git a/mindspore/ccsrc/backend/session/executor.h b/mindspore/ccsrc/backend/session/executor.h
index 3078c41f1d..467a61ce2e 100644
--- a/mindspore/ccsrc/backend/session/executor.h
+++ b/mindspore/ccsrc/backend/session/executor.h
@@ -26,6 +26,7 @@
 #include
 #include
 #include
+#include
 #include "backend/session/session_basic.h"
 #include "ir/anf.h"
 #include "ir/tensor.h"
@@ -128,11 +129,13 @@ class Executor {
                        const std::vector &input_tensors);
   void OnRunGraphFinished();
 
- protected:
+ private:
   void UpdateOutputTensors(VectorRef *outputs,
                            const std::map &tensor_to_node);
   std::vector> GetNewReadyTasks();
   bool IsAllInputsReady(const std::vector &inputs);
+  // Rethrows and clears the exception stored in exception_ptr_; does nothing when it is null.
+  void CheckException();
   void StopWorker();
   void OnWorkerExit();
 
@@ -149,6 +152,8 @@ class Executor {
   std::queue> ready_tasks_;
   std::list> pending_tasks_;
   std::shared_ptr worker_;
+  // Exception caught around task->Run() in WorkerLoop(); null when nothing is pending.
+  std::exception_ptr exception_ptr_{nullptr};
 };
 }  // namespace session
 }  // namespace mindspore
diff --git a/mindspore/ccsrc/backend/session/executor_manager.h b/mindspore/ccsrc/backend/session/executor_manager.h
index 3dc4f6ee17..ff876b8673 100644
--- a/mindspore/ccsrc/backend/session/executor_manager.h
+++ b/mindspore/ccsrc/backend/session/executor_manager.h
@@ -13,8 +13,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef MINDSPORE_CCSRC_BACKEND_SESSION_EXECUTOR_MANGER_H_
-#define MINDSPORE_CCSRC_BACKEND_SESSION_EXECUTOR_MANGER_H_
+#ifndef MINDSPORE_CCSRC_BACKEND_SESSION_EXECUTOR_MANAGER_H_
+#define MINDSPORE_CCSRC_BACKEND_SESSION_EXECUTOR_MANAGER_H_
 #include
 #include
 #include
@@ -42,4 +42,4 @@ class ExecutorManager {
 };
 }  // namespace session
 }  // namespace mindspore
-#endif  // MINDSPORE_CCSRC_BACKEND_SESSION_EXECUTOR_MANGER_H_
+#endif  // MINDSPORE_CCSRC_BACKEND_SESSION_EXECUTOR_MANAGER_H_