Browse Source

Report when shared memory region is mapped to allow faster cleanup

The shared memory region can be safely removed by the sender once it has been mapped in the receiver's address space. The OS will just delete the file handle associated with the shared memory region, but keep the data alive until it has been unmapped from all address spaces.

By notifying the sender that a message has been mapped into the receiver's address space, we enable faster cleanup on exit. The sender can safely close all of its shared memory regions once all of its sent messages are at least mapped. So it does not need to wait until all messages have been _dropped_ anymore, which can take considerably longer, especially if the Python GC is involved.

This commit modifies the message format, so we need to bump the version of the `dora-message` crate to `0.5.0`.
report-when-data-is-mapped
Philipp Oppermann haixuantao 1 year ago
parent
commit
b3fd01848c
10 changed files with 211 additions and 125 deletions
  1. +3
    -50
      apis/rust/node/src/event_stream/mod.rs
  2. +2
    -8
      apis/rust/node/src/event_stream/scheduler.rs
  3. +61
    -24
      apis/rust/node/src/event_stream/thread.rs
  4. +21
    -14
      apis/rust/node/src/node/drop_stream.rs
  5. +34
    -7
      apis/rust/node/src/node/mod.rs
  6. +66
    -16
      binaries/daemon/src/lib.rs
  7. +4
    -4
      binaries/daemon/src/node_communication/mod.rs
  8. +16
    -0
      libraries/message/src/common.rs
  9. +1
    -0
      libraries/message/src/daemon_to_node.rs
  10. +3
    -2
      libraries/message/src/node_to_daemon.rs

+ 3
- 50
apis/rust/node/src/event_stream/mod.rs View File

@@ -6,7 +6,7 @@ use std::{
}; };


use dora_message::{ use dora_message::{
daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent},
daemon_to_node::{DaemonCommunication, DaemonReply},
id::DataId, id::DataId,
node_to_daemon::{DaemonRequest, Timestamped}, node_to_daemon::{DaemonRequest, Timestamped},
DataflowId, DataflowId,
@@ -19,10 +19,7 @@ use futures::{
use futures_timer::Delay; use futures_timer::Delay;
use scheduler::{Scheduler, NON_INPUT_EVENT}; use scheduler::{Scheduler, NON_INPUT_EVENT};


use self::{
event::SharedMemoryData,
thread::{EventItem, EventStreamThreadHandle},
};
use self::thread::{EventItem, EventStreamThreadHandle};
use crate::daemon_connection::DaemonChannel; use crate::daemon_connection::DaemonChannel;
use dora_core::{ use dora_core::{
config::{Input, NodeId}, config::{Input, NodeId},
@@ -199,51 +196,7 @@ impl EventStream {


fn convert_event_item(item: EventItem) -> Event { fn convert_event_item(item: EventItem) -> Event {
match item { match item {
EventItem::NodeEvent { event, ack_channel } => match event {
NodeEvent::Stop => Event::Stop,
NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
NodeEvent::InputClosed { id } => Event::InputClosed { id },
NodeEvent::Input { id, metadata, data } => {
let data = match data {
None => Ok(None),
Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))),
Some(DataMessage::SharedMemory {
shared_memory_id,
len,
drop_token: _, // handled in `event_stream_loop`
}) => unsafe {
MappedInputData::map(&shared_memory_id, len).map(|data| {
Some(RawData::SharedMemory(SharedMemoryData {
data,
_drop: ack_channel,
}))
})
},
};
let data = data.and_then(|data| {
let raw_data = data.unwrap_or(RawData::Empty);
raw_data
.into_arrow_array(&metadata.type_info)
.map(arrow::array::make_array)
});
match data {
Ok(data) => Event::Input {
id,
metadata,
data: data.into(),
},
Err(err) => Event::Error(format!("{err:?}")),
}
}
NodeEvent::AllInputsClosed => {
let err = eyre!(
"received `AllInputsClosed` event, which should be handled by background task"
);
tracing::error!("{err:?}");
Event::Error(err.wrap_err("internal error").to_string())
}
},

EventItem::NodeEvent { event } => event,
EventItem::FatalError(err) => { EventItem::FatalError(err) => {
Event::Error(format!("fatal event stream error: {err:?}")) Event::Error(format!("fatal event stream error: {err:?}"))
} }


+ 2
- 8
apis/rust/node/src/event_stream/scheduler.rs View File

@@ -2,7 +2,7 @@ use std::collections::{HashMap, VecDeque};


use dora_message::{daemon_to_node::NodeEvent, id::DataId}; use dora_message::{daemon_to_node::NodeEvent, id::DataId};


use super::thread::EventItem;
use super::{thread::EventItem, Event};
pub const NON_INPUT_EVENT: &str = "dora/non_input_event"; pub const NON_INPUT_EVENT: &str = "dora/non_input_event";


// This scheduler will make sure that there is fairness between // This scheduler will make sure that there is fairness between
@@ -40,13 +40,7 @@ impl Scheduler {
pub fn add_event(&mut self, event: EventItem) { pub fn add_event(&mut self, event: EventItem) {
let event_id = match &event { let event_id = match &event {
EventItem::NodeEvent { EventItem::NodeEvent {
event:
NodeEvent::Input {
id,
metadata: _,
data: _,
},
ack_channel: _,
event: Event::Input { id, .. },
} => id, } => id,
_ => &DataId::from(NON_INPUT_EVENT.to_string()), _ => &DataId::from(NON_INPUT_EVENT.to_string()),
}; };


+ 61
- 24
apis/rust/node/src/event_stream/thread.rs View File

@@ -3,6 +3,7 @@ use dora_core::{
uhlc::{self, Timestamp}, uhlc::{self, Timestamp},
}; };
use dora_message::{ use dora_message::{
common::{DataMessage, DropTokenState, DropTokenStatus},
daemon_to_node::{DaemonReply, NodeEvent}, daemon_to_node::{DaemonReply, NodeEvent},
node_to_daemon::{DaemonRequest, DropToken, Timestamped}, node_to_daemon::{DaemonRequest, DropToken, Timestamped},
}; };
@@ -15,6 +16,8 @@ use std::{


use crate::daemon_connection::DaemonChannel; use crate::daemon_connection::DaemonChannel;


use super::{event::SharedMemoryData, Event, MappedInputData, RawData};

pub fn init( pub fn init(
node_id: NodeId, node_id: NodeId,
tx: flume::Sender<EventItem>, tx: flume::Sender<EventItem>,
@@ -28,10 +31,7 @@ pub fn init(


#[derive(Debug)] #[derive(Debug)]
pub enum EventItem { pub enum EventItem {
NodeEvent {
event: NodeEvent,
ack_channel: flume::Sender<()>,
},
NodeEvent { event: super::Event },
FatalError(eyre::Report), FatalError(eyre::Report),
TimeoutError(eyre::Report), TimeoutError(eyre::Report),
} }
@@ -130,25 +130,60 @@ fn event_stream_loop(
if let Err(err) = clock.update_with_timestamp(&timestamp) { if let Err(err) = clock.update_with_timestamp(&timestamp) {
tracing::warn!("failed to update HLC: {err}"); tracing::warn!("failed to update HLC: {err}");
} }
let drop_token = match &inner {
NodeEvent::Input {
data: Some(data), ..
} => data.drop_token(),

let event = match inner {
NodeEvent::Stop => Event::Stop,
NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
NodeEvent::InputClosed { id } => Event::InputClosed { id },
NodeEvent::Input { id, metadata, data } => {
let data = match data {
None => Ok(None),
Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))),
Some(DataMessage::SharedMemory {
shared_memory_id,
len,
drop_token,
}) => unsafe {
let (drop_tx, drop_rx) = flume::bounded(0);
let data = MappedInputData::map(&shared_memory_id, len).map(|data| {
Some(RawData::SharedMemory(SharedMemoryData {
data,
_drop: drop_tx,
}))
});
drop_tokens.push(DropTokenStatus {
token: drop_token,
state: DropTokenState::Mapped,
});
pending_drop_tokens.push((drop_token, drop_rx, Instant::now(), 1));
data
},
};
let data = data.and_then(|data| {
let raw_data = data.unwrap_or(RawData::Empty);
raw_data
.into_arrow_array(&metadata.type_info)
.map(arrow::array::make_array)
});
match data {
Ok(data) => Event::Input {
id,
metadata,
data: data.into(),
},
Err(err) => Event::Error(format!("{err:?}")),
}
}
NodeEvent::AllInputsClosed => { NodeEvent::AllInputsClosed => {
// close the event stream // close the event stream
tx = None; tx = None;
// skip this internal event // skip this internal event
continue; continue;
} }
_ => None,
}; };


if let Some(tx) = tx.as_ref() { if let Some(tx) = tx.as_ref() {
let (drop_tx, drop_rx) = flume::bounded(0);
match tx.send(EventItem::NodeEvent {
event: inner,
ack_channel: drop_tx,
}) {
match tx.send(EventItem::NodeEvent { event }) {
Ok(()) => {} Ok(()) => {}
Err(send_error) => { Err(send_error) => {
let event = send_error.into_inner(); let event = send_error.into_inner();
@@ -159,12 +194,8 @@ fn event_stream_loop(
break 'outer Ok(()); break 'outer Ok(());
} }
} }

if let Some(token) = drop_token {
pending_drop_tokens.push((token, drop_rx, Instant::now(), 1));
}
} else { } else {
tracing::warn!("dropping event because event `tx` was already closed: `{inner:?}`");
tracing::warn!("dropping event because event `tx` was already closed: `{event:?}`");
} }
} }
}; };
@@ -196,7 +227,7 @@ fn event_stream_loop(


fn handle_pending_drop_tokens( fn handle_pending_drop_tokens(
pending_drop_tokens: &mut Vec<(DropToken, flume::Receiver<()>, Instant, u64)>, pending_drop_tokens: &mut Vec<(DropToken, flume::Receiver<()>, Instant, u64)>,
drop_tokens: &mut Vec<DropToken>,
drop_tokens: &mut Vec<DropTokenStatus>,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
let mut still_pending = Vec::new(); let mut still_pending = Vec::new();
for (token, rx, since, warn) in pending_drop_tokens.drain(..) { for (token, rx, since, warn) in pending_drop_tokens.drain(..) {
@@ -204,7 +235,10 @@ fn handle_pending_drop_tokens(
Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")), Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")),
Err(flume::TryRecvError::Disconnected) => { Err(flume::TryRecvError::Disconnected) => {
// the event was dropped -> add the drop token to the list // the event was dropped -> add the drop token to the list
drop_tokens.push(token);
drop_tokens.push(DropTokenStatus {
token,
state: DropTokenState::Dropped,
});
} }
Err(flume::TryRecvError::Empty) => { Err(flume::TryRecvError::Empty) => {
let duration = Duration::from_secs(30 * warn); let duration = Duration::from_secs(30 * warn);
@@ -221,7 +255,7 @@ fn handle_pending_drop_tokens(


fn report_remaining_drop_tokens( fn report_remaining_drop_tokens(
mut channel: DaemonChannel, mut channel: DaemonChannel,
mut drop_tokens: Vec<DropToken>,
mut drop_tokens: Vec<DropTokenStatus>,
mut pending_drop_tokens: Vec<(DropToken, flume::Receiver<()>, Instant, u64)>, mut pending_drop_tokens: Vec<(DropToken, flume::Receiver<()>, Instant, u64)>,
timestamp: Timestamp, timestamp: Timestamp,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
@@ -234,7 +268,10 @@ fn report_remaining_drop_tokens(
Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")), Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")),
Err(flume::RecvTimeoutError::Disconnected) => { Err(flume::RecvTimeoutError::Disconnected) => {
// the event was dropped -> add the drop token to the list // the event was dropped -> add the drop token to the list
drop_tokens.push(token);
drop_tokens.push(DropTokenStatus {
token,
state: DropTokenState::Dropped,
});
} }
Err(flume::RecvTimeoutError::Timeout) => { Err(flume::RecvTimeoutError::Timeout) => {
let duration = Duration::from_secs(1); let duration = Duration::from_secs(1);
@@ -259,7 +296,7 @@ fn report_remaining_drop_tokens(
} }


fn report_drop_tokens( fn report_drop_tokens(
drop_tokens: &mut Vec<DropToken>,
drop_tokens: &mut Vec<DropTokenStatus>,
channel: &mut DaemonChannel, channel: &mut DaemonChannel,
timestamp: Timestamp, timestamp: Timestamp,
) -> Result<(), eyre::ErrReport> { ) -> Result<(), eyre::ErrReport> {


+ 21
- 14
apis/rust/node/src/node/drop_stream.rs View File

@@ -3,15 +3,16 @@ use std::{sync::Arc, time::Duration};
use crate::daemon_connection::DaemonChannel; use crate::daemon_connection::DaemonChannel;
use dora_core::{config::NodeId, uhlc}; use dora_core::{config::NodeId, uhlc};
use dora_message::{ use dora_message::{
common::{DropTokenState, DropTokenStatus},
daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent}, daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent},
node_to_daemon::{DaemonRequest, DropToken, Timestamped},
node_to_daemon::{DaemonRequest, Timestamped},
DataflowId, DataflowId,
}; };
use eyre::{eyre, Context}; use eyre::{eyre, Context};
use flume::RecvTimeoutError; use flume::RecvTimeoutError;


pub struct DropStream { pub struct DropStream {
receiver: flume::Receiver<DropToken>,
receiver: flume::Receiver<DropTokenStatus>,
_thread_handle: DropStreamThreadHandle, _thread_handle: DropStreamThreadHandle,
} }


@@ -82,7 +83,7 @@ impl DropStream {
} }


impl std::ops::Deref for DropStream { impl std::ops::Deref for DropStream {
type Target = flume::Receiver<DropToken>;
type Target = flume::Receiver<DropTokenStatus>;


fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {
&self.receiver &self.receiver
@@ -92,7 +93,7 @@ impl std::ops::Deref for DropStream {
#[tracing::instrument(skip(tx, channel, clock))] #[tracing::instrument(skip(tx, channel, clock))]
fn drop_stream_loop( fn drop_stream_loop(
node_id: NodeId, node_id: NodeId,
tx: flume::Sender<DropToken>,
tx: flume::Sender<DropTokenStatus>,
mut channel: DaemonChannel, mut channel: DaemonChannel,
clock: Arc<uhlc::HLC>, clock: Arc<uhlc::HLC>,
) { ) {
@@ -125,16 +126,22 @@ fn drop_stream_loop(
if let Err(err) = clock.update_with_timestamp(&timestamp) { if let Err(err) = clock.update_with_timestamp(&timestamp) {
tracing::warn!("failed to update HLC: {err}"); tracing::warn!("failed to update HLC: {err}");
} }
match inner {
NodeDropEvent::OutputDropped { drop_token } => {
if tx.send(drop_token).is_err() {
tracing::warn!(
"drop channel was closed already, could not forward \
drop token`{drop_token:?}`"
);
break 'outer;
}
}
let event = match inner {
NodeDropEvent::OutputMapped { drop_token } => DropTokenStatus {
token: drop_token,
state: DropTokenState::Mapped,
},
NodeDropEvent::OutputDropped { drop_token } => DropTokenStatus {
token: drop_token,
state: DropTokenState::Dropped,
},
};
if tx.send(event).is_err() {
tracing::warn!(
"drop channel was closed already, could not forward \
drop token event `{event:?}`"
);
break 'outer;
} }
} }
} }


+ 34
- 7
apis/rust/node/src/node/mod.rs View File

@@ -16,6 +16,7 @@ use dora_core::{
}; };


use dora_message::{ use dora_message::{
common::DropTokenStatus,
daemon_to_node::{DaemonReply, NodeConfig}, daemon_to_node::{DaemonReply, NodeConfig},
metadata::{ArrowTypeInfo, Metadata, MetadataParameters}, metadata::{ArrowTypeInfo, Metadata, MetadataParameters},
node_to_daemon::{DaemonRequest, DataMessage, DropToken, Timestamped}, node_to_daemon::{DaemonRequest, DataMessage, DropToken, Timestamped},
@@ -48,6 +49,7 @@ pub struct DoraNode {
clock: Arc<uhlc::HLC>, clock: Arc<uhlc::HLC>,


sent_out_shared_memory: HashMap<DropToken, ShmemHandle>, sent_out_shared_memory: HashMap<DropToken, ShmemHandle>,
shared_memory_in_use: HashMap<DropToken, ShmemHandle>,
drop_stream: DropStream, drop_stream: DropStream,
cache: VecDeque<ShmemHandle>, cache: VecDeque<ShmemHandle>,


@@ -155,6 +157,7 @@ impl DoraNode {
control_channel, control_channel,
clock, clock,
sent_out_shared_memory: HashMap::new(), sent_out_shared_memory: HashMap::new(),
shared_memory_in_use: HashMap::new(),
drop_stream, drop_stream,
cache: VecDeque::new(), cache: VecDeque::new(),
dataflow_descriptor, dataflow_descriptor,
@@ -380,10 +383,7 @@ impl DoraNode {
fn handle_finished_drop_tokens(&mut self) -> eyre::Result<()> { fn handle_finished_drop_tokens(&mut self) -> eyre::Result<()> {
loop { loop {
match self.drop_stream.try_recv() { match self.drop_stream.try_recv() {
Ok(token) => match self.sent_out_shared_memory.remove(&token) {
Some(region) => self.add_to_cache(region),
None => tracing::warn!("received unknown finished drop token `{token:?}`"),
},
Ok(event) => self.handle_drop_token_event(event),
Err(flume::TryRecvError::Empty) => break, Err(flume::TryRecvError::Empty) => break,
Err(flume::TryRecvError::Disconnected) => { Err(flume::TryRecvError::Disconnected) => {
bail!("event stream was closed before sending all expected drop tokens") bail!("event stream was closed before sending all expected drop tokens")
@@ -393,6 +393,35 @@ impl DoraNode {
Ok(()) Ok(())
} }


fn handle_drop_token_event(&mut self, event: DropTokenStatus) {
let DropTokenStatus { token, state } = event;
match state {
dora_message::common::DropTokenState::Mapped => {
let region = self.sent_out_shared_memory.remove(&token);
match region {
Some(region) => {
self.shared_memory_in_use.insert(token, region);
}
None => {
tracing::warn!("received unknown mapped drop token `{token:?}`")
}
};
}
dora_message::common::DropTokenState::Dropped => {
let region = self
.sent_out_shared_memory
.remove(&token)
.or_else(|| self.shared_memory_in_use.remove(&token));
match region {
Some(region) => self.add_to_cache(region),
None => {
tracing::warn!("received unknown finished drop token `{token:?}`")
}
}
}
};
}

fn add_to_cache(&mut self, memory: ShmemHandle) { fn add_to_cache(&mut self, memory: ShmemHandle) {
const MAX_CACHE_SIZE: usize = 20; const MAX_CACHE_SIZE: usize = 20;


@@ -435,9 +464,7 @@ impl Drop for DoraNode {
} }


match self.drop_stream.recv_timeout(Duration::from_secs(2)) { match self.drop_stream.recv_timeout(Duration::from_secs(2)) {
Ok(token) => {
self.sent_out_shared_memory.remove(&token);
}
Ok(event) => self.handle_drop_token_event(event),
Err(flume::RecvTimeoutError::Disconnected) => { Err(flume::RecvTimeoutError::Disconnected) => {
tracing::warn!( tracing::warn!(
"finished_drop_tokens channel closed while still waiting for drop tokens; \ "finished_drop_tokens channel closed while still waiting for drop tokens; \


+ 66
- 16
binaries/daemon/src/lib.rs View File

@@ -12,7 +12,8 @@ use dora_core::{
}; };
use dora_message::{ use dora_message::{
common::{ common::{
DaemonId, DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus,
DaemonId, DataMessage, DropToken, DropTokenStatus, LogLevel, NodeError, NodeErrorCause,
NodeExitStatus,
}, },
coordinator_to_cli::DataflowResult, coordinator_to_cli::DataflowResult,
coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes}, coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes},
@@ -1014,7 +1015,7 @@ impl Daemon {
.send_out(dataflow_id, node_id, output_id, metadata, data) .send_out(dataflow_id, node_id, output_id, metadata, data)
.await .await
.context("failed to send out")?, .context("failed to send out")?,
DaemonNodeEvent::ReportDrop { tokens } => {
DaemonNodeEvent::ReportTokenState { token_events } => {
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| { let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
format!( format!(
"failed to get handle drop tokens: \ "failed to get handle drop tokens: \
@@ -1024,17 +1025,27 @@ impl Daemon {


match dataflow { match dataflow {
Ok(dataflow) => { Ok(dataflow) => {
for token in tokens {
for event in token_events {
let DropTokenStatus { token, state } = event;
match dataflow.pending_drop_tokens.get_mut(&token) { match dataflow.pending_drop_tokens.get_mut(&token) {
Some(info) => {
if info.pending_nodes.remove(&node_id) {
dataflow.check_drop_token(token, &self.clock).await?;
} else {
tracing::warn!(
"node `{node_id}` is not pending for drop token `{token:?}`"
);
Some(info) => match state {
dora_message::common::DropTokenState::Mapped => {
let changed = info.pending_nodes.remove(&node_id);
info.mapped_in_nodes.insert(node_id.clone());
if changed {
dataflow
.check_drop_token_mapped(token, &self.clock)
.await?;
}
} }
}
dora_message::common::DropTokenState::Dropped => {
let mut changed = info.pending_nodes.remove(&node_id);
changed |= info.mapped_in_nodes.remove(&node_id);
if changed {
dataflow.check_drop_token(token, &self.clock).await?;
}
}
},
None => tracing::warn!("unknown drop token `{token:?}`"), None => tracing::warn!("unknown drop token `{token:?}`"),
} }
} }
@@ -1620,6 +1631,7 @@ async fn send_output_to_local_receivers(
.or_insert_with(|| DropTokenInformation { .or_insert_with(|| DropTokenInformation {
owner: node_id.clone(), owner: node_id.clone(),
pending_nodes: Default::default(), pending_nodes: Default::default(),
mapped_in_nodes: Default::default(),
}) })
.pending_nodes .pending_nodes
.insert(receiver_id.clone()); .insert(receiver_id.clone());
@@ -1658,6 +1670,7 @@ async fn send_output_to_local_receivers(
.or_insert_with(|| DropTokenInformation { .or_insert_with(|| DropTokenInformation {
owner: node_id.clone(), owner: node_id.clone(),
pending_nodes: Default::default(), pending_nodes: Default::default(),
mapped_in_nodes: Default::default(),
}); });
// check if all local subscribers are finished with the token // check if all local subscribers are finished with the token
dataflow.check_drop_token(token, clock).await?; dataflow.check_drop_token(token, clock).await?;
@@ -1927,7 +1940,7 @@ impl RunningDataflow {
async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> { async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> {
match self.pending_drop_tokens.entry(token) { match self.pending_drop_tokens.entry(token) {
std::collections::hash_map::Entry::Occupied(entry) => { std::collections::hash_map::Entry::Occupied(entry) => {
if entry.get().pending_nodes.is_empty() {
if entry.get().pending_nodes.is_empty() && entry.get().mapped_in_nodes.is_empty() {
let (drop_token, info) = entry.remove_entry(); let (drop_token, info) = entry.remove_entry();
let result = match self.drop_channels.get_mut(&info.owner) { let result = match self.drop_channels.get_mut(&info.owner) {
Some(channel) => send_with_timestamp( Some(channel) => send_with_timestamp(
@@ -1962,6 +1975,38 @@ impl RunningDataflow {
let OutputId(node_id, output_id) = output_id; let OutputId(node_id, output_id) = output_id;
format!("dora/{network_id}/{dataflow_id}/output/{node_id}/{output_id}") format!("dora/{network_id}/{dataflow_id}/output/{node_id}/{output_id}")
} }

async fn check_drop_token_mapped(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> {
match self.pending_drop_tokens.entry(token) {
std::collections::hash_map::Entry::Occupied(entry) => {
if entry.get().pending_nodes.is_empty() && !entry.get().mapped_in_nodes.is_empty() {
let info = entry.get();
let result = match self.drop_channels.get_mut(&info.owner) {
Some(channel) => send_with_timestamp(
channel,
NodeDropEvent::OutputMapped { drop_token: token },
clock,
)
.wrap_err("send failed"),
None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)),
};
if let Err(err) = result.wrap_err_with(|| {
format!(
"failed to report drop token mapped `{token:?}` to owner `{}`",
&info.owner
)
}) {
tracing::warn!("{err:?}");
}
}
}
std::collections::hash_map::Entry::Vacant(_) => {
tracing::warn!("check_drop_token_mapped called with already closed token")
}
}

Ok(())
}
} }


fn empty_type_info() -> ArrowTypeInfo { fn empty_type_info() -> ArrowTypeInfo {
@@ -1983,9 +2028,14 @@ type InputId = (NodeId, DataId);
struct DropTokenInformation { struct DropTokenInformation {
/// The node that created the associated drop token. /// The node that created the associated drop token.
owner: NodeId, owner: NodeId,
/// Contains the set of pending nodes that still have access to the input
/// associated with a drop token.
/// Contains the set of nodes that have not mapped the input associated
/// with a drop token yet. The shared memory region needs to be kept
/// alive until this list is empty.
pending_nodes: BTreeSet<NodeId>, pending_nodes: BTreeSet<NodeId>,
/// Contains the set of nodes that still have the input data associated
/// with a drop token mapped in their address space. The shared memory
/// region must not be overwritten until this list is empty.
mapped_in_nodes: BTreeSet<NodeId>,
} }


#[derive(Debug)] #[derive(Debug)]
@@ -2033,8 +2083,8 @@ pub enum DaemonNodeEvent {
metadata: metadata::Metadata, metadata: metadata::Metadata,
data: Option<DataMessage>, data: Option<DataMessage>,
}, },
ReportDrop {
tokens: Vec<DropToken>,
ReportTokenState {
token_events: Vec<DropTokenStatus>,
}, },
EventStreamDropped { EventStreamDropped {
reply_sender: oneshot::Sender<DaemonReply>, reply_sender: oneshot::Sender<DaemonReply>,


+ 4
- 4
binaries/daemon/src/node_communication/mod.rs View File

@@ -5,7 +5,7 @@ use dora_core::{
uhlc, uhlc,
}; };
use dora_message::{ use dora_message::{
common::{DropToken, Timestamped},
common::{DropTokenState, DropTokenStatus, Timestamped},
daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent, NodeEvent}, daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent, NodeEvent},
node_to_daemon::DaemonRequest, node_to_daemon::DaemonRequest,
DataflowId, DataflowId,
@@ -461,13 +461,13 @@ impl Listener {
Ok(()) Ok(())
} }


async fn report_drop_tokens(&mut self, drop_tokens: Vec<DropToken>) -> eyre::Result<()> {
async fn report_drop_tokens(&mut self, drop_tokens: Vec<DropTokenStatus>) -> eyre::Result<()> {
if !drop_tokens.is_empty() { if !drop_tokens.is_empty() {
let event = Event::Node { let event = Event::Node {
dataflow_id: self.dataflow_id, dataflow_id: self.dataflow_id,
node_id: self.node_id.clone(), node_id: self.node_id.clone(),
event: DaemonNodeEvent::ReportDrop {
tokens: drop_tokens,
event: DaemonNodeEvent::ReportTokenState {
token_events: drop_tokens,
}, },
}; };
let event = Timestamped { let event = Timestamped {


+ 16
- 0
libraries/message/src/common.rs View File

@@ -189,6 +189,22 @@ impl fmt::Debug for DataMessage {
} }
} }


#[derive(
Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
pub struct DropTokenStatus {
pub token: DropToken,
pub state: DropTokenState,
}

#[derive(
Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
pub enum DropTokenState {
Mapped,
Dropped,
}

#[derive( #[derive(
Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)] )]


+ 1
- 0
libraries/message/src/daemon_to_node.rs View File

@@ -74,5 +74,6 @@ pub enum NodeEvent {


#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum NodeDropEvent { pub enum NodeDropEvent {
OutputMapped { drop_token: DropToken },
OutputDropped { drop_token: DropToken }, OutputDropped { drop_token: DropToken },
} }

+ 3
- 2
libraries/message/src/node_to_daemon.rs View File

@@ -2,6 +2,7 @@ pub use crate::common::{
DataMessage, DropToken, LogLevel, LogMessage, SharedMemoryId, Timestamped, DataMessage, DropToken, LogLevel, LogMessage, SharedMemoryId, Timestamped,
}; };
use crate::{ use crate::{
common::DropTokenStatus,
current_crate_version, current_crate_version,
id::{DataId, NodeId}, id::{DataId, NodeId},
metadata::Metadata, metadata::Metadata,
@@ -22,10 +23,10 @@ pub enum DaemonRequest {
/// required drop tokens. /// required drop tokens.
OutputsDone, OutputsDone,
NextEvent { NextEvent {
drop_tokens: Vec<DropToken>,
drop_tokens: Vec<DropTokenStatus>,
}, },
ReportDropTokens { ReportDropTokens {
drop_tokens: Vec<DropToken>,
drop_tokens: Vec<DropTokenStatus>,
}, },
SubscribeDrop, SubscribeDrop,
NextFinishedDropTokens, NextFinishedDropTokens,


Loading…
Cancel
Save