| @@ -418,14 +418,18 @@ bool AbstractNode::InitClientToScheduler() { | |||||
| client_to_scheduler_ = std::make_shared<TcpClient>(scheduler_host, scheduler_port); | client_to_scheduler_ = std::make_shared<TcpClient>(scheduler_host, scheduler_port); | ||||
| client_to_scheduler_->SetMessageCallback( | client_to_scheduler_->SetMessageCallback( | ||||
| [&](std::shared_ptr<MessageMeta> meta, const Protos &protos, const void *data, size_t size) { | [&](std::shared_ptr<MessageMeta> meta, const Protos &protos, const void *data, size_t size) { | ||||
| if (handlers_.count(meta->cmd()) == 0) { | |||||
| MS_LOG(EXCEPTION) << "The cmd:" << meta->cmd() << " is not supported!"; | |||||
| } | |||||
| if (handlers_[meta->cmd()] != nullptr) { | |||||
| const auto &handler_ptr = handlers_[meta->cmd()]; | |||||
| (this->*handler_ptr)(meta, data, size); | |||||
| try { | |||||
| if (handlers_.count(meta->cmd()) == 0) { | |||||
| MS_LOG(EXCEPTION) << "The cmd:" << meta->cmd() << " is not supported!"; | |||||
| } | |||||
| if (handlers_[meta->cmd()] != nullptr) { | |||||
| const auto &handler_ptr = handlers_[meta->cmd()]; | |||||
| (this->*handler_ptr)(meta, data, size); | |||||
| } | |||||
| NotifyMessageArrival(meta); | |||||
| } catch (const std::exception &e) { | |||||
| MsException::Instance().SetException(); | |||||
| } | } | ||||
| NotifyMessageArrival(meta); | |||||
| }); | }); | ||||
| client_to_scheduler_->Init(); | client_to_scheduler_->Init(); | ||||
| @@ -26,6 +26,7 @@ | |||||
| #include "ps/core/node.h" | #include "ps/core/node.h" | ||||
| #include "ps/core/message.h" | #include "ps/core/message.h" | ||||
| #include "utils/ms_exception.h" | |||||
| namespace mindspore { | namespace mindspore { | ||||
| namespace ps { | namespace ps { | ||||
| @@ -40,6 +40,7 @@ bool ServerNode::Start(const uint32_t &timeout) { | |||||
| FetchServers(client_to_scheduler_); | FetchServers(client_to_scheduler_); | ||||
| MS_LOG(INFO) << "Server node get all the servers address successful!"; | MS_LOG(INFO) << "Server node get all the servers address successful!"; | ||||
| } | } | ||||
| MsException::Instance().CheckException(); | |||||
| MS_LOG(INFO) << "Start the node is successful!"; | MS_LOG(INFO) << "Start the node is successful!"; | ||||
| return true; | return true; | ||||
| } | } | ||||
| @@ -141,17 +141,9 @@ void TcpClient::StartWithDelay(int seconds) { | |||||
| void TcpClient::Stop() { | void TcpClient::Stop() { | ||||
| std::lock_guard<std::mutex> lock(connection_mutex_); | std::lock_guard<std::mutex> lock(connection_mutex_); | ||||
| MS_LOG(INFO) << "Stop tcp client!"; | MS_LOG(INFO) << "Stop tcp client!"; | ||||
| if (event_base_got_break(event_base_)) { | |||||
| MS_LOG(DEBUG) << "The event base has stopped!"; | |||||
| is_stop_ = true; | |||||
| return; | |||||
| } | |||||
| if (!is_stop_.load()) { | |||||
| is_stop_ = true; | |||||
| int ret = event_base_loopbreak(event_base_); | |||||
| if (ret != 0) { | |||||
| MS_LOG(ERROR) << "Event base loop break failed!"; | |||||
| } | |||||
| int ret = event_base_loopbreak(event_base_); | |||||
| if (ret != 0) { | |||||
| MS_LOG(ERROR) << "Event base loop break failed!"; | |||||
| } | } | ||||
| } | } | ||||
| @@ -41,6 +41,7 @@ bool WorkerNode::Start(const uint32_t &timeout) { | |||||
| FetchServers(client_to_scheduler_); | FetchServers(client_to_scheduler_); | ||||
| MS_LOG(INFO) << "Worker node get all the servers address successful!"; | MS_LOG(INFO) << "Worker node get all the servers address successful!"; | ||||
| } | } | ||||
| MsException::Instance().CheckException(); | |||||
| MS_LOG(INFO) << "The Worker node has successfully started."; | MS_LOG(INFO) << "The Worker node has successfully started."; | ||||
| return true; | return true; | ||||
| } | } | ||||
| @@ -59,23 +60,17 @@ void WorkerNode::Initialize() { | |||||
| } | } | ||||
| bool WorkerNode::Stop() { | bool WorkerNode::Stop() { | ||||
| MS_LOG(INFO) << "Stop worker node!"; | |||||
| if (!is_already_stopped_.load()) { | if (!is_already_stopped_.load()) { | ||||
| MS_LOG(INFO) << "Stop worker node!"; | |||||
| is_ready_ = true; | is_ready_ = true; | ||||
| is_timeout_ = true; | is_timeout_ = true; | ||||
| is_finish_ = true; | is_finish_ = true; | ||||
| if (heart_beat_thread_->joinable()) { | |||||
| heart_beat_thread_->join(); | |||||
| } | |||||
| client_to_scheduler_->Stop(); | client_to_scheduler_->Stop(); | ||||
| if (!connected_nodes_.empty()) { | if (!connected_nodes_.empty()) { | ||||
| for (auto &connected_node : connected_nodes_) { | for (auto &connected_node : connected_nodes_) { | ||||
| connected_node.second->Stop(); | connected_node.second->Stop(); | ||||
| } | } | ||||
| } | } | ||||
| if (client_to_scheduler_thread_->joinable()) { | |||||
| client_to_scheduler_thread_->join(); | |||||
| } | |||||
| is_already_stopped_ = true; | is_already_stopped_ = true; | ||||
| } | } | ||||
| return true; | return true; | ||||