| @@ -272,6 +272,14 @@ jobs: | |||
| sleep 10 | |||
| dora stop --name ci-rust-test --grace-duration 5s | |||
| dora destroy | |||
| export IP="$(curl ipinfo.io/ip)" | |||
| dora up --coordinator-addr $IP:6012 | |||
| dora list --coordinator-addr $IP:6012 | |||
| dora start dataflow.yml --name ci-rust-test --coordinator-addr $IP:6012 | |||
| sleep 10 | |||
| dora stop --name ci-rust-test --grace-duration 5 --coordinator-addr $IP:6012 | |||
| dora destroy --coordinator-addr $IP:6012 | |||
| - name: "Test CLI (Python)" | |||
| timeout-minutes: 30 | |||
| # fail-fast by using bash shell explictly | |||
| @@ -294,6 +302,13 @@ jobs: | |||
| sleep 10 | |||
| dora stop --name ci-python-test --grace-duration 5s | |||
| dora destroy | |||
| export IP="$(curl ipinfo.io/ip)" | |||
| dora up --coordinator-addr $IP:6012 | |||
| dora list --coordinator-addr $IP:6012 | |||
| dora start dataflow.yml --name ci-rust-test --coordinator-addr $IP:6012 | |||
| sleep 10 | |||
| dora stop --name ci-rust-test --grace-duration 5 --coordinator-addr $IP:6012 | |||
| dora destroy --coordinator-addr $IP:6012 | |||
| clippy: | |||
| name: "Clippy" | |||
| @@ -1,6 +1,5 @@ | |||
| use crate::connect_to_coordinator; | |||
| use communication_layer_request_reply::TcpRequestReplyConnection; | |||
| use dora_core::topics::control_socket_addr; | |||
| use dora_core::topics::{ControlRequest, ControlRequestReply}; | |||
| use eyre::{bail, Context}; | |||
| use std::{ | |||
| @@ -21,8 +20,7 @@ pub fn check_environment(coordinator_addr: Option<SocketAddr>) -> eyre::Result<( | |||
| // check whether coordinator is running | |||
| write!(stdout, "Dora Coordinator: ")?; | |||
| let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); | |||
| let mut session = match connect_to_coordinator(coordination_addr) { | |||
| let mut session = match connect_to_coordinator(coordinator_addr) { | |||
| Ok(session) => { | |||
| let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green))); | |||
| writeln!(stdout, "ok")?; | |||
| @@ -68,12 +68,14 @@ enum Command { | |||
| Up { | |||
| #[clap(long)] | |||
| config: Option<PathBuf>, | |||
| #[clap(long)] | |||
| coordinator_addr: Option<SocketAddr>, | |||
| }, | |||
| /// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first. | |||
| Destroy { | |||
| #[clap(long)] | |||
| config: Option<PathBuf>, | |||
| #[clap(long)] | |||
| coordinator_addr: Option<SocketAddr>, | |||
| }, | |||
| /// Start the given dataflow path. Attach a name to the running dataflow by using --name. | |||
| @@ -96,10 +98,12 @@ enum Command { | |||
| #[clap(long)] | |||
| #[arg(value_parser = parse)] | |||
| grace_duration: Option<Duration>, | |||
| #[clap(long)] | |||
| coordinator_addr: Option<SocketAddr>, | |||
| }, | |||
| /// List running dataflows. | |||
| List { | |||
| #[clap(long)] | |||
| coordinator_addr: Option<SocketAddr>, | |||
| }, | |||
| // Planned for future releases: | |||
| @@ -109,6 +113,7 @@ enum Command { | |||
| Logs { | |||
| dataflow: Option<String>, | |||
| node: String, | |||
| #[clap(long)] | |||
| coordinator_addr: Option<SocketAddr>, | |||
| }, | |||
| // Metrics, | |||
| @@ -234,8 +239,7 @@ fn run() -> eyre::Result<()> { | |||
| node, | |||
| coordinator_addr, | |||
| } => { | |||
| let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); | |||
| let mut session = connect_to_coordinator(coordination_addr) | |||
| let mut session = connect_to_coordinator(coordinator_addr) | |||
| .wrap_err("failed to connect to dora coordinator")?; | |||
| let uuids = query_running_dataflows(&mut *session) | |||
| .wrap_err("failed to query running dataflows")?; | |||
| @@ -271,8 +275,7 @@ fn run() -> eyre::Result<()> { | |||
| .check(&working_dir) | |||
| .wrap_err("Could not validate yaml")?; | |||
| let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); | |||
| let mut session = connect_to_coordinator(coordination_addr) | |||
| let mut session = connect_to_coordinator(coordinator_addr) | |||
| .wrap_err("failed to connect to dora coordinator")?; | |||
| let dataflow_id = start_dataflow( | |||
| dataflow_descriptor.clone(), | |||
| @@ -291,23 +294,19 @@ fn run() -> eyre::Result<()> { | |||
| )? | |||
| } | |||
| } | |||
| Command::List { coordinator_addr } => { | |||
| let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); | |||
| match connect_to_coordinator(coordination_addr) { | |||
| Ok(mut session) => list(&mut *session)?, | |||
| Err(_) => { | |||
| bail!("No dora coordinator seems to be running."); | |||
| } | |||
| Command::List { coordinator_addr } => match connect_to_coordinator(coordinator_addr) { | |||
| Ok(mut session) => list(&mut *session)?, | |||
| Err(_) => { | |||
| bail!("No dora coordinator seems to be running."); | |||
| } | |||
| } | |||
| }, | |||
| Command::Stop { | |||
| uuid, | |||
| name, | |||
| grace_duration, | |||
| coordinator_addr, | |||
| } => { | |||
| let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); | |||
| let mut session = connect_to_coordinator(coordination_addr) | |||
| let mut session = connect_to_coordinator(coordinator_addr) | |||
| .wrap_err("could not connect to dora coordinator")?; | |||
| match (uuid, name) { | |||
| (Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?, | |||
| @@ -499,7 +498,8 @@ fn query_running_dataflows( | |||
| } | |||
| fn connect_to_coordinator( | |||
| coordinator_addr: SocketAddr, | |||
| coordinator_addr: Option<SocketAddr>, | |||
| ) -> std::io::Result<Box<TcpRequestReplyConnection>> { | |||
| TcpLayer::new().connect(coordinator_addr) | |||
| let coordination_addr = coordinator_addr.unwrap_or_else(control_socket_addr); | |||
| TcpLayer::new().connect(coordination_addr) | |||
| } | |||
| @@ -1,5 +1,5 @@ | |||
| use crate::{check::daemon_running, connect_to_coordinator}; | |||
| use dora_core::topics::{control_socket_addr, ControlRequest}; | |||
| use dora_core::topics::ControlRequest; | |||
| use eyre::Context; | |||
| use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration}; | |||
| #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] | |||
| @@ -10,14 +10,13 @@ pub(crate) fn up( | |||
| coordinator_addr: Option<SocketAddr>, | |||
| ) -> eyre::Result<()> { | |||
| let UpConfig {} = parse_dora_config(config_path)?; | |||
| let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); | |||
| let mut session = match connect_to_coordinator(coordination_addr) { | |||
| let mut session = match connect_to_coordinator(coordinator_addr) { | |||
| Ok(session) => session, | |||
| Err(_) => { | |||
| start_coordinator().wrap_err("failed to start dora-coordinator")?; | |||
| loop { | |||
| match connect_to_coordinator(coordination_addr) { | |||
| match connect_to_coordinator(coordinator_addr) { | |||
| Ok(session) => break session, | |||
| Err(_) => { | |||
| // sleep a bit until the coordinator accepts connections | |||
| @@ -54,8 +53,7 @@ pub(crate) fn destroy( | |||
| coordinator_addr: Option<SocketAddr>, | |||
| ) -> Result<(), eyre::ErrReport> { | |||
| let UpConfig {} = parse_dora_config(config_path)?; | |||
| let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); | |||
| match connect_to_coordinator(coordination_addr) { | |||
| match connect_to_coordinator(coordinator_addr) { | |||
| Ok(mut session) => { | |||
| // send destroy command to dora-coordinator | |||
| session | |||