Add basic log forwarding from daemon to CLI (tags/v0.3.5)
| @@ -1680,6 +1680,16 @@ version = "1.0.1" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" | |||
| [[package]] | |||
| name = "colored" | |||
| version = "2.1.0" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "cbf2150cce219b664a8a70df7a1f933836724b503f8a413af9365b4dcc4d90b8" | |||
| dependencies = [ | |||
| "lazy_static", | |||
| "windows-sys 0.48.0", | |||
| ] | |||
| [[package]] | |||
| name = "com" | |||
| version = "0.6.0" | |||
| @@ -2254,6 +2264,7 @@ version = "0.3.4" | |||
| dependencies = [ | |||
| "bat", | |||
| "clap 4.5.7", | |||
| "colored", | |||
| "communication-layer-request-reply", | |||
| "ctrlc", | |||
| "dora-coordinator", | |||
| @@ -2264,9 +2275,11 @@ dependencies = [ | |||
| "dora-runtime", | |||
| "dora-tracing", | |||
| "duration-str", | |||
| "env_logger 0.11.3", | |||
| "eyre", | |||
| "futures", | |||
| "inquire", | |||
| "log", | |||
| "notify 5.2.0", | |||
| "serde", | |||
| "serde_json", | |||
| @@ -2290,6 +2303,7 @@ dependencies = [ | |||
| "eyre", | |||
| "futures", | |||
| "futures-concurrency", | |||
| "log", | |||
| "names", | |||
| "serde_json", | |||
| "tokio", | |||
| @@ -2305,6 +2319,7 @@ dependencies = [ | |||
| "aligned-vec", | |||
| "dora-message", | |||
| "eyre", | |||
| "log", | |||
| "once_cell", | |||
| "schemars", | |||
| "serde", | |||
| @@ -2962,6 +2977,16 @@ dependencies = [ | |||
| "syn 2.0.68", | |||
| ] | |||
| [[package]] | |||
| name = "env_filter" | |||
| version = "0.1.0" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "a009aa4810eb158359dda09d0c87378e4bbb89b5a801f016885a4707ba24f7ea" | |||
| dependencies = [ | |||
| "log", | |||
| "regex", | |||
| ] | |||
| [[package]] | |||
| name = "env_logger" | |||
| version = "0.10.2" | |||
| @@ -2975,6 +3000,19 @@ dependencies = [ | |||
| "termcolor", | |||
| ] | |||
| [[package]] | |||
| name = "env_logger" | |||
| version = "0.11.3" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "38b35839ba51819680ba087cd351788c9a3c476841207e0b8cee0b04722343b9" | |||
| dependencies = [ | |||
| "anstream", | |||
| "anstyle", | |||
| "env_filter", | |||
| "humantime", | |||
| "log", | |||
| ] | |||
| [[package]] | |||
| name = "epaint" | |||
| version = "0.27.2" | |||
| @@ -4685,6 +4723,7 @@ version = "0.4.21" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" | |||
| dependencies = [ | |||
| "serde", | |||
| "value-bag", | |||
| ] | |||
| @@ -7029,7 +7068,7 @@ version = "0.15.1" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "35056426bb497a2bd30ec54f1aac35f32c6b0f2cd64fd833509ebb1e4be19f11" | |||
| dependencies = [ | |||
| "env_logger", | |||
| "env_logger 0.10.2", | |||
| "js-sys", | |||
| "log", | |||
| "log-once", | |||
| @@ -7963,7 +8002,7 @@ checksum = "6978682653fc699d484b92d0ee75e25b73fdf9b0e50d62b67707d6bcd7f4cc96" | |||
| dependencies = [ | |||
| "anyhow", | |||
| "document-features", | |||
| "env_logger", | |||
| "env_logger 0.10.2", | |||
| "itertools 0.12.1", | |||
| "log", | |||
| "puffin", | |||
| @@ -11079,7 +11118,7 @@ dependencies = [ | |||
| "async-std", | |||
| "async-trait", | |||
| "base64 0.13.1", | |||
| "env_logger", | |||
| "env_logger 0.10.2", | |||
| "event-listener 2.5.3", | |||
| "flume 0.10.14", | |||
| "form_urlencoded", | |||
| @@ -43,3 +43,6 @@ tokio-stream = { version = "0.1.8", features = ["io-util", "net"] } | |||
| futures = "0.3.21" | |||
| duration-str = "0.5" | |||
| tabwriter = "1.4.0" | |||
| log = { version = "0.4.21", features = ["serde"] } | |||
| colored = "2.1.0" | |||
| env_logger = "0.11.3" | |||
| @@ -1,12 +1,17 @@ | |||
| use communication_layer_request_reply::TcpRequestReplyConnection; | |||
| use colored::Colorize; | |||
| use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection}; | |||
| use dora_core::{ | |||
| coordinator_messages::LogMessage, | |||
| descriptor::{resolve_path, CoreNodeKind, Descriptor}, | |||
| topics::{ControlRequest, ControlRequestReply}, | |||
| }; | |||
| use eyre::Context; | |||
| use notify::event::ModifyKind; | |||
| use notify::{Config, Event as NotifyEvent, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; | |||
| use std::collections::HashMap; | |||
| use std::{ | |||
| collections::HashMap, | |||
| net::{SocketAddr, TcpStream}, | |||
| }; | |||
| use std::{path::PathBuf, sync::mpsc, time::Duration}; | |||
| use tracing::{error, info}; | |||
| use uuid::Uuid; | |||
| @@ -19,6 +24,8 @@ pub fn attach_dataflow( | |||
| dataflow_id: Uuid, | |||
| session: &mut TcpRequestReplyConnection, | |||
| hot_reload: bool, | |||
| coordinator_socket: SocketAddr, | |||
| log_level: log::LevelFilter, | |||
| ) -> Result<(), eyre::ErrReport> { | |||
| let (tx, rx) = mpsc::sync_channel(2); | |||
| @@ -71,11 +78,11 @@ pub fn attach_dataflow( | |||
| for path in paths { | |||
| if let Some((dataflow_id, node_id, operator_id)) = node_path_lookup.get(&path) { | |||
| watcher_tx | |||
| .send(ControlRequest::Reload { | |||
| .send(AttachEvent::Control(ControlRequest::Reload { | |||
| dataflow_id: *dataflow_id, | |||
| node_id: node_id.clone(), | |||
| operator_id: operator_id.clone(), | |||
| }) | |||
| })) | |||
| .context("Could not send reload request to the cli loop") | |||
| .unwrap(); | |||
| } | |||
| @@ -98,17 +105,17 @@ pub fn attach_dataflow( | |||
| }; | |||
| // Setup Ctrlc Watcher to stop dataflow after ctrlc | |||
| let ctrlc_tx = tx; | |||
| let ctrlc_tx = tx.clone(); | |||
| let mut ctrlc_sent = false; | |||
| ctrlc::set_handler(move || { | |||
| if ctrlc_sent { | |||
| std::process::abort(); | |||
| } else { | |||
| if ctrlc_tx | |||
| .send(ControlRequest::Stop { | |||
| .send(AttachEvent::Control(ControlRequest::Stop { | |||
| dataflow_uuid: dataflow_id, | |||
| grace_duration: None, | |||
| }) | |||
| })) | |||
| .is_err() | |||
| { | |||
| // bail!("failed to report ctrl-c event to dora-daemon"); | |||
| @@ -118,12 +125,69 @@ pub fn attach_dataflow( | |||
| }) | |||
| .wrap_err("failed to set ctrl-c handler")?; | |||
| // subscribe to log messages | |||
| let mut log_session = TcpConnection { | |||
| stream: TcpStream::connect(coordinator_socket) | |||
| .wrap_err("failed to connect to dora coordinator")?, | |||
| }; | |||
| log_session | |||
| .send( | |||
| &serde_json::to_vec(&ControlRequest::LogSubscribe { | |||
| dataflow_id, | |||
| level: log_level, | |||
| }) | |||
| .wrap_err("failed to serialize message")?, | |||
| ) | |||
| .wrap_err("failed to send log subscribe request to coordinator")?; | |||
| std::thread::spawn(move || { | |||
| while let Ok(raw) = log_session.receive() { | |||
| let parsed: eyre::Result<LogMessage> = | |||
| serde_json::from_slice(&raw).context("failed to parse log message"); | |||
| if tx.send(AttachEvent::Log(parsed)).is_err() { | |||
| break; | |||
| } | |||
| } | |||
| }); | |||
| loop { | |||
| let control_request = match rx.recv_timeout(Duration::from_secs(1)) { | |||
| Err(_err) => ControlRequest::Check { | |||
| dataflow_uuid: dataflow_id, | |||
| }, | |||
| Ok(reload_event) => reload_event, | |||
| Ok(AttachEvent::Control(control_request)) => control_request, | |||
| Ok(AttachEvent::Log(Ok(log_message))) => { | |||
| let LogMessage { | |||
| dataflow_id: _, | |||
| node_id, | |||
| level, | |||
| target, | |||
| module_path: _, | |||
| file: _, | |||
| line: _, | |||
| message, | |||
| } = log_message; | |||
| let level = match level { | |||
| log::Level::Error => "ERROR".red(), | |||
| log::Level::Warn => "WARN ".yellow(), | |||
| log::Level::Info => "INFO ".green(), | |||
| other => format!("{other:5}").normal(), | |||
| }; | |||
| let node = match node_id { | |||
| Some(node_id) => format!(" {node_id}").bold(), | |||
| None => "".normal(), | |||
| }; | |||
| let target = match target { | |||
| Some(target) => format!(" {target}").dimmed(), | |||
| None => "".normal(), | |||
| }; | |||
| println!("{level}{node}{target}: {message}"); | |||
| continue; | |||
| } | |||
| Ok(AttachEvent::Log(Err(err))) => { | |||
| tracing::warn!("failed to parse log message: {:#?}", err); | |||
| continue; | |||
| } | |||
| }; | |||
| let reply_raw = session | |||
| @@ -144,3 +208,8 @@ pub fn attach_dataflow( | |||
| }; | |||
| } | |||
| } | |||
| enum AttachEvent { | |||
| Control(ControlRequest), | |||
| Log(eyre::Result<LogMessage>), | |||
| } | |||
| @@ -1,5 +1,6 @@ | |||
| use attach::attach_dataflow; | |||
| use clap::Parser; | |||
| use colored::Colorize; | |||
| use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection}; | |||
| use dora_coordinator::Event; | |||
| use dora_core::{ | |||
| @@ -249,6 +250,7 @@ enum Lang { | |||
| fn main() { | |||
| if let Err(err) = run() { | |||
| eprintln!("\n\n{}", "[ERROR]".bold().red()); | |||
| eprintln!("{err:#}"); | |||
| std::process::exit(1); | |||
| } | |||
| @@ -283,6 +285,12 @@ fn run() -> eyre::Result<()> { | |||
| } | |||
| }; | |||
| let log_level = env_logger::Builder::new() | |||
| .filter_level(log::LevelFilter::Info) | |||
| .parse_default_env() | |||
| .build() | |||
| .filter(); | |||
| match args.command { | |||
| Command::Check { | |||
| dataflow, | |||
| @@ -367,7 +375,8 @@ fn run() -> eyre::Result<()> { | |||
| .wrap_err("Could not validate yaml")?; | |||
| } | |||
| let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) | |||
| let coordinator_socket = (coordinator_addr, coordinator_port).into(); | |||
| let mut session = connect_to_coordinator(coordinator_socket) | |||
| .wrap_err("failed to connect to dora coordinator")?; | |||
| let dataflow_id = start_dataflow( | |||
| dataflow_descriptor.clone(), | |||
| @@ -393,6 +402,8 @@ fn run() -> eyre::Result<()> { | |||
| dataflow_id, | |||
| &mut *session, | |||
| hot_reload, | |||
| coordinator_socket, | |||
| log_level, | |||
| )? | |||
| } | |||
| } | |||
| @@ -25,3 +25,4 @@ futures-concurrency = "7.1.0" | |||
| serde_json = "1.0.86" | |||
| names = "0.14.0" | |||
| ctrlc = "3.2.5" | |||
| log = { version = "0.4.21", features = ["serde"] } | |||
| @@ -17,6 +17,7 @@ use tokio::{ | |||
| task::JoinHandle, | |||
| }; | |||
| use tokio_stream::wrappers::ReceiverStream; | |||
| use uuid::Uuid; | |||
| pub(crate) async fn control_events( | |||
| control_listen_addr: SocketAddr, | |||
| @@ -99,14 +100,27 @@ async fn handle_requests( | |||
| }, | |||
| }; | |||
| let result = | |||
| match serde_json::from_slice(&raw).wrap_err("failed to deserialize incoming message") { | |||
| Ok(request) => handle_request(request, &tx).await, | |||
| Err(err) => Err(err), | |||
| }; | |||
| let request = | |||
| serde_json::from_slice(&raw).wrap_err("failed to deserialize incoming message"); | |||
| if let Ok(ControlRequest::LogSubscribe { dataflow_id, level }) = request { | |||
| let _ = tx | |||
| .send(ControlEvent::LogSubscribe { | |||
| dataflow_id, | |||
| level, | |||
| connection, | |||
| }) | |||
| .await; | |||
| break; | |||
| } | |||
| let result = match request { | |||
| Ok(request) => handle_request(request, &tx).await, | |||
| Err(err) => Err(err), | |||
| }; | |||
| let reply = result.unwrap_or_else(|err| ControlRequestReply::Error(format!("{err}"))); | |||
| let serialized = | |||
| let serialized: Vec<u8> = | |||
| match serde_json::to_vec(&reply).wrap_err("failed to serialize ControlRequestReply") { | |||
| Ok(s) => s, | |||
| Err(err) => { | |||
| @@ -160,6 +174,11 @@ pub enum ControlEvent { | |||
| request: ControlRequest, | |||
| reply_sender: oneshot::Sender<eyre::Result<ControlRequestReply>>, | |||
| }, | |||
| LogSubscribe { | |||
| dataflow_id: Uuid, | |||
| level: log::LevelFilter, | |||
| connection: TcpStream, | |||
| }, | |||
| Error(eyre::Report), | |||
| } | |||
| @@ -5,7 +5,7 @@ use crate::{ | |||
| pub use control::ControlEvent; | |||
| use dora_core::{ | |||
| config::{NodeId, OperatorId}, | |||
| coordinator_messages::RegisterResult, | |||
| coordinator_messages::{LogMessage, RegisterResult}, | |||
| daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped}, | |||
| descriptor::{Descriptor, ResolvedNode}, | |||
| message::uhlc::{self, HLC}, | |||
| @@ -17,6 +17,7 @@ use dora_core::{ | |||
| use eyre::{bail, eyre, ContextCompat, WrapErr}; | |||
| use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; | |||
| use futures_concurrency::stream::Merge; | |||
| use log_subscriber::LogSubscriber; | |||
| use run::SpawnedDataflow; | |||
| use std::{ | |||
| collections::{BTreeMap, BTreeSet, HashMap}, | |||
| @@ -31,6 +32,7 @@ use uuid::Uuid; | |||
| mod control; | |||
| mod listener; | |||
| mod log_subscriber; | |||
| mod run; | |||
| mod tcp_utils; | |||
| @@ -505,9 +507,25 @@ async fn start_inner( | |||
| )); | |||
| let _ = reply_sender.send(reply); | |||
| } | |||
| ControlRequest::LogSubscribe { .. } => { | |||
| let _ = reply_sender.send(Err(eyre::eyre!( | |||
| "LogSubscribe request should be handled separately" | |||
| ))); | |||
| } | |||
| } | |||
| } | |||
| ControlEvent::Error(err) => tracing::error!("{err:?}"), | |||
| ControlEvent::LogSubscribe { | |||
| dataflow_id, | |||
| level, | |||
| connection, | |||
| } => { | |||
| if let Some(dataflow) = running_dataflows.get_mut(&dataflow_id) { | |||
| dataflow | |||
| .log_subscribers | |||
| .push(LogSubscriber::new(level, connection)); | |||
| } | |||
| } | |||
| }, | |||
| Event::DaemonHeartbeatInterval => { | |||
| let mut disconnected = BTreeSet::new(); | |||
| @@ -560,6 +578,21 @@ async fn start_inner( | |||
| connection.last_heartbeat = Instant::now(); | |||
| } | |||
| } | |||
| Event::Log(message) => { | |||
| if let Some(dataflow) = running_dataflows.get_mut(&message.dataflow_id) { | |||
| for subscriber in &mut dataflow.log_subscribers { | |||
| let send_result = tokio::time::timeout( | |||
| Duration::from_millis(100), | |||
| subscriber.send_message(&message), | |||
| ); | |||
| if send_result.await.is_err() { | |||
| subscriber.close(); | |||
| } | |||
| } | |||
| dataflow.log_subscribers.retain(|s| !s.is_closed()); | |||
| } | |||
| } | |||
| } | |||
| } | |||
| @@ -568,6 +601,7 @@ async fn start_inner( | |||
| Ok(()) | |||
| } | |||
| #[allow(clippy::too_many_arguments)] | |||
| async fn stop_dataflow_by_uuid( | |||
| running_dataflows: &mut HashMap<Uuid, RunningDataflow>, | |||
| dataflow_results: &HashMap<Uuid, BTreeMap<String, DataflowDaemonResult>>, | |||
| @@ -686,6 +720,8 @@ struct RunningDataflow { | |||
| nodes: Vec<ResolvedNode>, | |||
| reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>, | |||
| log_subscribers: Vec<LogSubscriber>, | |||
| } | |||
| struct ArchivedDataflow { | |||
| @@ -885,6 +921,7 @@ async fn start_dataflow( | |||
| machines, | |||
| nodes, | |||
| reply_senders: Vec::new(), | |||
| log_subscribers: Vec::new(), | |||
| }) | |||
| } | |||
| @@ -931,6 +968,7 @@ pub enum Event { | |||
| Daemon(DaemonEvent), | |||
| DaemonHeartbeatInterval, | |||
| CtrlC, | |||
| Log(LogMessage), | |||
| } | |||
| impl Event { | |||
| @@ -97,6 +97,12 @@ pub async fn handle_connection( | |||
| break; | |||
| } | |||
| } | |||
| coordinator_messages::DaemonEvent::Log(message) => { | |||
| let event = Event::Log(message); | |||
| if events_tx.send(event).await.is_err() { | |||
| break; | |||
| } | |||
| } | |||
| }, | |||
| }; | |||
| } | |||
| @@ -0,0 +1,38 @@ | |||
| use dora_core::coordinator_messages::LogMessage; | |||
| use eyre::{Context, ContextCompat}; | |||
| use crate::tcp_utils::tcp_send; | |||
| pub struct LogSubscriber { | |||
| pub level: log::LevelFilter, | |||
| connection: Option<tokio::net::TcpStream>, | |||
| } | |||
| impl LogSubscriber { | |||
| pub fn new(level: log::LevelFilter, connection: tokio::net::TcpStream) -> Self { | |||
| Self { | |||
| level, | |||
| connection: Some(connection), | |||
| } | |||
| } | |||
| pub async fn send_message(&mut self, message: &LogMessage) -> eyre::Result<()> { | |||
| if message.level > self.level { | |||
| return Ok(()); | |||
| } | |||
| let message = serde_json::to_vec(&message)?; | |||
| let connection = self.connection.as_mut().context("connection is closed")?; | |||
| tcp_send(connection, &message) | |||
| .await | |||
| .context("failed to send message")?; | |||
| Ok(()) | |||
| } | |||
| pub fn is_closed(&self) -> bool { | |||
| self.connection.is_none() | |||
| } | |||
| pub fn close(&mut self) { | |||
| self.connection = None; | |||
| } | |||
| } | |||
| @@ -2,7 +2,7 @@ use aligned_vec::{AVec, ConstAlign}; | |||
| use coordinator::CoordinatorEvent; | |||
| use crossbeam::queue::ArrayQueue; | |||
| use dora_core::config::{Input, OperatorId}; | |||
| use dora_core::coordinator_messages::CoordinatorRequest; | |||
| use dora_core::coordinator_messages::{CoordinatorRequest, Level, LogMessage}; | |||
| use dora_core::daemon_messages::{ | |||
| DataMessage, DynamicNodeEvent, InterDaemonEvent, NodeConfig, Timestamped, | |||
| }; | |||
| @@ -87,6 +87,8 @@ pub struct Daemon { | |||
| clock: Arc<uhlc::HLC>, | |||
| } | |||
| type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>; | |||
| impl Daemon { | |||
| pub async fn run( | |||
| coordinator_addr: SocketAddr, | |||
| @@ -227,7 +229,7 @@ impl Daemon { | |||
| machine_id: String, | |||
| exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>, | |||
| clock: Arc<HLC>, | |||
| ) -> eyre::Result<BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>> { | |||
| ) -> eyre::Result<DaemonRunResult> { | |||
| let coordinator_connection = match coordinator_addr { | |||
| Some(addr) => { | |||
| let stream = TcpStream::connect(addr) | |||
| @@ -272,7 +274,7 @@ impl Daemon { | |||
| async fn run_inner( | |||
| mut self, | |||
| incoming_events: impl Stream<Item = Timestamped<Event>> + Unpin, | |||
| ) -> eyre::Result<BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>> { | |||
| ) -> eyre::Result<DaemonRunResult> { | |||
| let mut events = incoming_events; | |||
| while let Some(event) = events.next().await { | |||
| @@ -332,6 +334,26 @@ impl Daemon { | |||
| Ok(self.dataflow_node_results) | |||
| } | |||
| async fn send_log_message(&mut self, message: LogMessage) -> eyre::Result<()> { | |||
| if let Some(connection) = &mut self.coordinator_connection { | |||
| let msg = serde_json::to_vec(&Timestamped { | |||
| inner: CoordinatorRequest::Event { | |||
| machine_id: self.machine_id.clone(), | |||
| event: DaemonEvent::Log(message), | |||
| }, | |||
| timestamp: self.clock.new_timestamp(), | |||
| })?; | |||
| tcp_send(connection, &msg) | |||
| .await | |||
| .wrap_err("failed to send watchdog message to dora-coordinator")?; | |||
| if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) { | |||
| bail!("lost connection to coordinator") | |||
| } | |||
| } | |||
| Ok(()) | |||
| } | |||
| async fn handle_coordinator_event( | |||
| &mut self, | |||
| event: DaemonCoordinatorEvent, | |||
| @@ -577,6 +599,7 @@ impl Daemon { | |||
| } | |||
| }; | |||
| let mut log_messages = Vec::new(); | |||
| for node in nodes { | |||
| let local = node.deploy.machine == self.machine_id; | |||
| @@ -639,8 +662,17 @@ impl Daemon { | |||
| dataflow.running_nodes.insert(node_id, running_node); | |||
| } | |||
| Err(err) => { | |||
| tracing::error!("{err:?}"); | |||
| dataflow | |||
| log_messages.push(LogMessage { | |||
| dataflow_id, | |||
| node_id: Some(node_id.clone()), | |||
| level: Level::Error, | |||
| target: None, | |||
| module_path: None, | |||
| file: None, | |||
| line: None, | |||
| message: format!("{err:?}"), | |||
| }); | |||
| let messages = dataflow | |||
| .pending_nodes | |||
| .handle_node_stop( | |||
| &node_id, | |||
| @@ -649,6 +681,7 @@ impl Daemon { | |||
| &mut dataflow.cascading_error_causes, | |||
| ) | |||
| .await?; | |||
| log_messages.extend(messages); | |||
| } | |||
| } | |||
| } else { | |||
| @@ -656,6 +689,10 @@ impl Daemon { | |||
| } | |||
| } | |||
| for log_message in log_messages { | |||
| self.send_log_message(log_message).await?; | |||
| } | |||
| Ok(()) | |||
| } | |||
| @@ -1006,7 +1043,7 @@ impl Daemon { | |||
| format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`") | |||
| })?; | |||
| dataflow | |||
| let log_messages = dataflow | |||
| .pending_nodes | |||
| .handle_node_stop( | |||
| node_id, | |||
| @@ -1060,6 +1097,11 @@ impl Daemon { | |||
| } | |||
| self.running.remove(&dataflow_id); | |||
| } | |||
| for log_message in log_messages { | |||
| self.send_log_message(log_message).await?; | |||
| } | |||
| Ok(()) | |||
| } | |||
| @@ -1204,6 +1246,25 @@ impl Daemon { | |||
| } | |||
| }; | |||
| self.send_log_message(LogMessage { | |||
| dataflow_id, | |||
| node_id: Some(node_id.clone()), | |||
| level: if node_result.is_ok() { | |||
| Level::Info | |||
| } else { | |||
| Level::Error | |||
| }, | |||
| target: None, | |||
| module_path: None, | |||
| file: None, | |||
| line: None, | |||
| message: match &node_result { | |||
| Ok(()) => "node finished successfully".to_string(), | |||
| Err(err) => format!("{err}"), | |||
| }, | |||
| }) | |||
| .await?; | |||
| self.dataflow_node_results | |||
| .entry(dataflow_id) | |||
| .or_default() | |||
| @@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet}; | |||
| use dora_core::{ | |||
| config::NodeId, | |||
| coordinator_messages::{CoordinatorRequest, DaemonEvent}, | |||
| coordinator_messages::{CoordinatorRequest, DaemonEvent, Level, LogMessage}, | |||
| daemon_messages::{DaemonReply, DataflowId, Timestamped}, | |||
| message::uhlc::{Timestamp, HLC}, | |||
| }; | |||
| @@ -77,14 +77,24 @@ impl PendingNodes { | |||
| coordinator_connection: &mut Option<TcpStream>, | |||
| clock: &HLC, | |||
| cascading_errors: &mut CascadingErrorCauses, | |||
| ) -> eyre::Result<()> { | |||
| ) -> eyre::Result<Vec<LogMessage>> { | |||
| let mut log = Vec::new(); | |||
| if self.local_nodes.remove(node_id) { | |||
| tracing::warn!("node `{node_id}` exited before initializing dora connection"); | |||
| log.push(LogMessage { | |||
| dataflow_id: self.dataflow_id, | |||
| node_id: Some(node_id.clone()), | |||
| level: Level::Warn, | |||
| target: None, | |||
| module_path: None, | |||
| file: None, | |||
| line: None, | |||
| message: "node exited before initializing dora connection".into(), | |||
| }); | |||
| self.exited_before_subscribe.push(node_id.clone()); | |||
| self.update_dataflow_status(coordinator_connection, clock, cascading_errors) | |||
| .await?; | |||
| } | |||
| Ok(()) | |||
| Ok(log) | |||
| } | |||
| pub async fn handle_external_all_nodes_ready( | |||
| @@ -376,14 +376,6 @@ pub async fn spawn_node( | |||
| node_stderr_most_recent.force_push(new); | |||
| if buffer.starts_with("Traceback (most recent call last):") { | |||
| if !finished { | |||
| continue; | |||
| } else { | |||
| tracing::error!("{dataflow_id}/{}: \n{buffer}", node_id); | |||
| } | |||
| } | |||
| // send the buffered lines | |||
| let lines = std::mem::take(&mut buffer); | |||
| let sent = stderr_tx.send(lines.clone()).await; | |||
| @@ -94,8 +94,8 @@ impl RequestReplyLayer for TcpLayer { | |||
| } | |||
| } | |||
| struct TcpConnection { | |||
| stream: TcpStream, | |||
| pub struct TcpConnection { | |||
| pub stream: TcpStream, | |||
| } | |||
| impl ListenConnection for TcpConnection { | |||
| @@ -128,14 +128,14 @@ impl RequestReplyConnection for TcpConnection { | |||
| } | |||
| impl TcpConnection { | |||
| fn send(&mut self, request: &[u8]) -> std::io::Result<()> { | |||
| pub fn send(&mut self, request: &[u8]) -> std::io::Result<()> { | |||
| let len_raw = (request.len() as u64).to_le_bytes(); | |||
| self.stream.write_all(&len_raw)?; | |||
| self.stream.write_all(request)?; | |||
| Ok(()) | |||
| } | |||
| fn receive(&mut self) -> std::io::Result<Vec<u8>> { | |||
| pub fn receive(&mut self) -> std::io::Result<Vec<u8>> { | |||
| let reply_len = { | |||
| let mut raw = [0; 8]; | |||
| self.stream.read_exact(&mut raw)?; | |||
| @@ -22,3 +22,4 @@ tokio = { version = "1.24.1", features = ["fs", "process", "sync"] } | |||
| aligned-vec = { version = "0.5.0", features = ["serde"] } | |||
| schemars = "0.8.19" | |||
| serde_json = "1.0.117" | |||
| log = { version = "0.4.21", features = ["serde"] } | |||
| @@ -1,5 +1,6 @@ | |||
| use crate::{config::NodeId, daemon_messages::DataflowId, topics::DataflowDaemonResult}; | |||
| use eyre::eyre; | |||
| pub use log::Level; | |||
| #[derive(Debug, serde::Serialize, serde::Deserialize)] | |||
| pub enum CoordinatorRequest { | |||
| @@ -14,6 +15,19 @@ pub enum CoordinatorRequest { | |||
| }, | |||
| } | |||
| #[derive(Debug, serde::Serialize, serde::Deserialize)] | |||
| #[must_use] | |||
| pub struct LogMessage { | |||
| pub dataflow_id: DataflowId, | |||
| pub node_id: Option<NodeId>, | |||
| pub level: log::Level, | |||
| pub target: Option<String>, | |||
| pub module_path: Option<String>, | |||
| pub file: Option<String>, | |||
| pub line: Option<u32>, | |||
| pub message: String, | |||
| } | |||
| #[derive(Debug, serde::Serialize, serde::Deserialize)] | |||
| pub enum DaemonEvent { | |||
| AllNodesReady { | |||
| @@ -25,6 +39,7 @@ pub enum DaemonEvent { | |||
| result: DataflowDaemonResult, | |||
| }, | |||
| Heartbeat, | |||
| Log(LogMessage), | |||
| } | |||
| #[derive(Debug, serde::Serialize, serde::Deserialize)] | |||
| @@ -55,6 +55,10 @@ pub enum ControlRequest { | |||
| List, | |||
| DaemonConnected, | |||
| ConnectedMachines, | |||
| LogSubscribe { | |||
| dataflow_id: Uuid, | |||
| level: log::LevelFilter, | |||
| }, | |||
| } | |||
| #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] | |||
| @@ -189,7 +193,7 @@ impl std::fmt::Display for NodeError { | |||
| NodeErrorCause::GraceDuration => {}, // handled above | |||
| NodeErrorCause::Cascading { caused_by_node } => write!( | |||
| f, | |||
| "\n\nThis error occurred because node `{caused_by_node}` exited before connecting to dora." | |||
| ". This error occurred because node `{caused_by_node}` exited before connecting to dora." | |||
| )?, | |||
| NodeErrorCause::Other { stderr } if stderr.is_empty() => {} | |||
| NodeErrorCause::Other { stderr } => { | |||