From 790afa0d0729cbaeb42ed93dfc628a35cad4741e Mon Sep 17 00:00:00 2001 From: mivik Date: Wed, 23 Jul 2025 00:42:55 +0800 Subject: [PATCH 1/4] feat(cli): add --tail to logs, remove bat dependency --- Cargo.lock | 243 +----------------- binaries/cli/Cargo.toml | 1 - binaries/cli/src/command/logs.rs | 58 +++-- binaries/coordinator/src/lib.rs | 5 +- binaries/daemon/Cargo.toml | 1 + binaries/daemon/src/lib.rs | 75 +++++- libraries/message/src/cli_to_coordinator.rs | 1 + .../message/src/coordinator_to_daemon.rs | 1 + 8 files changed, 119 insertions(+), 266 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3c4956e8..32762dc8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -264,15 +264,6 @@ dependencies = [ "libc", ] -[[package]] -name = "ansi_colours" -version = "1.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14eec43e0298190790f41679fe69ef7a829d2a2ddd78c8c00339e84710e435fe" -dependencies = [ - "rgb", -] - [[package]] name = "ansi_term" version = "0.12.1" @@ -1264,43 +1255,6 @@ version = "1.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89e25b6adfb930f02d1981565a6e5d9c547ac15a96606256d3b59040e5cd4ca3" -[[package]] -name = "bat" -version = "0.24.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9dcc9e5637c2330d8eb7b920f2aa5d9e184446c258466f825ea1412c7614cc86" -dependencies = [ - "ansi_colours", - "bincode", - "bugreport", - "bytesize", - "clap 4.5.32", - "clircle", - "console", - "content_inspector", - "encoding_rs", - "etcetera", - "flate2", - "git2", - "globset", - "grep-cli", - "home", - "nu-ansi-term 0.49.0", - "once_cell", - "path_abs", - "plist", - "regex", - "semver", - "serde", - "serde_yaml 0.9.34+deprecated", - "shell-words", - "syntect", - "thiserror 1.0.69", - "unicode-width 0.1.14", - "walkdir", - "wild", -] - [[package]] name = "benchmark-example-node" version = "0.3.12" @@ -1475,17 +1429,6 @@ dependencies = [ "serde", ] -[[package]] -name = "bugreport" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f280f65ce85b880919349bbfcb204930291251eedcb2e5f84ce2f51df969c162" -dependencies = [ - "git-version", - "shell-escape", - "sysinfo 0.33.1", -] - [[package]] name = "built" version = "0.7.3" @@ -1554,12 +1497,6 @@ dependencies = [ "serde", ] -[[package]] -name = "bytesize" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d2c12f985c78475a6b8d629afd0c360260ef34cfef52efccdcfd31972f81c2e" - [[package]] name = "cacache" version = "13.1.0" @@ -2005,18 +1942,6 @@ dependencies = [ "error-code", ] -[[package]] -name = "clircle" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8e87cbed5354f17bd8ca8821a097fb62599787fe8f611743fad7ee156a0a600" -dependencies = [ - "cfg-if 1.0.0", - "libc", - "serde", - "winapi 0.3.9", -] - [[package]] name = "codespan-reporting" version = "0.11.1" @@ -2192,15 +2117,6 @@ dependencies = [ "const_soft_float", ] -[[package]] -name = "content_inspector" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7bda66e858c683005a53a9a60c69a4aca7eeaa45d124526e389f7aec8e62f38" -dependencies = [ - "memchr", -] - [[package]] name = "convert_case" version = "0.6.0" @@ -2939,7 +2855,6 @@ dependencies = [ name = "dora-cli" version = "0.3.12" dependencies = [ - "bat", "clap 4.5.32", "colored", "communication-layer-request-reply", @@ -3047,6 +2962,7 @@ dependencies = [ "futures-concurrency", "git2", "itertools 0.14.0", + "memchr", "serde_json", "serde_yaml 0.9.34+deprecated", "shared-memory-server", @@ -4037,17 +3953,6 @@ dependencies = [ "cc", ] -[[package]] -name = "etcetera" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" -dependencies = [ - "cfg-if 1.0.0", - "home", - "windows-sys 0.48.0", -] - [[package]] name = "event-listener" version = "2.5.3" @@ -4809,19 +4714,6 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" -[[package]] -name = "globset" -version = "0.4.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54a1028dfc5f5df5da8a56a73e6c153c9a9708ec57232470703592a3f18e49f5" -dependencies = [ - "aho-corasick", - "bstr", - "log", - "regex-automata 0.4.9", - "regex-syntax 0.8.5", -] - [[package]] name = "gloo-timers" version = "0.2.6" @@ -4990,20 +4882,6 @@ dependencies = [ "bitflags 2.9.0", ] -[[package]] -name = "grep-cli" -version = "0.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47f1288f0e06f279f84926fa4c17e3fcd2a22b357927a82f2777f7be26e4cec0" -dependencies = [ - "bstr", - "globset", - "libc", - "log", - "termcolor", - "winapi-util", -] - [[package]] name = "h2" version = "0.3.26" @@ -6603,9 +6481,9 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" [[package]] name = "memchr" -version = "2.7.4" +version = "2.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" [[package]] name = "memmap2" @@ -7395,15 +7273,6 @@ dependencies = [ "winapi 0.3.9", ] -[[package]] -name = "nu-ansi-term" -version = "0.49.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c073d3c1930d0751774acf49e66653acecb416c3a54c6ec095a9b11caddb5a68" -dependencies = [ - "windows-sys 0.48.0", -] - [[package]] name = "num" version = "0.4.3" @@ -7873,11 +7742,11 @@ checksum = "d75b0bedcc4fe52caa0e03d9f1151a323e4aa5e2d78ba3580400cd3c9e2bc4bc" [[package]] name = "onig" -version = "6.4.0" +version = "6.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c4b31c8722ad9171c6d77d3557db078cab2bd50afcc9d09c8b315c59df8ca4f" +checksum = "336b9c63443aceef14bea841b899035ae3abe89b7c486aaf4c5bd8aafedac3f0" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.9.0", "libc", "once_cell", "onig_sys", @@ -7885,9 +7754,9 @@ dependencies = [ [[package]] name = "onig_sys" -version = "69.8.1" +version = "69.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b829e3d7e9cc74c7e315ee8edb185bf4190da5acde74afd7fc59c35b1f086e7" +checksum = "c7f86c6eef3d6df15f23bcfb6af487cbd2fed4e5581d58d5bf1f5f8b7f6727dc" dependencies = [ "cc", "pkg-config", @@ -8251,15 +8120,6 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" -[[package]] -name = "path_abs" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05ef02f6342ac01d8a93b65f96db53fe68a92a15f41144f97fb00a9e669633c3" -dependencies = [ - "std_prelude", -] - [[package]] name = "pathdiff" version = "0.2.3" @@ -8518,19 +8378,6 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" -[[package]] -name = "plist" -version = "1.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac26e981c03a6e53e0aee43c113e3202f5581d5360dae7bd2c70e800dd0451d" -dependencies = [ - "base64 0.22.1", - "indexmap 2.8.0", - "quick-xml 0.32.0", - "serde", - "time", -] - [[package]] name = "ply-rs" version = "0.1.3" @@ -9068,15 +8915,6 @@ dependencies = [ "serde", ] -[[package]] -name = "quick-xml" -version = "0.32.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d3a6e5838b60e0e8fa7a43f22ade549a37d61f8bdbe636d0d7816191de969c2" -dependencies = [ - "memchr", -] - [[package]] name = "quick-xml" version = "0.36.2" @@ -12453,18 +12291,6 @@ dependencies = [ "win-sys", ] -[[package]] -name = "shell-escape" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45bb67a18fa91266cc7807181f62f9178a6873bfad7dc788c42e6430db40184f" - -[[package]] -name = "shell-words" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde" - [[package]] name = "shellexpand" version = "2.1.2" @@ -12911,12 +12737,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" -[[package]] -name = "std_prelude" -version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8207e78455ffdf55661170876f88daf85356e4edd54e0a3dbc79586ca1e50cbe" - [[package]] name = "stop-token" version = "0.7.0" @@ -13088,28 +12908,6 @@ dependencies = [ "syn 2.0.101", ] -[[package]] -name = "syntect" -version = "5.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "874dcfa363995604333cf947ae9f751ca3af4522c60886774c4963943b4746b1" -dependencies = [ - "bincode", - "bitflags 1.3.2", - "flate2", - "fnv", - "once_cell", - "onig", - "plist", - "regex-syntax 0.8.5", - "serde", - "serde_derive", - "serde_json", - "thiserror 1.0.69", - "walkdir", - "yaml-rust", -] - [[package]] name = "sysctl" version = "0.5.5" @@ -13139,20 +12937,6 @@ dependencies = [ "windows 0.52.0", ] -[[package]] -name = "sysinfo" -version = "0.33.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fc858248ea01b66f19d8e8a6d55f41deaf91e9d495246fd01368d99935c6c01" -dependencies = [ - "core-foundation-sys", - "libc", - "memchr", - "ntapi", - "rayon", - "windows 0.57.0", -] - [[package]] name = "sysinfo" version = "0.34.2" @@ -13960,7 +13744,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" dependencies = [ "matchers", - "nu-ansi-term 0.46.0", + "nu-ansi-term", "once_cell", "regex", "serde", @@ -14934,15 +14718,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd7cf3379ca1aac9eea11fba24fd7e315d621f8dfe35c8d7d2be8b793726e07d" -[[package]] -name = "wild" -version = "2.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3131afc8c575281e1e80f36ed6a092aa502c08b18ed7524e86fbbb12bb410e1" -dependencies = [ - "glob", -] - [[package]] name = "win-sys" version = "0.3.1" diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml index 797fb9e0..b879b4af 100644 --- a/binaries/cli/Cargo.toml +++ b/binaries/cli/Cargo.toml @@ -40,7 +40,6 @@ ctrlc = "3.2.5" tracing = "0.1.36" tracing-log = "0.2.0" dora-tracing = { workspace = true, optional = true } -bat = "0.24.0" dora-daemon = { workspace = true } dora-coordinator = { workspace = true } dora-runtime = { workspace = true } diff --git a/binaries/cli/src/command/logs.rs b/binaries/cli/src/command/logs.rs index 06496ad6..9f9da6b9 100644 --- a/binaries/cli/src/command/logs.rs +++ b/binaries/cli/src/command/logs.rs @@ -1,6 +1,7 @@ +use std::io::Write; + use super::{default_tracing, Executable}; use crate::common::{connect_to_coordinator, query_running_dataflows}; -use bat::{Input, PrettyPrinter}; use clap::Args; use communication_layer_request_reply::TcpRequestReplyConnection; use dora_core::topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST}; @@ -17,6 +18,14 @@ pub struct LogsArgs { /// Show logs for the given node #[clap(value_name = "NAME")] pub node: String, + /// Number of lines to show from the end of the logs + /// + /// Default (`0`) is to show all lines. + #[clap(long, short = 'n', default_value_t = 0)] + pub tail: usize, + /// Follow log output + #[clap(long, short)] + pub follow: bool, /// Address of the dora coordinator #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] pub coordinator_addr: std::net::IpAddr, @@ -34,19 +43,24 @@ impl Executable for LogsArgs { .wrap_err("failed to connect to dora coordinator")?; let list = query_running_dataflows(&mut *session).wrap_err("failed to query running dataflows")?; - if let Some(dataflow) = self.dataflow { - let uuid = Uuid::parse_str(&dataflow).ok(); - let name = if uuid.is_some() { None } else { Some(dataflow) }; - logs(&mut *session, uuid, name, self.node) - } else { - let active = list.get_active(); - let uuid = match &active[..] { - [] => bail!("No dataflows are running"), - [uuid] => uuid.clone(), - _ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?, - }; - logs(&mut *session, Some(uuid.uuid), None, self.node) - } + let (uuid, name) = match self.dataflow.as_ref().map(|it| Uuid::parse_str(it)) { + Some(Ok(uuid)) => (Some(uuid), None), + Some(Err(_)) => (None, self.dataflow.clone()), + None => { + let active = list.get_active(); + let uuid = match &active[..] { + [] => bail!("No dataflows are running"), + [uuid] => uuid.uuid, + _ => { + let uuid = inquire::Select::new("Choose dataflow to show logs:", active) + .prompt()?; + uuid.uuid + } + }; + (Some(uuid), None) + } + }; + logs(&mut *session, uuid, name, self.node, self.tail, self.follow) } } @@ -55,6 +69,8 @@ pub fn logs( uuid: Option, name: Option, node: String, + tail: usize, + follow: bool, ) -> Result<()> { let logs = { let reply_raw = session @@ -63,6 +79,7 @@ pub fn logs( uuid, name, node: node.clone(), + tail, }) .wrap_err("")?, ) @@ -75,16 +92,9 @@ pub fn logs( } }; - PrettyPrinter::new() - .header(false) - .grid(false) - .line_numbers(false) - .paging_mode(bat::PagingMode::QuitIfOneScreen) - .inputs(vec![Input::from_bytes(&logs) - .name("Logs") - .title(format!("Logs from {node}.").as_str())]) - .print() - .wrap_err("Something went wrong with viewing log file")?; + std::io::stdout() + .write_all(&logs) + .expect("failed to write logs to stdout"); Ok(()) } diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index e3a8d767..d518db2b 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -625,7 +625,7 @@ async fn start_inner( let _ = reply_sender.send(Err(err)); } }, - ControlRequest::Logs { uuid, name, node } => { + ControlRequest::Logs { uuid, name, node, tail } => { let dataflow_uuid = if let Some(uuid) = uuid { Ok(uuid) } else if let Some(name) = name { @@ -643,6 +643,7 @@ async fn start_inner( node.into(), &mut daemon_connections, clock.new_timestamp(), + tail, ) .await .map(ControlRequestReply::Logs); @@ -1182,6 +1183,7 @@ async fn retrieve_logs( node_id: NodeId, daemon_connections: &mut DaemonConnections, timestamp: uhlc::Timestamp, + tail: usize, ) -> eyre::Result> { let nodes = if let Some(dataflow) = archived_dataflows.get(&dataflow_id) { dataflow.nodes.clone() @@ -1195,6 +1197,7 @@ async fn retrieve_logs( inner: DaemonCoordinatorEvent::Logs { dataflow_id, node_id: node_id.clone(), + tail, }, timestamp, })?; diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index c9218d59..b4f9a76c 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -49,3 +49,4 @@ url = "2.5.4" git2 = { workspace = true } dunce = "1.0.5" itertools = "0.14" +memchr = "2.7.5" diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index d23dd2b8..f03428d4 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -39,9 +39,10 @@ use shared_memory_server::ShmemConf; use socket_stream_utils::socket_stream_send; use spawn::Spawner; use std::{ - collections::{BTreeMap, BTreeSet, HashMap}, + collections::{BTreeMap, BTreeSet, HashMap, VecDeque}, env::current_dir, future::Future, + io, net::SocketAddr, path::{Path, PathBuf}, pin::pin, @@ -51,7 +52,7 @@ use std::{ use sysinfo::Pid; use tokio::{ fs::File, - io::AsyncReadExt, + io::{AsyncReadExt, AsyncSeekExt}, net::TcpStream, sync::{ broadcast, @@ -801,6 +802,7 @@ impl Daemon { DaemonCoordinatorEvent::Logs { dataflow_id, node_id, + tail, } => { match self.working_dir.get(&dataflow_id) { Some(working_dir) => { @@ -815,10 +817,17 @@ impl Daemon { log::log_path(&working_dir, &dataflow_id, &node_id) ))?; - let mut contents = vec![]; - file.read_to_end(&mut contents) - .await - .wrap_err("Could not read content of log file")?; + let mut contents = if tail == 0 { + let mut contents = vec![]; + file.read_to_end(&mut contents).await.map(|_| contents) + } else { + read_last_n_lines(&mut file, tail).await + } + .wrap_err("Could not read last n lines of log file")?; + if !contents.ends_with(b"\n") { + // Append newline for better readability + contents.push(b'\n'); + } Result::, eyre::Report>::Ok(contents) } .await @@ -2166,6 +2175,60 @@ impl Daemon { } } +async fn read_last_n_lines(file: &mut File, mut tail: usize) -> io::Result> { + let mut pos = file.seek(io::SeekFrom::End(0)).await?; + + let mut output = VecDeque::::new(); + let mut extend_slice_to_start = |slice: &[u8]| { + output.extend(slice); + output.rotate_right(slice.len()); + }; + + let mut buffer = vec![0; 2048]; + let mut estimated_line_length = 0; + let mut at_end = true; + 'main: while tail > 0 && pos > 0 { + let new_pos = pos.saturating_sub(buffer.len() as u64); + file.seek(io::SeekFrom::Start(new_pos)).await?; + let read_len = (pos - new_pos) as usize; + pos = new_pos; + + let read = file.read(&mut buffer[..read_len]).await?; + if read < read_len { + break; + } + let read_buf = if at_end { + at_end = false; + &buffer[..read].trim_ascii_end() + } else { + &buffer[..read] + }; + + let mut iter = memchr::memrchr_iter(b'\n', read_buf); + let mut lines = 1; + loop { + let Some(pos) = iter.next() else { + extend_slice_to_start(read_buf); + break; + }; + lines += 1; + tail -= 1; + if tail == 0 { + extend_slice_to_start(&read_buf[(pos + 1)..]); + break 'main; + } + } + + estimated_line_length = estimated_line_length.max((read_buf.len() + 1).div_ceil(lines)); + let estimated_buffer_length = estimated_line_length * tail; + if estimated_buffer_length >= buffer.len() * 2 { + buffer.resize(buffer.len() * 2, 0); + } + } + + Ok(output.into()) +} + async fn set_up_event_stream( coordinator_addr: SocketAddr, machine_id: &Option, diff --git a/libraries/message/src/cli_to_coordinator.rs b/libraries/message/src/cli_to_coordinator.rs index bf3d3a03..5e75e955 100644 --- a/libraries/message/src/cli_to_coordinator.rs +++ b/libraries/message/src/cli_to_coordinator.rs @@ -67,6 +67,7 @@ pub enum ControlRequest { uuid: Option, name: Option, node: String, + tail: usize, }, Destroy, List, diff --git a/libraries/message/src/coordinator_to_daemon.rs b/libraries/message/src/coordinator_to_daemon.rs index 69da8923..3e73aeae 100644 --- a/libraries/message/src/coordinator_to_daemon.rs +++ b/libraries/message/src/coordinator_to_daemon.rs @@ -51,6 +51,7 @@ pub enum DaemonCoordinatorEvent { Logs { dataflow_id: DataflowId, node_id: NodeId, + tail: usize, }, Destroy, Heartbeat, From c4ebe94e5942c66b8d92435551c5aed919314a8e Mon Sep 17 00:00:00 2001 From: mivik Date: Wed, 23 Jul 2025 10:20:41 +0800 Subject: [PATCH 2/4] feat(cli): add `-f` option to logs --- binaries/cli/src/command/logs.rs | 62 ++++++++++++++++++--- binaries/coordinator/src/lib.rs | 9 ++- libraries/message/src/coordinator_to_cli.rs | 5 +- 3 files changed, 66 insertions(+), 10 deletions(-) diff --git a/binaries/cli/src/command/logs.rs b/binaries/cli/src/command/logs.rs index 9f9da6b9..44118d33 100644 --- a/binaries/cli/src/command/logs.rs +++ b/binaries/cli/src/command/logs.rs @@ -1,11 +1,14 @@ -use std::io::Write; +use std::{ + io::Write, + net::{SocketAddr, TcpStream}, +}; use super::{default_tracing, Executable}; -use crate::common::{connect_to_coordinator, query_running_dataflows}; +use crate::{common::{connect_to_coordinator, query_running_dataflows}, output::print_log_message}; use clap::Args; -use communication_layer_request_reply::TcpRequestReplyConnection; +use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection}; use dora_core::topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST}; -use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply}; +use dora_message::{cli_to_coordinator::ControlRequest, common::LogMessage, coordinator_to_cli::ControlRequestReply}; use eyre::{bail, Context, Result}; use uuid::Uuid; @@ -60,7 +63,15 @@ impl Executable for LogsArgs { (Some(uuid), None) } }; - logs(&mut *session, uuid, name, self.node, self.tail, self.follow) + logs( + &mut *session, + uuid, + name, + self.node, + self.tail, + self.follow, + (self.coordinator_addr, self.coordinator_port).into(), + ) } } @@ -71,8 +82,9 @@ pub fn logs( node: String, tail: usize, follow: bool, + coordinator_addr: SocketAddr, ) -> Result<()> { - let logs = { + let (uuid, logs) = { let reply_raw = session .request( &serde_json::to_vec(&ControlRequest::Logs { @@ -87,7 +99,7 @@ pub fn logs( let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; match reply { - ControlRequestReply::Logs(logs) => logs, + ControlRequestReply::Logs { uuid, data } => (uuid, data), other => bail!("unexpected reply to daemon logs: {other:?}"), } }; @@ -96,5 +108,41 @@ pub fn logs( .write_all(&logs) .expect("failed to write logs to stdout"); + if !follow { + return Ok(()); + } + let log_level = env_logger::Builder::new() + .filter_level(log::LevelFilter::Info) + .parse_default_env() + .build() + .filter(); + + // subscribe to log messages + let mut log_session = TcpConnection { + stream: TcpStream::connect(coordinator_addr) + .wrap_err("failed to connect to dora coordinator")?, + }; + log_session + .send( + &serde_json::to_vec(&ControlRequest::LogSubscribe { + dataflow_id: uuid, + level: log_level, + }) + .wrap_err("failed to serialize message")?, + ) + .wrap_err("failed to send log subscribe request to coordinator")?; + while let Ok(raw) = log_session.receive() { + let parsed: eyre::Result = + serde_json::from_slice(&raw).context("failed to parse log message"); + match parsed { + Ok(log_message) => { + print_log_message(log_message, false, false); + } + Err(err) => { + tracing::warn!("failed to parse log message: {err:?}") + } + } + } + Ok(()) } diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index d518db2b..77175235 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -625,7 +625,12 @@ async fn start_inner( let _ = reply_sender.send(Err(err)); } }, - ControlRequest::Logs { uuid, name, node, tail } => { + ControlRequest::Logs { + uuid, + name, + node, + tail, + } => { let dataflow_uuid = if let Some(uuid) = uuid { Ok(uuid) } else if let Some(name) = name { @@ -646,7 +651,7 @@ async fn start_inner( tail, ) .await - .map(ControlRequestReply::Logs); + .map(|data| ControlRequestReply::Logs { uuid, data }); let _ = reply_sender.send(reply); } Err(err) => { diff --git a/libraries/message/src/coordinator_to_cli.rs b/libraries/message/src/coordinator_to_cli.rs index 02243468..837835a2 100644 --- a/libraries/message/src/coordinator_to_cli.rs +++ b/libraries/message/src/coordinator_to_cli.rs @@ -36,7 +36,10 @@ pub enum ControlRequestReply { DestroyOk, DaemonConnected(bool), ConnectedDaemons(BTreeSet), - Logs(Vec), + Logs { + uuid: Uuid, + data: Vec, + }, CliAndDefaultDaemonIps { default_daemon: Option, cli: Option, From ed5b450546d8507b8c3f578c7022b337e331b7e2 Mon Sep 17 00:00:00 2001 From: mivik Date: Wed, 23 Jul 2025 15:09:42 +0800 Subject: [PATCH 3/4] style: fix format --- binaries/cli/src/command/logs.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/binaries/cli/src/command/logs.rs b/binaries/cli/src/command/logs.rs index 44118d33..54314117 100644 --- a/binaries/cli/src/command/logs.rs +++ b/binaries/cli/src/command/logs.rs @@ -4,11 +4,16 @@ use std::{ }; use super::{default_tracing, Executable}; -use crate::{common::{connect_to_coordinator, query_running_dataflows}, output::print_log_message}; +use crate::{ + common::{connect_to_coordinator, query_running_dataflows}, + output::print_log_message, +}; use clap::Args; use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection}; use dora_core::topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST}; -use dora_message::{cli_to_coordinator::ControlRequest, common::LogMessage, coordinator_to_cli::ControlRequestReply}; +use dora_message::{ + cli_to_coordinator::ControlRequest, common::LogMessage, coordinator_to_cli::ControlRequestReply, +}; use eyre::{bail, Context, Result}; use uuid::Uuid; From 1251ccb2977153c693fb08cda8a1e6620b78e5a0 Mon Sep 17 00:00:00 2001 From: mivik Date: Thu, 24 Jul 2025 10:56:33 +0800 Subject: [PATCH 4/4] fix(cli/log): read_last_n_lines short read --- binaries/daemon/src/lib.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index f03428d4..27e7b774 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -2193,15 +2193,12 @@ async fn read_last_n_lines(file: &mut File, mut tail: usize) -> io::Result