Browse Source

Implement timer messages

tags/v0.2.0-candidate
Philipp Oppermann 3 years ago
parent
commit
a10fd0b1cb
Failed to extract signature
3 changed files with 111 additions and 13 deletions
  1. +108
    -10
      binaries/daemon/src/main.rs
  2. +2
    -2
      examples/rust-dataflow/sink/src/main.rs
  3. +1
    -1
      libraries/core/src/daemon_messages.rs

+ 108
- 10
binaries/daemon/src/main.rs View File

@@ -3,12 +3,14 @@ use dora_core::{
daemon_messages::{self, ControlReply, DaemonCoordinatorEvent, DataflowId, SpawnDataflowNodes},
topics::DORA_COORDINATOR_PORT_DEFAULT,
};
use dora_message::uhlc::HLC;
use eyre::{bail, eyre, Context, ContextCompat};
use futures_concurrency::stream::Merge;
use shared_memory::{Shmem, ShmemConf};
use std::{
collections::{BTreeSet, HashMap},
collections::{BTreeMap, BTreeSet, HashMap},
net::{Ipv4Addr, SocketAddr},
time::Duration,
};
use tokio::{
net::TcpStream,
@@ -47,6 +49,8 @@ struct Daemon {
sent_out_shared_memory: HashMap<String, Shmem>,

running: HashMap<DataflowId, RunningDataflow>,

dora_events_tx: mpsc::Sender<DoraEvent>,
}

impl Daemon {
@@ -70,13 +74,16 @@ impl Daemon {
});
tracing::info!("Listening for node connections on 127.0.0.1:{port}");

let (dora_events_tx, dora_events_rx) = mpsc::channel(5);
let daemon = Self {
port,
uninit_shared_memory: Default::default(),
sent_out_shared_memory: Default::default(),
running: HashMap::new(),
dora_events_tx,
};
let events = (coordinator_events, new_connections).merge();
let dora_events = ReceiverStream::new(dora_events_rx).map(Event::Dora);
let events = (coordinator_events, new_connections, dora_events).merge();
daemon.run_inner(events).await
}

@@ -108,6 +115,7 @@ impl Daemon {
self.handle_node_event(event, dataflow, node_id, reply_sender)
.await?
}
Event::Dora(event) => self.handle_dora_event(event).await?,
}
}

@@ -128,14 +136,27 @@ impl Daemon {
bail!("there is already a running dataflow with ID `{dataflow_id}`")
}
};

for (node_id, params) in nodes {
for (input_id, mapping) in params.node.run_config.inputs.clone() {
if let InputMapping::User(mapping) = mapping {
dataflow
.mappings
.entry((mapping.source, mapping.output))
.or_default()
.insert((node_id.clone(), input_id));
match mapping {
InputMapping::User(mapping) => {
if mapping.operator.is_some() {
bail!("operators are not supported");
}
dataflow
.mappings
.entry((mapping.source, mapping.output))
.or_default()
.insert((node_id.clone(), input_id));
}
InputMapping::Timer { interval } => {
dataflow
.timers
.entry(interval)
.or_default()
.insert((node_id.clone(), input_id));
}
}
}

@@ -145,7 +166,31 @@ impl Daemon {
dataflow.node_tasks.insert(node_id, task);
}

// TODO: spawn timers
// spawn timer tasks
for interval in dataflow.timers.keys().copied() {
let events_tx = self.dora_events_tx.clone();
let task = async move {
let mut interval_stream = tokio::time::interval(interval);
let hlc = HLC::default();
loop {
interval_stream.tick().await;

let event = DoraEvent::Timer {
dataflow_id,
interval,
metadata: dora_message::Metadata::from_parameters(
hlc.new_timestamp(),
Default::default(),
),
};
if events_tx.send(event).await.is_err() {
break;
}
}
};
tokio::spawn(task);
}

Ok(())
}
}
@@ -216,7 +261,7 @@ impl Daemon {
.send_async(daemon_messages::NodeEvent::Input {
id: input_id.clone(),
metadata: metadata.clone(),
data: unsafe { daemon_messages::InputData::new(id.clone()) },
data: Some(unsafe { daemon_messages::InputData::new(id.clone()) }),
})
.await
.is_err()
@@ -245,6 +290,49 @@ impl Daemon {
}
Ok(())
}

async fn handle_dora_event(&mut self, event: DoraEvent) -> eyre::Result<()> {
match event {
DoraEvent::Timer {
dataflow_id,
interval,
metadata,
} => {
let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
tracing::warn!("Timer event for unknown dataflow `{dataflow_id}`");
return Ok(())
};

let Some(subscribers) = dataflow.timers.get(&interval) else {
return Ok(());
};

let mut closed = Vec::new();
for (receiver_id, input_id) in subscribers {
let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
continue;
};

if channel
.send_async(daemon_messages::NodeEvent::Input {
id: input_id.clone(),
metadata: metadata.clone(),
data: None,
})
.await
.is_err()
{
closed.push(receiver_id);
}
}
for id in closed {
dataflow.subscribe_channels.remove(id);
}

Ok(())
}
}
}
}

#[derive(Default)]
@@ -252,6 +340,7 @@ pub struct RunningDataflow {
subscribe_channels: HashMap<NodeId, flume::Sender<daemon_messages::NodeEvent>>,
node_tasks: HashMap<NodeId, tokio::task::JoinHandle<eyre::Result<()>>>,
mappings: HashMap<OutputId, BTreeSet<InputId>>,
timers: BTreeMap<Duration, BTreeSet<InputId>>,
}

type OutputId = (NodeId, DataId);
@@ -267,6 +356,7 @@ pub enum Event {
reply_sender: oneshot::Sender<ControlReply>,
},
Coordinator(DaemonCoordinatorEvent),
Dora(DoraEvent),
}

#[derive(Debug)]
@@ -285,6 +375,14 @@ pub enum DaemonNodeEvent {
},
}

pub enum DoraEvent {
Timer {
dataflow_id: DataflowId,
interval: Duration,
metadata: dora_message::Metadata<'static>,
},
}

type MessageId = String;

fn set_up_tracing() -> eyre::Result<()> {


+ 2
- 2
examples/rust-dataflow/sink/src/main.rs View File

@@ -1,5 +1,5 @@
use dora_node_api::{self, dora_core::daemon_messages::NodeEvent, DoraNode};
use eyre::{bail, Context};
use eyre::{bail, Context, ContextCompat};

fn main() -> eyre::Result<()> {
let (_node, events) = DoraNode::init_from_env()?;
@@ -13,7 +13,7 @@ fn main() -> eyre::Result<()> {
data,
} => match id.as_str() {
"message" => {
let data = data.map()?;
let data = data.wrap_err("no data")?.map()?;
let received_string = std::str::from_utf8(&data)
.wrap_err("received message was not utf8-encoded")?;
println!("received message: {}", received_string);


+ 1
- 1
libraries/core/src/daemon_messages.rs View File

@@ -52,7 +52,7 @@ pub enum NodeEvent {
Input {
id: DataId,
metadata: Metadata<'static>,
data: InputData,
data: Option<InputData>,
},
}



Loading…
Cancel
Save