Browse Source

Start adding back support for daemon communication over TCP

tags/v0.2.0-candidate
Philipp Oppermann 2 years ago
parent
commit
ff836d200c
Failed to extract signature
13 changed files with 827 additions and 371 deletions
  1. +1
    -0
      Cargo.lock
  2. +23
    -15
      apis/rust/node/src/daemon.rs
  3. +3
    -9
      apis/rust/node/src/lib.rs
  4. +1
    -1
      binaries/cli/src/check.rs
  5. +1
    -0
      binaries/coordinator/src/run/mod.rs
  6. +1
    -0
      binaries/daemon/Cargo.toml
  7. +11
    -4
      binaries/daemon/src/lib.rs
  8. +2
    -298
      binaries/daemon/src/listener/mod.rs
  9. +298
    -0
      binaries/daemon/src/listener/shmem.rs
  10. +368
    -0
      binaries/daemon/src/listener/tcp.rs
  11. +85
    -39
      binaries/daemon/src/spawn.rs
  12. +27
    -4
      libraries/core/src/daemon_messages.rs
  13. +6
    -1
      libraries/core/src/descriptor/mod.rs

+ 1
- 0
Cargo.lock View File

@@ -1008,6 +1008,7 @@ dependencies = [
name = "dora-daemon"
version = "0.1.0"
dependencies = [
"bincode",
"clap 3.2.20",
"ctrlc",
"dora-core",


+ 23
- 15
apis/rust/node/src/daemon.rs View File

@@ -1,6 +1,6 @@
use dora_core::{
config::{DataId, NodeId},
daemon_messages::{DaemonReply, DaemonRequest, DataflowId, NodeEvent},
daemon_messages::{DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, NodeEvent},
};
use dora_message::Metadata;
use eyre::{bail, eyre, Context};
@@ -17,21 +17,29 @@ impl DaemonConnection {
pub(crate) fn init(
dataflow_id: DataflowId,
node_id: &NodeId,
daemon_control_region_id: &str,
daemon_events_region_id: &str,
daemon_communication: &DaemonCommunication,
) -> eyre::Result<Self> {
let control_channel = ControlChannel::init(dataflow_id, node_id, daemon_control_region_id)
.wrap_err("failed to init control stream")?;

let (event_stream, event_stream_thread) =
EventStream::init(dataflow_id, node_id, daemon_events_region_id)
.wrap_err("failed to init event stream")?;

Ok(Self {
control_channel,
event_stream,
event_stream_thread,
})
match daemon_communication {
DaemonCommunication::Shmem {
daemon_control_region_id,
daemon_events_region_id,
} => {
let control_channel =
ControlChannel::init(dataflow_id, node_id, daemon_control_region_id)
.wrap_err("failed to init control stream")?;

let (event_stream, event_stream_thread) =
EventStream::init(dataflow_id, node_id, daemon_events_region_id)
.wrap_err("failed to init event stream")?;

Ok(Self {
control_channel,
event_stream,
event_stream_thread,
})
}
DaemonCommunication::Tcp { socket_addr } => todo!(),
}
}
}



+ 3
- 9
apis/rust/node/src/lib.rs View File

@@ -39,21 +39,15 @@ impl DoraNode {
dataflow_id,
node_id,
run_config,
daemon_control_region_id,
daemon_events_region_id,
daemon_communication,
} = node_config;

let DaemonConnection {
control_channel,
event_stream,
event_stream_thread,
} = DaemonConnection::init(
dataflow_id,
&node_id,
&daemon_control_region_id,
&daemon_events_region_id,
)
.wrap_err("failed to connect to dora-daemon")?;
} = DaemonConnection::init(dataflow_id, &node_id, &daemon_communication)
.wrap_err("failed to connect to dora-daemon")?;

let node = Self {
id: node_id,


+ 1
- 1
binaries/cli/src/check.rs View File

@@ -6,7 +6,7 @@ use dora_core::{
topics::ControlRequest,
};
use eyre::{bail, eyre, Context};
use std::{env::consts::EXE_EXTENSION, io::Write, path::Path, str::FromStr};
use std::{env::consts::EXE_EXTENSION, io::Write, path::Path};
use termcolor::{Color, ColorChoice, ColorSpec, WriteColor};

pub fn check_environment() -> eyre::Result<()> {


+ 1
- 0
binaries/coordinator/src/run/mod.rs View File

@@ -65,6 +65,7 @@ pub async fn spawn_dataflow(
dataflow_id: uuid,
working_dir,
nodes,
daemon_communication: descriptor.daemon_config,
};
let message = serde_json::to_vec(&DaemonCoordinatorEvent::Spawn(spawn_command))?;



+ 1
- 0
binaries/daemon/Cargo.toml View File

@@ -24,3 +24,4 @@ futures = "0.3.25"
clap = { version = "3.1.8", features = ["derive"] }
shared-memory-server = { path = "../../libraries/shared-memory-server" }
ctrlc = "3.2.5"
bincode = "1.3.3"

+ 11
- 4
binaries/daemon/src/lib.rs View File

@@ -3,8 +3,8 @@ use dora_core::{
config::{DataId, InputMapping, NodeId},
coordinator_messages::DaemonEvent,
daemon_messages::{
self, DaemonCoordinatorEvent, DaemonCoordinatorReply, DaemonReply, DataflowId, DropToken,
SpawnDataflowNodes,
self, DaemonCommunicationConfig, DaemonCoordinatorEvent, DaemonCoordinatorReply,
DaemonReply, DataflowId, DropToken, SpawnDataflowNodes,
},
descriptor::{CoreNodeKind, Descriptor, ResolvedNode},
};
@@ -68,12 +68,14 @@ impl Daemon {
.ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
.to_owned();

let nodes = read_descriptor(dataflow_path).await?.resolve_aliases();
let descriptor = read_descriptor(dataflow_path).await?;
let nodes = descriptor.resolve_aliases();

let spawn_command = SpawnDataflowNodes {
dataflow_id: Uuid::new_v4(),
working_dir,
nodes,
daemon_communication: descriptor.daemon_config,
};

let exit_when_done = spawn_command
@@ -234,8 +236,11 @@ impl Daemon {
dataflow_id,
working_dir,
nodes,
daemon_communication,
}) => {
let result = self.spawn_dataflow(dataflow_id, working_dir, nodes).await;
let result = self
.spawn_dataflow(dataflow_id, working_dir, nodes, daemon_communication)
.await;
if let Err(err) = &result {
tracing::error!("{err:?}");
}
@@ -276,6 +281,7 @@ impl Daemon {
dataflow_id: uuid::Uuid,
working_dir: PathBuf,
nodes: Vec<ResolvedNode>,
daemon_communication_config: DaemonCommunicationConfig,
) -> eyre::Result<()> {
let dataflow = match self.running.entry(dataflow_id) {
std::collections::hash_map::Entry::Vacant(entry) => entry.insert(Default::default()),
@@ -318,6 +324,7 @@ impl Daemon {
node,
self.events_tx.clone(),
self.shared_memory_handler_node.clone(),
daemon_communication_config,
)
.await
.wrap_err_with(|| format!("failed to spawn node `{node_id}`"))?;


+ 2
- 298
binaries/daemon/src/listener/mod.rs View File

@@ -1,298 +1,2 @@
use std::collections::VecDeque;

use crate::{shared_mem_handler, DaemonNodeEvent, Event};
use dora_core::{
config::NodeId,
daemon_messages::{DaemonReply, DaemonRequest, DataflowId, DropEvent, NodeEvent},
};
use eyre::{eyre, Context};
use shared_memory_server::ShmemServer;
use tokio::sync::{mpsc, oneshot};

#[tracing::instrument(skip(server, daemon_tx, shmem_handler_tx))]
pub fn listener_loop(
mut server: ShmemServer<DaemonRequest, DaemonReply>,
daemon_tx: mpsc::Sender<Event>,
shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
) {
// receive the first message
let message = match server
.listen()
.wrap_err("failed to receive register message")
{
Ok(Some(m)) => m,
Ok(None) => {
tracing::info!("channel disconnected before register message");
return;
} // disconnected
Err(err) => {
tracing::info!("{err:?}");
return;
}
};

match message {
DaemonRequest::Register {
dataflow_id,
node_id,
} => {
let reply = DaemonReply::Result(Ok(()));
match server
.send_reply(&reply)
.wrap_err("failed to send register reply")
{
Ok(()) => {
let mut listener = Listener {
dataflow_id,
node_id,
server,
daemon_tx,
shmem_handler_tx,
subscribed_events: None,
max_queue_len: 10, // TODO: make this configurable
queue: VecDeque::new(),
};
match listener.run().wrap_err("listener failed") {
Ok(()) => {}
Err(err) => tracing::error!("{err:?}"),
}
}
Err(err) => {
tracing::warn!("{err:?}");
}
}
}
_ => {
let reply = DaemonReply::Result(Err("must send register message first".into()));
if let Err(err) = server.send_reply(&reply).wrap_err("failed to send reply") {
tracing::warn!("{err:?}");
}
}
}
}

struct Listener {
dataflow_id: DataflowId,
node_id: NodeId,
server: ShmemServer<DaemonRequest, DaemonReply>,
daemon_tx: mpsc::Sender<Event>,
shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
subscribed_events: Option<flume::Receiver<NodeEvent>>,
max_queue_len: usize,
queue: VecDeque<NodeEvent>,
}

impl Listener {
fn run(&mut self) -> eyre::Result<()> {
loop {
// receive the next node message
let message = match self
.server
.listen()
.wrap_err("failed to receive DaemonRequest")
{
Ok(Some(m)) => m,
Ok(None) => {
tracing::info!(
"channel disconnected: {}/{}",
self.dataflow_id,
self.node_id
);
break;
} // disconnected
Err(err) => {
tracing::warn!("{err:?}");
continue;
}
};

// handle incoming events
self.handle_events()?;

self.handle_message(message)?;
}
Ok(())
}

fn handle_events(&mut self) -> eyre::Result<()> {
if let Some(events) = &mut self.subscribed_events {
while let Ok(event) = events.try_recv() {
self.queue.push_back(event);
}

// drop oldest input events to maintain max queue length queue
let input_event_count = self
.queue
.iter()
.filter(|e| matches!(e, NodeEvent::Input { .. }))
.count();
let drop_n = input_event_count.saturating_sub(self.max_queue_len);
self.drop_oldest_inputs(drop_n)?;
}
Ok(())
}

fn drop_oldest_inputs(&mut self, number: usize) -> Result<(), eyre::ErrReport> {
let mut drop_tokens = Vec::new();
for i in 0..number {
// find index of oldest input event
let index = self
.queue
.iter()
.position(|e| matches!(e, NodeEvent::Input { .. }))
.expect(&format!("no input event found in drop iteration {i}"));

// remove that event
if let Some(event) = self.queue.remove(index) {
if let NodeEvent::Input {
data: Some(data), ..
} = event
{
drop_tokens.push(data.drop_token);
}
}
}
self.report_drop_tokens(drop_tokens)?;
Ok(())
}

fn handle_message(&mut self, message: DaemonRequest) -> eyre::Result<()> {
match message {
DaemonRequest::Register { .. } => {
let reply = DaemonReply::Result(Err("unexpected register message".into()));
self.send_reply(&reply)?;
}
DaemonRequest::Stopped => self.process_daemon_event(DaemonNodeEvent::Stopped)?,
DaemonRequest::CloseOutputs(outputs) => {
self.process_daemon_event(DaemonNodeEvent::CloseOutputs(outputs))?
}
DaemonRequest::PrepareOutputMessage {
output_id,
metadata,
data_len,
} => {
let (reply_sender, reply) = oneshot::channel();
let event = shared_mem_handler::NodeEvent::PrepareOutputMessage {
dataflow_id: self.dataflow_id,
node_id: self.node_id.clone(),
output_id,
metadata,
data_len,
reply_sender,
};
self.send_shared_memory_event(event)?;
let reply = reply
.blocking_recv()
.wrap_err("failed to receive prepare output reply")?;
// tracing::debug!("prepare latency: {:?}", start.elapsed()?);
self.send_reply(&reply)?;
}
DaemonRequest::SendPreparedMessage { id } => {
let (reply_sender, reply) = oneshot::channel();
let event = shared_mem_handler::NodeEvent::SendPreparedMessage { id, reply_sender };
self.send_shared_memory_event(event)?;
self.send_reply(
&reply
.blocking_recv()
.wrap_err("failed to receive send output reply")?,
)?;
}
DaemonRequest::SendEmptyMessage {
output_id,
metadata,
} => {
// let elapsed = metadata.timestamp().get_time().to_system_time().elapsed()?;
// tracing::debug!("listener SendEmptyMessage: {elapsed:?}");
let event = crate::Event::ShmemHandler(crate::ShmemHandlerEvent::SendOut {
dataflow_id: self.dataflow_id,
node_id: self.node_id.clone(),
output_id,
metadata,
data: None,
});
let result = self
.send_daemon_event(event)
.map_err(|_| "failed to receive send_empty_message reply".to_owned());
self.send_reply(&DaemonReply::Result(result))?;
}
DaemonRequest::Subscribe => {
let (tx, rx) = flume::bounded(100);
self.process_daemon_event(DaemonNodeEvent::Subscribe { event_sender: tx })?;
self.subscribed_events = Some(rx);
}
DaemonRequest::NextEvent { drop_tokens } => {
self.report_drop_tokens(drop_tokens)?;

// try to take the latest queued event first
let queued_event = self.queue.pop_front().map(DaemonReply::NodeEvent);
let reply = queued_event.unwrap_or_else(|| {
match self.subscribed_events.as_mut() {
// wait for next event
Some(events) => match events.recv() {
Ok(event) => DaemonReply::NodeEvent(event),
Err(flume::RecvError::Disconnected) => DaemonReply::Closed,
},
None => {
DaemonReply::Result(Err("Ignoring event request because no subscribe \
message was sent yet"
.into()))
}
}
});

self.send_reply(&reply)?;
}
}
Ok(())
}

fn report_drop_tokens(
&mut self,
drop_tokens: Vec<dora_core::daemon_messages::DropToken>,
) -> eyre::Result<()> {
if !drop_tokens.is_empty() {
let drop_event = shared_mem_handler::NodeEvent::Drop(DropEvent {
tokens: drop_tokens,
});
self.send_shared_memory_event(drop_event)?;
}
Ok(())
}

fn process_daemon_event(&mut self, event: DaemonNodeEvent) -> eyre::Result<()> {
// send NodeEvent to daemon main loop
let (reply_tx, reply) = oneshot::channel();
let event = Event::Node {
dataflow_id: self.dataflow_id.clone(),
node_id: self.node_id.clone(),
event,
reply_sender: reply_tx,
};
self.daemon_tx
.blocking_send(event)
.map_err(|_| eyre!("failed to send event to daemon"))?;
let reply = reply
.blocking_recv()
.map_err(|_| eyre!("failed to receive reply from daemon"))?;
self.send_reply(&reply)?;
Ok(())
}

fn send_reply(&mut self, reply: &DaemonReply) -> eyre::Result<()> {
self.server
.send_reply(&reply)
.wrap_err("failed to send reply to node")
}

fn send_shared_memory_event(&self, event: shared_mem_handler::NodeEvent) -> eyre::Result<()> {
self.shmem_handler_tx
.send(event)
.map_err(|_| eyre!("failed to send event to shared_mem_handler"))
}

fn send_daemon_event(&self, event: crate::Event) -> eyre::Result<()> {
self.daemon_tx
.blocking_send(event)
.map_err(|_| eyre!("failed to send event to daemon"))
}
}
pub mod shmem;
pub mod tcp;

+ 298
- 0
binaries/daemon/src/listener/shmem.rs View File

@@ -0,0 +1,298 @@
use std::collections::VecDeque;

use crate::{shared_mem_handler, DaemonNodeEvent, Event};
use dora_core::{
config::NodeId,
daemon_messages::{DaemonReply, DaemonRequest, DataflowId, DropEvent, NodeEvent},
};
use eyre::{eyre, Context};
use shared_memory_server::ShmemServer;
use tokio::sync::{mpsc, oneshot};

#[tracing::instrument(skip(server, daemon_tx, shmem_handler_tx))]
pub fn listener_loop(
mut server: ShmemServer<DaemonRequest, DaemonReply>,
daemon_tx: mpsc::Sender<Event>,
shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
) {
// receive the first message
let message = match server
.listen()
.wrap_err("failed to receive register message")
{
Ok(Some(m)) => m,
Ok(None) => {
tracing::info!("channel disconnected before register message");
return;
} // disconnected
Err(err) => {
tracing::info!("{err:?}");
return;
}
};

match message {
DaemonRequest::Register {
dataflow_id,
node_id,
} => {
let reply = DaemonReply::Result(Ok(()));
match server
.send_reply(&reply)
.wrap_err("failed to send register reply")
{
Ok(()) => {
let mut listener = Listener {
dataflow_id,
node_id,
server,
daemon_tx,
shmem_handler_tx,
subscribed_events: None,
max_queue_len: 10, // TODO: make this configurable
queue: VecDeque::new(),
};
match listener.run().wrap_err("listener failed") {
Ok(()) => {}
Err(err) => tracing::error!("{err:?}"),
}
}
Err(err) => {
tracing::warn!("{err:?}");
}
}
}
_ => {
let reply = DaemonReply::Result(Err("must send register message first".into()));
if let Err(err) = server.send_reply(&reply).wrap_err("failed to send reply") {
tracing::warn!("{err:?}");
}
}
}
}

/// Per-node connection state for the shared-memory listener.
struct Listener {
    // Dataflow the connected node belongs to.
    dataflow_id: DataflowId,
    // Identifier of the connected node.
    node_id: NodeId,
    // Shared-memory channel for receiving requests and sending replies.
    server: ShmemServer<DaemonRequest, DaemonReply>,
    // Channel to the daemon main event loop.
    daemon_tx: mpsc::Sender<Event>,
    // Channel to the shared-memory handler task.
    shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
    // Receiver for node events; `Some` once the node sent `Subscribe`.
    subscribed_events: Option<flume::Receiver<NodeEvent>>,
    // Maximum number of queued `Input` events before old ones are dropped.
    max_queue_len: usize,
    // Locally buffered node events not yet delivered to the node.
    queue: VecDeque<NodeEvent>,
}

impl Listener {
fn run(&mut self) -> eyre::Result<()> {
loop {
// receive the next node message
let message = match self
.server
.listen()
.wrap_err("failed to receive DaemonRequest")
{
Ok(Some(m)) => m,
Ok(None) => {
tracing::info!(
"channel disconnected: {}/{}",
self.dataflow_id,
self.node_id
);
break;
} // disconnected
Err(err) => {
tracing::warn!("{err:?}");
continue;
}
};

// handle incoming events
self.handle_events()?;

self.handle_message(message)?;
}
Ok(())
}

fn handle_events(&mut self) -> eyre::Result<()> {
if let Some(events) = &mut self.subscribed_events {
while let Ok(event) = events.try_recv() {
self.queue.push_back(event);
}

// drop oldest input events to maintain max queue length queue
let input_event_count = self
.queue
.iter()
.filter(|e| matches!(e, NodeEvent::Input { .. }))
.count();
let drop_n = input_event_count.saturating_sub(self.max_queue_len);
self.drop_oldest_inputs(drop_n)?;
}
Ok(())
}

fn drop_oldest_inputs(&mut self, number: usize) -> Result<(), eyre::ErrReport> {
let mut drop_tokens = Vec::new();
for i in 0..number {
// find index of oldest input event
let index = self
.queue
.iter()
.position(|e| matches!(e, NodeEvent::Input { .. }))
.expect(&format!("no input event found in drop iteration {i}"));

// remove that event
if let Some(event) = self.queue.remove(index) {
if let NodeEvent::Input {
data: Some(data), ..
} = event
{
drop_tokens.push(data.drop_token);
}
}
}
self.report_drop_tokens(drop_tokens)?;
Ok(())
}

fn handle_message(&mut self, message: DaemonRequest) -> eyre::Result<()> {
match message {
DaemonRequest::Register { .. } => {
let reply = DaemonReply::Result(Err("unexpected register message".into()));
self.send_reply(&reply)?;
}
DaemonRequest::Stopped => self.process_daemon_event(DaemonNodeEvent::Stopped)?,
DaemonRequest::CloseOutputs(outputs) => {
self.process_daemon_event(DaemonNodeEvent::CloseOutputs(outputs))?
}
DaemonRequest::PrepareOutputMessage {
output_id,
metadata,
data_len,
} => {
let (reply_sender, reply) = oneshot::channel();
let event = shared_mem_handler::NodeEvent::PrepareOutputMessage {
dataflow_id: self.dataflow_id,
node_id: self.node_id.clone(),
output_id,
metadata,
data_len,
reply_sender,
};
self.send_shared_memory_event(event)?;
let reply = reply
.blocking_recv()
.wrap_err("failed to receive prepare output reply")?;
// tracing::debug!("prepare latency: {:?}", start.elapsed()?);
self.send_reply(&reply)?;
}
DaemonRequest::SendPreparedMessage { id } => {
let (reply_sender, reply) = oneshot::channel();
let event = shared_mem_handler::NodeEvent::SendPreparedMessage { id, reply_sender };
self.send_shared_memory_event(event)?;
self.send_reply(
&reply
.blocking_recv()
.wrap_err("failed to receive send output reply")?,
)?;
}
DaemonRequest::SendEmptyMessage {
output_id,
metadata,
} => {
// let elapsed = metadata.timestamp().get_time().to_system_time().elapsed()?;
// tracing::debug!("listener SendEmptyMessage: {elapsed:?}");
let event = crate::Event::ShmemHandler(crate::ShmemHandlerEvent::SendOut {
dataflow_id: self.dataflow_id,
node_id: self.node_id.clone(),
output_id,
metadata,
data: None,
});
let result = self
.send_daemon_event(event)
.map_err(|_| "failed to receive send_empty_message reply".to_owned());
self.send_reply(&DaemonReply::Result(result))?;
}
DaemonRequest::Subscribe => {
let (tx, rx) = flume::bounded(100);
self.process_daemon_event(DaemonNodeEvent::Subscribe { event_sender: tx })?;
self.subscribed_events = Some(rx);
}
DaemonRequest::NextEvent { drop_tokens } => {
self.report_drop_tokens(drop_tokens)?;

// try to take the latest queued event first
let queued_event = self.queue.pop_front().map(DaemonReply::NodeEvent);
let reply = queued_event.unwrap_or_else(|| {
match self.subscribed_events.as_mut() {
// wait for next event
Some(events) => match events.recv() {
Ok(event) => DaemonReply::NodeEvent(event),
Err(flume::RecvError::Disconnected) => DaemonReply::Closed,
},
None => {
DaemonReply::Result(Err("Ignoring event request because no subscribe \
message was sent yet"
.into()))
}
}
});

self.send_reply(&reply)?;
}
}
Ok(())
}

fn report_drop_tokens(
&mut self,
drop_tokens: Vec<dora_core::daemon_messages::DropToken>,
) -> eyre::Result<()> {
if !drop_tokens.is_empty() {
let drop_event = shared_mem_handler::NodeEvent::Drop(DropEvent {
tokens: drop_tokens,
});
self.send_shared_memory_event(drop_event)?;
}
Ok(())
}

fn process_daemon_event(&mut self, event: DaemonNodeEvent) -> eyre::Result<()> {
// send NodeEvent to daemon main loop
let (reply_tx, reply) = oneshot::channel();
let event = Event::Node {
dataflow_id: self.dataflow_id.clone(),
node_id: self.node_id.clone(),
event,
reply_sender: reply_tx,
};
self.daemon_tx
.blocking_send(event)
.map_err(|_| eyre!("failed to send event to daemon"))?;
let reply = reply
.blocking_recv()
.map_err(|_| eyre!("failed to receive reply from daemon"))?;
self.send_reply(&reply)?;
Ok(())
}

fn send_reply(&mut self, reply: &DaemonReply) -> eyre::Result<()> {
self.server
.send_reply(&reply)
.wrap_err("failed to send reply to node")
}

fn send_shared_memory_event(&self, event: shared_mem_handler::NodeEvent) -> eyre::Result<()> {
self.shmem_handler_tx
.send(event)
.map_err(|_| eyre!("failed to send event to shared_mem_handler"))
}

fn send_daemon_event(&self, event: crate::Event) -> eyre::Result<()> {
self.daemon_tx
.blocking_send(event)
.map_err(|_| eyre!("failed to send event to daemon"))
}
}

+ 368
- 0
binaries/daemon/src/listener/tcp.rs View File

@@ -0,0 +1,368 @@
use std::collections::VecDeque;

use crate::{
shared_mem_handler,
tcp_utils::{tcp_receive, tcp_send},
DaemonNodeEvent, Event,
};
use dora_core::{
config::NodeId,
daemon_messages::{DaemonReply, DaemonRequest, DataflowId, DropEvent, NodeEvent},
};
use eyre::{eyre, Context};
use tokio::{
net::{TcpListener, TcpStream},
sync::{mpsc, oneshot},
};

/// Accept loop for node TCP connections.
///
/// Each accepted connection is served by [`handle_connection_loop`] on its
/// own tokio task; accept errors are logged and the loop continues.
#[tracing::instrument(skip(listener, daemon_tx, shmem_handler_tx))]
pub async fn listener_loop(
    listener: TcpListener,
    daemon_tx: mpsc::Sender<Event>,
    shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
) {
    loop {
        let accepted = listener
            .accept()
            .await
            .wrap_err("failed to accept new connection");
        match accepted {
            Ok((connection, _addr)) => {
                // Serve each node connection concurrently.
                tokio::spawn(handle_connection_loop(
                    connection,
                    daemon_tx.clone(),
                    shmem_handler_tx.clone(),
                ));
            }
            Err(err) => {
                tracing::info!("{err}");
            }
        }
    }
}

#[tracing::instrument(skip(connection, daemon_tx, shmem_handler_tx))]
pub async fn handle_connection_loop(
mut connection: TcpStream,
daemon_tx: mpsc::Sender<Event>,
shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
) {
if let Err(err) = connection.set_nodelay(true) {
tracing::warn!("failed to set nodelay for connection: {err}");
}

// receive the first message
let message = match receive_message(&mut connection)
.await
.wrap_err("failed to receive register message")
{
Ok(Some(m)) => m,
Ok(None) => {
tracing::info!("channel disconnected before register message");
return;
} // disconnected
Err(err) => {
tracing::info!("{err:?}");
return;
}
};

match message {
DaemonRequest::Register {
dataflow_id,
node_id,
} => {
let reply = DaemonReply::Result(Ok(()));
match send_reply(&mut connection, &reply)
.await
.wrap_err("failed to send register reply")
{
Ok(()) => {
let mut listener = Listener {
dataflow_id,
node_id,
connection,
daemon_tx,
shmem_handler_tx,
subscribed_events: None,
max_queue_len: 10, // TODO: make this configurable
queue: VecDeque::new(),
};
match listener.run().await.wrap_err("listener failed") {
Ok(()) => {}
Err(err) => tracing::error!("{err:?}"),
}
}
Err(err) => {
tracing::warn!("{err:?}");
}
}
}
_ => {
let reply = DaemonReply::Result(Err("must send register message first".into()));
if let Err(err) = send_reply(&mut connection, &reply)
.await
.wrap_err("failed to send reply")
{
tracing::warn!("{err:?}");
}
}
}
}

/// Receives and deserializes the next `DaemonRequest` from the node.
///
/// Returns `Ok(None)` when the peer closed the connection (EOF or abort);
/// other I/O errors and deserialization failures are reported as `Err`.
async fn receive_message(connection: &mut TcpStream) -> eyre::Result<Option<DaemonRequest>> {
    let raw = match tcp_receive(connection).await {
        Ok(raw) => raw,
        Err(err) => match err.kind() {
            // a closed connection is an expected end-of-stream, not an error
            std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::ConnectionAborted => {
                return Ok(None)
            }
            // `_` instead of an unused `other` binding (clippy: unused_variables)
            _ => {
                return Err(err)
                    .context("unexpected I/O error while trying to receive DaemonRequest")
            }
        },
    };
    bincode::deserialize(&raw)
        .wrap_err("failed to deserialize DaemonRequest")
        .map(Some)
}

/// Serializes a `DaemonReply` with bincode and sends it over the connection.
async fn send_reply(connection: &mut TcpStream, message: &DaemonReply) -> eyre::Result<()> {
    // `message` is already a reference; no extra borrow needed (clippy: needless_borrow)
    let serialized = bincode::serialize(message).wrap_err("failed to serialize DaemonReply")?;
    tcp_send(connection, &serialized)
        .await
        .wrap_err("failed to send DaemonReply")?;
    Ok(())
}

/// Per-node connection state for the TCP listener.
struct Listener {
    // Dataflow the connected node belongs to.
    dataflow_id: DataflowId,
    // Identifier of the connected node.
    node_id: NodeId,
    // TCP stream for receiving requests and sending replies.
    connection: TcpStream,
    // Channel to the daemon main event loop.
    daemon_tx: mpsc::Sender<Event>,
    // Channel to the shared-memory handler task.
    shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
    // Receiver for node events; `Some` once the node sent `Subscribe`.
    subscribed_events: Option<flume::Receiver<NodeEvent>>,
    // Maximum number of queued `Input` events before old ones are dropped.
    max_queue_len: usize,
    // Locally buffered node events not yet delivered to the node.
    queue: VecDeque<NodeEvent>,
}

impl Listener {
    /// Main request loop: receives `DaemonRequest`s over TCP until the node
    /// disconnects.
    async fn run(&mut self) -> eyre::Result<()> {
        loop {
            // receive the next node message
            let message = match receive_message(&mut self.connection)
                .await
                .wrap_err("failed to receive DaemonRequest")
            {
                Ok(Some(m)) => m,
                Ok(None) => {
                    tracing::info!(
                        "channel disconnected: {}/{}",
                        self.dataflow_id,
                        self.node_id
                    );
                    break;
                } // disconnected
                Err(err) => {
                    // transport/deserialization error: log and keep serving
                    tracing::warn!("{err:?}");
                    continue;
                }
            };

            // handle incoming events
            self.handle_events()?;

            self.handle_message(message).await?;
        }
        Ok(())
    }

    /// Drains newly arrived node events into the local queue and enforces the
    /// maximum number of queued `Input` events.
    fn handle_events(&mut self) -> eyre::Result<()> {
        if let Some(events) = &mut self.subscribed_events {
            while let Ok(event) = events.try_recv() {
                self.queue.push_back(event);
            }

            // drop oldest input events to maintain max queue length
            let input_event_count = self
                .queue
                .iter()
                .filter(|e| matches!(e, NodeEvent::Input { .. }))
                .count();
            let drop_n = input_event_count.saturating_sub(self.max_queue_len);
            self.drop_oldest_inputs(drop_n)?;
        }
        Ok(())
    }

    /// Removes the `number` oldest `Input` events from the queue and reports
    /// their drop tokens so the corresponding shared memory can be reclaimed.
    fn drop_oldest_inputs(&mut self, number: usize) -> eyre::Result<()> {
        let mut drop_tokens = Vec::new();
        for i in 0..number {
            // find index of oldest input event
            let index = self
                .queue
                .iter()
                .position(|e| matches!(e, NodeEvent::Input { .. }))
                // build the panic message lazily (clippy: expect_fun_call)
                .unwrap_or_else(|| panic!("no input event found in drop iteration {i}"));

            // remove that event
            if let Some(event) = self.queue.remove(index) {
                if let NodeEvent::Input {
                    data: Some(data), ..
                } = event
                {
                    drop_tokens.push(data.drop_token);
                }
            }
        }
        self.report_drop_tokens(drop_tokens)?;
        Ok(())
    }

    /// Dispatches a single request from the node and sends back the reply.
    async fn handle_message(&mut self, message: DaemonRequest) -> eyre::Result<()> {
        match message {
            DaemonRequest::Register { .. } => {
                let reply = DaemonReply::Result(Err("unexpected register message".into()));
                self.send_reply(&reply).await?;
            }
            DaemonRequest::Stopped => self.process_daemon_event(DaemonNodeEvent::Stopped).await?,
            DaemonRequest::CloseOutputs(outputs) => {
                self.process_daemon_event(DaemonNodeEvent::CloseOutputs(outputs))
                    .await?
            }
            DaemonRequest::PrepareOutputMessage {
                output_id,
                metadata,
                data_len,
            } => {
                let (reply_sender, reply) = oneshot::channel();
                let event = shared_mem_handler::NodeEvent::PrepareOutputMessage {
                    dataflow_id: self.dataflow_id,
                    node_id: self.node_id.clone(),
                    output_id,
                    metadata,
                    data_len,
                    reply_sender,
                };
                self.send_shared_memory_event(event)?;
                let reply = reply
                    .await
                    .wrap_err("failed to receive prepare output reply")?;
                // tracing::debug!("prepare latency: {:?}", start.elapsed()?);
                self.send_reply(&reply).await?;
            }
            DaemonRequest::SendPreparedMessage { id } => {
                let (reply_sender, reply) = oneshot::channel();
                let event = shared_mem_handler::NodeEvent::SendPreparedMessage { id, reply_sender };
                self.send_shared_memory_event(event)?;
                self.send_reply(
                    &reply
                        .await
                        .wrap_err("failed to receive send output reply")?,
                )
                .await?;
            }
            DaemonRequest::SendEmptyMessage {
                output_id,
                metadata,
            } => {
                // let elapsed = metadata.timestamp().get_time().to_system_time().elapsed()?;
                // tracing::debug!("listener SendEmptyMessage: {elapsed:?}");
                let event = crate::Event::ShmemHandler(crate::ShmemHandlerEvent::SendOut {
                    dataflow_id: self.dataflow_id,
                    node_id: self.node_id.clone(),
                    output_id,
                    metadata,
                    data: None,
                });
                let result = self
                    .send_daemon_event(event)
                    .await
                    .map_err(|_| "failed to receive send_empty_message reply".to_owned());
                self.send_reply(&DaemonReply::Result(result)).await?;
            }
            DaemonRequest::Subscribe => {
                let (tx, rx) = flume::bounded(100);
                self.process_daemon_event(DaemonNodeEvent::Subscribe { event_sender: tx })
                    .await?;
                self.subscribed_events = Some(rx);
            }
            DaemonRequest::NextEvent { drop_tokens } => {
                self.report_drop_tokens(drop_tokens)?;

                // try to take the latest queued event first
                let reply = match self.queue.pop_front() {
                    Some(event) => DaemonReply::NodeEvent(event),
                    None => match self.subscribed_events.as_mut() {
                        // wait for next event; use flume's async receiver so
                        // the blocking `recv()` cannot stall the tokio
                        // executor thread while waiting
                        Some(events) => match events.recv_async().await {
                            Ok(event) => DaemonReply::NodeEvent(event),
                            Err(flume::RecvError::Disconnected) => DaemonReply::Closed,
                        },
                        None => {
                            DaemonReply::Result(Err("Ignoring event request because no subscribe \
                                message was sent yet"
                                .into()))
                        }
                    },
                };

                self.send_reply(&reply).await?;
            }
        }
        Ok(())
    }

    /// Forwards drop tokens to the shared-memory handler so it can free the
    /// associated regions; no-op for an empty token list.
    fn report_drop_tokens(
        &mut self,
        drop_tokens: Vec<dora_core::daemon_messages::DropToken>,
    ) -> eyre::Result<()> {
        if !drop_tokens.is_empty() {
            let drop_event = shared_mem_handler::NodeEvent::Drop(DropEvent {
                tokens: drop_tokens,
            });
            self.send_shared_memory_event(drop_event)?;
        }
        Ok(())
    }

    /// Sends a node event to the daemon main loop, waits for the daemon's
    /// reply, and relays that reply to the node.
    async fn process_daemon_event(&mut self, event: DaemonNodeEvent) -> eyre::Result<()> {
        // send NodeEvent to daemon main loop
        let (reply_tx, reply) = oneshot::channel();
        let event = Event::Node {
            // `dataflow_id` is passed by value elsewhere in this impl, so no
            // clone is needed (clippy: clone_on_copy)
            dataflow_id: self.dataflow_id,
            node_id: self.node_id.clone(),
            event,
            reply_sender: reply_tx,
        };
        self.daemon_tx
            .send(event)
            .await
            .map_err(|_| eyre!("failed to send event to daemon"))?;
        let reply = reply
            .await
            .map_err(|_| eyre!("failed to receive reply from daemon"))?;
        self.send_reply(&reply).await?;
        Ok(())
    }

    /// Sends a reply back to the node over the TCP connection.
    async fn send_reply(&mut self, reply: &DaemonReply) -> eyre::Result<()> {
        send_reply(&mut self.connection, reply)
            .await
            .wrap_err("failed to send reply to node")
    }

    /// Forwards an event to the shared-memory handler task.
    fn send_shared_memory_event(&self, event: shared_mem_handler::NodeEvent) -> eyre::Result<()> {
        self.shmem_handler_tx
            .send(event)
            .map_err(|_| eyre!("failed to send event to shared_mem_handler"))
    }

    /// Forwards an event to the daemon main loop.
    async fn send_daemon_event(&self, event: crate::Event) -> eyre::Result<()> {
        self.daemon_tx
            .send(event)
            .await
            .map_err(|_| eyre!("failed to send event to daemon"))
    }
}

+ 85
- 39
binaries/daemon/src/spawn.rs View File

@@ -1,17 +1,18 @@
use crate::{
listener::listener_loop, runtime_node_inputs, runtime_node_outputs, shared_mem_handler,
DoraEvent, Event,
listener, runtime_node_inputs, runtime_node_outputs, shared_mem_handler, DoraEvent, Event,
};
use dora_core::{
config::NodeRunConfig,
daemon_messages::{DataflowId, NodeConfig, RuntimeConfig},
config::{NodeId, NodeRunConfig},
daemon_messages::{
DaemonCommunication, DaemonCommunicationConfig, DataflowId, NodeConfig, RuntimeConfig,
},
descriptor::{resolve_path, source_is_url, OperatorSource, ResolvedNode},
};
use dora_download::download_file;
use eyre::{eyre, WrapErr};
use shared_memory_server::{ShmemConf, ShmemServer};
use std::{env::consts::EXE_EXTENSION, path::Path, process::Stdio};
use tokio::sync::mpsc;
use std::{env::consts::EXE_EXTENSION, net::Ipv4Addr, path::Path, process::Stdio};
use tokio::{net::TcpListener, sync::mpsc};

pub async fn spawn_node(
dataflow_id: DataflowId,
@@ -19,38 +20,19 @@ pub async fn spawn_node(
node: ResolvedNode,
daemon_tx: mpsc::Sender<Event>,
shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
config: DaemonCommunicationConfig,
) -> eyre::Result<()> {
let node_id = node.id.clone();
tracing::debug!("Spawning node `{dataflow_id}/{node_id}`");

let daemon_control_region = ShmemConf::new()
.size(4096)
.create()
.wrap_err("failed to allocate daemon_control_region")?;
let daemon_events_region = ShmemConf::new()
.size(4096)
.create()
.wrap_err("failed to allocate daemon_events_region")?;
let daemon_control_region_id = daemon_control_region.get_os_id().to_owned();
let daemon_events_region_id = daemon_events_region.get_os_id().to_owned();
{
let server = unsafe { ShmemServer::new(daemon_control_region) }
.wrap_err("failed to create control server")?;
let daemon_tx = daemon_tx.clone();
let shmem_handler_tx = shmem_handler_tx.clone();
tokio::task::spawn_blocking(move || listener_loop(server, daemon_tx, shmem_handler_tx));
}
{
let server = unsafe { ShmemServer::new(daemon_events_region) }
.wrap_err("failed to create events server")?;
let event_loop_node_id = format!("{dataflow_id}/{node_id}");
let daemon_tx = daemon_tx.clone();
let shmem_handler_tx = shmem_handler_tx.clone();
tokio::task::spawn_blocking(move || {
listener_loop(server, daemon_tx, shmem_handler_tx);
tracing::debug!("event listener loop finished for `{event_loop_node_id}`");
});
}
let daemon_communication = daemon_communication_config(
&dataflow_id,
&node_id,
&daemon_tx,
&shmem_handler_tx,
config,
)
.await?;

let mut child = match node.kind {
dora_core::descriptor::CoreNodeKind::Custom(n) => {
@@ -64,7 +46,7 @@ pub async fn spawn_node(
.wrap_err("failed to download custom node")?;
target_path.clone()
} else {
resolve_path(&n.source, &working_dir)
resolve_path(&n.source, working_dir)
.wrap_err_with(|| format!("failed to resolve node source `{}`", n.source))?
};

@@ -76,8 +58,7 @@ pub async fn spawn_node(
dataflow_id,
node_id: node_id.clone(),
run_config: n.run_config.clone(),
daemon_control_region_id,
daemon_events_region_id,
daemon_communication,
};
if let Some(args) = &n.args {
command.args(args.split_ascii_whitespace());
@@ -133,8 +114,7 @@ pub async fn spawn_node(
inputs: runtime_node_inputs(&n),
outputs: runtime_node_outputs(&n),
},
daemon_control_region_id,
daemon_events_region_id,
daemon_communication,
},
operators: n.operators,
};
@@ -170,3 +150,69 @@ pub async fn spawn_node(
});
Ok(())
}

/// Sets up the daemon-side endpoint a spawned node will use to talk to the
/// daemon, according to the dataflow's `DaemonCommunicationConfig`.
///
/// Returns the `DaemonCommunication` value that is embedded in the node's
/// `NodeConfig` so the node process knows how to connect back:
/// - `Tcp`: binds a TCP listener on localhost with an OS-assigned ephemeral
///   port and returns its address.
/// - `Shmem`: allocates two 4 KiB shared-memory regions (control + events),
///   spawns a blocking listener loop for each, and returns their OS ids.
async fn daemon_communication_config(
    dataflow_id: &DataflowId,
    node_id: &NodeId,
    daemon_tx: &mpsc::Sender<Event>,
    shmem_handler_tx: &flume::Sender<shared_mem_handler::NodeEvent>,
    config: DaemonCommunicationConfig,
) -> eyre::Result<DaemonCommunication> {
    match config {
        DaemonCommunicationConfig::Tcp => {
            let localhost = Ipv4Addr::new(127, 0, 0, 1);
            // Port 0 asks the OS to pick a free ephemeral port.
            let socket = match TcpListener::bind((localhost, 0)).await {
                Ok(socket) => socket,
                Err(err) => {
                    return Err(
                        eyre::Report::new(err).wrap_err("failed to create local TCP listener")
                    )
                }
            };
            // Read back the actual address (including the assigned port) to
            // hand to the node.
            let socket_addr = socket
                .local_addr()
                .wrap_err("failed to get local addr of socket")?;

            // NOTE(review): `socket` is dropped when this function returns and
            // no accept loop is spawned here — confirm that the TCP accept
            // handling (see listener/tcp.rs in this commit) takes over the
            // listener, otherwise nodes connecting to `socket_addr` will be
            // refused.
            Ok(DaemonCommunication::Tcp { socket_addr })
        }
        DaemonCommunicationConfig::Shmem => {
            // One region for node -> daemon control requests …
            let daemon_control_region = ShmemConf::new()
                .size(4096)
                .create()
                .wrap_err("failed to allocate daemon_control_region")?;
            // … and one for daemon -> node events.
            let daemon_events_region = ShmemConf::new()
                .size(4096)
                .create()
                .wrap_err("failed to allocate daemon_events_region")?;
            // The OS ids are what the node process uses to open the same
            // regions from its side.
            let daemon_control_region_id = daemon_control_region.get_os_id().to_owned();
            let daemon_events_region_id = daemon_events_region.get_os_id().to_owned();

            {
                // SAFETY upheld by ShmemServer's contract over a freshly
                // created region (see shared_memory_server crate).
                let server = unsafe { ShmemServer::new(daemon_control_region) }
                    .wrap_err("failed to create control server")?;
                let daemon_tx = daemon_tx.clone();
                let shmem_handler_tx = shmem_handler_tx.clone();
                // The shmem listener loop blocks, so run it off the async
                // runtime's worker threads.
                tokio::task::spawn_blocking(move || {
                    listener::shmem::listener_loop(server, daemon_tx, shmem_handler_tx)
                });
            }

            {
                let server = unsafe { ShmemServer::new(daemon_events_region) }
                    .wrap_err("failed to create events server")?;
                // Captured only for the debug log below.
                let event_loop_node_id = format!("{dataflow_id}/{node_id}");
                let daemon_tx = daemon_tx.clone();
                let shmem_handler_tx = shmem_handler_tx.clone();
                tokio::task::spawn_blocking(move || {
                    listener::shmem::listener_loop(server, daemon_tx, shmem_handler_tx);
                    tracing::debug!("event listener loop finished for `{event_loop_node_id}`");
                });
            }

            Ok(DaemonCommunication::Shmem {
                daemon_control_region_id,
                daemon_events_region_id,
            })
        }
    }
}

+ 27
- 4
libraries/core/src/daemon_messages.rs View File

@@ -1,8 +1,8 @@
use std::{collections::BTreeMap, path::PathBuf};
use std::{net::SocketAddr, path::PathBuf};

use crate::{
config::{DataId, NodeId, NodeRunConfig},
descriptor::{self, OperatorDefinition, ResolvedNode},
descriptor::{OperatorDefinition, ResolvedNode},
};
use dora_message::Metadata;
use uuid::Uuid;
@@ -12,8 +12,18 @@ pub struct NodeConfig {
pub dataflow_id: DataflowId,
pub node_id: NodeId,
pub run_config: NodeRunConfig,
pub daemon_control_region_id: SharedMemoryId,
pub daemon_events_region_id: SharedMemoryId,
pub daemon_communication: DaemonCommunication,
}

/// Connection details a node uses to reach its local daemon.
///
/// Produced by the daemon when spawning a node and passed to the node process
/// as part of its `NodeConfig`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum DaemonCommunication {
    /// Communicate via two shared-memory regions created by the daemon.
    Shmem {
        // OS id of the region carrying node -> daemon control requests.
        daemon_control_region_id: SharedMemoryId,
        // OS id of the region carrying daemon -> node events.
        daemon_events_region_id: SharedMemoryId,
    },
    /// Communicate over TCP.
    Tcp {
        // Address of the TCP listener the daemon bound for this node
        // (localhost with an ephemeral port — see the daemon's spawn code).
        socket_addr: SocketAddr,
    },
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
@@ -117,4 +127,17 @@ pub struct SpawnDataflowNodes {
pub dataflow_id: DataflowId,
pub working_dir: PathBuf,
pub nodes: Vec<ResolvedNode>,
pub daemon_communication: DaemonCommunicationConfig,
}

/// Which transport the daemon should set up for node communication.
///
/// Selected per dataflow: it is carried in `SpawnDataflowNodes` and exposed in
/// the dataflow descriptor as the `daemon_config` field (with `serde` default).
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum DaemonCommunicationConfig {
    /// Use a local TCP connection.
    Tcp,
    /// Use shared-memory regions.
    Shmem,
}

impl Default for DaemonCommunicationConfig {
    fn default() -> Self {
        Self::Shmem // TODO change to TCP
    }
}

+ 6
- 1
libraries/core/src/descriptor/mod.rs View File

@@ -1,4 +1,7 @@
use crate::config::{CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId};
use crate::{
config::{CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId},
daemon_messages::DaemonCommunicationConfig,
};
use eyre::{bail, Result};
use serde::{Deserialize, Serialize};
use std::{
@@ -18,6 +21,8 @@ pub struct Descriptor {
#[serde(with = "serde_yaml::with::singleton_map")]
pub communication: CommunicationConfig,
pub nodes: Vec<Node>,
#[serde(default)]
pub daemon_config: DaemonCommunicationConfig,
}
pub const SINGLE_OPERATOR_DEFAULT_ID: &str = "op";



Loading…
Cancel
Save