From 2e0419b100117059e69e4de9d7ad8dcff72341c6 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 21 Jun 2024 17:40:17 +0200 Subject: [PATCH] Add basic log forwarding from daemon to CLI Forwarded by coordinator --- Cargo.lock | 17 +++- binaries/cli/Cargo.toml | 2 + binaries/cli/src/attach.rs | 78 +++++++++++++++++-- binaries/cli/src/main.rs | 4 +- binaries/coordinator/Cargo.toml | 1 + binaries/coordinator/src/control.rs | 31 ++++++-- binaries/coordinator/src/lib.rs | 31 +++++++- binaries/coordinator/src/listener.rs | 6 ++ binaries/coordinator/src/log_subscriber.rs | 23 ++++++ binaries/daemon/src/lib.rs | 37 ++++++++- binaries/daemon/src/pending.rs | 18 ++++- binaries/daemon/src/spawn.rs | 8 -- .../request-reply/src/tcp.rs | 8 +- libraries/core/Cargo.toml | 1 + libraries/core/src/coordinator_messages.rs | 15 ++++ libraries/core/src/topics.rs | 4 + 16 files changed, 248 insertions(+), 36 deletions(-) create mode 100644 binaries/coordinator/src/log_subscriber.rs diff --git a/Cargo.lock b/Cargo.lock index ad71dee9..118ff83c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1687,6 +1687,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" +[[package]] +name = "colored" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbf2150cce219b664a8a70df7a1f933836724b503f8a413af9365b4dcc4d90b8" +dependencies = [ + "lazy_static", + "windows-sys 0.48.0", +] + [[package]] name = "com" version = "0.6.0" @@ -2251,6 +2261,7 @@ version = "0.3.4" dependencies = [ "bat", "clap 4.5.7", + "colored", "communication-layer-request-reply", "ctrlc", "dora-coordinator", @@ -2264,6 +2275,7 @@ dependencies = [ "eyre", "futures", "inquire", + "log", "notify 5.2.0", "serde", "serde_json", @@ -2286,6 +2298,7 @@ dependencies = [ "eyre", "futures", "futures-concurrency", + "log", "names", "serde_json", "tokio", @@ -2301,6 +2314,7 @@ dependencies = [ "aligned-vec", "dora-message", "eyre", + "log", "once_cell", "schemars", "serde", @@ -4690,6 +4704,7 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" dependencies = [ + "serde", "value-bag", ] @@ -9605,7 +9620,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "static_assertions", ] diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml index 15ee0424..d0155f25 100644 --- a/binaries/cli/Cargo.toml +++ b/binaries/cli/Cargo.toml @@ -42,3 +42,5 @@ tokio = { version = "1.20.1", features = ["full"] } tokio-stream = { version = "0.1.8", features = ["io-util", "net"] } futures = "0.3.21" duration-str = "0.5" +log = { version = "0.4.21", features = ["serde"] } +colored = "2.1.0" diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/attach.rs index 1d5f8275..3ab82055 100644 --- a/binaries/cli/src/attach.rs +++ b/binaries/cli/src/attach.rs @@ -1,12 +1,17 @@ -use communication_layer_request_reply::TcpRequestReplyConnection; +use colored::Colorize; +use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection}; use dora_core::{ + coordinator_messages::LogMessage, descriptor::{resolve_path, CoreNodeKind, Descriptor}, topics::{ControlRequest, ControlRequestReply}, }; use eyre::Context; use notify::event::ModifyKind; use 
notify::{Config, Event as NotifyEvent, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; -use std::collections::HashMap; +use std::{ + collections::HashMap, + net::{SocketAddr, TcpStream}, +}; use std::{path::PathBuf, sync::mpsc, time::Duration}; use tracing::{error, info}; use uuid::Uuid; @@ -19,6 +24,7 @@ pub fn attach_dataflow( dataflow_id: Uuid, session: &mut TcpRequestReplyConnection, hot_reload: bool, + coordinator_socket: SocketAddr, ) -> Result<(), eyre::ErrReport> { let (tx, rx) = mpsc::sync_channel(2); @@ -71,11 +77,11 @@ pub fn attach_dataflow( for path in paths { if let Some((dataflow_id, node_id, operator_id)) = node_path_lookup.get(&path) { watcher_tx - .send(ControlRequest::Reload { + .send(AttachEvent::Control(ControlRequest::Reload { dataflow_id: *dataflow_id, node_id: node_id.clone(), operator_id: operator_id.clone(), - }) + })) .context("Could not send reload request to the cli loop") .unwrap(); } @@ -98,17 +104,17 @@ pub fn attach_dataflow( }; // Setup Ctrlc Watcher to stop dataflow after ctrlc - let ctrlc_tx = tx; + let ctrlc_tx = tx.clone(); let mut ctrlc_sent = false; ctrlc::set_handler(move || { if ctrlc_sent { std::process::abort(); } else { if ctrlc_tx - .send(ControlRequest::Stop { + .send(AttachEvent::Control(ControlRequest::Stop { dataflow_uuid: dataflow_id, grace_duration: None, - }) + })) .is_err() { // bail!("failed to report ctrl-c event to dora-daemon"); @@ -118,12 +124,63 @@ pub fn attach_dataflow( }) .wrap_err("failed to set ctrl-c handler")?; + // subscribe to log messages + let mut log_session = TcpConnection { + stream: TcpStream::connect(coordinator_socket) + .wrap_err("failed to connect to dora coordinator")?, + }; + let level = log::Level::Warn; + log_session + .send( + &serde_json::to_vec(&ControlRequest::LogSubscribe { dataflow_id, level }) + .wrap_err("failed to serialize message")?, + ) + .wrap_err("failed to send log subscribe request to coordinator")?; + std::thread::spawn(move || { + while let Ok(raw) = log_session.receive() { + let parsed: eyre::Result = + serde_json::from_slice(&raw).context("failed to parse log message"); + if tx.send(AttachEvent::Log(parsed)).is_err() { + break; + } + } + }); + loop { let control_request = match rx.recv_timeout(Duration::from_secs(1)) { Err(_err) => ControlRequest::Check { dataflow_uuid: dataflow_id, }, - Ok(reload_event) => reload_event, + Ok(AttachEvent::Control(control_request)) => control_request, + Ok(AttachEvent::Log(Ok(log_message))) => { + let LogMessage { + dataflow_id, + node_id, + level, + target, + module_path, + file, + line, + message, + } = log_message; + let level = match level { + log::Level::Error => "ERROR".red(), + log::Level::Warn => "WARN ".yellow(), + other => format!("{other:5}").normal(), + }; + let target = target.dimmed(); + let node = match node_id { + Some(node_id) => format!("{node_id} ").normal(), + None => "".normal(), + }; + + println!("{level} {node}{target}: {message}"); + continue; + } + Ok(AttachEvent::Log(Err(err))) => { + tracing::warn!("failed to parse log message: {:#?}", err); + continue; + } }; let reply_raw = session @@ -144,3 +201,8 @@ pub fn attach_dataflow( }; } } + +enum AttachEvent { + Control(ControlRequest), + Log(eyre::Result), +} diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 070a3640..a1c2ba8e 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -361,7 +361,8 @@ fn run() -> eyre::Result<()> { .wrap_err("Could not validate yaml")?; } - let mut session = connect_to_coordinator((coordinator_addr, 
coordinator_port).into()) + let coordinator_socket = (coordinator_addr, coordinator_port).into(); + let mut session = connect_to_coordinator(coordinator_socket) .wrap_err("failed to connect to dora coordinator")?; let dataflow_id = start_dataflow( dataflow_descriptor.clone(), @@ -377,6 +378,7 @@ fn run() -> eyre::Result<()> { dataflow_id, &mut *session, hot_reload, + coordinator_socket, )? } } diff --git a/binaries/coordinator/Cargo.toml b/binaries/coordinator/Cargo.toml index 4c6b6887..4132fd66 100644 --- a/binaries/coordinator/Cargo.toml +++ b/binaries/coordinator/Cargo.toml @@ -25,3 +25,4 @@ futures-concurrency = "7.1.0" serde_json = "1.0.86" names = "0.14.0" ctrlc = "3.2.5" +log = { version = "0.4.21", features = ["serde"] } diff --git a/binaries/coordinator/src/control.rs b/binaries/coordinator/src/control.rs index c8987a9d..ed24a930 100644 --- a/binaries/coordinator/src/control.rs +++ b/binaries/coordinator/src/control.rs @@ -17,6 +17,7 @@ use tokio::{ task::JoinHandle, }; use tokio_stream::wrappers::ReceiverStream; +use uuid::Uuid; pub(crate) async fn control_events( control_listen_addr: SocketAddr, @@ -99,14 +100,27 @@ async fn handle_requests( }, }; - let result = - match serde_json::from_slice(&raw).wrap_err("failed to deserialize incoming message") { - Ok(request) => handle_request(request, &tx).await, - Err(err) => Err(err), - }; + let request = + serde_json::from_slice(&raw).wrap_err("failed to deserialize incoming message"); + + if let Ok(ControlRequest::LogSubscribe { dataflow_id, level }) = request { + let _ = tx + .send(ControlEvent::LogSubscribe { + dataflow_id, + level, + connection, + }) + .await; + break; + } + + let result = match request { + Ok(request) => handle_request(request, &tx).await, + Err(err) => Err(err), + }; let reply = result.unwrap_or_else(|err| ControlRequestReply::Error(format!("{err}"))); - let serialized = + let serialized: Vec = match serde_json::to_vec(&reply).wrap_err("failed to serialize ControlRequestReply") { Ok(s) => s, Err(err) => { @@ -160,6 +174,11 @@ pub enum ControlEvent { request: ControlRequest, reply_sender: oneshot::Sender>, }, + LogSubscribe { + dataflow_id: Uuid, + level: log::Level, + connection: TcpStream, + }, Error(eyre::Report), } diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 1ca1985a..504c6136 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -5,7 +5,7 @@ use crate::{ pub use control::ControlEvent; use dora_core::{ config::{NodeId, OperatorId}, - coordinator_messages::RegisterResult, + coordinator_messages::{LogMessage, RegisterResult}, daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped}, descriptor::{Descriptor, ResolvedNode}, message::uhlc::{self, HLC}, @@ -16,6 +16,7 @@ use dora_core::{ use eyre::{bail, eyre, ContextCompat, WrapErr}; use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; use futures_concurrency::stream::Merge; +use log_subscriber::LogSubscriber; use run::SpawnedDataflow; use std::{ collections::{BTreeMap, BTreeSet, HashMap}, @@ -30,6 +31,7 @@ use uuid::Uuid; mod control; mod listener; +mod log_subscriber; mod run; mod tcp_utils; @@ -488,9 +490,25 @@ async fn start_inner( )); let _ = reply_sender.send(reply); } + ControlRequest::LogSubscribe { .. 
} => { + let _ = reply_sender.send(Err(eyre::eyre!( + "LogSubscribe request should be handled separately" + ))); + } } } ControlEvent::Error(err) => tracing::error!("{err:?}"), + ControlEvent::LogSubscribe { + dataflow_id, + level, + connection, + } => { + if let Some(dataflow) = running_dataflows.get_mut(&dataflow_id) { + dataflow + .log_subscribers + .push(LogSubscriber::new(level, connection)); + } + } }, Event::DaemonHeartbeatInterval => { let mut disconnected = BTreeSet::new(); @@ -543,6 +561,13 @@ async fn start_inner( connection.last_heartbeat = Instant::now(); } } + Event::Log(message) => { + if let Some(dataflow) = running_dataflows.get_mut(&message.dataflow_id) { + for subscriber in &mut dataflow.log_subscribers { + subscriber.send_message(&message).await?; + } + } + } } } @@ -669,6 +694,8 @@ struct RunningDataflow { nodes: Vec, reply_senders: Vec>>, + + log_subscribers: Vec, } struct ArchivedDataflow { @@ -868,6 +895,7 @@ async fn start_dataflow( machines, nodes, reply_senders: Vec::new(), + log_subscribers: Vec::new(), }) } @@ -914,6 +942,7 @@ pub enum Event { Daemon(DaemonEvent), DaemonHeartbeatInterval, CtrlC, + Log(LogMessage), } impl Event { diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index f6f9b56c..8152d26e 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -97,6 +97,12 @@ pub async fn handle_connection( break; } } + coordinator_messages::DaemonEvent::Log(message) => { + let event = Event::Log(message); + if events_tx.send(event).await.is_err() { + break; + } + } }, }; } diff --git a/binaries/coordinator/src/log_subscriber.rs b/binaries/coordinator/src/log_subscriber.rs new file mode 100644 index 00000000..f4803b7e --- /dev/null +++ b/binaries/coordinator/src/log_subscriber.rs @@ -0,0 +1,23 @@ +use dora_core::coordinator_messages::LogMessage; + +use crate::tcp_utils::tcp_send; + +pub struct LogSubscriber { + pub level: log::Level, + connection: tokio::net::TcpStream, +} + +impl LogSubscriber { + pub fn new(level: log::Level, connection: tokio::net::TcpStream) -> Self { + Self { level, connection } + } + + pub async fn send_message(&mut self, message: &LogMessage) -> eyre::Result<()> { + if message.level > self.level { + return Ok(()); + } + let message = serde_json::to_vec(&message)?; + tcp_send(&mut self.connection, &message).await?; + Ok(()) + } +} diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index f31e284f..8353e361 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -2,7 +2,7 @@ use aligned_vec::{AVec, ConstAlign}; use coordinator::CoordinatorEvent; use crossbeam::queue::ArrayQueue; use dora_core::config::{Input, OperatorId}; -use dora_core::coordinator_messages::CoordinatorRequest; +use dora_core::coordinator_messages::{CoordinatorRequest, LogMessage}; use dora_core::daemon_messages::{ DataMessage, DynamicNodeEvent, InterDaemonEvent, NodeConfig, Timestamped, }; @@ -332,6 +332,26 @@ impl Daemon { Ok(self.dataflow_node_results) } + async fn send_log_message(&mut self, message: LogMessage) -> eyre::Result<()> { + if let Some(connection) = &mut self.coordinator_connection { + let msg = serde_json::to_vec(&Timestamped { + inner: CoordinatorRequest::Event { + machine_id: self.machine_id.clone(), + event: DaemonEvent::Log(message), + }, + timestamp: self.clock.new_timestamp(), + })?; + tcp_send(connection, &msg) + .await + .wrap_err("failed to send watchdog message to dora-coordinator")?; + + if self.last_coordinator_heartbeat.elapsed() > 
Duration::from_secs(20) { + bail!("lost connection to coordinator") + } + } + Ok(()) + } + async fn handle_coordinator_event( &mut self, event: DaemonCoordinatorEvent, @@ -577,6 +597,7 @@ impl Daemon { } }; + let mut log_messages = Vec::new(); for node in nodes { let local = node.deploy.machine == self.machine_id; @@ -640,7 +661,7 @@ impl Daemon { } Err(err) => { tracing::error!("{err:?}"); - dataflow + let messages = dataflow .pending_nodes .handle_node_stop( &node_id, @@ -649,6 +670,7 @@ impl Daemon { &mut dataflow.cascading_error_causes, ) .await?; + log_messages.extend(messages); } } } else { @@ -656,6 +678,10 @@ impl Daemon { } } + for log_message in log_messages { + self.send_log_message(log_message).await?; + } + Ok(()) } @@ -1006,7 +1032,7 @@ impl Daemon { format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`") })?; - dataflow + let log_messages = dataflow .pending_nodes .handle_node_stop( node_id, @@ -1060,6 +1086,11 @@ impl Daemon { } self.running.remove(&dataflow_id); } + + for log_message in log_messages { + self.send_log_message(log_message).await?; + } + Ok(()) } diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs index d1fb1b30..c1210a1b 100644 --- a/binaries/daemon/src/pending.rs +++ b/binaries/daemon/src/pending.rs @@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet}; use dora_core::{ config::NodeId, - coordinator_messages::{CoordinatorRequest, DaemonEvent}, + coordinator_messages::{CoordinatorRequest, DaemonEvent, Level, LogMessage}, daemon_messages::{DaemonReply, DataflowId, Timestamped}, message::uhlc::{Timestamp, HLC}, }; @@ -77,14 +77,24 @@ impl PendingNodes { coordinator_connection: &mut Option, clock: &HLC, cascading_errors: &mut CascadingErrorCauses, - ) -> eyre::Result<()> { + ) -> eyre::Result> { + let mut log = Vec::new(); if self.local_nodes.remove(node_id) { - tracing::warn!("node `{node_id}` exited before initializing dora connection"); + log.push(LogMessage { + dataflow_id: self.dataflow_id, + node_id: Some(node_id.clone()), + level: Level::Warn, + target: "exit".into(), + module_path: None, + file: None, + line: None, + message: "node exited before initializing dora connection".into(), + }); self.exited_before_subscribe.push(node_id.clone()); self.update_dataflow_status(coordinator_connection, clock, cascading_errors) .await?; } - Ok(()) + Ok(log) } pub async fn handle_external_all_nodes_ready( diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index b0af2e31..f84fac75 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -376,14 +376,6 @@ pub async fn spawn_node( node_stderr_most_recent.force_push(new); - if buffer.starts_with("Traceback (most recent call last):") { - if !finished { - continue; - } else { - tracing::error!("{dataflow_id}/{}: \n{buffer}", node_id); - } - } - // send the buffered lines let lines = std::mem::take(&mut buffer); let sent = stderr_tx.send(lines.clone()).await; diff --git a/libraries/communication-layer/request-reply/src/tcp.rs b/libraries/communication-layer/request-reply/src/tcp.rs index eea7c5e6..27dd59cf 100644 --- a/libraries/communication-layer/request-reply/src/tcp.rs +++ b/libraries/communication-layer/request-reply/src/tcp.rs @@ -94,8 +94,8 @@ impl RequestReplyLayer for TcpLayer { } } -struct TcpConnection { - stream: TcpStream, +pub struct TcpConnection { + pub stream: TcpStream, } impl ListenConnection for TcpConnection { @@ -128,14 +128,14 @@ impl RequestReplyConnection for TcpConnection { } impl TcpConnection { 
-    fn send(&mut self, request: &[u8]) -> std::io::Result<()> {
+    pub fn send(&mut self, request: &[u8]) -> std::io::Result<()> {
         let len_raw = (request.len() as u64).to_le_bytes();
         self.stream.write_all(&len_raw)?;
         self.stream.write_all(request)?;
         Ok(())
     }
 
-    fn receive(&mut self) -> std::io::Result<Vec<u8>> {
+    pub fn receive(&mut self) -> std::io::Result<Vec<u8>> {
         let reply_len = {
             let mut raw = [0; 8];
             self.stream.read_exact(&mut raw)?;
diff --git a/libraries/core/Cargo.toml b/libraries/core/Cargo.toml
index a211d8d7..24dc0e5a 100644
--- a/libraries/core/Cargo.toml
+++ b/libraries/core/Cargo.toml
@@ -22,3 +22,4 @@ tokio = { version = "1.24.1", features = ["fs", "process", "sync"] }
 aligned-vec = { version = "0.5.0", features = ["serde"] }
 schemars = "0.8.19"
 serde_json = "1.0.117"
+log = { version = "0.4.21", features = ["serde"] }
diff --git a/libraries/core/src/coordinator_messages.rs b/libraries/core/src/coordinator_messages.rs
index 5a4a1db9..d471791a 100644
--- a/libraries/core/src/coordinator_messages.rs
+++ b/libraries/core/src/coordinator_messages.rs
@@ -1,5 +1,6 @@
 use crate::{config::NodeId, daemon_messages::DataflowId, topics::DataflowDaemonResult};
 use eyre::eyre;
+pub use log::Level;
 
 #[derive(Debug, serde::Serialize, serde::Deserialize)]
 pub enum CoordinatorRequest {
@@ -14,6 +15,19 @@
     },
 }
 
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
+#[must_use]
+pub struct LogMessage {
+    pub dataflow_id: DataflowId,
+    pub node_id: Option<NodeId>,
+    pub level: log::Level,
+    pub target: String,
+    pub module_path: Option<String>,
+    pub file: Option<String>,
+    pub line: Option<u32>,
+    pub message: String,
+}
+
 #[derive(Debug, serde::Serialize, serde::Deserialize)]
 pub enum DaemonEvent {
     AllNodesReady {
@@ -25,6 +39,7 @@
         result: DataflowDaemonResult,
     },
     Heartbeat,
+    Log(LogMessage),
 }
 
 #[derive(Debug, serde::Serialize, serde::Deserialize)]
diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs
index cb4061e5..820395f3 100644
--- a/libraries/core/src/topics.rs
+++ b/libraries/core/src/topics.rs
@@ -55,6 +55,10 @@ pub enum ControlRequest {
     List,
     DaemonConnected,
     ConnectedMachines,
+    LogSubscribe {
+        dataflow_id: Uuid,
+        level: log::Level,
+    },
 }
 
 #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
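
Below is a minimal sketch (not part of this patch) of how a client other than `dora attach` could consume the new log-forwarding API, mirroring the `attach.rs` changes above. The helper name `subscribe_to_logs` and the hard-coded `Warn` level are illustrative only; the sketch assumes the `TcpConnection`, `ControlRequest::LogSubscribe`, and `LogMessage` definitions introduced by this patch.

// Sketch only: a standalone log subscriber built on the APIs added in this
// patch. `subscribe_to_logs` is a hypothetical helper, not part of dora.
use std::net::{SocketAddr, TcpStream};

use communication_layer_request_reply::TcpConnection;
use dora_core::{coordinator_messages::LogMessage, topics::ControlRequest};
use eyre::WrapErr;
use uuid::Uuid;

fn subscribe_to_logs(coordinator: SocketAddr, dataflow_id: Uuid) -> eyre::Result<()> {
    // Open a dedicated connection; after the `LogSubscribe` request the
    // coordinator stops treating it as a request/reply channel and only
    // pushes serialized `LogMessage` frames over it.
    let mut connection = TcpConnection {
        stream: TcpStream::connect(coordinator)
            .wrap_err("failed to connect to dora coordinator")?,
    };

    // Ask for messages at `Warn` level or more severe.
    let request = ControlRequest::LogSubscribe {
        dataflow_id,
        level: log::Level::Warn,
    };
    connection
        .send(&serde_json::to_vec(&request).wrap_err("failed to serialize request")?)
        .wrap_err("failed to send log subscribe request")?;

    // Every following frame is one `LogMessage`; print them until the
    // coordinator closes the connection.
    while let Ok(raw) = connection.receive() {
        let message: LogMessage =
            serde_json::from_slice(&raw).wrap_err("failed to parse log message")?;
        let node = message.node_id.map(|id| id.to_string()).unwrap_or_default();
        println!(
            "{:5} {} {}: {}",
            message.level, node, message.target, message.message
        );
    }
    Ok(())
}

On the coordinator side, `LogSubscriber::send_message` drops anything with `message.level > self.level`; since `log::Level` orders `Error < Warn < Info < Debug < Trace`, subscribing at `Warn` yields only `Warn` and `Error` messages.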