diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index c7c303d1..d9b77fa9 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -171,29 +171,38 @@ async fn start_inner( machine_id, mut connection, dora_version: daemon_version, - listen_socket, + listen_port, } => { - let coordinator_version = &env!("CARGO_PKG_VERSION"); - let reply = if &daemon_version == coordinator_version { - RegisterResult::Ok + let coordinator_version: &&str = &env!("CARGO_PKG_VERSION"); + let version_check = if &daemon_version == coordinator_version { + Ok(()) } else { - RegisterResult::Err(format!( + Err(format!( "version mismatch: daemon v{daemon_version} is \ - not compatible with coordinator v{coordinator_version}" + not compatible with coordinator v{coordinator_version}" )) }; - let reply = Timestamped { - inner: reply, + let peer_ip = connection + .peer_addr() + .map(|addr| addr.ip()) + .map_err(|err| format!("failed to get peer addr of connection: {err}")); + let register_result = version_check.and(peer_ip); + + let reply: Timestamped = Timestamped { + inner: match ®ister_result { + Ok(_) => RegisterResult::Ok, + Err(err) => RegisterResult::Err(err.clone()), + }, timestamp: clock.new_timestamp(), }; let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?).await; - match (reply.inner, send_result) { - (RegisterResult::Ok, Ok(())) => { + match (register_result, send_result) { + (Ok(ip), Ok(())) => { let previous = daemon_connections.insert( machine_id.clone(), DaemonConnection { stream: connection, - listen_socket, + listen_socket: (ip, listen_port).into(), last_heartbeat: Instant::now(), }, ); @@ -203,10 +212,10 @@ async fn start_inner( ); } } - (RegisterResult::Err(err), _) => { + (Err(err), _) => { tracing::warn!("failed to register daemon connection for machine `{machine_id}`: {err}"); } - (RegisterResult::Ok, Err(err)) => { + (Ok(_), Err(err)) => { tracing::warn!("failed to confirm daemon connection for machine `{machine_id}`: {err}"); } } @@ -907,7 +916,7 @@ pub enum DaemonEvent { dora_version: String, machine_id: String, connection: TcpStream, - listen_socket: SocketAddr, + listen_port: u16, }, } diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index 824d1168..86600a4b 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -52,13 +52,13 @@ pub async fn handle_connection( coordinator_messages::CoordinatorRequest::Register { machine_id, dora_version, - listen_socket, + listen_port, } => { let event = DaemonEvent::Register { dora_version, machine_id, connection, - listen_socket, + listen_port, }; let _ = events_tx.send(Event::Daemon(event)).await; break; diff --git a/binaries/daemon/src/coordinator.rs b/binaries/daemon/src/coordinator.rs index 27ae8e49..d2f86b3c 100644 --- a/binaries/daemon/src/coordinator.rs +++ b/binaries/daemon/src/coordinator.rs @@ -24,7 +24,7 @@ pub struct CoordinatorEvent { pub async fn register( addr: SocketAddr, machine_id: String, - listen_socket: SocketAddr, + listen_port: u16, clock: &HLC, ) -> eyre::Result>> { let mut stream = TcpStream::connect(addr) @@ -37,7 +37,7 @@ pub async fn register( inner: CoordinatorRequest::Register { dora_version: env!("CARGO_PKG_VERSION").to_owned(), machine_id, - listen_socket, + listen_port, }, timestamp: clock.new_timestamp(), })?; diff --git a/binaries/daemon/src/inter_daemon.rs b/binaries/daemon/src/inter_daemon.rs index ed771e96..7eb4b948 100644 --- a/binaries/daemon/src/inter_daemon.rs +++ b/binaries/daemon/src/inter_daemon.rs @@ -64,23 +64,24 @@ pub async fn spawn_listener_loop( bind: SocketAddr, machine_id: String, events_tx: flume::Sender>, -) -> eyre::Result { +) -> eyre::Result { let socket = match TcpListener::bind(bind).await { Ok(socket) => socket, Err(err) => { return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener")) } }; - let socket_addr = socket + let listen_port = socket .local_addr() - .wrap_err("failed to get local addr of socket")?; + .wrap_err("failed to get local addr of socket")? + .port(); tokio::spawn(async move { listener_loop(socket, events_tx).await; tracing::debug!("inter-daemon listener loop finished for machine `{machine_id}`"); }); - Ok(socket_addr) + Ok(listen_port) } async fn listener_loop( diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 857ecd17..59addea1 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -88,7 +88,7 @@ impl Daemon { // spawn listen loop let (events_tx, events_rx) = flume::bounded(10); - let listen_socket = + let listen_port = inter_daemon::spawn_listener_loop(bind_addr, machine_id.clone(), events_tx).await?; let daemon_events = events_rx.into_stream().map(|e| Timestamped { inner: Event::Daemon(e.inner), @@ -97,7 +97,7 @@ impl Daemon { // connect to the coordinator let coordinator_events = - coordinator::register(coordinator_addr, machine_id.clone(), listen_socket, &clock) + coordinator::register(coordinator_addr, machine_id.clone(), listen_port, &clock) .await .wrap_err("failed to connect to dora-coordinator")? .map( diff --git a/libraries/core/src/coordinator_messages.rs b/libraries/core/src/coordinator_messages.rs index e83ee784..38e9eae2 100644 --- a/libraries/core/src/coordinator_messages.rs +++ b/libraries/core/src/coordinator_messages.rs @@ -1,13 +1,12 @@ use crate::daemon_messages::DataflowId; use eyre::eyre; -use std::net::SocketAddr; #[derive(Debug, serde::Serialize, serde::Deserialize)] pub enum CoordinatorRequest { Register { dora_version: String, machine_id: String, - listen_socket: SocketAddr, + listen_port: u16, }, Event { machine_id: String,