diff --git a/Cargo.lock b/Cargo.lock index 96ae2434..8505b90b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -515,11 +515,14 @@ dependencies = [ "envy", "eyre", "futures", + "futures-concurrency", + "log", "pyo3", "serde", "serde_yaml", "structopt", "tokio", + "tokio-stream", "zenoh", ] @@ -620,6 +623,17 @@ dependencies = [ "futures-sink", ] +[[package]] +name = "futures-concurrency" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48e98b7b5aedee7c34a5cfb1ee1681af8faf46e2f30c0b8af5ea08eba517d61c" +dependencies = [ + "async-trait", + "futures-core", + "pin-project", +] + [[package]] name = "futures-core" version = "0.3.21" @@ -967,9 +981,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.14" +version = "0.4.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" +checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8" dependencies = [ "cfg-if", "value-bag", @@ -2160,6 +2174,17 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-stream" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tracing" version = "0.1.32" diff --git a/Cargo.toml b/Cargo.toml index d3c42e4b..7306ac82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,5 +14,8 @@ zenoh = { git="https://github.com/eclipse-zenoh/zenoh.git" } env_logger = "0.9.0" tokio = { version="1.17.0", features=["full"]} pyo3 = "0.16.1" -futures = "0.3.12" -envy = "0.4.2" \ No newline at end of file +futures = "0.3.21" +envy = "0.4.2" +log = "0.4.16" +tokio-stream = { version = "0.1.8", features = ["io-util"] } +futures-concurrency = "2.0.3" diff --git a/examples/dataflow-example.yml b/examples/dataflow-example.yml index 0fef5e03..a36de2cf 100644 --- a/examples/dataflow-example.yml +++ b/examples/dataflow-example.yml @@ -6,8 +6,10 @@ sinks: sources: - id: source-1 output: C + run: target/debug/examples/example_source - id: source-2 output: G + run: target/debug/examples/example_source operators: - id: op-1 inputs: diff --git a/examples/example_source.rs b/examples/example_source.rs new file mode 100644 index 00000000..fd2b8ca2 --- /dev/null +++ b/examples/example_source.rs @@ -0,0 +1,8 @@ +use std::time::Duration; + +fn main() { + for i in 0..100 { + println!("{i}"); + std::thread::sleep(Duration::from_millis(100)); + } +} diff --git a/src/descriptor.rs b/src/descriptor.rs index 55e3f397..0f0d18ee 100644 --- a/src/descriptor.rs +++ b/src/descriptor.rs @@ -4,11 +4,11 @@ use std::collections::{BTreeSet, HashMap, HashSet}; #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct Descriptor { #[serde(default)] - sources: HashSet, + pub sources: HashSet, #[serde(default)] - sinks: HashSet, + pub sinks: HashSet, #[serde(default)] - operators: HashSet, + pub operators: HashSet, } impl Descriptor { @@ -74,19 +74,20 @@ impl Descriptor { #[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct Source { - id: String, - output: String, + pub id: String, + pub output: String, + pub run: String, } #[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct Sink { - id: String, - input: String, + pub id: String, + pub input: String, } #[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct Operator { - id: String, - inputs: BTreeSet, - outputs: BTreeSet, + pub id: String, + pub inputs: BTreeSet, + pub outputs: BTreeSet, } diff --git a/src/main.rs b/src/main.rs index 93097efb..9ad5d082 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,27 +1,31 @@ use dora_rs::descriptor::Descriptor; -use eyre::Context; -use std::{fs::File, path::PathBuf}; +use eyre::{eyre, Context}; +use futures::{stream::FuturesUnordered, StreamExt}; +use futures_concurrency::Merge; +use std::path::{Path, PathBuf}; use structopt::StructOpt; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio_stream::wrappers::LinesStream; #[derive(Debug, Clone, StructOpt)] #[structopt(about = "Dora control")] enum Command { #[structopt(about = "Print Graph")] - Graph { file: PathBuf }, + Visualize { file: PathBuf }, #[structopt(about = "Run Python server")] StartPython(dora_rs::python::server::PythonCommand), + #[structopt(about = "Run dataflow pipeline")] + Run { file: PathBuf }, } -fn main() -> eyre::Result<()> { +#[tokio::main] +async fn main() -> eyre::Result<()> { env_logger::init(); let command = Command::from_args(); match command { - Command::Graph { file } => { - let descriptor_file = File::open(&file).context("failed to open given file")?; - - let descriptor: Descriptor = serde_yaml::from_reader(descriptor_file) - .context("failed to parse given descriptor")?; + Command::Visualize { file } => { + let descriptor = read_descriptor(&file).await?; let visualized = descriptor .visualize_as_mermaid() .context("failed to visualize descriptor")?; @@ -34,7 +38,59 @@ fn main() -> eyre::Result<()> { Command::StartPython(command) => { dora_rs::python::server::run(command).context("python server failed")?; } + Command::Run { file } => { + let descriptor = read_descriptor(&file).await?; + + let mut outputs = Vec::new(); + let tasks = FuturesUnordered::new(); + + for source in &descriptor.sources { + let mut command = tokio::process::Command::new(&source.run); + command.stdout(std::process::Stdio::piped()); + + let mut child = command + .spawn() + .with_context(|| format!("failed to spawn source {}", source.id))?; + let stdout = child + .stdout + .take() + .ok_or_else(|| eyre!("failed to take stdout handle of source"))?; + let reader = LinesStream::new(BufReader::new(stdout).lines()); + + let source_id = source.id.clone(); + let result = tokio::spawn(async move { + let status = child.wait().await.context("child process failed")?; + if status.success() { + Ok(()) + } else if let Some(code) = status.code() { + Err(eyre!("Source {source_id} failed with exit code: {code}")) + } else { + Err(eyre!("Source {source_id} failed (unknown exit code)")) + } + }); + + outputs.push(reader.map(|l| (source.output.clone(), l))); + tasks.push(result); + } + + // print all output for now (the eventual goal is to pass it to operators) + let mut merged = outputs.merge(); + while let Some((name, line)) = merged.next().await { + let output = + line.with_context(|| format!("failed to get next line of output {name}"))?; + println!("Output {name}: {output}"); + } + } } Ok(()) } + +async fn read_descriptor(file: &Path) -> Result { + let descriptor_file = tokio::fs::read(file) + .await + .context("failed to open given file")?; + let descriptor: Descriptor = + serde_yaml::from_slice(&descriptor_file).context("failed to parse given descriptor")?; + Ok(descriptor) +}