From 534e0acf3182bc28151de7dd3e0f17a35f88c459 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 24 Mar 2022 14:01:37 +0100 Subject: [PATCH] Use local tcp sockets encoded with bincode to allow multiple outputs --- Cargo.lock | 16 ++++++ Cargo.toml | 8 +-- examples/example_source.rs | 40 +++++++++++++-- src/main.rs | 99 +++++++++++++++++++++++--------------- 4 files changed, 116 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8505b90b..cbd953c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -511,6 +511,7 @@ dependencies = [ name = "dora-rs" version = "0.1.0" dependencies = [ + "bincode", "env_logger", "envy", "eyre", @@ -523,6 +524,7 @@ dependencies = [ "structopt", "tokio", "tokio-stream", + "tokio-util", "zenoh", ] @@ -2185,6 +2187,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-util" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64910e1b9c1901aaf5375561e35b9c057d95ff41a44ede043a03e09279eabaf1" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "log", + "pin-project-lite", + "tokio", +] + [[package]] name = "tracing" version = "0.1.32" diff --git a/Cargo.toml b/Cargo.toml index 7306ac82..415999d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,12 +10,14 @@ eyre = "0.6.7" serde = { version = "1.0", features = ["derive"] } serde_yaml = "0.8.23" structopt = "0.3.26" -zenoh = { git="https://github.com/eclipse-zenoh/zenoh.git" } +zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git" } env_logger = "0.9.0" -tokio = { version="1.17.0", features=["full"]} +tokio = { version = "1.17.0", features = ["full"] } pyo3 = "0.16.1" futures = "0.3.21" envy = "0.4.2" log = "0.4.16" -tokio-stream = { version = "0.1.8", features = ["io-util"] } +tokio-stream = { version = "0.1.8", features = ["net"] } futures-concurrency = "2.0.3" +tokio-util = { version = "0.7.0", features = ["codec"] } +bincode = "1.3.3" diff --git a/examples/example_source.rs b/examples/example_source.rs index fd2b8ca2..3cd79fda 100644 --- a/examples/example_source.rs +++ b/examples/example_source.rs @@ -1,8 +1,40 @@ -use std::time::Duration; +use eyre::Context; +use futures::SinkExt; +use std::{collections::BTreeMap, time::Duration}; +use tokio_util::codec::{Framed, LengthDelimitedCodec}; + +#[tokio::main] +async fn main() -> eyre::Result<()> { + let server_socket_addr = std::env::var("SERVER_SOCKET_ADDR") + .wrap_err("failed to read `SERVER_SOCKET_ADDR` environment variable")?; + + let tcp = tokio::net::TcpStream::connect(&server_socket_addr) + .await + .with_context(|| { + format!("failed to open TCP connection to server at {server_socket_addr}") + })?; + let mut framed = Framed::new(tcp, LengthDelimitedCodec::new()); + + let mut i: u8 = 0; + loop { + let mut outputs: BTreeMap<_, Vec> = Default::default(); + outputs.insert("A", vec![i]); + if i % 3 == 0 { + outputs.insert("fizz", vec![]); + } + if i % 5 == 0 { + outputs.insert("buzz", vec![]); + } + outputs.insert("Squared", vec![i.wrapping_mul(i)]); + + let serialized = bincode::serialize(&outputs).wrap_err("failed to serialize output")?; + + framed + .send(serialized.into()) + .await + .wrap_err("failed to send output")?; -fn main() { - for i in 0..100 { - println!("{i}"); std::thread::sleep(Duration::from_millis(100)); + i = i.wrapping_add(1); } } diff --git a/src/main.rs b/src/main.rs index 9ad5d082..d6096baa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,13 @@ use dora_rs::descriptor::Descriptor; use eyre::{eyre, Context}; use futures::{stream::FuturesUnordered, StreamExt}; -use futures_concurrency::Merge; -use std::path::{Path, PathBuf}; +use std::{ + collections::BTreeMap, + path::{Path, PathBuf}, +}; use structopt::StructOpt; -use tokio::io::{AsyncBufReadExt, BufReader}; -use tokio_stream::wrappers::LinesStream; +use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; +use tokio_util::codec::{Framed, LengthDelimitedCodec}; #[derive(Debug, Clone, StructOpt)] #[structopt(about = "Dora control")] @@ -38,49 +40,66 @@ async fn main() -> eyre::Result<()> { Command::StartPython(command) => { dora_rs::python::server::run(command).context("python server failed")?; } - Command::Run { file } => { - let descriptor = read_descriptor(&file).await?; + Command::Run { file } => run_dataflow(file).await?, + } - let mut outputs = Vec::new(); - let tasks = FuturesUnordered::new(); + Ok(()) +} - for source in &descriptor.sources { - let mut command = tokio::process::Command::new(&source.run); - command.stdout(std::process::Stdio::piped()); +async fn run_dataflow(file: PathBuf) -> eyre::Result<()> { + let descriptor = read_descriptor(&file).await?; - let mut child = command - .spawn() - .with_context(|| format!("failed to spawn source {}", source.id))?; - let stdout = child - .stdout - .take() - .ok_or_else(|| eyre!("failed to take stdout handle of source"))?; - let reader = LinesStream::new(BufReader::new(stdout).lines()); + let socket = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .wrap_err("failed to create TCP listener")?; + let socket_addr = socket + .local_addr() + .wrap_err("failed to get socket address")?; - let source_id = source.id.clone(); - let result = tokio::spawn(async move { - let status = child.wait().await.context("child process failed")?; - if status.success() { - Ok(()) - } else if let Some(code) = status.code() { - Err(eyre!("Source {source_id} failed with exit code: {code}")) - } else { - Err(eyre!("Source {source_id} failed (unknown exit code)")) - } - }); + let (outputs_tx, outputs) = tokio::sync::mpsc::channel(10); + tokio::spawn(async move { + let mut incoming = TcpListenerStream::new(socket); + while let Some(connection) = incoming.next().await.transpose()? { + let outputs_tx = outputs_tx.clone(); + tokio::spawn(async move { + let mut framed = Framed::new(connection, LengthDelimitedCodec::new()); + while let Some(frame) = framed.next().await.transpose()? { + let deserialized: BTreeMap> = bincode::deserialize(&frame) + .wrap_err("failed to deserialize output message")?; + outputs_tx.send(deserialized).await?; + } + Result::<_, eyre::Error>::Ok(()) + }); + } + Result::<_, eyre::Error>::Ok(()) + }); - outputs.push(reader.map(|l| (source.output.clone(), l))); - tasks.push(result); - } + let tasks = FuturesUnordered::new(); + for source in &descriptor.sources { + let mut command = tokio::process::Command::new(&source.run); + command.env("SERVER_SOCKET_ADDR", socket_addr.to_string()); + + let mut child = command + .spawn() + .with_context(|| format!("failed to spawn source {}", source.id))?; - // print all output for now (the eventual goal is to pass it to operators) - let mut merged = outputs.merge(); - while let Some((name, line)) = merged.next().await { - let output = - line.with_context(|| format!("failed to get next line of output {name}"))?; - println!("Output {name}: {output}"); + let source_id = source.id.clone(); + let result = tokio::spawn(async move { + let status = child.wait().await.context("child process failed")?; + if status.success() { + Ok(()) + } else if let Some(code) = status.code() { + Err(eyre!("Source {source_id} failed with exit code: {code}")) + } else { + Err(eyre!("Source {source_id} failed (unknown exit code)")) } - } + }); + tasks.push(result); + } + + let mut outputs_stream = ReceiverStream::new(outputs); + while let Some(output) = outputs_stream.next().await { + println!("Output: {output:?}"); } Ok(())