diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4f0a35c2..8c172af5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -272,6 +272,14 @@ jobs: sleep 10 dora stop --name ci-rust-test --grace-duration 5s dora destroy + export IP="$(curl ipinfo.io/ip)" + dora up --coordinator-addr $IP:6012 + dora list --coordinator-addr $IP:6012 + dora start dataflow.yml --name ci-rust-test --coordinator-addr $IP:6012 + sleep 10 + dora stop --name ci-rust-test --grace-duration 5 --coordinator-addr $IP:6012 + dora destroy --coordinator-addr $IP:6012 + - name: "Test CLI (Python)" timeout-minutes: 30 # fail-fast by using bash shell explictly @@ -294,6 +302,13 @@ jobs: sleep 10 dora stop --name ci-python-test --grace-duration 5s dora destroy + export IP="$(curl ipinfo.io/ip)" + dora up --coordinator-addr $IP:6012 + dora list --coordinator-addr $IP:6012 + dora start dataflow.yml --name ci-rust-test --coordinator-addr $IP:6012 + sleep 10 + dora stop --name ci-rust-test --grace-duration 5 --coordinator-addr $IP:6012 + dora destroy --coordinator-addr $IP:6012 clippy: name: "Clippy" diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index 6e3b5fd1..19dea806 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -1,6 +1,5 @@ use crate::connect_to_coordinator; use communication_layer_request_reply::TcpRequestReplyConnection; -use dora_core::topics::control_socket_addr; use dora_core::topics::{ControlRequest, ControlRequestReply}; use eyre::{bail, Context}; use std::{ @@ -21,8 +20,7 @@ pub fn check_environment(coordinator_addr: Option) -> eyre::Result<( // check whether coordinator is running write!(stdout, "Dora Coordinator: ")?; - let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); - let mut session = match connect_to_coordinator(coordination_addr) { + let mut session = match connect_to_coordinator(coordinator_addr) { Ok(session) => { let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green))); writeln!(stdout, "ok")?; diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index c10bb8c2..a746c080 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -68,12 +68,14 @@ enum Command { Up { #[clap(long)] config: Option, + #[clap(long)] coordinator_addr: Option, }, /// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first. Destroy { #[clap(long)] config: Option, + #[clap(long)] coordinator_addr: Option, }, /// Start the given dataflow path. Attach a name to the running dataflow by using --name. @@ -96,10 +98,12 @@ enum Command { #[clap(long)] #[arg(value_parser = parse)] grace_duration: Option, + #[clap(long)] coordinator_addr: Option, }, /// List running dataflows. List { + #[clap(long)] coordinator_addr: Option, }, // Planned for future releases: @@ -109,6 +113,7 @@ enum Command { Logs { dataflow: Option, node: String, + #[clap(long)] coordinator_addr: Option, }, // Metrics, @@ -234,8 +239,7 @@ fn run() -> eyre::Result<()> { node, coordinator_addr, } => { - let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); - let mut session = connect_to_coordinator(coordination_addr) + let mut session = connect_to_coordinator(coordinator_addr) .wrap_err("failed to connect to dora coordinator")?; let uuids = query_running_dataflows(&mut *session) .wrap_err("failed to query running dataflows")?; @@ -271,8 +275,7 @@ fn run() -> eyre::Result<()> { .check(&working_dir) .wrap_err("Could not validate yaml")?; - let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); - let mut session = connect_to_coordinator(coordination_addr) + let mut session = connect_to_coordinator(coordinator_addr) .wrap_err("failed to connect to dora coordinator")?; let dataflow_id = start_dataflow( dataflow_descriptor.clone(), @@ -291,23 +294,19 @@ fn run() -> eyre::Result<()> { )? } } - Command::List { coordinator_addr } => { - let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); - match connect_to_coordinator(coordination_addr) { - Ok(mut session) => list(&mut *session)?, - Err(_) => { - bail!("No dora coordinator seems to be running."); - } + Command::List { coordinator_addr } => match connect_to_coordinator(coordinator_addr) { + Ok(mut session) => list(&mut *session)?, + Err(_) => { + bail!("No dora coordinator seems to be running."); } - } + }, Command::Stop { uuid, name, grace_duration, coordinator_addr, } => { - let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); - let mut session = connect_to_coordinator(coordination_addr) + let mut session = connect_to_coordinator(coordinator_addr) .wrap_err("could not connect to dora coordinator")?; match (uuid, name) { (Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?, @@ -499,7 +498,8 @@ fn query_running_dataflows( } fn connect_to_coordinator( - coordinator_addr: SocketAddr, + coordinator_addr: Option, ) -> std::io::Result> { - TcpLayer::new().connect(coordinator_addr) + let coordination_addr = coordinator_addr.unwrap_or_else(control_socket_addr); + TcpLayer::new().connect(coordination_addr) } diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/up.rs index 623869ba..f4d7e96e 100644 --- a/binaries/cli/src/up.rs +++ b/binaries/cli/src/up.rs @@ -1,5 +1,5 @@ use crate::{check::daemon_running, connect_to_coordinator}; -use dora_core::topics::{control_socket_addr, ControlRequest}; +use dora_core::topics::ControlRequest; use eyre::Context; use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration}; #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] @@ -10,14 +10,13 @@ pub(crate) fn up( coordinator_addr: Option, ) -> eyre::Result<()> { let UpConfig {} = parse_dora_config(config_path)?; - let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); - let mut session = match connect_to_coordinator(coordination_addr) { + let mut session = match connect_to_coordinator(coordinator_addr) { Ok(session) => session, Err(_) => { start_coordinator().wrap_err("failed to start dora-coordinator")?; loop { - match connect_to_coordinator(coordination_addr) { + match connect_to_coordinator(coordinator_addr) { Ok(session) => break session, Err(_) => { // sleep a bit until the coordinator accepts connections @@ -54,8 +53,7 @@ pub(crate) fn destroy( coordinator_addr: Option, ) -> Result<(), eyre::ErrReport> { let UpConfig {} = parse_dora_config(config_path)?; - let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); - match connect_to_coordinator(coordination_addr) { + match connect_to_coordinator(coordinator_addr) { Ok(mut session) => { // send destroy command to dora-coordinator session