From: @anancds Reviewed-by: @limingqi107 Signed-off-by:tags/v1.2.0-rc1
| @@ -50,6 +50,9 @@ void AbstractNode::ProcessRegisterResp(std::shared_ptr<MessageMeta> meta, const | |||||
| << " is not match the current node id:" << node_info_.node_id_; | << " is not match the current node id:" << node_info_.node_id_; | ||||
| } | } | ||||
| if (register_resp_message.rank_id() < 0) { | |||||
| MS_LOG(EXCEPTION) << "The rank id is wrong."; | |||||
| } | |||||
| node_info_.rank_id_ = register_resp_message.rank_id(); | node_info_.rank_id_ = register_resp_message.rank_id(); | ||||
| MS_LOG(INFO) << "The node id is:" << node_info_.node_id_ << ", and the rank id is:" << node_info_.rank_id_ | MS_LOG(INFO) << "The node id is:" << node_info_.node_id_ << ", and the rank id is:" << node_info_.rank_id_ | ||||
| @@ -39,6 +39,11 @@ int NodeManager::NextRankId(const RegisterMessage ®ister_message) { | |||||
| uint32_t port = register_message.port(); | uint32_t port = register_message.port(); | ||||
| rank_id = ++next_server_rank_id_; | rank_id = ++next_server_rank_id_; | ||||
| if (IntToUint(rank_id) >= ClusterMetadata::instance()->total_server_num()) { | |||||
| MS_LOG(WARNING) << "The rank id is greater than the number of servers."; | |||||
| rank_id = -1; | |||||
| --next_server_rank_id_; | |||||
| } | |||||
| NodeInfo node_info; | NodeInfo node_info; | ||||
| node_info.node_role_ = NodeRole::SERVER; | node_info.node_role_ = NodeRole::SERVER; | ||||
| node_info.node_id_ = node_id; | node_info.node_id_ = node_id; | ||||
| @@ -50,6 +55,11 @@ int NodeManager::NextRankId(const RegisterMessage ®ister_message) { | |||||
| << " assign rank id:" << rank_id; | << " assign rank id:" << rank_id; | ||||
| } else if (register_message.role() == NodeRole::WORKER) { | } else if (register_message.role() == NodeRole::WORKER) { | ||||
| rank_id = ++next_worker_rank_id_; | rank_id = ++next_worker_rank_id_; | ||||
| if (IntToUint(rank_id) >= ClusterMetadata::instance()->total_worker_num()) { | |||||
| MS_LOG(WARNING) << "The rank id is greater than the number of workers."; | |||||
| rank_id = -1; | |||||
| --next_worker_rank_id_; | |||||
| } | |||||
| NodeInfo node_info; | NodeInfo node_info; | ||||
| node_info.node_role_ = NodeRole::WORKER; | node_info.node_role_ = NodeRole::WORKER; | ||||
| node_info.node_id_ = node_id; | node_info.node_id_ = node_id; | ||||
| @@ -120,7 +120,7 @@ void SchedulerNode::ProcessRegister(std::shared_ptr<TcpServer> server, std::shar | |||||
| // assign worker node and server node rank id | // assign worker node and server node rank id | ||||
| int rank_id = node_manager_.NextRankId(register_message); | int rank_id = node_manager_.NextRankId(register_message); | ||||
| if (rank_id < 0) { | if (rank_id < 0) { | ||||
| MS_LOG(EXCEPTION) << "The rank id is wrong!"; | |||||
| MS_LOG(WARNING) << "The rank id is wrong!"; | |||||
| } | } | ||||
| const std::string &node_id = register_message.node_id(); | const std::string &node_id = register_message.node_id(); | ||||
| node_manager_.UpdateHeartbeat(node_id); | node_manager_.UpdateHeartbeat(node_id); | ||||