diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/attach.rs index e9e34549..aec86562 100644 --- a/binaries/cli/src/attach.rs +++ b/binaries/cli/src/attach.rs @@ -11,13 +11,11 @@ use std::{path::PathBuf, sync::mpsc, time::Duration}; use tracing::{error, info}; use uuid::Uuid; -use crate::control_connection; - pub fn attach_dataflow( dataflow: Descriptor, dataflow_path: PathBuf, dataflow_id: Uuid, - session: &mut Option>, + session: &mut TcpRequestReplyConnection, hot_reload: bool, ) -> Result<(), eyre::ErrReport> { let (tx, rx) = mpsc::sync_channel(2); @@ -70,7 +68,7 @@ pub fn attach_dataflow( if let Some((dataflow_id, node_id, operator_id)) = node_path_lookup.get(&path) { watcher_tx .send(ControlRequest::Reload { - dataflow_id: dataflow_id.clone(), + dataflow_id: *dataflow_id, node_id: node_id.clone(), operator_id: operator_id.clone(), }) @@ -123,7 +121,7 @@ pub fn attach_dataflow( Ok(reload_event) => reload_event, }; - let reply_raw = control_connection(session)? + let reply_raw = session .request(&serde_json::to_vec(&control_request)?) .wrap_err("failed to send request message to coordinator")?; let result: ControlRequestReply = diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index a364630f..4b77eb13 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -1,4 +1,5 @@ -use crate::control_connection; +use crate::connect_to_coordinator; +use communication_layer_request_reply::TcpRequestReplyConnection; use dora_core::topics::{ControlRequest, ControlRequestReply}; use eyre::{bail, Context}; use std::io::Write; @@ -16,19 +17,30 @@ pub fn check_environment() -> eyre::Result<()> { // check whether coordinator is running write!(stdout, "Dora Coordinator: ")?; - if coordinator_running()? { - let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green))); - writeln!(stdout, "ok")?; - } else { - let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Red))); - writeln!(stdout, "not running")?; - error_occured = true; - } + let mut session = match connect_to_coordinator() { + Ok(session) => { + let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green))); + writeln!(stdout, "ok")?; + Some(session) + } + Err(_) => { + let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Red))); + writeln!(stdout, "not running")?; + error_occured = true; + None + } + }; + let _ = stdout.reset(); // check whether daemon is running write!(stdout, "Dora Daemon: ")?; - if daemon_running()? { + if session + .as_deref_mut() + .map(daemon_running) + .transpose()? + .unwrap_or(false) + { let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green))); writeln!(stdout, "ok")?; } else { @@ -47,30 +59,16 @@ pub fn check_environment() -> eyre::Result<()> { Ok(()) } -pub fn coordinator_running() -> Result { - let mut control_session = None; - let connected = control_connection(&mut control_session).is_ok(); - Ok(connected) -} - -pub fn daemon_running() -> Result { - let mut control_session = None; - let running = match control_connection(&mut control_session) { - Ok(connection) => { - let reply_raw = connection - .request(&serde_json::to_vec(&ControlRequest::DaemonConnected).unwrap()) - .wrap_err("failed to send DaemonConnected message")?; +pub fn daemon_running(session: &mut TcpRequestReplyConnection) -> Result { + let reply_raw = session + .request(&serde_json::to_vec(&ControlRequest::DaemonConnected).unwrap()) + .wrap_err("failed to send DaemonConnected message")?; - let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; - match reply { - ControlRequestReply::DaemonConnected(running) => running, - other => bail!("unexpected reply to daemon connection check: {other:?}"), - } - } - Err(_) => { - // coordinator is not running - false - } + let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + let running = match reply { + ControlRequestReply::DaemonConnected(running) => running, + other => bail!("unexpected reply to daemon connection check: {other:?}"), }; + Ok(running) } diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 521becc4..4781c1c8 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -124,8 +124,6 @@ fn main() -> eyre::Result<()> { set_up_tracing("dora-cli").context("failed to set up tracing subscriber")?; let args = Args::parse(); - let mut session = None; - match args.command { Command::Check { dataflow, @@ -171,25 +169,36 @@ fn main() -> eyre::Result<()> { dataflow_description .check(&dataflow, None) .wrap_err("Could not validate yaml")?; - let dataflow_id = start_dataflow(dataflow.clone(), name, &mut session)?; + let mut session = + connect_to_coordinator().wrap_err("failed to connect to dora coordinator")?; + let dataflow_id = start_dataflow(dataflow.clone(), name, &mut *session)?; if attach { attach_dataflow( dataflow_description, dataflow, dataflow_id, - &mut session, + &mut *session, hot_reload, )? } } - Command::List => list(&mut session)?, - Command::Stop { uuid, name } => match (uuid, name) { - (Some(uuid), _) => stop_dataflow(uuid, &mut session)?, - (None, Some(name)) => stop_dataflow_by_name(name, &mut session)?, - (None, None) => stop_dataflow_interactive(&mut session)?, + Command::List => match connect_to_coordinator() { + Ok(mut session) => list(&mut *session)?, + Err(_) => { + eprintln!("No dora coordinator seems to be running."); + } }, - Command::Destroy { config } => up::destroy(config.as_deref(), &mut session)?, + Command::Stop { uuid, name } => { + let mut session = + connect_to_coordinator().wrap_err("could not connect to dora coordinator")?; + match (uuid, name) { + (Some(uuid), _) => stop_dataflow(uuid, &mut *session)?, + (None, Some(name)) => stop_dataflow_by_name(name, &mut *session)?, + (None, None) => stop_dataflow_interactive(&mut *session)?, + } + } + Command::Destroy { config } => up::destroy(config.as_deref())?, } Ok(()) @@ -198,12 +207,12 @@ fn main() -> eyre::Result<()> { fn start_dataflow( dataflow: PathBuf, name: Option, - session: &mut Option>, + session: &mut TcpRequestReplyConnection, ) -> Result { let canonicalized = dataflow .canonicalize() .wrap_err("given dataflow file does not exist")?; - let reply_raw = control_connection(session)? + let reply_raw = session .request( &serde_json::to_vec(&ControlRequest::Start { dataflow_path: canonicalized, @@ -225,9 +234,7 @@ fn start_dataflow( } } -fn stop_dataflow_interactive( - session: &mut Option>, -) -> eyre::Result<()> { +fn stop_dataflow_interactive(session: &mut TcpRequestReplyConnection) -> eyre::Result<()> { let uuids = query_running_dataflows(session).wrap_err("failed to query running dataflows")?; if uuids.is_empty() { eprintln!("No dataflows are running"); @@ -241,9 +248,9 @@ fn stop_dataflow_interactive( fn stop_dataflow( uuid: Uuid, - session: &mut Option>, + session: &mut TcpRequestReplyConnection, ) -> Result<(), eyre::ErrReport> { - let reply_raw = control_connection(session)? + let reply_raw = session .request( &serde_json::to_vec(&ControlRequest::Stop { dataflow_uuid: uuid, @@ -262,9 +269,9 @@ fn stop_dataflow( fn stop_dataflow_by_name( name: String, - session: &mut Option>, + session: &mut TcpRequestReplyConnection, ) -> Result<(), eyre::ErrReport> { - let reply_raw = control_connection(session)? + let reply_raw = session .request(&serde_json::to_vec(&ControlRequest::StopByName { name }).unwrap()) .wrap_err("failed to send dataflow stop_by_name message")?; let result: ControlRequestReply = @@ -276,7 +283,7 @@ fn stop_dataflow_by_name( } } -fn list(session: &mut Option>) -> Result<(), eyre::ErrReport> { +fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> { let ids = query_running_dataflows(session)?; if ids.is_empty() { @@ -292,9 +299,9 @@ fn list(session: &mut Option>) -> Result<(), eyre } fn query_running_dataflows( - session: &mut Option>, + session: &mut TcpRequestReplyConnection, ) -> Result, eyre::ErrReport> { - let reply_raw = control_connection(session)? + let reply_raw = session .request(&serde_json::to_vec(&ControlRequest::List).unwrap()) .wrap_err("failed to send list message")?; let reply: ControlRequestReply = @@ -308,11 +315,6 @@ fn query_running_dataflows( Ok(ids) } -fn control_connection( - session: &mut Option>, -) -> eyre::Result<&mut Box> { - Ok(match session { - Some(session) => session, - None => session.insert(TcpLayer::new().connect(control_socket_addr())?), - }) +fn connect_to_coordinator() -> std::io::Result> { + TcpLayer::new().connect(control_socket_addr()) } diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/up.rs index 54cd6b5f..499cf8b2 100644 --- a/binaries/cli/src/up.rs +++ b/binaries/cli/src/up.rs @@ -1,8 +1,4 @@ -use crate::{ - check::{coordinator_running, daemon_running}, - control_connection, -}; -use communication_layer_request_reply::TcpRequestReplyConnection; +use crate::{check::daemon_running, connect_to_coordinator}; use dora_core::topics::ControlRequest; use eyre::Context; use std::{fs, path::Path, process::Command, time::Duration}; @@ -17,34 +13,44 @@ pub(crate) fn up( ) -> eyre::Result<()> { let UpConfig {} = parse_dora_config(config_path)?; - if !coordinator_running()? { - start_coordinator(coordinator).wrap_err("failed to start dora-coordinator")?; - // sleep a bit until the coordinator accepts connections - while !coordinator_running()? { - std::thread::sleep(Duration::from_millis(50)); + let mut session = match connect_to_coordinator() { + Ok(session) => session, + Err(_) => { + start_coordinator(coordinator).wrap_err("failed to start dora-coordinator")?; + + loop { + match connect_to_coordinator() { + Ok(session) => break session, + Err(_) => { + // sleep a bit until the coordinator accepts connections + std::thread::sleep(Duration::from_millis(50)); + } + } + } } - } - if !daemon_running()? { + }; + + if !daemon_running(&mut *session)? { start_daemon(daemon).wrap_err("failed to start dora-daemon")?; } Ok(()) } -pub(crate) fn destroy( - config_path: Option<&Path>, - session: &mut Option>, -) -> Result<(), eyre::ErrReport> { +pub(crate) fn destroy(config_path: Option<&Path>) -> Result<(), eyre::ErrReport> { let UpConfig {} = parse_dora_config(config_path)?; - if coordinator_running()? { - // send destroy command to dora-coordinator - control_connection(session)? - .request(&serde_json::to_vec(&ControlRequest::Destroy).unwrap()) - .wrap_err("failed to send destroy message")?; - println!("Send destroy command to dora-coordinator"); - } else { - eprintln!("The dora-coordinator is not running"); + match connect_to_coordinator() { + Ok(mut session) => { + // send destroy command to dora-coordinator + session + .request(&serde_json::to_vec(&ControlRequest::Destroy).unwrap()) + .wrap_err("failed to send destroy message")?; + println!("Send destroy command to dora-coordinator"); + } + Err(_) => { + eprintln!("The dora-coordinator does not seem to be running"); + } } Ok(())