Add grace duration and kill process

tags/v0.3.4-rc1
haixuanTao, 1 year ago
commit b9f67a2855
10 changed files with 164 additions and 59 deletions
  1. Cargo.lock (+31, -4)
  2. binaries/cli/Cargo.toml (+1, -0)
  3. binaries/cli/src/attach.rs (+1, -0)
  4. binaries/cli/src/main.rs (+31, -12)
  5. binaries/coordinator/src/lib.rs (+46, -21)
  6. binaries/daemon/Cargo.toml (+1, -0)
  7. binaries/daemon/src/lib.rs (+45, -19)
  8. binaries/daemon/src/spawn.rs (+3, -3)
  9. libraries/core/src/daemon_messages.rs (+2, -0)
  10. libraries/core/src/topics.rs (+3, -0)

Cargo.lock (+31, -4)

@@ -2227,7 +2227,7 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "330c60081dcc4c72131f8eb70510f1ac07223e5d4163db481a04a0befcffa412"
dependencies = [
"libloading 0.7.4",
"libloading 0.8.3",
]

[[package]]
@@ -2262,6 +2262,7 @@ dependencies = [
"dora-operator-api-c",
"dora-runtime",
"dora-tracing",
"duration-str",
"eyre",
"futures",
"inquire",
@@ -2332,6 +2333,7 @@ dependencies = [
"serde_json",
"serde_yaml 0.8.26",
"shared-memory-server",
"sysinfo 0.30.11",
"tokio",
"tokio-stream",
"tracing",
@@ -2638,6 +2640,20 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56ce8c6da7551ec6c462cbaf3bfbc75131ebbfa1c944aeaa9dab51ca1c5f0c3b"

[[package]]
name = "duration-str"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9f037c488d179e21c87ef5fa9c331e8e62f5dddfa84618b41bb197da03edff1"
dependencies = [
"chrono",
"nom",
"rust_decimal",
"serde",
"thiserror",
"time",
]

[[package]]
name = "dyn-clone"
version = "1.0.14"
@@ -3737,7 +3753,7 @@ dependencies = [
"bitflags 2.4.0",
"com",
"libc",
"libloading 0.7.4",
"libloading 0.8.3",
"thiserror",
"widestring",
"winapi 0.3.9",
@@ -6200,7 +6216,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19de2de2a00075bf566bee3bd4db014b11587e84184d3f7a791bc17f1a8e9e48"
dependencies = [
"anyhow",
"itertools 0.11.0",
"itertools 0.12.1",
"proc-macro2",
"quote",
"syn 2.0.48",
@@ -7898,6 +7914,16 @@ dependencies = [
"tokio",
]

[[package]]
name = "rust_decimal"
version = "1.35.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1790d1c4c0ca81211399e0e0af16333276f375209e71a37b67698a373db5b47a"
dependencies = [
"arrayvec",
"num-traits",
]

[[package]]
name = "rustc-demangle"
version = "0.1.23"
@@ -8839,6 +8865,7 @@ dependencies = [
"libc",
"ntapi",
"once_cell",
"rayon",
"windows 0.52.0",
]

@@ -10015,7 +10042,7 @@ dependencies = [
"js-sys",
"khronos-egl",
"libc",
"libloading 0.7.4",
"libloading 0.8.3",
"log",
"metal",
"naga",


binaries/cli/Cargo.toml (+1, -0)

@@ -41,3 +41,4 @@ dora-runtime = { workspace = true }
tokio = { version = "1.20.1", features = ["full"] }
tokio-stream = { version = "0.1.8", features = ["io-util", "net"] }
futures = "0.3.21"
duration-str = "0.5"

binaries/cli/src/attach.rs (+1, -0)

@@ -105,6 +105,7 @@ pub fn attach_dataflow(
if ctrlc_tx
.send(ControlRequest::Stop {
dataflow_uuid: dataflow_id,
grace_duration: None,
})
.is_err()
{


binaries/cli/src/main.rs (+31, -12)

@@ -1,8 +1,3 @@
use std::{
net::{IpAddr, Ipv4Addr},
path::PathBuf,
};

use attach::attach_dataflow;
use clap::Parser;
use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection};
@@ -17,8 +12,14 @@ use dora_core::{
use dora_daemon::Daemon;
#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
use duration_str::parse;
use eyre::{bail, Context};
use std::net::SocketAddr;
use std::{
net::{IpAddr, Ipv4Addr},
path::PathBuf,
time::Duration,
};
use tokio::runtime::Builder;
use uuid::Uuid;

@@ -87,6 +88,8 @@ enum Command {
uuid: Option<Uuid>,
#[clap(long)]
name: Option<String>,
#[arg(value_parser = parse)]
grace_duration: Option<Duration>,
},
/// List running dataflows.
List,
@@ -269,13 +272,17 @@ fn run() -> eyre::Result<()> {
bail!("No dora coordinator seems to be running.");
}
},
Command::Stop { uuid, name } => {
Command::Stop {
uuid,
name,
grace_duration,
} => {
let mut session =
connect_to_coordinator().wrap_err("could not connect to dora coordinator")?;
match (uuid, name) {
(Some(uuid), _) => stop_dataflow(uuid, &mut *session)?,
(None, Some(name)) => stop_dataflow_by_name(name, &mut *session)?,
(None, None) => stop_dataflow_interactive(&mut *session)?,
(Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?,
(None, Some(name)) => stop_dataflow_by_name(name, grace_duration, &mut *session)?,
(None, None) => stop_dataflow_interactive(grace_duration, &mut *session)?,
}
}
Command::Destroy { config } => up::destroy(config.as_deref())?,
@@ -361,13 +368,16 @@ fn start_dataflow(
}
}

fn stop_dataflow_interactive(session: &mut TcpRequestReplyConnection) -> eyre::Result<()> {
fn stop_dataflow_interactive(
grace_duration: Option<Duration>,
session: &mut TcpRequestReplyConnection,
) -> eyre::Result<()> {
let uuids = query_running_dataflows(session).wrap_err("failed to query running dataflows")?;
if uuids.is_empty() {
eprintln!("No dataflows are running");
} else {
let selection = inquire::Select::new("Choose dataflow to stop:", uuids).prompt()?;
stop_dataflow(selection.uuid, session)?;
stop_dataflow(selection.uuid, grace_duration, session)?;
}

Ok(())
@@ -375,12 +385,14 @@ fn stop_dataflow_interactive(session: &mut TcpRequestReplyConnection) -> eyre::R

fn stop_dataflow(
uuid: Uuid,
grace_duration: Option<Duration>,
session: &mut TcpRequestReplyConnection,
) -> Result<(), eyre::ErrReport> {
let reply_raw = session
.request(
&serde_json::to_vec(&ControlRequest::Stop {
dataflow_uuid: uuid,
grace_duration,
})
.unwrap(),
)
@@ -398,10 +410,17 @@ fn stop_dataflow(

fn stop_dataflow_by_name(
name: String,
grace_duration: Option<Duration>,
session: &mut TcpRequestReplyConnection,
) -> Result<(), eyre::ErrReport> {
let reply_raw = session
.request(&serde_json::to_vec(&ControlRequest::StopByName { name }).unwrap())
.request(
&serde_json::to_vec(&ControlRequest::StopByName {
name,
grace_duration,
})
.unwrap(),
)
.wrap_err("failed to send dataflow stop_by_name message")?;
let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
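
As a side note, the new argument can be exercised in isolation. Below is a minimal sketch, assuming clap 4 with the derive feature and duration-str 0.5 as dependencies; the stand-alone Stop struct is illustrative only and simply mirrors the grace_duration field added to the Stop command above.

// Sketch only: mirrors the optional `grace_duration` argument added to the
// `Stop` command, assuming clap 4 (derive feature) and duration-str 0.5.
use std::time::Duration;

use clap::Parser;
use duration_str::parse;

#[derive(Parser, Debug)]
struct Stop {
    /// Optional grace period, e.g. "500ms", "5s" or "1m".
    #[arg(value_parser = parse)]
    grace_duration: Option<Duration>,
}

fn main() {
    // Passing `5s` on the command line yields `Some(Duration::from_secs(5))`;
    // omitting the argument yields `None`.
    let args = Stop::parse();
    println!("grace duration: {:?}", args.grace_duration);
}

Since the field carries no #[clap(long)] attribute, the grace duration appears to be a positional argument of dora stop; the exact invocation is not spelled out in this commit.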


binaries/coordinator/src/lib.rs (+46, -21)

@@ -388,7 +388,10 @@ async fn start_inner(
});
let _ = reply_sender.send(reply);
}
ControlRequest::Stop { dataflow_uuid } => {
ControlRequest::Stop {
dataflow_uuid,
grace_duration,
} => {
stop_dataflow_by_uuid(
&mut running_dataflows,
&dataflow_results,
@@ -396,27 +399,30 @@ async fn start_inner(
&mut daemon_connections,
reply_sender,
clock.new_timestamp(),
grace_duration,
)
.await?;
}
ControlRequest::StopByName { name } => {
match resolve_name(name, &running_dataflows, &archived_dataflows) {
Ok(uuid) => {
stop_dataflow_by_uuid(
&mut running_dataflows,
&dataflow_results,
uuid,
&mut daemon_connections,
reply_sender,
clock.new_timestamp(),
)
.await?
}
Err(err) => {
let _ = reply_sender.send(Err(err));
}
ControlRequest::StopByName {
name,
grace_duration,
} => match resolve_name(name, &running_dataflows, &archived_dataflows) {
Ok(uuid) => {
stop_dataflow_by_uuid(
&mut running_dataflows,
&dataflow_results,
uuid,
&mut daemon_connections,
reply_sender,
clock.new_timestamp(),
grace_duration,
)
.await?
}
}
Err(err) => {
let _ = reply_sender.send(Err(err));
}
},
ControlRequest::Logs { uuid, name, node } => {
let dataflow_uuid = if let Some(uuid) = uuid {
uuid
@@ -548,6 +554,7 @@ async fn stop_dataflow_by_uuid(
daemon_connections: &mut HashMap<String, DaemonConnection>,
reply_sender: tokio::sync::oneshot::Sender<Result<ControlRequestReply, eyre::ErrReport>>,
timestamp: uhlc::Timestamp,
grace_duration: Option<Duration>,
) -> Result<(), eyre::ErrReport> {
let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else {
if let Some(result) = dataflow_results.get(&dataflow_uuid) {
@@ -561,7 +568,14 @@ async fn stop_dataflow_by_uuid(
bail!("no known dataflow found with UUID `{dataflow_uuid}`")
};
let stop = async {
stop_dataflow(dataflow, dataflow_uuid, daemon_connections, timestamp).await?;
stop_dataflow(
dataflow,
dataflow_uuid,
daemon_connections,
timestamp,
grace_duration,
)
.await?;
Result::<_, eyre::Report>::Ok(())
};
match stop.await {
@@ -623,7 +637,14 @@ async fn handle_destroy(
) -> Result<(), eyre::ErrReport> {
abortable_events.abort();
for (&uuid, dataflow) in running_dataflows {
stop_dataflow(dataflow, uuid, daemon_connections, clock.new_timestamp()).await?;
stop_dataflow(
dataflow,
uuid,
daemon_connections,
clock.new_timestamp(),
None,
)
.await?;
}
destroy_daemons(daemon_connections, clock.new_timestamp()).await?;
*daemon_events_tx = None;
@@ -685,9 +706,13 @@ async fn stop_dataflow(
uuid: Uuid,
daemon_connections: &mut HashMap<String, DaemonConnection>,
timestamp: uhlc::Timestamp,
grace_duration: Option<Duration>,
) -> eyre::Result<()> {
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::StopDataflow { dataflow_id: uuid },
inner: DaemonCoordinatorEvent::StopDataflow {
dataflow_id: uuid,
grace_duration,
},
timestamp,
})?;



binaries/daemon/Cargo.toml (+1, -0)

@@ -38,3 +38,4 @@ async-trait = "0.1.64"
aligned-vec = "0.5.0"
ctrlc = "3.2.5"
which = "5.0.0"
sysinfo = "0.30.11"

binaries/daemon/src/lib.rs (+45, -19)

@@ -31,6 +31,7 @@ use std::{
path::{Path, PathBuf},
time::Duration,
};
use sysinfo::Pid;
use tcp_utils::tcp_send;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
@@ -39,7 +40,7 @@ use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot::Sender;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tracing::error;
use tracing::{error, warn};
use uuid::{NoContext, Timestamp, Uuid};

mod coordinator;
@@ -295,7 +296,7 @@ impl Daemon {
}
Event::CtrlC => {
for dataflow in self.running.values_mut() {
dataflow.stop_all(&self.clock).await;
dataflow.stop_all(&self.clock, None).await;
}
}
}
@@ -428,18 +429,17 @@ impl Daemon {
.map_err(|_| error!("could not send reload reply from daemon to coordinator"));
RunStatus::Continue
}
DaemonCoordinatorEvent::StopDataflow { dataflow_id } => {
let stop = async {
let dataflow = self
.running
.get_mut(&dataflow_id)
.wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
dataflow.stop_all(&self.clock).await;
Result::<(), eyre::Report>::Ok(())
};
let reply = DaemonCoordinatorReply::StopResult(
stop.await.map_err(|err| format!("{err:?}")),
);
DaemonCoordinatorEvent::StopDataflow {
dataflow_id,
grace_duration,
} => {
let dataflow = self
.running
.get_mut(&dataflow_id)
.wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
// .stop_all(&self.clock.clone(), grace_duration);
dataflow.stop_all(&self.clock, grace_duration).await;
let reply = DaemonCoordinatorReply::StopResult(Ok(()));
let _ = reply_tx
.send(Some(reply))
.map_err(|_| error!("could not send stop reply from daemon to coordinator"));
@@ -597,8 +597,10 @@ impl Daemon {
.await
.wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
{
Ok(()) => {
dataflow.running_nodes.insert(node_id);
Ok(pid) => {
dataflow
.running_nodes
.insert(node_id.clone(), RunningNode { pid });
}
Err(err) => {
tracing::error!("{err:?}");
@@ -1324,6 +1326,11 @@ fn close_input(
}
}

#[derive(Debug, Clone)]
struct RunningNode {
pid: usize,
}

pub struct RunningDataflow {
id: Uuid,
/// Local nodes that are not started yet
@@ -1334,7 +1341,7 @@ pub struct RunningDataflow {
mappings: HashMap<OutputId, BTreeSet<InputId>>,
timers: BTreeMap<Duration, BTreeSet<InputId>>,
open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
running_nodes: BTreeSet<NodeId>,
running_nodes: BTreeMap<NodeId, RunningNode>,

open_external_mappings: HashMap<OutputId, BTreeMap<String, BTreeSet<InputId>>>,

@@ -1360,7 +1367,7 @@ impl RunningDataflow {
mappings: HashMap::new(),
timers: BTreeMap::new(),
open_inputs: BTreeMap::new(),
running_nodes: BTreeSet::new(),
running_nodes: BTreeMap::new(),
open_external_mappings: HashMap::new(),
pending_drop_tokens: HashMap::new(),
_timer_handles: Vec::new(),
@@ -1422,10 +1429,29 @@ impl RunningDataflow {
Ok(())
}

async fn stop_all(&mut self, clock: &HLC) {
async fn stop_all(&mut self, clock: &HLC, grace_duration: Option<Duration>) {
for (_node_id, channel) in self.subscribe_channels.drain() {
let _ = send_with_timestamp(&channel, daemon_messages::NodeEvent::Stop, clock);
}

let running_nodes = self.running_nodes.clone();
tokio::spawn(async move {
let duration = grace_duration.unwrap_or(Duration::from_millis(500));
tokio::time::sleep(duration).await;
let mut system = sysinfo::System::new();
system.refresh_processes();

for (node, node_details) in running_nodes.iter() {
if let Some(process) = system.process(Pid::from(node_details.pid.clone())) {
process.kill();
warn!(
"{node} was killed due to not stopping within the {:#?} grace period",
duration
)
}
}
});

self.stop_sent = true;
}
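
The kill-after-grace-period logic in stop_all can also be tried on its own. The following is a self-contained sketch under the same assumptions as the diff (sysinfo 0.30 and tokio with its full feature set, on a Unix-like system); the `sleep` command is a stand-in for a node process that ignores the stop event, and the 500 ms value mirrors the default used above.

// Standalone sketch of the grace-period kill pattern, assuming sysinfo 0.30
// and tokio ("full" features). `sleep 60` stands in for a node that never
// reacts to the stop request.
use std::time::Duration;

use sysinfo::Pid;

#[tokio::main]
async fn main() {
    let child = std::process::Command::new("sleep")
        .arg("60")
        .spawn()
        .expect("failed to spawn stand-in process");
    let pid = child.id() as usize;

    // Give the process a grace period to exit on its own, mirroring
    // `grace_duration.unwrap_or(Duration::from_millis(500))` above.
    let grace = Duration::from_millis(500);
    tokio::time::sleep(grace).await;

    // Refresh the process table and kill the process if it is still around.
    let mut system = sysinfo::System::new();
    system.refresh_processes();
    if let Some(process) = system.process(Pid::from(pid)) {
        process.kill();
        eprintln!("process {pid} was killed after the {grace:?} grace period");
    }
}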



binaries/daemon/src/spawn.rs (+3, -3)

@@ -42,7 +42,7 @@ pub async fn spawn_node(
daemon_tx: mpsc::Sender<Timestamped<Event>>,
dataflow_descriptor: Descriptor,
clock: Arc<HLC>,
) -> eyre::Result<()> {
) -> eyre::Result<usize> {
let node_id = node.id.clone();
tracing::debug!("Spawning node `{dataflow_id}/{node_id}`");

@@ -264,7 +264,7 @@ pub async fn spawn_node(
.expect("Failed to create log file");
let mut child_stdout =
tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout"));
let pid = child.id().unwrap() as usize; // Todo: Manage failure
let stdout_tx = tx.clone();

// Stdout listener stream
@@ -422,5 +422,5 @@ pub async fn spawn_node(
.send(())
.map_err(|_| error!("Could not inform that log file thread finished"));
});
Ok(())
Ok(pid)
}

libraries/core/src/daemon_messages.rs (+2, -0)

@@ -3,6 +3,7 @@ use std::{
fmt,
net::SocketAddr,
path::PathBuf,
time::Duration,
};

use crate::{
@@ -213,6 +214,7 @@ pub enum DaemonCoordinatorEvent {
},
StopDataflow {
dataflow_id: DataflowId,
grace_duration: Option<Duration>,
},
ReloadDataflow {
dataflow_id: DataflowId,


libraries/core/src/topics.rs (+3, -0)

@@ -3,6 +3,7 @@ use std::{
fmt::Display,
net::{Ipv4Addr, SocketAddr},
path::PathBuf,
time::Duration,
};
use uuid::Uuid;

@@ -38,9 +39,11 @@ pub enum ControlRequest {
},
Stop {
dataflow_uuid: Uuid,
grace_duration: Option<Duration>,
},
StopByName {
name: String,
grace_duration: Option<Duration>,
},
Logs {
uuid: Option<Uuid>,
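
For reference, a rough sketch of what the extended Stop request could look like on the wire. The enum below is a hypothetical, stripped-down mirror of the two stop variants (the real type lives in libraries/core/src/topics.rs) and assumes serde's default externally tagged enum representation, serde_json, and the uuid crate with its serde feature, matching how the CLI serializes ControlRequest with serde_json::to_vec.

// Illustrative mirror of the two stop variants only; not the real dora_core
// type. Assumes serde (derive), serde_json and uuid (serde feature).
use std::time::Duration;

use serde::Serialize;
use uuid::Uuid;

#[derive(Serialize)]
#[allow(dead_code)]
enum ControlRequest {
    Stop {
        dataflow_uuid: Uuid,
        grace_duration: Option<Duration>,
    },
    StopByName {
        name: String,
        grace_duration: Option<Duration>,
    },
}

fn main() {
    let request = ControlRequest::Stop {
        dataflow_uuid: Uuid::nil(),
        grace_duration: Some(Duration::from_secs(5)),
    };
    // Prints roughly:
    // {"Stop":{"dataflow_uuid":"00000000-0000-0000-0000-000000000000",
    //          "grace_duration":{"secs":5,"nanos":0}}}
    println!("{}", serde_json::to_string(&request).unwrap());
}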

