Browse Source

Refactor spawning of log thread to avoid deadlock

tags/v0.2.3-rc
haixuanTao 2 years ago
parent
commit
29d7ed728e
1 changed files with 61 additions and 68 deletions
  1. +61
    -68
      binaries/daemon/src/spawn.rs

+ 61
- 68
binaries/daemon/src/spawn.rs View File

@@ -19,7 +19,7 @@ use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt},
sync::mpsc,
};
use tracing::info;
use tracing::debug;

pub async fn spawn_node(
dataflow_id: DataflowId,
@@ -173,78 +173,71 @@ pub async fn spawn_node(
}
};

let log_dir = temp_dir();

let (tx, mut rx) = mpsc::channel(10);
let mut file = File::create(
&log_dir.join(PathBuf::from(format!("{dataflow_id}-{node_id}.txt")).with_extension("txt")),
)
.await
.expect("Failed to create log file");

let mut stdout_lines =
(tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout"))).lines();

let stdout_tx = tx.clone();

// Stdout listener stream
tokio::spawn(async move {
let log_dir = temp_dir();

let (tx, mut rx) = mpsc::channel(10);
let mut file = File::create(
&log_dir
.join(PathBuf::from(format!("{dataflow_id}-{node_id}.txt")).with_extension("txt")),
)
.await
.expect("Failed to create log file");

let mut stdout_lines =
(tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout")))
.lines();

let stdout_tx = tx.clone();

// Stdout listener stream
tokio::spawn(async move {
while let Ok(Some(line)) = stdout_lines.next_line().await {
let sent = stdout_tx.send(Some(line)).await;
if sent.is_err() {
break;
}
while let Ok(Some(line)) = stdout_lines.next_line().await {
let sent = stdout_tx.send(Some(line)).await;
if sent.is_err() {
break;
}
});

let mut stderr_lines =
(tokio::io::BufReader::new(child.stderr.take().expect("failed to take stderr")))
.lines();

// Stderr listener stream
let stderr_tx = tx.clone();
tokio::spawn(async move {
while let Ok(Some(line)) = stderr_lines.next_line().await {
let sent = stderr_tx.send(Some(line)).await;
if sent.is_err() {
break;
}
}
});
}
});

let exit_status_tx = tx.clone();
tokio::spawn(async move {
let exit_status = NodeExitStatus::from(child.wait().await);
let event = DoraEvent::SpawnedNodeResult {
dataflow_id,
node_id,
exit_status,
};
let mut stderr_lines =
(tokio::io::BufReader::new(child.stderr.take().expect("failed to take stderr"))).lines();

let _ = daemon_tx.send(event.into()).await;
exit_status_tx.send(None).await.unwrap();
});

// Log to file stream.
tokio::spawn(async move {
while let Some(Some(line)) = rx.recv().await {
file.write_all(line.as_bytes())
.await
.expect("Could not log stdout/stderr to file");
file.write_all(b"\n")
.await
.expect("Could not add newline to log file.");
info!(line);

// Make sure that all data has been synced to disk.
file.sync_all().await.unwrap();
// Stderr listener stream
let stderr_tx = tx.clone();
tokio::spawn(async move {
while let Ok(Some(line)) = stderr_lines.next_line().await {
let sent = stderr_tx.send(Some(line)).await;
if sent.is_err() {
break;
}
})
.await
.expect("Could not write logs to file");
}
});

let exit_status_tx = tx.clone();
tokio::spawn(async move {
let exit_status = NodeExitStatus::from(child.wait().await);
let event = DoraEvent::SpawnedNodeResult {
dataflow_id,
node_id,
exit_status,
};

exit_status_tx.send(None).await.unwrap();
let _ = daemon_tx.send(event.into()).await;
});

// Log to file stream.
tokio::spawn(async move {
while let Some(Some(line)) = rx.recv().await {
file.write_all(line.as_bytes())
.await
.expect("Could not log stdout/stderr to file");
file.write_all(b"\n")
.await
.expect("Could not add newline to log file.");
debug!("{dataflow_id}/{node_id} logged {line}");

// Make sure that all data has been synced to disk.
file.sync_all().await.unwrap();
}
});
Ok(())
}

Loading…
Cancel
Save