Browse Source

Merge pull request #471 from Michael-J-Ward/configurable-bind-mounts

Configurable bind addrs
tags/v0.3.4-rc1
Philipp Oppermann GitHub 1 year ago
parent
commit
bbabdc389e
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
8 changed files with 85 additions and 56 deletions
  1. +20
    -6
      binaries/cli/src/main.rs
  2. +33
    -22
      binaries/coordinator/src/lib.rs
  3. +5
    -6
      binaries/coordinator/src/listener.rs
  4. +2
    -2
      binaries/daemon/src/coordinator.rs
  5. +8
    -11
      binaries/daemon/src/inter_daemon.rs
  6. +8
    -4
      binaries/daemon/src/lib.rs
  7. +8
    -3
      examples/multiple-daemons/run.rs
  8. +1
    -2
      libraries/core/src/coordinator_messages.rs

+ 20
- 6
binaries/cli/src/main.rs View File

@@ -1,4 +1,7 @@
use std::{net::Ipv4Addr, path::PathBuf};
use std::{
net::{IpAddr, Ipv4Addr},
path::PathBuf,
};

use attach::attach_dataflow;
use clap::Parser;
@@ -103,6 +106,11 @@ enum Command {
Daemon {
#[clap(long)]
machine_id: Option<String>,
/// The IP address and port this daemon will bind to.
#[clap(long, default_value_t = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0)
)]
addr: SocketAddr,
#[clap(long)]
coordinator_addr: Option<SocketAddr>,

@@ -112,7 +120,12 @@ enum Command {
/// Run runtime
Runtime,
/// Run coordinator
Coordinator { port: Option<u16> },
Coordinator {
#[clap(long, default_value_t = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), DORA_COORDINATOR_PORT_DEFAULT)
)]
addr: SocketAddr,
},
}

#[derive(Debug, clap::Args)]
@@ -266,20 +279,21 @@ fn run() -> eyre::Result<()> {
}
}
Command::Destroy { config } => up::destroy(config.as_deref())?,
Command::Coordinator { port } => {
Command::Coordinator { addr } => {
let rt = Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
rt.block_on(async {
let (_port, task) =
dora_coordinator::start(port, futures::stream::empty::<Event>()).await?;
dora_coordinator::start(addr, futures::stream::empty::<Event>()).await?;
task.await
})
.context("failed to run dora-coordinator")?
}
Command::Daemon {
coordinator_addr,
addr,
machine_id,
run_dataflow,
} => {
@@ -301,12 +315,12 @@ fn run() -> eyre::Result<()> {
Daemon::run_dataflow(&dataflow_path).await
}
None => {
let addr = coordinator_addr.unwrap_or_else(|| {
let coordination_addr = coordinator_addr.unwrap_or_else(|| {
tracing::info!("Starting in local mode");
let localhost = Ipv4Addr::new(127, 0, 0, 1);
(localhost, DORA_COORDINATOR_PORT_DEFAULT).into()
});
Daemon::run(addr, machine_id.unwrap_or_default()).await
Daemon::run(coordination_addr, machine_id.unwrap_or_default(), addr).await
}
}
})


+ 33
- 22
binaries/coordinator/src/lib.rs View File

@@ -9,10 +9,7 @@ use dora_core::{
daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped},
descriptor::{Descriptor, ResolvedNode},
message::uhlc::{self, HLC},
topics::{
control_socket_addr, ControlRequest, ControlRequestReply, DataflowId,
DORA_COORDINATOR_PORT_DEFAULT,
},
topics::{control_socket_addr, ControlRequest, ControlRequestReply, DataflowId},
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};
@@ -39,11 +36,10 @@ mod run;
mod tcp_utils;

pub async fn start(
port: Option<u16>,
bind: SocketAddr,
external_events: impl Stream<Item = Event> + Unpin,
) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> {
let port = port.unwrap_or(DORA_COORDINATOR_PORT_DEFAULT);
let listener = listener::create_listener(port).await?;
let listener = listener::create_listener(bind).await?;
let port = listener
.local_addr()
.wrap_err("failed to get local addr of listener")?
@@ -175,29 +171,38 @@ async fn start_inner(
machine_id,
mut connection,
dora_version: daemon_version,
listen_socket,
listen_port,
} => {
let coordinator_version = &env!("CARGO_PKG_VERSION");
let reply = if &daemon_version == coordinator_version {
RegisterResult::Ok
let coordinator_version: &&str = &env!("CARGO_PKG_VERSION");
let version_check = if &daemon_version == coordinator_version {
Ok(())
} else {
RegisterResult::Err(format!(
Err(format!(
"version mismatch: daemon v{daemon_version} is \
not compatible with coordinator v{coordinator_version}"
not compatible with coordinator v{coordinator_version}"
))
};
let reply = Timestamped {
inner: reply,
let peer_ip = connection
.peer_addr()
.map(|addr| addr.ip())
.map_err(|err| format!("failed to get peer addr of connection: {err}"));
let register_result = version_check.and(peer_ip);

let reply: Timestamped<RegisterResult> = Timestamped {
inner: match &register_result {
Ok(_) => RegisterResult::Ok,
Err(err) => RegisterResult::Err(err.clone()),
},
timestamp: clock.new_timestamp(),
};
let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?).await;
match (reply.inner, send_result) {
(RegisterResult::Ok, Ok(())) => {
match (register_result, send_result) {
(Ok(ip), Ok(())) => {
let previous = daemon_connections.insert(
machine_id.clone(),
DaemonConnection {
stream: connection,
listen_socket,
listen_socket: (ip, listen_port).into(),
last_heartbeat: Instant::now(),
},
);
@@ -207,10 +212,10 @@ async fn start_inner(
);
}
}
(RegisterResult::Err(err), _) => {
(Err(err), _) => {
tracing::warn!("failed to register daemon connection for machine `{machine_id}`: {err}");
}
(RegisterResult::Ok, Err(err)) => {
(Ok(_), Err(err)) => {
tracing::warn!("failed to confirm daemon connection for machine `{machine_id}`: {err}");
}
}
@@ -481,6 +486,12 @@ async fn start_inner(
let mut disconnected = BTreeSet::new();
for (machine_id, connection) in &mut daemon_connections {
if connection.last_heartbeat.elapsed() > Duration::from_secs(15) {
tracing::warn!(
"no heartbeat message from machine `{machine_id}` since {:?}",
connection.last_heartbeat.elapsed()
)
}
if connection.last_heartbeat.elapsed() > Duration::from_secs(30) {
disconnected.insert(machine_id.clone());
continue;
}
@@ -500,7 +511,7 @@ async fn start_inner(
}
}
if !disconnected.is_empty() {
tracing::info!("Disconnecting daemons that failed watchdog: {disconnected:?}");
tracing::error!("Disconnecting daemons that failed watchdog: {disconnected:?}");
for machine_id in disconnected {
daemon_connections.remove(&machine_id);
}
@@ -905,7 +916,7 @@ pub enum DaemonEvent {
dora_version: String,
machine_id: String,
connection: TcpStream,
listen_socket: SocketAddr,
listen_port: u16,
},
}



+ 5
- 6
binaries/coordinator/src/listener.rs View File

@@ -1,15 +1,14 @@
use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event};
use dora_core::{coordinator_messages, daemon_messages::Timestamped, message::uhlc::HLC};
use eyre::{eyre, Context};
use std::{io::ErrorKind, net::Ipv4Addr, sync::Arc};
use std::{io::ErrorKind, net::SocketAddr, sync::Arc};
use tokio::{
net::{TcpListener, TcpStream},
sync::mpsc,
};

pub async fn create_listener(port: u16) -> eyre::Result<TcpListener> {
let localhost = Ipv4Addr::new(127, 0, 0, 1);
let socket = match TcpListener::bind((localhost, port)).await {
pub async fn create_listener(bind: SocketAddr) -> eyre::Result<TcpListener> {
let socket = match TcpListener::bind(bind).await {
Ok(socket) => socket,
Err(err) => {
return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener"))
@@ -53,13 +52,13 @@ pub async fn handle_connection(
coordinator_messages::CoordinatorRequest::Register {
machine_id,
dora_version,
listen_socket,
listen_port,
} => {
let event = DaemonEvent::Register {
dora_version,
machine_id,
connection,
listen_socket,
listen_port,
};
let _ = events_tx.send(Event::Daemon(event)).await;
break;


+ 2
- 2
binaries/daemon/src/coordinator.rs View File

@@ -24,7 +24,7 @@ pub struct CoordinatorEvent {
pub async fn register(
addr: SocketAddr,
machine_id: String,
listen_socket: SocketAddr,
listen_port: u16,
clock: &HLC,
) -> eyre::Result<impl Stream<Item = Timestamped<CoordinatorEvent>>> {
let mut stream = TcpStream::connect(addr)
@@ -37,7 +37,7 @@ pub async fn register(
inner: CoordinatorRequest::Register {
dora_version: env!("CARGO_PKG_VERSION").to_owned(),
machine_id,
listen_socket,
listen_port,
},
timestamp: clock.new_timestamp(),
})?;


+ 8
- 11
binaries/daemon/src/inter_daemon.rs View File

@@ -1,11 +1,7 @@
use crate::tcp_utils::{tcp_receive, tcp_send};
use dora_core::daemon_messages::{InterDaemonEvent, Timestamped};
use eyre::{Context, ContextCompat};
use std::{
collections::BTreeMap,
io::ErrorKind,
net::{Ipv4Addr, SocketAddr},
};
use std::{collections::BTreeMap, io::ErrorKind, net::SocketAddr};
use tokio::net::{TcpListener, TcpStream};

pub struct InterDaemonConnection {
@@ -65,26 +61,27 @@ pub async fn send_inter_daemon_event(
}

pub async fn spawn_listener_loop(
bind: SocketAddr,
machine_id: String,
events_tx: flume::Sender<Timestamped<InterDaemonEvent>>,
) -> eyre::Result<SocketAddr> {
let localhost = Ipv4Addr::new(127, 0, 0, 1);
let socket = match TcpListener::bind((localhost, 0)).await {
) -> eyre::Result<u16> {
let socket = match TcpListener::bind(bind).await {
Ok(socket) => socket,
Err(err) => {
return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener"))
}
};
let socket_addr = socket
let listen_port = socket
.local_addr()
.wrap_err("failed to get local addr of socket")?;
.wrap_err("failed to get local addr of socket")?
.port();

tokio::spawn(async move {
listener_loop(socket, events_tx).await;
tracing::debug!("inter-daemon listener loop finished for machine `{machine_id}`");
});

Ok(socket_addr)
Ok(listen_port)
}

async fn listener_loop(


+ 8
- 4
binaries/daemon/src/lib.rs View File

@@ -77,15 +77,19 @@ pub struct Daemon {
}

impl Daemon {
pub async fn run(coordinator_addr: SocketAddr, machine_id: String) -> eyre::Result<()> {
pub async fn run(
coordinator_addr: SocketAddr,
machine_id: String,
bind_addr: SocketAddr,
) -> eyre::Result<()> {
let clock = Arc::new(HLC::default());

let ctrlc_events = set_up_ctrlc_handler(clock.clone())?;

// spawn listen loop
let (events_tx, events_rx) = flume::bounded(10);
let listen_socket =
inter_daemon::spawn_listener_loop(machine_id.clone(), events_tx).await?;
let listen_port =
inter_daemon::spawn_listener_loop(bind_addr, machine_id.clone(), events_tx).await?;
let daemon_events = events_rx.into_stream().map(|e| Timestamped {
inner: Event::Daemon(e.inner),
timestamp: e.timestamp,
@@ -93,7 +97,7 @@ impl Daemon {

// connect to the coordinator
let coordinator_events =
coordinator::register(coordinator_addr, machine_id.clone(), listen_socket, &clock)
coordinator::register(coordinator_addr, machine_id.clone(), listen_port, &clock)
.await
.wrap_err("failed to connect to dora-coordinator")?
.map(


+ 8
- 3
examples/multiple-daemons/run.rs View File

@@ -1,14 +1,14 @@
use dora_coordinator::{ControlEvent, Event};
use dora_core::{
descriptor::Descriptor,
topics::{ControlRequest, ControlRequestReply, DataflowId},
topics::{ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_DEFAULT},
};
use dora_tracing::set_up_tracing;
use eyre::{bail, Context};

use std::{
collections::BTreeSet,
net::{Ipv4Addr, SocketAddr},
net::{IpAddr, Ipv4Addr, SocketAddr},
path::Path,
time::Duration,
};
@@ -34,8 +34,13 @@ async fn main() -> eyre::Result<()> {
build_dataflow(dataflow).await?;

let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1);
let coordinator_bind = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
DORA_COORDINATOR_PORT_DEFAULT,
);
let (coordinator_port, coordinator) =
dora_coordinator::start(None, ReceiverStream::new(coordinator_events_rx)).await?;
dora_coordinator::start(coordinator_bind, ReceiverStream::new(coordinator_events_rx))
.await?;
let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port);
let daemon_a = run_daemon(coordinator_addr.to_string(), "A".into());
let daemon_b = run_daemon(coordinator_addr.to_string(), "B".into());


+ 1
- 2
libraries/core/src/coordinator_messages.rs View File

@@ -1,13 +1,12 @@
use crate::daemon_messages::DataflowId;
use eyre::eyre;
use std::net::SocketAddr;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum CoordinatorRequest {
Register {
dora_version: String,
machine_id: String,
listen_socket: SocketAddr,
listen_port: u16,
},
Event {
machine_id: String,


Loading…
Cancel
Save