Browse Source

Refactor dora library: Add `DoraConnection` struct

split-cli
Philipp Oppermann 1 year ago
parent
commit
d610140e68
Failed to extract signature
16 changed files with 846 additions and 776 deletions
  1. +12
    -12
      binaries/cli/src/attach.rs
  2. +6
    -21
      binaries/cli/src/check.rs
  3. +0
    -50
      binaries/cli/src/formatting.rs
  4. +1
    -47
      binaries/cli/src/graph/mod.rs
  5. +145
    -579
      binaries/cli/src/lib.rs
  6. +6
    -4
      binaries/cli/src/logs.rs
  7. +604
    -9
      binaries/cli/src/main.rs
  8. +6
    -4
      binaries/cli/src/template/c/mod.rs
  9. +6
    -4
      binaries/cli/src/template/cxx/mod.rs
  10. +36
    -5
      binaries/cli/src/template/mod.rs
  11. +6
    -4
      binaries/cli/src/template/python/mod.rs
  12. +6
    -4
      binaries/cli/src/template/rust/mod.rs
  13. +9
    -29
      binaries/cli/src/up.rs
  14. +0
    -3
      binaries/coordinator/src/lib.rs
  15. +1
    -0
      binaries/daemon/src/spawn.rs
  16. +2
    -1
      libraries/core/src/topics.rs

+ 12
- 12
binaries/cli/src/attach.rs View File

@@ -1,9 +1,9 @@
use colored::Colorize;
use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection};
use communication_layer_request_reply::TcpConnection;
use dora_core::{
coordinator_messages::LogMessage,
descriptor::{resolve_path, CoreNodeKind, Descriptor},
topics::{ControlRequest, ControlRequestReply},
topics::{ControlRequest, ControlRequestReply, DataflowResult},
};
use eyre::Context;
use notify::event::ModifyKind;
@@ -16,17 +16,17 @@ use std::{path::PathBuf, sync::mpsc, time::Duration};
use tracing::{error, info};
use uuid::Uuid;

use crate::handle_dataflow_result;
pub fn attach_dataflow(
#[allow(clippy::too_many_arguments)]
pub fn attach_to_dataflow(
connection: &mut crate::DoraConnection,
dataflow: Descriptor,
dataflow_path: PathBuf,
dataflow_id: Uuid,
session: &mut TcpRequestReplyConnection,
hot_reload: bool,
coordinator_socket: SocketAddr,
log_level: log::LevelFilter,
) -> Result<(), eyre::ErrReport> {
log_output: &mut impl std::io::Write,
) -> eyre::Result<DataflowResult> {
let (tx, rx) = mpsc::sync_channel(2);

// Generate path hashmap
@@ -181,7 +181,7 @@ pub fn attach_dataflow(
None => "".normal(),
};

println!("{level}{node}{target}: {message}");
writeln!(log_output, "{level}{node}{target}: {message}")?;
continue;
}
Ok(AttachEvent::Log(Err(err))) => {
@@ -190,16 +190,16 @@ pub fn attach_dataflow(
}
};

let reply_raw = session
let reply_raw = connection
.session
.request(&serde_json::to_vec(&control_request)?)
.wrap_err("failed to send request message to coordinator")?;
let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowStarted { uuid: _ } => (),
ControlRequestReply::DataflowStopped { uuid, result } => {
info!("dataflow {uuid} stopped");
break handle_dataflow_result(result, Some(uuid));
ControlRequestReply::DataflowStopped { result } => {
break Ok(result);
}
ControlRequestReply::DataflowReloaded { uuid } => {
info!("dataflow {uuid} reloaded")


+ 6
- 21
binaries/cli/src/check.rs View File

@@ -1,13 +1,12 @@
use crate::connect_to_coordinator;
use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::topics::{ControlRequest, ControlRequestReply};
use eyre::{bail, Context};
use eyre::bail;
use std::{
io::{IsTerminal, Write},
net::SocketAddr,
};
use termcolor::{Color, ColorChoice, ColorSpec, WriteColor};

use crate::DoraConnection;

pub fn check_environment(coordinator_addr: SocketAddr) -> eyre::Result<()> {
let mut error_occurred = false;

@@ -20,7 +19,7 @@ pub fn check_environment(coordinator_addr: SocketAddr) -> eyre::Result<()> {

// check whether coordinator is running
write!(stdout, "Dora Coordinator: ")?;
let mut session = match connect_to_coordinator(coordinator_addr) {
let mut session = match DoraConnection::connect(coordinator_addr) {
Ok(session) => {
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green)));
writeln!(stdout, "ok")?;
@@ -39,8 +38,8 @@ pub fn check_environment(coordinator_addr: SocketAddr) -> eyre::Result<()> {
// check whether daemon is running
write!(stdout, "Dora Daemon: ")?;
if session
.as_deref_mut()
.map(daemon_running)
.as_mut()
.map(|c| c.daemon_running())
.transpose()?
.unwrap_or(false)
{
@@ -61,17 +60,3 @@ pub fn check_environment(coordinator_addr: SocketAddr) -> eyre::Result<()> {

Ok(())
}

pub fn daemon_running(session: &mut TcpRequestReplyConnection) -> Result<bool, eyre::ErrReport> {
let reply_raw = session
.request(&serde_json::to_vec(&ControlRequest::DaemonConnected).unwrap())
.wrap_err("failed to send DaemonConnected message")?;

let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
let running = match reply {
ControlRequestReply::DaemonConnected(running) => running,
other => bail!("unexpected reply to daemon connection check: {other:?}"),
};

Ok(running)
}

+ 0
- 50
binaries/cli/src/formatting.rs View File

@@ -1,50 +0,0 @@
use dora_core::topics::{DataflowResult, NodeErrorCause};

pub struct FormatDataflowError<'a>(pub &'a DataflowResult);

impl std::fmt::Display for FormatDataflowError<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f)?;
let failed = self
.0
.node_results
.iter()
.filter_map(|(id, r)| r.as_ref().err().map(|e| (id, e)));
let total_failed = failed.clone().count();

let mut non_cascading: Vec<_> = failed
.clone()
.filter(|(_, e)| !matches!(e.cause, NodeErrorCause::Cascading { .. }))
.collect();
non_cascading.sort_by_key(|(_, e)| e.timestamp);
// try to print earliest non-cascading error
let hidden = if !non_cascading.is_empty() {
let printed = non_cascading.len();
for (id, err) in non_cascading {
writeln!(f, "Node `{id}` failed: {err}")?;
}
total_failed - printed
} else {
// no non-cascading errors -> print earliest cascading
let mut all: Vec<_> = failed.collect();
all.sort_by_key(|(_, e)| e.timestamp);
if let Some((id, err)) = all.first() {
write!(f, "Node `{id}` failed: {err}")?;
total_failed - 1
} else {
write!(f, "unknown error")?;
0
}
};

if hidden > 1 {
write!(
f,
"\n\nThere are {hidden} consequential errors. Check the `out/{}` folder for full details.",
self.0.uuid
)?;
}

Ok(())
}
}

+ 1
- 47
binaries/cli/src/graph/mod.rs View File

@@ -1,56 +1,10 @@
use std::{fs::File, io::Write, path::Path};
use std::path::Path;

use dora_core::descriptor::Descriptor;
use eyre::Context;

const MERMAID_TEMPLATE: &str = include_str!("mermaid-template.html");

pub(crate) fn create(dataflow: std::path::PathBuf, mermaid: bool, open: bool) -> eyre::Result<()> {
if mermaid {
let visualized = visualize_as_mermaid(&dataflow)?;
println!("{visualized}");
println!(
"Paste the above output on https://mermaid.live/ or in a \
```mermaid code block on GitHub to display it."
);
} else {
let html = visualize_as_html(&dataflow)?;

let working_dir = std::env::current_dir().wrap_err("failed to get current working dir")?;
let graph_filename = match dataflow.file_stem().and_then(|n| n.to_str()) {
Some(name) => format!("{name}-graph"),
None => "graph".into(),
};
let mut extra = 0;
let path = loop {
let adjusted_file_name = if extra == 0 {
format!("{graph_filename}.html")
} else {
format!("{graph_filename}.{extra}.html")
};
let path = working_dir.join(&adjusted_file_name);
if path.exists() {
extra += 1;
} else {
break path;
}
};

let mut file = File::create(&path).context("failed to create graph HTML file")?;
file.write_all(html.as_bytes())?;

println!(
"View graph by opening the following in your browser:\n file://{}",
path.display()
);

if open {
webbrowser::open(path.as_os_str().to_str().unwrap())?;
}
}
Ok(())
}

pub fn visualize_as_html(dataflow: &Path) -> eyre::Result<String> {
let mermaid = visualize_as_mermaid(dataflow)?;
Ok(MERMAID_TEMPLATE.replacen("____insert____", &mermaid, 1))


+ 145
- 579
binaries/cli/src/lib.rs View File

@@ -1,622 +1,188 @@
use attach::attach_dataflow;
pub use crate::{
build::build as build_dataflow,
check::check_environment,
graph::{visualize_as_html, visualize_as_mermaid},
up::up,
};
use attach::attach_to_dataflow;
use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection};
use dora_coordinator::Event;
use dora_core::{
descriptor::Descriptor,
topics::{
ControlRequest, ControlRequestReply, DataflowList, DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT,
},
topics::{ControlRequest, ControlRequestReply, DataflowList, DataflowResult},
};
use dora_daemon::Daemon;
use duration_str::parse;
use eyre::{bail, Context};
use formatting::FormatDataflowError;
use std::{io::Write, net::SocketAddr, path::Path};
use std::{
net::{IpAddr, Ipv4Addr},
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
time::Duration,
};
use tabwriter::TabWriter;
use tokio::runtime::Builder;
use uuid::Uuid;

mod attach;
mod build;
mod check;
mod formatting;
mod graph;
mod logs;
mod template;
pub mod template;
mod up;

const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
const LISTEN_WILDCARD: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));

#[derive(Debug, clap::Parser)]
#[clap(version)]
struct Args {
#[clap(subcommand)]
command: Command,
}
pub const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
pub const LISTEN_WILDCARD: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));

/// dora-rs cli client
#[derive(Debug, clap::Subcommand)]
pub enum Command {
/// Check if the coordinator and the daemon is running.
Check {
/// Path to the dataflow descriptor file (enables additional checks)
#[clap(long, value_name = "PATH", value_hint = clap::ValueHint::FilePath)]
dataflow: Option<PathBuf>,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
},
/// Generate a visualization of the given graph using mermaid.js. Use --open to open browser.
Graph {
/// Path to the dataflow descriptor file
#[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)]
dataflow: PathBuf,
/// Visualize the dataflow as a Mermaid diagram (instead of HTML)
#[clap(long, action)]
mermaid: bool,
/// Open the HTML visualization in the browser
#[clap(long, action)]
open: bool,
},
/// Run build commands provided in the given dataflow.
Build {
/// Path to the dataflow descriptor file
#[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)]
dataflow: PathBuf,
},
/// Generate a new project or node. Choose the language between Rust, Python, C or C++.
New {
#[clap(flatten)]
args: CommandNew,
#[clap(hide = true, long)]
internal_create_with_path_dependencies: bool,
},
/// Spawn coordinator and daemon in local mode (with default config)
Up {
/// Use a custom configuration
#[clap(long, hide = true, value_name = "PATH", value_hint = clap::ValueHint::FilePath)]
config: Option<PathBuf>,
},
/// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first.
Destroy {
/// Use a custom configuration
#[clap(long, hide = true)]
config: Option<PathBuf>,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
},
/// Start the given dataflow path. Attach a name to the running dataflow by using --name.
Start {
/// Path to the dataflow descriptor file
#[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)]
dataflow: PathBuf,
/// Assign a name to the dataflow
#[clap(long)]
name: Option<String>,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
/// Attach to the dataflow and wait for its completion
#[clap(long, action)]
attach: bool,
/// Run the dataflow in background
#[clap(long, action)]
detach: bool,
/// Enable hot reloading (Python only)
#[clap(long, action)]
hot_reload: bool,
},
/// Stop the given dataflow UUID. If no id is provided, you will be able to choose between the running dataflows.
Stop {
/// UUID of the dataflow that should be stopped
uuid: Option<Uuid>,
/// Name of the dataflow that should be stopped
#[clap(long)]
name: Option<String>,
/// Kill the dataflow if it doesn't stop after the given duration
#[clap(long, value_name = "DURATION")]
#[arg(value_parser = parse)]
grace_duration: Option<Duration>,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
},
/// List running dataflows.
List {
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
},
// Planned for future releases:
// Dashboard,
/// Show logs of a given dataflow and node.
#[command(allow_missing_positional = true)]
Logs {
/// Identifier of the dataflow
#[clap(value_name = "UUID_OR_NAME")]
dataflow: Option<String>,
/// Show logs for the given node
#[clap(value_name = "NAME")]
node: String,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
},
// Metrics,
// Stats,
// Get,
// Upgrade,
/// Run daemon
Daemon {
/// Unique identifier for the machine (required for distributed dataflows)
#[clap(long)]
machine_id: Option<String>,
/// The inter daemon IP address and port this daemon will bind to.
#[clap(long, default_value_t = SocketAddr::new(LISTEN_WILDCARD, 0))]
inter_daemon_addr: SocketAddr,
/// Local listen port for event such as dynamic node.
#[clap(long, default_value_t = DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT)]
local_listen_port: u16,
/// Address and port number of the dora coordinator
#[clap(long, default_value_t = SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT))]
coordinator_addr: SocketAddr,
#[clap(long, hide = true)]
run_dataflow: Option<PathBuf>,
/// Suppresses all log output to stdout.
#[clap(long)]
quiet: bool,
},
/// Run runtime
Runtime,
/// Run coordinator
Coordinator {
/// Network interface to bind to for daemon communication
#[clap(long, default_value_t = LISTEN_WILDCARD)]
interface: IpAddr,
/// Port number to bind to for daemon communication
#[clap(long, default_value_t = DORA_COORDINATOR_PORT_DEFAULT)]
port: u16,
/// Network interface to bind to for control communication
#[clap(long, default_value_t = LISTEN_WILDCARD)]
control_interface: IpAddr,
/// Port number to bind to for control communication
#[clap(long, default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
control_port: u16,
/// Suppresses all log output to stdout.
#[clap(long)]
quiet: bool,
},
pub struct DoraConnection {
session: Box<TcpRequestReplyConnection>,
}

#[derive(Debug, clap::Args)]
pub struct CommandNew {
/// The entity that should be created
#[clap(long, value_enum, default_value_t = Kind::Dataflow)]
kind: Kind,
/// The programming language that should be used
#[clap(long, value_enum, default_value_t = Lang::Rust)]
lang: Lang,
/// Desired name of the entity
name: String,
/// Where to create the entity
#[clap(hide = true)]
path: Option<PathBuf>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
enum Kind {
Dataflow,
CustomNode,
}
impl DoraConnection {
pub fn connect(coordinator_addr: SocketAddr) -> std::io::Result<Self> {
Ok(Self {
session: TcpLayer::new().connect(coordinator_addr)?,
})
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
enum Lang {
Rust,
Python,
C,
Cxx,
}
pub fn daemon_running(&mut self) -> eyre::Result<bool> {
let reply_raw = self
.session
.request(&serde_json::to_vec(&ControlRequest::DaemonConnected).unwrap())
.wrap_err("failed to send DaemonConnected message")?;

pub fn run(command: Command, dora_cli_path: PathBuf) -> eyre::Result<()> {
let log_level = env_logger::Builder::new()
.filter_level(log::LevelFilter::Info)
.parse_default_env()
.build()
.filter();
let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
let running = match reply {
ControlRequestReply::DaemonConnected(running) => running,
other => bail!("unexpected reply to daemon connection check: {other:?}"),
};

match command {
Command::Check {
dataflow,
coordinator_addr,
coordinator_port,
} => match dataflow {
Some(dataflow) => {
let working_dir = dataflow
.canonicalize()
.context("failed to canonicalize dataflow path")?
.parent()
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
.to_owned();
Descriptor::blocking_read(&dataflow)?.check(&working_dir)?;
check::check_environment((coordinator_addr, coordinator_port).into())?
}
None => check::check_environment((coordinator_addr, coordinator_port).into())?,
},
Command::Graph {
dataflow,
mermaid,
open,
} => {
graph::create(dataflow, mermaid, open)?;
}
Command::Build { dataflow } => {
build::build(&dataflow)?;
}
Command::New {
args,
internal_create_with_path_dependencies,
} => template::create(args, internal_create_with_path_dependencies)?,
Command::Up { config } => {
up::up(config.as_deref(), &dora_cli_path)?;
}
Command::Logs {
dataflow,
node,
coordinator_addr,
coordinator_port,
} => {
let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
.wrap_err("failed to connect to dora coordinator")?;
let list = query_running_dataflows(&mut *session)
.wrap_err("failed to query running dataflows")?;
if let Some(dataflow) = dataflow {
let uuid = Uuid::parse_str(&dataflow).ok();
let name = if uuid.is_some() { None } else { Some(dataflow) };
logs::logs(&mut *session, uuid, name, node)?
} else {
let active = list.get_active();
let uuid = match &active[..] {
[] => bail!("No dataflows are running"),
[uuid] => uuid.clone(),
_ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?,
};
logs::logs(&mut *session, Some(uuid.uuid), None, node)?
}
}
Command::Start {
dataflow,
name,
coordinator_addr,
coordinator_port,
attach,
detach,
hot_reload,
} => {
let dataflow_descriptor =
Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?;
let working_dir = dataflow
.canonicalize()
.context("failed to canonicalize dataflow path")?
.parent()
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
.to_owned();
if !coordinator_addr.is_loopback() {
dataflow_descriptor.check_in_daemon(&working_dir, &[], true)?;
} else {
dataflow_descriptor
.check(&working_dir)
.wrap_err("Could not validate yaml")?;
}
Ok(running)
}

let coordinator_socket = (coordinator_addr, coordinator_port).into();
let mut session = connect_to_coordinator(coordinator_socket)
.wrap_err("failed to connect to dora coordinator")?;
let dataflow_id = start_dataflow(
dataflow_descriptor.clone(),
name,
working_dir,
&mut *session,
)?;
pub fn query_running_dataflows(&mut self) -> eyre::Result<DataflowList> {
let reply_raw = self
.session
.request(&serde_json::to_vec(&ControlRequest::List).unwrap())
.wrap_err("failed to send list message")?;
let reply: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
let ids = match reply {
ControlRequestReply::DataflowList(list) => list,
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected list dataflow reply: {other:?}"),
};

let attach = match (attach, detach) {
(true, true) => eyre::bail!("both `--attach` and `--detach` are given"),
(true, false) => true,
(false, true) => false,
(false, false) => {
println!("attaching to dataflow (use `--detach` to run in background)");
true
}
};
Ok(ids)
}

if attach {
attach_dataflow(
dataflow_descriptor,
pub fn start_dataflow(
&mut self,
dataflow: Descriptor,
name: Option<String>,
local_working_dir: PathBuf,
) -> Result<Uuid, eyre::ErrReport> {
let reply_raw = self
.session
.request(
&serde_json::to_vec(&ControlRequest::Start {
dataflow,
dataflow_id,
&mut *session,
hot_reload,
coordinator_socket,
log_level,
)?
}
}
Command::List {
coordinator_addr,
coordinator_port,
} => match connect_to_coordinator((coordinator_addr, coordinator_port).into()) {
Ok(mut session) => list(&mut *session)?,
Err(_) => {
bail!("No dora coordinator seems to be running.");
}
},
Command::Stop {
uuid,
name,
grace_duration,
coordinator_addr,
coordinator_port,
} => {
let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
.wrap_err("could not connect to dora coordinator")?;
match (uuid, name) {
(Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?,
(None, Some(name)) => stop_dataflow_by_name(name, grace_duration, &mut *session)?,
(None, None) => stop_dataflow_interactive(grace_duration, &mut *session)?,
}
}
Command::Destroy {
config,
coordinator_addr,
coordinator_port,
} => up::destroy(
config.as_deref(),
(coordinator_addr, coordinator_port).into(),
)?,
Command::Coordinator {
interface,
port,
control_interface,
control_port,
quiet,
} => {
let rt = Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
rt.block_on(async {
let bind = SocketAddr::new(interface, port);
let bind_control = SocketAddr::new(control_interface, control_port);
let (port, task) =
dora_coordinator::start(bind, bind_control, futures::stream::empty::<Event>())
.await?;
if !quiet {
println!("Listening for incoming daemon connection on {port}");
}
task.await
})
.context("failed to run dora-coordinator")?
}
Command::Daemon {
coordinator_addr,
inter_daemon_addr,
local_listen_port,
machine_id,
run_dataflow,
quiet: _,
} => {
let rt = Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
rt.block_on(async {
match run_dataflow {
Some(dataflow_path) => {
tracing::info!("Starting dataflow `{}`", dataflow_path.display());
if coordinator_addr != SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT){
tracing::info!(
"Not using coordinator addr {} as `run_dataflow` is for local dataflow only. Please use the `start` command for remote coordinator",
coordinator_addr
);
}

let result = Daemon::run_dataflow(&dataflow_path, dora_cli_path.to_owned()).await?;
handle_dataflow_result(result, None)
}
None => {
if coordinator_addr.ip() == LOCALHOST {
tracing::info!("Starting in local mode");
}
Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port, dora_cli_path.to_owned()).await
}
}
})
.context("failed to run dora-daemon")?
}
Command::Runtime => dora_runtime::main().context("Failed to run dora-runtime")?,
};

Ok(())
}

fn start_dataflow(
dataflow: Descriptor,
name: Option<String>,
local_working_dir: PathBuf,
session: &mut TcpRequestReplyConnection,
) -> Result<Uuid, eyre::ErrReport> {
let reply_raw = session
.request(
&serde_json::to_vec(&ControlRequest::Start {
dataflow,
name,
local_working_dir,
})
.unwrap(),
)
.wrap_err("failed to send start dataflow message")?;

let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowStarted { uuid } => {
eprintln!("{uuid}");
Ok(uuid)
name,
local_working_dir,
})
.unwrap(),
)
.wrap_err("failed to send start dataflow message")?;

let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowStarted { uuid } => Ok(uuid),
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
}
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
}
}

fn stop_dataflow_interactive(
grace_duration: Option<Duration>,
session: &mut TcpRequestReplyConnection,
) -> eyre::Result<()> {
let list = query_running_dataflows(session).wrap_err("failed to query running dataflows")?;
let active = list.get_active();
if active.is_empty() {
eprintln!("No dataflows are running");
} else {
let selection = inquire::Select::new("Choose dataflow to stop:", active).prompt()?;
stop_dataflow(selection.uuid, grace_duration, session)?;
pub fn dataflow_logs(
&mut self,
uuid: Option<Uuid>,
name: Option<String>,
node: String,
) -> eyre::Result<()> {
logs::dataflow_logs(self, uuid, name, node)
}

Ok(())
}

fn stop_dataflow(
uuid: Uuid,
grace_duration: Option<Duration>,
session: &mut TcpRequestReplyConnection,
) -> Result<(), eyre::ErrReport> {
let reply_raw = session
.request(
&serde_json::to_vec(&ControlRequest::Stop {
dataflow_uuid: uuid,
grace_duration,
})
.unwrap(),
#[allow(clippy::too_many_arguments)]
pub fn attach_to_dataflow(
&mut self,
dataflow: Descriptor,
dataflow_path: PathBuf,
dataflow_id: Uuid,
hot_reload: bool,
coordinator_socket: SocketAddr,
log_level: log::LevelFilter,
log_output: &mut impl std::io::Write,
) -> eyre::Result<DataflowResult> {
attach_to_dataflow(
self,
dataflow,
dataflow_path,
dataflow_id,
hot_reload,
coordinator_socket,
log_level,
log_output,
)
.wrap_err("failed to send dataflow stop message")?;
let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowStopped { uuid, result } => {
handle_dataflow_result(result, Some(uuid))
}
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected stop dataflow reply: {other:?}"),
}
}

fn handle_dataflow_result(
result: dora_core::topics::DataflowResult,
uuid: Option<Uuid>,
) -> Result<(), eyre::Error> {
if result.is_ok() {
Ok(())
} else {
Err(match uuid {
Some(uuid) => {
eyre::eyre!("Dataflow {uuid} failed:\n{}", FormatDataflowError(&result))
}
None => {
eyre::eyre!("Dataflow failed:\n{}", FormatDataflowError(&result))
}
})
pub fn stop_dataflow(
&mut self,
uuid: Uuid,
grace_duration: Option<Duration>,
) -> eyre::Result<DataflowResult> {
let reply_raw = self
.session
.request(
&serde_json::to_vec(&ControlRequest::Stop {
dataflow_uuid: uuid,
grace_duration,
})
.unwrap(),
)
.wrap_err("failed to send dataflow stop message")?;
let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowStopped { result } => Ok(result),
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected stop dataflow reply: {other:?}"),
}
}
}

fn stop_dataflow_by_name(
name: String,
grace_duration: Option<Duration>,
session: &mut TcpRequestReplyConnection,
) -> Result<(), eyre::ErrReport> {
let reply_raw = session
.request(
&serde_json::to_vec(&ControlRequest::StopByName {
name,
grace_duration,
})
.unwrap(),
)
.wrap_err("failed to send dataflow stop_by_name message")?;
let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowStopped { uuid, result } => {
handle_dataflow_result(result, Some(uuid))
pub fn stop_dataflow_by_name(
&mut self,
name: String,
grace_duration: Option<Duration>,
) -> eyre::Result<DataflowResult> {
let reply_raw = self
.session
.request(
&serde_json::to_vec(&ControlRequest::StopByName {
name,
grace_duration,
})
.unwrap(),
)
.wrap_err("failed to send dataflow stop_by_name message")?;
let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowStopped { result } => Ok(result),
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected stop dataflow reply: {other:?}"),
}
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected stop dataflow reply: {other:?}"),
}
}

fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> {
let list = query_running_dataflows(session)?;

let mut tw = TabWriter::new(vec![]);
tw.write_all(b"UUID\tName\tStatus\n")?;
for entry in list.0 {
let uuid = entry.id.uuid;
let name = entry.id.name.unwrap_or_default();
let status = match entry.status {
dora_core::topics::DataflowStatus::Running => "Running",
dora_core::topics::DataflowStatus::Finished => "Succeeded",
dora_core::topics::DataflowStatus::Failed => "Failed",
};
tw.write_all(format!("{uuid}\t{name}\t{status}\n").as_bytes())?;
pub fn destroy(mut self) -> eyre::Result<()> {
// send destroy command to dora-coordinator
self.session
.request(&serde_json::to_vec(&ControlRequest::Destroy).unwrap())
.wrap_err("failed to send destroy message")?;
Ok(())
}
tw.flush()?;
let formatted = String::from_utf8(tw.into_inner()?)?;

println!("{formatted}");

Ok(())
}

fn query_running_dataflows(session: &mut TcpRequestReplyConnection) -> eyre::Result<DataflowList> {
let reply_raw = session
.request(&serde_json::to_vec(&ControlRequest::List).unwrap())
.wrap_err("failed to send list message")?;
let reply: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
let ids = match reply {
ControlRequestReply::DataflowList(list) => list,
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected list dataflow reply: {other:?}"),
};

Ok(ids)
}

fn connect_to_coordinator(
coordinator_addr: SocketAddr,
) -> std::io::Result<Box<TcpRequestReplyConnection>> {
TcpLayer::new().connect(coordinator_addr)
}

+ 6
- 4
binaries/cli/src/logs.rs View File

@@ -1,18 +1,20 @@
use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::topics::{ControlRequest, ControlRequestReply};
use eyre::{bail, Context, Result};
use uuid::Uuid;

use bat::{Input, PrettyPrinter};

pub fn logs(
session: &mut TcpRequestReplyConnection,
use crate::DoraConnection;

pub fn dataflow_logs(
connection: &mut DoraConnection,
uuid: Option<Uuid>,
name: Option<String>,
node: String,
) -> Result<()> {
let logs = {
let reply_raw = session
let reply_raw = connection
.session
.request(
&serde_json::to_vec(&ControlRequest::Logs {
uuid,


+ 604
- 9
binaries/cli/src/main.rs View File

@@ -1,10 +1,29 @@
use std::{
fs::File,
io::Write,
net::{IpAddr, SocketAddr},
path::PathBuf,
time::Duration,
};

use clap::Parser;
use colored::Colorize;
use dora_cli::{run, Command};
use dora_cli::{check_environment, template, DoraConnection, LISTEN_WILDCARD, LOCALHOST};
use dora_core::{
descriptor::Descriptor,
topics::{
DataflowResult, NodeErrorCause, DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT,
},
};
use dora_daemon::Daemon;
#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
use dora_tracing::set_up_tracing_opts;
use eyre::Context;
use eyre::{bail, Context};
use tabwriter::TabWriter;
use tracing::info;
use uuid::Uuid;

fn main() {
if let Err(err) = main_inner() {
@@ -14,13 +33,6 @@ fn main() {
}
}

#[derive(Debug, clap::Parser)]
#[clap(version)]
pub struct Args {
#[clap(subcommand)]
command: Command,
}

fn main_inner() -> eyre::Result<()> {
let args = Args::parse();

@@ -54,3 +66,586 @@ fn main_inner() -> eyre::Result<()> {
std::env::current_exe().wrap_err("failed to get current executable path")?;
run(args.command, dora_cli_path)
}

#[derive(Debug, clap::Parser)]
#[clap(version)]
pub struct Args {
#[clap(subcommand)]
command: Command,
}

/// dora-rs cli client
#[derive(Debug, clap::Subcommand)]
pub enum Command {
/// Check if the coordinator and the daemon is running.
Check {
/// Path to the dataflow descriptor file (enables additional checks)
#[clap(long, value_name = "PATH", value_hint = clap::ValueHint::FilePath)]
dataflow: Option<PathBuf>,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
},
/// Generate a visualization of the given graph using mermaid.js. Use --open to open browser.
Graph {
/// Path to the dataflow descriptor file
#[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)]
dataflow: PathBuf,
/// Visualize the dataflow as a Mermaid diagram (instead of HTML)
#[clap(long, action)]
mermaid: bool,
/// Open the HTML visualization in the browser
#[clap(long, action)]
open: bool,
},
/// Run build commands provided in the given dataflow.
Build {
/// Path to the dataflow descriptor file
#[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)]
dataflow: PathBuf,
},
/// Generate a new project or node. Choose the language between Rust, Python, C or C++.
New {
#[clap(flatten)]
args: template::CreateArgs,
#[clap(hide = true, long)]
internal_create_with_path_dependencies: bool,
},
/// Spawn coordinator and daemon in local mode (with default config)
Up {
/// Use a custom configuration
#[clap(long, hide = true, value_name = "PATH", value_hint = clap::ValueHint::FilePath)]
config: Option<PathBuf>,
},
/// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first.
Destroy {
/// Use a custom configuration
#[clap(long, hide = true)]
config: Option<PathBuf>,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
},
/// Start the given dataflow path. Attach a name to the running dataflow by using --name.
Start {
/// Path to the dataflow descriptor file
#[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)]
dataflow: PathBuf,
/// Assign a name to the dataflow
#[clap(long)]
name: Option<String>,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
/// Attach to the dataflow and wait for its completion
#[clap(long, action)]
attach: bool,
/// Run the dataflow in background
#[clap(long, action)]
detach: bool,
/// Enable hot reloading (Python only)
#[clap(long, action)]
hot_reload: bool,
},
/// Stop the given dataflow UUID. If no id is provided, you will be able to choose between the running dataflows.
Stop {
/// UUID of the dataflow that should be stopped
uuid: Option<Uuid>,
/// Name of the dataflow that should be stopped
#[clap(long)]
name: Option<String>,
/// Kill the dataflow if it doesn't stop after the given duration
#[clap(long, value_name = "DURATION")]
#[arg(value_parser = duration_str::parse)]
grace_duration: Option<Duration>,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
},
/// List running dataflows.
List {
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
},
// Planned for future releases:
// Dashboard,
/// Show logs of a given dataflow and node.
#[command(allow_missing_positional = true)]
Logs {
/// Identifier of the dataflow
#[clap(value_name = "UUID_OR_NAME")]
dataflow: Option<String>,
/// Show logs for the given node
#[clap(value_name = "NAME")]
node: String,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
},
// Metrics,
// Stats,
// Get,
// Upgrade,
/// Run daemon
Daemon {
/// Unique identifier for the machine (required for distributed dataflows)
#[clap(long)]
machine_id: Option<String>,
/// The inter daemon IP address and port this daemon will bind to.
#[clap(long, default_value_t = SocketAddr::new(LISTEN_WILDCARD, 0))]
inter_daemon_addr: SocketAddr,
/// Local listen port for event such as dynamic node.
#[clap(long, default_value_t = DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT)]
local_listen_port: u16,
/// Address and port number of the dora coordinator
#[clap(long, default_value_t = SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT))]
coordinator_addr: SocketAddr,
#[clap(long, hide = true)]
run_dataflow: Option<PathBuf>,
/// Suppresses all log output to stdout.
#[clap(long)]
quiet: bool,
},
/// Run runtime
Runtime,
/// Run coordinator
Coordinator {
/// Network interface to bind to for daemon communication
#[clap(long, default_value_t = LISTEN_WILDCARD)]
interface: IpAddr,
/// Port number to bind to for daemon communication
#[clap(long, default_value_t = DORA_COORDINATOR_PORT_DEFAULT)]
port: u16,
/// Network interface to bind to for control communication
#[clap(long, default_value_t = LISTEN_WILDCARD)]
control_interface: IpAddr,
/// Port number to bind to for control communication
#[clap(long, default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
control_port: u16,
/// Suppresses all log output to stdout.
#[clap(long)]
quiet: bool,
},
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
enum Kind {
Dataflow,
CustomNode,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
enum Lang {
Rust,
Python,
C,
Cxx,
}

pub fn run(command: Command, dora_cli_path: PathBuf) -> eyre::Result<()> {
let log_level = env_logger::Builder::new()
.filter_level(log::LevelFilter::Info)
.parse_default_env()
.build()
.filter();

match command {
Command::Check {
dataflow,
coordinator_addr,
coordinator_port,
} => match dataflow {
Some(dataflow) => {
let working_dir = dataflow
.canonicalize()
.context("failed to canonicalize dataflow path")?
.parent()
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
.to_owned();
Descriptor::blocking_read(&dataflow)?.check(&working_dir)?;
check_environment((coordinator_addr, coordinator_port).into())?
}
None => check_environment((coordinator_addr, coordinator_port).into())?,
},
Command::Graph {
dataflow,
mermaid,
open,
} => {
create_dataflow_graph(dataflow, mermaid, open)?;
}
Command::Build { dataflow } => {
dora_cli::build_dataflow(&dataflow)?;
}
Command::New {
args,
internal_create_with_path_dependencies,
} => template::create(args, internal_create_with_path_dependencies)?,
Command::Up { config } => {
dora_cli::up(config.as_deref(), &dora_cli_path)?;
}
Command::Logs {
dataflow,
node,
coordinator_addr,
coordinator_port,
} => {
let mut session = DoraConnection::connect((coordinator_addr, coordinator_port).into())
.wrap_err("failed to connect to dora coordinator")?;
let list = session
.query_running_dataflows()
.wrap_err("failed to query running dataflows")?;
if let Some(dataflow) = dataflow {
let uuid = Uuid::parse_str(&dataflow).ok();
let name = if uuid.is_some() { None } else { Some(dataflow) };
session.dataflow_logs(uuid, name, node)?
} else {
let active = list.get_active();
let uuid = match &active[..] {
[] => bail!("No dataflows are running"),
[uuid] => uuid.clone(),
_ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?,
};
session.dataflow_logs(Some(uuid.uuid), None, node)?
}
}
Command::Start {
dataflow,
name,
coordinator_addr,
coordinator_port,
attach,
detach,
hot_reload,
} => {
let dataflow_descriptor =
Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?;
let working_dir = dataflow
.canonicalize()
.context("failed to canonicalize dataflow path")?
.parent()
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
.to_owned();
if !coordinator_addr.is_loopback() {
dataflow_descriptor.check_in_daemon(&working_dir, &[], true)?;
} else {
dataflow_descriptor
.check(&working_dir)
.wrap_err("Could not validate yaml")?;
}

let coordinator_socket = (coordinator_addr, coordinator_port).into();
let mut session = DoraConnection::connect(coordinator_socket)
.wrap_err("failed to connect to dora coordinator")?;
let dataflow_id =
session.start_dataflow(dataflow_descriptor.clone(), name, working_dir)?;
eprintln!("{dataflow_id}");

let attach = match (attach, detach) {
(true, true) => eyre::bail!("both `--attach` and `--detach` are given"),
(true, false) => true,
(false, true) => false,
(false, false) => {
println!("attaching to dataflow (use `--detach` to run in background)");
true
}
};

if attach {
let result = session.attach_to_dataflow(
dataflow_descriptor,
dataflow,
dataflow_id,
hot_reload,
coordinator_socket,
log_level,
&mut std::io::stdout(),
)?;
info!("dataflow {} stopped", result.uuid);
handle_dataflow_result(result)?;
}
}
Command::List {
coordinator_addr,
coordinator_port,
} => match DoraConnection::connect((coordinator_addr, coordinator_port).into()) {
Ok(mut session) => list_dataflows(&mut session)?,
Err(_) => {
bail!("No dora coordinator seems to be running.");
}
},
Command::Stop {
uuid,
name,
grace_duration,
coordinator_addr,
coordinator_port,
} => {
let mut session = DoraConnection::connect((coordinator_addr, coordinator_port).into())
.wrap_err("could not connect to dora coordinator")?;
let result = match (uuid, name) {
(Some(uuid), _) => Some(session.stop_dataflow(uuid, grace_duration)?),
(None, Some(name)) => Some(session.stop_dataflow_by_name(name, grace_duration)?),
(None, None) => stop_dataflow_interactive(grace_duration, &mut session)?,
};
if let Some(result) = result {
handle_dataflow_result(result)?;
}
}
Command::Destroy {
config: _,
coordinator_addr,
coordinator_port,
} => destroy((coordinator_addr, coordinator_port).into())?,
Command::Coordinator {
interface,
port,
control_interface,
control_port,
quiet,
} => {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
rt.block_on(async {
let bind = SocketAddr::new(interface, port);
let bind_control = SocketAddr::new(control_interface, control_port);
let (port, task) = dora_coordinator::start(
bind,
bind_control,
futures::stream::empty::<dora_coordinator::Event>(),
)
.await?;
if !quiet {
println!("Listening for incoming daemon connection on {port}");
}
task.await
})
.context("failed to run dora-coordinator")?
}
Command::Daemon {
coordinator_addr,
inter_daemon_addr,
local_listen_port,
machine_id,
run_dataflow,
quiet: _,
} => {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
rt.block_on(async {
match run_dataflow {
Some(dataflow_path) => {
tracing::info!("Starting dataflow `{}`", dataflow_path.display());
if coordinator_addr != SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT){
tracing::info!(
"Not using coordinator addr {} as `run_dataflow` is for local dataflow only. Please use the `start` command for remote coordinator",
coordinator_addr
);
}

let result = Daemon::run_dataflow(&dataflow_path, dora_cli_path.to_owned()).await?;
handle_dataflow_result(result)
}
None => {
if coordinator_addr.ip() == LOCALHOST {
tracing::info!("Starting in local mode");
}
Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port, dora_cli_path.to_owned()).await
}
}
})
.context("failed to run dora-daemon")?
}
Command::Runtime => dora_runtime::main().context("Failed to run dora-runtime")?,
};

Ok(())
}

fn create_dataflow_graph(
dataflow: std::path::PathBuf,
mermaid: bool,
open: bool,
) -> eyre::Result<()> {
if mermaid {
let visualized = dora_cli::visualize_as_mermaid(&dataflow)?;
println!("{visualized}");
println!(
"Paste the above output on https://mermaid.live/ or in a \
```mermaid code block on GitHub to display it."
);
} else {
let html = dora_cli::visualize_as_html(&dataflow)?;

let working_dir = std::env::current_dir().wrap_err("failed to get current working dir")?;
let graph_filename = match dataflow.file_stem().and_then(|n| n.to_str()) {
Some(name) => format!("{name}-graph"),
None => "graph".into(),
};
let mut extra = 0;
let path = loop {
let adjusted_file_name = if extra == 0 {
format!("{graph_filename}.html")
} else {
format!("{graph_filename}.{extra}.html")
};
let path = working_dir.join(&adjusted_file_name);
if path.exists() {
extra += 1;
} else {
break path;
}
};

let mut file = File::create(&path).context("failed to create graph HTML file")?;
file.write_all(html.as_bytes())?;

println!(
"View graph by opening the following in your browser:\n file://{}",
path.display()
);

if open {
webbrowser::open(path.as_os_str().to_str().unwrap())?;
}
}
Ok(())
}

fn list_dataflows(session: &mut DoraConnection) -> Result<(), eyre::ErrReport> {
let list = session.query_running_dataflows()?;

let mut tw = TabWriter::new(vec![]);
tw.write_all(b"UUID\tName\tStatus\n")?;
for entry in list.0 {
let uuid = entry.id.uuid;
let name = entry.id.name.unwrap_or_default();
let status = match entry.status {
dora_core::topics::DataflowStatus::Running => "Running",
dora_core::topics::DataflowStatus::Finished => "Succeeded",
dora_core::topics::DataflowStatus::Failed => "Failed",
};
tw.write_all(format!("{uuid}\t{name}\t{status}\n").as_bytes())?;
}
tw.flush()?;
let formatted = String::from_utf8(tw.into_inner()?)?;

println!("{formatted}");

Ok(())
}

fn stop_dataflow_interactive(
grace_duration: Option<Duration>,
session: &mut DoraConnection,
) -> eyre::Result<Option<DataflowResult>> {
let list = session
.query_running_dataflows()
.wrap_err("failed to query running dataflows")?;
let active = list.get_active();
if active.is_empty() {
eprintln!("No dataflows are running");
Ok(None)
} else {
let selection = inquire::Select::new("Choose dataflow to stop:", active).prompt()?;
Ok(Some(session.stop_dataflow(selection.uuid, grace_duration)?))
}
}

pub fn destroy(coordinator_addr: SocketAddr) -> Result<(), eyre::ErrReport> {
match DoraConnection::connect(coordinator_addr) {
Ok(session) => {
session.destroy()?;
println!("Send destroy command to dora-coordinator");
}
Err(_) => {
eprintln!("Could not connect to dora-coordinator");
}
}

Ok(())
}

fn handle_dataflow_result(result: dora_core::topics::DataflowResult) -> Result<(), eyre::Error> {
if result.is_ok() {
Ok(())
} else {
Err(eyre::eyre!(
"Dataflow {} failed:\n{}",
result.uuid,
FormatDataflowError(&result)
))
}
}

struct FormatDataflowError<'a>(pub &'a DataflowResult);

impl std::fmt::Display for FormatDataflowError<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f)?;
let failed = self
.0
.node_results
.iter()
.filter_map(|(id, r)| r.as_ref().err().map(|e| (id, e)));
let total_failed = failed.clone().count();

let mut non_cascading: Vec<_> = failed
.clone()
.filter(|(_, e)| !matches!(e.cause, NodeErrorCause::Cascading { .. }))
.collect();
non_cascading.sort_by_key(|(_, e)| e.timestamp);
// try to print earliest non-cascading error
let hidden = if !non_cascading.is_empty() {
let printed = non_cascading.len();
for (id, err) in non_cascading {
writeln!(f, "Node `{id}` failed: {err}")?;
}
total_failed - printed
} else {
// no non-cascading errors -> print earliest cascading
let mut all: Vec<_> = failed.collect();
all.sort_by_key(|(_, e)| e.timestamp);
if let Some((id, err)) = all.first() {
write!(f, "Node `{id}` failed: {err}")?;
total_failed - 1
} else {
write!(f, "unknown error")?;
0
}
};

if hidden > 1 {
write!(
f,
"\n\nThere are {hidden} consequential errors. Check the `out/{}` folder for full details.",
self.0.uuid
)?;
}

Ok(())
}
}

+ 6
- 4
binaries/cli/src/template/c/mod.rs View File

@@ -5,12 +5,14 @@ use std::{
path::{Path, PathBuf},
};

use super::Kind;

const NODE: &str = include_str!("node/node-template.c");
const TALKER: &str = include_str!("talker/talker-template.c");
const LISTENER: &str = include_str!("listener/listener-template.c");

pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> {
let crate::CommandNew {
pub fn create(args: super::CreateArgs, use_path_deps: bool) -> eyre::Result<()> {
let super::CreateArgs {
kind,
lang: _,
name,
@@ -18,8 +20,8 @@ pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()>
} = args;

match kind {
crate::Kind::CustomNode => create_custom_node(name, path, NODE),
crate::Kind::Dataflow => create_dataflow(name, path, use_path_deps),
Kind::CustomNode => create_custom_node(name, path, NODE),
Kind::Dataflow => create_dataflow(name, path, use_path_deps),
}
}



+ 6
- 4
binaries/cli/src/template/cxx/mod.rs View File

@@ -4,12 +4,14 @@ use std::{
path::{Path, PathBuf},
};

use super::Kind;

const NODE: &str = include_str!("node-template.cc");
const TALKER: &str = include_str!("talker-template.cc");
const LISTENER: &str = include_str!("listener-template.cc");

pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> {
let crate::CommandNew {
pub fn create(args: super::CreateArgs, use_path_deps: bool) -> eyre::Result<()> {
let super::CreateArgs {
kind,
lang: _,
name,
@@ -17,8 +19,8 @@ pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()>
} = args;

match kind {
crate::Kind::CustomNode => create_custom_node(name, path, NODE),
crate::Kind::Dataflow => create_dataflow(name, path, use_path_deps),
Kind::CustomNode => create_custom_node(name, path, NODE),
Kind::Dataflow => create_dataflow(name, path, use_path_deps),
}
}



+ 36
- 5
binaries/cli/src/template/mod.rs View File

@@ -1,13 +1,44 @@
use std::path::PathBuf;

mod c;
mod cxx;
mod python;
mod rust;

pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> {
#[derive(Debug, clap::Args)]
pub struct CreateArgs {
/// The entity that should be created
#[clap(long, value_enum, default_value_t = Kind::Dataflow)]
pub kind: Kind,
/// The programming language that should be used
#[clap(long, value_enum, default_value_t = Lang::Rust)]
pub lang: Lang,
/// Desired name of the entity
pub name: String,
/// Where to create the entity
#[clap(hide = true)]
pub path: Option<PathBuf>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
pub enum Kind {
Dataflow,
CustomNode,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
pub enum Lang {
Rust,
Python,
C,
Cxx,
}

pub fn create(args: CreateArgs, use_path_deps: bool) -> eyre::Result<()> {
match args.lang {
crate::Lang::Rust => rust::create(args, use_path_deps),
crate::Lang::Python => python::create(args),
crate::Lang::C => c::create(args, use_path_deps),
crate::Lang::Cxx => cxx::create(args, use_path_deps),
Lang::Rust => rust::create(args, use_path_deps),
Lang::Python => python::create(args),
Lang::C => c::create(args, use_path_deps),
Lang::Cxx => cxx::create(args, use_path_deps),
}
}

+ 6
- 4
binaries/cli/src/template/python/mod.rs View File

@@ -4,12 +4,14 @@ use std::{
path::{Path, PathBuf},
};

use super::Kind;

const NODE_PY: &str = include_str!("node/node-template.py");
const TALKER_PY: &str = include_str!("talker/talker-template.py");
const LISTENER_PY: &str = include_str!("listener/listener-template.py");

pub fn create(args: crate::CommandNew) -> eyre::Result<()> {
let crate::CommandNew {
pub fn create(args: super::CreateArgs) -> eyre::Result<()> {
let super::CreateArgs {
kind,
lang: _,
name,
@@ -17,8 +19,8 @@ pub fn create(args: crate::CommandNew) -> eyre::Result<()> {
} = args;

match kind {
crate::Kind::CustomNode => create_custom_node(name, path, NODE_PY),
crate::Kind::Dataflow => create_dataflow(name, path),
Kind::CustomNode => create_custom_node(name, path, NODE_PY),
Kind::Dataflow => create_dataflow(name, path),
}
}



+ 6
- 4
binaries/cli/src/template/rust/mod.rs View File

@@ -4,13 +4,15 @@ use std::{
path::{Path, PathBuf},
};

use super::Kind;

const MAIN_RS: &str = include_str!("node/main-template.rs");
const TALKER_RS: &str = include_str!("talker/main-template.rs");
const LISTENER_RS: &str = include_str!("listener/main-template.rs");

const VERSION: &str = env!("CARGO_PKG_VERSION");
pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> {
let crate::CommandNew {
pub fn create(args: super::CreateArgs, use_path_deps: bool) -> eyre::Result<()> {
let super::CreateArgs {
kind,
lang: _,
name,
@@ -18,8 +20,8 @@ pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()>
} = args;

match kind {
crate::Kind::CustomNode => create_custom_node(name, path, use_path_deps, MAIN_RS),
crate::Kind::Dataflow => create_dataflow(name, path, use_path_deps),
Kind::CustomNode => create_custom_node(name, path, use_path_deps, MAIN_RS),
Kind::Dataflow => create_dataflow(name, path, use_path_deps),
}
}



+ 9
- 29
binaries/cli/src/up.rs View File

@@ -1,20 +1,21 @@
use crate::{check::daemon_running, connect_to_coordinator, LOCALHOST};
use dora_core::topics::{ControlRequest, DORA_COORDINATOR_PORT_CONTROL_DEFAULT};
use crate::{DoraConnection, LOCALHOST};
use dora_core::topics::DORA_COORDINATOR_PORT_CONTROL_DEFAULT;
use eyre::Context;
use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration};
use std::{fs, path::Path, process::Command, time::Duration};

#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
struct UpConfig {}

pub(crate) fn up(config_path: Option<&Path>, dora_cli_path: &Path) -> eyre::Result<()> {
pub fn up(config_path: Option<&Path>, dora_cli_path: &Path) -> eyre::Result<()> {
let UpConfig {} = parse_dora_config(config_path)?;
let coordinator_addr = (LOCALHOST, DORA_COORDINATOR_PORT_CONTROL_DEFAULT).into();
let mut session = match connect_to_coordinator(coordinator_addr) {
let mut session = match DoraConnection::connect(coordinator_addr) {
Ok(session) => session,
Err(_) => {
start_coordinator(dora_cli_path).wrap_err("failed to start dora-coordinator")?;

loop {
match connect_to_coordinator(coordinator_addr) {
match DoraConnection::connect(coordinator_addr) {
Ok(session) => break session,
Err(_) => {
// sleep a bit until the coordinator accepts connections
@@ -25,14 +26,14 @@ pub(crate) fn up(config_path: Option<&Path>, dora_cli_path: &Path) -> eyre::Resu
}
};

if !daemon_running(&mut *session)? {
if !session.daemon_running()? {
start_daemon(dora_cli_path).wrap_err("failed to start dora-daemon")?;

// wait a bit until daemon is connected
let mut i = 0;
const WAIT_S: f32 = 0.1;
loop {
if daemon_running(&mut *session)? {
if session.daemon_running()? {
break;
}
i += 1;
@@ -46,27 +47,6 @@ pub(crate) fn up(config_path: Option<&Path>, dora_cli_path: &Path) -> eyre::Resu
Ok(())
}

pub(crate) fn destroy(
config_path: Option<&Path>,
coordinator_addr: SocketAddr,
) -> Result<(), eyre::ErrReport> {
let UpConfig {} = parse_dora_config(config_path)?;
match connect_to_coordinator(coordinator_addr) {
Ok(mut session) => {
// send destroy command to dora-coordinator
session
.request(&serde_json::to_vec(&ControlRequest::Destroy).unwrap())
.wrap_err("failed to send destroy message")?;
println!("Send destroy command to dora-coordinator");
}
Err(_) => {
eprintln!("Could not connect to dora-coordinator");
}
}

Ok(())
}

fn parse_dora_config(config_path: Option<&Path>) -> Result<UpConfig, eyre::ErrReport> {
let path = config_path.or_else(|| Some(Path::new("dora-config.yml")).filter(|p| p.exists()));
let config = match path {


+ 0
- 3
binaries/coordinator/src/lib.rs View File

@@ -288,7 +288,6 @@ async fn start_inner(
if entry.get_mut().machines.is_empty() {
let finished_dataflow = entry.remove();
let reply = ControlRequestReply::DataflowStopped {
uuid,
result: dataflow_results
.get(&uuid)
.map(|r| dataflow_result(r, uuid, &clock))
@@ -354,7 +353,6 @@ async fn start_inner(
uuid: dataflow_uuid,
},
None => ControlRequestReply::DataflowStopped {
uuid: dataflow_uuid,
result: dataflow_results
.get(&dataflow_uuid)
.map(|r| dataflow_result(r, dataflow_uuid, &clock))
@@ -615,7 +613,6 @@ async fn stop_dataflow_by_uuid(
let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else {
if let Some(result) = dataflow_results.get(&dataflow_uuid) {
let reply = ControlRequestReply::DataflowStopped {
uuid: dataflow_uuid,
result: dataflow_result(result, dataflow_uuid, clock),
};
let _ = reply_sender.send(Ok(reply));


+ 1
- 0
binaries/daemon/src/spawn.rs View File

@@ -36,6 +36,7 @@ use tokio::{
use tracing::error;

/// clock is required for generating timestamps when dropping messages early because queue is full
#[allow(clippy::too_many_arguments)]
pub async fn spawn_node(
dataflow_id: DataflowId,
working_dir: &Path,


+ 2
- 1
libraries/core/src/topics.rs View File

@@ -93,7 +93,7 @@ pub enum ControlRequestReply {
CoordinatorStopped,
DataflowStarted { uuid: Uuid },
DataflowReloaded { uuid: Uuid },
DataflowStopped { uuid: Uuid, result: DataflowResult },
DataflowStopped { result: DataflowResult },
DataflowList(DataflowList),
DestroyOk,
DaemonConnected(bool),
@@ -118,6 +118,7 @@ impl Display for DataflowId {
}

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[must_use]
pub struct DataflowResult {
pub uuid: Uuid,
pub timestamp: uhlc::Timestamp,


Loading…
Cancel
Save