@@ -1,60 +1,38 @@
#![warn(unsafe_op_in_unsafe_fn)]
use clap::StructOpt;
use dora_api::{
self,
communication::CommunicationLayer,
config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId},
STOP_TOPIC,
};
use dora_common::{
descriptor::{Descriptor, OperatorConfig},
BoxError,
};
use dora_common::{descriptor::OperatorConfig, BoxError};
use eyre::{bail, eyre, Context};
use futures::{stream::FuturesUnordered, StreamExt};
use futures_concurrency::Merge;
use operator::{Operator, OperatorEvent};
use std::{
collections::{BTreeMap, HashSet},
path::PathBuf,
};
use std::collections::{BTreeMap, HashSet};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamMap};
mod operator;
#[derive(Debug, Clone, clap::Parser)]
#[clap(about = "Limit the rate of incoming data")]
struct Args {
#[clap(long)]
node_id: NodeId,
#[clap(long)]
dataflow: PathBuf,
}
#[tokio::main]
async fn main() -> eyre::Result<()> {
let args = Args::parse();
let dataflow: Descriptor = {
let raw = tokio::fs::read(&args.dataflow)
.await
.wrap_err("failed to read dataflow file")?;
serde_yaml::from_slice(&raw).wrap_err("failed to parse dataflow file")?
let node_id = {
let raw =
std::env::var("DORA_NODE_ID").wrap_err("env variable DORA_NODE_ID must be set")?;
serde_yaml::from_str(&raw).context("failed to deserialize operator config")?
};
let node = dataflow
.nodes
.into_iter()
.find(|n| n.id == args.node_id)
.ok_or_else(|| eyre!("did not find node ID `{}` in dataflow file", args.node_id))?;
let operators = match node.kind {
dora_common::descriptor::NodeKind::Operators(operators) => operators,
dora_common::descriptor::NodeKind::Custom(_) => {
bail!("node `{}` is a custom node", args.node_id)
}
let communication_config: CommunicationConfig = {
let raw = std::env::var("DORA_COMMUNICATION_CONFIG")
.wrap_err("env variable DORA_COMMUNICATION_CONFIG must be set")?;
serde_yaml::from_str(&raw).context("failed to deserialize communication config")?
};
let operators: Vec<OperatorConfig> = {
let raw =
std::env::var("DORA_OPERATORS").wrap_err("env variable DORA_OPERATORS must be set")?;
serde_yaml::from_str(&raw).context("failed to deserialize operator config")?
};
let mut operator_map = BTreeMap::new();
@@ -68,13 +46,13 @@ async fn main() -> eyre::Result<()> {
operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events));
}
let zenoh = zenoh::open(dataflow. communication.zenoh_config.clone())
let zenoh = zenoh::open(communication_config .zenoh_config.clone())
.await
.map_err(BoxError)
.wrap_err("failed to create zenoh session")?;
let communication: Box<dyn CommunicationLayer> = Box::new(zenoh);
let inputs = subscribe(communication.as_ref(), &dataflow. communication, &operators)
let inputs = subscribe(communication.as_ref(), &communication_config , &operators)
.await
.context("failed to subscribe")?;
@@ -120,12 +98,12 @@ async fn main() -> eyre::Result<()> {
eyre::bail!("unknown output {data_id} for operator {id}");
}
publish(
&args. node_id,
&node_id,
id,
data_id,
&value,
communication.as_ref(),
&dataflow. communication,
&communication_config ,
)
.await
.context("failed to publish operator output")?;