Browse Source

Add basic log forwarding from daemon to CLI

Forwarded by coordinator
tags/v0.3.5-rc0
Philipp Oppermann 1 year ago
parent
commit
2e0419b100
Failed to extract signature
16 changed files with 248 additions and 36 deletions
  1. +16
    -1
      Cargo.lock
  2. +2
    -0
      binaries/cli/Cargo.toml
  3. +70
    -8
      binaries/cli/src/attach.rs
  4. +3
    -1
      binaries/cli/src/main.rs
  5. +1
    -0
      binaries/coordinator/Cargo.toml
  6. +25
    -6
      binaries/coordinator/src/control.rs
  7. +30
    -1
      binaries/coordinator/src/lib.rs
  8. +6
    -0
      binaries/coordinator/src/listener.rs
  9. +23
    -0
      binaries/coordinator/src/log_subscriber.rs
  10. +34
    -3
      binaries/daemon/src/lib.rs
  11. +14
    -4
      binaries/daemon/src/pending.rs
  12. +0
    -8
      binaries/daemon/src/spawn.rs
  13. +4
    -4
      libraries/communication-layer/request-reply/src/tcp.rs
  14. +1
    -0
      libraries/core/Cargo.toml
  15. +15
    -0
      libraries/core/src/coordinator_messages.rs
  16. +4
    -0
      libraries/core/src/topics.rs

+ 16
- 1
Cargo.lock View File

@@ -1687,6 +1687,16 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422"

[[package]]
name = "colored"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbf2150cce219b664a8a70df7a1f933836724b503f8a413af9365b4dcc4d90b8"
dependencies = [
"lazy_static",
"windows-sys 0.48.0",
]

[[package]]
name = "com"
version = "0.6.0"
@@ -2251,6 +2261,7 @@ version = "0.3.4"
dependencies = [
"bat",
"clap 4.5.7",
"colored",
"communication-layer-request-reply",
"ctrlc",
"dora-coordinator",
@@ -2264,6 +2275,7 @@ dependencies = [
"eyre",
"futures",
"inquire",
"log",
"notify 5.2.0",
"serde",
"serde_json",
@@ -2286,6 +2298,7 @@ dependencies = [
"eyre",
"futures",
"futures-concurrency",
"log",
"names",
"serde_json",
"tokio",
@@ -2301,6 +2314,7 @@ dependencies = [
"aligned-vec",
"dora-message",
"eyre",
"log",
"once_cell",
"schemars",
"serde",
@@ -4690,6 +4704,7 @@ version = "0.4.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
dependencies = [
"serde",
"value-bag",
]

@@ -9605,7 +9620,7 @@ version = "1.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
dependencies = [
"cfg-if 0.1.10",
"cfg-if 1.0.0",
"static_assertions",
]



+ 2
- 0
binaries/cli/Cargo.toml View File

@@ -42,3 +42,5 @@ tokio = { version = "1.20.1", features = ["full"] }
tokio-stream = { version = "0.1.8", features = ["io-util", "net"] }
futures = "0.3.21"
duration-str = "0.5"
log = { version = "0.4.21", features = ["serde"] }
colored = "2.1.0"

+ 70
- 8
binaries/cli/src/attach.rs View File

@@ -1,12 +1,17 @@
use communication_layer_request_reply::TcpRequestReplyConnection;
use colored::Colorize;
use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection};
use dora_core::{
coordinator_messages::LogMessage,
descriptor::{resolve_path, CoreNodeKind, Descriptor},
topics::{ControlRequest, ControlRequestReply},
};
use eyre::Context;
use notify::event::ModifyKind;
use notify::{Config, Event as NotifyEvent, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use std::collections::HashMap;
use std::{
collections::HashMap,
net::{SocketAddr, TcpStream},
};
use std::{path::PathBuf, sync::mpsc, time::Duration};
use tracing::{error, info};
use uuid::Uuid;
@@ -19,6 +24,7 @@ pub fn attach_dataflow(
dataflow_id: Uuid,
session: &mut TcpRequestReplyConnection,
hot_reload: bool,
coordinator_socket: SocketAddr,
) -> Result<(), eyre::ErrReport> {
let (tx, rx) = mpsc::sync_channel(2);

@@ -71,11 +77,11 @@ pub fn attach_dataflow(
for path in paths {
if let Some((dataflow_id, node_id, operator_id)) = node_path_lookup.get(&path) {
watcher_tx
.send(ControlRequest::Reload {
.send(AttachEvent::Control(ControlRequest::Reload {
dataflow_id: *dataflow_id,
node_id: node_id.clone(),
operator_id: operator_id.clone(),
})
}))
.context("Could not send reload request to the cli loop")
.unwrap();
}
@@ -98,17 +104,17 @@ pub fn attach_dataflow(
};

// Setup Ctrlc Watcher to stop dataflow after ctrlc
let ctrlc_tx = tx;
let ctrlc_tx = tx.clone();
let mut ctrlc_sent = false;
ctrlc::set_handler(move || {
if ctrlc_sent {
std::process::abort();
} else {
if ctrlc_tx
.send(ControlRequest::Stop {
.send(AttachEvent::Control(ControlRequest::Stop {
dataflow_uuid: dataflow_id,
grace_duration: None,
})
}))
.is_err()
{
// bail!("failed to report ctrl-c event to dora-daemon");
@@ -118,12 +124,63 @@ pub fn attach_dataflow(
})
.wrap_err("failed to set ctrl-c handler")?;

// subscribe to log messages
let mut log_session = TcpConnection {
stream: TcpStream::connect(coordinator_socket)
.wrap_err("failed to connect to dora coordinator")?,
};
let level = log::Level::Warn;
log_session
.send(
&serde_json::to_vec(&ControlRequest::LogSubscribe { dataflow_id, level })
.wrap_err("failed to serialize message")?,
)
.wrap_err("failed to send log subscribe request to coordinator")?;
std::thread::spawn(move || {
while let Ok(raw) = log_session.receive() {
let parsed: eyre::Result<LogMessage> =
serde_json::from_slice(&raw).context("failed to parse log message");
if tx.send(AttachEvent::Log(parsed)).is_err() {
break;
}
}
});

loop {
let control_request = match rx.recv_timeout(Duration::from_secs(1)) {
Err(_err) => ControlRequest::Check {
dataflow_uuid: dataflow_id,
},
Ok(reload_event) => reload_event,
Ok(AttachEvent::Control(control_request)) => control_request,
Ok(AttachEvent::Log(Ok(log_message))) => {
let LogMessage {
dataflow_id,
node_id,
level,
target,
module_path,
file,
line,
message,
} = log_message;
let level = match level {
log::Level::Error => "ERROR".red(),
log::Level::Warn => "WARN ".yellow(),
other => format!("{other:5}").normal(),
};
let target = target.dimmed();
let node = match node_id {
Some(node_id) => format!("{node_id} ").normal(),
None => "".normal(),
};

println!("{level} {node}{target}: {message}");
continue;
}
Ok(AttachEvent::Log(Err(err))) => {
tracing::warn!("failed to parse log message: {:#?}", err);
continue;
}
};

let reply_raw = session
@@ -144,3 +201,8 @@ pub fn attach_dataflow(
};
}
}

/// Event consumed by the main loop of `attach_dataflow`.
///
/// All producers (file watcher, ctrl-c handler, log receiver thread) send
/// their events through the same channel, wrapped in this enum.
enum AttachEvent {
/// A control request (e.g. `Reload` from the file watcher or `Stop` from
/// the ctrl-c handler) that the loop takes as its next control request.
Control(ControlRequest),
/// Outcome of deserializing one raw message received on the log
/// subscription connection; `Err` if the payload was not a valid `LogMessage`.
Log(eyre::Result<LogMessage>),
}

+ 3
- 1
binaries/cli/src/main.rs View File

@@ -361,7 +361,8 @@ fn run() -> eyre::Result<()> {
.wrap_err("Could not validate yaml")?;
}

let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
let coordinator_socket = (coordinator_addr, coordinator_port).into();
let mut session = connect_to_coordinator(coordinator_socket)
.wrap_err("failed to connect to dora coordinator")?;
let dataflow_id = start_dataflow(
dataflow_descriptor.clone(),
@@ -377,6 +378,7 @@ fn run() -> eyre::Result<()> {
dataflow_id,
&mut *session,
hot_reload,
coordinator_socket,
)?
}
}


+ 1
- 0
binaries/coordinator/Cargo.toml View File

@@ -25,3 +25,4 @@ futures-concurrency = "7.1.0"
serde_json = "1.0.86"
names = "0.14.0"
ctrlc = "3.2.5"
log = { version = "0.4.21", features = ["serde"] }

+ 25
- 6
binaries/coordinator/src/control.rs View File

@@ -17,6 +17,7 @@ use tokio::{
task::JoinHandle,
};
use tokio_stream::wrappers::ReceiverStream;
use uuid::Uuid;

pub(crate) async fn control_events(
control_listen_addr: SocketAddr,
@@ -99,14 +100,27 @@ async fn handle_requests(
},
};

let result =
match serde_json::from_slice(&raw).wrap_err("failed to deserialize incoming message") {
Ok(request) => handle_request(request, &tx).await,
Err(err) => Err(err),
};
let request =
serde_json::from_slice(&raw).wrap_err("failed to deserialize incoming message");

if let Ok(ControlRequest::LogSubscribe { dataflow_id, level }) = request {
let _ = tx
.send(ControlEvent::LogSubscribe {
dataflow_id,
level,
connection,
})
.await;
break;
}

let result = match request {
Ok(request) => handle_request(request, &tx).await,
Err(err) => Err(err),
};

let reply = result.unwrap_or_else(|err| ControlRequestReply::Error(format!("{err}")));
let serialized =
let serialized: Vec<u8> =
match serde_json::to_vec(&reply).wrap_err("failed to serialize ControlRequestReply") {
Ok(s) => s,
Err(err) => {
@@ -160,6 +174,11 @@ pub enum ControlEvent {
request: ControlRequest,
reply_sender: oneshot::Sender<eyre::Result<ControlRequestReply>>,
},
LogSubscribe {
dataflow_id: Uuid,
level: log::Level,
connection: TcpStream,
},
Error(eyre::Report),
}



+ 30
- 1
binaries/coordinator/src/lib.rs View File

@@ -5,7 +5,7 @@ use crate::{
pub use control::ControlEvent;
use dora_core::{
config::{NodeId, OperatorId},
coordinator_messages::RegisterResult,
coordinator_messages::{LogMessage, RegisterResult},
daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped},
descriptor::{Descriptor, ResolvedNode},
message::uhlc::{self, HLC},
@@ -16,6 +16,7 @@ use dora_core::{
use eyre::{bail, eyre, ContextCompat, WrapErr};
use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};
use futures_concurrency::stream::Merge;
use log_subscriber::LogSubscriber;
use run::SpawnedDataflow;
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
@@ -30,6 +31,7 @@ use uuid::Uuid;

mod control;
mod listener;
mod log_subscriber;
mod run;
mod tcp_utils;

@@ -488,9 +490,25 @@ async fn start_inner(
));
let _ = reply_sender.send(reply);
}
ControlRequest::LogSubscribe { .. } => {
let _ = reply_sender.send(Err(eyre::eyre!(
"LogSubscribe request should be handled separately"
)));
}
}
}
ControlEvent::Error(err) => tracing::error!("{err:?}"),
ControlEvent::LogSubscribe {
dataflow_id,
level,
connection,
} => {
if let Some(dataflow) = running_dataflows.get_mut(&dataflow_id) {
dataflow
.log_subscribers
.push(LogSubscriber::new(level, connection));
}
}
},
Event::DaemonHeartbeatInterval => {
let mut disconnected = BTreeSet::new();
@@ -543,6 +561,13 @@ async fn start_inner(
connection.last_heartbeat = Instant::now();
}
}
Event::Log(message) => {
if let Some(dataflow) = running_dataflows.get_mut(&message.dataflow_id) {
for subscriber in &mut dataflow.log_subscribers {
subscriber.send_message(&message).await?;
}
}
}
}
}

@@ -669,6 +694,8 @@ struct RunningDataflow {
nodes: Vec<ResolvedNode>,

reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,

log_subscribers: Vec<LogSubscriber>,
}

struct ArchivedDataflow {
@@ -868,6 +895,7 @@ async fn start_dataflow(
machines,
nodes,
reply_senders: Vec::new(),
log_subscribers: Vec::new(),
})
}

@@ -914,6 +942,7 @@ pub enum Event {
Daemon(DaemonEvent),
DaemonHeartbeatInterval,
CtrlC,
Log(LogMessage),
}

impl Event {


+ 6
- 0
binaries/coordinator/src/listener.rs View File

@@ -97,6 +97,12 @@ pub async fn handle_connection(
break;
}
}
coordinator_messages::DaemonEvent::Log(message) => {
let event = Event::Log(message);
if events_tx.send(event).await.is_err() {
break;
}
}
},
};
}


+ 23
- 0
binaries/coordinator/src/log_subscriber.rs View File

@@ -0,0 +1,23 @@
use dora_core::coordinator_messages::LogMessage;

use crate::tcp_utils::tcp_send;

/// A subscriber connection that receives forwarded log messages.
pub struct LogSubscriber {
    /// Least severe level that is still forwarded; messages whose level
    /// compares greater than this one are skipped.
    pub level: log::Level,
    /// TCP stream that the serialized messages are written to.
    connection: tokio::net::TcpStream,
}

impl LogSubscriber {
    /// Creates a subscriber that forwards messages up to `level` over `connection`.
    pub fn new(level: log::Level, connection: tokio::net::TcpStream) -> Self {
        Self { level, connection }
    }

    /// Sends `message` to the subscriber as JSON if it passes the level filter.
    ///
    /// Messages with `message.level > self.level` are dropped and reported as
    /// success.
    ///
    /// # Errors
    ///
    /// Returns an error if JSON serialization or the TCP send fails.
    pub async fn send_message(&mut self, message: &LogMessage) -> eyre::Result<()> {
        let passes_filter = message.level <= self.level;
        if passes_filter {
            let serialized = serde_json::to_vec(message)?;
            tcp_send(&mut self.connection, &serialized).await?;
        }
        Ok(())
    }
}

+ 34
- 3
binaries/daemon/src/lib.rs View File

@@ -2,7 +2,7 @@ use aligned_vec::{AVec, ConstAlign};
use coordinator::CoordinatorEvent;
use crossbeam::queue::ArrayQueue;
use dora_core::config::{Input, OperatorId};
use dora_core::coordinator_messages::CoordinatorRequest;
use dora_core::coordinator_messages::{CoordinatorRequest, LogMessage};
use dora_core::daemon_messages::{
DataMessage, DynamicNodeEvent, InterDaemonEvent, NodeConfig, Timestamped,
};
@@ -332,6 +332,26 @@ impl Daemon {
Ok(self.dataflow_node_results)
}

/// Forwards a log message to the coordinator.
///
/// The message is wrapped in a timestamped `CoordinatorRequest::Event` that
/// carries this daemon's machine ID, serialized as JSON, and written to the
/// coordinator connection. If there is no coordinator connection, the message
/// is dropped and `Ok(())` is returned.
///
/// # Errors
///
/// Returns an error if serialization or the TCP send fails, or if the last
/// coordinator heartbeat is more than 20 seconds old.
async fn send_log_message(&mut self, message: LogMessage) -> eyre::Result<()> {
    if let Some(connection) = &mut self.coordinator_connection {
        let msg = serde_json::to_vec(&Timestamped {
            inner: CoordinatorRequest::Event {
                machine_id: self.machine_id.clone(),
                event: DaemonEvent::Log(message),
            },
            timestamp: self.clock.new_timestamp(),
        })?;
        // The context names the operation that actually failed; it previously
        // claimed a watchdog message was being sent.
        tcp_send(connection, &msg)
            .await
            .wrap_err("failed to send log message to dora-coordinator")?;

        // Same staleness check as on the other coordinator send paths: treat a
        // long-missing heartbeat as a lost connection.
        if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) {
            bail!("lost connection to coordinator")
        }
    }
    Ok(())
}

async fn handle_coordinator_event(
&mut self,
event: DaemonCoordinatorEvent,
@@ -577,6 +597,7 @@ impl Daemon {
}
};

let mut log_messages = Vec::new();
for node in nodes {
let local = node.deploy.machine == self.machine_id;

@@ -640,7 +661,7 @@ impl Daemon {
}
Err(err) => {
tracing::error!("{err:?}");
dataflow
let messages = dataflow
.pending_nodes
.handle_node_stop(
&node_id,
@@ -649,6 +670,7 @@ impl Daemon {
&mut dataflow.cascading_error_causes,
)
.await?;
log_messages.extend(messages);
}
}
} else {
@@ -656,6 +678,10 @@ impl Daemon {
}
}

for log_message in log_messages {
self.send_log_message(log_message).await?;
}

Ok(())
}

@@ -1006,7 +1032,7 @@ impl Daemon {
format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
})?;

dataflow
let log_messages = dataflow
.pending_nodes
.handle_node_stop(
node_id,
@@ -1060,6 +1086,11 @@ impl Daemon {
}
self.running.remove(&dataflow_id);
}

for log_message in log_messages {
self.send_log_message(log_message).await?;
}

Ok(())
}



+ 14
- 4
binaries/daemon/src/pending.rs View File

@@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet};

use dora_core::{
config::NodeId,
coordinator_messages::{CoordinatorRequest, DaemonEvent},
coordinator_messages::{CoordinatorRequest, DaemonEvent, Level, LogMessage},
daemon_messages::{DaemonReply, DataflowId, Timestamped},
message::uhlc::{Timestamp, HLC},
};
@@ -77,14 +77,24 @@ impl PendingNodes {
coordinator_connection: &mut Option<TcpStream>,
clock: &HLC,
cascading_errors: &mut CascadingErrorCauses,
) -> eyre::Result<()> {
) -> eyre::Result<Vec<LogMessage>> {
let mut log = Vec::new();
if self.local_nodes.remove(node_id) {
tracing::warn!("node `{node_id}` exited before initializing dora connection");
log.push(LogMessage {
dataflow_id: self.dataflow_id,
node_id: Some(node_id.clone()),
level: Level::Warn,
target: "exit".into(),
module_path: None,
file: None,
line: None,
message: "node exited before initializing dora connection".into(),
});
self.exited_before_subscribe.push(node_id.clone());
self.update_dataflow_status(coordinator_connection, clock, cascading_errors)
.await?;
}
Ok(())
Ok(log)
}

pub async fn handle_external_all_nodes_ready(


+ 0
- 8
binaries/daemon/src/spawn.rs View File

@@ -376,14 +376,6 @@ pub async fn spawn_node(

node_stderr_most_recent.force_push(new);

if buffer.starts_with("Traceback (most recent call last):") {
if !finished {
continue;
} else {
tracing::error!("{dataflow_id}/{}: \n{buffer}", node_id);
}
}

// send the buffered lines
let lines = std::mem::take(&mut buffer);
let sent = stderr_tx.send(lines.clone()).await;


+ 4
- 4
libraries/communication-layer/request-reply/src/tcp.rs View File

@@ -94,8 +94,8 @@ impl RequestReplyLayer for TcpLayer {
}
}

struct TcpConnection {
stream: TcpStream,
pub struct TcpConnection {
pub stream: TcpStream,
}

impl ListenConnection for TcpConnection {
@@ -128,14 +128,14 @@ impl RequestReplyConnection for TcpConnection {
}

impl TcpConnection {
fn send(&mut self, request: &[u8]) -> std::io::Result<()> {
pub fn send(&mut self, request: &[u8]) -> std::io::Result<()> {
let len_raw = (request.len() as u64).to_le_bytes();
self.stream.write_all(&len_raw)?;
self.stream.write_all(request)?;
Ok(())
}

fn receive(&mut self) -> std::io::Result<Vec<u8>> {
pub fn receive(&mut self) -> std::io::Result<Vec<u8>> {
let reply_len = {
let mut raw = [0; 8];
self.stream.read_exact(&mut raw)?;


+ 1
- 0
libraries/core/Cargo.toml View File

@@ -22,3 +22,4 @@ tokio = { version = "1.24.1", features = ["fs", "process", "sync"] }
aligned-vec = { version = "0.5.0", features = ["serde"] }
schemars = "0.8.19"
serde_json = "1.0.117"
log = { version = "0.4.21", features = ["serde"] }

+ 15
- 0
libraries/core/src/coordinator_messages.rs View File

@@ -1,5 +1,6 @@
use crate::{config::NodeId, daemon_messages::DataflowId, topics::DataflowDaemonResult};
use eyre::eyre;
pub use log::Level;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum CoordinatorRequest {
@@ -14,6 +15,19 @@ pub enum CoordinatorRequest {
},
}

/// A single log message that can be serialized and sent between processes.
///
/// Marked `#[must_use]` so that a produced message is not dropped unnoticed.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[must_use]
pub struct LogMessage {
/// ID of the dataflow that this message belongs to.
pub dataflow_id: DataflowId,
/// Node that the message relates to, if any.
pub node_id: Option<NodeId>,
/// Severity of the message.
pub level: log::Level,
/// Log target string.
pub target: String,
/// Optional module path — presumably of the code that emitted the message; TODO confirm.
pub module_path: Option<String>,
/// Optional file name — presumably the source file of the log call; TODO confirm.
pub file: Option<String>,
/// Optional line number — presumably within `file`; TODO confirm.
pub line: Option<u32>,
/// The log text itself.
pub message: String,
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum DaemonEvent {
AllNodesReady {
@@ -25,6 +39,7 @@ pub enum DaemonEvent {
result: DataflowDaemonResult,
},
Heartbeat,
Log(LogMessage),
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]


+ 4
- 0
libraries/core/src/topics.rs View File

@@ -55,6 +55,10 @@ pub enum ControlRequest {
List,
DaemonConnected,
ConnectedMachines,
LogSubscribe {
dataflow_id: Uuid,
level: log::Level,
},
}

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]


Loading…
Cancel
Save