diff --git a/Cargo.lock b/Cargo.lock index a288cf2c..63d16c5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -628,6 +628,7 @@ dependencies = [ "futures-concurrency", "serde_yaml", "tokio", + "tokio-stream", "zenoh", "zenoh-config", ] diff --git a/common/src/descriptor/mod.rs b/common/src/descriptor/mod.rs index dff89642..269e2ff1 100644 --- a/common/src/descriptor/mod.rs +++ b/common/src/descriptor/mod.rs @@ -38,12 +38,12 @@ pub struct Node { #[serde(rename_all = "lowercase")] pub enum NodeKind { /// Dora runtime node - Operators(Vec), + Operators(Vec), Custom(CustomNode), } #[derive(Debug, Serialize, Deserialize)] -pub struct Operator { +pub struct OperatorConfig { pub id: OperatorId, pub name: Option, pub description: Option, diff --git a/common/src/descriptor/visualize.rs b/common/src/descriptor/visualize.rs index ad1a02e9..b3659a3b 100644 --- a/common/src/descriptor/visualize.rs +++ b/common/src/descriptor/visualize.rs @@ -1,6 +1,6 @@ use dora_api::config::{DataId, InputMapping, NodeId}; -use super::{CustomNode, Node, NodeKind, Operator}; +use super::{CustomNode, Node, NodeKind, OperatorConfig}; use std::collections::{BTreeMap, HashMap}; pub fn visualize_nodes(nodes: &[Node]) -> String { @@ -40,7 +40,7 @@ fn visualize_custom_node(node_id: &NodeId, node: &CustomNode, flowchart: &mut St } } -fn visualize_runtime_node(node_id: &NodeId, operators: &[Operator], flowchart: &mut String) { +fn visualize_runtime_node(node_id: &NodeId, operators: &[OperatorConfig], flowchart: &mut String) { flowchart.push_str(&format!("subgraph {node_id}\n")); for operator in operators { let operator_id = &operator.id; diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 54cf8ba8..2f0ddf31 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -14,5 +14,6 @@ futures = "0.3.21" futures-concurrency = "2.0.3" serde_yaml = "0.8.23" tokio = { version = "1.17.0", features = ["full"] } +tokio-stream = "0.1.8" zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git" } zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git" } diff --git a/runtime/src/main.rs b/runtime/src/main.rs index 9c7729f2..1e972af4 100644 --- a/runtime/src/main.rs +++ b/runtime/src/main.rs @@ -6,17 +6,21 @@ use dora_api::{ STOP_TOPIC, }; use dora_common::{ - descriptor::{Descriptor, Operator}, + descriptor::{Descriptor, OperatorConfig}, BoxError, }; use eyre::{bail, eyre, Context}; use futures::{stream::FuturesUnordered, StreamExt}; use futures_concurrency::Merge; +use operator::{Operator, OperatorEvent}; use std::{ collections::{BTreeMap, HashSet}, path::PathBuf, - time::Duration, }; +use tokio::sync::mpsc; +use tokio_stream::{wrappers::ReceiverStream, StreamMap}; + +mod operator; #[derive(Debug, Clone, clap::Parser)] #[clap(about = "Limit the rate of incoming data")] @@ -51,40 +55,61 @@ async fn main() -> eyre::Result<()> { } }; + let mut operator_map = BTreeMap::new(); + let mut operator_events = StreamMap::new(); + for operator_config in &operators { + let (events_tx, events) = mpsc::channel(1); + let operator = Operator::init(operator_config, events_tx.clone()) + .await + .wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?; + operator_map.insert(&operator_config.id, operator); + operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events)); + } + let zenoh = zenoh::open(dataflow.communication.zenoh_config.clone()) .await .map_err(BoxError) .wrap_err("failed to create zenoh session")?; let mut communication: Box = Box::new(zenoh); - let mut inputs = subscribe(communication.as_mut(), &dataflow.communication, &operators) + let inputs = subscribe(communication.as_mut(), &dataflow.communication, &operators) .await .context("failed to subscribe")?; - let operator_map: BTreeMap<_, _> = operators.iter().map(|o| (&o.id, o)).collect(); - - loop { - let timeout = Duration::from_secs(15 * 60); - let input = match tokio::time::timeout(timeout, inputs.next()).await { - Ok(Some(input)) => input, - Ok(None) => break, - Err(_) => bail!("timeout while waiting for input"), - }; - - let operator = operator_map.get(&input.target_operator).ok_or_else(|| { - eyre!( - "received input for unexpected operator `{}`", - input.target_operator - ) - })?; - - let todo = "implement operator abstraction and call it here"; - println!( - "Received input {} for operator {}: {}", - input.id, - input.target_operator, - String::from_utf8_lossy(&input.data) - ); + let input_events = inputs.map(Event::Input); + let operator_events = operator_events.map(|(id, event)| Event::Operator { id, event }); + let mut events = (input_events, operator_events).merge(); + + while let Some(event) = events.next().await { + match event { + Event::Input(input) => { + let operator = operator_map + .get_mut(&input.target_operator) + .ok_or_else(|| { + eyre!( + "received input for unexpected operator `{}`", + input.target_operator + ) + })?; + + println!( + "Received input {} for operator {}: {}", + input.id, + input.target_operator, + String::from_utf8_lossy(&input.data) + ); + + operator + .handle_input(input.id.clone(), input.data) + .wrap_err_with(|| { + format!( + "operator {} failed to handle input {}", + input.target_operator, input.id + ) + })?; + } + Event::Operator { id, event } => match event {}, + } } Ok(()) @@ -93,7 +118,7 @@ async fn main() -> eyre::Result<()> { async fn subscribe<'a>( communication: &'a mut dyn CommunicationLayer, communication_config: &CommunicationConfig, - operators: &'a [Operator], + operators: &'a [OperatorConfig], ) -> eyre::Result + 'a> { let prefix = &communication_config.zenoh_prefix; @@ -145,7 +170,15 @@ async fn subscribe<'a>( Ok(streams.merge().take_until(finished)) } -pub struct OperatorInput { +enum Event { + Input(OperatorInput), + Operator { + id: OperatorId, + event: OperatorEvent, + }, +} + +struct OperatorInput { pub target_operator: OperatorId, pub id: DataId, pub data: Vec, diff --git a/runtime/src/operator/mod.rs b/runtime/src/operator/mod.rs new file mode 100644 index 00000000..054f75e5 --- /dev/null +++ b/runtime/src/operator/mod.rs @@ -0,0 +1,50 @@ +use dora_api::config::DataId; +use dora_common::descriptor::{OperatorConfig, OperatorSource}; +use eyre::eyre; +use tokio::sync::mpsc::{self, Sender}; + +mod shared_lib; + +pub struct Operator { + operator_task: Sender, +} + +impl Operator { + pub async fn init( + operator_config: &OperatorConfig, + events_tx: Sender, + ) -> eyre::Result { + let (operator_task, operator_rx) = mpsc::channel(10); + + match &operator_config.source { + OperatorSource::SharedLibrary(path) => { + let todo = + "init shared library operator at `path` with `events_tx` and `operator_rx`"; + eprintln!("WARNING: shared library operators are not supported yet"); + } + OperatorSource::Python(path) => { + eprintln!("WARNING: Python operators are not supported yet"); + } + OperatorSource::Wasm(path) => { + eprintln!("WARNING: WASM operators are not supported yet"); + } + } + Ok(Self { operator_task }) + } + + pub fn handle_input(&mut self, id: DataId, value: Vec) -> eyre::Result<()> { + self.operator_task + .try_send(OperatorInput { id, value }) + .map_err(|err| match err { + tokio::sync::mpsc::error::TrySendError::Closed(_) => eyre!("operator crashed"), + tokio::sync::mpsc::error::TrySendError::Full(_) => eyre!("operator queue full"), + }) + } +} + +pub enum OperatorEvent {} + +pub struct OperatorInput { + id: DataId, + value: Vec, +} diff --git a/runtime/src/operator/shared_lib.rs b/runtime/src/operator/shared_lib.rs new file mode 100644 index 00000000..e69de29b