Compare commits

...

Author SHA1 Message Date
  Philipp Oppermann b3fd01848c Report when shared memory region is mapped to allow faster cleanup 1 year ago
10 changed files with 211 additions and 125 deletions
Split View
  1. +3
    -50
      apis/rust/node/src/event_stream/mod.rs
  2. +2
    -8
      apis/rust/node/src/event_stream/scheduler.rs
  3. +61
    -24
      apis/rust/node/src/event_stream/thread.rs
  4. +21
    -14
      apis/rust/node/src/node/drop_stream.rs
  5. +34
    -7
      apis/rust/node/src/node/mod.rs
  6. +66
    -16
      binaries/daemon/src/lib.rs
  7. +4
    -4
      binaries/daemon/src/node_communication/mod.rs
  8. +16
    -0
      libraries/message/src/common.rs
  9. +1
    -0
      libraries/message/src/daemon_to_node.rs
  10. +3
    -2
      libraries/message/src/node_to_daemon.rs

+ 3
- 50
apis/rust/node/src/event_stream/mod.rs View File

@@ -6,7 +6,7 @@ use std::{
};

use dora_message::{
daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent},
daemon_to_node::{DaemonCommunication, DaemonReply},
id::DataId,
node_to_daemon::{DaemonRequest, Timestamped},
DataflowId,
@@ -19,10 +19,7 @@ use futures::{
use futures_timer::Delay;
use scheduler::{Scheduler, NON_INPUT_EVENT};

use self::{
event::SharedMemoryData,
thread::{EventItem, EventStreamThreadHandle},
};
use self::thread::{EventItem, EventStreamThreadHandle};
use crate::daemon_connection::DaemonChannel;
use dora_core::{
config::{Input, NodeId},
@@ -199,51 +196,7 @@ impl EventStream {

fn convert_event_item(item: EventItem) -> Event {
match item {
EventItem::NodeEvent { event, ack_channel } => match event {
NodeEvent::Stop => Event::Stop,
NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
NodeEvent::InputClosed { id } => Event::InputClosed { id },
NodeEvent::Input { id, metadata, data } => {
let data = match data {
None => Ok(None),
Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))),
Some(DataMessage::SharedMemory {
shared_memory_id,
len,
drop_token: _, // handled in `event_stream_loop`
}) => unsafe {
MappedInputData::map(&shared_memory_id, len).map(|data| {
Some(RawData::SharedMemory(SharedMemoryData {
data,
_drop: ack_channel,
}))
})
},
};
let data = data.and_then(|data| {
let raw_data = data.unwrap_or(RawData::Empty);
raw_data
.into_arrow_array(&metadata.type_info)
.map(arrow::array::make_array)
});
match data {
Ok(data) => Event::Input {
id,
metadata,
data: data.into(),
},
Err(err) => Event::Error(format!("{err:?}")),
}
}
NodeEvent::AllInputsClosed => {
let err = eyre!(
"received `AllInputsClosed` event, which should be handled by background task"
);
tracing::error!("{err:?}");
Event::Error(err.wrap_err("internal error").to_string())
}
},

EventItem::NodeEvent { event } => event,
EventItem::FatalError(err) => {
Event::Error(format!("fatal event stream error: {err:?}"))
}


+ 2
- 8
apis/rust/node/src/event_stream/scheduler.rs View File

@@ -2,7 +2,7 @@ use std::collections::{HashMap, VecDeque};

use dora_message::{daemon_to_node::NodeEvent, id::DataId};

use super::thread::EventItem;
use super::{thread::EventItem, Event};
pub const NON_INPUT_EVENT: &str = "dora/non_input_event";

// This scheduler will make sure that there is fairness between
@@ -40,13 +40,7 @@ impl Scheduler {
pub fn add_event(&mut self, event: EventItem) {
let event_id = match &event {
EventItem::NodeEvent {
event:
NodeEvent::Input {
id,
metadata: _,
data: _,
},
ack_channel: _,
event: Event::Input { id, .. },
} => id,
_ => &DataId::from(NON_INPUT_EVENT.to_string()),
};


+ 61
- 24
apis/rust/node/src/event_stream/thread.rs View File

@@ -3,6 +3,7 @@ use dora_core::{
uhlc::{self, Timestamp},
};
use dora_message::{
common::{DataMessage, DropTokenState, DropTokenStatus},
daemon_to_node::{DaemonReply, NodeEvent},
node_to_daemon::{DaemonRequest, DropToken, Timestamped},
};
@@ -15,6 +16,8 @@ use std::{

use crate::daemon_connection::DaemonChannel;

use super::{event::SharedMemoryData, Event, MappedInputData, RawData};

pub fn init(
node_id: NodeId,
tx: flume::Sender<EventItem>,
@@ -28,10 +31,7 @@ pub fn init(

#[derive(Debug)]
pub enum EventItem {
NodeEvent {
event: NodeEvent,
ack_channel: flume::Sender<()>,
},
NodeEvent { event: super::Event },
FatalError(eyre::Report),
TimeoutError(eyre::Report),
}
@@ -130,25 +130,60 @@ fn event_stream_loop(
if let Err(err) = clock.update_with_timestamp(&timestamp) {
tracing::warn!("failed to update HLC: {err}");
}
let drop_token = match &inner {
NodeEvent::Input {
data: Some(data), ..
} => data.drop_token(),

let event = match inner {
NodeEvent::Stop => Event::Stop,
NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
NodeEvent::InputClosed { id } => Event::InputClosed { id },
NodeEvent::Input { id, metadata, data } => {
let data = match data {
None => Ok(None),
Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))),
Some(DataMessage::SharedMemory {
shared_memory_id,
len,
drop_token,
}) => unsafe {
let (drop_tx, drop_rx) = flume::bounded(0);
let data = MappedInputData::map(&shared_memory_id, len).map(|data| {
Some(RawData::SharedMemory(SharedMemoryData {
data,
_drop: drop_tx,
}))
});
drop_tokens.push(DropTokenStatus {
token: drop_token,
state: DropTokenState::Mapped,
});
pending_drop_tokens.push((drop_token, drop_rx, Instant::now(), 1));
data
},
};
let data = data.and_then(|data| {
let raw_data = data.unwrap_or(RawData::Empty);
raw_data
.into_arrow_array(&metadata.type_info)
.map(arrow::array::make_array)
});
match data {
Ok(data) => Event::Input {
id,
metadata,
data: data.into(),
},
Err(err) => Event::Error(format!("{err:?}")),
}
}
NodeEvent::AllInputsClosed => {
// close the event stream
tx = None;
// skip this internal event
continue;
}
_ => None,
};

if let Some(tx) = tx.as_ref() {
let (drop_tx, drop_rx) = flume::bounded(0);
match tx.send(EventItem::NodeEvent {
event: inner,
ack_channel: drop_tx,
}) {
match tx.send(EventItem::NodeEvent { event }) {
Ok(()) => {}
Err(send_error) => {
let event = send_error.into_inner();
@@ -159,12 +194,8 @@ fn event_stream_loop(
break 'outer Ok(());
}
}

if let Some(token) = drop_token {
pending_drop_tokens.push((token, drop_rx, Instant::now(), 1));
}
} else {
tracing::warn!("dropping event because event `tx` was already closed: `{inner:?}`");
tracing::warn!("dropping event because event `tx` was already closed: `{event:?}`");
}
}
};
@@ -196,7 +227,7 @@ fn event_stream_loop(

fn handle_pending_drop_tokens(
pending_drop_tokens: &mut Vec<(DropToken, flume::Receiver<()>, Instant, u64)>,
drop_tokens: &mut Vec<DropToken>,
drop_tokens: &mut Vec<DropTokenStatus>,
) -> eyre::Result<()> {
let mut still_pending = Vec::new();
for (token, rx, since, warn) in pending_drop_tokens.drain(..) {
@@ -204,7 +235,10 @@ fn handle_pending_drop_tokens(
Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")),
Err(flume::TryRecvError::Disconnected) => {
// the event was dropped -> add the drop token to the list
drop_tokens.push(token);
drop_tokens.push(DropTokenStatus {
token,
state: DropTokenState::Dropped,
});
}
Err(flume::TryRecvError::Empty) => {
let duration = Duration::from_secs(30 * warn);
@@ -221,7 +255,7 @@ fn handle_pending_drop_tokens(

fn report_remaining_drop_tokens(
mut channel: DaemonChannel,
mut drop_tokens: Vec<DropToken>,
mut drop_tokens: Vec<DropTokenStatus>,
mut pending_drop_tokens: Vec<(DropToken, flume::Receiver<()>, Instant, u64)>,
timestamp: Timestamp,
) -> eyre::Result<()> {
@@ -234,7 +268,10 @@ fn report_remaining_drop_tokens(
Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")),
Err(flume::RecvTimeoutError::Disconnected) => {
// the event was dropped -> add the drop token to the list
drop_tokens.push(token);
drop_tokens.push(DropTokenStatus {
token,
state: DropTokenState::Dropped,
});
}
Err(flume::RecvTimeoutError::Timeout) => {
let duration = Duration::from_secs(1);
@@ -259,7 +296,7 @@ fn report_remaining_drop_tokens(
}

fn report_drop_tokens(
drop_tokens: &mut Vec<DropToken>,
drop_tokens: &mut Vec<DropTokenStatus>,
channel: &mut DaemonChannel,
timestamp: Timestamp,
) -> Result<(), eyre::ErrReport> {


+ 21
- 14
apis/rust/node/src/node/drop_stream.rs View File

@@ -3,15 +3,16 @@ use std::{sync::Arc, time::Duration};
use crate::daemon_connection::DaemonChannel;
use dora_core::{config::NodeId, uhlc};
use dora_message::{
common::{DropTokenState, DropTokenStatus},
daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent},
node_to_daemon::{DaemonRequest, DropToken, Timestamped},
node_to_daemon::{DaemonRequest, Timestamped},
DataflowId,
};
use eyre::{eyre, Context};
use flume::RecvTimeoutError;

pub struct DropStream {
receiver: flume::Receiver<DropToken>,
receiver: flume::Receiver<DropTokenStatus>,
_thread_handle: DropStreamThreadHandle,
}

@@ -82,7 +83,7 @@ impl DropStream {
}

impl std::ops::Deref for DropStream {
type Target = flume::Receiver<DropToken>;
type Target = flume::Receiver<DropTokenStatus>;

fn deref(&self) -> &Self::Target {
&self.receiver
@@ -92,7 +93,7 @@ impl std::ops::Deref for DropStream {
#[tracing::instrument(skip(tx, channel, clock))]
fn drop_stream_loop(
node_id: NodeId,
tx: flume::Sender<DropToken>,
tx: flume::Sender<DropTokenStatus>,
mut channel: DaemonChannel,
clock: Arc<uhlc::HLC>,
) {
@@ -125,16 +126,22 @@ fn drop_stream_loop(
if let Err(err) = clock.update_with_timestamp(&timestamp) {
tracing::warn!("failed to update HLC: {err}");
}
match inner {
NodeDropEvent::OutputDropped { drop_token } => {
if tx.send(drop_token).is_err() {
tracing::warn!(
"drop channel was closed already, could not forward \
drop token`{drop_token:?}`"
);
break 'outer;
}
}
let event = match inner {
NodeDropEvent::OutputMapped { drop_token } => DropTokenStatus {
token: drop_token,
state: DropTokenState::Mapped,
},
NodeDropEvent::OutputDropped { drop_token } => DropTokenStatus {
token: drop_token,
state: DropTokenState::Dropped,
},
};
if tx.send(event).is_err() {
tracing::warn!(
"drop channel was closed already, could not forward \
drop token event `{event:?}`"
);
break 'outer;
}
}
}


+ 34
- 7
apis/rust/node/src/node/mod.rs View File

@@ -16,6 +16,7 @@ use dora_core::{
};

use dora_message::{
common::DropTokenStatus,
daemon_to_node::{DaemonReply, NodeConfig},
metadata::{ArrowTypeInfo, Metadata, MetadataParameters},
node_to_daemon::{DaemonRequest, DataMessage, DropToken, Timestamped},
@@ -48,6 +49,7 @@ pub struct DoraNode {
clock: Arc<uhlc::HLC>,

sent_out_shared_memory: HashMap<DropToken, ShmemHandle>,
shared_memory_in_use: HashMap<DropToken, ShmemHandle>,
drop_stream: DropStream,
cache: VecDeque<ShmemHandle>,

@@ -155,6 +157,7 @@ impl DoraNode {
control_channel,
clock,
sent_out_shared_memory: HashMap::new(),
shared_memory_in_use: HashMap::new(),
drop_stream,
cache: VecDeque::new(),
dataflow_descriptor,
@@ -380,10 +383,7 @@ impl DoraNode {
fn handle_finished_drop_tokens(&mut self) -> eyre::Result<()> {
loop {
match self.drop_stream.try_recv() {
Ok(token) => match self.sent_out_shared_memory.remove(&token) {
Some(region) => self.add_to_cache(region),
None => tracing::warn!("received unknown finished drop token `{token:?}`"),
},
Ok(event) => self.handle_drop_token_event(event),
Err(flume::TryRecvError::Empty) => break,
Err(flume::TryRecvError::Disconnected) => {
bail!("event stream was closed before sending all expected drop tokens")
@@ -393,6 +393,35 @@ impl DoraNode {
Ok(())
}

/// Updates the node's shared-memory bookkeeping for one drop-token status
/// report received on the drop stream.
///
/// `Mapped`: the receiving side reported that it mapped the region, so the
/// region is moved from `sent_out_shared_memory` ("in flight") into
/// `shared_memory_in_use` — it must stay alive, but is tracked separately.
/// `Dropped`: all receivers are done; the region is removed from whichever
/// map currently holds it and recycled through `add_to_cache`.
///
/// Unknown tokens are only logged as warnings (best-effort cleanup).
fn handle_drop_token_event(&mut self, event: DropTokenStatus) {
let DropTokenStatus { token, state } = event;
match state {
dora_message::common::DropTokenState::Mapped => {
// Move the region out of the "sent out, not yet mapped" set; a later
// `Dropped` report for the same token will find it in
// `shared_memory_in_use`.
let region = self.sent_out_shared_memory.remove(&token);
match region {
Some(region) => {
self.shared_memory_in_use.insert(token, region);
}
None => {
tracing::warn!("received unknown mapped drop token `{token:?}`")
}
};
}
dora_message::common::DropTokenState::Dropped => {
// A `Dropped` report may arrive with or without a preceding `Mapped`
// report, so check both maps before giving up on the token.
let region = self
.sent_out_shared_memory
.remove(&token)
.or_else(|| self.shared_memory_in_use.remove(&token));
match region {
// Region is free again — make it available for reuse.
Some(region) => self.add_to_cache(region),
None => {
tracing::warn!("received unknown finished drop token `{token:?}`")
}
}
}
};
}

fn add_to_cache(&mut self, memory: ShmemHandle) {
const MAX_CACHE_SIZE: usize = 20;

@@ -435,9 +464,7 @@ impl Drop for DoraNode {
}

match self.drop_stream.recv_timeout(Duration::from_secs(2)) {
Ok(token) => {
self.sent_out_shared_memory.remove(&token);
}
Ok(event) => self.handle_drop_token_event(event),
Err(flume::RecvTimeoutError::Disconnected) => {
tracing::warn!(
"finished_drop_tokens channel closed while still waiting for drop tokens; \


+ 66
- 16
binaries/daemon/src/lib.rs View File

@@ -12,7 +12,8 @@ use dora_core::{
};
use dora_message::{
common::{
DaemonId, DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus,
DaemonId, DataMessage, DropToken, DropTokenStatus, LogLevel, NodeError, NodeErrorCause,
NodeExitStatus,
},
coordinator_to_cli::DataflowResult,
coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes},
@@ -1014,7 +1015,7 @@ impl Daemon {
.send_out(dataflow_id, node_id, output_id, metadata, data)
.await
.context("failed to send out")?,
DaemonNodeEvent::ReportDrop { tokens } => {
DaemonNodeEvent::ReportTokenState { token_events } => {
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
format!(
"failed to get handle drop tokens: \
@@ -1024,17 +1025,27 @@ impl Daemon {

match dataflow {
Ok(dataflow) => {
for token in tokens {
for event in token_events {
let DropTokenStatus { token, state } = event;
match dataflow.pending_drop_tokens.get_mut(&token) {
Some(info) => {
if info.pending_nodes.remove(&node_id) {
dataflow.check_drop_token(token, &self.clock).await?;
} else {
tracing::warn!(
"node `{node_id}` is not pending for drop token `{token:?}`"
);
Some(info) => match state {
dora_message::common::DropTokenState::Mapped => {
let changed = info.pending_nodes.remove(&node_id);
info.mapped_in_nodes.insert(node_id.clone());
if changed {
dataflow
.check_drop_token_mapped(token, &self.clock)
.await?;
}
}
}
dora_message::common::DropTokenState::Dropped => {
let mut changed = info.pending_nodes.remove(&node_id);
changed |= info.mapped_in_nodes.remove(&node_id);
if changed {
dataflow.check_drop_token(token, &self.clock).await?;
}
}
},
None => tracing::warn!("unknown drop token `{token:?}`"),
}
}
@@ -1620,6 +1631,7 @@ async fn send_output_to_local_receivers(
.or_insert_with(|| DropTokenInformation {
owner: node_id.clone(),
pending_nodes: Default::default(),
mapped_in_nodes: Default::default(),
})
.pending_nodes
.insert(receiver_id.clone());
@@ -1658,6 +1670,7 @@ async fn send_output_to_local_receivers(
.or_insert_with(|| DropTokenInformation {
owner: node_id.clone(),
pending_nodes: Default::default(),
mapped_in_nodes: Default::default(),
});
// check if all local subscribers are finished with the token
dataflow.check_drop_token(token, clock).await?;
@@ -1927,7 +1940,7 @@ impl RunningDataflow {
async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> {
match self.pending_drop_tokens.entry(token) {
std::collections::hash_map::Entry::Occupied(entry) => {
if entry.get().pending_nodes.is_empty() {
if entry.get().pending_nodes.is_empty() && entry.get().mapped_in_nodes.is_empty() {
let (drop_token, info) = entry.remove_entry();
let result = match self.drop_channels.get_mut(&info.owner) {
Some(channel) => send_with_timestamp(
@@ -1962,6 +1975,38 @@ impl RunningDataflow {
let OutputId(node_id, output_id) = output_id;
format!("dora/{network_id}/{dataflow_id}/output/{node_id}/{output_id}")
}

/// Reports to the owner node that the shared-memory region behind `token`
/// has been mapped by all local receivers, enabling earlier cleanup on the
/// owner's side.
///
/// The `OutputMapped` event is only sent while the token is in the
/// "everyone mapped, nobody dropped yet" state (`pending_nodes` empty and
/// `mapped_in_nodes` non-empty). Unlike `check_drop_token`, the entry is
/// NOT removed from `pending_drop_tokens` — the final `Dropped` reports
/// still have to arrive for this token.
///
/// Send failures are logged as warnings rather than propagated, so a dead
/// owner channel does not take down the daemon loop; the function itself
/// always returns `Ok(())`.
async fn check_drop_token_mapped(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> {
match self.pending_drop_tokens.entry(token) {
std::collections::hash_map::Entry::Occupied(entry) => {
if entry.get().pending_nodes.is_empty() && !entry.get().mapped_in_nodes.is_empty() {
let info = entry.get();
let result = match self.drop_channels.get_mut(&info.owner) {
Some(channel) => send_with_timestamp(
channel,
NodeDropEvent::OutputMapped { drop_token: token },
clock,
)
.wrap_err("send failed"),
None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)),
};
// Best-effort notification: log and continue instead of erroring out.
if let Err(err) = result.wrap_err_with(|| {
format!(
"failed to report drop token mapped `{token:?}` to owner `{}`",
&info.owner
)
}) {
tracing::warn!("{err:?}");
}
}
}
std::collections::hash_map::Entry::Vacant(_) => {
tracing::warn!("check_drop_token_mapped called with already closed token")
}
}

Ok(())
}
}

fn empty_type_info() -> ArrowTypeInfo {
@@ -1983,9 +2028,14 @@ type InputId = (NodeId, DataId);
struct DropTokenInformation {
/// The node that created the associated drop token.
owner: NodeId,
/// Contains the set of pending nodes that still have access to the input
/// associated with a drop token.
/// Contains the set of nodes that have not mapped the input associated
/// with a drop token yet. The shared memory region needs to be kept
/// alive until this list is empty.
pending_nodes: BTreeSet<NodeId>,
/// Contains the set of nodes that still have the input data associated
/// with a drop token mapped in their address space. The shared memory
/// region must not be overwritten until this list is empty.
mapped_in_nodes: BTreeSet<NodeId>,
}

#[derive(Debug)]
@@ -2033,8 +2083,8 @@ pub enum DaemonNodeEvent {
metadata: metadata::Metadata,
data: Option<DataMessage>,
},
ReportDrop {
tokens: Vec<DropToken>,
ReportTokenState {
token_events: Vec<DropTokenStatus>,
},
EventStreamDropped {
reply_sender: oneshot::Sender<DaemonReply>,


+ 4
- 4
binaries/daemon/src/node_communication/mod.rs View File

@@ -5,7 +5,7 @@ use dora_core::{
uhlc,
};
use dora_message::{
common::{DropToken, Timestamped},
common::{DropTokenState, DropTokenStatus, Timestamped},
daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent, NodeEvent},
node_to_daemon::DaemonRequest,
DataflowId,
@@ -461,13 +461,13 @@ impl Listener {
Ok(())
}

async fn report_drop_tokens(&mut self, drop_tokens: Vec<DropToken>) -> eyre::Result<()> {
async fn report_drop_tokens(&mut self, drop_tokens: Vec<DropTokenStatus>) -> eyre::Result<()> {
if !drop_tokens.is_empty() {
let event = Event::Node {
dataflow_id: self.dataflow_id,
node_id: self.node_id.clone(),
event: DaemonNodeEvent::ReportDrop {
tokens: drop_tokens,
event: DaemonNodeEvent::ReportTokenState {
token_events: drop_tokens,
},
};
let event = Timestamped {


+ 16
- 0
libraries/message/src/common.rs View File

@@ -189,6 +189,22 @@ impl fmt::Debug for DataMessage {
}
}

#[derive(
Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
/// A drop token paired with the lifecycle state being reported for it.
///
/// Sent by nodes to the daemon (and forwarded to the token's owner) so the
/// owner knows when a shared-memory region was mapped or fully dropped.
pub struct DropTokenStatus {
/// The token identifying the shared-memory output this report is about.
pub token: DropToken,
/// The reported lifecycle state (`Mapped` or `Dropped`).
pub state: DropTokenState,
}

#[derive(
Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
/// Lifecycle state of a shared-memory drop token as reported by a receiver.
pub enum DropTokenState {
/// The receiver has mapped the shared-memory region into its address
/// space; the region must not be overwritten yet.
Mapped,
/// The receiver is completely done with the region; it may be reused.
Dropped,
}

#[derive(
Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]


+ 1
- 0
libraries/message/src/daemon_to_node.rs View File

@@ -74,5 +74,6 @@ pub enum NodeEvent {

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
/// Drop-stream events sent from the daemon to the node that owns a
/// shared-memory output region.
pub enum NodeDropEvent {
/// All receivers have mapped the region; the commit this appears in adds
/// this variant to allow faster cleanup on the owner side.
OutputMapped { drop_token: DropToken },
/// All receivers are finished with the region; it can be recycled.
OutputDropped { drop_token: DropToken },
}

+ 3
- 2
libraries/message/src/node_to_daemon.rs View File

@@ -2,6 +2,7 @@ pub use crate::common::{
DataMessage, DropToken, LogLevel, LogMessage, SharedMemoryId, Timestamped,
};
use crate::{
common::DropTokenStatus,
current_crate_version,
id::{DataId, NodeId},
metadata::Metadata,
@@ -22,10 +23,10 @@ pub enum DaemonRequest {
/// required drop tokens.
OutputsDone,
NextEvent {
drop_tokens: Vec<DropToken>,
drop_tokens: Vec<DropTokenStatus>,
},
ReportDropTokens {
drop_tokens: Vec<DropToken>,
drop_tokens: Vec<DropTokenStatus>,
},
SubscribeDrop,
NextFinishedDropTokens,


Loading…
Cancel
Save