From a10fd0b1cbc71c5478a0f0ae5dfb5779b7b8d114 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 13 Dec 2022 13:16:38 +0100 Subject: [PATCH] Implement timer messages --- binaries/daemon/src/main.rs | 118 ++++++++++++++++++++++-- examples/rust-dataflow/sink/src/main.rs | 4 +- libraries/core/src/daemon_messages.rs | 2 +- 3 files changed, 111 insertions(+), 13 deletions(-) diff --git a/binaries/daemon/src/main.rs b/binaries/daemon/src/main.rs index 0fd9f4fd..1352718a 100644 --- a/binaries/daemon/src/main.rs +++ b/binaries/daemon/src/main.rs @@ -3,12 +3,14 @@ use dora_core::{ daemon_messages::{self, ControlReply, DaemonCoordinatorEvent, DataflowId, SpawnDataflowNodes}, topics::DORA_COORDINATOR_PORT_DEFAULT, }; +use dora_message::uhlc::HLC; use eyre::{bail, eyre, Context, ContextCompat}; use futures_concurrency::stream::Merge; use shared_memory::{Shmem, ShmemConf}; use std::{ - collections::{BTreeSet, HashMap}, + collections::{BTreeMap, BTreeSet, HashMap}, net::{Ipv4Addr, SocketAddr}, + time::Duration, }; use tokio::{ net::TcpStream, @@ -47,6 +49,8 @@ struct Daemon { sent_out_shared_memory: HashMap, running: HashMap, + + dora_events_tx: mpsc::Sender, } impl Daemon { @@ -70,13 +74,16 @@ impl Daemon { }); tracing::info!("Listening for node connections on 127.0.0.1:{port}"); + let (dora_events_tx, dora_events_rx) = mpsc::channel(5); let daemon = Self { port, uninit_shared_memory: Default::default(), sent_out_shared_memory: Default::default(), running: HashMap::new(), + dora_events_tx, }; - let events = (coordinator_events, new_connections).merge(); + let dora_events = ReceiverStream::new(dora_events_rx).map(Event::Dora); + let events = (coordinator_events, new_connections, dora_events).merge(); daemon.run_inner(events).await } @@ -108,6 +115,7 @@ impl Daemon { self.handle_node_event(event, dataflow, node_id, reply_sender) .await? } + Event::Dora(event) => self.handle_dora_event(event).await?, } } @@ -128,14 +136,27 @@ impl Daemon { bail!("there is already a running dataflow with ID `{dataflow_id}`") } }; + for (node_id, params) in nodes { for (input_id, mapping) in params.node.run_config.inputs.clone() { - if let InputMapping::User(mapping) = mapping { - dataflow - .mappings - .entry((mapping.source, mapping.output)) - .or_default() - .insert((node_id.clone(), input_id)); + match mapping { + InputMapping::User(mapping) => { + if mapping.operator.is_some() { + bail!("operators are not supported"); + } + dataflow + .mappings + .entry((mapping.source, mapping.output)) + .or_default() + .insert((node_id.clone(), input_id)); + } + InputMapping::Timer { interval } => { + dataflow + .timers + .entry(interval) + .or_default() + .insert((node_id.clone(), input_id)); + } } } @@ -145,7 +166,31 @@ impl Daemon { dataflow.node_tasks.insert(node_id, task); } - // TODO: spawn timers + // spawn timer tasks + for interval in dataflow.timers.keys().copied() { + let events_tx = self.dora_events_tx.clone(); + let task = async move { + let mut interval_stream = tokio::time::interval(interval); + let hlc = HLC::default(); + loop { + interval_stream.tick().await; + + let event = DoraEvent::Timer { + dataflow_id, + interval, + metadata: dora_message::Metadata::from_parameters( + hlc.new_timestamp(), + Default::default(), + ), + }; + if events_tx.send(event).await.is_err() { + break; + } + } + }; + tokio::spawn(task); + } + Ok(()) } } @@ -216,7 +261,7 @@ impl Daemon { .send_async(daemon_messages::NodeEvent::Input { id: input_id.clone(), metadata: metadata.clone(), - data: unsafe { daemon_messages::InputData::new(id.clone()) }, + data: Some(unsafe { daemon_messages::InputData::new(id.clone()) }), }) .await .is_err() @@ -245,6 +290,49 @@ impl Daemon { } Ok(()) } + + async fn handle_dora_event(&mut self, event: DoraEvent) -> eyre::Result<()> { + match event { + DoraEvent::Timer { + dataflow_id, + interval, + metadata, + } => { + let Some(dataflow) = self.running.get_mut(&dataflow_id) else { + tracing::warn!("Timer event for unknown dataflow `{dataflow_id}`"); + return Ok(()) + }; + + let Some(subscribers) = dataflow.timers.get(&interval) else { + return Ok(()); + }; + + let mut closed = Vec::new(); + for (receiver_id, input_id) in subscribers { + let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else { + continue; + }; + + if channel + .send_async(daemon_messages::NodeEvent::Input { + id: input_id.clone(), + metadata: metadata.clone(), + data: None, + }) + .await + .is_err() + { + closed.push(receiver_id); + } + } + for id in closed { + dataflow.subscribe_channels.remove(id); + } + + Ok(()) + } + } + } } #[derive(Default)] @@ -252,6 +340,7 @@ pub struct RunningDataflow { subscribe_channels: HashMap>, node_tasks: HashMap>>, mappings: HashMap>, + timers: BTreeMap>, } type OutputId = (NodeId, DataId); @@ -267,6 +356,7 @@ pub enum Event { reply_sender: oneshot::Sender, }, Coordinator(DaemonCoordinatorEvent), + Dora(DoraEvent), } #[derive(Debug)] @@ -285,6 +375,14 @@ pub enum DaemonNodeEvent { }, } +pub enum DoraEvent { + Timer { + dataflow_id: DataflowId, + interval: Duration, + metadata: dora_message::Metadata<'static>, + }, +} + type MessageId = String; fn set_up_tracing() -> eyre::Result<()> { diff --git a/examples/rust-dataflow/sink/src/main.rs b/examples/rust-dataflow/sink/src/main.rs index ee12b7f0..59f631f0 100644 --- a/examples/rust-dataflow/sink/src/main.rs +++ b/examples/rust-dataflow/sink/src/main.rs @@ -1,5 +1,5 @@ use dora_node_api::{self, dora_core::daemon_messages::NodeEvent, DoraNode}; -use eyre::{bail, Context}; +use eyre::{bail, Context, ContextCompat}; fn main() -> eyre::Result<()> { let (_node, events) = DoraNode::init_from_env()?; @@ -13,7 +13,7 @@ fn main() -> eyre::Result<()> { data, } => match id.as_str() { "message" => { - let data = data.map()?; + let data = data.wrap_err("no data")?.map()?; let received_string = std::str::from_utf8(&data) .wrap_err("received message was not utf8-encoded")?; println!("received message: {}", received_string); diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index 7a1eaf4a..a0494535 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -52,7 +52,7 @@ pub enum NodeEvent { Input { id: DataId, metadata: Metadata<'static>, - data: InputData, + data: Option, }, }