diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 34398603..443f72d2 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -19,7 +19,7 @@ use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt}, sync::mpsc, }; -use tracing::info; +use tracing::debug; pub async fn spawn_node( dataflow_id: DataflowId, @@ -173,78 +173,71 @@ pub async fn spawn_node( } }; + let log_dir = temp_dir(); + + let (tx, mut rx) = mpsc::channel(10); + let mut file = File::create( + &log_dir.join(PathBuf::from(format!("{dataflow_id}-{node_id}.txt")).with_extension("txt")), + ) + .await + .expect("Failed to create log file"); + + let mut stdout_lines = + (tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout"))).lines(); + + let stdout_tx = tx.clone(); + + // Stdout listener stream tokio::spawn(async move { - let log_dir = temp_dir(); - - let (tx, mut rx) = mpsc::channel(10); - let mut file = File::create( - &log_dir - .join(PathBuf::from(format!("{dataflow_id}-{node_id}.txt")).with_extension("txt")), - ) - .await - .expect("Failed to create log file"); - - let mut stdout_lines = - (tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout"))) - .lines(); - - let stdout_tx = tx.clone(); - - // Stdout listener stream - tokio::spawn(async move { - while let Ok(Some(line)) = stdout_lines.next_line().await { - let sent = stdout_tx.send(Some(line)).await; - if sent.is_err() { - break; - } + while let Ok(Some(line)) = stdout_lines.next_line().await { + let sent = stdout_tx.send(Some(line)).await; + if sent.is_err() { + break; } - }); - - let mut stderr_lines = - (tokio::io::BufReader::new(child.stderr.take().expect("failed to take stderr"))) - .lines(); - - // Stderr listener stream - let stderr_tx = tx.clone(); - tokio::spawn(async move { - while let Ok(Some(line)) = stderr_lines.next_line().await { - let sent = stderr_tx.send(Some(line)).await; - if sent.is_err() { - break; - } - } - }); + } + }); - let exit_status_tx = tx.clone(); - tokio::spawn(async move { - let exit_status = NodeExitStatus::from(child.wait().await); - let event = DoraEvent::SpawnedNodeResult { - dataflow_id, - node_id, - exit_status, - }; + let mut stderr_lines = + (tokio::io::BufReader::new(child.stderr.take().expect("failed to take stderr"))).lines(); - let _ = daemon_tx.send(event.into()).await; - exit_status_tx.send(None).await.unwrap(); - }); - - // Log to file stream. - tokio::spawn(async move { - while let Some(Some(line)) = rx.recv().await { - file.write_all(line.as_bytes()) - .await - .expect("Could not log stdout/stderr to file"); - file.write_all(b"\n") - .await - .expect("Could not add newline to log file."); - info!(line); - - // Make sure that all data has been synced to disk. - file.sync_all().await.unwrap(); + // Stderr listener stream + let stderr_tx = tx.clone(); + tokio::spawn(async move { + while let Ok(Some(line)) = stderr_lines.next_line().await { + let sent = stderr_tx.send(Some(line)).await; + if sent.is_err() { + break; } - }) - .await - .expect("Could not write logs to file"); + } + }); + + let exit_status_tx = tx.clone(); + tokio::spawn(async move { + let exit_status = NodeExitStatus::from(child.wait().await); + let event = DoraEvent::SpawnedNodeResult { + dataflow_id, + node_id, + exit_status, + }; + + exit_status_tx.send(None).await.unwrap(); + let _ = daemon_tx.send(event.into()).await; + }); + + // Log to file stream. + tokio::spawn(async move { + while let Some(Some(line)) = rx.recv().await { + file.write_all(line.as_bytes()) + .await + .expect("Could not log stdout/stderr to file"); + file.write_all(b"\n") + .await + .expect("Could not add newline to log file."); + debug!("{dataflow_id}/{node_id} logged {line}"); + + // Make sure that all data has been synced to disk. + file.sync_all().await.unwrap(); + } }); Ok(()) }