|
|
|
@@ -6,17 +6,21 @@ use dora_api::{ |
|
|
|
STOP_TOPIC, |
|
|
|
}; |
|
|
|
use dora_common::{ |
|
|
|
descriptor::{Descriptor, Operator}, |
|
|
|
descriptor::{Descriptor, OperatorConfig}, |
|
|
|
BoxError, |
|
|
|
}; |
|
|
|
use eyre::{bail, eyre, Context}; |
|
|
|
use futures::{stream::FuturesUnordered, StreamExt}; |
|
|
|
use futures_concurrency::Merge; |
|
|
|
use operator::{Operator, OperatorEvent}; |
|
|
|
use std::{ |
|
|
|
collections::{BTreeMap, HashSet}, |
|
|
|
path::PathBuf, |
|
|
|
time::Duration, |
|
|
|
}; |
|
|
|
use tokio::sync::mpsc; |
|
|
|
use tokio_stream::{wrappers::ReceiverStream, StreamMap}; |
|
|
|
|
|
|
|
mod operator; |
|
|
|
|
|
|
|
#[derive(Debug, Clone, clap::Parser)] |
|
|
|
#[clap(about = "Limit the rate of incoming data")] |
|
|
|
@@ -51,40 +55,61 @@ async fn main() -> eyre::Result<()> { |
|
|
|
} |
|
|
|
}; |
|
|
|
|
|
|
|
let mut operator_map = BTreeMap::new(); |
|
|
|
let mut operator_events = StreamMap::new(); |
|
|
|
for operator_config in &operators { |
|
|
|
let (events_tx, events) = mpsc::channel(1); |
|
|
|
let operator = Operator::init(operator_config, events_tx.clone()) |
|
|
|
.await |
|
|
|
.wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?; |
|
|
|
operator_map.insert(&operator_config.id, operator); |
|
|
|
operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events)); |
|
|
|
} |
|
|
|
|
|
|
|
let zenoh = zenoh::open(dataflow.communication.zenoh_config.clone()) |
|
|
|
.await |
|
|
|
.map_err(BoxError) |
|
|
|
.wrap_err("failed to create zenoh session")?; |
|
|
|
let mut communication: Box<dyn CommunicationLayer> = Box::new(zenoh); |
|
|
|
|
|
|
|
let mut inputs = subscribe(communication.as_mut(), &dataflow.communication, &operators) |
|
|
|
let inputs = subscribe(communication.as_mut(), &dataflow.communication, &operators) |
|
|
|
.await |
|
|
|
.context("failed to subscribe")?; |
|
|
|
|
|
|
|
let operator_map: BTreeMap<_, _> = operators.iter().map(|o| (&o.id, o)).collect(); |
|
|
|
|
|
|
|
loop { |
|
|
|
let timeout = Duration::from_secs(15 * 60); |
|
|
|
let input = match tokio::time::timeout(timeout, inputs.next()).await { |
|
|
|
Ok(Some(input)) => input, |
|
|
|
Ok(None) => break, |
|
|
|
Err(_) => bail!("timeout while waiting for input"), |
|
|
|
}; |
|
|
|
|
|
|
|
let operator = operator_map.get(&input.target_operator).ok_or_else(|| { |
|
|
|
eyre!( |
|
|
|
"received input for unexpected operator `{}`", |
|
|
|
input.target_operator |
|
|
|
) |
|
|
|
})?; |
|
|
|
|
|
|
|
let todo = "implement operator abstraction and call it here"; |
|
|
|
println!( |
|
|
|
"Received input {} for operator {}: {}", |
|
|
|
input.id, |
|
|
|
input.target_operator, |
|
|
|
String::from_utf8_lossy(&input.data) |
|
|
|
); |
|
|
|
let input_events = inputs.map(Event::Input); |
|
|
|
let operator_events = operator_events.map(|(id, event)| Event::Operator { id, event }); |
|
|
|
let mut events = (input_events, operator_events).merge(); |
|
|
|
|
|
|
|
while let Some(event) = events.next().await { |
|
|
|
match event { |
|
|
|
Event::Input(input) => { |
|
|
|
let operator = operator_map |
|
|
|
.get_mut(&input.target_operator) |
|
|
|
.ok_or_else(|| { |
|
|
|
eyre!( |
|
|
|
"received input for unexpected operator `{}`", |
|
|
|
input.target_operator |
|
|
|
) |
|
|
|
})?; |
|
|
|
|
|
|
|
println!( |
|
|
|
"Received input {} for operator {}: {}", |
|
|
|
input.id, |
|
|
|
input.target_operator, |
|
|
|
String::from_utf8_lossy(&input.data) |
|
|
|
); |
|
|
|
|
|
|
|
operator |
|
|
|
.handle_input(input.id.clone(), input.data) |
|
|
|
.wrap_err_with(|| { |
|
|
|
format!( |
|
|
|
"operator {} failed to handle input {}", |
|
|
|
input.target_operator, input.id |
|
|
|
) |
|
|
|
})?; |
|
|
|
} |
|
|
|
Event::Operator { id, event } => match event {}, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
Ok(()) |
|
|
|
@@ -93,7 +118,7 @@ async fn main() -> eyre::Result<()> { |
|
|
|
async fn subscribe<'a>( |
|
|
|
communication: &'a mut dyn CommunicationLayer, |
|
|
|
communication_config: &CommunicationConfig, |
|
|
|
operators: &'a [Operator], |
|
|
|
operators: &'a [OperatorConfig], |
|
|
|
) -> eyre::Result<impl futures::Stream<Item = OperatorInput> + 'a> { |
|
|
|
let prefix = &communication_config.zenoh_prefix; |
|
|
|
|
|
|
|
@@ -145,7 +170,15 @@ async fn subscribe<'a>( |
|
|
|
Ok(streams.merge().take_until(finished)) |
|
|
|
} |
|
|
|
|
|
|
|
pub struct OperatorInput { |
|
|
|
enum Event { |
|
|
|
Input(OperatorInput), |
|
|
|
Operator { |
|
|
|
id: OperatorId, |
|
|
|
event: OperatorEvent, |
|
|
|
}, |
|
|
|
} |
|
|
|
|
|
|
|
struct OperatorInput { |
|
|
|
pub target_operator: OperatorId, |
|
|
|
pub id: DataId, |
|
|
|
pub data: Vec<u8>, |
|
|
|
|