|
|
|
@@ -1,11 +1,13 @@ |
|
|
|
use dora_rs::descriptor::Descriptor; |
|
|
|
use eyre::{eyre, Context}; |
|
|
|
use futures::{stream::FuturesUnordered, StreamExt}; |
|
|
|
use futures_concurrency::Merge; |
|
|
|
use std::path::{Path, PathBuf}; |
|
|
|
use std::{ |
|
|
|
collections::BTreeMap, |
|
|
|
path::{Path, PathBuf}, |
|
|
|
}; |
|
|
|
use structopt::StructOpt; |
|
|
|
use tokio::io::{AsyncBufReadExt, BufReader}; |
|
|
|
use tokio_stream::wrappers::LinesStream; |
|
|
|
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; |
|
|
|
use tokio_util::codec::{Framed, LengthDelimitedCodec}; |
|
|
|
|
|
|
|
#[derive(Debug, Clone, StructOpt)] |
|
|
|
#[structopt(about = "Dora control")] |
|
|
|
@@ -38,49 +40,66 @@ async fn main() -> eyre::Result<()> { |
|
|
|
Command::StartPython(command) => { |
|
|
|
dora_rs::python::server::run(command).context("python server failed")?; |
|
|
|
} |
|
|
|
Command::Run { file } => { |
|
|
|
let descriptor = read_descriptor(&file).await?; |
|
|
|
Command::Run { file } => run_dataflow(file).await?, |
|
|
|
} |
|
|
|
|
|
|
|
let mut outputs = Vec::new(); |
|
|
|
let tasks = FuturesUnordered::new(); |
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
|
|
|
|
for source in &descriptor.sources { |
|
|
|
let mut command = tokio::process::Command::new(&source.run); |
|
|
|
command.stdout(std::process::Stdio::piped()); |
|
|
|
async fn run_dataflow(file: PathBuf) -> eyre::Result<()> { |
|
|
|
let descriptor = read_descriptor(&file).await?; |
|
|
|
|
|
|
|
let mut child = command |
|
|
|
.spawn() |
|
|
|
.with_context(|| format!("failed to spawn source {}", source.id))?; |
|
|
|
let stdout = child |
|
|
|
.stdout |
|
|
|
.take() |
|
|
|
.ok_or_else(|| eyre!("failed to take stdout handle of source"))?; |
|
|
|
let reader = LinesStream::new(BufReader::new(stdout).lines()); |
|
|
|
let socket = tokio::net::TcpListener::bind("127.0.0.1:0") |
|
|
|
.await |
|
|
|
.wrap_err("failed to create TCP listener")?; |
|
|
|
let socket_addr = socket |
|
|
|
.local_addr() |
|
|
|
.wrap_err("failed to get socket address")?; |
|
|
|
|
|
|
|
let source_id = source.id.clone(); |
|
|
|
let result = tokio::spawn(async move { |
|
|
|
let status = child.wait().await.context("child process failed")?; |
|
|
|
if status.success() { |
|
|
|
Ok(()) |
|
|
|
} else if let Some(code) = status.code() { |
|
|
|
Err(eyre!("Source {source_id} failed with exit code: {code}")) |
|
|
|
} else { |
|
|
|
Err(eyre!("Source {source_id} failed (unknown exit code)")) |
|
|
|
} |
|
|
|
}); |
|
|
|
let (outputs_tx, outputs) = tokio::sync::mpsc::channel(10); |
|
|
|
tokio::spawn(async move { |
|
|
|
let mut incoming = TcpListenerStream::new(socket); |
|
|
|
while let Some(connection) = incoming.next().await.transpose()? { |
|
|
|
let outputs_tx = outputs_tx.clone(); |
|
|
|
tokio::spawn(async move { |
|
|
|
let mut framed = Framed::new(connection, LengthDelimitedCodec::new()); |
|
|
|
while let Some(frame) = framed.next().await.transpose()? { |
|
|
|
let deserialized: BTreeMap<String, Vec<u8>> = bincode::deserialize(&frame) |
|
|
|
.wrap_err("failed to deserialize output message")?; |
|
|
|
outputs_tx.send(deserialized).await?; |
|
|
|
} |
|
|
|
Result::<_, eyre::Error>::Ok(()) |
|
|
|
}); |
|
|
|
} |
|
|
|
Result::<_, eyre::Error>::Ok(()) |
|
|
|
}); |
|
|
|
|
|
|
|
outputs.push(reader.map(|l| (source.output.clone(), l))); |
|
|
|
tasks.push(result); |
|
|
|
} |
|
|
|
let tasks = FuturesUnordered::new(); |
|
|
|
for source in &descriptor.sources { |
|
|
|
let mut command = tokio::process::Command::new(&source.run); |
|
|
|
command.env("SERVER_SOCKET_ADDR", socket_addr.to_string()); |
|
|
|
|
|
|
|
let mut child = command |
|
|
|
.spawn() |
|
|
|
.with_context(|| format!("failed to spawn source {}", source.id))?; |
|
|
|
|
|
|
|
// print all output for now (the eventual goal is to pass it to operators) |
|
|
|
let mut merged = outputs.merge(); |
|
|
|
while let Some((name, line)) = merged.next().await { |
|
|
|
let output = |
|
|
|
line.with_context(|| format!("failed to get next line of output {name}"))?; |
|
|
|
println!("Output {name}: {output}"); |
|
|
|
let source_id = source.id.clone(); |
|
|
|
let result = tokio::spawn(async move { |
|
|
|
let status = child.wait().await.context("child process failed")?; |
|
|
|
if status.success() { |
|
|
|
Ok(()) |
|
|
|
} else if let Some(code) = status.code() { |
|
|
|
Err(eyre!("Source {source_id} failed with exit code: {code}")) |
|
|
|
} else { |
|
|
|
Err(eyre!("Source {source_id} failed (unknown exit code)")) |
|
|
|
} |
|
|
|
} |
|
|
|
}); |
|
|
|
tasks.push(result); |
|
|
|
} |
|
|
|
|
|
|
|
let mut outputs_stream = ReceiverStream::new(outputs); |
|
|
|
while let Some(output) = outputs_stream.next().await { |
|
|
|
println!("Output: {output:?}"); |
|
|
|
} |
|
|
|
|
|
|
|
Ok(()) |
|
|
|
|