|
|
|
@@ -69,9 +69,18 @@ bool AbstractNode::Broadcast(const NodeRole &node_role, const DataPtr &message, |
|
|
|
MS_LOG(EXCEPTION) << "Currently only supports broadcast to server nodes"; |
|
|
|
} |
|
|
|
|
|
|
|
uint64_t request_id = AddMessageTrack(nodes_address_.size()); |
|
|
|
uint32_t broadcast_size = 0; |
|
|
|
std::for_each(nodes_address_.begin(), nodes_address_.end(), [&broadcast_size, &node_role](const auto &addr) { |
|
|
|
if (addr.first.first == node_role) { |
|
|
|
++broadcast_size; |
|
|
|
} |
|
|
|
}); |
|
|
|
uint64_t request_id = AddMessageTrack(broadcast_size); |
|
|
|
|
|
|
|
for (auto it = nodes_address_.begin(); it != nodes_address_.end(); ++it) { |
|
|
|
if (it->first.first != node_role) { |
|
|
|
continue; |
|
|
|
} |
|
|
|
auto message_meta = std::make_shared<MessageMeta>(); |
|
|
|
MS_EXCEPTION_IF_NULL(message_meta); |
|
|
|
message_meta->set_cmd(NodeCommand::SEND_DATA); |
|
|
|
@@ -626,7 +635,7 @@ void AbstractNode::ProcessFetchServersResp(const std::shared_ptr<MessageMeta> &m |
|
|
|
|
|
|
|
nodes_address_.clear(); |
|
|
|
for (const auto &it : fetch_servers_resp_message.servers_meta()) { |
|
|
|
nodes_address_[std::make_pair(NodeRole::SERVER, it.rank_id())] = std::make_pair(it.ip(), it.port()); |
|
|
|
nodes_address_[std::make_pair(it.role(), it.rank_id())] = std::make_pair(it.ip(), it.port()); |
|
|
|
MS_LOG(INFO) << "The server ip is:" << it.ip() << ", the port is:" << it.port(); |
|
|
|
} |
|
|
|
} |
|
|
|
@@ -862,7 +871,7 @@ const std::shared_ptr<TcpClient> &AbstractNode::GetOrCreateTcpClient(const uint3 |
|
|
|
return connected_nodes_[key]; |
|
|
|
} else { |
|
|
|
if (nodes_address_.find(key) == nodes_address_.end()) { |
|
|
|
MS_LOG(EXCEPTION) << "Worker receive nodes info from scheduler failed!"; |
|
|
|
MS_LOG(EXCEPTION) << "Worker receive nodes info from scheduler failed. Role: " << role << ", rank: " << rank_id; |
|
|
|
} |
|
|
|
if (config_ == nullptr) { |
|
|
|
MS_LOG(EXCEPTION) << "The config is empty."; |
|
|
|
|