From 2ccba2b953de977419d91aa9c056fc14b64f046c Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 30 May 2024 13:27:17 +0200 Subject: [PATCH] Allow setting custom control port in coordinator --- binaries/cli/src/main.rs | 41 +++++++++++++++++++------- binaries/coordinator/src/lib.rs | 52 +++++++++++++++------------------ libraries/core/src/topics.rs | 17 ++--------- 3 files changed, 56 insertions(+), 54 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 2f36fc34..c1393cc9 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -5,8 +5,8 @@ use dora_coordinator::Event; use dora_core::{ descriptor::Descriptor, topics::{ - control_socket_addr, ControlRequest, ControlRequestReply, DataflowId, - DORA_COORDINATOR_PORT_CONTROL, DORA_COORDINATOR_PORT_DEFAULT, + ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, + DORA_COORDINATOR_PORT_DEFAULT, }, }; use dora_daemon::Daemon; @@ -140,10 +140,18 @@ enum Command { Runtime, /// Run coordinator Coordinator { - #[clap(long, default_value_t = SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), DORA_COORDINATOR_PORT_DEFAULT) - )] - addr: SocketAddr, + /// Network interface to bind to for daemon communication + #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))] + interface: IpAddr, + /// Port number to bind to for daemon communication + #[clap(long, default_value_t = DORA_COORDINATOR_PORT_DEFAULT)] + port: u16, + /// Network interface to bind to for control communication + #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))] + control_interface: IpAddr, + /// Port number to bind to for control communication + #[clap(long, default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + control_port: u16, }, } @@ -319,14 +327,22 @@ fn run() -> eyre::Result<()> { config, coordinator_addr, } => up::destroy(config.as_deref(), coordinator_addr)?, - Command::Coordinator { addr } => { + Command::Coordinator { + interface, + port, + control_interface, + control_port, + } => { let rt = Builder::new_multi_thread() .enable_all() .build() .context("tokio runtime failed")?; rt.block_on(async { - let (_port, task) = - dora_coordinator::start(addr, futures::stream::empty::()).await?; + let bind = SocketAddr::new(interface, port); + let bind_control = SocketAddr::new(control_interface, control_port); + let (port, task) = + dora_coordinator::start(bind, bind_control, futures::stream::empty::()) + .await?; task.await }) .context("failed to run dora-coordinator")? @@ -504,9 +520,12 @@ fn connect_to_coordinator( if let Some(coordinator_addr) = coordinator_addr { TcpLayer::new().connect(SocketAddr::new( coordinator_addr, - DORA_COORDINATOR_PORT_CONTROL, + DORA_COORDINATOR_PORT_CONTROL_DEFAULT, )) } else { - TcpLayer::new().connect(control_socket_addr()) + TcpLayer::new().connect(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + DORA_COORDINATOR_PORT_CONTROL_DEFAULT, + )) } } diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 0a30ac10..5032d6a5 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -9,7 +9,7 @@ use dora_core::{ daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped}, descriptor::{Descriptor, ResolvedNode}, message::uhlc::{self, HLC}, - topics::{control_socket_addr, ControlRequest, ControlRequestReply, DataflowId}, + topics::{ControlRequest, ControlRequestReply, DataflowId}, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; @@ -22,11 +22,7 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; -use tokio::{ - net::{TcpListener, TcpStream}, - sync::mpsc, - task::JoinHandle, -}; +use tokio::{net::TcpStream, sync::mpsc, task::JoinHandle}; use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; use uuid::Uuid; @@ -37,6 +33,7 @@ mod tcp_utils; pub async fn start( bind: SocketAddr, + bind_control: SocketAddr, external_events: impl Stream + Unpin, ) -> Result<(u16, impl Future>), eyre::ErrReport> { let listener = listener::create_listener(bind).await?; @@ -44,13 +41,30 @@ pub async fn start( .local_addr() .wrap_err("failed to get local addr of listener")? .port(); + let new_daemon_connections = TcpListenerStream::new(listener).map(|c| { + c.map(Event::NewDaemonConnection) + .wrap_err("failed to open connection") + .unwrap_or_else(Event::DaemonConnectError) + }); + let mut tasks = FuturesUnordered::new(); + let control_events = control::control_events(bind_control, &tasks) + .await + .wrap_err("failed to create control events")?; // Setup ctrl-c handler let ctrlc_events = set_up_ctrlc_handler()?; + let events = ( + external_events, + new_daemon_connections, + control_events, + ctrlc_events, + ) + .merge(); + let future = async move { - start_inner(listener, &tasks, (ctrlc_events, external_events).merge()).await?; + start_inner(events, &tasks).await?; tracing::debug!("coordinator main loop finished, waiting on spawned tasks"); while let Some(join_result) = tasks.next().await { @@ -100,40 +114,22 @@ fn resolve_name( } async fn start_inner( - listener: TcpListener, + events: impl Stream + Unpin, tasks: &FuturesUnordered>, - external_events: impl Stream + Unpin, ) -> eyre::Result<()> { let clock = Arc::new(HLC::default()); - let new_daemon_connections = TcpListenerStream::new(listener).map(|c| { - c.map(Event::NewDaemonConnection) - .wrap_err("failed to open connection") - .unwrap_or_else(Event::DaemonConnectError) - }); - let (daemon_events_tx, daemon_events) = tokio::sync::mpsc::channel(2); let mut daemon_events_tx = Some(daemon_events_tx); let daemon_events = ReceiverStream::new(daemon_events); - let control_events = control::control_events(control_socket_addr(), tasks) - .await - .wrap_err("failed to create control events")?; - let daemon_heartbeat_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3))) .map(|_| Event::DaemonHeartbeatInterval); // events that should be aborted on `dora destroy` - let (abortable_events, abort_handle) = futures::stream::abortable( - ( - control_events, - new_daemon_connections, - external_events, - daemon_heartbeat_interval, - ) - .merge(), - ); + let (abortable_events, abort_handle) = + futures::stream::abortable((events, daemon_heartbeat_interval).merge()); let mut events = (abortable_events, daemon_events).merge(); diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 69ad75a5..506c1b42 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -1,10 +1,4 @@ -use std::{ - collections::BTreeSet, - fmt::Display, - net::{Ipv4Addr, SocketAddr}, - path::PathBuf, - time::Duration, -}; +use std::{collections::BTreeSet, fmt::Display, path::PathBuf, time::Duration}; use uuid::Uuid; use crate::{ @@ -13,17 +7,10 @@ use crate::{ }; pub const DORA_COORDINATOR_PORT_DEFAULT: u16 = 0xD02A; -pub const DORA_COORDINATOR_PORT_CONTROL: u16 = 0x177C; +pub const DORA_COORDINATOR_PORT_CONTROL_DEFAULT: u16 = 0x177C; pub const MANUAL_STOP: &str = "dora/stop"; -pub fn control_socket_addr() -> SocketAddr { - SocketAddr::new( - Ipv4Addr::new(127, 0, 0, 1).into(), - DORA_COORDINATOR_PORT_CONTROL, - ) -} - #[derive(Debug, serde::Deserialize, serde::Serialize)] pub enum ControlRequest { Start {