diff --git a/mindspore/ccsrc/ps/core/abstract_node.cc b/mindspore/ccsrc/ps/core/abstract_node.cc index 01f8edf32d..0e867dd767 100644 --- a/mindspore/ccsrc/ps/core/abstract_node.cc +++ b/mindspore/ccsrc/ps/core/abstract_node.cc @@ -418,14 +418,18 @@ bool AbstractNode::InitClientToScheduler() { client_to_scheduler_ = std::make_shared(scheduler_host, scheduler_port); client_to_scheduler_->SetMessageCallback( [&](std::shared_ptr meta, const Protos &protos, const void *data, size_t size) { - if (handlers_.count(meta->cmd()) == 0) { - MS_LOG(EXCEPTION) << "The cmd:" << meta->cmd() << " is not supported!"; - } - if (handlers_[meta->cmd()] != nullptr) { - const auto &handler_ptr = handlers_[meta->cmd()]; - (this->*handler_ptr)(meta, data, size); + try { + if (handlers_.count(meta->cmd()) == 0) { + MS_LOG(EXCEPTION) << "The cmd:" << meta->cmd() << " is not supported!"; + } + if (handlers_[meta->cmd()] != nullptr) { + const auto &handler_ptr = handlers_[meta->cmd()]; + (this->*handler_ptr)(meta, data, size); + } + NotifyMessageArrival(meta); + } catch (const std::exception &e) { + MsException::Instance().SetException(); } - NotifyMessageArrival(meta); }); client_to_scheduler_->Init(); diff --git a/mindspore/ccsrc/ps/core/abstract_node.h b/mindspore/ccsrc/ps/core/abstract_node.h index 434df7160e..c803f8ecd6 100644 --- a/mindspore/ccsrc/ps/core/abstract_node.h +++ b/mindspore/ccsrc/ps/core/abstract_node.h @@ -26,6 +26,7 @@ #include "ps/core/node.h" #include "ps/core/message.h" +#include "utils/ms_exception.h" namespace mindspore { namespace ps { diff --git a/mindspore/ccsrc/ps/core/server_node.cc b/mindspore/ccsrc/ps/core/server_node.cc index c10ba5ebe6..6f50e3eb05 100644 --- a/mindspore/ccsrc/ps/core/server_node.cc +++ b/mindspore/ccsrc/ps/core/server_node.cc @@ -40,6 +40,7 @@ bool ServerNode::Start(const uint32_t &timeout) { FetchServers(client_to_scheduler_); MS_LOG(INFO) << "Server node get all the servers address successful!"; } + MsException::Instance().CheckException(); MS_LOG(INFO) << "Start the node is successful!"; return true; } diff --git a/mindspore/ccsrc/ps/core/tcp_client.cc b/mindspore/ccsrc/ps/core/tcp_client.cc index 7a8094be4c..2d4e2afe0b 100644 --- a/mindspore/ccsrc/ps/core/tcp_client.cc +++ b/mindspore/ccsrc/ps/core/tcp_client.cc @@ -141,17 +141,9 @@ void TcpClient::StartWithDelay(int seconds) { void TcpClient::Stop() { std::lock_guard lock(connection_mutex_); MS_LOG(INFO) << "Stop tcp client!"; - if (event_base_got_break(event_base_)) { - MS_LOG(DEBUG) << "The event base has stopped!"; - is_stop_ = true; - return; - } - if (!is_stop_.load()) { - is_stop_ = true; - int ret = event_base_loopbreak(event_base_); - if (ret != 0) { - MS_LOG(ERROR) << "Event base loop break failed!"; - } + int ret = event_base_loopbreak(event_base_); + if (ret != 0) { + MS_LOG(ERROR) << "Event base loop break failed!"; } } diff --git a/mindspore/ccsrc/ps/core/worker_node.cc b/mindspore/ccsrc/ps/core/worker_node.cc index 11216a7ce8..391c8dc546 100644 --- a/mindspore/ccsrc/ps/core/worker_node.cc +++ b/mindspore/ccsrc/ps/core/worker_node.cc @@ -41,6 +41,7 @@ bool WorkerNode::Start(const uint32_t &timeout) { FetchServers(client_to_scheduler_); MS_LOG(INFO) << "Worker node get all the servers address successful!"; } + MsException::Instance().CheckException(); MS_LOG(INFO) << "The Worker node has successfully started."; return true; } @@ -59,23 +60,17 @@ void WorkerNode::Initialize() { } bool WorkerNode::Stop() { - MS_LOG(INFO) << "Stop worker node!"; if (!is_already_stopped_.load()) { + MS_LOG(INFO) << "Stop worker node!"; is_ready_ = true; is_timeout_ = true; is_finish_ = true; - if (heart_beat_thread_->joinable()) { - heart_beat_thread_->join(); - } client_to_scheduler_->Stop(); if (!connected_nodes_.empty()) { for (auto &connected_node : connected_nodes_) { connected_node.second->Stop(); } } - if (client_to_scheduler_thread_->joinable()) { - client_to_scheduler_thread_->join(); - } is_already_stopped_ = true; } return true;