|
|
|
@@ -244,20 +244,47 @@ pub async fn spawn_node( |
|
|
|
} |
|
|
|
}); |
|
|
|
|
|
|
|
let mut stderr_lines = |
|
|
|
(tokio::io::BufReader::new(child.stderr.take().expect("failed to take stderr"))).lines(); |
|
|
|
let mut child_stderr = |
|
|
|
tokio::io::BufReader::new(child.stderr.take().expect("failed to take stderr")); |
|
|
|
|
|
|
|
// Stderr listener stream |
|
|
|
let stderr_tx = tx.clone(); |
|
|
|
let node_id = node.id.clone(); |
|
|
|
tokio::spawn(async move { |
|
|
|
while let Ok(Some(line)) = stderr_lines.next_line().await { |
|
|
|
let sent = stderr_tx.send(line.clone()).await; |
|
|
|
let mut buffer = String::new(); |
|
|
|
let mut finished = false; |
|
|
|
while !finished { |
|
|
|
finished = match child_stderr |
|
|
|
.read_line(&mut buffer) |
|
|
|
.await |
|
|
|
.wrap_err("failed to read stderr line from spawned node") |
|
|
|
{ |
|
|
|
Ok(0) => true, |
|
|
|
Ok(_) => false, |
|
|
|
Err(err) => { |
|
|
|
tracing::warn!("{err:?}"); |
|
|
|
true |
|
|
|
} |
|
|
|
}; |
|
|
|
|
|
|
|
if buffer.starts_with("Traceback (most recent call last):") { |
|
|
|
if !finished { |
|
|
|
continue; |
|
|
|
} else { |
|
|
|
tracing::error!("{dataflow_id}/{}: \n{buffer}", node_id); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// send the buffered lines |
|
|
|
let lines = std::mem::take(&mut buffer); |
|
|
|
let sent = stderr_tx.send(lines.clone()).await; |
|
|
|
if sent.is_err() { |
|
|
|
eprintln!("Could not log: {line}"); |
|
|
|
println!("Could not log: {lines}"); |
|
|
|
} |
|
|
|
} |
|
|
|
}); |
|
|
|
|
|
|
|
let node_id = node.id.clone(); |
|
|
|
let (log_finish_tx, log_finish_rx) = oneshot::channel(); |
|
|
|
tokio::spawn(async move { |
|
|
|
let exit_status = NodeExitStatus::from(child.wait().await); |
|
|
|
|