Browse Source

Fix: Use `peer_addr` of incoming daemon register request for listen socket

We are not interested in the local bind address of the daemon. Instead, we want to use the IP address under which the daemon is available from other machines.

This should also avoids the issue that connecting to 0.0.0.0 is not possible on Windows (we want to use 0.0.0.0 as default bind address).
tags/v0.3.4-rc1
Philipp Oppermann 1 year ago
parent
commit
01707fc14d
Failed to extract signature
6 changed files with 35 additions and 26 deletions
  1. +23
    -14
      binaries/coordinator/src/lib.rs
  2. +2
    -2
      binaries/coordinator/src/listener.rs
  3. +2
    -2
      binaries/daemon/src/coordinator.rs
  4. +5
    -4
      binaries/daemon/src/inter_daemon.rs
  5. +2
    -2
      binaries/daemon/src/lib.rs
  6. +1
    -2
      libraries/core/src/coordinator_messages.rs

+ 23
- 14
binaries/coordinator/src/lib.rs View File

@@ -171,29 +171,38 @@ async fn start_inner(
machine_id,
mut connection,
dora_version: daemon_version,
listen_socket,
listen_port,
} => {
let coordinator_version = &env!("CARGO_PKG_VERSION");
let reply = if &daemon_version == coordinator_version {
RegisterResult::Ok
let coordinator_version: &&str = &env!("CARGO_PKG_VERSION");
let version_check = if &daemon_version == coordinator_version {
Ok(())
} else {
RegisterResult::Err(format!(
Err(format!(
"version mismatch: daemon v{daemon_version} is \
not compatible with coordinator v{coordinator_version}"
not compatible with coordinator v{coordinator_version}"
))
};
let reply = Timestamped {
inner: reply,
let peer_ip = connection
.peer_addr()
.map(|addr| addr.ip())
.map_err(|err| format!("failed to get peer addr of connection: {err}"));
let register_result = version_check.and(peer_ip);

let reply: Timestamped<RegisterResult> = Timestamped {
inner: match &register_result {
Ok(_) => RegisterResult::Ok,
Err(err) => RegisterResult::Err(err.clone()),
},
timestamp: clock.new_timestamp(),
};
let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?).await;
match (reply.inner, send_result) {
(RegisterResult::Ok, Ok(())) => {
match (register_result, send_result) {
(Ok(ip), Ok(())) => {
let previous = daemon_connections.insert(
machine_id.clone(),
DaemonConnection {
stream: connection,
listen_socket,
listen_socket: (ip, listen_port).into(),
last_heartbeat: Instant::now(),
},
);
@@ -203,10 +212,10 @@ async fn start_inner(
);
}
}
(RegisterResult::Err(err), _) => {
(Err(err), _) => {
tracing::warn!("failed to register daemon connection for machine `{machine_id}`: {err}");
}
(RegisterResult::Ok, Err(err)) => {
(Ok(_), Err(err)) => {
tracing::warn!("failed to confirm daemon connection for machine `{machine_id}`: {err}");
}
}
@@ -907,7 +916,7 @@ pub enum DaemonEvent {
dora_version: String,
machine_id: String,
connection: TcpStream,
listen_socket: SocketAddr,
listen_port: u16,
},
}



+ 2
- 2
binaries/coordinator/src/listener.rs View File

@@ -52,13 +52,13 @@ pub async fn handle_connection(
coordinator_messages::CoordinatorRequest::Register {
machine_id,
dora_version,
listen_socket,
listen_port,
} => {
let event = DaemonEvent::Register {
dora_version,
machine_id,
connection,
listen_socket,
listen_port,
};
let _ = events_tx.send(Event::Daemon(event)).await;
break;


+ 2
- 2
binaries/daemon/src/coordinator.rs View File

@@ -24,7 +24,7 @@ pub struct CoordinatorEvent {
pub async fn register(
addr: SocketAddr,
machine_id: String,
listen_socket: SocketAddr,
listen_port: u16,
clock: &HLC,
) -> eyre::Result<impl Stream<Item = Timestamped<CoordinatorEvent>>> {
let mut stream = TcpStream::connect(addr)
@@ -37,7 +37,7 @@ pub async fn register(
inner: CoordinatorRequest::Register {
dora_version: env!("CARGO_PKG_VERSION").to_owned(),
machine_id,
listen_socket,
listen_port,
},
timestamp: clock.new_timestamp(),
})?;


+ 5
- 4
binaries/daemon/src/inter_daemon.rs View File

@@ -64,23 +64,24 @@ pub async fn spawn_listener_loop(
bind: SocketAddr,
machine_id: String,
events_tx: flume::Sender<Timestamped<InterDaemonEvent>>,
) -> eyre::Result<SocketAddr> {
) -> eyre::Result<u16> {
let socket = match TcpListener::bind(bind).await {
Ok(socket) => socket,
Err(err) => {
return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener"))
}
};
let socket_addr = socket
let listen_port = socket
.local_addr()
.wrap_err("failed to get local addr of socket")?;
.wrap_err("failed to get local addr of socket")?
.port();

tokio::spawn(async move {
listener_loop(socket, events_tx).await;
tracing::debug!("inter-daemon listener loop finished for machine `{machine_id}`");
});

Ok(socket_addr)
Ok(listen_port)
}

async fn listener_loop(


+ 2
- 2
binaries/daemon/src/lib.rs View File

@@ -88,7 +88,7 @@ impl Daemon {

// spawn listen loop
let (events_tx, events_rx) = flume::bounded(10);
let listen_socket =
let listen_port =
inter_daemon::spawn_listener_loop(bind_addr, machine_id.clone(), events_tx).await?;
let daemon_events = events_rx.into_stream().map(|e| Timestamped {
inner: Event::Daemon(e.inner),
@@ -97,7 +97,7 @@ impl Daemon {

// connect to the coordinator
let coordinator_events =
coordinator::register(coordinator_addr, machine_id.clone(), listen_socket, &clock)
coordinator::register(coordinator_addr, machine_id.clone(), listen_port, &clock)
.await
.wrap_err("failed to connect to dora-coordinator")?
.map(


+ 1
- 2
libraries/core/src/coordinator_messages.rs View File

@@ -1,13 +1,12 @@
use crate::daemon_messages::DataflowId;
use eyre::eyre;
use std::net::SocketAddr;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum CoordinatorRequest {
Register {
dora_version: String,
machine_id: String,
listen_socket: SocketAddr,
listen_port: u16,
},
Event {
machine_id: String,


Loading…
Cancel
Save