Browse Source

Use drop tokens and reference counting to free shared memory again after usage

tags/v0.2.0-candidate
Philipp Oppermann 3 years ago
parent
commit
a2cc06ca50
Failed to extract signature
5 changed files with 192 additions and 117 deletions
  1. +115
    -96
      apis/rust/node/src/daemon.rs
  2. +26
    -4
      binaries/daemon/src/listener.rs
  3. +29
    -6
      binaries/daemon/src/main.rs
  4. +6
    -6
      binaries/daemon/src/tcp_utils.rs
  5. +16
    -5
      libraries/core/src/daemon_messages.rs

+ 115
- 96
apis/rust/node/src/daemon.rs View File

@@ -7,7 +7,7 @@ use std::{

use dora_core::{
config::{DataId, NodeId},
daemon_messages::{ControlRequest, DataflowId, NodeEvent},
daemon_messages::{ControlRequest, DataflowId, DropEvent, NodeEvent},
};
use dora_message::Metadata;
use eyre::{bail, eyre, Context};
@@ -21,14 +21,14 @@ pub struct DaemonConnection {
impl DaemonConnection {
pub fn init(dataflow_id: DataflowId, node_id: &NodeId, daemon_port: u16) -> eyre::Result<Self> {
let daemon_addr = (Ipv4Addr::new(127, 0, 0, 1), daemon_port).into();
let control_stream = init_control_stream(daemon_addr, dataflow_id, &node_id)
let control_channel = ControlChannel::init(daemon_addr, dataflow_id, node_id)
.wrap_err("failed to init control stream")?;

let event_stream = init_event_stream(daemon_addr, dataflow_id, &node_id)
let event_stream = EventStream::init(daemon_addr, dataflow_id, node_id)
.wrap_err("failed to init event stream")?;

Ok(Self {
control_channel: ControlChannel(control_stream),
control_channel,
event_stream,
})
}
@@ -37,6 +37,35 @@ impl DaemonConnection {
pub struct ControlChannel(TcpStream);

impl ControlChannel {
fn init(
daemon_addr: SocketAddr,
dataflow_id: DataflowId,
node_id: &NodeId,
) -> eyre::Result<Self> {
let mut control_stream =
TcpStream::connect(daemon_addr).wrap_err("failed to connect to dora-daemon")?;
control_stream
.set_nodelay(true)
.wrap_err("failed to set TCP_NODELAY")?;
tcp_send(
&mut control_stream,
&ControlRequest::Register {
dataflow_id,
node_id: node_id.clone(),
},
)
.wrap_err("failed to send register request to dora-daemon")?;
match tcp_receive(&mut control_stream)
.wrap_err("failed to receive register reply from dora-daemon")?
{
dora_core::daemon_messages::ControlReply::Result(result) => result
.map_err(|e| eyre!(e))
.wrap_err("failed to register node with dora-daemon")?,
other => bail!("unexpected register reply: {other:?}"),
}
Ok(Self(control_stream))
}

pub fn report_stop(&mut self) -> eyre::Result<()> {
tcp_send(&mut self.0, &ControlRequest::Stopped)
.wrap_err("failed to send subscribe request to dora-daemon")?;
@@ -101,8 +130,84 @@ pub struct EventStream {
}

impl EventStream {
fn init(
daemon_addr: SocketAddr,
dataflow_id: DataflowId,
node_id: &NodeId,
) -> eyre::Result<Self> {
let mut event_stream =
TcpStream::connect(daemon_addr).wrap_err("failed to connect to dora-daemon")?;
event_stream
.set_nodelay(true)
.wrap_err("failed to set TCP_NODELAY")?;
tcp_send(
&mut event_stream,
&ControlRequest::Subscribe {
dataflow_id,
node_id: node_id.clone(),
},
)
.wrap_err("failed to send subscribe request to dora-daemon")?;
match tcp_receive(&mut event_stream)
.wrap_err("failed to receive subscribe reply from dora-daemon")?
{
dora_core::daemon_messages::ControlReply::Result(result) => result
.map_err(|e| eyre!(e))
.wrap_err("failed to create subscription with dora-daemon")?,
other => bail!("unexpected subscribe reply: {other:?}"),
}

let (tx, rx) = flume::bounded(1);
std::thread::spawn(move || loop {
let event: NodeEvent = match tcp_receive(&mut event_stream) {
Ok(event) => event,
Err(err) if err.kind() == ErrorKind::UnexpectedEof => break,
Err(err) => {
let err = eyre!(err).wrap_err("failed to receive incoming event");
tracing::warn!("{err:?}");
continue;
}
};
let drop_token = match &event {
NodeEvent::Input {
data: Some(data), ..
} => Some(data.drop_token.clone()),
NodeEvent::Stop
| NodeEvent::InputClosed { .. }
| NodeEvent::Input { data: None, .. } => None,
};

let (drop_tx, drop_rx) = std::sync::mpsc::channel();
match tx.send((event, drop_tx)) {
Ok(()) => {}
Err(_) => {
// receiving end of channel was closed
break;
}
}

match drop_rx.recv_timeout(Duration::from_secs(30)) {
Ok(()) => panic!("Node API should not send anything on ACK channel"),
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
tracing::warn!("timeout while waiting for input ACK");
}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {} // expected result
}

if let Some(token) = drop_token {
let message = DropEvent { token };
if let Err(err) = tcp_send(&mut event_stream, &message) {
tracing::warn!("failed to send drop token: {err}");
break;
}
}
});

Ok(EventStream { receiver: rx })
}

pub fn recv(&mut self) -> Option<Event> {
let (node_event, ack) = match self.receiver.recv() {
let (node_event, drop_sender) = match self.receiver.recv() {
Ok(d) => d,
Err(flume::RecvError::Disconnected) => return None,
};
@@ -117,7 +222,10 @@ impl EventStream {
Ok(mapped) => Event::Input {
id,
metadata,
data: mapped.map(|data| Data { data, _ack: ack }),
data: mapped.map(|data| Data {
data,
_drop: drop_sender,
}),
},
Err(err) => Event::Error(format!("{err:?}")),
}
@@ -132,66 +240,6 @@ pub struct MessageSample {
pub id: String,
}

fn init_event_stream(
daemon_addr: SocketAddr,
dataflow_id: DataflowId,
node_id: &NodeId,
) -> eyre::Result<EventStream> {
let mut event_stream =
TcpStream::connect(daemon_addr).wrap_err("failed to connect to dora-daemon")?;
event_stream
.set_nodelay(true)
.wrap_err("failed to set TCP_NODELAY")?;
tcp_send(
&mut event_stream,
&ControlRequest::Subscribe {
dataflow_id,
node_id: node_id.clone(),
},
)
.wrap_err("failed to send subscribe request to dora-daemon")?;
match tcp_receive(&mut event_stream)
.wrap_err("failed to receive subscribe reply from dora-daemon")?
{
dora_core::daemon_messages::ControlReply::Result(result) => result
.map_err(|e| eyre!(e))
.wrap_err("failed to create subscription with dora-daemon")?,
other => bail!("unexpected subscribe reply: {other:?}"),
}

let (tx, rx) = flume::bounded(1);
std::thread::spawn(move || loop {
let event: NodeEvent = match tcp_receive(&mut event_stream) {
Ok(event) => event,
Err(err) if err.kind() == ErrorKind::UnexpectedEof => break,
Err(err) => {
let err = eyre!(err).wrap_err("failed to receive incoming event");
tracing::warn!("{err:?}");
continue;
}
};

let (ack_tx, ack_rx) = std::sync::mpsc::channel();
match tx.send((event, ack_tx)) {
Ok(()) => {}
Err(_) => {
// receiving end of channel was closed
break;
}
}

match ack_rx.recv_timeout(Duration::from_secs(30)) {
Ok(()) => panic!("Node API should not send anything on ACK channel"),
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
tracing::warn!("timeout while waiting for input ACK");
}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {} // expected result
}
});

Ok(EventStream { receiver: rx })
}

#[derive(Debug)]
#[non_exhaustive]
pub enum Event<'a> {
@@ -209,7 +257,7 @@ pub enum Event<'a> {

pub struct Data<'a> {
data: MappedInputData<'a>,
_ack: std::sync::mpsc::Sender<()>,
_drop: std::sync::mpsc::Sender<()>,
}

impl std::ops::Deref for Data<'_> {
@@ -252,35 +300,6 @@ impl std::ops::Deref for MappedInputData<'_> {
}
}

fn init_control_stream(
daemon_addr: SocketAddr,
dataflow_id: DataflowId,
node_id: &NodeId,
) -> eyre::Result<TcpStream> {
let mut control_stream =
TcpStream::connect(daemon_addr).wrap_err("failed to connect to dora-daemon")?;
control_stream
.set_nodelay(true)
.wrap_err("failed to set TCP_NODELAY")?;
tcp_send(
&mut control_stream,
&ControlRequest::Register {
dataflow_id,
node_id: node_id.clone(),
},
)
.wrap_err("failed to send register request to dora-daemon")?;
match tcp_receive(&mut control_stream)
.wrap_err("failed to receive register reply from dora-daemon")?
{
dora_core::daemon_messages::ControlReply::Result(result) => result
.map_err(|e| eyre!(e))
.wrap_err("failed to register node with dora-daemon")?,
other => bail!("unexpected register reply: {other:?}"),
}
Ok(control_stream)
}

fn tcp_send<T: serde::Serialize>(connection: &mut TcpStream, request: &T) -> std::io::Result<()> {
let serialized = serde_json::to_vec(request)?;



+ 26
- 4
binaries/daemon/src/listener.rs View File

@@ -2,7 +2,7 @@ use crate::{
tcp_utils::{tcp_receive, tcp_send},
DaemonNodeEvent, Event,
};
use dora_core::daemon_messages;
use dora_core::daemon_messages::{self, DropEvent};
use eyre::{eyre, Context};
use std::{io::ErrorKind, net::Ipv4Addr};
use tokio::{
@@ -136,16 +136,38 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sende

// enter subscribe loop after receiving a subscribe message
if let Some(events) = enter_subscribe_loop {
subscribe_loop(connection, events).await;
subscribe_loop(connection, events, events_tx).await;
break; // the subscribe loop only exits when the connection was closed
}
}
}

async fn subscribe_loop(
mut connection: TcpStream,
connection: TcpStream,
events: flume::Receiver<daemon_messages::NodeEvent>,
events_tx: mpsc::Sender<Event>,
) {
let (mut rx, mut tx) = connection.into_split();

tokio::spawn(async move {
loop {
let Ok(raw) = tcp_receive(&mut rx).await else {
break;
};

let event: DropEvent = match serde_json::from_slice(&raw) {
Ok(e) => e,
Err(err) => {
tracing::error!("Failed to parse incoming message: {err}");
continue;
}
};
if events_tx.send(Event::Drop(event)).await.is_err() {
break;
}
}
});

while let Some(event) = events.stream().next().await {
let message = match serde_json::to_vec(&event) {
Ok(m) => m,
@@ -155,7 +177,7 @@ async fn subscribe_loop(
continue;
}
};
match tcp_send(&mut connection, &message).await {
match tcp_send(&mut tx, &message).await {
Ok(()) => {}
Err(err)
if err.kind() == ErrorKind::UnexpectedEof


+ 29
- 6
binaries/daemon/src/main.rs View File

@@ -1,6 +1,9 @@
use dora_core::{
config::{DataId, InputMapping, NodeId},
daemon_messages::{self, ControlReply, DaemonCoordinatorEvent, DataflowId, SpawnDataflowNodes},
daemon_messages::{
self, ControlReply, DaemonCoordinatorEvent, DataflowId, DropEvent, DropToken,
SpawnDataflowNodes,
},
topics::DORA_COORDINATOR_PORT_DEFAULT,
};
use dora_message::uhlc::HLC;
@@ -11,6 +14,7 @@ use shared_memory::{Shmem, ShmemConf};
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
net::{Ipv4Addr, SocketAddr},
rc::Rc,
time::Duration,
};
use tokio::{
@@ -47,7 +51,7 @@ async fn run() -> eyre::Result<()> {
struct Daemon {
port: u16,
uninit_shared_memory: HashMap<String, (DataId, dora_message::Metadata<'static>, Shmem)>,
sent_out_shared_memory: HashMap<String, Shmem>,
sent_out_shared_memory: HashMap<DropToken, Rc<Shmem>>,

running: HashMap<DataflowId, RunningDataflow>,

@@ -117,6 +121,18 @@ impl Daemon {
.await?
}
Event::Dora(event) => self.handle_dora_event(event).await?,
Event::Drop(DropEvent { token }) => {
match self.sent_out_shared_memory.remove(&token) {
Some(rc) => {
if let Ok(_shmem) = Rc::try_unwrap(rc) {
tracing::trace!(
"freeing shared memory after receiving last drop token"
)
}
}
None => tracing::warn!("received unknown drop token {token:?}"),
}
}
}
}

@@ -243,6 +259,8 @@ impl Daemon {
.remove(&id)
.ok_or_else(|| eyre!("invalid shared memory id"))?;

let memory = Rc::new(memory);

let dataflow = self
.running
.get_mut(&dataflow_id)
@@ -259,17 +277,24 @@ impl Daemon {
let mut closed = Vec::new();
for (receiver_id, input_id) in local_receivers {
if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
let drop_token = DropToken::generate();
if channel
.send_async(daemon_messages::NodeEvent::Input {
id: input_id.clone(),
metadata: metadata.clone(),
data: Some(unsafe { daemon_messages::InputData::new(id.clone()) }),
data: Some(daemon_messages::InputData {
shared_memory_id: id.clone(),
drop_token: drop_token.clone(),
}),
})
.await
.is_err()
{
closed.push(receiver_id);
}
// keep shared memory ptr in order to free it once all subscribers are done
self.sent_out_shared_memory
.insert(drop_token, memory.clone());
}
}
for id in closed {
@@ -279,9 +304,6 @@ impl Daemon {
// TODO send `data` via network to all remove receivers
let data = std::ptr::slice_from_raw_parts(memory.as_ptr(), memory.len());

// keep shared memory ptr in order to free it once all subscribers are done
self.sent_out_shared_memory.insert(id, memory);

let _ = reply_sender.send(ControlReply::Result(Ok(())));
}
DaemonNodeEvent::Stopped => {
@@ -417,6 +439,7 @@ pub enum Event {
},
Coordinator(DaemonCoordinatorEvent),
Dora(DoraEvent),
Drop(DropEvent),
}

#[derive(Debug)]


+ 6
- 6
binaries/daemon/src/tcp_utils.rs View File

@@ -1,16 +1,16 @@
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

pub async fn tcp_send(connection: &mut TcpStream, message: &[u8]) -> std::io::Result<()> {
pub async fn tcp_send(
connection: &mut (impl AsyncWrite + Unpin),
message: &[u8],
) -> std::io::Result<()> {
let len_raw = (message.len() as u64).to_le_bytes();
connection.write_all(&len_raw).await?;
connection.write_all(message).await?;
Ok(())
}

pub async fn tcp_receive(connection: &mut TcpStream) -> std::io::Result<Vec<u8>> {
pub async fn tcp_receive(connection: &mut (impl AsyncRead + Unpin)) -> std::io::Result<Vec<u8>> {
let reply_len = {
let mut raw = [0; 8];
connection.read_exact(&mut raw).await?;


+ 16
- 5
libraries/core/src/daemon_messages.rs View File

@@ -58,16 +58,27 @@ pub enum NodeEvent {
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct InputData {
pub shared_memory_id: SharedMemoryId,
pub struct DropEvent {
pub token: DropToken,
}

impl InputData {
pub unsafe fn new(shared_memory_id: SharedMemoryId) -> Self {
Self { shared_memory_id }
#[derive(
Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
pub struct DropToken(Uuid);

impl DropToken {
pub fn generate() -> Self {
Self(Uuid::new_v4())
}
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct InputData {
pub shared_memory_id: SharedMemoryId,
pub drop_token: DropToken,
}

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum DaemonCoordinatorEvent {
Spawn(SpawnDataflowNodes),


Loading…
Cancel
Save