diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index fea9dfc4..14a3cd35 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -4,11 +4,11 @@ use dora_core::topics::{ControlRequest, ControlRequestReply}; use eyre::{bail, Context}; use std::{ io::{IsTerminal, Write}, - net::IpAddr, + net::SocketAddr, }; use termcolor::{Color, ColorChoice, ColorSpec, WriteColor}; -pub fn check_environment(coordinator_addr: IpAddr) -> eyre::Result<()> { +pub fn check_environment(coordinator_addr: SocketAddr) -> eyre::Result<()> { let mut error_occured = false; let color_choice = if std::io::stdout().is_terminal() { diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index f4b2c4bb..92d08b87 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -45,8 +45,12 @@ enum Command { Check { #[clap(long)] dataflow: Option, - #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, }, /// Generate a visualization of the given graph using mermaid.js. Use --open to open browser. Graph { @@ -69,23 +73,35 @@ enum Command { Up { #[clap(long)] config: Option, - #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, }, /// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first. Destroy { #[clap(long)] config: Option, - #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, }, /// Start the given dataflow path. Attach a name to the running dataflow by using --name. Start { dataflow: PathBuf, #[clap(long)] name: Option, - #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, #[clap(long, action)] attach: bool, #[clap(long, action)] @@ -99,13 +115,21 @@ enum Command { #[clap(long)] #[arg(value_parser = parse)] grace_duration: Option, - #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, }, /// List running dataflows. List { - #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, }, // Planned for future releases: // Dashboard, @@ -114,8 +138,12 @@ enum Command { Logs { dataflow: Option, node: String, - #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, }, // Metrics, // Stats, @@ -130,9 +158,9 @@ enum Command { IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0) )] addr: SocketAddr, - #[clap(long)] - coordinator_addr: Option, - + /// Address and port number of the dora coordinator + #[clap(long, default_value_t = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), DORA_COORDINATOR_PORT_DEFAULT))] + coordinator_addr: SocketAddr, #[clap(long)] run_dataflow: Option, }, @@ -210,6 +238,7 @@ fn run() -> eyre::Result<()> { Command::Check { dataflow, coordinator_addr, + coordinator_port, } => match dataflow { Some(dataflow) => { let working_dir = dataflow @@ -219,9 +248,9 @@ fn run() -> eyre::Result<()> { .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? .to_owned(); Descriptor::blocking_read(&dataflow)?.check(&working_dir)?; - check::check_environment(coordinator_addr)? + check::check_environment((coordinator_addr, coordinator_port).into())? } - None => check::check_environment(coordinator_addr)?, + None => check::check_environment((coordinator_addr, coordinator_port).into())?, }, Command::Graph { dataflow, @@ -240,15 +269,20 @@ fn run() -> eyre::Result<()> { Command::Up { config, coordinator_addr, + coordinator_port, } => { - up::up(config.as_deref(), coordinator_addr)?; + up::up( + config.as_deref(), + (coordinator_addr, coordinator_port).into(), + )?; } Command::Logs { dataflow, node, coordinator_addr, + coordinator_port, } => { - let mut session = connect_to_coordinator(coordinator_addr) + let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) .wrap_err("failed to connect to dora coordinator")?; let uuids = query_running_dataflows(&mut *session) .wrap_err("failed to query running dataflows")?; @@ -269,6 +303,7 @@ fn run() -> eyre::Result<()> { dataflow, name, coordinator_addr, + coordinator_port, attach, hot_reload, } => { @@ -284,7 +319,7 @@ fn run() -> eyre::Result<()> { .check(&working_dir) .wrap_err("Could not validate yaml")?; - let mut session = connect_to_coordinator(coordinator_addr) + let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) .wrap_err("failed to connect to dora coordinator")?; let dataflow_id = start_dataflow( dataflow_descriptor.clone(), @@ -303,7 +338,10 @@ fn run() -> eyre::Result<()> { )? } } - Command::List { coordinator_addr } => match connect_to_coordinator(coordinator_addr) { + Command::List { + coordinator_addr, + coordinator_port, + } => match connect_to_coordinator((coordinator_addr, coordinator_port).into()) { Ok(mut session) => list(&mut *session)?, Err(_) => { bail!("No dora coordinator seems to be running."); @@ -314,8 +352,9 @@ fn run() -> eyre::Result<()> { name, grace_duration, coordinator_addr, + coordinator_port, } => { - let mut session = connect_to_coordinator(coordinator_addr) + let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) .wrap_err("could not connect to dora coordinator")?; match (uuid, name) { (Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?, @@ -326,7 +365,11 @@ fn run() -> eyre::Result<()> { Command::Destroy { config, coordinator_addr, - } => up::destroy(config.as_deref(), coordinator_addr)?, + coordinator_port, + } => up::destroy( + config.as_deref(), + (coordinator_addr, coordinator_port).into(), + )?, Command::Coordinator { interface, port, @@ -343,6 +386,7 @@ fn run() -> eyre::Result<()> { let (port, task) = dora_coordinator::start(bind, bind_control, futures::stream::empty::()) .await?; + println!("Listening for incoming daemon connection on {port}"); task.await }) .context("failed to run dora-coordinator")? @@ -357,11 +401,12 @@ fn run() -> eyre::Result<()> { .enable_all() .build() .context("tokio runtime failed")?; + let localhost = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); rt.block_on(async { match run_dataflow { Some(dataflow_path) => { tracing::info!("Starting dataflow `{}`", dataflow_path.display()); - if let Some(coordinator_addr) = coordinator_addr { + if coordinator_addr != SocketAddr::new(localhost, DORA_COORDINATOR_PORT_DEFAULT){ tracing::info!( "Not using coordinator addr {} as `run_dataflow` is for local dataflow only. Please use the `start` command for remote coordinator", coordinator_addr @@ -371,12 +416,10 @@ fn run() -> eyre::Result<()> { Daemon::run_dataflow(&dataflow_path).await } None => { - let coordination_addr = coordinator_addr.unwrap_or_else(|| { + if coordinator_addr.ip() == localhost { tracing::info!("Starting in local mode"); - let localhost = Ipv4Addr::new(127, 0, 0, 1); - (localhost, DORA_COORDINATOR_PORT_DEFAULT).into() - }); - Daemon::run(coordination_addr, machine_id.unwrap_or_default(), addr).await + } + Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), addr).await } } }) @@ -515,10 +558,7 @@ fn query_running_dataflows( } fn connect_to_coordinator( - coordinator_addr: IpAddr, + coordinator_addr: SocketAddr, ) -> std::io::Result> { - TcpLayer::new().connect(SocketAddr::new( - coordinator_addr, - DORA_COORDINATOR_PORT_CONTROL_DEFAULT, - )) + TcpLayer::new().connect(coordinator_addr) } diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/up.rs index 4816a01c..38889edf 100644 --- a/binaries/cli/src/up.rs +++ b/binaries/cli/src/up.rs @@ -1,11 +1,11 @@ use crate::{check::daemon_running, connect_to_coordinator}; use dora_core::topics::ControlRequest; use eyre::Context; -use std::{fs, net::IpAddr, path::Path, process::Command, time::Duration}; +use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration}; #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] struct UpConfig {} -pub(crate) fn up(config_path: Option<&Path>, coordinator_addr: IpAddr) -> eyre::Result<()> { +pub(crate) fn up(config_path: Option<&Path>, coordinator_addr: SocketAddr) -> eyre::Result<()> { let UpConfig {} = parse_dora_config(config_path)?; let mut session = match connect_to_coordinator(coordinator_addr) { Ok(session) => session, @@ -47,7 +47,7 @@ pub(crate) fn up(config_path: Option<&Path>, coordinator_addr: IpAddr) -> eyre:: pub(crate) fn destroy( config_path: Option<&Path>, - coordinator_addr: IpAddr, + coordinator_addr: SocketAddr, ) -> Result<(), eyre::ErrReport> { let UpConfig {} = parse_dora_config(config_path)?; match connect_to_coordinator(coordinator_addr) { diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index 6f9bcd4e..32a695cc 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -1,7 +1,10 @@ use dora_coordinator::{ControlEvent, Event}; use dora_core::{ descriptor::Descriptor, - topics::{ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_DEFAULT}, + topics::{ + ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, + DORA_COORDINATOR_PORT_DEFAULT, + }, }; use dora_tracing::set_up_tracing; use eyre::{bail, Context}; @@ -38,9 +41,16 @@ async fn main() -> eyre::Result<()> { IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), DORA_COORDINATOR_PORT_DEFAULT, ); - let (coordinator_port, coordinator) = - dora_coordinator::start(coordinator_bind, ReceiverStream::new(coordinator_events_rx)) - .await?; + let coordinator_control_bind = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + DORA_COORDINATOR_PORT_CONTROL_DEFAULT, + ); + let (coordinator_port, coordinator) = dora_coordinator::start( + coordinator_bind, + coordinator_control_bind, + ReceiverStream::new(coordinator_events_rx), + ) + .await?; let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port); let daemon_a = run_daemon(coordinator_addr.to_string(), "A"); let daemon_b = run_daemon(coordinator_addr.to_string(), "B");