From 8f05ec79a1ad67c20a3c799538f0a41f2c2c0772 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Thu, 30 Mar 2023 17:46:58 +0800 Subject: [PATCH] Add attach logic and hot-reloading logic to the cli --- Cargo.lock | 5 +- .../src/daemon_connection/event_stream.rs | 7 +- apis/rust/node/src/event.rs | 2 +- binaries/cli/Cargo.toml | 8 + binaries/cli/src/attach.rs | 143 ++++++++++++++++++ binaries/cli/src/main.rs | 43 +++++- binaries/daemon/src/lib.rs | 12 +- binaries/runtime/src/lib.rs | 7 +- 8 files changed, 202 insertions(+), 25 deletions(-) create mode 100644 binaries/cli/src/attach.rs diff --git a/Cargo.lock b/Cargo.lock index 8da71e74..837f9216 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -822,15 +822,19 @@ dependencies = [ "atty", "clap 4.0.3", "communication-layer-request-reply", + "ctrlc", "dora-core", "dora-node-api-c", "dora-operator-api-c", + "dora-tracing", "eyre", "inquire", + "notify", "serde", "serde_json", "serde_yaml 0.9.19", "termcolor", + "tracing", "uuid", "webbrowser", ] @@ -889,7 +893,6 @@ dependencies = [ "flume", "futures", "futures-concurrency", - "notify", "serde", "serde_json", "serde_yaml 0.8.23", diff --git a/apis/rust/node/src/daemon_connection/event_stream.rs b/apis/rust/node/src/daemon_connection/event_stream.rs index d5bd10f5..7a47f924 100644 --- a/apis/rust/node/src/daemon_connection/event_stream.rs +++ b/apis/rust/node/src/daemon_connection/event_stream.rs @@ -171,12 +171,7 @@ impl EventStream { let event = match event { EventItem::NodeEvent { event, ack_channel } => match event { NodeEvent::Stop => Event::Stop, - NodeEvent::Reload { - operator_id: Some(operator_id), - } => Event::Reload { operator_id }, - NodeEvent::Reload { operator_id: None } => { - Event::Error(format!("Received reload event without operator id")) - } + NodeEvent::Reload { operator_id } => Event::Reload { operator_id }, NodeEvent::InputClosed { id } => Event::InputClosed { id }, NodeEvent::Input { id, metadata, data } => { let data = match data { diff --git a/apis/rust/node/src/event.rs b/apis/rust/node/src/event.rs index 11027f6c..090513ef 100644 --- a/apis/rust/node/src/event.rs +++ b/apis/rust/node/src/event.rs @@ -12,7 +12,7 @@ use shared_memory::{Shmem, ShmemConf}; pub enum Event<'a> { Stop, Reload { - operator_id: OperatorId, + operator_id: Option, }, Input { id: DataId, diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml index d3e2e2d8..e7bab9c8 100644 --- a/binaries/cli/Cargo.toml +++ b/binaries/cli/Cargo.toml @@ -12,6 +12,10 @@ license.workspace = true name = "dora-cli" path = "src/main.rs" +[features] +default = ["tracing"] +tracing = ["dep:dora-tracing"] + [dependencies] clap = { version = "4.0.3", features = ["derive"] } eyre = "0.6.8" @@ -27,3 +31,7 @@ atty = "0.2.14" uuid = { version = "1.2.1", features = ["v4", "serde"] } inquire = "0.5.2" communication-layer-request-reply = { workspace = true } +notify = "5.1.0" +ctrlc = "3.2.5" +tracing = "0.1.36" +dora-tracing = { workspace = true, optional = true } diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/attach.rs new file mode 100644 index 00000000..5a45394f --- /dev/null +++ b/binaries/cli/src/attach.rs @@ -0,0 +1,143 @@ +use communication_layer_request_reply::TcpRequestReplyConnection; +use dora_core::{ + descriptor::{resolve_path, CoreNodeKind, Descriptor}, + topics::{ControlRequest, ControlRequestReply}, +}; +use eyre::Context; +use notify::event::ModifyKind; +use notify::{Config, Event as NotifyEvent, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; +use std::collections::HashMap; +use std::{path::PathBuf, sync::mpsc, time::Duration}; +use tracing::{error, info}; +use uuid::Uuid; + +use crate::control_connection; + +pub fn attach_dataflow( + dataflow: Descriptor, + dataflow_path: PathBuf, + dataflow_id: Uuid, + session: &mut Option>, + hot_reload: bool, +) -> Result<(), eyre::ErrReport> { + let (tx, rx) = mpsc::sync_channel(2); + + // Generate path hashmap + let mut node_path_lookup = HashMap::new(); + + let nodes = dataflow.resolve_aliases(); + + let working_dir = dataflow_path + .canonicalize() + .context("failed to canoncialize dataflow path")? + .parent() + .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))? + .to_owned(); + + for node in nodes { + match node.kind { + CoreNodeKind::Custom(_cn) => (), // TODO: Reloading for custom node + CoreNodeKind::Runtime(rn) => { + for op in rn.operators.iter() { + match &op.config.source { + dora_core::descriptor::OperatorSource::Python(source) => { + let path = resolve_path(&source, &working_dir).wrap_err_with(|| { + format!("failed to resolve node source `{}`", source) + })?; + node_path_lookup + .insert(path, (dataflow_id, node.id.clone(), Some(op.id.clone()))); + } + _ => (), // TODO: Reloading for non-Python Operator + } + } + } + } + } + + // Setup dataflow file watcher if reload option is set. + let watcher_tx = tx.clone(); + let _watcher = if hot_reload { + let hash = node_path_lookup.clone(); + let paths = hash.keys(); + let notifier = move |event| match event { + Ok(NotifyEvent { + paths, + kind: EventKind::Modify(ModifyKind::Data(_data)), + .. + }) => { + for path in paths { + let (dataflow_id, node_id, operator_id) = + node_path_lookup.get(&path).unwrap().clone(); + watcher_tx + .send(ControlRequest::Reload { + dataflow_id, + node_id, + operator_id, + }) + .unwrap(); + } + } + _ => (), // TODO: Manage different file event + }; + + let mut watcher = RecommendedWatcher::new( + notifier, + Config::default().with_poll_interval(Duration::from_secs(1)), + )?; + + for path in paths { + watcher.watch(path, RecursiveMode::Recursive)?; + } + Some(watcher) + } else { + None + }; + + // Setup Ctrlc Watcher to stop dataflow after ctrlc + let ctrlc_tx = tx.clone(); + let mut ctrlc_sent = false; + ctrlc::set_handler(move || { + if ctrlc_sent { + std::process::abort(); + } else { + if ctrlc_tx + .send(ControlRequest::Stop { + dataflow_uuid: dataflow_id, + }) + .is_err() + { + // bail!("failed to report ctrl-c event to dora-daemon"); + } + ctrlc_sent = true; + } + }) + .wrap_err("failed to set ctrl-c handler")?; + + loop { + let control_request = match rx.recv_timeout(Duration::from_secs(1)) { + Err(_err) => ControlRequest::Check { + dataflow_uuid: dataflow_id, + }, + Ok(reload_event) => reload_event, + }; + + let reply_raw = control_connection(session)? + .request(&serde_json::to_vec(&control_request)?) + .wrap_err("failed to send request message to coordinator")?; + let result: ControlRequestReply = + serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + match result { + ControlRequestReply::DataflowStarted { uuid } => info!("dataflow {uuid} started"), + ControlRequestReply::DataflowStopped { uuid } => { + info!("dataflow {uuid} stopped"); + break; + } + ControlRequestReply::DataflowReloaded { uuid } => { + info!("dataflow {uuid} reloaded") + } + other => error!("Received unexpected Coordinator Reply: {:#?}", other), + }; + } + + Ok(()) +} diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index d3cbbe11..7949738b 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -1,13 +1,18 @@ +use std::path::PathBuf; + +use attach::attach_dataflow; use clap::Parser; use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection}; use dora_core::{ descriptor::Descriptor, topics::{control_socket_addr, ControlRequest, ControlRequestReply, DataflowId}, }; +#[cfg(feature = "tracing")] +use dora_tracing::set_up_tracing; use eyre::{bail, Context}; -use std::path::PathBuf; use uuid::Uuid; +mod attach; mod build; mod check; mod graph; @@ -67,6 +72,10 @@ enum Command { dataflow: PathBuf, #[clap(long)] name: Option, + #[clap(long, action)] + attach: bool, + #[clap(long, action)] + hot_reload: bool, }, /// Stop the given dataflow UUID. If no id is provided, you will be able to choose between the running dataflows. Stop { @@ -111,6 +120,8 @@ enum Lang { } fn main() -> eyre::Result<()> { + #[cfg(feature = "tracing")] + set_up_tracing().context("failed to set up tracing subscriber")?; let args = Args::parse(); let mut session = None; @@ -149,7 +160,29 @@ fn main() -> eyre::Result<()> { coordinator_path.as_deref(), daemon_path.as_deref(), )?, - Command::Start { dataflow, name } => start_dataflow(dataflow, name, &mut session)?, + Command::Start { + dataflow, + name, + attach, + hot_reload, + } => { + let dataflow_description = + Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?; + dataflow_description + .check(&dataflow, None) + .wrap_err("Could not validate yaml")?; + let dataflow_id = start_dataflow(dataflow.clone(), name, &mut session)?; + + if attach { + attach_dataflow( + dataflow_description, + dataflow, + dataflow_id, + &mut session, + hot_reload, + )? + } + } Command::List => list(&mut session)?, Command::Stop { uuid, name } => match (uuid, name) { (Some(uuid), _) => stop_dataflow(uuid, &mut session)?, @@ -166,7 +199,7 @@ fn start_dataflow( dataflow: PathBuf, name: Option, session: &mut Option>, -) -> Result<(), eyre::ErrReport> { +) -> Result { let canonicalized = dataflow .canonicalize() .wrap_err("given dataflow file does not exist")?; @@ -184,8 +217,8 @@ fn start_dataflow( serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; match result { ControlRequestReply::DataflowStarted { uuid } => { - println!("{uuid}"); - Ok(()) + eprintln!("{uuid}"); + Ok(uuid) } ControlRequestReply::Error(err) => bail!("{err}"), other => bail!("unexpected start dataflow reply: {other:?}"), diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 5b8d5350..d7945427 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -27,10 +27,7 @@ use std::{ }; use tcp_utils::tcp_receive; use tokio::sync::mpsc::UnboundedSender; -use tokio::{ - fs, - sync::{mpsc, oneshot}, -}; +use tokio::sync::{mpsc, oneshot}; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use uuid::Uuid; @@ -1062,10 +1059,3 @@ enum RunStatus { Continue, Exit, } - -pub async fn read_descriptor(file: &Path) -> eyre::Result { - let descriptor_file = fs::read(file).await.wrap_err("failed to open given file")?; - let descriptor: Descriptor = - serde_yaml::from_slice(&descriptor_file).context("failed to parse given descriptor")?; - Ok(descriptor) -} diff --git a/binaries/runtime/src/lib.rs b/binaries/runtime/src/lib.rs index 40e407da..58e13912 100644 --- a/binaries/runtime/src/lib.rs +++ b/binaries/runtime/src/lib.rs @@ -140,7 +140,12 @@ async fn run( let daemon_events = Box::pin(futures::stream::unfold(daemon_events, |mut stream| async { let event = stream.recv_async().await.map(|event| match event { dora_node_api::Event::Stop => Event::Stop, - dora_node_api::Event::Reload { operator_id } => Event::Reload { operator_id }, + dora_node_api::Event::Reload { + operator_id: Some(operator_id), + } => Event::Reload { operator_id }, + dora_node_api::Event::Reload { operator_id: None } => Event::Error( + "Dora runtime node received reload event without operator id".to_string(), + ), dora_node_api::Event::Input { id, metadata, data } => Event::Input { id, metadata,