| @@ -1770,7 +1770,7 @@ checksum = "8a3de6e8d11b22ff9edc6d916f890800597d60f8b2da1caf2955c274638d6412" | |||||
| dependencies = [ | dependencies = [ | ||||
| "cfg-if", | "cfg-if", | ||||
| "libc", | "libc", | ||||
| "redox_syscall", | |||||
| "redox_syscall 0.2.12", | |||||
| "windows-sys 0.45.0", | "windows-sys 0.45.0", | ||||
| ] | ] | ||||
| @@ -3303,7 +3303,7 @@ checksum = "28141e0cc4143da2443301914478dc976a61ffdb3f043058310c70df2fed8954" | |||||
| dependencies = [ | dependencies = [ | ||||
| "cfg-if", | "cfg-if", | ||||
| "libc", | "libc", | ||||
| "redox_syscall", | |||||
| "redox_syscall 0.2.12", | |||||
| "smallvec", | "smallvec", | ||||
| "windows-sys 0.32.0", | "windows-sys 0.32.0", | ||||
| ] | ] | ||||
| @@ -3896,6 +3896,15 @@ dependencies = [ | |||||
| "bitflags 1.3.2", | "bitflags 1.3.2", | ||||
| ] | ] | ||||
| [[package]] | |||||
| name = "redox_syscall" | |||||
| version = "0.3.5" | |||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||||
| checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" | |||||
| dependencies = [ | |||||
| "bitflags 1.3.2", | |||||
| ] | |||||
| [[package]] | [[package]] | ||||
| name = "redox_users" | name = "redox_users" | ||||
| version = "0.4.2" | version = "0.4.2" | ||||
| @@ -3903,7 +3912,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" | |||||
| checksum = "7776223e2696f1aa4c6b0170e83212f47296a00424305117d013dfe86fb0fe55" | checksum = "7776223e2696f1aa4c6b0170e83212f47296a00424305117d013dfe86fb0fe55" | ||||
| dependencies = [ | dependencies = [ | ||||
| "getrandom", | "getrandom", | ||||
| "redox_syscall", | |||||
| "redox_syscall 0.2.12", | |||||
| "thiserror", | "thiserror", | ||||
| ] | ] | ||||
| @@ -4597,15 +4606,15 @@ checksum = "c02424087780c9b71cc96799eaeddff35af2bc513278cda5c99fc1f5d026d3c1" | |||||
| [[package]] | [[package]] | ||||
| name = "tempfile" | name = "tempfile" | ||||
| version = "3.4.0" | |||||
| version = "3.5.0" | |||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "af18f7ae1acd354b992402e9ec5864359d693cd8a79dcbef59f76891701c1e95" | |||||
| checksum = "b9fbec84f381d5795b08656e4912bec604d162bff9291d6189a78f4c8ab87998" | |||||
| dependencies = [ | dependencies = [ | ||||
| "cfg-if", | "cfg-if", | ||||
| "fastrand", | "fastrand", | ||||
| "redox_syscall", | |||||
| "rustix 0.36.9", | |||||
| "windows-sys 0.42.0", | |||||
| "redox_syscall 0.3.5", | |||||
| "rustix 0.37.3", | |||||
| "windows-sys 0.45.0", | |||||
| ] | ] | ||||
| [[package]] | [[package]] | ||||
| @@ -10,17 +10,20 @@ pub fn logs(uuid: Option<Uuid>, name: Option<String>, node: String) -> Result<() | |||||
| let connection = control_connection(&mut control_session)?; | let connection = control_connection(&mut control_session)?; | ||||
| let logs = { | let logs = { | ||||
| let reply_raw = connection | let reply_raw = connection | ||||
| .request(&serde_json::to_vec(&ControlRequest::Logs { | |||||
| uuid, | |||||
| name, | |||||
| node: node.clone(), | |||||
| })?) | |||||
| .wrap_err("failed to send DaemonConnected message")?; | |||||
| .request( | |||||
| &serde_json::to_vec(&ControlRequest::Logs { | |||||
| uuid, | |||||
| name, | |||||
| node: node.clone(), | |||||
| }) | |||||
| .wrap_err("")?, | |||||
| ) | |||||
| .wrap_err("failed to send Logs request message")?; | |||||
| let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; | let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; | ||||
| match reply { | match reply { | ||||
| ControlRequestReply::Logs { logs } => logs, | ControlRequestReply::Logs { logs } => logs, | ||||
| other => bail!("unexpected reply to daemon connection check: {other:?}"), | |||||
| other => bail!("unexpected reply to daemon logs: {other:?}"), | |||||
| } | } | ||||
| }; | }; | ||||
| @@ -28,12 +31,12 @@ pub fn logs(uuid: Option<Uuid>, name: Option<String>, node: String) -> Result<() | |||||
| .header(true) | .header(true) | ||||
| .grid(true) | .grid(true) | ||||
| .line_numbers(true) | .line_numbers(true) | ||||
| .paging_mode(bat::PagingMode::Always) | |||||
| .paging_mode(bat::PagingMode::QuitIfOneScreen) | |||||
| .inputs(vec![Input::from_bytes(&logs) | .inputs(vec![Input::from_bytes(&logs) | ||||
| .name("Logs") // TODO: Make a better name | |||||
| .name("Logs") | |||||
| .title(format!("Logs from {node}.").as_str())]) | .title(format!("Logs from {node}.").as_str())]) | ||||
| .print() | .print() | ||||
| .unwrap(); | |||||
| .wrap_err("Something went wrong with viewing log file")?; | |||||
| Ok(()) | Ok(()) | ||||
| } | } | ||||
| @@ -621,29 +621,30 @@ async fn retrieve_logs( | |||||
| }; | }; | ||||
| let message = serde_json::to_vec(&DaemonCoordinatorEvent::Logs { | let message = serde_json::to_vec(&DaemonCoordinatorEvent::Logs { | ||||
| dataflow_id, | dataflow_id, | ||||
| node_id, | |||||
| node_id: node_id.clone(), | |||||
| })?; | })?; | ||||
| let mut reply_logs = Vec::new(); | let mut reply_logs = Vec::new(); | ||||
| for machine_id in &dataflow.machines { | for machine_id in &dataflow.machines { | ||||
| let daemon_connection = daemon_connections | let daemon_connection = daemon_connections | ||||
| .get_mut(machine_id) | .get_mut(machine_id) | ||||
| .wrap_err("no daemon connection")?; // TODO: take from dataflow spec | .wrap_err("no daemon connection")?; // TODO: take from dataflow spec | ||||
| tcp_send(daemon_connection, &message) | tcp_send(daemon_connection, &message) | ||||
| .await | .await | ||||
| .wrap_err("failed to send reload message to daemon")?; | |||||
| .wrap_err("failed to send logs message to daemon")?; | |||||
| // wait for reply | // wait for reply | ||||
| let reply_raw = tcp_receive(daemon_connection) | let reply_raw = tcp_receive(daemon_connection) | ||||
| .await | .await | ||||
| .wrap_err("failed to receive reload reply from daemon")?; | |||||
| .wrap_err("failed to retrieve logs reply from daemon")?; | |||||
| match serde_json::from_slice(&reply_raw) | match serde_json::from_slice(&reply_raw) | ||||
| .wrap_err("failed to deserialize reload reply from daemon")? | |||||
| .wrap_err("failed to deserialize logs reply from daemon")? | |||||
| { | { | ||||
| DaemonCoordinatorReply::Logs { logs } => reply_logs = logs, | DaemonCoordinatorReply::Logs { logs } => reply_logs = logs, | ||||
| other => bail!("unexpected reply after sending reload: {other:?}"), | other => bail!("unexpected reply after sending reload: {other:?}"), | ||||
| } | } | ||||
| } | } | ||||
| tracing::info!("successfully reloaded dataflow `{dataflow_id}`"); | |||||
| tracing::info!("successfully retrieved logs for `{dataflow_id}/{node_id}`"); | |||||
| Ok(reply_logs) | Ok(reply_logs) | ||||
| } | } | ||||