| @@ -390,11 +390,9 @@ bool AbstractNode::Disconnect(const std::shared_ptr<TcpClient> &client, const ui | |||||
| auto meta = std::make_shared<MessageMeta>(); | auto meta = std::make_shared<MessageMeta>(); | ||||
| meta->set_cmd(NodeCommand::FINISH); | meta->set_cmd(NodeCommand::FINISH); | ||||
| FinishMessage finish_message; | |||||
| finish_message.set_node_id(node_info_.node_id_); | |||||
| std::string finish_message = node_info_.node_id_; | |||||
| if (!SendMessageSync(client, meta, Protos::PROTOBUF, finish_message.SerializeAsString().data(), | |||||
| finish_message.ByteSizeLong())) { | |||||
| if (!SendMessageSync(client, meta, Protos::RAW, finish_message.data(), finish_message.length())) { | |||||
| MS_LOG(WARNING) << "The node role:" << CommUtil::NodeRoleToString(node_info_.node_role_) | MS_LOG(WARNING) << "The node role:" << CommUtil::NodeRoleToString(node_info_.node_role_) | ||||
| << " the node id:" << node_info_.node_id_ << " send Finish Message timeout!"; | << " the node id:" << node_info_.node_id_ << " send Finish Message timeout!"; | ||||
| } | } | ||||
| @@ -142,9 +142,6 @@ bool CommUtil::Retry(const std::function<bool()> &func, size_t max_attempts, siz | |||||
| void CommUtil::LogCallback(int severity, const char *msg) { | void CommUtil::LogCallback(int severity, const char *msg) { | ||||
| switch (severity) { | switch (severity) { | ||||
| case EVENT_LOG_DEBUG: | |||||
| MS_LOG(DEBUG) << kLibeventLogPrefix << msg; | |||||
| break; | |||||
| case EVENT_LOG_MSG: | case EVENT_LOG_MSG: | ||||
| MS_LOG(INFO) << kLibeventLogPrefix << msg; | MS_LOG(INFO) << kLibeventLogPrefix << msg; | ||||
| break; | break; | ||||
| @@ -155,7 +152,6 @@ void CommUtil::LogCallback(int severity, const char *msg) { | |||||
| MS_LOG(ERROR) << kLibeventLogPrefix << msg; | MS_LOG(ERROR) << kLibeventLogPrefix << msg; | ||||
| break; | break; | ||||
| default: | default: | ||||
| MS_LOG(WARNING) << kLibeventLogPrefix << msg; | |||||
| break; | break; | ||||
| } | } | ||||
| } | } | ||||
| @@ -110,7 +110,7 @@ void NodeManager::UpdateClusterState() { | |||||
| } | } | ||||
| } | } | ||||
| if (!timeout_nodes_info_.empty()) { | if (!timeout_nodes_info_.empty()) { | ||||
| is_cluster_timeout_ = true; | |||||
| is_node_timeout_ = true; | |||||
| for (auto it = timeout_nodes_info_.begin(); it != timeout_nodes_info_.end(); ++it) { | for (auto it = timeout_nodes_info_.begin(); it != timeout_nodes_info_.end(); ++it) { | ||||
| finish_nodes_id_.insert(it->first); | finish_nodes_id_.insert(it->first); | ||||
| } | } | ||||
| @@ -138,9 +138,7 @@ void NodeManager::CheckClusterTimeout() { | |||||
| } | } | ||||
| } | } | ||||
| void NodeManager::AddFinishNode(const FinishMessage &finish_message) { | |||||
| finish_nodes_id_.insert(finish_message.node_id()); | |||||
| } | |||||
| void NodeManager::AddFinishNode(const std::string &finish_message) { finish_nodes_id_.insert(finish_message); } | |||||
| std::unordered_map<std::string, NodeInfo> NodeManager::nodes_info() { return nodes_info_; } | std::unordered_map<std::string, NodeInfo> NodeManager::nodes_info() { return nodes_info_; } | ||||
| @@ -61,7 +61,7 @@ class NodeManager { | |||||
| std::vector<ServersMeta> FetchServersMeta(); | std::vector<ServersMeta> FetchServersMeta(); | ||||
| void UpdateClusterState(); | void UpdateClusterState(); | ||||
| void CheckClusterTimeout(); | void CheckClusterTimeout(); | ||||
| void AddFinishNode(const FinishMessage &finish_message); | |||||
| void AddFinishNode(const std::string &finish_message); | |||||
| std::unordered_map<std::string, NodeInfo> nodes_info(); | std::unordered_map<std::string, NodeInfo> nodes_info(); | ||||
| bool is_cluster_ready(); | bool is_cluster_ready(); | ||||
| bool is_cluster_finish(); | bool is_cluster_finish(); | ||||
| @@ -139,10 +139,9 @@ void SchedulerNode::ProcessFinish(std::shared_ptr<TcpServer> server, std::shared | |||||
| MS_EXCEPTION_IF_NULL(conn); | MS_EXCEPTION_IF_NULL(conn); | ||||
| MS_EXCEPTION_IF_NULL(meta); | MS_EXCEPTION_IF_NULL(meta); | ||||
| MS_EXCEPTION_IF_NULL(data); | MS_EXCEPTION_IF_NULL(data); | ||||
| FinishMessage finish_message; | |||||
| finish_message.ParseFromArray(data, size); | |||||
| node_manager_.AddFinishNode(finish_message); | |||||
| MS_LOG(INFO) << "Process finish message from node id:" << finish_message.node_id(); | |||||
| auto finish_message = std::make_unique<std::string>(reinterpret_cast<const char *>(data), size); | |||||
| node_manager_.AddFinishNode(*finish_message); | |||||
| MS_LOG(INFO) << "Process finish message from node id:" << *finish_message; | |||||
| server->SendMessage(conn, meta, Protos::PROTOBUF, data, size); | server->SendMessage(conn, meta, Protos::PROTOBUF, data, size); | ||||
| } | } | ||||
| @@ -18,11 +18,6 @@ | |||||
| namespace mindspore { | namespace mindspore { | ||||
| namespace ps { | namespace ps { | ||||
| namespace core { | namespace core { | ||||
| ServerNode::~ServerNode() { | |||||
| MS_LOG(INFO) << "Stop server node!"; | |||||
| Stop(); | |||||
| } | |||||
| bool ServerNode::Start(const uint32_t &timeout) { | bool ServerNode::Start(const uint32_t &timeout) { | ||||
| MS_LOG(INFO) << "Start server node!"; | MS_LOG(INFO) << "Start server node!"; | ||||
| Initialize(); | Initialize(); | ||||
| @@ -36,7 +36,7 @@ namespace core { | |||||
| class ServerNode : public AbstractNode { | class ServerNode : public AbstractNode { | ||||
| public: | public: | ||||
| ServerNode() : server_(nullptr), server_thread_(nullptr) {} | ServerNode() : server_(nullptr), server_thread_(nullptr) {} | ||||
| ~ServerNode() override; | |||||
| ~ServerNode() override = default; | |||||
| bool Start(const uint32_t &timeout = ClusterMetadata::instance()->cluster_available_timeout()) override; | bool Start(const uint32_t &timeout = ClusterMetadata::instance()->cluster_available_timeout()) override; | ||||
| bool Stop() override; | bool Stop() override; | ||||
| @@ -20,10 +20,6 @@ | |||||
| namespace mindspore { | namespace mindspore { | ||||
| namespace ps { | namespace ps { | ||||
| namespace core { | namespace core { | ||||
| WorkerNode::~WorkerNode() { | |||||
| MS_LOG(INFO) << "Stop worker node!"; | |||||
| Stop(); | |||||
| } | |||||
| bool WorkerNode::Start(const uint32_t &timeout) { | bool WorkerNode::Start(const uint32_t &timeout) { | ||||
| MS_LOG(INFO) << "Starting worker node!"; | MS_LOG(INFO) << "Starting worker node!"; | ||||
| Initialize(); | Initialize(); | ||||
| @@ -35,7 +35,7 @@ namespace core { | |||||
| class WorkerNode : public AbstractNode { | class WorkerNode : public AbstractNode { | ||||
| public: | public: | ||||
| WorkerNode() = default; | WorkerNode() = default; | ||||
| ~WorkerNode() override; | |||||
| ~WorkerNode() override = default; | |||||
| bool Start(const uint32_t &timeout = ClusterMetadata::instance()->cluster_available_timeout()) override; | bool Start(const uint32_t &timeout = ClusterMetadata::instance()->cluster_available_timeout()) override; | ||||
| bool Stop() override; | bool Stop() override; | ||||