From 01707fc14debcafde0ef86ea03720bd25fe511c8 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 17 Apr 2024 15:02:49 +0200 Subject: [PATCH] Fix: Use `peer_addr` of incoming daemon register request for listen socket We are not interested in the local bind address of the daemon. Instead, we want to use the IP address under which the daemon is available from other machines. This should also avoids the issue that connecting to 0.0.0.0 is not possible on Windows (we want to use 0.0.0.0 as default bind address). --- binaries/coordinator/src/lib.rs | 37 ++++++++++++++-------- binaries/coordinator/src/listener.rs | 4 +-- binaries/daemon/src/coordinator.rs | 4 +-- binaries/daemon/src/inter_daemon.rs | 9 +++--- binaries/daemon/src/lib.rs | 4 +-- libraries/core/src/coordinator_messages.rs | 3 +- 6 files changed, 35 insertions(+), 26 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index c7c303d1..d9b77fa9 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -171,29 +171,38 @@ async fn start_inner( machine_id, mut connection, dora_version: daemon_version, - listen_socket, + listen_port, } => { - let coordinator_version = &env!("CARGO_PKG_VERSION"); - let reply = if &daemon_version == coordinator_version { - RegisterResult::Ok + let coordinator_version: &&str = &env!("CARGO_PKG_VERSION"); + let version_check = if &daemon_version == coordinator_version { + Ok(()) } else { - RegisterResult::Err(format!( + Err(format!( "version mismatch: daemon v{daemon_version} is \ - not compatible with coordinator v{coordinator_version}" + not compatible with coordinator v{coordinator_version}" )) }; - let reply = Timestamped { - inner: reply, + let peer_ip = connection + .peer_addr() + .map(|addr| addr.ip()) + .map_err(|err| format!("failed to get peer addr of connection: {err}")); + let register_result = version_check.and(peer_ip); + + let reply: Timestamped = Timestamped { + inner: match ®ister_result { + Ok(_) => RegisterResult::Ok, + Err(err) => RegisterResult::Err(err.clone()), + }, timestamp: clock.new_timestamp(), }; let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?).await; - match (reply.inner, send_result) { - (RegisterResult::Ok, Ok(())) => { + match (register_result, send_result) { + (Ok(ip), Ok(())) => { let previous = daemon_connections.insert( machine_id.clone(), DaemonConnection { stream: connection, - listen_socket, + listen_socket: (ip, listen_port).into(), last_heartbeat: Instant::now(), }, ); @@ -203,10 +212,10 @@ async fn start_inner( ); } } - (RegisterResult::Err(err), _) => { + (Err(err), _) => { tracing::warn!("failed to register daemon connection for machine `{machine_id}`: {err}"); } - (RegisterResult::Ok, Err(err)) => { + (Ok(_), Err(err)) => { tracing::warn!("failed to confirm daemon connection for machine `{machine_id}`: {err}"); } } @@ -907,7 +916,7 @@ pub enum DaemonEvent { dora_version: String, machine_id: String, connection: TcpStream, - listen_socket: SocketAddr, + listen_port: u16, }, } diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index 824d1168..86600a4b 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -52,13 +52,13 @@ pub async fn handle_connection( coordinator_messages::CoordinatorRequest::Register { machine_id, dora_version, - listen_socket, + listen_port, } => { let event = DaemonEvent::Register { dora_version, machine_id, connection, - listen_socket, + listen_port, }; let _ = events_tx.send(Event::Daemon(event)).await; break; diff --git a/binaries/daemon/src/coordinator.rs b/binaries/daemon/src/coordinator.rs index 27ae8e49..d2f86b3c 100644 --- a/binaries/daemon/src/coordinator.rs +++ b/binaries/daemon/src/coordinator.rs @@ -24,7 +24,7 @@ pub struct CoordinatorEvent { pub async fn register( addr: SocketAddr, machine_id: String, - listen_socket: SocketAddr, + listen_port: u16, clock: &HLC, ) -> eyre::Result>> { let mut stream = TcpStream::connect(addr) @@ -37,7 +37,7 @@ pub async fn register( inner: CoordinatorRequest::Register { dora_version: env!("CARGO_PKG_VERSION").to_owned(), machine_id, - listen_socket, + listen_port, }, timestamp: clock.new_timestamp(), })?; diff --git a/binaries/daemon/src/inter_daemon.rs b/binaries/daemon/src/inter_daemon.rs index ed771e96..7eb4b948 100644 --- a/binaries/daemon/src/inter_daemon.rs +++ b/binaries/daemon/src/inter_daemon.rs @@ -64,23 +64,24 @@ pub async fn spawn_listener_loop( bind: SocketAddr, machine_id: String, events_tx: flume::Sender>, -) -> eyre::Result { +) -> eyre::Result { let socket = match TcpListener::bind(bind).await { Ok(socket) => socket, Err(err) => { return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener")) } }; - let socket_addr = socket + let listen_port = socket .local_addr() - .wrap_err("failed to get local addr of socket")?; + .wrap_err("failed to get local addr of socket")? + .port(); tokio::spawn(async move { listener_loop(socket, events_tx).await; tracing::debug!("inter-daemon listener loop finished for machine `{machine_id}`"); }); - Ok(socket_addr) + Ok(listen_port) } async fn listener_loop( diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 857ecd17..59addea1 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -88,7 +88,7 @@ impl Daemon { // spawn listen loop let (events_tx, events_rx) = flume::bounded(10); - let listen_socket = + let listen_port = inter_daemon::spawn_listener_loop(bind_addr, machine_id.clone(), events_tx).await?; let daemon_events = events_rx.into_stream().map(|e| Timestamped { inner: Event::Daemon(e.inner), @@ -97,7 +97,7 @@ impl Daemon { // connect to the coordinator let coordinator_events = - coordinator::register(coordinator_addr, machine_id.clone(), listen_socket, &clock) + coordinator::register(coordinator_addr, machine_id.clone(), listen_port, &clock) .await .wrap_err("failed to connect to dora-coordinator")? .map( diff --git a/libraries/core/src/coordinator_messages.rs b/libraries/core/src/coordinator_messages.rs index e83ee784..38e9eae2 100644 --- a/libraries/core/src/coordinator_messages.rs +++ b/libraries/core/src/coordinator_messages.rs @@ -1,13 +1,12 @@ use crate::daemon_messages::DataflowId; use eyre::eyre; -use std::net::SocketAddr; #[derive(Debug, serde::Serialize, serde::Deserialize)] pub enum CoordinatorRequest { Register { dora_version: String, machine_id: String, - listen_socket: SocketAddr, + listen_port: u16, }, Event { machine_id: String,