| @@ -13,7 +13,7 @@ pub const STOP_TOPIC: &str = "__dora_rs_internal__operator_stopped"; | |||
| pub struct DoraNode { | |||
| id: NodeId, | |||
| operator_config: NodeRunConfig, | |||
| node_config: NodeRunConfig, | |||
| communication_config: CommunicationConfig, | |||
| communication: Box<dyn CommunicationLayer>, | |||
| } | |||
| @@ -25,7 +25,7 @@ impl DoraNode { | |||
| std::env::var("DORA_NODE_ID").wrap_err("env variable DORA_NODE_ID must be set")?; | |||
| serde_yaml::from_str(&raw).context("failed to deserialize operator config")? | |||
| }; | |||
| let operator_config = { | |||
| let node_config = { | |||
| let raw = std::env::var("DORA_NODE_RUN_CONFIG") | |||
| .wrap_err("env variable DORA_NODE_RUN_CONFIG must be set")?; | |||
| serde_yaml::from_str(&raw).context("failed to deserialize operator config")? | |||
| @@ -35,12 +35,12 @@ impl DoraNode { | |||
| .wrap_err("env variable DORA_COMMUNICATION_CONFIG must be set")?; | |||
| serde_yaml::from_str(&raw).context("failed to deserialize communication config")? | |||
| }; | |||
| Self::init(id, operator_config, communication_config).await | |||
| Self::init(id, node_config, communication_config).await | |||
| } | |||
| pub async fn init( | |||
| id: NodeId, | |||
| operator_config: NodeRunConfig, | |||
| node_config: NodeRunConfig, | |||
| communication_config: CommunicationConfig, | |||
| ) -> eyre::Result<Self> { | |||
| let zenoh = zenoh::open(communication_config.zenoh_config.clone()) | |||
| @@ -50,7 +50,7 @@ impl DoraNode { | |||
| Ok(Self { | |||
| id, | |||
| operator_config, | |||
| node_config, | |||
| communication_config, | |||
| communication: Box::new(zenoh), | |||
| }) | |||
| @@ -67,7 +67,7 @@ impl DoraNode { | |||
| operator, | |||
| output, | |||
| }, | |||
| ) in &self.operator_config.inputs | |||
| ) in &self.node_config.inputs | |||
| { | |||
| let topic = match operator { | |||
| Some(operator) => format!("{prefix}/{source}/{operator}/{output}"), | |||
| @@ -86,7 +86,7 @@ impl DoraNode { | |||
| let stop_messages = FuturesUnordered::new(); | |||
| let sources: HashSet<_> = self | |||
| .operator_config | |||
| .node_config | |||
| .inputs | |||
| .values() | |||
| .map(|v| (&v.source, &v.operator)) | |||
| @@ -109,7 +109,7 @@ impl DoraNode { | |||
| } | |||
| pub async fn send_output(&self, output_id: &DataId, data: &[u8]) -> eyre::Result<()> { | |||
| if !self.operator_config.outputs.contains(output_id) { | |||
| if !self.node_config.outputs.contains(output_id) { | |||
| eyre::bail!("unknown output"); | |||
| } | |||
| @@ -182,7 +182,7 @@ mod tests { | |||
| #[test] | |||
| fn no_op_operator() { | |||
| let id = uuid::Uuid::new_v4().to_string().into(); | |||
| let operator_config = config::NodeRunConfig { | |||
| let node_config = config::NodeRunConfig { | |||
| inputs: Default::default(), | |||
| outputs: Default::default(), | |||
| }; | |||
| @@ -192,7 +192,7 @@ mod tests { | |||
| }; | |||
| run(async { | |||
| let operator = DoraNode::init(id, operator_config, communication_config) | |||
| let operator = DoraNode::init(id, node_config, communication_config) | |||
| .await | |||
| .unwrap(); | |||
| let mut inputs = operator.inputs().await.unwrap(); | |||