diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/attach.rs index d5edeffd..bbc43d3a 100644 --- a/binaries/cli/src/attach.rs +++ b/binaries/cli/src/attach.rs @@ -1,9 +1,9 @@ use colored::Colorize; -use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection}; +use communication_layer_request_reply::TcpConnection; use dora_core::{ coordinator_messages::LogMessage, descriptor::{resolve_path, CoreNodeKind, Descriptor}, - topics::{ControlRequest, ControlRequestReply}, + topics::{ControlRequest, ControlRequestReply, DataflowResult}, }; use eyre::Context; use notify::event::ModifyKind; @@ -16,17 +16,17 @@ use std::{path::PathBuf, sync::mpsc, time::Duration}; use tracing::{error, info}; use uuid::Uuid; -use crate::handle_dataflow_result; - -pub fn attach_dataflow( +#[allow(clippy::too_many_arguments)] +pub fn attach_to_dataflow( + connection: &mut crate::DoraConnection, dataflow: Descriptor, dataflow_path: PathBuf, dataflow_id: Uuid, - session: &mut TcpRequestReplyConnection, hot_reload: bool, coordinator_socket: SocketAddr, log_level: log::LevelFilter, -) -> Result<(), eyre::ErrReport> { + log_output: &mut impl std::io::Write, +) -> eyre::Result { let (tx, rx) = mpsc::sync_channel(2); // Generate path hashmap @@ -181,7 +181,7 @@ pub fn attach_dataflow( None => "".normal(), }; - println!("{level}{node}{target}: {message}"); + writeln!(log_output, "{level}{node}{target}: {message}")?; continue; } Ok(AttachEvent::Log(Err(err))) => { @@ -190,16 +190,16 @@ pub fn attach_dataflow( } }; - let reply_raw = session + let reply_raw = connection + .session .request(&serde_json::to_vec(&control_request)?) .wrap_err("failed to send request message to coordinator")?; let result: ControlRequestReply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; match result { ControlRequestReply::DataflowStarted { uuid: _ } => (), - ControlRequestReply::DataflowStopped { uuid, result } => { - info!("dataflow {uuid} stopped"); - break handle_dataflow_result(result, Some(uuid)); + ControlRequestReply::DataflowStopped { result } => { + break Ok(result); } ControlRequestReply::DataflowReloaded { uuid } => { info!("dataflow {uuid} reloaded") diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index 49bc948e..1bb0ef6f 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -1,13 +1,12 @@ -use crate::connect_to_coordinator; -use communication_layer_request_reply::TcpRequestReplyConnection; -use dora_core::topics::{ControlRequest, ControlRequestReply}; -use eyre::{bail, Context}; +use eyre::bail; use std::{ io::{IsTerminal, Write}, net::SocketAddr, }; use termcolor::{Color, ColorChoice, ColorSpec, WriteColor}; +use crate::DoraConnection; + pub fn check_environment(coordinator_addr: SocketAddr) -> eyre::Result<()> { let mut error_occurred = false; @@ -20,7 +19,7 @@ pub fn check_environment(coordinator_addr: SocketAddr) -> eyre::Result<()> { // check whether coordinator is running write!(stdout, "Dora Coordinator: ")?; - let mut session = match connect_to_coordinator(coordinator_addr) { + let mut session = match DoraConnection::connect(coordinator_addr) { Ok(session) => { let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green))); writeln!(stdout, "ok")?; @@ -39,8 +38,8 @@ pub fn check_environment(coordinator_addr: SocketAddr) -> eyre::Result<()> { // check whether daemon is running write!(stdout, "Dora Daemon: ")?; if session - .as_deref_mut() - .map(daemon_running) + .as_mut() + .map(|c| c.daemon_running()) .transpose()? 
.unwrap_or(false) { @@ -61,17 +60,3 @@ pub fn check_environment(coordinator_addr: SocketAddr) -> eyre::Result<()> { Ok(()) } - -pub fn daemon_running(session: &mut TcpRequestReplyConnection) -> Result { - let reply_raw = session - .request(&serde_json::to_vec(&ControlRequest::DaemonConnected).unwrap()) - .wrap_err("failed to send DaemonConnected message")?; - - let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; - let running = match reply { - ControlRequestReply::DaemonConnected(running) => running, - other => bail!("unexpected reply to daemon connection check: {other:?}"), - }; - - Ok(running) -} diff --git a/binaries/cli/src/formatting.rs b/binaries/cli/src/formatting.rs deleted file mode 100644 index f19e1599..00000000 --- a/binaries/cli/src/formatting.rs +++ /dev/null @@ -1,50 +0,0 @@ -use dora_core::topics::{DataflowResult, NodeErrorCause}; - -pub struct FormatDataflowError<'a>(pub &'a DataflowResult); - -impl std::fmt::Display for FormatDataflowError<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - writeln!(f)?; - let failed = self - .0 - .node_results - .iter() - .filter_map(|(id, r)| r.as_ref().err().map(|e| (id, e))); - let total_failed = failed.clone().count(); - - let mut non_cascading: Vec<_> = failed - .clone() - .filter(|(_, e)| !matches!(e.cause, NodeErrorCause::Cascading { .. })) - .collect(); - non_cascading.sort_by_key(|(_, e)| e.timestamp); - // try to print earliest non-cascading error - let hidden = if !non_cascading.is_empty() { - let printed = non_cascading.len(); - for (id, err) in non_cascading { - writeln!(f, "Node `{id}` failed: {err}")?; - } - total_failed - printed - } else { - // no non-cascading errors -> print earliest cascading - let mut all: Vec<_> = failed.collect(); - all.sort_by_key(|(_, e)| e.timestamp); - if let Some((id, err)) = all.first() { - write!(f, "Node `{id}` failed: {err}")?; - total_failed - 1 - } else { - write!(f, "unknown error")?; - 0 - } - }; - - if hidden > 1 { - write!( - f, - "\n\nThere are {hidden} consequential errors. Check the `out/{}` folder for full details.", - self.0.uuid - )?; - } - - Ok(()) - } -} diff --git a/binaries/cli/src/graph/mod.rs b/binaries/cli/src/graph/mod.rs index c28f9eb4..f9199585 100644 --- a/binaries/cli/src/graph/mod.rs +++ b/binaries/cli/src/graph/mod.rs @@ -1,56 +1,10 @@ -use std::{fs::File, io::Write, path::Path}; +use std::path::Path; use dora_core::descriptor::Descriptor; use eyre::Context; const MERMAID_TEMPLATE: &str = include_str!("mermaid-template.html"); -pub(crate) fn create(dataflow: std::path::PathBuf, mermaid: bool, open: bool) -> eyre::Result<()> { - if mermaid { - let visualized = visualize_as_mermaid(&dataflow)?; - println!("{visualized}"); - println!( - "Paste the above output on https://mermaid.live/ or in a \ - ```mermaid code block on GitHub to display it." 
- ); - } else { - let html = visualize_as_html(&dataflow)?; - - let working_dir = std::env::current_dir().wrap_err("failed to get current working dir")?; - let graph_filename = match dataflow.file_stem().and_then(|n| n.to_str()) { - Some(name) => format!("{name}-graph"), - None => "graph".into(), - }; - let mut extra = 0; - let path = loop { - let adjusted_file_name = if extra == 0 { - format!("{graph_filename}.html") - } else { - format!("{graph_filename}.{extra}.html") - }; - let path = working_dir.join(&adjusted_file_name); - if path.exists() { - extra += 1; - } else { - break path; - } - }; - - let mut file = File::create(&path).context("failed to create graph HTML file")?; - file.write_all(html.as_bytes())?; - - println!( - "View graph by opening the following in your browser:\n file://{}", - path.display() - ); - - if open { - webbrowser::open(path.as_os_str().to_str().unwrap())?; - } - } - Ok(()) -} - pub fn visualize_as_html(dataflow: &Path) -> eyre::Result { let mermaid = visualize_as_mermaid(dataflow)?; Ok(MERMAID_TEMPLATE.replacen("____insert____", &mermaid, 1)) diff --git a/binaries/cli/src/lib.rs b/binaries/cli/src/lib.rs index b9128b28..e28c9db0 100644 --- a/binaries/cli/src/lib.rs +++ b/binaries/cli/src/lib.rs @@ -1,622 +1,188 @@ -use attach::attach_dataflow; +pub use crate::{ + build::build as build_dataflow, + check::check_environment, + graph::{visualize_as_html, visualize_as_mermaid}, + up::up, +}; +use attach::attach_to_dataflow; use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection}; -use dora_coordinator::Event; use dora_core::{ descriptor::Descriptor, - topics::{ - ControlRequest, ControlRequestReply, DataflowList, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, - DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, - }, + topics::{ControlRequest, ControlRequestReply, DataflowList, DataflowResult}, }; -use dora_daemon::Daemon; -use duration_str::parse; use eyre::{bail, Context}; -use formatting::FormatDataflowError; -use std::{io::Write, net::SocketAddr, path::Path}; use std::{ - net::{IpAddr, Ipv4Addr}, + net::{IpAddr, Ipv4Addr, SocketAddr}, path::PathBuf, time::Duration, }; -use tabwriter::TabWriter; -use tokio::runtime::Builder; use uuid::Uuid; mod attach; mod build; mod check; -mod formatting; mod graph; mod logs; -mod template; +pub mod template; mod up; -const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); -const LISTEN_WILDCARD: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); - -#[derive(Debug, clap::Parser)] -#[clap(version)] -struct Args { - #[clap(subcommand)] - command: Command, -} +pub const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); +pub const LISTEN_WILDCARD: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); -/// dora-rs cli client -#[derive(Debug, clap::Subcommand)] -pub enum Command { - /// Check if the coordinator and the daemon is running. - Check { - /// Path to the dataflow descriptor file (enables additional checks) - #[clap(long, value_name = "PATH", value_hint = clap::ValueHint::FilePath)] - dataflow: Option, - /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] - coordinator_addr: IpAddr, - /// Port number of the coordinator control server - #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - coordinator_port: u16, - }, - /// Generate a visualization of the given graph using mermaid.js. Use --open to open browser. 
- Graph { - /// Path to the dataflow descriptor file - #[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)] - dataflow: PathBuf, - /// Visualize the dataflow as a Mermaid diagram (instead of HTML) - #[clap(long, action)] - mermaid: bool, - /// Open the HTML visualization in the browser - #[clap(long, action)] - open: bool, - }, - /// Run build commands provided in the given dataflow. - Build { - /// Path to the dataflow descriptor file - #[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)] - dataflow: PathBuf, - }, - /// Generate a new project or node. Choose the language between Rust, Python, C or C++. - New { - #[clap(flatten)] - args: CommandNew, - #[clap(hide = true, long)] - internal_create_with_path_dependencies: bool, - }, - /// Spawn coordinator and daemon in local mode (with default config) - Up { - /// Use a custom configuration - #[clap(long, hide = true, value_name = "PATH", value_hint = clap::ValueHint::FilePath)] - config: Option, - }, - /// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first. - Destroy { - /// Use a custom configuration - #[clap(long, hide = true)] - config: Option, - /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] - coordinator_addr: IpAddr, - /// Port number of the coordinator control server - #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - coordinator_port: u16, - }, - /// Start the given dataflow path. Attach a name to the running dataflow by using --name. - Start { - /// Path to the dataflow descriptor file - #[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)] - dataflow: PathBuf, - /// Assign a name to the dataflow - #[clap(long)] - name: Option, - /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] - coordinator_addr: IpAddr, - /// Port number of the coordinator control server - #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - coordinator_port: u16, - /// Attach to the dataflow and wait for its completion - #[clap(long, action)] - attach: bool, - /// Run the dataflow in background - #[clap(long, action)] - detach: bool, - /// Enable hot reloading (Python only) - #[clap(long, action)] - hot_reload: bool, - }, - /// Stop the given dataflow UUID. If no id is provided, you will be able to choose between the running dataflows. - Stop { - /// UUID of the dataflow that should be stopped - uuid: Option, - /// Name of the dataflow that should be stopped - #[clap(long)] - name: Option, - /// Kill the dataflow if it doesn't stop after the given duration - #[clap(long, value_name = "DURATION")] - #[arg(value_parser = parse)] - grace_duration: Option, - /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] - coordinator_addr: IpAddr, - /// Port number of the coordinator control server - #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - coordinator_port: u16, - }, - /// List running dataflows. 
- List { - /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] - coordinator_addr: IpAddr, - /// Port number of the coordinator control server - #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - coordinator_port: u16, - }, - // Planned for future releases: - // Dashboard, - /// Show logs of a given dataflow and node. - #[command(allow_missing_positional = true)] - Logs { - /// Identifier of the dataflow - #[clap(value_name = "UUID_OR_NAME")] - dataflow: Option, - /// Show logs for the given node - #[clap(value_name = "NAME")] - node: String, - /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] - coordinator_addr: IpAddr, - /// Port number of the coordinator control server - #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - coordinator_port: u16, - }, - // Metrics, - // Stats, - // Get, - // Upgrade, - /// Run daemon - Daemon { - /// Unique identifier for the machine (required for distributed dataflows) - #[clap(long)] - machine_id: Option, - /// The inter daemon IP address and port this daemon will bind to. - #[clap(long, default_value_t = SocketAddr::new(LISTEN_WILDCARD, 0))] - inter_daemon_addr: SocketAddr, - /// Local listen port for event such as dynamic node. - #[clap(long, default_value_t = DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT)] - local_listen_port: u16, - /// Address and port number of the dora coordinator - #[clap(long, default_value_t = SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT))] - coordinator_addr: SocketAddr, - #[clap(long, hide = true)] - run_dataflow: Option, - /// Suppresses all log output to stdout. - #[clap(long)] - quiet: bool, - }, - /// Run runtime - Runtime, - /// Run coordinator - Coordinator { - /// Network interface to bind to for daemon communication - #[clap(long, default_value_t = LISTEN_WILDCARD)] - interface: IpAddr, - /// Port number to bind to for daemon communication - #[clap(long, default_value_t = DORA_COORDINATOR_PORT_DEFAULT)] - port: u16, - /// Network interface to bind to for control communication - #[clap(long, default_value_t = LISTEN_WILDCARD)] - control_interface: IpAddr, - /// Port number to bind to for control communication - #[clap(long, default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - control_port: u16, - /// Suppresses all log output to stdout. 
-        #[clap(long)]
-        quiet: bool,
-    },
+pub struct DoraConnection {
+    session: Box<TcpRequestReplyConnection>,
 }
-#[derive(Debug, clap::Args)]
-pub struct CommandNew {
-    /// The entity that should be created
-    #[clap(long, value_enum, default_value_t = Kind::Dataflow)]
-    kind: Kind,
-    /// The programming language that should be used
-    #[clap(long, value_enum, default_value_t = Lang::Rust)]
-    lang: Lang,
-    /// Desired name of the entity
-    name: String,
-    /// Where to create the entity
-    #[clap(hide = true)]
-    path: Option<PathBuf>,
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
-enum Kind {
-    Dataflow,
-    CustomNode,
-}
+impl DoraConnection {
+    pub fn connect(coordinator_addr: SocketAddr) -> std::io::Result<Self> {
+        Ok(Self {
+            session: TcpLayer::new().connect(coordinator_addr)?,
+        })
+    }
-#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
-enum Lang {
-    Rust,
-    Python,
-    C,
-    Cxx,
-}
+    pub fn daemon_running(&mut self) -> eyre::Result<bool> {
+        let reply_raw = self
+            .session
+            .request(&serde_json::to_vec(&ControlRequest::DaemonConnected).unwrap())
+            .wrap_err("failed to send DaemonConnected message")?;
-pub fn run(command: Command, dora_cli_path: PathBuf) -> eyre::Result<()> {
-    let log_level = env_logger::Builder::new()
-        .filter_level(log::LevelFilter::Info)
-        .parse_default_env()
-        .build()
-        .filter();
+        let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
+        let running = match reply {
+            ControlRequestReply::DaemonConnected(running) => running,
+            other => bail!("unexpected reply to daemon connection check: {other:?}"),
+        };
-    match command {
-        Command::Check {
-            dataflow,
-            coordinator_addr,
-            coordinator_port,
-        } => match dataflow {
-            Some(dataflow) => {
-                let working_dir = dataflow
-                    .canonicalize()
-                    .context("failed to canonicalize dataflow path")?
-                    .parent()
-                    .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
-                    .to_owned();
-                Descriptor::blocking_read(&dataflow)?.check(&working_dir)?;
-                check::check_environment((coordinator_addr, coordinator_port).into())?
-            }
-            None => check::check_environment((coordinator_addr, coordinator_port).into())?,
-        },
-        Command::Graph {
-            dataflow,
-            mermaid,
-            open,
-        } => {
-            graph::create(dataflow, mermaid, open)?;
-        }
-        Command::Build { dataflow } => {
-            build::build(&dataflow)?;
-        }
-        Command::New {
-            args,
-            internal_create_with_path_dependencies,
-        } => template::create(args, internal_create_with_path_dependencies)?,
-        Command::Up { config } => {
-            up::up(config.as_deref(), &dora_cli_path)?;
-        }
-        Command::Logs {
-            dataflow,
-            node,
-            coordinator_addr,
-            coordinator_port,
-        } => {
-            let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
-                .wrap_err("failed to connect to dora coordinator")?;
-            let list = query_running_dataflows(&mut *session)
-                .wrap_err("failed to query running dataflows")?;
-            if let Some(dataflow) = dataflow {
-                let uuid = Uuid::parse_str(&dataflow).ok();
-                let name = if uuid.is_some() { None } else { Some(dataflow) };
-                logs::logs(&mut *session, uuid, name, node)?
-            } else {
-                let active = list.get_active();
-                let uuid = match &active[..] {
-                    [] => bail!("No dataflows are running"),
-                    [uuid] => uuid.clone(),
-                    _ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?,
-                };
-                logs::logs(&mut *session, Some(uuid.uuid), None, node)?
-            }
-        }
-        Command::Start {
-            dataflow,
-            name,
-            coordinator_addr,
-            coordinator_port,
-            attach,
-            detach,
-            hot_reload,
-        } => {
-            let dataflow_descriptor =
-                Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?;
-            let working_dir = dataflow
-                .canonicalize()
-                .context("failed to canonicalize dataflow path")?
-                .parent()
-                .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
-                .to_owned();
-            if !coordinator_addr.is_loopback() {
-                dataflow_descriptor.check_in_daemon(&working_dir, &[], true)?;
-            } else {
-                dataflow_descriptor
-                    .check(&working_dir)
-                    .wrap_err("Could not validate yaml")?;
-            }
+        Ok(running)
+    }
-            let coordinator_socket = (coordinator_addr, coordinator_port).into();
-            let mut session = connect_to_coordinator(coordinator_socket)
-                .wrap_err("failed to connect to dora coordinator")?;
-            let dataflow_id = start_dataflow(
-                dataflow_descriptor.clone(),
-                name,
-                working_dir,
-                &mut *session,
-            )?;
+    pub fn query_running_dataflows(&mut self) -> eyre::Result<DataflowList> {
+        let reply_raw = self
+            .session
+            .request(&serde_json::to_vec(&ControlRequest::List).unwrap())
+            .wrap_err("failed to send list message")?;
+        let reply: ControlRequestReply =
+            serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
+        let ids = match reply {
+            ControlRequestReply::DataflowList(list) => list,
+            ControlRequestReply::Error(err) => bail!("{err}"),
+            other => bail!("unexpected list dataflow reply: {other:?}"),
+        };
-            let attach = match (attach, detach) {
-                (true, true) => eyre::bail!("both `--attach` and `--detach` are given"),
-                (true, false) => true,
-                (false, true) => false,
-                (false, false) => {
-                    println!("attaching to dataflow (use `--detach` to run in background)");
-                    true
-                }
-            };
+        Ok(ids)
+    }
-            if attach {
-                attach_dataflow(
-                    dataflow_descriptor,
+    pub fn start_dataflow(
+        &mut self,
+        dataflow: Descriptor,
+        name: Option<String>,
+        local_working_dir: PathBuf,
+    ) -> Result<Uuid, eyre::ErrReport> {
+        let reply_raw = self
+            .session
+            .request(
+                &serde_json::to_vec(&ControlRequest::Start {
+                    dataflow,
-                    dataflow_id,
-                    &mut *session,
-                    hot_reload,
-                    coordinator_socket,
-                    log_level,
-                )?
- } - } - Command::List { - coordinator_addr, - coordinator_port, - } => match connect_to_coordinator((coordinator_addr, coordinator_port).into()) { - Ok(mut session) => list(&mut *session)?, - Err(_) => { - bail!("No dora coordinator seems to be running."); - } - }, - Command::Stop { - uuid, - name, - grace_duration, - coordinator_addr, - coordinator_port, - } => { - let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) - .wrap_err("could not connect to dora coordinator")?; - match (uuid, name) { - (Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?, - (None, Some(name)) => stop_dataflow_by_name(name, grace_duration, &mut *session)?, - (None, None) => stop_dataflow_interactive(grace_duration, &mut *session)?, - } - } - Command::Destroy { - config, - coordinator_addr, - coordinator_port, - } => up::destroy( - config.as_deref(), - (coordinator_addr, coordinator_port).into(), - )?, - Command::Coordinator { - interface, - port, - control_interface, - control_port, - quiet, - } => { - let rt = Builder::new_multi_thread() - .enable_all() - .build() - .context("tokio runtime failed")?; - rt.block_on(async { - let bind = SocketAddr::new(interface, port); - let bind_control = SocketAddr::new(control_interface, control_port); - let (port, task) = - dora_coordinator::start(bind, bind_control, futures::stream::empty::()) - .await?; - if !quiet { - println!("Listening for incoming daemon connection on {port}"); - } - task.await - }) - .context("failed to run dora-coordinator")? - } - Command::Daemon { - coordinator_addr, - inter_daemon_addr, - local_listen_port, - machine_id, - run_dataflow, - quiet: _, - } => { - let rt = Builder::new_multi_thread() - .enable_all() - .build() - .context("tokio runtime failed")?; - rt.block_on(async { - match run_dataflow { - Some(dataflow_path) => { - tracing::info!("Starting dataflow `{}`", dataflow_path.display()); - if coordinator_addr != SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT){ - tracing::info!( - "Not using coordinator addr {} as `run_dataflow` is for local dataflow only. Please use the `start` command for remote coordinator", - coordinator_addr - ); - } - - let result = Daemon::run_dataflow(&dataflow_path, dora_cli_path.to_owned()).await?; - handle_dataflow_result(result, None) - } - None => { - if coordinator_addr.ip() == LOCALHOST { - tracing::info!("Starting in local mode"); - } - Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port, dora_cli_path.to_owned()).await - } - } - }) - .context("failed to run dora-daemon")? 
-        }
-        Command::Runtime => dora_runtime::main().context("Failed to run dora-runtime")?,
-    };
-
-    Ok(())
-}
-
-fn start_dataflow(
-    dataflow: Descriptor,
-    name: Option<String>,
-    local_working_dir: PathBuf,
-    session: &mut TcpRequestReplyConnection,
-) -> Result<Uuid, eyre::ErrReport> {
-    let reply_raw = session
-        .request(
-            &serde_json::to_vec(&ControlRequest::Start {
-                dataflow,
-                name,
-                local_working_dir,
-            })
-            .unwrap(),
-        )
-        .wrap_err("failed to send start dataflow message")?;
-
-    let result: ControlRequestReply =
-        serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
-    match result {
-        ControlRequestReply::DataflowStarted { uuid } => {
-            eprintln!("{uuid}");
-            Ok(uuid)
+                    name,
+                    local_working_dir,
+                })
+                .unwrap(),
+            )
+            .wrap_err("failed to send start dataflow message")?;
+
+        let result: ControlRequestReply =
+            serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
+        match result {
+            ControlRequestReply::DataflowStarted { uuid } => Ok(uuid),
+            ControlRequestReply::Error(err) => bail!("{err}"),
+            other => bail!("unexpected start dataflow reply: {other:?}"),
 }
-        ControlRequestReply::Error(err) => bail!("{err}"),
-        other => bail!("unexpected start dataflow reply: {other:?}"),
-    }
-}
-fn stop_dataflow_interactive(
-    grace_duration: Option<Duration>,
-    session: &mut TcpRequestReplyConnection,
-) -> eyre::Result<()> {
-    let list = query_running_dataflows(session).wrap_err("failed to query running dataflows")?;
-    let active = list.get_active();
-    if active.is_empty() {
-        eprintln!("No dataflows are running");
-    } else {
-        let selection = inquire::Select::new("Choose dataflow to stop:", active).prompt()?;
-        stop_dataflow(selection.uuid, grace_duration, session)?;
+    pub fn dataflow_logs(
+        &mut self,
+        uuid: Option<Uuid>,
+        name: Option<String>,
+        node: String,
+    ) -> eyre::Result<()> {
+        logs::dataflow_logs(self, uuid, name, node)
 }
-    Ok(())
-}
-
-fn stop_dataflow(
-    uuid: Uuid,
-    grace_duration: Option<Duration>,
-    session: &mut TcpRequestReplyConnection,
-) -> Result<(), eyre::ErrReport> {
-    let reply_raw = session
-        .request(
-            &serde_json::to_vec(&ControlRequest::Stop {
-                dataflow_uuid: uuid,
-                grace_duration,
-            })
-            .unwrap(),
+    #[allow(clippy::too_many_arguments)]
+    pub fn attach_to_dataflow(
+        &mut self,
+        dataflow: Descriptor,
+        dataflow_path: PathBuf,
+        dataflow_id: Uuid,
+        hot_reload: bool,
+        coordinator_socket: SocketAddr,
+        log_level: log::LevelFilter,
+        log_output: &mut impl std::io::Write,
+    ) -> eyre::Result<DataflowResult> {
+        attach_to_dataflow(
+            self,
+            dataflow,
+            dataflow_path,
+            dataflow_id,
+            hot_reload,
+            coordinator_socket,
+            log_level,
+            log_output,
 )
-        .wrap_err("failed to send dataflow stop message")?;
-    let result: ControlRequestReply =
-        serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
-    match result {
-        ControlRequestReply::DataflowStopped { uuid, result } => {
-            handle_dataflow_result(result, Some(uuid))
-        }
-        ControlRequestReply::Error(err) => bail!("{err}"),
-        other => bail!("unexpected stop dataflow reply: {other:?}"),
 }
-}
-fn handle_dataflow_result(
-    result: dora_core::topics::DataflowResult,
-    uuid: Option<Uuid>,
-) -> Result<(), eyre::Error> {
-    if result.is_ok() {
-        Ok(())
-    } else {
-        Err(match uuid {
-            Some(uuid) => {
-                eyre::eyre!("Dataflow {uuid} failed:\n{}", FormatDataflowError(&result))
-            }
-            None => {
-                eyre::eyre!("Dataflow failed:\n{}", FormatDataflowError(&result))
-            }
-        })
+    pub fn stop_dataflow(
+        &mut self,
+        uuid: Uuid,
+        grace_duration: Option<Duration>,
+    ) -> eyre::Result<DataflowResult> {
+        let reply_raw = self
+            .session
+            .request(
+                &serde_json::to_vec(&ControlRequest::Stop {
+                    dataflow_uuid: uuid,
+                    grace_duration,
+                })
+                .unwrap(),
+            )
+            .wrap_err("failed to send dataflow stop message")?;
+        let result: ControlRequestReply =
+            serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
+        match result {
+            ControlRequestReply::DataflowStopped { result } => Ok(result),
+            ControlRequestReply::Error(err) => bail!("{err}"),
+            other => bail!("unexpected stop dataflow reply: {other:?}"),
+        }
 }
-fn stop_dataflow_by_name(
-    name: String,
-    grace_duration: Option<Duration>,
-    session: &mut TcpRequestReplyConnection,
-) -> Result<(), eyre::ErrReport> {
-    let reply_raw = session
-        .request(
-            &serde_json::to_vec(&ControlRequest::StopByName {
-                name,
-                grace_duration,
-            })
-            .unwrap(),
-        )
-        .wrap_err("failed to send dataflow stop_by_name message")?;
-    let result: ControlRequestReply =
-        serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
-    match result {
-        ControlRequestReply::DataflowStopped { uuid, result } => {
-            handle_dataflow_result(result, Some(uuid))
+    pub fn stop_dataflow_by_name(
+        &mut self,
+        name: String,
+        grace_duration: Option<Duration>,
+    ) -> eyre::Result<DataflowResult> {
+        let reply_raw = self
+            .session
+            .request(
+                &serde_json::to_vec(&ControlRequest::StopByName {
+                    name,
+                    grace_duration,
+                })
+                .unwrap(),
+            )
+            .wrap_err("failed to send dataflow stop_by_name message")?;
+        let result: ControlRequestReply =
+            serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
+        match result {
+            ControlRequestReply::DataflowStopped { result } => Ok(result),
+            ControlRequestReply::Error(err) => bail!("{err}"),
+            other => bail!("unexpected stop dataflow reply: {other:?}"),
 }
-        ControlRequestReply::Error(err) => bail!("{err}"),
-        other => bail!("unexpected stop dataflow reply: {other:?}"),
-    }
-}
-fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> {
-    let list = query_running_dataflows(session)?;
-
-    let mut tw = TabWriter::new(vec![]);
-    tw.write_all(b"UUID\tName\tStatus\n")?;
-    for entry in list.0 {
-        let uuid = entry.id.uuid;
-        let name = entry.id.name.unwrap_or_default();
-        let status = match entry.status {
-            dora_core::topics::DataflowStatus::Running => "Running",
-            dora_core::topics::DataflowStatus::Finished => "Succeeded",
-            dora_core::topics::DataflowStatus::Failed => "Failed",
-        };
-        tw.write_all(format!("{uuid}\t{name}\t{status}\n").as_bytes())?;
+    pub fn destroy(mut self) -> eyre::Result<()> {
+        // send destroy command to dora-coordinator
+        self.session
+            .request(&serde_json::to_vec(&ControlRequest::Destroy).unwrap())
+            .wrap_err("failed to send destroy message")?;
+        Ok(())
 }
-    tw.flush()?;
-    let formatted = String::from_utf8(tw.into_inner()?)?;
-
-    println!("{formatted}");
-
-    Ok(())
-}
-
-fn query_running_dataflows(session: &mut TcpRequestReplyConnection) -> eyre::Result<DataflowList> {
-    let reply_raw = session
-        .request(&serde_json::to_vec(&ControlRequest::List).unwrap())
-        .wrap_err("failed to send list message")?;
-    let reply: ControlRequestReply =
-        serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
-    let ids = match reply {
-        ControlRequestReply::DataflowList(list) => list,
-        ControlRequestReply::Error(err) => bail!("{err}"),
-        other => bail!("unexpected list dataflow reply: {other:?}"),
-    };
-
-    Ok(ids)
-}
-
-fn connect_to_coordinator(
-    coordinator_addr: SocketAddr,
-) -> std::io::Result<Box<TcpRequestReplyConnection>> {
-    TcpLayer::new().connect(coordinator_addr)
-}
 }
diff --git a/binaries/cli/src/logs.rs b/binaries/cli/src/logs.rs
index a15f11b1..150d1771 100644
--- a/binaries/cli/src/logs.rs
+++ b/binaries/cli/src/logs.rs
@@ -1,18
+1,20 @@ -use communication_layer_request_reply::TcpRequestReplyConnection; use dora_core::topics::{ControlRequest, ControlRequestReply}; use eyre::{bail, Context, Result}; use uuid::Uuid; use bat::{Input, PrettyPrinter}; -pub fn logs( - session: &mut TcpRequestReplyConnection, +use crate::DoraConnection; + +pub fn dataflow_logs( + connection: &mut DoraConnection, uuid: Option, name: Option, node: String, ) -> Result<()> { let logs = { - let reply_raw = session + let reply_raw = connection + .session .request( &serde_json::to_vec(&ControlRequest::Logs { uuid, diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index f8e2b6e9..5787230b 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -1,10 +1,29 @@ +use std::{ + fs::File, + io::Write, + net::{IpAddr, SocketAddr}, + path::PathBuf, + time::Duration, +}; + use clap::Parser; use colored::Colorize; -use dora_cli::{run, Command}; +use dora_cli::{check_environment, template, DoraConnection, LISTEN_WILDCARD, LOCALHOST}; +use dora_core::{ + descriptor::Descriptor, + topics::{ + DataflowResult, NodeErrorCause, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, + DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, + }, +}; +use dora_daemon::Daemon; #[cfg(feature = "tracing")] use dora_tracing::set_up_tracing; use dora_tracing::set_up_tracing_opts; -use eyre::Context; +use eyre::{bail, Context}; +use tabwriter::TabWriter; +use tracing::info; +use uuid::Uuid; fn main() { if let Err(err) = main_inner() { @@ -14,13 +33,6 @@ fn main() { } } -#[derive(Debug, clap::Parser)] -#[clap(version)] -pub struct Args { - #[clap(subcommand)] - command: Command, -} - fn main_inner() -> eyre::Result<()> { let args = Args::parse(); @@ -54,3 +66,586 @@ fn main_inner() -> eyre::Result<()> { std::env::current_exe().wrap_err("failed to get current executable path")?; run(args.command, dora_cli_path) } + +#[derive(Debug, clap::Parser)] +#[clap(version)] +pub struct Args { + #[clap(subcommand)] + command: Command, +} + +/// dora-rs cli client +#[derive(Debug, clap::Subcommand)] +pub enum Command { + /// Check if the coordinator and the daemon is running. + Check { + /// Path to the dataflow descriptor file (enables additional checks) + #[clap(long, value_name = "PATH", value_hint = clap::ValueHint::FilePath)] + dataflow: Option, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, + }, + /// Generate a visualization of the given graph using mermaid.js. Use --open to open browser. + Graph { + /// Path to the dataflow descriptor file + #[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)] + dataflow: PathBuf, + /// Visualize the dataflow as a Mermaid diagram (instead of HTML) + #[clap(long, action)] + mermaid: bool, + /// Open the HTML visualization in the browser + #[clap(long, action)] + open: bool, + }, + /// Run build commands provided in the given dataflow. + Build { + /// Path to the dataflow descriptor file + #[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)] + dataflow: PathBuf, + }, + /// Generate a new project or node. Choose the language between Rust, Python, C or C++. 
+ New { + #[clap(flatten)] + args: template::CreateArgs, + #[clap(hide = true, long)] + internal_create_with_path_dependencies: bool, + }, + /// Spawn coordinator and daemon in local mode (with default config) + Up { + /// Use a custom configuration + #[clap(long, hide = true, value_name = "PATH", value_hint = clap::ValueHint::FilePath)] + config: Option, + }, + /// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first. + Destroy { + /// Use a custom configuration + #[clap(long, hide = true)] + config: Option, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, + }, + /// Start the given dataflow path. Attach a name to the running dataflow by using --name. + Start { + /// Path to the dataflow descriptor file + #[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)] + dataflow: PathBuf, + /// Assign a name to the dataflow + #[clap(long)] + name: Option, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, + /// Attach to the dataflow and wait for its completion + #[clap(long, action)] + attach: bool, + /// Run the dataflow in background + #[clap(long, action)] + detach: bool, + /// Enable hot reloading (Python only) + #[clap(long, action)] + hot_reload: bool, + }, + /// Stop the given dataflow UUID. If no id is provided, you will be able to choose between the running dataflows. + Stop { + /// UUID of the dataflow that should be stopped + uuid: Option, + /// Name of the dataflow that should be stopped + #[clap(long)] + name: Option, + /// Kill the dataflow if it doesn't stop after the given duration + #[clap(long, value_name = "DURATION")] + #[arg(value_parser = duration_str::parse)] + grace_duration: Option, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, + }, + /// List running dataflows. + List { + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, + }, + // Planned for future releases: + // Dashboard, + /// Show logs of a given dataflow and node. 
+ #[command(allow_missing_positional = true)] + Logs { + /// Identifier of the dataflow + #[clap(value_name = "UUID_OR_NAME")] + dataflow: Option, + /// Show logs for the given node + #[clap(value_name = "NAME")] + node: String, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, + }, + // Metrics, + // Stats, + // Get, + // Upgrade, + /// Run daemon + Daemon { + /// Unique identifier for the machine (required for distributed dataflows) + #[clap(long)] + machine_id: Option, + /// The inter daemon IP address and port this daemon will bind to. + #[clap(long, default_value_t = SocketAddr::new(LISTEN_WILDCARD, 0))] + inter_daemon_addr: SocketAddr, + /// Local listen port for event such as dynamic node. + #[clap(long, default_value_t = DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT)] + local_listen_port: u16, + /// Address and port number of the dora coordinator + #[clap(long, default_value_t = SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT))] + coordinator_addr: SocketAddr, + #[clap(long, hide = true)] + run_dataflow: Option, + /// Suppresses all log output to stdout. + #[clap(long)] + quiet: bool, + }, + /// Run runtime + Runtime, + /// Run coordinator + Coordinator { + /// Network interface to bind to for daemon communication + #[clap(long, default_value_t = LISTEN_WILDCARD)] + interface: IpAddr, + /// Port number to bind to for daemon communication + #[clap(long, default_value_t = DORA_COORDINATOR_PORT_DEFAULT)] + port: u16, + /// Network interface to bind to for control communication + #[clap(long, default_value_t = LISTEN_WILDCARD)] + control_interface: IpAddr, + /// Port number to bind to for control communication + #[clap(long, default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + control_port: u16, + /// Suppresses all log output to stdout. + #[clap(long)] + quiet: bool, + }, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)] +enum Kind { + Dataflow, + CustomNode, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)] +enum Lang { + Rust, + Python, + C, + Cxx, +} + +pub fn run(command: Command, dora_cli_path: PathBuf) -> eyre::Result<()> { + let log_level = env_logger::Builder::new() + .filter_level(log::LevelFilter::Info) + .parse_default_env() + .build() + .filter(); + + match command { + Command::Check { + dataflow, + coordinator_addr, + coordinator_port, + } => match dataflow { + Some(dataflow) => { + let working_dir = dataflow + .canonicalize() + .context("failed to canonicalize dataflow path")? + .parent() + .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? + .to_owned(); + Descriptor::blocking_read(&dataflow)?.check(&working_dir)?; + check_environment((coordinator_addr, coordinator_port).into())? 
+ } + None => check_environment((coordinator_addr, coordinator_port).into())?, + }, + Command::Graph { + dataflow, + mermaid, + open, + } => { + create_dataflow_graph(dataflow, mermaid, open)?; + } + Command::Build { dataflow } => { + dora_cli::build_dataflow(&dataflow)?; + } + Command::New { + args, + internal_create_with_path_dependencies, + } => template::create(args, internal_create_with_path_dependencies)?, + Command::Up { config } => { + dora_cli::up(config.as_deref(), &dora_cli_path)?; + } + Command::Logs { + dataflow, + node, + coordinator_addr, + coordinator_port, + } => { + let mut session = DoraConnection::connect((coordinator_addr, coordinator_port).into()) + .wrap_err("failed to connect to dora coordinator")?; + let list = session + .query_running_dataflows() + .wrap_err("failed to query running dataflows")?; + if let Some(dataflow) = dataflow { + let uuid = Uuid::parse_str(&dataflow).ok(); + let name = if uuid.is_some() { None } else { Some(dataflow) }; + session.dataflow_logs(uuid, name, node)? + } else { + let active = list.get_active(); + let uuid = match &active[..] { + [] => bail!("No dataflows are running"), + [uuid] => uuid.clone(), + _ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?, + }; + session.dataflow_logs(Some(uuid.uuid), None, node)? + } + } + Command::Start { + dataflow, + name, + coordinator_addr, + coordinator_port, + attach, + detach, + hot_reload, + } => { + let dataflow_descriptor = + Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?; + let working_dir = dataflow + .canonicalize() + .context("failed to canonicalize dataflow path")? + .parent() + .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? + .to_owned(); + if !coordinator_addr.is_loopback() { + dataflow_descriptor.check_in_daemon(&working_dir, &[], true)?; + } else { + dataflow_descriptor + .check(&working_dir) + .wrap_err("Could not validate yaml")?; + } + + let coordinator_socket = (coordinator_addr, coordinator_port).into(); + let mut session = DoraConnection::connect(coordinator_socket) + .wrap_err("failed to connect to dora coordinator")?; + let dataflow_id = + session.start_dataflow(dataflow_descriptor.clone(), name, working_dir)?; + eprintln!("{dataflow_id}"); + + let attach = match (attach, detach) { + (true, true) => eyre::bail!("both `--attach` and `--detach` are given"), + (true, false) => true, + (false, true) => false, + (false, false) => { + println!("attaching to dataflow (use `--detach` to run in background)"); + true + } + }; + + if attach { + let result = session.attach_to_dataflow( + dataflow_descriptor, + dataflow, + dataflow_id, + hot_reload, + coordinator_socket, + log_level, + &mut std::io::stdout(), + )?; + info!("dataflow {} stopped", result.uuid); + handle_dataflow_result(result)?; + } + } + Command::List { + coordinator_addr, + coordinator_port, + } => match DoraConnection::connect((coordinator_addr, coordinator_port).into()) { + Ok(mut session) => list_dataflows(&mut session)?, + Err(_) => { + bail!("No dora coordinator seems to be running."); + } + }, + Command::Stop { + uuid, + name, + grace_duration, + coordinator_addr, + coordinator_port, + } => { + let mut session = DoraConnection::connect((coordinator_addr, coordinator_port).into()) + .wrap_err("could not connect to dora coordinator")?; + let result = match (uuid, name) { + (Some(uuid), _) => Some(session.stop_dataflow(uuid, grace_duration)?), + (None, Some(name)) => Some(session.stop_dataflow_by_name(name, grace_duration)?), + (None, None) => 
stop_dataflow_interactive(grace_duration, &mut session)?, + }; + if let Some(result) = result { + handle_dataflow_result(result)?; + } + } + Command::Destroy { + config: _, + coordinator_addr, + coordinator_port, + } => destroy((coordinator_addr, coordinator_port).into())?, + Command::Coordinator { + interface, + port, + control_interface, + control_port, + quiet, + } => { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .context("tokio runtime failed")?; + rt.block_on(async { + let bind = SocketAddr::new(interface, port); + let bind_control = SocketAddr::new(control_interface, control_port); + let (port, task) = dora_coordinator::start( + bind, + bind_control, + futures::stream::empty::(), + ) + .await?; + if !quiet { + println!("Listening for incoming daemon connection on {port}"); + } + task.await + }) + .context("failed to run dora-coordinator")? + } + Command::Daemon { + coordinator_addr, + inter_daemon_addr, + local_listen_port, + machine_id, + run_dataflow, + quiet: _, + } => { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .context("tokio runtime failed")?; + rt.block_on(async { + match run_dataflow { + Some(dataflow_path) => { + tracing::info!("Starting dataflow `{}`", dataflow_path.display()); + if coordinator_addr != SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT){ + tracing::info!( + "Not using coordinator addr {} as `run_dataflow` is for local dataflow only. Please use the `start` command for remote coordinator", + coordinator_addr + ); + } + + let result = Daemon::run_dataflow(&dataflow_path, dora_cli_path.to_owned()).await?; + handle_dataflow_result(result) + } + None => { + if coordinator_addr.ip() == LOCALHOST { + tracing::info!("Starting in local mode"); + } + Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port, dora_cli_path.to_owned()).await + } + } + }) + .context("failed to run dora-daemon")? + } + Command::Runtime => dora_runtime::main().context("Failed to run dora-runtime")?, + }; + + Ok(()) +} + +fn create_dataflow_graph( + dataflow: std::path::PathBuf, + mermaid: bool, + open: bool, +) -> eyre::Result<()> { + if mermaid { + let visualized = dora_cli::visualize_as_mermaid(&dataflow)?; + println!("{visualized}"); + println!( + "Paste the above output on https://mermaid.live/ or in a \ + ```mermaid code block on GitHub to display it." 
+ ); + } else { + let html = dora_cli::visualize_as_html(&dataflow)?; + + let working_dir = std::env::current_dir().wrap_err("failed to get current working dir")?; + let graph_filename = match dataflow.file_stem().and_then(|n| n.to_str()) { + Some(name) => format!("{name}-graph"), + None => "graph".into(), + }; + let mut extra = 0; + let path = loop { + let adjusted_file_name = if extra == 0 { + format!("{graph_filename}.html") + } else { + format!("{graph_filename}.{extra}.html") + }; + let path = working_dir.join(&adjusted_file_name); + if path.exists() { + extra += 1; + } else { + break path; + } + }; + + let mut file = File::create(&path).context("failed to create graph HTML file")?; + file.write_all(html.as_bytes())?; + + println!( + "View graph by opening the following in your browser:\n file://{}", + path.display() + ); + + if open { + webbrowser::open(path.as_os_str().to_str().unwrap())?; + } + } + Ok(()) +} + +fn list_dataflows(session: &mut DoraConnection) -> Result<(), eyre::ErrReport> { + let list = session.query_running_dataflows()?; + + let mut tw = TabWriter::new(vec![]); + tw.write_all(b"UUID\tName\tStatus\n")?; + for entry in list.0 { + let uuid = entry.id.uuid; + let name = entry.id.name.unwrap_or_default(); + let status = match entry.status { + dora_core::topics::DataflowStatus::Running => "Running", + dora_core::topics::DataflowStatus::Finished => "Succeeded", + dora_core::topics::DataflowStatus::Failed => "Failed", + }; + tw.write_all(format!("{uuid}\t{name}\t{status}\n").as_bytes())?; + } + tw.flush()?; + let formatted = String::from_utf8(tw.into_inner()?)?; + + println!("{formatted}"); + + Ok(()) +} + +fn stop_dataflow_interactive( + grace_duration: Option, + session: &mut DoraConnection, +) -> eyre::Result> { + let list = session + .query_running_dataflows() + .wrap_err("failed to query running dataflows")?; + let active = list.get_active(); + if active.is_empty() { + eprintln!("No dataflows are running"); + Ok(None) + } else { + let selection = inquire::Select::new("Choose dataflow to stop:", active).prompt()?; + Ok(Some(session.stop_dataflow(selection.uuid, grace_duration)?)) + } +} + +pub fn destroy(coordinator_addr: SocketAddr) -> Result<(), eyre::ErrReport> { + match DoraConnection::connect(coordinator_addr) { + Ok(session) => { + session.destroy()?; + println!("Send destroy command to dora-coordinator"); + } + Err(_) => { + eprintln!("Could not connect to dora-coordinator"); + } + } + + Ok(()) +} + +fn handle_dataflow_result(result: dora_core::topics::DataflowResult) -> Result<(), eyre::Error> { + if result.is_ok() { + Ok(()) + } else { + Err(eyre::eyre!( + "Dataflow {} failed:\n{}", + result.uuid, + FormatDataflowError(&result) + )) + } +} + +struct FormatDataflowError<'a>(pub &'a DataflowResult); + +impl std::fmt::Display for FormatDataflowError<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f)?; + let failed = self + .0 + .node_results + .iter() + .filter_map(|(id, r)| r.as_ref().err().map(|e| (id, e))); + let total_failed = failed.clone().count(); + + let mut non_cascading: Vec<_> = failed + .clone() + .filter(|(_, e)| !matches!(e.cause, NodeErrorCause::Cascading { .. 
})) + .collect(); + non_cascading.sort_by_key(|(_, e)| e.timestamp); + // try to print earliest non-cascading error + let hidden = if !non_cascading.is_empty() { + let printed = non_cascading.len(); + for (id, err) in non_cascading { + writeln!(f, "Node `{id}` failed: {err}")?; + } + total_failed - printed + } else { + // no non-cascading errors -> print earliest cascading + let mut all: Vec<_> = failed.collect(); + all.sort_by_key(|(_, e)| e.timestamp); + if let Some((id, err)) = all.first() { + write!(f, "Node `{id}` failed: {err}")?; + total_failed - 1 + } else { + write!(f, "unknown error")?; + 0 + } + }; + + if hidden > 1 { + write!( + f, + "\n\nThere are {hidden} consequential errors. Check the `out/{}` folder for full details.", + self.0.uuid + )?; + } + + Ok(()) + } +} diff --git a/binaries/cli/src/template/c/mod.rs b/binaries/cli/src/template/c/mod.rs index 378c572c..e5103852 100644 --- a/binaries/cli/src/template/c/mod.rs +++ b/binaries/cli/src/template/c/mod.rs @@ -5,12 +5,14 @@ use std::{ path::{Path, PathBuf}, }; +use super::Kind; + const NODE: &str = include_str!("node/node-template.c"); const TALKER: &str = include_str!("talker/talker-template.c"); const LISTENER: &str = include_str!("listener/listener-template.c"); -pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> { - let crate::CommandNew { +pub fn create(args: super::CreateArgs, use_path_deps: bool) -> eyre::Result<()> { + let super::CreateArgs { kind, lang: _, name, @@ -18,8 +20,8 @@ pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> } = args; match kind { - crate::Kind::CustomNode => create_custom_node(name, path, NODE), - crate::Kind::Dataflow => create_dataflow(name, path, use_path_deps), + Kind::CustomNode => create_custom_node(name, path, NODE), + Kind::Dataflow => create_dataflow(name, path, use_path_deps), } } diff --git a/binaries/cli/src/template/cxx/mod.rs b/binaries/cli/src/template/cxx/mod.rs index 09935031..0bc9404e 100644 --- a/binaries/cli/src/template/cxx/mod.rs +++ b/binaries/cli/src/template/cxx/mod.rs @@ -4,12 +4,14 @@ use std::{ path::{Path, PathBuf}, }; +use super::Kind; + const NODE: &str = include_str!("node-template.cc"); const TALKER: &str = include_str!("talker-template.cc"); const LISTENER: &str = include_str!("listener-template.cc"); -pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> { - let crate::CommandNew { +pub fn create(args: super::CreateArgs, use_path_deps: bool) -> eyre::Result<()> { + let super::CreateArgs { kind, lang: _, name, @@ -17,8 +19,8 @@ pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> } = args; match kind { - crate::Kind::CustomNode => create_custom_node(name, path, NODE), - crate::Kind::Dataflow => create_dataflow(name, path, use_path_deps), + Kind::CustomNode => create_custom_node(name, path, NODE), + Kind::Dataflow => create_dataflow(name, path, use_path_deps), } } diff --git a/binaries/cli/src/template/mod.rs b/binaries/cli/src/template/mod.rs index a1d59367..e6a8e29d 100644 --- a/binaries/cli/src/template/mod.rs +++ b/binaries/cli/src/template/mod.rs @@ -1,13 +1,44 @@ +use std::path::PathBuf; + mod c; mod cxx; mod python; mod rust; -pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> { +#[derive(Debug, clap::Args)] +pub struct CreateArgs { + /// The entity that should be created + #[clap(long, value_enum, default_value_t = Kind::Dataflow)] + pub kind: Kind, + /// The programming language that should be used + #[clap(long, 
value_enum, default_value_t = Lang::Rust)] + pub lang: Lang, + /// Desired name of the entity + pub name: String, + /// Where to create the entity + #[clap(hide = true)] + pub path: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)] +pub enum Kind { + Dataflow, + CustomNode, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)] +pub enum Lang { + Rust, + Python, + C, + Cxx, +} + +pub fn create(args: CreateArgs, use_path_deps: bool) -> eyre::Result<()> { match args.lang { - crate::Lang::Rust => rust::create(args, use_path_deps), - crate::Lang::Python => python::create(args), - crate::Lang::C => c::create(args, use_path_deps), - crate::Lang::Cxx => cxx::create(args, use_path_deps), + Lang::Rust => rust::create(args, use_path_deps), + Lang::Python => python::create(args), + Lang::C => c::create(args, use_path_deps), + Lang::Cxx => cxx::create(args, use_path_deps), } } diff --git a/binaries/cli/src/template/python/mod.rs b/binaries/cli/src/template/python/mod.rs index 4c8eb435..22f38ca5 100644 --- a/binaries/cli/src/template/python/mod.rs +++ b/binaries/cli/src/template/python/mod.rs @@ -4,12 +4,14 @@ use std::{ path::{Path, PathBuf}, }; +use super::Kind; + const NODE_PY: &str = include_str!("node/node-template.py"); const TALKER_PY: &str = include_str!("talker/talker-template.py"); const LISTENER_PY: &str = include_str!("listener/listener-template.py"); -pub fn create(args: crate::CommandNew) -> eyre::Result<()> { - let crate::CommandNew { +pub fn create(args: super::CreateArgs) -> eyre::Result<()> { + let super::CreateArgs { kind, lang: _, name, @@ -17,8 +19,8 @@ pub fn create(args: crate::CommandNew) -> eyre::Result<()> { } = args; match kind { - crate::Kind::CustomNode => create_custom_node(name, path, NODE_PY), - crate::Kind::Dataflow => create_dataflow(name, path), + Kind::CustomNode => create_custom_node(name, path, NODE_PY), + Kind::Dataflow => create_dataflow(name, path), } } diff --git a/binaries/cli/src/template/rust/mod.rs b/binaries/cli/src/template/rust/mod.rs index a0f4717c..3307b213 100644 --- a/binaries/cli/src/template/rust/mod.rs +++ b/binaries/cli/src/template/rust/mod.rs @@ -4,13 +4,15 @@ use std::{ path::{Path, PathBuf}, }; +use super::Kind; + const MAIN_RS: &str = include_str!("node/main-template.rs"); const TALKER_RS: &str = include_str!("talker/main-template.rs"); const LISTENER_RS: &str = include_str!("listener/main-template.rs"); const VERSION: &str = env!("CARGO_PKG_VERSION"); -pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> { - let crate::CommandNew { +pub fn create(args: super::CreateArgs, use_path_deps: bool) -> eyre::Result<()> { + let super::CreateArgs { kind, lang: _, name, @@ -18,8 +20,8 @@ pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> } = args; match kind { - crate::Kind::CustomNode => create_custom_node(name, path, use_path_deps, MAIN_RS), - crate::Kind::Dataflow => create_dataflow(name, path, use_path_deps), + Kind::CustomNode => create_custom_node(name, path, use_path_deps, MAIN_RS), + Kind::Dataflow => create_dataflow(name, path, use_path_deps), } } diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/up.rs index a59a6121..81f3c2ae 100644 --- a/binaries/cli/src/up.rs +++ b/binaries/cli/src/up.rs @@ -1,20 +1,21 @@ -use crate::{check::daemon_running, connect_to_coordinator, LOCALHOST}; -use dora_core::topics::{ControlRequest, DORA_COORDINATOR_PORT_CONTROL_DEFAULT}; +use crate::{DoraConnection, LOCALHOST}; +use 
dora_core::topics::DORA_COORDINATOR_PORT_CONTROL_DEFAULT; use eyre::Context; -use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration}; +use std::{fs, path::Path, process::Command, time::Duration}; + #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] struct UpConfig {} -pub(crate) fn up(config_path: Option<&Path>, dora_cli_path: &Path) -> eyre::Result<()> { +pub fn up(config_path: Option<&Path>, dora_cli_path: &Path) -> eyre::Result<()> { let UpConfig {} = parse_dora_config(config_path)?; let coordinator_addr = (LOCALHOST, DORA_COORDINATOR_PORT_CONTROL_DEFAULT).into(); - let mut session = match connect_to_coordinator(coordinator_addr) { + let mut session = match DoraConnection::connect(coordinator_addr) { Ok(session) => session, Err(_) => { start_coordinator(dora_cli_path).wrap_err("failed to start dora-coordinator")?; loop { - match connect_to_coordinator(coordinator_addr) { + match DoraConnection::connect(coordinator_addr) { Ok(session) => break session, Err(_) => { // sleep a bit until the coordinator accepts connections @@ -25,14 +26,14 @@ pub(crate) fn up(config_path: Option<&Path>, dora_cli_path: &Path) -> eyre::Resu } }; - if !daemon_running(&mut *session)? { + if !session.daemon_running()? { start_daemon(dora_cli_path).wrap_err("failed to start dora-daemon")?; // wait a bit until daemon is connected let mut i = 0; const WAIT_S: f32 = 0.1; loop { - if daemon_running(&mut *session)? { + if session.daemon_running()? { break; } i += 1; @@ -46,27 +47,6 @@ pub(crate) fn up(config_path: Option<&Path>, dora_cli_path: &Path) -> eyre::Resu Ok(()) } -pub(crate) fn destroy( - config_path: Option<&Path>, - coordinator_addr: SocketAddr, -) -> Result<(), eyre::ErrReport> { - let UpConfig {} = parse_dora_config(config_path)?; - match connect_to_coordinator(coordinator_addr) { - Ok(mut session) => { - // send destroy command to dora-coordinator - session - .request(&serde_json::to_vec(&ControlRequest::Destroy).unwrap()) - .wrap_err("failed to send destroy message")?; - println!("Send destroy command to dora-coordinator"); - } - Err(_) => { - eprintln!("Could not connect to dora-coordinator"); - } - } - - Ok(()) -} - fn parse_dora_config(config_path: Option<&Path>) -> Result { let path = config_path.or_else(|| Some(Path::new("dora-config.yml")).filter(|p| p.exists())); let config = match path { diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 1c88d981..88ef16b5 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -288,7 +288,6 @@ async fn start_inner( if entry.get_mut().machines.is_empty() { let finished_dataflow = entry.remove(); let reply = ControlRequestReply::DataflowStopped { - uuid, result: dataflow_results .get(&uuid) .map(|r| dataflow_result(r, uuid, &clock)) @@ -354,7 +353,6 @@ async fn start_inner( uuid: dataflow_uuid, }, None => ControlRequestReply::DataflowStopped { - uuid: dataflow_uuid, result: dataflow_results .get(&dataflow_uuid) .map(|r| dataflow_result(r, dataflow_uuid, &clock)) @@ -615,7 +613,6 @@ async fn stop_dataflow_by_uuid( let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else { if let Some(result) = dataflow_results.get(&dataflow_uuid) { let reply = ControlRequestReply::DataflowStopped { - uuid: dataflow_uuid, result: dataflow_result(result, dataflow_uuid, clock), }; let _ = reply_sender.send(Ok(reply)); diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index b38d77d0..acb7dc14 100644 --- a/binaries/daemon/src/spawn.rs +++ 
b/binaries/daemon/src/spawn.rs @@ -36,6 +36,7 @@ use tokio::{ use tracing::error; /// clock is required for generating timestamps when dropping messages early because queue is full +#[allow(clippy::too_many_arguments)] pub async fn spawn_node( dataflow_id: DataflowId, working_dir: &Path, diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 5448555d..0ee9237f 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -93,7 +93,7 @@ pub enum ControlRequestReply { CoordinatorStopped, DataflowStarted { uuid: Uuid }, DataflowReloaded { uuid: Uuid }, - DataflowStopped { uuid: Uuid, result: DataflowResult }, + DataflowStopped { result: DataflowResult }, DataflowList(DataflowList), DestroyOk, DaemonConnected(bool), @@ -118,6 +118,7 @@ impl Display for DataflowId { } #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +#[must_use] pub struct DataflowResult { pub uuid: Uuid, pub timestamp: uhlc::Timestamp,