diff --git a/runtime/src/main.rs b/runtime/src/main.rs index 7f4a94d2..3db914bb 100644 --- a/runtime/src/main.rs +++ b/runtime/src/main.rs @@ -1,60 +1,38 @@ #![warn(unsafe_op_in_unsafe_fn)] -use clap::StructOpt; use dora_api::{ self, communication::CommunicationLayer, config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId}, STOP_TOPIC, }; -use dora_common::{ - descriptor::{Descriptor, OperatorConfig}, - BoxError, -}; +use dora_common::{descriptor::OperatorConfig, BoxError}; use eyre::{bail, eyre, Context}; use futures::{stream::FuturesUnordered, StreamExt}; use futures_concurrency::Merge; use operator::{Operator, OperatorEvent}; -use std::{ - collections::{BTreeMap, HashSet}, - path::PathBuf, -}; +use std::collections::{BTreeMap, HashSet}; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, StreamMap}; mod operator; -#[derive(Debug, Clone, clap::Parser)] -#[clap(about = "Limit the rate of incoming data")] -struct Args { - #[clap(long)] - node_id: NodeId, - #[clap(long)] - dataflow: PathBuf, -} - #[tokio::main] async fn main() -> eyre::Result<()> { - let args = Args::parse(); - - let dataflow: Descriptor = { - let raw = tokio::fs::read(&args.dataflow) - .await - .wrap_err("failed to read dataflow file")?; - serde_yaml::from_slice(&raw).wrap_err("failed to parse dataflow file")? + let node_id = { + let raw = + std::env::var("DORA_NODE_ID").wrap_err("env variable DORA_NODE_ID must be set")?; + serde_yaml::from_str(&raw).context("failed to deserialize operator config")? }; - - let node = dataflow - .nodes - .into_iter() - .find(|n| n.id == args.node_id) - .ok_or_else(|| eyre!("did not find node ID `{}` in dataflow file", args.node_id))?; - - let operators = match node.kind { - dora_common::descriptor::NodeKind::Operators(operators) => operators, - dora_common::descriptor::NodeKind::Custom(_) => { - bail!("node `{}` is a custom node", args.node_id) - } + let communication_config: CommunicationConfig = { + let raw = std::env::var("DORA_COMMUNICATION_CONFIG") + .wrap_err("env variable DORA_COMMUNICATION_CONFIG must be set")?; + serde_yaml::from_str(&raw).context("failed to deserialize communication config")? + }; + let operators: Vec = { + let raw = + std::env::var("DORA_OPERATORS").wrap_err("env variable DORA_OPERATORS must be set")?; + serde_yaml::from_str(&raw).context("failed to deserialize operator config")? }; let mut operator_map = BTreeMap::new(); @@ -68,13 +46,13 @@ async fn main() -> eyre::Result<()> { operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events)); } - let zenoh = zenoh::open(dataflow.communication.zenoh_config.clone()) + let zenoh = zenoh::open(communication_config.zenoh_config.clone()) .await .map_err(BoxError) .wrap_err("failed to create zenoh session")?; let communication: Box = Box::new(zenoh); - let inputs = subscribe(communication.as_ref(), &dataflow.communication, &operators) + let inputs = subscribe(communication.as_ref(), &communication_config, &operators) .await .context("failed to subscribe")?; @@ -120,12 +98,12 @@ async fn main() -> eyre::Result<()> { eyre::bail!("unknown output {data_id} for operator {id}"); } publish( - &args.node_id, + &node_id, id, data_id, &value, communication.as_ref(), - &dataflow.communication, + &communication_config, ) .await .context("failed to publish operator output")?; diff --git a/runtime/src/operator/shared_lib.rs b/runtime/src/operator/shared_lib.rs index a22bfbc3..d7c3e713 100644 --- a/runtime/src/operator/shared_lib.rs +++ b/runtime/src/operator/shared_lib.rs @@ -21,7 +21,7 @@ pub fn spawn( thread::spawn(move || { let closure = AssertUnwindSafe(|| { - let bindings = Bindings::init(&library)?; + let bindings = Bindings::init(&library).context("failed to init operator")?; let operator = SharedLibraryOperator { events_tx: events_tx.clone(),