diff --git a/Cargo.lock b/Cargo.lock index 1e713927..918c83fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2227,7 +2227,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "330c60081dcc4c72131f8eb70510f1ac07223e5d4163db481a04a0befcffa412" dependencies = [ - "libloading 0.7.4", + "libloading 0.8.3", ] [[package]] @@ -2262,6 +2262,7 @@ dependencies = [ "dora-operator-api-c", "dora-runtime", "dora-tracing", + "duration-str", "eyre", "futures", "inquire", @@ -2332,6 +2333,7 @@ dependencies = [ "serde_json", "serde_yaml 0.8.26", "shared-memory-server", + "sysinfo 0.30.11", "tokio", "tokio-stream", "tracing", @@ -2638,6 +2640,20 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56ce8c6da7551ec6c462cbaf3bfbc75131ebbfa1c944aeaa9dab51ca1c5f0c3b" +[[package]] +name = "duration-str" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9f037c488d179e21c87ef5fa9c331e8e62f5dddfa84618b41bb197da03edff1" +dependencies = [ + "chrono", + "nom", + "rust_decimal", + "serde", + "thiserror", + "time", +] + [[package]] name = "dyn-clone" version = "1.0.14" @@ -3737,7 +3753,7 @@ dependencies = [ "bitflags 2.4.0", "com", "libc", - "libloading 0.7.4", + "libloading 0.8.3", "thiserror", "widestring", "winapi 0.3.9", @@ -6200,7 +6216,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19de2de2a00075bf566bee3bd4db014b11587e84184d3f7a791bc17f1a8e9e48" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.48", @@ -7898,6 +7914,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "rust_decimal" +version = "1.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1790d1c4c0ca81211399e0e0af16333276f375209e71a37b67698a373db5b47a" +dependencies = [ + "arrayvec", + "num-traits", +] + [[package]] name = "rustc-demangle" version = "0.1.23" @@ -8839,6 +8865,7 @@ dependencies = [ "libc", "ntapi", "once_cell", + "rayon", "windows 0.52.0", ] @@ -10015,7 +10042,7 @@ dependencies = [ "js-sys", "khronos-egl", "libc", - "libloading 0.7.4", + "libloading 0.8.3", "log", "metal", "naga", diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml index 69ecbe9b..15ee0424 100644 --- a/binaries/cli/Cargo.toml +++ b/binaries/cli/Cargo.toml @@ -41,3 +41,4 @@ dora-runtime = { workspace = true } tokio = { version = "1.20.1", features = ["full"] } tokio-stream = { version = "0.1.8", features = ["io-util", "net"] } futures = "0.3.21" +duration-str = "0.5" diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/attach.rs index d4730643..0ee9e57b 100644 --- a/binaries/cli/src/attach.rs +++ b/binaries/cli/src/attach.rs @@ -105,6 +105,7 @@ pub fn attach_dataflow( if ctrlc_tx .send(ControlRequest::Stop { dataflow_uuid: dataflow_id, + grace_duration: None, }) .is_err() { diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 6cef5bce..e5c2e462 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -1,8 +1,3 @@ -use std::{ - net::{IpAddr, Ipv4Addr}, - path::PathBuf, -}; - use attach::attach_dataflow; use clap::Parser; use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection}; @@ -17,8 +12,14 @@ use dora_core::{ use dora_daemon::Daemon; #[cfg(feature = "tracing")] use dora_tracing::set_up_tracing; +use duration_str::parse; use eyre::{bail, Context}; use std::net::SocketAddr; +use std::{ + net::{IpAddr, Ipv4Addr}, + path::PathBuf, + time::Duration, +}; use tokio::runtime::Builder; use uuid::Uuid; @@ -87,6 +88,8 @@ enum Command { uuid: Option, #[clap(long)] name: Option, + #[arg(value_parser = parse)] + grace_duration: Option, }, /// List running dataflows. List, @@ -269,13 +272,17 @@ fn run() -> eyre::Result<()> { bail!("No dora coordinator seems to be running."); } }, - Command::Stop { uuid, name } => { + Command::Stop { + uuid, + name, + grace_duration, + } => { let mut session = connect_to_coordinator().wrap_err("could not connect to dora coordinator")?; match (uuid, name) { - (Some(uuid), _) => stop_dataflow(uuid, &mut *session)?, - (None, Some(name)) => stop_dataflow_by_name(name, &mut *session)?, - (None, None) => stop_dataflow_interactive(&mut *session)?, + (Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?, + (None, Some(name)) => stop_dataflow_by_name(name, grace_duration, &mut *session)?, + (None, None) => stop_dataflow_interactive(grace_duration, &mut *session)?, } } Command::Destroy { config } => up::destroy(config.as_deref())?, @@ -361,13 +368,16 @@ fn start_dataflow( } } -fn stop_dataflow_interactive(session: &mut TcpRequestReplyConnection) -> eyre::Result<()> { +fn stop_dataflow_interactive( + grace_duration: Option, + session: &mut TcpRequestReplyConnection, +) -> eyre::Result<()> { let uuids = query_running_dataflows(session).wrap_err("failed to query running dataflows")?; if uuids.is_empty() { eprintln!("No dataflows are running"); } else { let selection = inquire::Select::new("Choose dataflow to stop:", uuids).prompt()?; - stop_dataflow(selection.uuid, session)?; + stop_dataflow(selection.uuid, grace_duration, session)?; } Ok(()) @@ -375,12 +385,14 @@ fn stop_dataflow_interactive(session: &mut TcpRequestReplyConnection) -> eyre::R fn stop_dataflow( uuid: Uuid, + grace_duration: Option, session: &mut TcpRequestReplyConnection, ) -> Result<(), eyre::ErrReport> { let reply_raw = session .request( &serde_json::to_vec(&ControlRequest::Stop { dataflow_uuid: uuid, + grace_duration, }) .unwrap(), ) @@ -398,10 +410,17 @@ fn stop_dataflow( fn stop_dataflow_by_name( name: String, + grace_duration: Option, session: &mut TcpRequestReplyConnection, ) -> Result<(), eyre::ErrReport> { let reply_raw = session - .request(&serde_json::to_vec(&ControlRequest::StopByName { name }).unwrap()) + .request( + &serde_json::to_vec(&ControlRequest::StopByName { + name, + grace_duration, + }) + .unwrap(), + ) .wrap_err("failed to send dataflow stop_by_name message")?; let result: ControlRequestReply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 8607a434..d4a68cdc 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -388,7 +388,10 @@ async fn start_inner( }); let _ = reply_sender.send(reply); } - ControlRequest::Stop { dataflow_uuid } => { + ControlRequest::Stop { + dataflow_uuid, + grace_duration, + } => { stop_dataflow_by_uuid( &mut running_dataflows, &dataflow_results, @@ -396,27 +399,30 @@ async fn start_inner( &mut daemon_connections, reply_sender, clock.new_timestamp(), + grace_duration, ) .await?; } - ControlRequest::StopByName { name } => { - match resolve_name(name, &running_dataflows, &archived_dataflows) { - Ok(uuid) => { - stop_dataflow_by_uuid( - &mut running_dataflows, - &dataflow_results, - uuid, - &mut daemon_connections, - reply_sender, - clock.new_timestamp(), - ) - .await? - } - Err(err) => { - let _ = reply_sender.send(Err(err)); - } + ControlRequest::StopByName { + name, + grace_duration, + } => match resolve_name(name, &running_dataflows, &archived_dataflows) { + Ok(uuid) => { + stop_dataflow_by_uuid( + &mut running_dataflows, + &dataflow_results, + uuid, + &mut daemon_connections, + reply_sender, + clock.new_timestamp(), + grace_duration, + ) + .await? } - } + Err(err) => { + let _ = reply_sender.send(Err(err)); + } + }, ControlRequest::Logs { uuid, name, node } => { let dataflow_uuid = if let Some(uuid) = uuid { uuid @@ -548,6 +554,7 @@ async fn stop_dataflow_by_uuid( daemon_connections: &mut HashMap, reply_sender: tokio::sync::oneshot::Sender>, timestamp: uhlc::Timestamp, + grace_duration: Option, ) -> Result<(), eyre::ErrReport> { let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else { if let Some(result) = dataflow_results.get(&dataflow_uuid) { @@ -561,7 +568,14 @@ async fn stop_dataflow_by_uuid( bail!("no known dataflow found with UUID `{dataflow_uuid}`") }; let stop = async { - stop_dataflow(dataflow, dataflow_uuid, daemon_connections, timestamp).await?; + stop_dataflow( + dataflow, + dataflow_uuid, + daemon_connections, + timestamp, + grace_duration, + ) + .await?; Result::<_, eyre::Report>::Ok(()) }; match stop.await { @@ -623,7 +637,14 @@ async fn handle_destroy( ) -> Result<(), eyre::ErrReport> { abortable_events.abort(); for (&uuid, dataflow) in running_dataflows { - stop_dataflow(dataflow, uuid, daemon_connections, clock.new_timestamp()).await?; + stop_dataflow( + dataflow, + uuid, + daemon_connections, + clock.new_timestamp(), + None, + ) + .await?; } destroy_daemons(daemon_connections, clock.new_timestamp()).await?; *daemon_events_tx = None; @@ -685,9 +706,13 @@ async fn stop_dataflow( uuid: Uuid, daemon_connections: &mut HashMap, timestamp: uhlc::Timestamp, + grace_duration: Option, ) -> eyre::Result<()> { let message = serde_json::to_vec(&Timestamped { - inner: DaemonCoordinatorEvent::StopDataflow { dataflow_id: uuid }, + inner: DaemonCoordinatorEvent::StopDataflow { + dataflow_id: uuid, + grace_duration, + }, timestamp, })?; diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index dbed43b6..a53607f9 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -38,3 +38,4 @@ async-trait = "0.1.64" aligned-vec = "0.5.0" ctrlc = "3.2.5" which = "5.0.0" +sysinfo = "0.30.11" diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 59addea1..51e4db31 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -31,6 +31,7 @@ use std::{ path::{Path, PathBuf}, time::Duration, }; +use sysinfo::Pid; use tcp_utils::tcp_send; use tokio::fs::File; use tokio::io::AsyncReadExt; @@ -39,7 +40,7 @@ use tokio::sync::mpsc::UnboundedSender; use tokio::sync::oneshot::Sender; use tokio::sync::{mpsc, oneshot}; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; -use tracing::error; +use tracing::{error, warn}; use uuid::{NoContext, Timestamp, Uuid}; mod coordinator; @@ -295,7 +296,7 @@ impl Daemon { } Event::CtrlC => { for dataflow in self.running.values_mut() { - dataflow.stop_all(&self.clock).await; + dataflow.stop_all(&self.clock, None).await; } } } @@ -428,18 +429,17 @@ impl Daemon { .map_err(|_| error!("could not send reload reply from daemon to coordinator")); RunStatus::Continue } - DaemonCoordinatorEvent::StopDataflow { dataflow_id } => { - let stop = async { - let dataflow = self - .running - .get_mut(&dataflow_id) - .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?; - dataflow.stop_all(&self.clock).await; - Result::<(), eyre::Report>::Ok(()) - }; - let reply = DaemonCoordinatorReply::StopResult( - stop.await.map_err(|err| format!("{err:?}")), - ); + DaemonCoordinatorEvent::StopDataflow { + dataflow_id, + grace_duration, + } => { + let dataflow = self + .running + .get_mut(&dataflow_id) + .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?; + // .stop_all(&self.clock.clone(), grace_duration); + dataflow.stop_all(&self.clock, grace_duration).await; + let reply = DaemonCoordinatorReply::StopResult(Ok(())); let _ = reply_tx .send(Some(reply)) .map_err(|_| error!("could not send stop reply from daemon to coordinator")); @@ -597,8 +597,10 @@ impl Daemon { .await .wrap_err_with(|| format!("failed to spawn node `{node_id}`")) { - Ok(()) => { - dataflow.running_nodes.insert(node_id); + Ok(pid) => { + dataflow + .running_nodes + .insert(node_id.clone(), RunningNode { pid }); } Err(err) => { tracing::error!("{err:?}"); @@ -1324,6 +1326,11 @@ fn close_input( } } +#[derive(Debug, Clone)] +struct RunningNode { + pid: usize, +} + pub struct RunningDataflow { id: Uuid, /// Local nodes that are not started yet @@ -1334,7 +1341,7 @@ pub struct RunningDataflow { mappings: HashMap>, timers: BTreeMap>, open_inputs: BTreeMap>, - running_nodes: BTreeSet, + running_nodes: BTreeMap, open_external_mappings: HashMap>>, @@ -1360,7 +1367,7 @@ impl RunningDataflow { mappings: HashMap::new(), timers: BTreeMap::new(), open_inputs: BTreeMap::new(), - running_nodes: BTreeSet::new(), + running_nodes: BTreeMap::new(), open_external_mappings: HashMap::new(), pending_drop_tokens: HashMap::new(), _timer_handles: Vec::new(), @@ -1422,10 +1429,29 @@ impl RunningDataflow { Ok(()) } - async fn stop_all(&mut self, clock: &HLC) { + async fn stop_all(&mut self, clock: &HLC, grace_duration: Option) { for (_node_id, channel) in self.subscribe_channels.drain() { let _ = send_with_timestamp(&channel, daemon_messages::NodeEvent::Stop, clock); } + + let running_nodes = self.running_nodes.clone(); + tokio::spawn(async move { + let duration = grace_duration.unwrap_or(Duration::from_millis(500)); + tokio::time::sleep(duration).await; + let mut system = sysinfo::System::new(); + system.refresh_processes(); + + for (node, node_details) in running_nodes.iter() { + if let Some(process) = system.process(Pid::from(node_details.pid.clone())) { + process.kill(); + warn!( + "{node} was killed due to not stopping within the {:#?} grace period", + duration + ) + } + } + }); + self.stop_sent = true; } diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 2da376f8..7e0eff15 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -42,7 +42,7 @@ pub async fn spawn_node( daemon_tx: mpsc::Sender>, dataflow_descriptor: Descriptor, clock: Arc, -) -> eyre::Result<()> { +) -> eyre::Result { let node_id = node.id.clone(); tracing::debug!("Spawning node `{dataflow_id}/{node_id}`"); @@ -264,7 +264,7 @@ pub async fn spawn_node( .expect("Failed to create log file"); let mut child_stdout = tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout")); - + let pid = child.id().unwrap() as usize; // Todo: Manage failure let stdout_tx = tx.clone(); // Stdout listener stream @@ -422,5 +422,5 @@ pub async fn spawn_node( .send(()) .map_err(|_| error!("Could not inform that log file thread finished")); }); - Ok(()) + Ok(pid) } diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index 8712a511..91c634cc 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -3,6 +3,7 @@ use std::{ fmt, net::SocketAddr, path::PathBuf, + time::Duration, }; use crate::{ @@ -213,6 +214,7 @@ pub enum DaemonCoordinatorEvent { }, StopDataflow { dataflow_id: DataflowId, + grace_duration: Option, }, ReloadDataflow { dataflow_id: DataflowId, diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 658d59ee..48048864 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -3,6 +3,7 @@ use std::{ fmt::Display, net::{Ipv4Addr, SocketAddr}, path::PathBuf, + time::Duration, }; use uuid::Uuid; @@ -38,9 +39,11 @@ pub enum ControlRequest { }, Stop { dataflow_uuid: Uuid, + grace_duration: Option, }, StopByName { name: String, + grace_duration: Option, }, Logs { uuid: Option,