| @@ -11,13 +11,11 @@ use std::{path::PathBuf, sync::mpsc, time::Duration}; | |||
| use tracing::{error, info}; | |||
| use uuid::Uuid; | |||
| use crate::control_connection; | |||
| pub fn attach_dataflow( | |||
| dataflow: Descriptor, | |||
| dataflow_path: PathBuf, | |||
| dataflow_id: Uuid, | |||
| session: &mut Option<Box<TcpRequestReplyConnection>>, | |||
| session: &mut TcpRequestReplyConnection, | |||
| hot_reload: bool, | |||
| ) -> Result<(), eyre::ErrReport> { | |||
| let (tx, rx) = mpsc::sync_channel(2); | |||
| @@ -70,7 +68,7 @@ pub fn attach_dataflow( | |||
| if let Some((dataflow_id, node_id, operator_id)) = node_path_lookup.get(&path) { | |||
| watcher_tx | |||
| .send(ControlRequest::Reload { | |||
| dataflow_id: dataflow_id.clone(), | |||
| dataflow_id: *dataflow_id, | |||
| node_id: node_id.clone(), | |||
| operator_id: operator_id.clone(), | |||
| }) | |||
| @@ -123,7 +121,7 @@ pub fn attach_dataflow( | |||
| Ok(reload_event) => reload_event, | |||
| }; | |||
| let reply_raw = control_connection(session)? | |||
| let reply_raw = session | |||
| .request(&serde_json::to_vec(&control_request)?) | |||
| .wrap_err("failed to send request message to coordinator")?; | |||
| let result: ControlRequestReply = | |||
| @@ -1,4 +1,5 @@ | |||
| use crate::control_connection; | |||
| use crate::connect_to_coordinator; | |||
| use communication_layer_request_reply::TcpRequestReplyConnection; | |||
| use dora_core::topics::{ControlRequest, ControlRequestReply}; | |||
| use eyre::{bail, Context}; | |||
| use std::io::Write; | |||
| @@ -16,19 +17,30 @@ pub fn check_environment() -> eyre::Result<()> { | |||
| // check whether coordinator is running | |||
| write!(stdout, "Dora Coordinator: ")?; | |||
| if coordinator_running()? { | |||
| let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green))); | |||
| writeln!(stdout, "ok")?; | |||
| } else { | |||
| let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Red))); | |||
| writeln!(stdout, "not running")?; | |||
| error_occured = true; | |||
| } | |||
| let mut session = match connect_to_coordinator() { | |||
| Ok(session) => { | |||
| let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green))); | |||
| writeln!(stdout, "ok")?; | |||
| Some(session) | |||
| } | |||
| Err(_) => { | |||
| let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Red))); | |||
| writeln!(stdout, "not running")?; | |||
| error_occured = true; | |||
| None | |||
| } | |||
| }; | |||
| let _ = stdout.reset(); | |||
| // check whether daemon is running | |||
| write!(stdout, "Dora Daemon: ")?; | |||
| if daemon_running()? { | |||
| if session | |||
| .as_deref_mut() | |||
| .map(daemon_running) | |||
| .transpose()? | |||
| .unwrap_or(false) | |||
| { | |||
| let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green))); | |||
| writeln!(stdout, "ok")?; | |||
| } else { | |||
| @@ -47,30 +59,16 @@ pub fn check_environment() -> eyre::Result<()> { | |||
| Ok(()) | |||
| } | |||
| pub fn coordinator_running() -> Result<bool, eyre::ErrReport> { | |||
| let mut control_session = None; | |||
| let connected = control_connection(&mut control_session).is_ok(); | |||
| Ok(connected) | |||
| } | |||
| pub fn daemon_running() -> Result<bool, eyre::ErrReport> { | |||
| let mut control_session = None; | |||
| let running = match control_connection(&mut control_session) { | |||
| Ok(connection) => { | |||
| let reply_raw = connection | |||
| .request(&serde_json::to_vec(&ControlRequest::DaemonConnected).unwrap()) | |||
| .wrap_err("failed to send DaemonConnected message")?; | |||
| pub fn daemon_running(session: &mut TcpRequestReplyConnection) -> Result<bool, eyre::ErrReport> { | |||
| let reply_raw = session | |||
| .request(&serde_json::to_vec(&ControlRequest::DaemonConnected).unwrap()) | |||
| .wrap_err("failed to send DaemonConnected message")?; | |||
| let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; | |||
| match reply { | |||
| ControlRequestReply::DaemonConnected(running) => running, | |||
| other => bail!("unexpected reply to daemon connection check: {other:?}"), | |||
| } | |||
| } | |||
| Err(_) => { | |||
| // coordinator is not running | |||
| false | |||
| } | |||
| let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; | |||
| let running = match reply { | |||
| ControlRequestReply::DaemonConnected(running) => running, | |||
| other => bail!("unexpected reply to daemon connection check: {other:?}"), | |||
| }; | |||
| Ok(running) | |||
| } | |||
| @@ -124,8 +124,6 @@ fn main() -> eyre::Result<()> { | |||
| set_up_tracing("dora-cli").context("failed to set up tracing subscriber")?; | |||
| let args = Args::parse(); | |||
| let mut session = None; | |||
| match args.command { | |||
| Command::Check { | |||
| dataflow, | |||
| @@ -171,25 +169,36 @@ fn main() -> eyre::Result<()> { | |||
| dataflow_description | |||
| .check(&dataflow, None) | |||
| .wrap_err("Could not validate yaml")?; | |||
| let dataflow_id = start_dataflow(dataflow.clone(), name, &mut session)?; | |||
| let mut session = | |||
| connect_to_coordinator().wrap_err("failed to connect to dora coordinator")?; | |||
| let dataflow_id = start_dataflow(dataflow.clone(), name, &mut *session)?; | |||
| if attach { | |||
| attach_dataflow( | |||
| dataflow_description, | |||
| dataflow, | |||
| dataflow_id, | |||
| &mut session, | |||
| &mut *session, | |||
| hot_reload, | |||
| )? | |||
| } | |||
| } | |||
| Command::List => list(&mut session)?, | |||
| Command::Stop { uuid, name } => match (uuid, name) { | |||
| (Some(uuid), _) => stop_dataflow(uuid, &mut session)?, | |||
| (None, Some(name)) => stop_dataflow_by_name(name, &mut session)?, | |||
| (None, None) => stop_dataflow_interactive(&mut session)?, | |||
| Command::List => match connect_to_coordinator() { | |||
| Ok(mut session) => list(&mut *session)?, | |||
| Err(_) => { | |||
| eprintln!("No dora coordinator seems to be running."); | |||
| } | |||
| }, | |||
| Command::Destroy { config } => up::destroy(config.as_deref(), &mut session)?, | |||
| Command::Stop { uuid, name } => { | |||
| let mut session = | |||
| connect_to_coordinator().wrap_err("could not connect to dora coordinator")?; | |||
| match (uuid, name) { | |||
| (Some(uuid), _) => stop_dataflow(uuid, &mut *session)?, | |||
| (None, Some(name)) => stop_dataflow_by_name(name, &mut *session)?, | |||
| (None, None) => stop_dataflow_interactive(&mut *session)?, | |||
| } | |||
| } | |||
| Command::Destroy { config } => up::destroy(config.as_deref())?, | |||
| } | |||
| Ok(()) | |||
| @@ -198,12 +207,12 @@ fn main() -> eyre::Result<()> { | |||
| fn start_dataflow( | |||
| dataflow: PathBuf, | |||
| name: Option<String>, | |||
| session: &mut Option<Box<TcpRequestReplyConnection>>, | |||
| session: &mut TcpRequestReplyConnection, | |||
| ) -> Result<Uuid, eyre::ErrReport> { | |||
| let canonicalized = dataflow | |||
| .canonicalize() | |||
| .wrap_err("given dataflow file does not exist")?; | |||
| let reply_raw = control_connection(session)? | |||
| let reply_raw = session | |||
| .request( | |||
| &serde_json::to_vec(&ControlRequest::Start { | |||
| dataflow_path: canonicalized, | |||
| @@ -225,9 +234,7 @@ fn start_dataflow( | |||
| } | |||
| } | |||
| fn stop_dataflow_interactive( | |||
| session: &mut Option<Box<TcpRequestReplyConnection>>, | |||
| ) -> eyre::Result<()> { | |||
| fn stop_dataflow_interactive(session: &mut TcpRequestReplyConnection) -> eyre::Result<()> { | |||
| let uuids = query_running_dataflows(session).wrap_err("failed to query running dataflows")?; | |||
| if uuids.is_empty() { | |||
| eprintln!("No dataflows are running"); | |||
| @@ -241,9 +248,9 @@ fn stop_dataflow_interactive( | |||
| fn stop_dataflow( | |||
| uuid: Uuid, | |||
| session: &mut Option<Box<TcpRequestReplyConnection>>, | |||
| session: &mut TcpRequestReplyConnection, | |||
| ) -> Result<(), eyre::ErrReport> { | |||
| let reply_raw = control_connection(session)? | |||
| let reply_raw = session | |||
| .request( | |||
| &serde_json::to_vec(&ControlRequest::Stop { | |||
| dataflow_uuid: uuid, | |||
| @@ -262,9 +269,9 @@ fn stop_dataflow( | |||
| fn stop_dataflow_by_name( | |||
| name: String, | |||
| session: &mut Option<Box<TcpRequestReplyConnection>>, | |||
| session: &mut TcpRequestReplyConnection, | |||
| ) -> Result<(), eyre::ErrReport> { | |||
| let reply_raw = control_connection(session)? | |||
| let reply_raw = session | |||
| .request(&serde_json::to_vec(&ControlRequest::StopByName { name }).unwrap()) | |||
| .wrap_err("failed to send dataflow stop_by_name message")?; | |||
| let result: ControlRequestReply = | |||
| @@ -276,7 +283,7 @@ fn stop_dataflow_by_name( | |||
| } | |||
| } | |||
| fn list(session: &mut Option<Box<TcpRequestReplyConnection>>) -> Result<(), eyre::ErrReport> { | |||
| fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> { | |||
| let ids = query_running_dataflows(session)?; | |||
| if ids.is_empty() { | |||
| @@ -292,9 +299,9 @@ fn list(session: &mut Option<Box<TcpRequestReplyConnection>>) -> Result<(), eyre | |||
| } | |||
| fn query_running_dataflows( | |||
| session: &mut Option<Box<TcpRequestReplyConnection>>, | |||
| session: &mut TcpRequestReplyConnection, | |||
| ) -> Result<Vec<DataflowId>, eyre::ErrReport> { | |||
| let reply_raw = control_connection(session)? | |||
| let reply_raw = session | |||
| .request(&serde_json::to_vec(&ControlRequest::List).unwrap()) | |||
| .wrap_err("failed to send list message")?; | |||
| let reply: ControlRequestReply = | |||
| @@ -308,11 +315,6 @@ fn query_running_dataflows( | |||
| Ok(ids) | |||
| } | |||
| fn control_connection( | |||
| session: &mut Option<Box<TcpRequestReplyConnection>>, | |||
| ) -> eyre::Result<&mut Box<TcpRequestReplyConnection>> { | |||
| Ok(match session { | |||
| Some(session) => session, | |||
| None => session.insert(TcpLayer::new().connect(control_socket_addr())?), | |||
| }) | |||
| fn connect_to_coordinator() -> std::io::Result<Box<TcpRequestReplyConnection>> { | |||
| TcpLayer::new().connect(control_socket_addr()) | |||
| } | |||
| @@ -1,8 +1,4 @@ | |||
| use crate::{ | |||
| check::{coordinator_running, daemon_running}, | |||
| control_connection, | |||
| }; | |||
| use communication_layer_request_reply::TcpRequestReplyConnection; | |||
| use crate::{check::daemon_running, connect_to_coordinator}; | |||
| use dora_core::topics::ControlRequest; | |||
| use eyre::Context; | |||
| use std::{fs, path::Path, process::Command, time::Duration}; | |||
| @@ -17,34 +13,44 @@ pub(crate) fn up( | |||
| ) -> eyre::Result<()> { | |||
| let UpConfig {} = parse_dora_config(config_path)?; | |||
| if !coordinator_running()? { | |||
| start_coordinator(coordinator).wrap_err("failed to start dora-coordinator")?; | |||
| // sleep a bit until the coordinator accepts connections | |||
| while !coordinator_running()? { | |||
| std::thread::sleep(Duration::from_millis(50)); | |||
| let mut session = match connect_to_coordinator() { | |||
| Ok(session) => session, | |||
| Err(_) => { | |||
| start_coordinator(coordinator).wrap_err("failed to start dora-coordinator")?; | |||
| loop { | |||
| match connect_to_coordinator() { | |||
| Ok(session) => break session, | |||
| Err(_) => { | |||
| // sleep a bit until the coordinator accepts connections | |||
| std::thread::sleep(Duration::from_millis(50)); | |||
| } | |||
| } | |||
| } | |||
| } | |||
| } | |||
| if !daemon_running()? { | |||
| }; | |||
| if !daemon_running(&mut *session)? { | |||
| start_daemon(daemon).wrap_err("failed to start dora-daemon")?; | |||
| } | |||
| Ok(()) | |||
| } | |||
| pub(crate) fn destroy( | |||
| config_path: Option<&Path>, | |||
| session: &mut Option<Box<TcpRequestReplyConnection>>, | |||
| ) -> Result<(), eyre::ErrReport> { | |||
| pub(crate) fn destroy(config_path: Option<&Path>) -> Result<(), eyre::ErrReport> { | |||
| let UpConfig {} = parse_dora_config(config_path)?; | |||
| if coordinator_running()? { | |||
| // send destroy command to dora-coordinator | |||
| control_connection(session)? | |||
| .request(&serde_json::to_vec(&ControlRequest::Destroy).unwrap()) | |||
| .wrap_err("failed to send destroy message")?; | |||
| println!("Send destroy command to dora-coordinator"); | |||
| } else { | |||
| eprintln!("The dora-coordinator is not running"); | |||
| match connect_to_coordinator() { | |||
| Ok(mut session) => { | |||
| // send destroy command to dora-coordinator | |||
| session | |||
| .request(&serde_json::to_vec(&ControlRequest::Destroy).unwrap()) | |||
| .wrap_err("failed to send destroy message")?; | |||
| println!("Send destroy command to dora-coordinator"); | |||
| } | |||
| Err(_) => { | |||
| eprintln!("The dora-coordinator does not seem to be running"); | |||
| } | |||
| } | |||
| Ok(()) | |||