Browse Source

Minor refactoring

tags/v0.2.3-rc
haixuanTao 2 years ago
parent
commit
664e8d48ba
5 changed files with 15 additions and 12 deletions
  1. +1
    -1
      binaries/cli/src/logs.rs
  2. +3
    -3
      binaries/coordinator/src/lib.rs
  3. +5
    -1
      binaries/daemon/src/lib.rs
  4. +5
    -6
      binaries/daemon/src/spawn.rs
  5. +1
    -1
      libraries/core/src/topics.rs

+ 1
- 1
binaries/cli/src/logs.rs View File

@@ -25,7 +25,7 @@ pub fn logs(

let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match reply {
ControlRequestReply::Logs { logs } => logs,
ControlRequestReply::Logs(logs) => logs,
other => bail!("unexpected reply to daemon logs: {other:?}"),
}
};


+ 3
- 3
binaries/coordinator/src/lib.rs View File

@@ -399,7 +399,7 @@ async fn start_inner(
&mut daemon_connections,
)
.await
.map(|logs| ControlRequestReply::Logs { logs })
.map(|logs| ControlRequestReply::Logs(logs))
}
ControlRequest::Destroy => {
tracing::info!("Received destroy command");
@@ -692,7 +692,7 @@ async fn retrieve_logs(

let daemon_connection = daemon_connections
.get_mut(machine_id.as_str())
.wrap_err("no daemon connection")?; // TODO: take from dataflow spec
.wrap_err("no daemon connection")?;
tcp_send(&mut daemon_connection.stream, &message)
.await
.wrap_err("failed to send logs message to daemon")?;
@@ -705,7 +705,7 @@ async fn retrieve_logs(
.wrap_err("failed to deserialize logs reply from daemon")?
{
DaemonCoordinatorReply::Logs(logs) => logs,
other => bail!("unexpected reply after sending reload: {other:?}"),
other => bail!("unexpected reply after sending logs: {other:?}"),
};
tracing::info!("successfully retrieved logs for `{dataflow_id}/{node_id}`");



+ 5
- 1
binaries/daemon/src/lib.rs View File

@@ -383,7 +383,11 @@ impl Daemon {
RunStatus::Exit
}
DaemonCoordinatorEvent::Watchdog => {
let _ = reply_tx.send(Some(DaemonCoordinatorReply::WatchdogAck));
let _ = reply_tx
.send(Some(DaemonCoordinatorReply::WatchdogAck))
.map_err(|_| {
error!("could not send WatchdogAck reply from daemon to coordinator")
});
RunStatus::Continue
}
};


+ 5
- 6
binaries/daemon/src/spawn.rs View File

@@ -11,7 +11,7 @@ use dora_download::download_file;
use eyre::WrapErr;
use std::{
env::{consts::EXE_EXTENSION, temp_dir},
path::{Path, PathBuf},
path::Path,
process::Stdio,
};
use tokio::{
@@ -180,11 +180,10 @@ pub async fn spawn_node(
let log_dir = temp_dir();

let (tx, mut rx) = mpsc::channel(10);
let mut file = File::create(
&log_dir.join(PathBuf::from(log::log_path(&dataflow_id, &node_id)).with_extension("txt")),
)
.await
.expect("Failed to create log file");
let mut file =
File::create(&log_dir.join(log::log_path(&dataflow_id, &node_id).with_extension("txt")))
.await
.expect("Failed to create log file");
let mut stdout_lines =
(tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout"))).lines();



+ 1
- 1
libraries/core/src/topics.rs View File

@@ -64,7 +64,7 @@ pub enum ControlRequestReply {
DestroyOk,
DaemonConnected(bool),
ConnectedMachines(BTreeSet<String>),
Logs { logs: Vec<u8> },
Logs(Vec<u8>),
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]


Loading…
Cancel
Save