From 987ccde1e749caca886ccd95ae9cb2289d2a8151 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 12 Jun 2024 17:18:29 +0200 Subject: [PATCH] Split CLI into library and binary part Enables other executables to invoke dora CLI commands directly without going through the argument parsing. --- binaries/cli/src/lib.rs | 622 ++++++++++++++++++++++++++++++++++++++ binaries/cli/src/main.rs | 633 +-------------------------------------- 2 files changed, 634 insertions(+), 621 deletions(-) create mode 100644 binaries/cli/src/lib.rs diff --git a/binaries/cli/src/lib.rs b/binaries/cli/src/lib.rs new file mode 100644 index 00000000..e78199d4 --- /dev/null +++ b/binaries/cli/src/lib.rs @@ -0,0 +1,622 @@ +use attach::attach_dataflow; +use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection}; +use dora_coordinator::Event; +use dora_core::{ + descriptor::Descriptor, + topics::{ + ControlRequest, ControlRequestReply, DataflowList, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, + DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, + }, +}; +use dora_daemon::Daemon; +use duration_str::parse; +use eyre::{bail, Context}; +use formatting::FormatDataflowError; +use std::{io::Write, net::SocketAddr}; +use std::{ + net::{IpAddr, Ipv4Addr}, + path::PathBuf, + time::Duration, +}; +use tabwriter::TabWriter; +use tokio::runtime::Builder; +use uuid::Uuid; + +mod attach; +mod build; +mod check; +mod formatting; +mod graph; +mod logs; +mod template; +mod up; + +const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); +const LISTEN_WILDCARD: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); + +#[derive(Debug, clap::Parser)] +#[clap(version)] +struct Args { + #[clap(subcommand)] + command: Command, +} + +/// dora-rs cli client +#[derive(Debug, clap::Subcommand)] +pub enum Command { + /// Check if the coordinator and the daemon is running. 
+ Check { + /// Path to the dataflow descriptor file (enables additional checks) + #[clap(long, value_name = "PATH", value_hint = clap::ValueHint::FilePath)] + dataflow: Option, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, + }, + /// Generate a visualization of the given graph using mermaid.js. Use --open to open browser. + Graph { + /// Path to the dataflow descriptor file + #[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)] + dataflow: PathBuf, + /// Visualize the dataflow as a Mermaid diagram (instead of HTML) + #[clap(long, action)] + mermaid: bool, + /// Open the HTML visualization in the browser + #[clap(long, action)] + open: bool, + }, + /// Run build commands provided in the given dataflow. + Build { + /// Path to the dataflow descriptor file + #[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)] + dataflow: PathBuf, + }, + /// Generate a new project or node. Choose the language between Rust, Python, C or C++. + New { + #[clap(flatten)] + args: CommandNew, + #[clap(hide = true, long)] + internal_create_with_path_dependencies: bool, + }, + /// Spawn coordinator and daemon in local mode (with default config) + Up { + /// Use a custom configuration + #[clap(long, hide = true, value_name = "PATH", value_hint = clap::ValueHint::FilePath)] + config: Option, + }, + /// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first. 
+ Destroy { + /// Use a custom configuration + #[clap(long, hide = true)] + config: Option, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, + }, + /// Start the given dataflow path. Attach a name to the running dataflow by using --name. + Start { + /// Path to the dataflow descriptor file + #[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)] + dataflow: PathBuf, + /// Assign a name to the dataflow + #[clap(long)] + name: Option, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, + /// Attach to the dataflow and wait for its completion + #[clap(long, action)] + attach: bool, + /// Run the dataflow in background + #[clap(long, action)] + detach: bool, + /// Enable hot reloading (Python only) + #[clap(long, action)] + hot_reload: bool, + }, + /// Stop the given dataflow UUID. If no id is provided, you will be able to choose between the running dataflows. 
+ Stop { + /// UUID of the dataflow that should be stopped + uuid: Option, + /// Name of the dataflow that should be stopped + #[clap(long)] + name: Option, + /// Kill the dataflow if it doesn't stop after the given duration + #[clap(long, value_name = "DURATION")] + #[arg(value_parser = parse)] + grace_duration: Option, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, + }, + /// List running dataflows. + List { + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, + }, + // Planned for future releases: + // Dashboard, + /// Show logs of a given dataflow and node. + #[command(allow_missing_positional = true)] + Logs { + /// Identifier of the dataflow + #[clap(value_name = "UUID_OR_NAME")] + dataflow: Option, + /// Show logs for the given node + #[clap(value_name = "NAME")] + node: String, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, + }, + // Metrics, + // Stats, + // Get, + // Upgrade, + /// Run daemon + Daemon { + /// Unique identifier for the machine (required for distributed dataflows) + #[clap(long)] + machine_id: Option, + /// The inter daemon IP address and port this daemon will bind to. 
+ #[clap(long, default_value_t = SocketAddr::new(LISTEN_WILDCARD, 0))] + inter_daemon_addr: SocketAddr, + /// Local listen port for event such as dynamic node. + #[clap(long, default_value_t = DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT)] + local_listen_port: u16, + /// Address and port number of the dora coordinator + #[clap(long, default_value_t = SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT))] + coordinator_addr: SocketAddr, + #[clap(long, hide = true)] + run_dataflow: Option, + /// Suppresses all log output to stdout. + #[clap(long)] + quiet: bool, + }, + /// Run runtime + Runtime, + /// Run coordinator + Coordinator { + /// Network interface to bind to for daemon communication + #[clap(long, default_value_t = LISTEN_WILDCARD)] + interface: IpAddr, + /// Port number to bind to for daemon communication + #[clap(long, default_value_t = DORA_COORDINATOR_PORT_DEFAULT)] + port: u16, + /// Network interface to bind to for control communication + #[clap(long, default_value_t = LISTEN_WILDCARD)] + control_interface: IpAddr, + /// Port number to bind to for control communication + #[clap(long, default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + control_port: u16, + /// Suppresses all log output to stdout. 
+ #[clap(long)] + quiet: bool, + }, +} + +#[derive(Debug, clap::Args)] +pub struct CommandNew { + /// The entity that should be created + #[clap(long, value_enum, default_value_t = Kind::Dataflow)] + kind: Kind, + /// The programming language that should be used + #[clap(long, value_enum, default_value_t = Lang::Rust)] + lang: Lang, + /// Desired name of the entity + name: String, + /// Where to create the entity + #[clap(hide = true)] + path: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)] +enum Kind { + Dataflow, + CustomNode, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)] +enum Lang { + Rust, + Python, + C, + Cxx, +} + +pub fn run(command: Command) -> eyre::Result<()> { + let log_level = env_logger::Builder::new() + .filter_level(log::LevelFilter::Info) + .parse_default_env() + .build() + .filter(); + + match command { + Command::Check { + dataflow, + coordinator_addr, + coordinator_port, + } => match dataflow { + Some(dataflow) => { + let working_dir = dataflow + .canonicalize() + .context("failed to canonicalize dataflow path")? + .parent() + .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? + .to_owned(); + Descriptor::blocking_read(&dataflow)?.check(&working_dir)?; + check::check_environment((coordinator_addr, coordinator_port).into())? 
+ } + None => check::check_environment((coordinator_addr, coordinator_port).into())?, + }, + Command::Graph { + dataflow, + mermaid, + open, + } => { + graph::create(dataflow, mermaid, open)?; + } + Command::Build { dataflow } => { + build::build(&dataflow)?; + } + Command::New { + args, + internal_create_with_path_dependencies, + } => template::create(args, internal_create_with_path_dependencies)?, + Command::Up { config } => { + up::up(config.as_deref())?; + } + Command::Logs { + dataflow, + node, + coordinator_addr, + coordinator_port, + } => { + let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) + .wrap_err("failed to connect to dora coordinator")?; + let list = query_running_dataflows(&mut *session) + .wrap_err("failed to query running dataflows")?; + if let Some(dataflow) = dataflow { + let uuid = Uuid::parse_str(&dataflow).ok(); + let name = if uuid.is_some() { None } else { Some(dataflow) }; + logs::logs(&mut *session, uuid, name, node)? + } else { + let active = list.get_active(); + let uuid = match &active[..] { + [] => bail!("No dataflows are running"), + [uuid] => uuid.clone(), + _ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?, + }; + logs::logs(&mut *session, Some(uuid.uuid), None, node)? + } + } + Command::Start { + dataflow, + name, + coordinator_addr, + coordinator_port, + attach, + detach, + hot_reload, + } => { + let dataflow_descriptor = + Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?; + let working_dir = dataflow + .canonicalize() + .context("failed to canonicalize dataflow path")? + .parent() + .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? 
+ .to_owned(); + if !coordinator_addr.is_loopback() { + dataflow_descriptor.check_in_daemon(&working_dir, &[], true)?; + } else { + dataflow_descriptor + .check(&working_dir) + .wrap_err("Could not validate yaml")?; + } + + let coordinator_socket = (coordinator_addr, coordinator_port).into(); + let mut session = connect_to_coordinator(coordinator_socket) + .wrap_err("failed to connect to dora coordinator")?; + let dataflow_id = start_dataflow( + dataflow_descriptor.clone(), + name, + working_dir, + &mut *session, + )?; + + let attach = match (attach, detach) { + (true, true) => eyre::bail!("both `--attach` and `--detach` are given"), + (true, false) => true, + (false, true) => false, + (false, false) => { + println!("attaching to dataflow (use `--detach` to run in background)"); + true + } + }; + + if attach { + attach_dataflow( + dataflow_descriptor, + dataflow, + dataflow_id, + &mut *session, + hot_reload, + coordinator_socket, + log_level, + )? + } + } + Command::List { + coordinator_addr, + coordinator_port, + } => match connect_to_coordinator((coordinator_addr, coordinator_port).into()) { + Ok(mut session) => list(&mut *session)?, + Err(_) => { + bail!("No dora coordinator seems to be running."); + } + }, + Command::Stop { + uuid, + name, + grace_duration, + coordinator_addr, + coordinator_port, + } => { + let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) + .wrap_err("could not connect to dora coordinator")?; + match (uuid, name) { + (Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?, + (None, Some(name)) => stop_dataflow_by_name(name, grace_duration, &mut *session)?, + (None, None) => stop_dataflow_interactive(grace_duration, &mut *session)?, + } + } + Command::Destroy { + config, + coordinator_addr, + coordinator_port, + } => up::destroy( + config.as_deref(), + (coordinator_addr, coordinator_port).into(), + )?, + Command::Coordinator { + interface, + port, + control_interface, + control_port, + quiet, + 
} => { + let rt = Builder::new_multi_thread() + .enable_all() + .build() + .context("tokio runtime failed")?; + rt.block_on(async { + let bind = SocketAddr::new(interface, port); + let bind_control = SocketAddr::new(control_interface, control_port); + let (port, task) = + dora_coordinator::start(bind, bind_control, futures::stream::empty::()) + .await?; + if !quiet { + println!("Listening for incoming daemon connection on {port}"); + } + task.await + }) + .context("failed to run dora-coordinator")? + } + Command::Daemon { + coordinator_addr, + inter_daemon_addr, + local_listen_port, + machine_id, + run_dataflow, + quiet: _, + } => { + let rt = Builder::new_multi_thread() + .enable_all() + .build() + .context("tokio runtime failed")?; + rt.block_on(async { + match run_dataflow { + Some(dataflow_path) => { + tracing::info!("Starting dataflow `{}`", dataflow_path.display()); + if coordinator_addr != SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT){ + tracing::info!( + "Not using coordinator addr {} as `run_dataflow` is for local dataflow only. Please use the `start` command for remote coordinator", + coordinator_addr + ); + } + + let result = Daemon::run_dataflow(&dataflow_path).await?; + handle_dataflow_result(result, None) + } + None => { + if coordinator_addr.ip() == LOCALHOST { + tracing::info!("Starting in local mode"); + } + Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port).await + } + } + }) + .context("failed to run dora-daemon")? 
+ } + Command::Runtime => dora_runtime::main().context("Failed to run dora-runtime")?, + }; + + Ok(()) +} + +fn start_dataflow( + dataflow: Descriptor, + name: Option, + local_working_dir: PathBuf, + session: &mut TcpRequestReplyConnection, +) -> Result { + let reply_raw = session + .request( + &serde_json::to_vec(&ControlRequest::Start { + dataflow, + name, + local_working_dir, + }) + .unwrap(), + ) + .wrap_err("failed to send start dataflow message")?; + + let result: ControlRequestReply = + serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + match result { + ControlRequestReply::DataflowStarted { uuid } => { + eprintln!("{uuid}"); + Ok(uuid) + } + ControlRequestReply::Error(err) => bail!("{err}"), + other => bail!("unexpected start dataflow reply: {other:?}"), + } +} + +fn stop_dataflow_interactive( + grace_duration: Option, + session: &mut TcpRequestReplyConnection, +) -> eyre::Result<()> { + let list = query_running_dataflows(session).wrap_err("failed to query running dataflows")?; + let active = list.get_active(); + if active.is_empty() { + eprintln!("No dataflows are running"); + } else { + let selection = inquire::Select::new("Choose dataflow to stop:", active).prompt()?; + stop_dataflow(selection.uuid, grace_duration, session)?; + } + + Ok(()) +} + +fn stop_dataflow( + uuid: Uuid, + grace_duration: Option, + session: &mut TcpRequestReplyConnection, +) -> Result<(), eyre::ErrReport> { + let reply_raw = session + .request( + &serde_json::to_vec(&ControlRequest::Stop { + dataflow_uuid: uuid, + grace_duration, + }) + .unwrap(), + ) + .wrap_err("failed to send dataflow stop message")?; + let result: ControlRequestReply = + serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + match result { + ControlRequestReply::DataflowStopped { uuid, result } => { + handle_dataflow_result(result, Some(uuid)) + } + ControlRequestReply::Error(err) => bail!("{err}"), + other => bail!("unexpected stop dataflow reply: {other:?}"), + } +} + 
+fn handle_dataflow_result( + result: dora_core::topics::DataflowResult, + uuid: Option, +) -> Result<(), eyre::Error> { + if result.is_ok() { + Ok(()) + } else { + Err(match uuid { + Some(uuid) => { + eyre::eyre!("Dataflow {uuid} failed:\n{}", FormatDataflowError(&result)) + } + None => { + eyre::eyre!("Dataflow failed:\n{}", FormatDataflowError(&result)) + } + }) + } +} + +fn stop_dataflow_by_name( + name: String, + grace_duration: Option, + session: &mut TcpRequestReplyConnection, +) -> Result<(), eyre::ErrReport> { + let reply_raw = session + .request( + &serde_json::to_vec(&ControlRequest::StopByName { + name, + grace_duration, + }) + .unwrap(), + ) + .wrap_err("failed to send dataflow stop_by_name message")?; + let result: ControlRequestReply = + serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + match result { + ControlRequestReply::DataflowStopped { uuid, result } => { + handle_dataflow_result(result, Some(uuid)) + } + ControlRequestReply::Error(err) => bail!("{err}"), + other => bail!("unexpected stop dataflow reply: {other:?}"), + } +} + +fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> { + let list = query_running_dataflows(session)?; + + let mut tw = TabWriter::new(vec![]); + tw.write_all(b"UUID\tName\tStatus\n")?; + for entry in list.0 { + let uuid = entry.id.uuid; + let name = entry.id.name.unwrap_or_default(); + let status = match entry.status { + dora_core::topics::DataflowStatus::Running => "Running", + dora_core::topics::DataflowStatus::Finished => "Succeeded", + dora_core::topics::DataflowStatus::Failed => "Failed", + }; + tw.write_all(format!("{uuid}\t{name}\t{status}\n").as_bytes())?; + } + tw.flush()?; + let formatted = String::from_utf8(tw.into_inner()?)?; + + println!("{formatted}"); + + Ok(()) +} + +fn query_running_dataflows(session: &mut TcpRequestReplyConnection) -> eyre::Result { + let reply_raw = session + .request(&serde_json::to_vec(&ControlRequest::List).unwrap()) + 
.wrap_err("failed to send list message")?; + let reply: ControlRequestReply = + serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + let ids = match reply { + ControlRequestReply::DataflowList(list) => list, + ControlRequestReply::Error(err) => bail!("{err}"), + other => bail!("unexpected list dataflow reply: {other:?}"), + }; + + Ok(ids) +} + +fn connect_to_coordinator( + coordinator_addr: SocketAddr, +) -> std::io::Result> { + TcpLayer::new().connect(coordinator_addr) +} diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 8dcb6590..cce31c4e 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -1,262 +1,27 @@ -use attach::attach_dataflow; use clap::Parser; use colored::Colorize; -use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection}; -use dora_coordinator::Event; -use dora_core::{ - descriptor::Descriptor, - topics::{ - ControlRequest, ControlRequestReply, DataflowList, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, - DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, - }, -}; -use dora_daemon::Daemon; +use dora_cli::{run, Command}; #[cfg(feature = "tracing")] use dora_tracing::set_up_tracing; use dora_tracing::set_up_tracing_opts; -use duration_str::parse; -use eyre::{bail, Context}; -use formatting::FormatDataflowError; -use std::{io::Write, net::SocketAddr}; -use std::{ - net::{IpAddr, Ipv4Addr}, - path::PathBuf, - time::Duration, -}; -use tabwriter::TabWriter; -use tokio::runtime::Builder; -use uuid::Uuid; - -mod attach; -mod build; -mod check; -mod formatting; -mod graph; -mod logs; -mod template; -mod up; - -const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); -const LISTEN_WILDCARD: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); - -#[derive(Debug, clap::Parser)] -#[clap(version)] -struct Args { - #[clap(subcommand)] - command: Command, -} - -/// dora-rs cli client -#[derive(Debug, clap::Subcommand)] -enum Command { - /// Check if the 
coordinator and the daemon is running. - Check { - /// Path to the dataflow descriptor file (enables additional checks) - #[clap(long, value_name = "PATH", value_hint = clap::ValueHint::FilePath)] - dataflow: Option, - /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] - coordinator_addr: IpAddr, - /// Port number of the coordinator control server - #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - coordinator_port: u16, - }, - /// Generate a visualization of the given graph using mermaid.js. Use --open to open browser. - Graph { - /// Path to the dataflow descriptor file - #[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)] - dataflow: PathBuf, - /// Visualize the dataflow as a Mermaid diagram (instead of HTML) - #[clap(long, action)] - mermaid: bool, - /// Open the HTML visualization in the browser - #[clap(long, action)] - open: bool, - }, - /// Run build commands provided in the given dataflow. - Build { - /// Path to the dataflow descriptor file - #[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)] - dataflow: PathBuf, - }, - /// Generate a new project or node. Choose the language between Rust, Python, C or C++. - New { - #[clap(flatten)] - args: CommandNew, - #[clap(hide = true, long)] - internal_create_with_path_dependencies: bool, - }, - /// Spawn coordinator and daemon in local mode (with default config) - Up { - /// Use a custom configuration - #[clap(long, hide = true, value_name = "PATH", value_hint = clap::ValueHint::FilePath)] - config: Option, - }, - /// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first. 
- Destroy { - /// Use a custom configuration - #[clap(long, hide = true)] - config: Option, - /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] - coordinator_addr: IpAddr, - /// Port number of the coordinator control server - #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - coordinator_port: u16, - }, - /// Start the given dataflow path. Attach a name to the running dataflow by using --name. - Start { - /// Path to the dataflow descriptor file - #[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)] - dataflow: PathBuf, - /// Assign a name to the dataflow - #[clap(long)] - name: Option, - /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] - coordinator_addr: IpAddr, - /// Port number of the coordinator control server - #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - coordinator_port: u16, - /// Attach to the dataflow and wait for its completion - #[clap(long, action)] - attach: bool, - /// Run the dataflow in background - #[clap(long, action)] - detach: bool, - /// Enable hot reloading (Python only) - #[clap(long, action)] - hot_reload: bool, - }, - /// Stop the given dataflow UUID. If no id is provided, you will be able to choose between the running dataflows. 
- Stop { - /// UUID of the dataflow that should be stopped - uuid: Option, - /// Name of the dataflow that should be stopped - #[clap(long)] - name: Option, - /// Kill the dataflow if it doesn't stop after the given duration - #[clap(long, value_name = "DURATION")] - #[arg(value_parser = parse)] - grace_duration: Option, - /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] - coordinator_addr: IpAddr, - /// Port number of the coordinator control server - #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - coordinator_port: u16, - }, - /// List running dataflows. - List { - /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] - coordinator_addr: IpAddr, - /// Port number of the coordinator control server - #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - coordinator_port: u16, - }, - // Planned for future releases: - // Dashboard, - /// Show logs of a given dataflow and node. - #[command(allow_missing_positional = true)] - Logs { - /// Identifier of the dataflow - #[clap(value_name = "UUID_OR_NAME")] - dataflow: Option, - /// Show logs for the given node - #[clap(value_name = "NAME")] - node: String, - /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] - coordinator_addr: IpAddr, - /// Port number of the coordinator control server - #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - coordinator_port: u16, - }, - // Metrics, - // Stats, - // Get, - // Upgrade, - /// Run daemon - Daemon { - /// Unique identifier for the machine (required for distributed dataflows) - #[clap(long)] - machine_id: Option, - /// The inter daemon IP address and port this daemon will bind to. 
- #[clap(long, default_value_t = SocketAddr::new(LISTEN_WILDCARD, 0))] - inter_daemon_addr: SocketAddr, - /// Local listen port for event such as dynamic node. - #[clap(long, default_value_t = DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT)] - local_listen_port: u16, - /// Address and port number of the dora coordinator - #[clap(long, default_value_t = SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT))] - coordinator_addr: SocketAddr, - #[clap(long, hide = true)] - run_dataflow: Option, - /// Suppresses all log output to stdout. - #[clap(long)] - quiet: bool, - }, - /// Run runtime - Runtime, - /// Run coordinator - Coordinator { - /// Network interface to bind to for daemon communication - #[clap(long, default_value_t = LISTEN_WILDCARD)] - interface: IpAddr, - /// Port number to bind to for daemon communication - #[clap(long, default_value_t = DORA_COORDINATOR_PORT_DEFAULT)] - port: u16, - /// Network interface to bind to for control communication - #[clap(long, default_value_t = LISTEN_WILDCARD)] - control_interface: IpAddr, - /// Port number to bind to for control communication - #[clap(long, default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - control_port: u16, - /// Suppresses all log output to stdout. 
- #[clap(long)] - quiet: bool, - }, -} - -#[derive(Debug, clap::Args)] -pub struct CommandNew { - /// The entity that should be created - #[clap(long, value_enum, default_value_t = Kind::Dataflow)] - kind: Kind, - /// The programming language that should be used - #[clap(long, value_enum, default_value_t = Lang::Rust)] - lang: Lang, - /// Desired name of the entity - name: String, - /// Where to create the entity - #[clap(hide = true)] - path: Option, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)] -enum Kind { - Dataflow, - CustomNode, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)] -enum Lang { - Rust, - Python, - C, - Cxx, -} +use eyre::Context; fn main() { - if let Err(err) = run() { + if let Err(err) = main_inner() { eprintln!("\n\n{}", "[ERROR]".bold().red()); eprintln!("{err:#}"); std::process::exit(1); } } -fn run() -> eyre::Result<()> { +#[derive(Debug, clap::Parser)] +#[clap(version)] +pub struct Args { + #[clap(subcommand)] + command: Command, +} + +fn main_inner() -> eyre::Result<()> { let args = Args::parse(); #[cfg(feature = "tracing")] @@ -285,379 +50,5 @@ fn run() -> eyre::Result<()> { } }; - let log_level = env_logger::Builder::new() - .filter_level(log::LevelFilter::Info) - .parse_default_env() - .build() - .filter(); - - match args.command { - Command::Check { - dataflow, - coordinator_addr, - coordinator_port, - } => match dataflow { - Some(dataflow) => { - let working_dir = dataflow - .canonicalize() - .context("failed to canonicalize dataflow path")? - .parent() - .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? - .to_owned(); - Descriptor::blocking_read(&dataflow)?.check(&working_dir)?; - check::check_environment((coordinator_addr, coordinator_port).into())? 
- } - None => check::check_environment((coordinator_addr, coordinator_port).into())?, - }, - Command::Graph { - dataflow, - mermaid, - open, - } => { - graph::create(dataflow, mermaid, open)?; - } - Command::Build { dataflow } => { - build::build(&dataflow)?; - } - Command::New { - args, - internal_create_with_path_dependencies, - } => template::create(args, internal_create_with_path_dependencies)?, - Command::Up { config } => { - up::up(config.as_deref())?; - } - Command::Logs { - dataflow, - node, - coordinator_addr, - coordinator_port, - } => { - let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) - .wrap_err("failed to connect to dora coordinator")?; - let list = query_running_dataflows(&mut *session) - .wrap_err("failed to query running dataflows")?; - if let Some(dataflow) = dataflow { - let uuid = Uuid::parse_str(&dataflow).ok(); - let name = if uuid.is_some() { None } else { Some(dataflow) }; - logs::logs(&mut *session, uuid, name, node)? - } else { - let active = list.get_active(); - let uuid = match &active[..] { - [] => bail!("No dataflows are running"), - [uuid] => uuid.clone(), - _ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?, - }; - logs::logs(&mut *session, Some(uuid.uuid), None, node)? - } - } - Command::Start { - dataflow, - name, - coordinator_addr, - coordinator_port, - attach, - detach, - hot_reload, - } => { - let dataflow_descriptor = - Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?; - let working_dir = dataflow - .canonicalize() - .context("failed to canonicalize dataflow path")? - .parent() - .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? 
- .to_owned(); - if !coordinator_addr.is_loopback() { - dataflow_descriptor.check_in_daemon(&working_dir, &[], true)?; - } else { - dataflow_descriptor - .check(&working_dir) - .wrap_err("Could not validate yaml")?; - } - - let coordinator_socket = (coordinator_addr, coordinator_port).into(); - let mut session = connect_to_coordinator(coordinator_socket) - .wrap_err("failed to connect to dora coordinator")?; - let dataflow_id = start_dataflow( - dataflow_descriptor.clone(), - name, - working_dir, - &mut *session, - )?; - - let attach = match (attach, detach) { - (true, true) => eyre::bail!("both `--attach` and `--detach` are given"), - (true, false) => true, - (false, true) => false, - (false, false) => { - println!("attaching to dataflow (use `--detach` to run in background)"); - true - } - }; - - if attach { - attach_dataflow( - dataflow_descriptor, - dataflow, - dataflow_id, - &mut *session, - hot_reload, - coordinator_socket, - log_level, - )? - } - } - Command::List { - coordinator_addr, - coordinator_port, - } => match connect_to_coordinator((coordinator_addr, coordinator_port).into()) { - Ok(mut session) => list(&mut *session)?, - Err(_) => { - bail!("No dora coordinator seems to be running."); - } - }, - Command::Stop { - uuid, - name, - grace_duration, - coordinator_addr, - coordinator_port, - } => { - let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) - .wrap_err("could not connect to dora coordinator")?; - match (uuid, name) { - (Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?, - (None, Some(name)) => stop_dataflow_by_name(name, grace_duration, &mut *session)?, - (None, None) => stop_dataflow_interactive(grace_duration, &mut *session)?, - } - } - Command::Destroy { - config, - coordinator_addr, - coordinator_port, - } => up::destroy( - config.as_deref(), - (coordinator_addr, coordinator_port).into(), - )?, - Command::Coordinator { - interface, - port, - control_interface, - control_port, - quiet, - 
} => { - let rt = Builder::new_multi_thread() - .enable_all() - .build() - .context("tokio runtime failed")?; - rt.block_on(async { - let bind = SocketAddr::new(interface, port); - let bind_control = SocketAddr::new(control_interface, control_port); - let (port, task) = - dora_coordinator::start(bind, bind_control, futures::stream::empty::<Event>()) - .await?; - if !quiet { - println!("Listening for incoming daemon connection on {port}"); - } - task.await - }) - .context("failed to run dora-coordinator")? - } - Command::Daemon { - coordinator_addr, - inter_daemon_addr, - local_listen_port, - machine_id, - run_dataflow, - quiet: _, - } => { - let rt = Builder::new_multi_thread() - .enable_all() - .build() - .context("tokio runtime failed")?; - rt.block_on(async { - match run_dataflow { - Some(dataflow_path) => { - tracing::info!("Starting dataflow `{}`", dataflow_path.display()); - if coordinator_addr != SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT){ - tracing::info!( - "Not using coordinator addr {} as `run_dataflow` is for local dataflow only. Please use the `start` command for remote coordinator", - coordinator_addr - ); - } - - let result = Daemon::run_dataflow(&dataflow_path).await?; - handle_dataflow_result(result, None) - } - None => { - if coordinator_addr.ip() == LOCALHOST { - tracing::info!("Starting in local mode"); - } - Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port).await - } - } - }) - .context("failed to run dora-daemon")? 
- } - Command::Runtime => dora_runtime::main().context("Failed to run dora-runtime")?, - }; - - Ok(()) -} - -fn start_dataflow( - dataflow: Descriptor, - name: Option<String>, - local_working_dir: PathBuf, - session: &mut TcpRequestReplyConnection, -) -> Result<Uuid, eyre::ErrReport> { - let reply_raw = session - .request( - &serde_json::to_vec(&ControlRequest::Start { - dataflow, - name, - local_working_dir, - }) - .unwrap(), - ) - .wrap_err("failed to send start dataflow message")?; - - let result: ControlRequestReply = - serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; - match result { - ControlRequestReply::DataflowStarted { uuid } => { - eprintln!("{uuid}"); - Ok(uuid) - } - ControlRequestReply::Error(err) => bail!("{err}"), - other => bail!("unexpected start dataflow reply: {other:?}"), - } -} - -fn stop_dataflow_interactive( - grace_duration: Option<Duration>, - session: &mut TcpRequestReplyConnection, -) -> eyre::Result<()> { - let list = query_running_dataflows(session).wrap_err("failed to query running dataflows")?; - let active = list.get_active(); - if active.is_empty() { - eprintln!("No dataflows are running"); - } else { - let selection = inquire::Select::new("Choose dataflow to stop:", active).prompt()?; - stop_dataflow(selection.uuid, grace_duration, session)?; - } - - Ok(()) -} - -fn stop_dataflow( - uuid: Uuid, - grace_duration: Option<Duration>, - session: &mut TcpRequestReplyConnection, -) -> Result<(), eyre::ErrReport> { - let reply_raw = session - .request( - &serde_json::to_vec(&ControlRequest::Stop { - dataflow_uuid: uuid, - grace_duration, - }) - .unwrap(), - ) - .wrap_err("failed to send dataflow stop message")?; - let result: ControlRequestReply = - serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; - match result { - ControlRequestReply::DataflowStopped { uuid, result } => { - handle_dataflow_result(result, Some(uuid)) - } - ControlRequestReply::Error(err) => bail!("{err}"), - other => bail!("unexpected stop dataflow reply: {other:?}"), - } -} - 
-fn handle_dataflow_result( - result: dora_core::topics::DataflowResult, - uuid: Option<Uuid>, -) -> Result<(), eyre::Error> { - if result.is_ok() { - Ok(()) - } else { - Err(match uuid { - Some(uuid) => { - eyre::eyre!("Dataflow {uuid} failed:\n{}", FormatDataflowError(&result)) - } - None => { - eyre::eyre!("Dataflow failed:\n{}", FormatDataflowError(&result)) - } - }) - } -} - -fn stop_dataflow_by_name( - name: String, - grace_duration: Option<Duration>, - session: &mut TcpRequestReplyConnection, -) -> Result<(), eyre::ErrReport> { - let reply_raw = session - .request( - &serde_json::to_vec(&ControlRequest::StopByName { - name, - grace_duration, - }) - .unwrap(), - ) - .wrap_err("failed to send dataflow stop_by_name message")?; - let result: ControlRequestReply = - serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; - match result { - ControlRequestReply::DataflowStopped { uuid, result } => { - handle_dataflow_result(result, Some(uuid)) - } - ControlRequestReply::Error(err) => bail!("{err}"), - other => bail!("unexpected stop dataflow reply: {other:?}"), - } -} - -fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> { - let list = query_running_dataflows(session)?; - - let mut tw = TabWriter::new(vec![]); - tw.write_all(b"UUID\tName\tStatus\n")?; - for entry in list.0 { - let uuid = entry.id.uuid; - let name = entry.id.name.unwrap_or_default(); - let status = match entry.status { - dora_core::topics::DataflowStatus::Running => "Running", - dora_core::topics::DataflowStatus::Finished => "Succeeded", - dora_core::topics::DataflowStatus::Failed => "Failed", - }; - tw.write_all(format!("{uuid}\t{name}\t{status}\n").as_bytes())?; - } - tw.flush()?; - let formatted = String::from_utf8(tw.into_inner()?)?; - - println!("{formatted}"); - - Ok(()) -} - -fn query_running_dataflows(session: &mut TcpRequestReplyConnection) -> eyre::Result<DataflowList> { - let reply_raw = session - .request(&serde_json::to_vec(&ControlRequest::List).unwrap()) - 
.wrap_err("failed to send list message")?; - let reply: ControlRequestReply = - serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; - let ids = match reply { - ControlRequestReply::DataflowList(list) => list, - ControlRequestReply::Error(err) => bail!("{err}"), - other => bail!("unexpected list dataflow reply: {other:?}"), - }; - - Ok(ids) -} - -fn connect_to_coordinator( - coordinator_addr: SocketAddr, -) -> std::io::Result<Box<TcpRequestReplyConnection>> { - TcpLayer::new().connect(coordinator_addr) + run(args.command) }