Compare commits

...

3 Commits

Author SHA1 Message Date
  Philipp Oppermann d610140e68
Refactor dora library: Add `DoraConnection` struct 1 year ago
  Philipp Oppermann ce18beb232
Pass dora CLI path as argument instead of using `current_exe` 1 year ago
  Philipp Oppermann 987ccde1e7
Split CLI into library and binary part 1 year ago
17 changed files with 532 additions and 440 deletions
Split View
  1. +12
    -12
      binaries/cli/src/attach.rs
  2. +6
    -21
      binaries/cli/src/check.rs
  3. +0
    -50
      binaries/cli/src/formatting.rs
  4. +1
    -47
      binaries/cli/src/graph/mod.rs
  5. +188
    -0
      binaries/cli/src/lib.rs
  6. +6
    -4
      binaries/cli/src/logs.rs
  7. +228
    -240
      binaries/cli/src/main.rs
  8. +6
    -4
      binaries/cli/src/template/c/mod.rs
  9. +6
    -4
      binaries/cli/src/template/cxx/mod.rs
  10. +36
    -5
      binaries/cli/src/template/mod.rs
  11. +6
    -4
      binaries/cli/src/template/python/mod.rs
  12. +6
    -4
      binaries/cli/src/template/rust/mod.rs
  13. +15
    -37
      binaries/cli/src/up.rs
  14. +0
    -3
      binaries/coordinator/src/lib.rs
  15. +11
    -1
      binaries/daemon/src/lib.rs
  16. +3
    -3
      binaries/daemon/src/spawn.rs
  17. +2
    -1
      libraries/core/src/topics.rs

+ 12
- 12
binaries/cli/src/attach.rs View File

@@ -1,9 +1,9 @@
use colored::Colorize;
use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection};
use communication_layer_request_reply::TcpConnection;
use dora_core::{
coordinator_messages::LogMessage,
descriptor::{resolve_path, CoreNodeKind, Descriptor},
topics::{ControlRequest, ControlRequestReply},
topics::{ControlRequest, ControlRequestReply, DataflowResult},
};
use eyre::Context;
use notify::event::ModifyKind;
@@ -16,17 +16,17 @@ use std::{path::PathBuf, sync::mpsc, time::Duration};
use tracing::{error, info};
use uuid::Uuid;

use crate::handle_dataflow_result;
pub fn attach_dataflow(
#[allow(clippy::too_many_arguments)]
pub fn attach_to_dataflow(
connection: &mut crate::DoraConnection,
dataflow: Descriptor,
dataflow_path: PathBuf,
dataflow_id: Uuid,
session: &mut TcpRequestReplyConnection,
hot_reload: bool,
coordinator_socket: SocketAddr,
log_level: log::LevelFilter,
) -> Result<(), eyre::ErrReport> {
log_output: &mut impl std::io::Write,
) -> eyre::Result<DataflowResult> {
let (tx, rx) = mpsc::sync_channel(2);

// Generate path hashmap
@@ -181,7 +181,7 @@ pub fn attach_dataflow(
None => "".normal(),
};

println!("{level}{node}{target}: {message}");
writeln!(log_output, "{level}{node}{target}: {message}")?;
continue;
}
Ok(AttachEvent::Log(Err(err))) => {
@@ -190,16 +190,16 @@ pub fn attach_dataflow(
}
};

let reply_raw = session
let reply_raw = connection
.session
.request(&serde_json::to_vec(&control_request)?)
.wrap_err("failed to send request message to coordinator")?;
let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowStarted { uuid: _ } => (),
ControlRequestReply::DataflowStopped { uuid, result } => {
info!("dataflow {uuid} stopped");
break handle_dataflow_result(result, Some(uuid));
ControlRequestReply::DataflowStopped { result } => {
break Ok(result);
}
ControlRequestReply::DataflowReloaded { uuid } => {
info!("dataflow {uuid} reloaded")


+ 6
- 21
binaries/cli/src/check.rs View File

@@ -1,13 +1,12 @@
use crate::connect_to_coordinator;
use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::topics::{ControlRequest, ControlRequestReply};
use eyre::{bail, Context};
use eyre::bail;
use std::{
io::{IsTerminal, Write},
net::SocketAddr,
};
use termcolor::{Color, ColorChoice, ColorSpec, WriteColor};

use crate::DoraConnection;

pub fn check_environment(coordinator_addr: SocketAddr) -> eyre::Result<()> {
let mut error_occurred = false;

@@ -20,7 +19,7 @@ pub fn check_environment(coordinator_addr: SocketAddr) -> eyre::Result<()> {

// check whether coordinator is running
write!(stdout, "Dora Coordinator: ")?;
let mut session = match connect_to_coordinator(coordinator_addr) {
let mut session = match DoraConnection::connect(coordinator_addr) {
Ok(session) => {
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green)));
writeln!(stdout, "ok")?;
@@ -39,8 +38,8 @@ pub fn check_environment(coordinator_addr: SocketAddr) -> eyre::Result<()> {
// check whether daemon is running
write!(stdout, "Dora Daemon: ")?;
if session
.as_deref_mut()
.map(daemon_running)
.as_mut()
.map(|c| c.daemon_running())
.transpose()?
.unwrap_or(false)
{
@@ -61,17 +60,3 @@ pub fn check_environment(coordinator_addr: SocketAddr) -> eyre::Result<()> {

Ok(())
}

pub fn daemon_running(session: &mut TcpRequestReplyConnection) -> Result<bool, eyre::ErrReport> {
let reply_raw = session
.request(&serde_json::to_vec(&ControlRequest::DaemonConnected).unwrap())
.wrap_err("failed to send DaemonConnected message")?;

let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
let running = match reply {
ControlRequestReply::DaemonConnected(running) => running,
other => bail!("unexpected reply to daemon connection check: {other:?}"),
};

Ok(running)
}

+ 0
- 50
binaries/cli/src/formatting.rs View File

@@ -1,50 +0,0 @@
use dora_core::topics::{DataflowResult, NodeErrorCause};

pub struct FormatDataflowError<'a>(pub &'a DataflowResult);

impl std::fmt::Display for FormatDataflowError<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f)?;
let failed = self
.0
.node_results
.iter()
.filter_map(|(id, r)| r.as_ref().err().map(|e| (id, e)));
let total_failed = failed.clone().count();

let mut non_cascading: Vec<_> = failed
.clone()
.filter(|(_, e)| !matches!(e.cause, NodeErrorCause::Cascading { .. }))
.collect();
non_cascading.sort_by_key(|(_, e)| e.timestamp);
// try to print earliest non-cascading error
let hidden = if !non_cascading.is_empty() {
let printed = non_cascading.len();
for (id, err) in non_cascading {
writeln!(f, "Node `{id}` failed: {err}")?;
}
total_failed - printed
} else {
// no non-cascading errors -> print earliest cascading
let mut all: Vec<_> = failed.collect();
all.sort_by_key(|(_, e)| e.timestamp);
if let Some((id, err)) = all.first() {
write!(f, "Node `{id}` failed: {err}")?;
total_failed - 1
} else {
write!(f, "unknown error")?;
0
}
};

if hidden > 1 {
write!(
f,
"\n\nThere are {hidden} consequential errors. Check the `out/{}` folder for full details.",
self.0.uuid
)?;
}

Ok(())
}
}

+ 1
- 47
binaries/cli/src/graph/mod.rs View File

@@ -1,56 +1,10 @@
use std::{fs::File, io::Write, path::Path};
use std::path::Path;

use dora_core::descriptor::Descriptor;
use eyre::Context;

const MERMAID_TEMPLATE: &str = include_str!("mermaid-template.html");

pub(crate) fn create(dataflow: std::path::PathBuf, mermaid: bool, open: bool) -> eyre::Result<()> {
if mermaid {
let visualized = visualize_as_mermaid(&dataflow)?;
println!("{visualized}");
println!(
"Paste the above output on https://mermaid.live/ or in a \
```mermaid code block on GitHub to display it."
);
} else {
let html = visualize_as_html(&dataflow)?;

let working_dir = std::env::current_dir().wrap_err("failed to get current working dir")?;
let graph_filename = match dataflow.file_stem().and_then(|n| n.to_str()) {
Some(name) => format!("{name}-graph"),
None => "graph".into(),
};
let mut extra = 0;
let path = loop {
let adjusted_file_name = if extra == 0 {
format!("{graph_filename}.html")
} else {
format!("{graph_filename}.{extra}.html")
};
let path = working_dir.join(&adjusted_file_name);
if path.exists() {
extra += 1;
} else {
break path;
}
};

let mut file = File::create(&path).context("failed to create graph HTML file")?;
file.write_all(html.as_bytes())?;

println!(
"View graph by opening the following in your browser:\n file://{}",
path.display()
);

if open {
webbrowser::open(path.as_os_str().to_str().unwrap())?;
}
}
Ok(())
}

pub fn visualize_as_html(dataflow: &Path) -> eyre::Result<String> {
let mermaid = visualize_as_mermaid(dataflow)?;
Ok(MERMAID_TEMPLATE.replacen("____insert____", &mermaid, 1))


+ 188
- 0
binaries/cli/src/lib.rs View File

@@ -0,0 +1,188 @@
pub use crate::{
build::build as build_dataflow,
check::check_environment,
graph::{visualize_as_html, visualize_as_mermaid},
up::up,
};
use attach::attach_to_dataflow;
use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection};
use dora_core::{
descriptor::Descriptor,
topics::{ControlRequest, ControlRequestReply, DataflowList, DataflowResult},
};
use eyre::{bail, Context};
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
time::Duration,
};
use uuid::Uuid;

mod attach;
mod build;
mod check;
mod graph;
mod logs;
pub mod template;
mod up;

pub const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
pub const LISTEN_WILDCARD: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));

pub struct DoraConnection {
session: Box<TcpRequestReplyConnection>,
}

impl DoraConnection {
pub fn connect(coordinator_addr: SocketAddr) -> std::io::Result<Self> {
Ok(Self {
session: TcpLayer::new().connect(coordinator_addr)?,
})
}

pub fn daemon_running(&mut self) -> eyre::Result<bool> {
let reply_raw = self
.session
.request(&serde_json::to_vec(&ControlRequest::DaemonConnected).unwrap())
.wrap_err("failed to send DaemonConnected message")?;

let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
let running = match reply {
ControlRequestReply::DaemonConnected(running) => running,
other => bail!("unexpected reply to daemon connection check: {other:?}"),
};

Ok(running)
}

pub fn query_running_dataflows(&mut self) -> eyre::Result<DataflowList> {
let reply_raw = self
.session
.request(&serde_json::to_vec(&ControlRequest::List).unwrap())
.wrap_err("failed to send list message")?;
let reply: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
let ids = match reply {
ControlRequestReply::DataflowList(list) => list,
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected list dataflow reply: {other:?}"),
};

Ok(ids)
}

pub fn start_dataflow(
&mut self,
dataflow: Descriptor,
name: Option<String>,
local_working_dir: PathBuf,
) -> Result<Uuid, eyre::ErrReport> {
let reply_raw = self
.session
.request(
&serde_json::to_vec(&ControlRequest::Start {
dataflow,
name,
local_working_dir,
})
.unwrap(),
)
.wrap_err("failed to send start dataflow message")?;

let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowStarted { uuid } => Ok(uuid),
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
}
}

pub fn dataflow_logs(
&mut self,
uuid: Option<Uuid>,
name: Option<String>,
node: String,
) -> eyre::Result<()> {
logs::dataflow_logs(self, uuid, name, node)
}

#[allow(clippy::too_many_arguments)]
pub fn attach_to_dataflow(
&mut self,
dataflow: Descriptor,
dataflow_path: PathBuf,
dataflow_id: Uuid,
hot_reload: bool,
coordinator_socket: SocketAddr,
log_level: log::LevelFilter,
log_output: &mut impl std::io::Write,
) -> eyre::Result<DataflowResult> {
attach_to_dataflow(
self,
dataflow,
dataflow_path,
dataflow_id,
hot_reload,
coordinator_socket,
log_level,
log_output,
)
}

pub fn stop_dataflow(
&mut self,
uuid: Uuid,
grace_duration: Option<Duration>,
) -> eyre::Result<DataflowResult> {
let reply_raw = self
.session
.request(
&serde_json::to_vec(&ControlRequest::Stop {
dataflow_uuid: uuid,
grace_duration,
})
.unwrap(),
)
.wrap_err("failed to send dataflow stop message")?;
let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowStopped { result } => Ok(result),
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected stop dataflow reply: {other:?}"),
}
}

pub fn stop_dataflow_by_name(
&mut self,
name: String,
grace_duration: Option<Duration>,
) -> eyre::Result<DataflowResult> {
let reply_raw = self
.session
.request(
&serde_json::to_vec(&ControlRequest::StopByName {
name,
grace_duration,
})
.unwrap(),
)
.wrap_err("failed to send dataflow stop_by_name message")?;
let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowStopped { result } => Ok(result),
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected stop dataflow reply: {other:?}"),
}
}

pub fn destroy(mut self) -> eyre::Result<()> {
// send destroy command to dora-coordinator
self.session
.request(&serde_json::to_vec(&ControlRequest::Destroy).unwrap())
.wrap_err("failed to send destroy message")?;
Ok(())
}
}

+ 6
- 4
binaries/cli/src/logs.rs View File

@@ -1,18 +1,20 @@
use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::topics::{ControlRequest, ControlRequestReply};
use eyre::{bail, Context, Result};
use uuid::Uuid;

use bat::{Input, PrettyPrinter};

pub fn logs(
session: &mut TcpRequestReplyConnection,
use crate::DoraConnection;

pub fn dataflow_logs(
connection: &mut DoraConnection,
uuid: Option<Uuid>,
name: Option<String>,
node: String,
) -> Result<()> {
let logs = {
let reply_raw = session
let reply_raw = connection
.session
.request(
&serde_json::to_vec(&ControlRequest::Logs {
uuid,


+ 228
- 240
binaries/cli/src/main.rs View File

@@ -1,12 +1,18 @@
use attach::attach_dataflow;
use std::{
fs::File,
io::Write,
net::{IpAddr, SocketAddr},
path::PathBuf,
time::Duration,
};

use clap::Parser;
use colored::Colorize;
use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection};
use dora_coordinator::Event;
use dora_cli::{check_environment, template, DoraConnection, LISTEN_WILDCARD, LOCALHOST};
use dora_core::{
descriptor::Descriptor,
topics::{
ControlRequest, ControlRequestReply, DataflowList, DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
DataflowResult, NodeErrorCause, DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT,
},
};
@@ -14,41 +20,63 @@ use dora_daemon::Daemon;
#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
use dora_tracing::set_up_tracing_opts;
use duration_str::parse;
use eyre::{bail, Context};
use formatting::FormatDataflowError;
use std::{io::Write, net::SocketAddr};
use std::{
net::{IpAddr, Ipv4Addr},
path::PathBuf,
time::Duration,
};
use tabwriter::TabWriter;
use tokio::runtime::Builder;
use tracing::info;
use uuid::Uuid;

mod attach;
mod build;
mod check;
mod formatting;
mod graph;
mod logs;
mod template;
mod up;
fn main() {
if let Err(err) = main_inner() {
eprintln!("\n\n{}", "[ERROR]".bold().red());
eprintln!("{err:#}");
std::process::exit(1);
}
}

fn main_inner() -> eyre::Result<()> {
let args = Args::parse();

#[cfg(feature = "tracing")]
match &args.command {
Command::Daemon {
quiet, machine_id, ..
} => {
let name = "dora-daemon";
let filename = machine_id
.as_ref()
.map(|id| format!("{name}-{id}"))
.unwrap_or(name.to_string());
set_up_tracing_opts(name, !quiet, Some(&filename))
.context("failed to set up tracing subscriber")?;
}
Command::Runtime => {
// Do not set the runtime in the cli.
}
Command::Coordinator { quiet, .. } => {
let name = "dora-coordinator";
set_up_tracing_opts(name, !quiet, Some(name))
.context("failed to set up tracing subscriber")?;
}
_ => {
set_up_tracing("dora-cli").context("failed to set up tracing subscriber")?;
}
};

const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
const LISTEN_WILDCARD: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
let dora_cli_path =
std::env::current_exe().wrap_err("failed to get current executable path")?;
run(args.command, dora_cli_path)
}

#[derive(Debug, clap::Parser)]
#[clap(version)]
struct Args {
pub struct Args {
#[clap(subcommand)]
command: Command,
}

/// dora-rs cli client
#[derive(Debug, clap::Subcommand)]
enum Command {
pub enum Command {
/// Check if the coordinator and the daemon is running.
Check {
/// Path to the dataflow descriptor file (enables additional checks)
@@ -82,7 +110,7 @@ enum Command {
/// Generate a new project or node. Choose the language between Rust, Python, C or C++.
New {
#[clap(flatten)]
args: CommandNew,
args: template::CreateArgs,
#[clap(hide = true, long)]
internal_create_with_path_dependencies: bool,
},
@@ -137,7 +165,7 @@ enum Command {
name: Option<String>,
/// Kill the dataflow if it doesn't stop after the given duration
#[clap(long, value_name = "DURATION")]
#[arg(value_parser = parse)]
#[arg(value_parser = duration_str::parse)]
grace_duration: Option<Duration>,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
@@ -219,21 +247,6 @@ enum Command {
},
}

#[derive(Debug, clap::Args)]
pub struct CommandNew {
/// The entity that should be created
#[clap(long, value_enum, default_value_t = Kind::Dataflow)]
kind: Kind,
/// The programming language that should be used
#[clap(long, value_enum, default_value_t = Lang::Rust)]
lang: Lang,
/// Desired name of the entity
name: String,
/// Where to create the entity
#[clap(hide = true)]
path: Option<PathBuf>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
enum Kind {
Dataflow,
@@ -248,50 +261,14 @@ enum Lang {
Cxx,
}

fn main() {
if let Err(err) = run() {
eprintln!("\n\n{}", "[ERROR]".bold().red());
eprintln!("{err:#}");
std::process::exit(1);
}
}

fn run() -> eyre::Result<()> {
let args = Args::parse();

#[cfg(feature = "tracing")]
match &args.command {
Command::Daemon {
quiet, machine_id, ..
} => {
let name = "dora-daemon";
let filename = machine_id
.as_ref()
.map(|id| format!("{name}-{id}"))
.unwrap_or(name.to_string());
set_up_tracing_opts(name, !quiet, Some(&filename))
.context("failed to set up tracing subscriber")?;
}
Command::Runtime => {
// Do not set the runtime in the cli.
}
Command::Coordinator { quiet, .. } => {
let name = "dora-coordinator";
set_up_tracing_opts(name, !quiet, Some(name))
.context("failed to set up tracing subscriber")?;
}
_ => {
set_up_tracing("dora-cli").context("failed to set up tracing subscriber")?;
}
};

pub fn run(command: Command, dora_cli_path: PathBuf) -> eyre::Result<()> {
let log_level = env_logger::Builder::new()
.filter_level(log::LevelFilter::Info)
.parse_default_env()
.build()
.filter();

match args.command {
match command {
Command::Check {
dataflow,
coordinator_addr,
@@ -305,26 +282,26 @@ fn run() -> eyre::Result<()> {
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
.to_owned();
Descriptor::blocking_read(&dataflow)?.check(&working_dir)?;
check::check_environment((coordinator_addr, coordinator_port).into())?
check_environment((coordinator_addr, coordinator_port).into())?
}
None => check::check_environment((coordinator_addr, coordinator_port).into())?,
None => check_environment((coordinator_addr, coordinator_port).into())?,
},
Command::Graph {
dataflow,
mermaid,
open,
} => {
graph::create(dataflow, mermaid, open)?;
create_dataflow_graph(dataflow, mermaid, open)?;
}
Command::Build { dataflow } => {
build::build(&dataflow)?;
dora_cli::build_dataflow(&dataflow)?;
}
Command::New {
args,
internal_create_with_path_dependencies,
} => template::create(args, internal_create_with_path_dependencies)?,
Command::Up { config } => {
up::up(config.as_deref())?;
dora_cli::up(config.as_deref(), &dora_cli_path)?;
}
Command::Logs {
dataflow,
@@ -332,14 +309,15 @@ fn run() -> eyre::Result<()> {
coordinator_addr,
coordinator_port,
} => {
let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
let mut session = DoraConnection::connect((coordinator_addr, coordinator_port).into())
.wrap_err("failed to connect to dora coordinator")?;
let list = query_running_dataflows(&mut *session)
let list = session
.query_running_dataflows()
.wrap_err("failed to query running dataflows")?;
if let Some(dataflow) = dataflow {
let uuid = Uuid::parse_str(&dataflow).ok();
let name = if uuid.is_some() { None } else { Some(dataflow) };
logs::logs(&mut *session, uuid, name, node)?
session.dataflow_logs(uuid, name, node)?
} else {
let active = list.get_active();
let uuid = match &active[..] {
@@ -347,7 +325,7 @@ fn run() -> eyre::Result<()> {
[uuid] => uuid.clone(),
_ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?,
};
logs::logs(&mut *session, Some(uuid.uuid), None, node)?
session.dataflow_logs(Some(uuid.uuid), None, node)?
}
}
Command::Start {
@@ -376,14 +354,11 @@ fn run() -> eyre::Result<()> {
}

let coordinator_socket = (coordinator_addr, coordinator_port).into();
let mut session = connect_to_coordinator(coordinator_socket)
let mut session = DoraConnection::connect(coordinator_socket)
.wrap_err("failed to connect to dora coordinator")?;
let dataflow_id = start_dataflow(
dataflow_descriptor.clone(),
name,
working_dir,
&mut *session,
)?;
let dataflow_id =
session.start_dataflow(dataflow_descriptor.clone(), name, working_dir)?;
eprintln!("{dataflow_id}");

let attach = match (attach, detach) {
(true, true) => eyre::bail!("both `--attach` and `--detach` are given"),
@@ -396,22 +371,24 @@ fn run() -> eyre::Result<()> {
};

if attach {
attach_dataflow(
let result = session.attach_to_dataflow(
dataflow_descriptor,
dataflow,
dataflow_id,
&mut *session,
hot_reload,
coordinator_socket,
log_level,
)?
&mut std::io::stdout(),
)?;
info!("dataflow {} stopped", result.uuid);
handle_dataflow_result(result)?;
}
}
Command::List {
coordinator_addr,
coordinator_port,
} => match connect_to_coordinator((coordinator_addr, coordinator_port).into()) {
Ok(mut session) => list(&mut *session)?,
} => match DoraConnection::connect((coordinator_addr, coordinator_port).into()) {
Ok(mut session) => list_dataflows(&mut session)?,
Err(_) => {
bail!("No dora coordinator seems to be running.");
}
@@ -423,22 +400,22 @@ fn run() -> eyre::Result<()> {
coordinator_addr,
coordinator_port,
} => {
let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
let mut session = DoraConnection::connect((coordinator_addr, coordinator_port).into())
.wrap_err("could not connect to dora coordinator")?;
match (uuid, name) {
(Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?,
(None, Some(name)) => stop_dataflow_by_name(name, grace_duration, &mut *session)?,
(None, None) => stop_dataflow_interactive(grace_duration, &mut *session)?,
let result = match (uuid, name) {
(Some(uuid), _) => Some(session.stop_dataflow(uuid, grace_duration)?),
(None, Some(name)) => Some(session.stop_dataflow_by_name(name, grace_duration)?),
(None, None) => stop_dataflow_interactive(grace_duration, &mut session)?,
};
if let Some(result) = result {
handle_dataflow_result(result)?;
}
}
Command::Destroy {
config,
config: _,
coordinator_addr,
coordinator_port,
} => up::destroy(
config.as_deref(),
(coordinator_addr, coordinator_port).into(),
)?,
} => destroy((coordinator_addr, coordinator_port).into())?,
Command::Coordinator {
interface,
port,
@@ -446,16 +423,19 @@ fn run() -> eyre::Result<()> {
control_port,
quiet,
} => {
let rt = Builder::new_multi_thread()
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
rt.block_on(async {
let bind = SocketAddr::new(interface, port);
let bind_control = SocketAddr::new(control_interface, control_port);
let (port, task) =
dora_coordinator::start(bind, bind_control, futures::stream::empty::<Event>())
.await?;
let (port, task) = dora_coordinator::start(
bind,
bind_control,
futures::stream::empty::<dora_coordinator::Event>(),
)
.await?;
if !quiet {
println!("Listening for incoming daemon connection on {port}");
}
@@ -471,7 +451,7 @@ fn run() -> eyre::Result<()> {
run_dataflow,
quiet: _,
} => {
let rt = Builder::new_multi_thread()
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
@@ -486,14 +466,14 @@ fn run() -> eyre::Result<()> {
);
}

let result = Daemon::run_dataflow(&dataflow_path).await?;
handle_dataflow_result(result, None)
let result = Daemon::run_dataflow(&dataflow_path, dora_cli_path.to_owned()).await?;
handle_dataflow_result(result)
}
None => {
if coordinator_addr.ip() == LOCALHOST {
tracing::info!("Starting in local mode");
}
Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port).await
Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port, dora_cli_path.to_owned()).await
}
}
})
@@ -505,121 +485,58 @@ fn run() -> eyre::Result<()> {
Ok(())
}

fn start_dataflow(
dataflow: Descriptor,
name: Option<String>,
local_working_dir: PathBuf,
session: &mut TcpRequestReplyConnection,
) -> Result<Uuid, eyre::ErrReport> {
let reply_raw = session
.request(
&serde_json::to_vec(&ControlRequest::Start {
dataflow,
name,
local_working_dir,
})
.unwrap(),
)
.wrap_err("failed to send start dataflow message")?;

let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowStarted { uuid } => {
eprintln!("{uuid}");
Ok(uuid)
}
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
}
}

fn stop_dataflow_interactive(
grace_duration: Option<Duration>,
session: &mut TcpRequestReplyConnection,
fn create_dataflow_graph(
dataflow: std::path::PathBuf,
mermaid: bool,
open: bool,
) -> eyre::Result<()> {
let list = query_running_dataflows(session).wrap_err("failed to query running dataflows")?;
let active = list.get_active();
if active.is_empty() {
eprintln!("No dataflows are running");
if mermaid {
let visualized = dora_cli::visualize_as_mermaid(&dataflow)?;
println!("{visualized}");
println!(
"Paste the above output on https://mermaid.live/ or in a \
```mermaid code block on GitHub to display it."
);
} else {
let selection = inquire::Select::new("Choose dataflow to stop:", active).prompt()?;
stop_dataflow(selection.uuid, grace_duration, session)?;
}
let html = dora_cli::visualize_as_html(&dataflow)?;

Ok(())
}
let working_dir = std::env::current_dir().wrap_err("failed to get current working dir")?;
let graph_filename = match dataflow.file_stem().and_then(|n| n.to_str()) {
Some(name) => format!("{name}-graph"),
None => "graph".into(),
};
let mut extra = 0;
let path = loop {
let adjusted_file_name = if extra == 0 {
format!("{graph_filename}.html")
} else {
format!("{graph_filename}.{extra}.html")
};
let path = working_dir.join(&adjusted_file_name);
if path.exists() {
extra += 1;
} else {
break path;
}
};

fn stop_dataflow(
uuid: Uuid,
grace_duration: Option<Duration>,
session: &mut TcpRequestReplyConnection,
) -> Result<(), eyre::ErrReport> {
let reply_raw = session
.request(
&serde_json::to_vec(&ControlRequest::Stop {
dataflow_uuid: uuid,
grace_duration,
})
.unwrap(),
)
.wrap_err("failed to send dataflow stop message")?;
let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowStopped { uuid, result } => {
handle_dataflow_result(result, Some(uuid))
}
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected stop dataflow reply: {other:?}"),
}
}
let mut file = File::create(&path).context("failed to create graph HTML file")?;
file.write_all(html.as_bytes())?;

fn handle_dataflow_result(
result: dora_core::topics::DataflowResult,
uuid: Option<Uuid>,
) -> Result<(), eyre::Error> {
if result.is_ok() {
Ok(())
} else {
Err(match uuid {
Some(uuid) => {
eyre::eyre!("Dataflow {uuid} failed:\n{}", FormatDataflowError(&result))
}
None => {
eyre::eyre!("Dataflow failed:\n{}", FormatDataflowError(&result))
}
})
}
}
println!(
"View graph by opening the following in your browser:\n file://{}",
path.display()
);

fn stop_dataflow_by_name(
name: String,
grace_duration: Option<Duration>,
session: &mut TcpRequestReplyConnection,
) -> Result<(), eyre::ErrReport> {
let reply_raw = session
.request(
&serde_json::to_vec(&ControlRequest::StopByName {
name,
grace_duration,
})
.unwrap(),
)
.wrap_err("failed to send dataflow stop_by_name message")?;
let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowStopped { uuid, result } => {
handle_dataflow_result(result, Some(uuid))
if open {
webbrowser::open(path.as_os_str().to_str().unwrap())?;
}
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected stop dataflow reply: {other:?}"),
}
Ok(())
}

fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> {
let list = query_running_dataflows(session)?;
fn list_dataflows(session: &mut DoraConnection) -> Result<(), eyre::ErrReport> {
let list = session.query_running_dataflows()?;

let mut tw = TabWriter::new(vec![]);
tw.write_all(b"UUID\tName\tStatus\n")?;
@@ -641,23 +558,94 @@ fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport>
Ok(())
}

fn query_running_dataflows(session: &mut TcpRequestReplyConnection) -> eyre::Result<DataflowList> {
let reply_raw = session
.request(&serde_json::to_vec(&ControlRequest::List).unwrap())
.wrap_err("failed to send list message")?;
let reply: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
let ids = match reply {
ControlRequestReply::DataflowList(list) => list,
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected list dataflow reply: {other:?}"),
};
fn stop_dataflow_interactive(
grace_duration: Option<Duration>,
session: &mut DoraConnection,
) -> eyre::Result<Option<DataflowResult>> {
let list = session
.query_running_dataflows()
.wrap_err("failed to query running dataflows")?;
let active = list.get_active();
if active.is_empty() {
eprintln!("No dataflows are running");
Ok(None)
} else {
let selection = inquire::Select::new("Choose dataflow to stop:", active).prompt()?;
Ok(Some(session.stop_dataflow(selection.uuid, grace_duration)?))
}
}

Ok(ids)
pub fn destroy(coordinator_addr: SocketAddr) -> Result<(), eyre::ErrReport> {
match DoraConnection::connect(coordinator_addr) {
Ok(session) => {
session.destroy()?;
println!("Send destroy command to dora-coordinator");
}
Err(_) => {
eprintln!("Could not connect to dora-coordinator");
}
}

Ok(())
}

fn connect_to_coordinator(
coordinator_addr: SocketAddr,
) -> std::io::Result<Box<TcpRequestReplyConnection>> {
TcpLayer::new().connect(coordinator_addr)
fn handle_dataflow_result(result: dora_core::topics::DataflowResult) -> Result<(), eyre::Error> {
if result.is_ok() {
Ok(())
} else {
Err(eyre::eyre!(
"Dataflow {} failed:\n{}",
result.uuid,
FormatDataflowError(&result)
))
}
}

struct FormatDataflowError<'a>(pub &'a DataflowResult);

impl std::fmt::Display for FormatDataflowError<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f)?;
let failed = self
.0
.node_results
.iter()
.filter_map(|(id, r)| r.as_ref().err().map(|e| (id, e)));
let total_failed = failed.clone().count();

let mut non_cascading: Vec<_> = failed
.clone()
.filter(|(_, e)| !matches!(e.cause, NodeErrorCause::Cascading { .. }))
.collect();
non_cascading.sort_by_key(|(_, e)| e.timestamp);
// try to print earliest non-cascading error
let hidden = if !non_cascading.is_empty() {
let printed = non_cascading.len();
for (id, err) in non_cascading {
writeln!(f, "Node `{id}` failed: {err}")?;
}
total_failed - printed
} else {
// no non-cascading errors -> print earliest cascading
let mut all: Vec<_> = failed.collect();
all.sort_by_key(|(_, e)| e.timestamp);
if let Some((id, err)) = all.first() {
write!(f, "Node `{id}` failed: {err}")?;
total_failed - 1
} else {
write!(f, "unknown error")?;
0
}
};

if hidden > 1 {
write!(
f,
"\n\nThere are {hidden} consequential errors. Check the `out/{}` folder for full details.",
self.0.uuid
)?;
}

Ok(())
}
}

+ 6
- 4
binaries/cli/src/template/c/mod.rs View File

@@ -5,12 +5,14 @@ use std::{
path::{Path, PathBuf},
};

use super::Kind;

const NODE: &str = include_str!("node/node-template.c");
const TALKER: &str = include_str!("talker/talker-template.c");
const LISTENER: &str = include_str!("listener/listener-template.c");

pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> {
let crate::CommandNew {
pub fn create(args: super::CreateArgs, use_path_deps: bool) -> eyre::Result<()> {
let super::CreateArgs {
kind,
lang: _,
name,
@@ -18,8 +20,8 @@ pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()>
} = args;

match kind {
crate::Kind::CustomNode => create_custom_node(name, path, NODE),
crate::Kind::Dataflow => create_dataflow(name, path, use_path_deps),
Kind::CustomNode => create_custom_node(name, path, NODE),
Kind::Dataflow => create_dataflow(name, path, use_path_deps),
}
}



+ 6
- 4
binaries/cli/src/template/cxx/mod.rs View File

@@ -4,12 +4,14 @@ use std::{
path::{Path, PathBuf},
};

use super::Kind;

const NODE: &str = include_str!("node-template.cc");
const TALKER: &str = include_str!("talker-template.cc");
const LISTENER: &str = include_str!("listener-template.cc");

pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> {
let crate::CommandNew {
pub fn create(args: super::CreateArgs, use_path_deps: bool) -> eyre::Result<()> {
let super::CreateArgs {
kind,
lang: _,
name,
@@ -17,8 +19,8 @@ pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()>
} = args;

match kind {
crate::Kind::CustomNode => create_custom_node(name, path, NODE),
crate::Kind::Dataflow => create_dataflow(name, path, use_path_deps),
Kind::CustomNode => create_custom_node(name, path, NODE),
Kind::Dataflow => create_dataflow(name, path, use_path_deps),
}
}



+ 36
- 5
binaries/cli/src/template/mod.rs View File

@@ -1,13 +1,44 @@
use std::path::PathBuf;

mod c;
mod cxx;
mod python;
mod rust;

pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> {
// Arguments for the template-creation command. The `///` comments on the
// fields below double as the CLI help text generated by clap, so they are
// left untouched; review notes use plain `//` comments instead.
#[derive(Debug, clap::Args)]
pub struct CreateArgs {
    /// The entity that should be created
    #[clap(long, value_enum, default_value_t = Kind::Dataflow)]
    pub kind: Kind,
    /// The programming language that should be used
    #[clap(long, value_enum, default_value_t = Lang::Rust)]
    pub lang: Lang,
    /// Desired name of the entity
    pub name: String,
    /// Where to create the entity
    // `hide = true` keeps this positional out of the generated `--help`
    // output; NOTE(review): the reason for hiding it is not evident here.
    #[clap(hide = true)]
    pub path: Option<PathBuf>,
}

// Kind of entity the template generator should scaffold.
#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
pub enum Kind {
    Dataflow,
    CustomNode,
}

// Programming language used for the generated template files.
#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
pub enum Lang {
    Rust,
    Python,
    C,
    Cxx,
}

pub fn create(args: CreateArgs, use_path_deps: bool) -> eyre::Result<()> {
match args.lang {
crate::Lang::Rust => rust::create(args, use_path_deps),
crate::Lang::Python => python::create(args),
crate::Lang::C => c::create(args, use_path_deps),
crate::Lang::Cxx => cxx::create(args, use_path_deps),
Lang::Rust => rust::create(args, use_path_deps),
Lang::Python => python::create(args),
Lang::C => c::create(args, use_path_deps),
Lang::Cxx => cxx::create(args, use_path_deps),
}
}

+ 6
- 4
binaries/cli/src/template/python/mod.rs View File

@@ -4,12 +4,14 @@ use std::{
path::{Path, PathBuf},
};

use super::Kind;

const NODE_PY: &str = include_str!("node/node-template.py");
const TALKER_PY: &str = include_str!("talker/talker-template.py");
const LISTENER_PY: &str = include_str!("listener/listener-template.py");

pub fn create(args: crate::CommandNew) -> eyre::Result<()> {
let crate::CommandNew {
pub fn create(args: super::CreateArgs) -> eyre::Result<()> {
let super::CreateArgs {
kind,
lang: _,
name,
@@ -17,8 +19,8 @@ pub fn create(args: crate::CommandNew) -> eyre::Result<()> {
} = args;

match kind {
crate::Kind::CustomNode => create_custom_node(name, path, NODE_PY),
crate::Kind::Dataflow => create_dataflow(name, path),
Kind::CustomNode => create_custom_node(name, path, NODE_PY),
Kind::Dataflow => create_dataflow(name, path),
}
}



+ 6
- 4
binaries/cli/src/template/rust/mod.rs View File

@@ -4,13 +4,15 @@ use std::{
path::{Path, PathBuf},
};

use super::Kind;

const MAIN_RS: &str = include_str!("node/main-template.rs");
const TALKER_RS: &str = include_str!("talker/main-template.rs");
const LISTENER_RS: &str = include_str!("listener/main-template.rs");

const VERSION: &str = env!("CARGO_PKG_VERSION");
pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> {
let crate::CommandNew {
pub fn create(args: super::CreateArgs, use_path_deps: bool) -> eyre::Result<()> {
let super::CreateArgs {
kind,
lang: _,
name,
@@ -18,8 +20,8 @@ pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()>
} = args;

match kind {
crate::Kind::CustomNode => create_custom_node(name, path, use_path_deps, MAIN_RS),
crate::Kind::Dataflow => create_dataflow(name, path, use_path_deps),
Kind::CustomNode => create_custom_node(name, path, use_path_deps, MAIN_RS),
Kind::Dataflow => create_dataflow(name, path, use_path_deps),
}
}



+ 15
- 37
binaries/cli/src/up.rs View File

@@ -1,20 +1,21 @@
use crate::{check::daemon_running, connect_to_coordinator, LOCALHOST};
use dora_core::topics::{ControlRequest, DORA_COORDINATOR_PORT_CONTROL_DEFAULT};
use crate::{DoraConnection, LOCALHOST};
use dora_core::topics::DORA_COORDINATOR_PORT_CONTROL_DEFAULT;
use eyre::Context;
use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration};
use std::{fs, path::Path, process::Command, time::Duration};

#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
struct UpConfig {}

pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> {
pub fn up(config_path: Option<&Path>, dora_cli_path: &Path) -> eyre::Result<()> {
let UpConfig {} = parse_dora_config(config_path)?;
let coordinator_addr = (LOCALHOST, DORA_COORDINATOR_PORT_CONTROL_DEFAULT).into();
let mut session = match connect_to_coordinator(coordinator_addr) {
let mut session = match DoraConnection::connect(coordinator_addr) {
Ok(session) => session,
Err(_) => {
start_coordinator().wrap_err("failed to start dora-coordinator")?;
start_coordinator(dora_cli_path).wrap_err("failed to start dora-coordinator")?;

loop {
match connect_to_coordinator(coordinator_addr) {
match DoraConnection::connect(coordinator_addr) {
Ok(session) => break session,
Err(_) => {
// sleep a bit until the coordinator accepts connections
@@ -25,14 +26,14 @@ pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> {
}
};

if !daemon_running(&mut *session)? {
start_daemon().wrap_err("failed to start dora-daemon")?;
if !session.daemon_running()? {
start_daemon(dora_cli_path).wrap_err("failed to start dora-daemon")?;

// wait a bit until daemon is connected
let mut i = 0;
const WAIT_S: f32 = 0.1;
loop {
if daemon_running(&mut *session)? {
if session.daemon_running()? {
break;
}
i += 1;
@@ -46,27 +47,6 @@ pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> {
Ok(())
}

pub(crate) fn destroy(
config_path: Option<&Path>,
coordinator_addr: SocketAddr,
) -> Result<(), eyre::ErrReport> {
let UpConfig {} = parse_dora_config(config_path)?;
match connect_to_coordinator(coordinator_addr) {
Ok(mut session) => {
// send destroy command to dora-coordinator
session
.request(&serde_json::to_vec(&ControlRequest::Destroy).unwrap())
.wrap_err("failed to send destroy message")?;
println!("Send destroy command to dora-coordinator");
}
Err(_) => {
eprintln!("Could not connect to dora-coordinator");
}
}

Ok(())
}

fn parse_dora_config(config_path: Option<&Path>) -> Result<UpConfig, eyre::ErrReport> {
let path = config_path.or_else(|| Some(Path::new("dora-config.yml")).filter(|p| p.exists()));
let config = match path {
@@ -81,9 +61,8 @@ fn parse_dora_config(config_path: Option<&Path>) -> Result<UpConfig, eyre::ErrRe
Ok(config)
}

fn start_coordinator() -> eyre::Result<()> {
let mut cmd =
Command::new(std::env::current_exe().wrap_err("failed to get current executable path")?);
fn start_coordinator(dora_cli_path: &Path) -> eyre::Result<()> {
let mut cmd = Command::new(dora_cli_path);
cmd.arg("coordinator");
cmd.arg("--quiet");
cmd.spawn().wrap_err("failed to run `dora coordinator`")?;
@@ -93,9 +72,8 @@ fn start_coordinator() -> eyre::Result<()> {
Ok(())
}

fn start_daemon() -> eyre::Result<()> {
let mut cmd =
Command::new(std::env::current_exe().wrap_err("failed to get current executable path")?);
fn start_daemon(dora_cli_path: &Path) -> eyre::Result<()> {
let mut cmd = Command::new(dora_cli_path);
cmd.arg("daemon");
cmd.arg("--quiet");
cmd.spawn().wrap_err("failed to run `dora daemon`")?;


+ 0
- 3
binaries/coordinator/src/lib.rs View File

@@ -288,7 +288,6 @@ async fn start_inner(
if entry.get_mut().machines.is_empty() {
let finished_dataflow = entry.remove();
let reply = ControlRequestReply::DataflowStopped {
uuid,
result: dataflow_results
.get(&uuid)
.map(|r| dataflow_result(r, uuid, &clock))
@@ -354,7 +353,6 @@ async fn start_inner(
uuid: dataflow_uuid,
},
None => ControlRequestReply::DataflowStopped {
uuid: dataflow_uuid,
result: dataflow_results
.get(&dataflow_uuid)
.map(|r| dataflow_result(r, dataflow_uuid, &clock))
@@ -615,7 +613,6 @@ async fn stop_dataflow_by_uuid(
let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else {
if let Some(result) = dataflow_results.get(&dataflow_uuid) {
let reply = ControlRequestReply::DataflowStopped {
uuid: dataflow_uuid,
result: dataflow_result(result, dataflow_uuid, clock),
};
let _ = reply_sender.send(Ok(reply));


+ 11
- 1
binaries/daemon/src/lib.rs View File

@@ -86,6 +86,7 @@ pub struct Daemon {
dataflow_node_results: BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>,

clock: Arc<uhlc::HLC>,
dora_cli_path: PathBuf,
}

type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;
@@ -96,6 +97,7 @@ impl Daemon {
machine_id: String,
inter_daemon_addr: SocketAddr,
local_listen_port: u16,
dora_cli_path: PathBuf,
) -> eyre::Result<()> {
let clock = Arc::new(HLC::default());

@@ -150,12 +152,16 @@ impl Daemon {
machine_id,
None,
clock,
dora_cli_path,
)
.await
.map(|_| ())
}

pub async fn run_dataflow(dataflow_path: &Path) -> eyre::Result<DataflowResult> {
pub async fn run_dataflow(
dataflow_path: &Path,
dora_cli_path: PathBuf,
) -> eyre::Result<DataflowResult> {
let working_dir = dataflow_path
.canonicalize()
.context("failed to canoncialize dataflow path")?
@@ -200,6 +206,7 @@ impl Daemon {
"".to_string(),
Some(exit_when_done),
clock.clone(),
dora_cli_path,
);

let spawn_result = reply_rx
@@ -230,6 +237,7 @@ impl Daemon {
machine_id: String,
exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
clock: Arc<HLC>,
dora_cli_path: PathBuf,
) -> eyre::Result<DaemonRunResult> {
let coordinator_connection = match coordinator_addr {
Some(addr) => {
@@ -256,6 +264,7 @@ impl Daemon {
exit_when_done,
dataflow_node_results: BTreeMap::new(),
clock,
dora_cli_path,
};

let dora_events = ReceiverStream::new(dora_events_rx);
@@ -667,6 +676,7 @@ impl Daemon {
dataflow_descriptor.clone(),
self.clock.clone(),
node_stderr_most_recent,
&self.dora_cli_path,
)
.await
.wrap_err_with(|| format!("failed to spawn node `{node_id}`"))


+ 3
- 3
binaries/daemon/src/spawn.rs View File

@@ -36,6 +36,7 @@ use tokio::{
use tracing::error;

/// clock is required for generating timestamps when dropping messages early because queue is full
#[allow(clippy::too_many_arguments)]
pub async fn spawn_node(
dataflow_id: DataflowId,
working_dir: &Path,
@@ -44,6 +45,7 @@ pub async fn spawn_node(
dataflow_descriptor: Descriptor,
clock: Arc<HLC>,
node_stderr_most_recent: Arc<ArrayQueue<String>>,
dora_cli_path: &Path,
) -> eyre::Result<RunningNode> {
let node_id = node.id.clone();
tracing::debug!("Spawning node `{dataflow_id}/{node_id}`");
@@ -223,9 +225,7 @@ pub async fn spawn_node(
command
}
} else if python_operators.is_empty() && other_operators {
let mut cmd = tokio::process::Command::new(
std::env::current_exe().wrap_err("failed to get current executable path")?,
);
let mut cmd = tokio::process::Command::new(dora_cli_path);
cmd.arg("runtime");
cmd
} else {


+ 2
- 1
libraries/core/src/topics.rs View File

@@ -93,7 +93,7 @@ pub enum ControlRequestReply {
CoordinatorStopped,
DataflowStarted { uuid: Uuid },
DataflowReloaded { uuid: Uuid },
DataflowStopped { uuid: Uuid, result: DataflowResult },
DataflowStopped { result: DataflowResult },
DataflowList(DataflowList),
DestroyOk,
DaemonConnected(bool),
@@ -118,6 +118,7 @@ impl Display for DataflowId {
}

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[must_use]
pub struct DataflowResult {
pub uuid: Uuid,
pub timestamp: uhlc::Timestamp,


Loading…
Cancel
Save