Browse Source

Implement `dora run` command (#703)

Runs a dataflow locally, without requiring any daemon or coordinator
processes. Multi-machine dataflows are not supported. The default log
level is set to `INFO`; it can be overridden by setting the
`RUST_LOG` environment variable.

This exposes the internal `dora daemon --run-dataflow` command that we
use for testing.

This addition was proposed in
https://github.com/orgs/dora-rs/discussions/698#discussioncomment-11125465.

The second commit adds a ctrl-c handler. On first ctrl-c, we send a stop
command to all nodes. On second ctrl-c, we exit immediately and kill all
spawned nodes. On third ctrl-c, we abort the process directly without
waiting (child processes keep running).
tags/v0.3.7rc2
Philipp Oppermann GitHub 1 year ago
parent
commit
70ca30dd2e
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
4 changed files with 146 additions and 40 deletions
  1. +36
    -3
      binaries/cli/src/lib.rs
  2. +80
    -27
      binaries/daemon/src/lib.rs
  3. +13
    -3
      binaries/daemon/src/spawn.rs
  4. +17
    -7
      libraries/extensions/telemetry/tracing/src/lib.rs

+ 36
- 3
binaries/cli/src/lib.rs View File

@@ -17,7 +17,7 @@ use dora_message::{
};
#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
use dora_tracing::set_up_tracing_opts;
use dora_tracing::{set_up_tracing_opts, FileLogging};
use duration_str::parse;
use eyre::{bail, Context};
use formatting::FormatDataflowError;
@@ -29,6 +29,7 @@ use std::{
};
use tabwriter::TabWriter;
use tokio::runtime::Builder;
use tracing::level_filters::LevelFilter;
use uuid::Uuid;

mod attach;
@@ -90,6 +91,15 @@ enum Command {
#[clap(hide = true, long)]
internal_create_with_path_dependencies: bool,
},
/// Run a dataflow locally.
///
/// Directly runs the given dataflow without connecting to a dora
/// coordinator or daemon. The dataflow is executed on the local machine.
Run {
/// Path to the dataflow descriptor file
#[clap(value_name = "PATH")]
dataflow: String,
},
/// Spawn coordinator and daemon in local mode (with default config)
Up {
/// Use a custom configuration
@@ -274,7 +284,12 @@ fn run(args: Args) -> eyre::Result<()> {
.as_ref()
.map(|id| format!("{name}-{id}"))
.unwrap_or(name.to_string());
set_up_tracing_opts(name, !quiet, Some(&filename))
let stdout = (!quiet).then_some(LevelFilter::WARN);
let file = Some(FileLogging {
file_name: filename,
filter: LevelFilter::INFO,
});
set_up_tracing_opts(name, stdout, file)
.context("failed to set up tracing subscriber")?;
}
Command::Runtime => {
@@ -282,7 +297,16 @@ fn run(args: Args) -> eyre::Result<()> {
}
Command::Coordinator { quiet, .. } => {
let name = "dora-coordinator";
set_up_tracing_opts(name, !quiet, Some(name))
let stdout = (!quiet).then_some(LevelFilter::WARN);
let file = Some(FileLogging {
file_name: name.to_owned(),
filter: LevelFilter::INFO,
});
set_up_tracing_opts(name, stdout, file)
.context("failed to set up tracing subscriber")?;
}
Command::Run { .. } => {
set_up_tracing_opts("run", Some(LevelFilter::INFO), None)
.context("failed to set up tracing subscriber")?;
}
_ => {
@@ -328,6 +352,15 @@ fn run(args: Args) -> eyre::Result<()> {
args,
internal_create_with_path_dependencies,
} => template::create(args, internal_create_with_path_dependencies)?,
Command::Run { dataflow } => {
let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
let rt = Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
let result = rt.block_on(Daemon::run_dataflow(&dataflow_path))?;
handle_dataflow_result(result, None)?
}
Command::Up { config } => {
up::up(config.as_deref())?;
}


+ 80
- 27
binaries/daemon/src/lib.rs View File

@@ -180,6 +180,8 @@ impl Daemon {

let clock = Arc::new(HLC::default());

let ctrlc_events = set_up_ctrlc_handler(clock.clone())?;

let exit_when_done = spawn_command
.nodes
.iter()
@@ -196,8 +198,9 @@ impl Daemon {
timestamp,
}
});
let events = (coordinator_events, ctrlc_events).merge();
let run_result = Self::run_general(
Box::pin(coordinator_events),
Box::pin(events),
None,
"".to_string(),
Some(exit_when_done),
@@ -327,12 +330,17 @@ impl Daemon {
}
}
Event::CtrlC => {
tracing::info!("received ctrlc signal -> stopping all dataflows");
for dataflow in self.running.values_mut() {
dataflow
.stop_all(&mut self.coordinator_connection, &self.clock, None)
.await?;
}
}
Event::SecondCtrlC => {
tracing::warn!("received second ctrlc signal -> exit immediately");
bail!("received second ctrl-c signal");
}
}
}

@@ -1088,7 +1096,9 @@ impl Daemon {
)
.await?;

dataflow.running_nodes.remove(node_id);
if let Some(mut pid) = dataflow.running_nodes.remove(node_id).and_then(|n| n.pid) {
pid.mark_as_stopped()
}
if dataflow
.running_nodes
.iter()
@@ -1472,12 +1482,51 @@ fn close_input(
}
}

#[derive(Debug, Clone)]
#[derive(Debug)]
struct RunningNode {
pid: Option<u32>,
pid: Option<ProcessId>,
node_config: NodeConfig,
}

/// Tracks the OS process id of a spawned node so the daemon can kill it.
///
/// The inner `Option` distinguishes a still-running process (`Some(pid)`)
/// from one that already exited or was killed (`None`). The `Drop` impl is a
/// safety net: any process still marked as running when the handle is dropped
/// gets killed, so spawned nodes cannot outlive the daemon's bookkeeping.
///
/// NOTE(review): killing by raw pid is subject to PID reuse — if the node
/// exited and the OS recycled the pid, `kill` could target an unrelated
/// process. Presumably acceptable for short-lived local runs; worth confirming.
#[derive(Debug)]
struct ProcessId(Option<u32>);

impl ProcessId {
/// Wraps a live OS process id.
pub fn new(process_id: u32) -> Self {
Self(Some(process_id))
}

/// Records that the process exited on its own; prevents `kill`/`Drop`
/// from signalling a (possibly reused) pid later.
pub fn mark_as_stopped(&mut self) {
self.0 = None;
}

/// Kills the process if it is still registered as running.
///
/// Returns `true` only when the process was found alive and a kill signal
/// was sent; returns `false` when already marked stopped or no longer in
/// the process table.
///
/// NOTE(review): this builds a fresh `sysinfo::System` and refreshes the
/// entire process table on every call — fine for shutdown paths, but not
/// something to call in a loop.
pub fn kill(&mut self) -> bool {
if let Some(pid) = self.0 {
let mut system = sysinfo::System::new();
system.refresh_processes();

if let Some(process) = system.process(Pid::from(pid as usize)) {
process.kill();
// Clear the pid so Drop does not try to kill a second time.
self.mark_as_stopped();
return true;
}
}

false
}
}

impl Drop for ProcessId {
fn drop(&mut self) {
// kill the process if it's still running
// (last-resort cleanup: normal shutdown paths call `mark_as_stopped`
// or `kill` first, so reaching the warn below indicates a leak)
if let Some(pid) = self.0 {
if self.kill() {
warn!("process {pid} was killed on drop because it was still running")
}
}
}
}

pub struct RunningDataflow {
id: Uuid,
/// Local nodes that are not started yet
@@ -1613,19 +1662,20 @@ impl RunningDataflow {
let _ = send_with_timestamp(&channel, NodeEvent::Stop, clock);
}

let running_nodes = self.running_nodes.clone();
let running_processes: Vec<_> = self
.running_nodes
.iter_mut()
.map(|(id, n)| (id.clone(), n.pid.take()))
.collect();
let grace_duration_kills = self.grace_duration_kills.clone();
tokio::spawn(async move {
let duration = grace_duration.unwrap_or(Duration::from_millis(15000));
tokio::time::sleep(duration).await;
let mut system = sysinfo::System::new();
system.refresh_processes();

for (node, node_details) in running_nodes.iter() {
if let Some(pid) = node_details.pid {
if let Some(process) = system.process(Pid::from(pid as usize)) {
for (node, pid) in running_processes {
if let Some(mut pid) = pid {
if pid.kill() {
grace_duration_kills.insert(node.clone());
process.kill();
warn!(
"{node} was killed due to not stopping within the {:#?} grace period",
duration
@@ -1712,6 +1762,7 @@ pub enum Event {
DynamicNode(DynamicNodeEventWrapper),
HeartbeatInterval,
CtrlC,
SecondCtrlC,
}

impl From<DoraEvent> for Event {
@@ -1792,25 +1843,27 @@ fn set_up_ctrlc_handler(
) -> Result<impl Stream<Item = Timestamped<Event>>, eyre::ErrReport> {
let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);

let mut ctrlc_sent = false;
let mut ctrlc_sent = 0;
ctrlc::set_handler(move || {
if ctrlc_sent {
tracing::warn!("received second ctrlc signal -> aborting immediately");
std::process::abort();
} else {
tracing::info!("received ctrlc signal");
if ctrlc_tx
.blocking_send(Timestamped {
inner: Event::CtrlC,
timestamp: clock.new_timestamp(),
})
.is_err()
{
tracing::error!("failed to report ctrl-c event to dora-coordinator");
let event = match ctrlc_sent {
0 => Event::CtrlC,
1 => Event::SecondCtrlC,
_ => {
tracing::warn!("received 3rd ctrlc signal -> aborting immediately");
std::process::abort();
}

ctrlc_sent = true;
};
if ctrlc_tx
.blocking_send(Timestamped {
inner: event,
timestamp: clock.new_timestamp(),
})
.is_err()
{
tracing::error!("failed to report ctrl-c event to dora-coordinator");
}

ctrlc_sent += 1;
})
.wrap_err("failed to set ctrl-c handler")?;



+ 13
- 3
binaries/daemon/src/spawn.rs View File

@@ -153,6 +153,11 @@ pub async fn spawn_node(
command.env(key, value.to_string());
}
}

// Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C
#[cfg(unix)]
command.process_group(0);

command
.stdin(Stdio::null())
.stdout(Stdio::piped())
@@ -249,6 +254,9 @@ pub async fn spawn_node(
command.env(key, value.to_string());
}
}
// Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C
#[cfg(unix)]
command.process_group(0);

command
.stdin(Stdio::null())
@@ -262,6 +270,11 @@ pub async fn spawn_node(
}
};

let pid = crate::ProcessId::new(child.id().context(
"Could not get the pid for the just spawned node and indicate that there is an error",
)?);
tracing::debug!("Spawned node `{dataflow_id}/{node_id}` with pid {pid:?}");

let dataflow_dir: PathBuf = working_dir.join("out").join(dataflow_id.to_string());
if !dataflow_dir.exists() {
std::fs::create_dir_all(&dataflow_dir).context("could not create dataflow_dir")?;
@@ -272,9 +285,6 @@ pub async fn spawn_node(
.expect("Failed to create log file");
let mut child_stdout =
tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout"));
let pid = child.id().context(
"Could not get the pid for the just spawned node and indicate that there is an error",
)?;
let running_node = RunningNode {
pid: Some(pid),
node_config,


+ 17
- 7
libraries/extensions/telemetry/tracing/src/lib.rs View File

@@ -16,25 +16,35 @@ use tracing_subscriber::Registry;
pub mod telemetry;

pub fn set_up_tracing(name: &str) -> eyre::Result<()> {
set_up_tracing_opts(name, true, None)
set_up_tracing_opts(name, Some(LevelFilter::WARN), None)
}

pub fn set_up_tracing_opts(name: &str, stdout: bool, filename: Option<&str>) -> eyre::Result<()> {
/// Configuration for the optional file-backed tracing layer.
///
/// When passed to `set_up_tracing_opts`, log output is appended to
/// `out/<file_name>.txt` (the `out` directory is created if missing) and
/// filtered by `filter` independently of the stdout layer's level.
pub struct FileLogging {
/// Base name of the log file; the `.txt` extension is added by the setup code.
pub file_name: String,
/// Maximum log level written to the file (e.g. `LevelFilter::INFO`).
pub filter: LevelFilter,
}

pub fn set_up_tracing_opts(
name: &str,
stdout: Option<LevelFilter>,
file: Option<FileLogging>,
) -> eyre::Result<()> {
let mut layers = Vec::new();

if stdout {
if let Some(level) = stdout {
// Filter log using `RUST_LOG`. More useful for CLI.
let env_filter = EnvFilter::from_default_env().or(LevelFilter::WARN);
let env_filter = EnvFilter::from_default_env().or(level);
let layer = tracing_subscriber::fmt::layer()
.compact()
.with_filter(env_filter);
layers.push(layer.boxed());
}

if let Some(filename) = filename {
if let Some(file) = file {
let FileLogging { file_name, filter } = file;
let out_dir = Path::new("out");
std::fs::create_dir_all(out_dir).context("failed to create `out` directory")?;
let path = out_dir.join(filename).with_extension("txt");
let path = out_dir.join(file_name).with_extension("txt");
let file = std::fs::OpenOptions::new()
.create(true)
.append(true)
@@ -44,7 +54,7 @@ pub fn set_up_tracing_opts(name: &str, stdout: bool, filename: Option<&str>) ->
let layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_writer(file)
.with_filter(LevelFilter::INFO);
.with_filter(filter);
layers.push(layer.boxed());
}



Loading…
Cancel
Save