From 664e8d48bab6ca032af573f9404d054c7de90c67 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 2 May 2023 22:49:55 +0800 Subject: [PATCH] Minor refactoring --- binaries/cli/src/logs.rs | 2 +- binaries/coordinator/src/lib.rs | 6 +++--- binaries/daemon/src/lib.rs | 6 +++++- binaries/daemon/src/spawn.rs | 11 +++++------ libraries/core/src/topics.rs | 2 +- 5 files changed, 15 insertions(+), 12 deletions(-) diff --git a/binaries/cli/src/logs.rs b/binaries/cli/src/logs.rs index 5f879615..20230a94 100644 --- a/binaries/cli/src/logs.rs +++ b/binaries/cli/src/logs.rs @@ -25,7 +25,7 @@ pub fn logs( let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; match reply { - ControlRequestReply::Logs { logs } => logs, + ControlRequestReply::Logs(logs) => logs, other => bail!("unexpected reply to daemon logs: {other:?}"), } }; diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 436239cd..95f70db3 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -399,7 +399,7 @@ async fn start_inner( &mut daemon_connections, ) .await - .map(|logs| ControlRequestReply::Logs { logs }) + .map(|logs| ControlRequestReply::Logs(logs)) } ControlRequest::Destroy => { tracing::info!("Received destroy command"); @@ -692,7 +692,7 @@ async fn retrieve_logs( let daemon_connection = daemon_connections .get_mut(machine_id.as_str()) - .wrap_err("no daemon connection")?; // TODO: take from dataflow spec + .wrap_err("no daemon connection")?; tcp_send(&mut daemon_connection.stream, &message) .await .wrap_err("failed to send logs message to daemon")?; @@ -705,7 +705,7 @@ async fn retrieve_logs( .wrap_err("failed to deserialize logs reply from daemon")? { DaemonCoordinatorReply::Logs(logs) => logs, - other => bail!("unexpected reply after sending reload: {other:?}"), + other => bail!("unexpected reply after sending logs: {other:?}"), }; tracing::info!("successfully retrieved logs for `{dataflow_id}/{node_id}`"); diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 512ee9cc..7a774137 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -383,7 +383,11 @@ impl Daemon { RunStatus::Exit } DaemonCoordinatorEvent::Watchdog => { - let _ = reply_tx.send(Some(DaemonCoordinatorReply::WatchdogAck)); + let _ = reply_tx + .send(Some(DaemonCoordinatorReply::WatchdogAck)) + .map_err(|_| { + error!("could not send WatchdogAck reply from daemon to coordinator") + }); RunStatus::Continue } }; diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 40952a74..91853aab 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -11,7 +11,7 @@ use dora_download::download_file; use eyre::WrapErr; use std::{ env::{consts::EXE_EXTENSION, temp_dir}, - path::{Path, PathBuf}, + path::Path, process::Stdio, }; use tokio::{ @@ -180,11 +180,10 @@ pub async fn spawn_node( let log_dir = temp_dir(); let (tx, mut rx) = mpsc::channel(10); - let mut file = File::create( - &log_dir.join(PathBuf::from(log::log_path(&dataflow_id, &node_id)).with_extension("txt")), - ) - .await - .expect("Failed to create log file"); + let mut file = + File::create(&log_dir.join(log::log_path(&dataflow_id, &node_id).with_extension("txt"))) + .await + .expect("Failed to create log file"); let mut stdout_lines = (tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout"))).lines(); diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 542c55bc..a77318c0 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -64,7 +64,7 @@ pub enum ControlRequestReply { DestroyOk, DaemonConnected(bool), ConnectedMachines(BTreeSet), - Logs { logs: Vec }, + Logs(Vec), } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]