Browse Source

feat(cli): add --tail to logs, remove bat dependency

pull/1081/head
mivik 6 months ago
parent
commit
790afa0d07
No known key found for this signature in database GPG Key ID: E8BDA67F6781E57D
8 changed files with 119 additions and 266 deletions
  1. +9
    -234
      Cargo.lock
  2. +0
    -1
      binaries/cli/Cargo.toml
  3. +34
    -24
      binaries/cli/src/command/logs.rs
  4. +4
    -1
      binaries/coordinator/src/lib.rs
  5. +1
    -0
      binaries/daemon/Cargo.toml
  6. +69
    -6
      binaries/daemon/src/lib.rs
  7. +1
    -0
      libraries/message/src/cli_to_coordinator.rs
  8. +1
    -0
      libraries/message/src/coordinator_to_daemon.rs

+ 9
- 234
Cargo.lock View File

@@ -264,15 +264,6 @@ dependencies = [
"libc",
]

[[package]]
name = "ansi_colours"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14eec43e0298190790f41679fe69ef7a829d2a2ddd78c8c00339e84710e435fe"
dependencies = [
"rgb",
]

[[package]]
name = "ansi_term"
version = "0.12.1"
@@ -1264,43 +1255,6 @@ version = "1.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89e25b6adfb930f02d1981565a6e5d9c547ac15a96606256d3b59040e5cd4ca3"

[[package]]
name = "bat"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9dcc9e5637c2330d8eb7b920f2aa5d9e184446c258466f825ea1412c7614cc86"
dependencies = [
"ansi_colours",
"bincode",
"bugreport",
"bytesize",
"clap 4.5.32",
"clircle",
"console",
"content_inspector",
"encoding_rs",
"etcetera",
"flate2",
"git2",
"globset",
"grep-cli",
"home",
"nu-ansi-term 0.49.0",
"once_cell",
"path_abs",
"plist",
"regex",
"semver",
"serde",
"serde_yaml 0.9.34+deprecated",
"shell-words",
"syntect",
"thiserror 1.0.69",
"unicode-width 0.1.14",
"walkdir",
"wild",
]

[[package]]
name = "benchmark-example-node"
version = "0.3.12"
@@ -1475,17 +1429,6 @@ dependencies = [
"serde",
]

[[package]]
name = "bugreport"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f280f65ce85b880919349bbfcb204930291251eedcb2e5f84ce2f51df969c162"
dependencies = [
"git-version",
"shell-escape",
"sysinfo 0.33.1",
]

[[package]]
name = "built"
version = "0.7.3"
@@ -1554,12 +1497,6 @@ dependencies = [
"serde",
]

[[package]]
name = "bytesize"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d2c12f985c78475a6b8d629afd0c360260ef34cfef52efccdcfd31972f81c2e"

[[package]]
name = "cacache"
version = "13.1.0"
@@ -2005,18 +1942,6 @@ dependencies = [
"error-code",
]

[[package]]
name = "clircle"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8e87cbed5354f17bd8ca8821a097fb62599787fe8f611743fad7ee156a0a600"
dependencies = [
"cfg-if 1.0.0",
"libc",
"serde",
"winapi 0.3.9",
]

[[package]]
name = "codespan-reporting"
version = "0.11.1"
@@ -2192,15 +2117,6 @@ dependencies = [
"const_soft_float",
]

[[package]]
name = "content_inspector"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7bda66e858c683005a53a9a60c69a4aca7eeaa45d124526e389f7aec8e62f38"
dependencies = [
"memchr",
]

[[package]]
name = "convert_case"
version = "0.6.0"
@@ -2939,7 +2855,6 @@ dependencies = [
name = "dora-cli"
version = "0.3.12"
dependencies = [
"bat",
"clap 4.5.32",
"colored",
"communication-layer-request-reply",
@@ -3047,6 +2962,7 @@ dependencies = [
"futures-concurrency",
"git2",
"itertools 0.14.0",
"memchr",
"serde_json",
"serde_yaml 0.9.34+deprecated",
"shared-memory-server",
@@ -4037,17 +3953,6 @@ dependencies = [
"cc",
]

[[package]]
name = "etcetera"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943"
dependencies = [
"cfg-if 1.0.0",
"home",
"windows-sys 0.48.0",
]

[[package]]
name = "event-listener"
version = "2.5.3"
@@ -4809,19 +4714,6 @@ version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2"

[[package]]
name = "globset"
version = "0.4.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54a1028dfc5f5df5da8a56a73e6c153c9a9708ec57232470703592a3f18e49f5"
dependencies = [
"aho-corasick",
"bstr",
"log",
"regex-automata 0.4.9",
"regex-syntax 0.8.5",
]

[[package]]
name = "gloo-timers"
version = "0.2.6"
@@ -4990,20 +4882,6 @@ dependencies = [
"bitflags 2.9.0",
]

[[package]]
name = "grep-cli"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47f1288f0e06f279f84926fa4c17e3fcd2a22b357927a82f2777f7be26e4cec0"
dependencies = [
"bstr",
"globset",
"libc",
"log",
"termcolor",
"winapi-util",
]

[[package]]
name = "h2"
version = "0.3.26"
@@ -6603,9 +6481,9 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"

[[package]]
name = "memchr"
version = "2.7.4"
version = "2.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0"

[[package]]
name = "memmap2"
@@ -7395,15 +7273,6 @@ dependencies = [
"winapi 0.3.9",
]

[[package]]
name = "nu-ansi-term"
version = "0.49.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c073d3c1930d0751774acf49e66653acecb416c3a54c6ec095a9b11caddb5a68"
dependencies = [
"windows-sys 0.48.0",
]

[[package]]
name = "num"
version = "0.4.3"
@@ -7873,11 +7742,11 @@ checksum = "d75b0bedcc4fe52caa0e03d9f1151a323e4aa5e2d78ba3580400cd3c9e2bc4bc"

[[package]]
name = "onig"
version = "6.4.0"
version = "6.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c4b31c8722ad9171c6d77d3557db078cab2bd50afcc9d09c8b315c59df8ca4f"
checksum = "336b9c63443aceef14bea841b899035ae3abe89b7c486aaf4c5bd8aafedac3f0"
dependencies = [
"bitflags 1.3.2",
"bitflags 2.9.0",
"libc",
"once_cell",
"onig_sys",
@@ -7885,9 +7754,9 @@ dependencies = [

[[package]]
name = "onig_sys"
version = "69.8.1"
version = "69.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b829e3d7e9cc74c7e315ee8edb185bf4190da5acde74afd7fc59c35b1f086e7"
checksum = "c7f86c6eef3d6df15f23bcfb6af487cbd2fed4e5581d58d5bf1f5f8b7f6727dc"
dependencies = [
"cc",
"pkg-config",
@@ -8251,15 +8120,6 @@ version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"

[[package]]
name = "path_abs"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05ef02f6342ac01d8a93b65f96db53fe68a92a15f41144f97fb00a9e669633c3"
dependencies = [
"std_prelude",
]

[[package]]
name = "pathdiff"
version = "0.2.3"
@@ -8518,19 +8378,6 @@ version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"

[[package]]
name = "plist"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac26e981c03a6e53e0aee43c113e3202f5581d5360dae7bd2c70e800dd0451d"
dependencies = [
"base64 0.22.1",
"indexmap 2.8.0",
"quick-xml 0.32.0",
"serde",
"time",
]

[[package]]
name = "ply-rs"
version = "0.1.3"
@@ -9068,15 +8915,6 @@ dependencies = [
"serde",
]

[[package]]
name = "quick-xml"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d3a6e5838b60e0e8fa7a43f22ade549a37d61f8bdbe636d0d7816191de969c2"
dependencies = [
"memchr",
]

[[package]]
name = "quick-xml"
version = "0.36.2"
@@ -12453,18 +12291,6 @@ dependencies = [
"win-sys",
]

[[package]]
name = "shell-escape"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45bb67a18fa91266cc7807181f62f9178a6873bfad7dc788c42e6430db40184f"

[[package]]
name = "shell-words"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde"

[[package]]
name = "shellexpand"
version = "2.1.2"
@@ -12911,12 +12737,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"

[[package]]
name = "std_prelude"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8207e78455ffdf55661170876f88daf85356e4edd54e0a3dbc79586ca1e50cbe"

[[package]]
name = "stop-token"
version = "0.7.0"
@@ -13088,28 +12908,6 @@ dependencies = [
"syn 2.0.101",
]

[[package]]
name = "syntect"
version = "5.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "874dcfa363995604333cf947ae9f751ca3af4522c60886774c4963943b4746b1"
dependencies = [
"bincode",
"bitflags 1.3.2",
"flate2",
"fnv",
"once_cell",
"onig",
"plist",
"regex-syntax 0.8.5",
"serde",
"serde_derive",
"serde_json",
"thiserror 1.0.69",
"walkdir",
"yaml-rust",
]

[[package]]
name = "sysctl"
version = "0.5.5"
@@ -13139,20 +12937,6 @@ dependencies = [
"windows 0.52.0",
]

[[package]]
name = "sysinfo"
version = "0.33.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fc858248ea01b66f19d8e8a6d55f41deaf91e9d495246fd01368d99935c6c01"
dependencies = [
"core-foundation-sys",
"libc",
"memchr",
"ntapi",
"rayon",
"windows 0.57.0",
]

[[package]]
name = "sysinfo"
version = "0.34.2"
@@ -13960,7 +13744,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
dependencies = [
"matchers",
"nu-ansi-term 0.46.0",
"nu-ansi-term",
"once_cell",
"regex",
"serde",
@@ -14934,15 +14718,6 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd7cf3379ca1aac9eea11fba24fd7e315d621f8dfe35c8d7d2be8b793726e07d"

[[package]]
name = "wild"
version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3131afc8c575281e1e80f36ed6a092aa502c08b18ed7524e86fbbb12bb410e1"
dependencies = [
"glob",
]

[[package]]
name = "win-sys"
version = "0.3.1"


+ 0
- 1
binaries/cli/Cargo.toml View File

@@ -40,7 +40,6 @@ ctrlc = "3.2.5"
tracing = "0.1.36"
tracing-log = "0.2.0"
dora-tracing = { workspace = true, optional = true }
bat = "0.24.0"
dora-daemon = { workspace = true }
dora-coordinator = { workspace = true }
dora-runtime = { workspace = true }


+ 34
- 24
binaries/cli/src/command/logs.rs View File

@@ -1,6 +1,7 @@
use std::io::Write;

use super::{default_tracing, Executable};
use crate::common::{connect_to_coordinator, query_running_dataflows};
use bat::{Input, PrettyPrinter};
use clap::Args;
use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST};
@@ -17,6 +18,14 @@ pub struct LogsArgs {
/// Show logs for the given node
#[clap(value_name = "NAME")]
pub node: String,
/// Number of lines to show from the end of the logs
///
/// Default (`0`) is to show all lines.
#[clap(long, short = 'n', default_value_t = 0)]
pub tail: usize,
/// Follow log output
#[clap(long, short)]
pub follow: bool,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
pub coordinator_addr: std::net::IpAddr,
@@ -34,19 +43,24 @@ impl Executable for LogsArgs {
.wrap_err("failed to connect to dora coordinator")?;
let list =
query_running_dataflows(&mut *session).wrap_err("failed to query running dataflows")?;
if let Some(dataflow) = self.dataflow {
let uuid = Uuid::parse_str(&dataflow).ok();
let name = if uuid.is_some() { None } else { Some(dataflow) };
logs(&mut *session, uuid, name, self.node)
} else {
let active = list.get_active();
let uuid = match &active[..] {
[] => bail!("No dataflows are running"),
[uuid] => uuid.clone(),
_ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?,
};
logs(&mut *session, Some(uuid.uuid), None, self.node)
}
let (uuid, name) = match self.dataflow.as_ref().map(|it| Uuid::parse_str(it)) {
Some(Ok(uuid)) => (Some(uuid), None),
Some(Err(_)) => (None, self.dataflow.clone()),
None => {
let active = list.get_active();
let uuid = match &active[..] {
[] => bail!("No dataflows are running"),
[uuid] => uuid.uuid,
_ => {
let uuid = inquire::Select::new("Choose dataflow to show logs:", active)
.prompt()?;
uuid.uuid
}
};
(Some(uuid), None)
}
};
logs(&mut *session, uuid, name, self.node, self.tail, self.follow)
}
}

@@ -55,6 +69,8 @@ pub fn logs(
uuid: Option<Uuid>,
name: Option<String>,
node: String,
tail: usize,
follow: bool,
) -> Result<()> {
let logs = {
let reply_raw = session
@@ -63,6 +79,7 @@ pub fn logs(
uuid,
name,
node: node.clone(),
tail,
})
.wrap_err("")?,
)
@@ -75,16 +92,9 @@ pub fn logs(
}
};

PrettyPrinter::new()
.header(false)
.grid(false)
.line_numbers(false)
.paging_mode(bat::PagingMode::QuitIfOneScreen)
.inputs(vec![Input::from_bytes(&logs)
.name("Logs")
.title(format!("Logs from {node}.").as_str())])
.print()
.wrap_err("Something went wrong with viewing log file")?;
std::io::stdout()
.write_all(&logs)
.expect("failed to write logs to stdout");

Ok(())
}

+ 4
- 1
binaries/coordinator/src/lib.rs View File

@@ -625,7 +625,7 @@ async fn start_inner(
let _ = reply_sender.send(Err(err));
}
},
ControlRequest::Logs { uuid, name, node } => {
ControlRequest::Logs { uuid, name, node, tail } => {
let dataflow_uuid = if let Some(uuid) = uuid {
Ok(uuid)
} else if let Some(name) = name {
@@ -643,6 +643,7 @@ async fn start_inner(
node.into(),
&mut daemon_connections,
clock.new_timestamp(),
tail,
)
.await
.map(ControlRequestReply::Logs);
@@ -1182,6 +1183,7 @@ async fn retrieve_logs(
node_id: NodeId,
daemon_connections: &mut DaemonConnections,
timestamp: uhlc::Timestamp,
tail: usize,
) -> eyre::Result<Vec<u8>> {
let nodes = if let Some(dataflow) = archived_dataflows.get(&dataflow_id) {
dataflow.nodes.clone()
@@ -1195,6 +1197,7 @@ async fn retrieve_logs(
inner: DaemonCoordinatorEvent::Logs {
dataflow_id,
node_id: node_id.clone(),
tail,
},
timestamp,
})?;


+ 1
- 0
binaries/daemon/Cargo.toml View File

@@ -49,3 +49,4 @@ url = "2.5.4"
git2 = { workspace = true }
dunce = "1.0.5"
itertools = "0.14"
memchr = "2.7.5"

+ 69
- 6
binaries/daemon/src/lib.rs View File

@@ -39,9 +39,10 @@ use shared_memory_server::ShmemConf;
use socket_stream_utils::socket_stream_send;
use spawn::Spawner;
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
env::current_dir,
future::Future,
io,
net::SocketAddr,
path::{Path, PathBuf},
pin::pin,
@@ -51,7 +52,7 @@ use std::{
use sysinfo::Pid;
use tokio::{
fs::File,
io::AsyncReadExt,
io::{AsyncReadExt, AsyncSeekExt},
net::TcpStream,
sync::{
broadcast,
@@ -801,6 +802,7 @@ impl Daemon {
DaemonCoordinatorEvent::Logs {
dataflow_id,
node_id,
tail,
} => {
match self.working_dir.get(&dataflow_id) {
Some(working_dir) => {
@@ -815,10 +817,17 @@ impl Daemon {
log::log_path(&working_dir, &dataflow_id, &node_id)
))?;

let mut contents = vec![];
file.read_to_end(&mut contents)
.await
.wrap_err("Could not read content of log file")?;
let mut contents = if tail == 0 {
let mut contents = vec![];
file.read_to_end(&mut contents).await.map(|_| contents)
} else {
read_last_n_lines(&mut file, tail).await
}
.wrap_err("Could not read last n lines of log file")?;
if !contents.ends_with(b"\n") {
// Append newline for better readability
contents.push(b'\n');
}
Result::<Vec<u8>, eyre::Report>::Ok(contents)
}
.await
@@ -2166,6 +2175,60 @@ impl Daemon {
}
}

async fn read_last_n_lines(file: &mut File, mut tail: usize) -> io::Result<Vec<u8>> {
let mut pos = file.seek(io::SeekFrom::End(0)).await?;

let mut output = VecDeque::<u8>::new();
let mut extend_slice_to_start = |slice: &[u8]| {
output.extend(slice);
output.rotate_right(slice.len());
};

let mut buffer = vec![0; 2048];
let mut estimated_line_length = 0;
let mut at_end = true;
'main: while tail > 0 && pos > 0 {
let new_pos = pos.saturating_sub(buffer.len() as u64);
file.seek(io::SeekFrom::Start(new_pos)).await?;
let read_len = (pos - new_pos) as usize;
pos = new_pos;

let read = file.read(&mut buffer[..read_len]).await?;
if read < read_len {
break;
}
let read_buf = if at_end {
at_end = false;
&buffer[..read].trim_ascii_end()
} else {
&buffer[..read]
};

let mut iter = memchr::memrchr_iter(b'\n', read_buf);
let mut lines = 1;
loop {
let Some(pos) = iter.next() else {
extend_slice_to_start(read_buf);
break;
};
lines += 1;
tail -= 1;
if tail == 0 {
extend_slice_to_start(&read_buf[(pos + 1)..]);
break 'main;
}
}

estimated_line_length = estimated_line_length.max((read_buf.len() + 1).div_ceil(lines));
let estimated_buffer_length = estimated_line_length * tail;
if estimated_buffer_length >= buffer.len() * 2 {
buffer.resize(buffer.len() * 2, 0);
}
}

Ok(output.into())
}

async fn set_up_event_stream(
coordinator_addr: SocketAddr,
machine_id: &Option<String>,


+ 1
- 0
libraries/message/src/cli_to_coordinator.rs View File

@@ -67,6 +67,7 @@ pub enum ControlRequest {
uuid: Option<Uuid>,
name: Option<String>,
node: String,
tail: usize,
},
Destroy,
List,


+ 1
- 0
libraries/message/src/coordinator_to_daemon.rs View File

@@ -51,6 +51,7 @@ pub enum DaemonCoordinatorEvent {
Logs {
dataflow_id: DataflowId,
node_id: NodeId,
tail: usize,
},
Destroy,
Heartbeat,


Loading…
Cancel
Save