Browse Source

Refactor: Move message definitions to `dora-message` crate (#613)

First step towards versioning the message definitions independently.
tags/v0.3.6-rc0
Philipp Oppermann GitHub 1 year ago
parent
commit
d6032265a2
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
61 changed files with 1118 additions and 1010 deletions
  1. +1
    -1
      .github/workflows/release.yml
  2. +12
    -3
      Cargo.lock
  3. +7
    -2
      Cargo.toml
  4. +1
    -2
      apis/python/node/src/lib.rs
  5. +1
    -0
      apis/rust/node/Cargo.toml
  6. +7
    -10
      apis/rust/node/src/daemon_connection/mod.rs
  7. +4
    -1
      apis/rust/node/src/daemon_connection/tcp.rs
  8. +4
    -1
      apis/rust/node/src/daemon_connection/unix_domain.rs
  9. +2
    -4
      apis/rust/node/src/event_stream/event.rs
  10. +12
    -15
      apis/rust/node/src/event_stream/mod.rs
  11. +6
    -3
      apis/rust/node/src/event_stream/thread.rs
  12. +5
    -2
      apis/rust/node/src/lib.rs
  13. +1
    -1
      apis/rust/node/src/node/arrow_utils.rs
  14. +10
    -5
      apis/rust/node/src/node/control_channel.rs
  15. +7
    -9
      apis/rust/node/src/node/drop_stream.rs
  16. +9
    -4
      apis/rust/node/src/node/mod.rs
  17. +1
    -0
      binaries/cli/Cargo.toml
  18. +4
    -5
      binaries/cli/src/attach.rs
  19. +1
    -1
      binaries/cli/src/check.rs
  20. +1
    -1
      binaries/cli/src/formatting.rs
  21. +1
    -1
      binaries/cli/src/logs.rs
  22. +10
    -9
      binaries/cli/src/main.rs
  23. +2
    -1
      binaries/cli/src/up.rs
  24. +1
    -0
      binaries/coordinator/Cargo.toml
  25. +1
    -1
      binaries/coordinator/src/control.rs
  26. +23
    -31
      binaries/coordinator/src/lib.rs
  27. +14
    -17
      binaries/coordinator/src/listener.rs
  28. +1
    -1
      binaries/coordinator/src/log_subscriber.rs
  29. +5
    -4
      binaries/coordinator/src/run/mod.rs
  30. +1
    -0
      binaries/daemon/Cargo.toml
  31. +6
    -9
      binaries/daemon/src/coordinator.rs
  32. +1
    -1
      binaries/daemon/src/inter_daemon.rs
  33. +52
    -62
      binaries/daemon/src/lib.rs
  34. +4
    -1
      binaries/daemon/src/local_listener.rs
  35. +12
    -23
      binaries/daemon/src/node_communication/mod.rs
  36. +3
    -4
      binaries/daemon/src/node_communication/shmem.rs
  37. +3
    -4
      binaries/daemon/src/node_communication/tcp.rs
  38. +3
    -4
      binaries/daemon/src/node_communication/unix_domain.rs
  39. +7
    -4
      binaries/daemon/src/pending.rs
  40. +8
    -4
      binaries/daemon/src/spawn.rs
  41. +1
    -0
      binaries/runtime/Cargo.toml
  42. +1
    -1
      binaries/runtime/src/lib.rs
  43. +2
    -2
      binaries/runtime/src/operator/mod.rs
  44. +1
    -1
      binaries/runtime/src/operator/python.rs
  45. +8
    -5
      examples/multiple-daemons/run.rs
  46. +1
    -2
      libraries/core/Cargo.toml
  47. +0
    -58
      libraries/core/src/coordinator_messages.rs
  48. +0
    -302
      libraries/core/src/daemon_messages.rs
  49. +1
    -3
      libraries/core/src/lib.rs
  50. +1
    -246
      libraries/core/src/topics.rs
  51. +5
    -1
      libraries/message/Cargo.toml
  52. +47
    -0
      libraries/message/src/cli_to_coordinator.rs
  53. +184
    -0
      libraries/message/src/common.rs
  54. +85
    -0
      libraries/message/src/coordinator_to_cli.rs
  55. +59
    -0
      libraries/message/src/coordinator_to_daemon.rs
  56. +86
    -0
      libraries/message/src/daemon_to_coordinator.rs
  57. +21
    -0
      libraries/message/src/daemon_to_daemon.rs
  58. +77
    -0
      libraries/message/src/daemon_to_node.rs
  59. +10
    -138
      libraries/message/src/lib.rs
  60. +143
    -0
      libraries/message/src/metadata.rs
  61. +131
    -0
      libraries/message/src/node_to_daemon.rs

+ 1
- 1
.github/workflows/release.yml View File

@@ -54,11 +54,11 @@ jobs:
# workspace crates.

# Publish libraries crates
cargo publish -p dora-message --token ${{ secrets.CARGO_REGISTRY_TOKEN }}
cargo publish -p dora-tracing --token ${{ secrets.CARGO_REGISTRY_TOKEN }}
cargo publish -p dora-metrics --token ${{ secrets.CARGO_REGISTRY_TOKEN }}
cargo publish -p dora-download --token ${{ secrets.CARGO_REGISTRY_TOKEN }}
cargo publish -p dora-core --token ${{ secrets.CARGO_REGISTRY_TOKEN }}
cargo publish -p dora-message --token ${{ secrets.CARGO_REGISTRY_TOKEN }}
cargo publish -p communication-layer-pub-sub --token ${{ secrets.CARGO_REGISTRY_TOKEN }}
cargo publish -p communication-layer-request-reply --token ${{ secrets.CARGO_REGISTRY_TOKEN }}
cargo publish -p shared-memory-server --token ${{ secrets.CARGO_REGISTRY_TOKEN }}


+ 12
- 3
Cargo.lock View File

@@ -2284,6 +2284,7 @@ dependencies = [
"dora-coordinator",
"dora-core",
"dora-daemon",
"dora-message",
"dora-node-api-c",
"dora-operator-api-c",
"dora-runtime",
@@ -2313,6 +2314,7 @@ version = "0.3.5"
dependencies = [
"ctrlc",
"dora-core",
"dora-message",
"dora-tracing",
"eyre",
"futures",
@@ -2330,8 +2332,6 @@ dependencies = [
name = "dora-core"
version = "0.3.5"
dependencies = [
"aligned-vec",
"dora-message",
"eyre",
"log",
"once_cell",
@@ -2342,6 +2342,7 @@ dependencies = [
"serde_yaml 0.9.34+deprecated",
"tokio",
"tracing",
"uhlc",
"uuid",
"which",
]
@@ -2359,6 +2360,7 @@ dependencies = [
"dora-arrow-convert",
"dora-core",
"dora-download",
"dora-message",
"dora-node-api",
"dora-tracing",
"eyre",
@@ -2394,6 +2396,7 @@ dependencies = [
"dora-coordinator",
"dora-core",
"dora-download",
"dora-message",
"dora-tracing",
"dunce",
"eyre",
@@ -2409,11 +2412,15 @@ dependencies = [
name = "dora-message"
version = "0.3.5"
dependencies = [
"aligned-vec",
"arrow-data",
"arrow-schema",
"dora-core",
"eyre",
"log",
"serde",
"uhlc",
"tokio",
"uuid",
]

[[package]]
@@ -2436,6 +2443,7 @@ dependencies = [
"bincode",
"dora-arrow-convert",
"dora-core",
"dora-message",
"dora-tracing",
"eyre",
"flume 0.10.14",
@@ -2641,6 +2649,7 @@ dependencies = [
"arrow",
"dora-core",
"dora-download",
"dora-message",
"dora-metrics",
"dora-node-api",
"dora-operator-api-python",


+ 7
- 2
Cargo.toml View File

@@ -60,18 +60,22 @@ dora-metrics = { version = "0.3.5", path = "libraries/extensions/telemetry/metri
dora-download = { version = "0.3.5", path = "libraries/extensions/download" }
shared-memory-server = { version = "0.3.5", path = "libraries/shared-memory-server" }
communication-layer-request-reply = { version = "0.3.5", path = "libraries/communication-layer/request-reply" }
dora-message = { version = "0.3.5", path = "libraries/message" }
dora-runtime = { version = "0.3.5", path = "binaries/runtime" }
dora-daemon = { version = "0.3.5", path = "binaries/daemon" }
dora-coordinator = { version = "0.3.5", path = "binaries/coordinator" }
dora-ros2-bridge = { path = "libraries/extensions/ros2-bridge" }
dora-ros2-bridge-msg-gen = { path = "libraries/extensions/ros2-bridge/msg-gen" }
dora-ros2-bridge-python = { path = "libraries/extensions/ros2-bridge/python" }
dora-message = { version = "0.3.5", path = "libraries/message" }
arrow = { version = "52" }
arrow-schema = { version = "52" }
arrow-data = { version = "52" }
arrow-array = { version = "52" }
pyo3 = { version = "0.21", features = ["eyre", "abi3-py37", "multiple-pymethods"] }
pyo3 = { version = "0.21", features = [
"eyre",
"abi3-py37",
"multiple-pymethods",
] }
pythonize = "0.21"

[package]
@@ -90,6 +94,7 @@ eyre = "0.6.8"
tokio = "1.24.2"
dora-coordinator = { workspace = true }
dora-core = { workspace = true }
dora-message = { workspace = true }
dora-tracing = { workspace = true }
dora-download = { workspace = true }
dunce = "1.0.2"


+ 1
- 2
apis/python/node/src/lib.rs View File

@@ -5,9 +5,8 @@ use std::time::Duration;

use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use dora_node_api::dora_core::config::NodeId;
use dora_node_api::dora_core::daemon_messages::DataflowId;
use dora_node_api::merged::{MergeExternalSend, MergedEvent};
use dora_node_api::{DoraNode, EventStream};
use dora_node_api::{DataflowId, DoraNode, EventStream};
use dora_operator_api_python::{pydict_to_metadata, DelayedCleanup, NodeCleanupHandle, PyEvent};
use dora_ros2_bridge_python::Ros2Subscription;
use eyre::Context;


+ 1
- 0
apis/rust/node/Cargo.toml View File

@@ -12,6 +12,7 @@ tracing = ["dep:dora-tracing"]

[dependencies]
dora-core = { workspace = true }
dora-message = { workspace = true }
shared-memory-server = { workspace = true }
eyre = "0.6.7"
serde_yaml = "0.8.23"


+ 7
- 10
apis/rust/node/src/daemon_connection/mod.rs View File

@@ -1,7 +1,8 @@
use dora_core::{
config::NodeId,
daemon_messages::{DaemonReply, DaemonRequest, DataflowId, Timestamped},
message::uhlc::Timestamp,
use dora_core::{config::NodeId, uhlc::Timestamp};
use dora_message::{
daemon_to_node::DaemonReply,
node_to_daemon::{DaemonRequest, NodeRegisterRequest, Timestamped},
DataflowId,
};
use eyre::{bail, eyre, Context};
use shared_memory_server::{ShmemClient, ShmemConf};
@@ -58,11 +59,7 @@ impl DaemonChannel {
timestamp: Timestamp,
) -> eyre::Result<()> {
let msg = Timestamped {
inner: DaemonRequest::Register {
dataflow_id,
node_id,
dora_version: env!("CARGO_PKG_VERSION").to_owned(),
},
inner: DaemonRequest::Register(NodeRegisterRequest::new(dataflow_id, node_id)),
timestamp,
};
let reply = self
@@ -70,7 +67,7 @@ impl DaemonChannel {
.wrap_err("failed to send register request to dora-daemon")?;

match reply {
dora_core::daemon_messages::DaemonReply::Result(result) => result
DaemonReply::Result(result) => result
.map_err(|e| eyre!(e))
.wrap_err("failed to register node with dora-daemon")?,
other => bail!("unexpected register reply: {other:?}"),


+ 4
- 1
apis/rust/node/src/daemon_connection/tcp.rs View File

@@ -1,4 +1,7 @@
use dora_core::daemon_messages::{DaemonReply, DaemonRequest, Timestamped};
use dora_message::{
daemon_to_node::DaemonReply,
node_to_daemon::{DaemonRequest, Timestamped},
};
use eyre::{eyre, Context};
use std::{
io::{Read, Write},


+ 4
- 1
apis/rust/node/src/daemon_connection/unix_domain.rs View File

@@ -1,4 +1,7 @@
use dora_core::daemon_messages::{DaemonReply, DaemonRequest, Timestamped};
use dora_message::{
daemon_to_node::DaemonReply,
node_to_daemon::{DaemonRequest, Timestamped},
};
use eyre::{eyre, Context};
use std::{
io::{Read, Write},


+ 2
- 4
apis/rust/node/src/event_stream/event.rs View File

@@ -2,10 +2,8 @@ use std::{ptr::NonNull, sync::Arc};

use aligned_vec::{AVec, ConstAlign};
use dora_arrow_convert::{ArrowData, IntoArrow};
use dora_core::{
config::{DataId, OperatorId},
message::{ArrowTypeInfo, BufferOffset, Metadata},
};
use dora_core::config::{DataId, OperatorId};
use dora_message::metadata::{ArrowTypeInfo, BufferOffset, Metadata};
use eyre::{Context, Result};
use shared_memory_extended::{Shmem, ShmemConf};



+ 12
- 15
apis/rust/node/src/event_stream/mod.rs View File

@@ -1,5 +1,10 @@
use std::{sync::Arc, time::Duration};

use dora_message::{
daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent},
node_to_daemon::{DaemonRequest, Timestamped},
DataflowId,
};
pub use event::{Event, MappedInputData, RawData};
use futures::{
future::{select, Either},
@@ -12,13 +17,7 @@ use self::{
thread::{EventItem, EventStreamThreadHandle},
};
use crate::daemon_connection::DaemonChannel;
use dora_core::{
config::NodeId,
daemon_messages::{
self, DaemonCommunication, DaemonRequest, DataflowId, NodeEvent, Timestamped,
},
message::uhlc,
};
use dora_core::{config::NodeId, uhlc};
use eyre::{eyre, Context};

mod event;
@@ -97,8 +96,8 @@ impl EventStream {
.wrap_err("failed to create subscription with dora-daemon")?;

match reply {
daemon_messages::DaemonReply::Result(Ok(())) => {}
daemon_messages::DaemonReply::Result(Err(err)) => {
DaemonReply::Result(Ok(())) => {}
DaemonReply::Result(Err(err)) => {
eyre::bail!("subscribe failed: {err}")
}
other => eyre::bail!("unexpected subscribe reply: {other:?}"),
@@ -151,8 +150,8 @@ impl EventStream {
NodeEvent::Input { id, metadata, data } => {
let data = match data {
None => Ok(None),
Some(daemon_messages::DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))),
Some(daemon_messages::DataMessage::SharedMemory {
Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))),
Some(DataMessage::SharedMemory {
shared_memory_id,
len,
drop_token: _, // handled in `event_stream_loop`
@@ -225,10 +224,8 @@ impl Drop for EventStream {
.map_err(|e| eyre!(e))
.wrap_err("failed to signal event stream closure to dora-daemon")
.and_then(|r| match r {
daemon_messages::DaemonReply::Result(Ok(())) => Ok(()),
daemon_messages::DaemonReply::Result(Err(err)) => {
Err(eyre!("EventStreamClosed failed: {err}"))
}
DaemonReply::Result(Ok(())) => Ok(()),
DaemonReply::Result(Err(err)) => Err(eyre!("EventStreamClosed failed: {err}")),
other => Err(eyre!("unexpected EventStreamClosed reply: {other:?}")),
});
if let Err(err) = result {


+ 6
- 3
apis/rust/node/src/event_stream/thread.rs View File

@@ -1,7 +1,10 @@
use dora_core::{
config::NodeId,
daemon_messages::{DaemonReply, DaemonRequest, DropToken, NodeEvent, Timestamped},
message::uhlc::{self, Timestamp},
uhlc::{self, Timestamp},
};
use dora_message::{
daemon_to_node::{DaemonReply, NodeEvent},
node_to_daemon::{DaemonRequest, DropToken, Timestamped},
};
use eyre::{eyre, Context};
use flume::RecvTimeoutError;
@@ -263,7 +266,7 @@ fn report_drop_tokens(
timestamp,
};
match channel.request(&daemon_request)? {
dora_core::daemon_messages::DaemonReply::Empty => Ok(()),
DaemonReply::Empty => Ok(()),
other => Err(eyre!("unexpected ReportDropTokens reply: {other:?}")),
}
}

+ 5
- 2
apis/rust/node/src/lib.rs View File

@@ -15,8 +15,11 @@
//!
pub use arrow;
pub use dora_arrow_convert::*;
pub use dora_core;
pub use dora_core::message::{uhlc, Metadata, MetadataParameters, Parameter};
pub use dora_core::{self, uhlc};
pub use dora_message::{
metadata::{Metadata, MetadataParameters, Parameter},
DataflowId,
};
pub use event_stream::{merged, Event, EventStream, MappedInputData, RawData};
pub use flume::Receiver;
pub use node::{arrow_utils, DataSample, DoraNode, ZERO_COPY_THRESHOLD};


+ 1
- 1
apis/rust/node/src/node/arrow_utils.rs View File

@@ -1,5 +1,5 @@
use arrow::array::{ArrayData, BufferSpec};
use dora_core::message::{ArrowTypeInfo, BufferOffset};
use dora_message::metadata::{ArrowTypeInfo, BufferOffset};

pub fn required_data_size(array: &ArrayData) -> usize {
let mut next_offset = 0;


+ 10
- 5
apis/rust/node/src/node/control_channel.rs View File

@@ -3,8 +3,13 @@ use std::sync::Arc;
use crate::daemon_connection::DaemonChannel;
use dora_core::{
config::{DataId, NodeId},
daemon_messages::{DaemonCommunication, DaemonRequest, DataMessage, DataflowId, Timestamped},
message::{uhlc::HLC, Metadata},
uhlc::HLC,
};
use dora_message::{
daemon_to_node::{DaemonCommunication, DaemonReply},
metadata::Metadata,
node_to_daemon::{DaemonRequest, DataMessage, Timestamped},
DataflowId,
};
use eyre::{bail, eyre, Context};

@@ -60,7 +65,7 @@ impl ControlChannel {
})
.wrap_err("failed to report outputs done to dora-daemon")?;
match reply {
dora_core::daemon_messages::DaemonReply::Result(result) => result
DaemonReply::Result(result) => result
.map_err(|e| eyre!(e))
.wrap_err("failed to report outputs done event to dora-daemon")?,
other => bail!("unexpected outputs done reply: {other:?}"),
@@ -77,7 +82,7 @@ impl ControlChannel {
})
.wrap_err("failed to report closed outputs to dora-daemon")?;
match reply {
dora_core::daemon_messages::DaemonReply::Result(result) => result
DaemonReply::Result(result) => result
.map_err(|e| eyre!(e))
.wrap_err("failed to receive closed outputs reply from dora-daemon")?,
other => bail!("unexpected closed outputs reply: {other:?}"),
@@ -104,7 +109,7 @@ impl ControlChannel {
})
.wrap_err("failed to send SendMessage request to dora-daemon")?;
match reply {
dora_core::daemon_messages::DaemonReply::Empty => Ok(()),
DaemonReply::Empty => Ok(()),
other => bail!("unexpected SendMessage reply: {other:?}"),
}
}


+ 7
- 9
apis/rust/node/src/node/drop_stream.rs View File

@@ -1,13 +1,11 @@
use std::{sync::Arc, time::Duration};

use crate::daemon_connection::DaemonChannel;
use dora_core::{
config::NodeId,
daemon_messages::{
self, DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, DropToken,
NodeDropEvent, Timestamped,
},
message::uhlc,
use dora_core::{config::NodeId, uhlc};
use dora_message::{
daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent},
node_to_daemon::{DaemonRequest, DropToken, Timestamped},
DataflowId,
};
use eyre::{eyre, Context};
use flume::RecvTimeoutError;
@@ -64,8 +62,8 @@ impl DropStream {
.wrap_err("failed to create subscription with dora-daemon")?;

match reply {
daemon_messages::DaemonReply::Result(Ok(())) => {}
daemon_messages::DaemonReply::Result(Err(err)) => {
DaemonReply::Result(Ok(())) => {}
DaemonReply::Result(Err(err)) => {
eyre::bail!("drop subscribe failed: {err}")
}
other => eyre::bail!("unexpected drop subscribe reply: {other:?}"),


+ 9
- 4
apis/rust/node/src/node/mod.rs View File

@@ -9,12 +9,17 @@ use aligned_vec::{AVec, ConstAlign};
use arrow::array::Array;
use dora_core::{
config::{DataId, NodeId, NodeRunConfig},
daemon_messages::{DaemonRequest, DataMessage, DataflowId, DropToken, NodeConfig, Timestamped},
descriptor::Descriptor,
message::{uhlc, ArrowTypeInfo, Metadata, MetadataParameters},
topics::{DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST},
uhlc,
};

use dora_message::{
daemon_to_node::{DaemonReply, NodeConfig},
metadata::{ArrowTypeInfo, Metadata, MetadataParameters},
node_to_daemon::{DaemonRequest, DataMessage, DropToken, Timestamped},
DataflowId,
};
use eyre::{bail, WrapErr};
use shared_memory_extended::{Shmem, ShmemConf};
use std::{
@@ -94,10 +99,10 @@ impl DoraNode {
})
.wrap_err("failed to request node config from daemon")?;
match reply {
dora_core::daemon_messages::DaemonReply::NodeConfig {
DaemonReply::NodeConfig {
result: Ok(node_config),
} => Self::init(node_config),
dora_core::daemon_messages::DaemonReply::NodeConfig { result: Err(error) } => {
DaemonReply::NodeConfig { result: Err(error) } => {
bail!("failed to get node config from daemon: {error}")
}
_ => bail!("unexpected reply from daemon"),


+ 1
- 0
binaries/cli/Cargo.toml View File

@@ -20,6 +20,7 @@ tracing = ["dep:dora-tracing"]
clap = { version = "4.0.3", features = ["derive"] }
eyre = "0.6.8"
dora-core = { workspace = true }
dora-message = { workspace = true }
dora-node-api-c = { workspace = true }
dora-operator-api-c = { workspace = true }
serde = { version = "1.0.136", features = ["derive"] }


+ 4
- 5
binaries/cli/src/attach.rs View File

@@ -1,10 +1,9 @@
use colored::Colorize;
use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection};
use dora_core::{
coordinator_messages::LogMessage,
descriptor::{resolve_path, CoreNodeKind, Descriptor},
topics::{ControlRequest, ControlRequestReply},
};
use dora_core::descriptor::{resolve_path, CoreNodeKind, Descriptor};
use dora_message::cli_to_coordinator::ControlRequest;
use dora_message::common::LogMessage;
use dora_message::coordinator_to_cli::ControlRequestReply;
use eyre::Context;
use notify::event::ModifyKind;
use notify::{Config, Event as NotifyEvent, EventKind, RecommendedWatcher, RecursiveMode, Watcher};


+ 1
- 1
binaries/cli/src/check.rs View File

@@ -1,6 +1,6 @@
use crate::connect_to_coordinator;
use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::topics::{ControlRequest, ControlRequestReply};
use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply};
use eyre::{bail, Context};
use std::{
io::{IsTerminal, Write},


+ 1
- 1
binaries/cli/src/formatting.rs View File

@@ -1,4 +1,4 @@
use dora_core::topics::{DataflowResult, NodeErrorCause};
use dora_message::{common::NodeErrorCause, coordinator_to_cli::DataflowResult};

pub struct FormatDataflowError<'a>(pub &'a DataflowResult);



+ 1
- 1
binaries/cli/src/logs.rs View File

@@ -1,5 +1,5 @@
use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::topics::{ControlRequest, ControlRequestReply};
use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply};
use eyre::{bail, Context, Result};
use uuid::Uuid;



+ 10
- 9
binaries/cli/src/main.rs View File

@@ -6,11 +6,15 @@ use dora_coordinator::Event;
use dora_core::{
descriptor::Descriptor,
topics::{
ControlRequest, ControlRequestReply, DataflowList, DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT,
DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT,
DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT,
},
};
use dora_daemon::Daemon;
use dora_message::{
cli_to_coordinator::ControlRequest,
coordinator_to_cli::{ControlRequestReply, DataflowList, DataflowResult, DataflowStatus},
};
#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
use dora_tracing::set_up_tracing_opts;
@@ -575,10 +579,7 @@ fn stop_dataflow(
}
}

fn handle_dataflow_result(
result: dora_core::topics::DataflowResult,
uuid: Option<Uuid>,
) -> Result<(), eyre::Error> {
fn handle_dataflow_result(result: DataflowResult, uuid: Option<Uuid>) -> Result<(), eyre::Error> {
if result.is_ok() {
Ok(())
} else {
@@ -627,9 +628,9 @@ fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport>
let uuid = entry.id.uuid;
let name = entry.id.name.unwrap_or_default();
let status = match entry.status {
dora_core::topics::DataflowStatus::Running => "Running",
dora_core::topics::DataflowStatus::Finished => "Succeeded",
dora_core::topics::DataflowStatus::Failed => "Failed",
DataflowStatus::Running => "Running",
DataflowStatus::Finished => "Succeeded",
DataflowStatus::Failed => "Failed",
};
tw.write_all(format!("{uuid}\t{name}\t{status}\n").as_bytes())?;
}


+ 2
- 1
binaries/cli/src/up.rs View File

@@ -1,5 +1,6 @@
use crate::{check::daemon_running, connect_to_coordinator, LOCALHOST};
use dora_core::topics::{ControlRequest, DORA_COORDINATOR_PORT_CONTROL_DEFAULT};
use dora_core::topics::DORA_COORDINATOR_PORT_CONTROL_DEFAULT;
use dora_message::cli_to_coordinator::ControlRequest;
use eyre::Context;
use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration};
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]


+ 1
- 0
binaries/coordinator/Cargo.toml View File

@@ -26,3 +26,4 @@ serde_json = "1.0.86"
names = "0.14.0"
ctrlc = "3.2.5"
log = { version = "0.4.21", features = ["serde"] }
dora-message = { workspace = true }

+ 1
- 1
binaries/coordinator/src/control.rs View File

@@ -2,7 +2,7 @@ use crate::{
tcp_utils::{tcp_receive, tcp_send},
Event,
};
use dora_core::topics::{ControlRequest, ControlRequestReply};
use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply};
use eyre::{eyre, Context};
use futures::{
future::{self, Either},


+ 23
- 31
binaries/coordinator/src/lib.rs View File

@@ -5,14 +5,17 @@ use crate::{
pub use control::ControlEvent;
use dora_core::{
config::{NodeId, OperatorId},
coordinator_messages::{LogMessage, RegisterResult},
daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped},
descriptor::{Descriptor, ResolvedNode},
message::uhlc::{self, HLC},
topics::{
ControlRequest, ControlRequestReply, DataflowDaemonResult, DataflowId, DataflowListEntry,
DataflowResult,
uhlc::{self, HLC},
};
use dora_message::{
cli_to_coordinator::ControlRequest,
coordinator_to_cli::{
ControlRequestReply, DataflowIdAndName, DataflowList, DataflowListEntry, DataflowResult,
DataflowStatus, LogMessage,
},
coordinator_to_daemon::{DaemonCoordinatorEvent, RegisterResult, Timestamped},
daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult},
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};
@@ -169,26 +172,17 @@ async fn start_inner(
tracing::warn!("{:?}", err.wrap_err("failed to connect to dora-daemon"));
}
Event::Daemon(event) => match event {
DaemonEvent::Register {
DaemonRequest::Register {
machine_id,
mut connection,
dora_version: daemon_version,
version_check_result,
listen_port,
} => {
let coordinator_version: &&str = &env!("CARGO_PKG_VERSION");
let version_check = if &daemon_version == coordinator_version {
Ok(())
} else {
Err(format!(
"version mismatch: daemon v{daemon_version} is \
not compatible with coordinator v{coordinator_version}"
))
};
let peer_ip = connection
.peer_addr()
.map(|addr| addr.ip())
.map_err(|err| format!("failed to get peer addr of connection: {err}"));
let register_result = version_check.and(peer_ip);
let register_result = version_check_result.and(peer_ip);

let reply: Timestamped<RegisterResult> = Timestamped {
inner: match &register_result {
@@ -470,30 +464,28 @@ async fn start_inner(
dataflows.sort_by_key(|d| (&d.name, d.uuid));

let running = dataflows.into_iter().map(|d| DataflowListEntry {
id: DataflowId {
id: DataflowIdAndName {
uuid: d.uuid,
name: d.name.clone(),
},
status: dora_core::topics::DataflowStatus::Running,
status: DataflowStatus::Running,
});
let finished_failed =
dataflow_results.iter().map(|(&uuid, results)| {
let name =
archived_dataflows.get(&uuid).and_then(|d| d.name.clone());
let id = DataflowId { uuid, name };
let id = DataflowIdAndName { uuid, name };
let status = if results.values().all(|r| r.is_ok()) {
dora_core::topics::DataflowStatus::Finished
DataflowStatus::Finished
} else {
dora_core::topics::DataflowStatus::Failed
DataflowStatus::Failed
};
DataflowListEntry { id, status }
});

let reply = Ok(ControlRequestReply::DataflowList(
dora_core::topics::DataflowList(
running.chain(finished_failed).collect(),
),
));
let reply = Ok(ControlRequestReply::DataflowList(DataflowList(
running.chain(finished_failed).collect(),
)));
let _ = reply_sender.send(reply);
}
ControlRequest::DaemonConnected => {
@@ -965,7 +957,7 @@ pub enum Event {
DaemonHeartbeat { machine_id: String },
Dataflow { uuid: Uuid, event: DataflowEvent },
Control(ControlEvent),
Daemon(DaemonEvent),
Daemon(DaemonRequest),
DaemonHeartbeatInterval,
CtrlC,
Log(LogMessage),
@@ -995,9 +987,9 @@ pub enum DataflowEvent {
}

#[derive(Debug)]
pub enum DaemonEvent {
pub enum DaemonRequest {
Register {
dora_version: String,
version_check_result: Result<(), String>,
machine_id: String,
connection: TcpStream,
listen_port: u16,


+ 14
- 17
binaries/coordinator/src/listener.rs View File

@@ -1,5 +1,6 @@
use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event};
use dora_core::{coordinator_messages, daemon_messages::Timestamped, message::uhlc::HLC};
use crate::{tcp_utils::tcp_receive, DaemonRequest, DataflowEvent, Event};
use dora_core::uhlc::HLC;
use dora_message::daemon_to_coordinator::{CoordinatorRequest, DaemonEvent, Timestamped};
use eyre::Context;
use std::{io::ErrorKind, net::SocketAddr, sync::Arc};
use tokio::{
@@ -34,7 +35,7 @@ pub async fn handle_connection(
continue;
}
};
let message: Timestamped<coordinator_messages::CoordinatorRequest> =
let message: Timestamped<CoordinatorRequest> =
match serde_json::from_slice(&raw).wrap_err("failed to deserialize node message") {
Ok(e) => e,
Err(err) => {
@@ -49,22 +50,18 @@ pub async fn handle_connection(

// handle the message and translate it to a DaemonEvent
match message.inner {
coordinator_messages::CoordinatorRequest::Register {
machine_id,
dora_version,
listen_port,
} => {
let event = DaemonEvent::Register {
dora_version,
machine_id,
CoordinatorRequest::Register(register_request) => {
let event = DaemonRequest::Register {
connection,
listen_port,
version_check_result: register_request.check_version(),
machine_id: register_request.machine_id,
listen_port: register_request.listen_port,
};
let _ = events_tx.send(Event::Daemon(event)).await;
break;
}
coordinator_messages::CoordinatorRequest::Event { machine_id, event } => match event {
coordinator_messages::DaemonEvent::AllNodesReady {
CoordinatorRequest::Event { machine_id, event } => match event {
DaemonEvent::AllNodesReady {
dataflow_id,
exited_before_subscribe,
} => {
@@ -79,7 +76,7 @@ pub async fn handle_connection(
break;
}
}
coordinator_messages::DaemonEvent::AllNodesFinished {
DaemonEvent::AllNodesFinished {
dataflow_id,
result,
} => {
@@ -91,13 +88,13 @@ pub async fn handle_connection(
break;
}
}
coordinator_messages::DaemonEvent::Heartbeat => {
DaemonEvent::Heartbeat => {
let event = Event::DaemonHeartbeat { machine_id };
if events_tx.send(event).await.is_err() {
break;
}
}
coordinator_messages::DaemonEvent::Log(message) => {
DaemonEvent::Log(message) => {
let event = Event::Log(message);
if events_tx.send(event).await.is_err() {
break;


+ 1
- 1
binaries/coordinator/src/log_subscriber.rs View File

@@ -1,4 +1,4 @@
use dora_core::coordinator_messages::LogMessage;
use dora_message::coordinator_to_cli::LogMessage;
use eyre::{Context, ContextCompat};

use crate::tcp_utils::tcp_send;


+ 5
- 4
binaries/coordinator/src/run/mod.rs View File

@@ -4,11 +4,12 @@ use crate::{
};

use dora_core::{
daemon_messages::{
DaemonCoordinatorEvent, DaemonCoordinatorReply, SpawnDataflowNodes, Timestamped,
},
descriptor::{Descriptor, ResolvedNode},
message::uhlc::HLC,
uhlc::HLC,
};
use dora_message::{
coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes, Timestamped},
daemon_to_coordinator::DaemonCoordinatorReply,
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use std::{


+ 1
- 0
binaries/daemon/Cargo.toml View File

@@ -29,6 +29,7 @@ dora-download = { workspace = true }
dora-tracing = { workspace = true, optional = true }
dora-arrow-convert = { workspace = true }
dora-node-api = { workspace = true }
dora-message = { workspace = true }
serde_yaml = "0.8.23"
uuid = { version = "1.7", features = ["v7"] }
futures = "0.3.25"


+ 6
- 9
binaries/daemon/src/coordinator.rs View File

@@ -2,10 +2,11 @@ use crate::{
socket_stream_utils::{socket_stream_receive, socket_stream_send},
DaemonCoordinatorEvent,
};
use dora_core::{
coordinator_messages::{CoordinatorRequest, RegisterResult},
daemon_messages::{DaemonCoordinatorReply, Timestamped},
message::uhlc::HLC,
use dora_core::uhlc::HLC;
use dora_message::{
common::Timestamped,
coordinator_to_daemon::RegisterResult,
daemon_to_coordinator::{CoordinatorRequest, DaemonCoordinatorReply, DaemonRegisterRequest},
};
use eyre::{eyre, Context};
use std::{io::ErrorKind, net::SocketAddr};
@@ -34,11 +35,7 @@ pub async fn register(
.set_nodelay(true)
.wrap_err("failed to set TCP_NODELAY")?;
let register = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Register {
dora_version: env!("CARGO_PKG_VERSION").to_owned(),
machine_id,
listen_port,
},
inner: CoordinatorRequest::Register(DaemonRegisterRequest::new(machine_id, listen_port)),
timestamp: clock.new_timestamp(),
})?;
socket_stream_send(&mut stream, &register)


+ 1
- 1
binaries/daemon/src/inter_daemon.rs View File

@@ -1,5 +1,5 @@
use crate::socket_stream_utils::{socket_stream_receive, socket_stream_send};
use dora_core::daemon_messages::{InterDaemonEvent, Timestamped};
use dora_message::{common::Timestamped, daemon_to_daemon::InterDaemonEvent};
use eyre::{Context, ContextCompat};
use std::{collections::BTreeMap, io::ErrorKind, net::SocketAddr};
use tokio::net::{TcpListener, TcpStream};


+ 52
- 62
binaries/daemon/src/lib.rs View File

@@ -1,28 +1,25 @@
use aligned_vec::{AVec, ConstAlign};
use coordinator::CoordinatorEvent;
use crossbeam::queue::ArrayQueue;
use dora_core::config::{Input, OperatorId};
use dora_core::coordinator_messages::{CoordinatorRequest, Level, LogMessage};
use dora_core::daemon_messages::{
DataMessage, DynamicNodeEvent, InterDaemonEvent, NodeConfig, Timestamped,
};
use dora_core::descriptor::runtime_node_inputs;
use dora_core::message::uhlc::{self, HLC};
use dora_core::message::{ArrowTypeInfo, Metadata};
use dora_core::topics::LOCALHOST;
use dora_core::topics::{
DataflowDaemonResult, DataflowResult, NodeError, NodeErrorCause, NodeExitStatus,
};
use dora_core::{
config::{DataId, InputMapping, NodeId},
coordinator_messages::DaemonEvent,
daemon_messages::{
self, DaemonCoordinatorEvent, DaemonCoordinatorReply, DaemonReply, DataflowId, DropToken,
SpawnDataflowNodes,
config::{DataId, Input, InputMapping, NodeId, OperatorId},
descriptor::{runtime_node_inputs, CoreNodeKind, Descriptor, ResolvedNode},
topics::LOCALHOST,
uhlc::{self, HLC},
};
use dora_message::{
common::{DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus},
coordinator_to_cli::DataflowResult,
coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes},
daemon_to_coordinator::{
CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult, LogMessage,
},
descriptor::{CoreNodeKind, Descriptor, ResolvedNode},
daemon_to_daemon::InterDaemonEvent,
daemon_to_node::{DaemonReply, NodeConfig, NodeDropEvent, NodeEvent},
metadata::{self, ArrowTypeInfo},
node_to_daemon::{DynamicNodeEvent, Timestamped},
DataflowId,
};

use dora_node_api::Parameter;
use eyre::{bail, eyre, Context, ContextCompat, Result};
use futures::{future, stream, FutureExt, TryFutureExt};
@@ -32,21 +29,23 @@ use local_listener::DynamicNodeEventWrapper;
use pending::PendingNodes;
use shared_memory_server::ShmemConf;
use socket_stream_utils::socket_stream_send;
use std::sync::Arc;
use std::time::Instant;
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
net::SocketAddr,
path::{Path, PathBuf},
time::Duration,
sync::Arc,
time::{Duration, Instant},
};
use sysinfo::Pid;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot::Sender;
use tokio::sync::{mpsc, oneshot};
use tokio::{
fs::File,
io::AsyncReadExt,
net::TcpStream,
sync::{
mpsc::{self, UnboundedSender},
oneshot::{self, Sender},
},
};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tracing::{error, warn};
use uuid::{NoContext, Timestamp, Uuid};
@@ -678,7 +677,7 @@ impl Daemon {
log_messages.push(LogMessage {
dataflow_id,
node_id: Some(node_id.clone()),
level: Level::Error,
level: LogLevel::Error,
target: None,
module_path: None,
file: None,
@@ -923,11 +922,7 @@ impl Daemon {
format!("Reload failed: no running dataflow with ID `{dataflow_id}`")
})?;
if let Some(channel) = dataflow.subscribe_channels.get(&node_id) {
match send_with_timestamp(
channel,
daemon_messages::NodeEvent::Reload { operator_id },
&self.clock,
) {
match send_with_timestamp(channel, NodeEvent::Reload { operator_id }, &self.clock) {
Ok(()) => {}
Err(_) => {
dataflow.subscribe_channels.remove(&node_id);
@@ -942,7 +937,7 @@ impl Daemon {
dataflow_id: Uuid,
node_id: NodeId,
output_id: DataId,
metadata: dora_core::message::Metadata,
metadata: dora_message::metadata::Metadata,
data: Option<DataMessage>,
) -> Result<(), eyre::ErrReport> {
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
@@ -990,7 +985,7 @@ impl Daemon {
async fn subscribe(
dataflow: &mut RunningDataflow,
node_id: NodeId,
event_sender: UnboundedSender<Timestamped<daemon_messages::NodeEvent>>,
event_sender: UnboundedSender<Timestamped<NodeEvent>>,
clock: &HLC,
) {
// some inputs might have been closed already -> report those events
@@ -1010,24 +1005,20 @@ impl Daemon {
for input_id in closed_inputs {
let _ = send_with_timestamp(
&event_sender,
daemon_messages::NodeEvent::InputClosed {
NodeEvent::InputClosed {
id: input_id.clone(),
},
clock,
);
}
if dataflow.open_inputs(&node_id).is_empty() {
let _ = send_with_timestamp(
&event_sender,
daemon_messages::NodeEvent::AllInputsClosed,
clock,
);
let _ = send_with_timestamp(&event_sender, NodeEvent::AllInputsClosed, clock);
}

// if a stop event was already sent for the dataflow, send it to
// the newly connected node too
if dataflow.stop_sent {
let _ = send_with_timestamp(&event_sender, daemon_messages::NodeEvent::Stop, clock);
let _ = send_with_timestamp(&event_sender, NodeEvent::Stop, clock);
}

dataflow.subscribe_channels.insert(node_id, event_sender);
@@ -1142,7 +1133,7 @@ impl Daemon {

let send_result = send_with_timestamp(
channel,
daemon_messages::NodeEvent::Input {
NodeEvent::Input {
id: input_id.clone(),
metadata: metadata.clone(),
data: None,
@@ -1189,7 +1180,7 @@ impl Daemon {

let send_result = send_with_timestamp(
channel,
daemon_messages::NodeEvent::Input {
NodeEvent::Input {
id: input_id.clone(),
metadata: metadata.clone(),
data: Some(message.clone()),
@@ -1263,9 +1254,9 @@ impl Daemon {
dataflow_id,
node_id: Some(node_id.clone()),
level: if node_result.is_ok() {
Level::Info
LogLevel::Info
} else {
Level::Error
LogLevel::Error
},
target: None,
module_path: None,
@@ -1304,7 +1295,7 @@ async fn send_output_to_local_receivers(
node_id: NodeId,
output_id: DataId,
dataflow: &mut RunningDataflow,
metadata: &dora_core::message::Metadata,
metadata: &metadata::Metadata,
data: Option<DataMessage>,
clock: &HLC,
) -> Result<Option<AVec<u8, ConstAlign<128>>>, eyre::ErrReport> {
@@ -1316,7 +1307,7 @@ async fn send_output_to_local_receivers(
let mut closed = Vec::new();
for (receiver_id, input_id) in local_receivers {
if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
let item = daemon_messages::NodeEvent::Input {
let item = NodeEvent::Input {
id: input_id.clone(),
metadata: metadata.clone(),
data: data.clone(),
@@ -1446,15 +1437,14 @@ fn close_input(
if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
let _ = send_with_timestamp(
channel,
daemon_messages::NodeEvent::InputClosed {
NodeEvent::InputClosed {
id: input_id.clone(),
},
clock,
);

if dataflow.open_inputs(receiver_id).is_empty() {
let _ =
send_with_timestamp(channel, daemon_messages::NodeEvent::AllInputsClosed, clock);
let _ = send_with_timestamp(channel, NodeEvent::AllInputsClosed, clock);
}
}
}
@@ -1470,8 +1460,8 @@ pub struct RunningDataflow {
/// Local nodes that are not started yet
pending_nodes: PendingNodes,

subscribe_channels: HashMap<NodeId, UnboundedSender<Timestamped<daemon_messages::NodeEvent>>>,
drop_channels: HashMap<NodeId, UnboundedSender<Timestamped<daemon_messages::NodeDropEvent>>>,
subscribe_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeEvent>>>,
drop_channels: HashMap<NodeId, UnboundedSender<Timestamped<NodeDropEvent>>>,
mappings: HashMap<OutputId, BTreeSet<InputId>>,
timers: BTreeMap<Duration, BTreeSet<InputId>>,
open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
@@ -1553,7 +1543,7 @@ impl RunningDataflow {
Parameter::String("".into()),
);

let metadata = dora_core::message::Metadata::from_parameters(
let metadata = metadata::Metadata::from_parameters(
hlc.new_timestamp(),
ArrowTypeInfo::empty(),
parameters,
@@ -1597,7 +1587,7 @@ impl RunningDataflow {
.await?;

for (_node_id, channel) in self.subscribe_channels.drain() {
let _ = send_with_timestamp(&channel, daemon_messages::NodeEvent::Stop, clock);
let _ = send_with_timestamp(&channel, NodeEvent::Stop, clock);
}

let running_nodes = self.running_nodes.clone();
@@ -1637,7 +1627,7 @@ impl RunningDataflow {
let result = match self.drop_channels.get_mut(&info.owner) {
Some(channel) => send_with_timestamp(
channel,
daemon_messages::NodeDropEvent::OutputDropped { drop_token },
NodeDropEvent::OutputDropped { drop_token },
clock,
)
.wrap_err("send failed"),
@@ -1701,11 +1691,11 @@ pub enum DaemonNodeEvent {
reply_sender: oneshot::Sender<DaemonReply>,
},
Subscribe {
event_sender: UnboundedSender<Timestamped<daemon_messages::NodeEvent>>,
event_sender: UnboundedSender<Timestamped<NodeEvent>>,
reply_sender: oneshot::Sender<DaemonReply>,
},
SubscribeDrop {
event_sender: UnboundedSender<Timestamped<daemon_messages::NodeDropEvent>>,
event_sender: UnboundedSender<Timestamped<NodeDropEvent>>,
reply_sender: oneshot::Sender<DaemonReply>,
},
CloseOutputs {
@@ -1714,7 +1704,7 @@ pub enum DaemonNodeEvent {
},
SendOut {
output_id: DataId,
metadata: dora_core::message::Metadata,
metadata: metadata::Metadata,
data: Option<DataMessage>,
},
ReportDrop {
@@ -1730,13 +1720,13 @@ pub enum DoraEvent {
Timer {
dataflow_id: DataflowId,
interval: Duration,
metadata: dora_core::message::Metadata,
metadata: metadata::Metadata,
},
Logs {
dataflow_id: DataflowId,
output_id: OutputId,
message: DataMessage,
metadata: Metadata,
metadata: metadata::Metadata,
},
SpawnedNodeResult {
dataflow_id: DataflowId,


+ 4
- 1
binaries/daemon/src/local_listener.rs View File

@@ -1,5 +1,8 @@
use crate::socket_stream_utils::{socket_stream_receive, socket_stream_send};
use dora_core::daemon_messages::{DaemonReply, DaemonRequest, DynamicNodeEvent, Timestamped};
use dora_message::{
daemon_to_node::DaemonReply,
node_to_daemon::{DaemonRequest, DynamicNodeEvent, Timestamped},
};
use eyre::Context;
use std::{io::ErrorKind, net::SocketAddr};
use tokio::{


+ 12
- 23
binaries/daemon/src/node_communication/mod.rs View File

@@ -1,12 +1,14 @@
use crate::{DaemonNodeEvent, Event};
use dora_core::{
config::{DataId, LocalCommunicationConfig, NodeId},
daemon_messages::{
DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, NodeDropEvent, NodeEvent,
Timestamped,
},
message::uhlc,
topics::LOCALHOST,
uhlc,
};
use dora_message::{
common::{DropToken, Timestamped},
daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent, NodeEvent},
node_to_daemon::DaemonRequest,
DataflowId,
};
use eyre::{eyre, Context};
use futures::{future, task, Future};
@@ -215,24 +217,14 @@ impl Listener {
}

match message.inner {
DaemonRequest::Register {
dataflow_id,
node_id,
dora_version: node_api_version,
} => {
let daemon_version = env!("CARGO_PKG_VERSION");
let result = if node_api_version == daemon_version {
Ok(())
} else {
Err(format!(
"version mismatch: node API v{node_api_version} is not compatible \
with daemon v{daemon_version}"
))
};
DaemonRequest::Register(register_request) => {
let result = register_request.check_version();
let send_result = connection
.send_reply(DaemonReply::Result(result.clone()))
.await
.wrap_err("failed to send register reply");
let dataflow_id = register_request.dataflow_id;
let node_id = register_request.node_id;
match (result, send_result) {
(Ok(()), Ok(())) => {
let mut listener = Listener {
@@ -517,10 +509,7 @@ impl Listener {
Ok(())
}

async fn report_drop_tokens(
&mut self,
drop_tokens: Vec<dora_core::daemon_messages::DropToken>,
) -> eyre::Result<()> {
async fn report_drop_tokens(&mut self, drop_tokens: Vec<DropToken>) -> eyre::Result<()> {
if !drop_tokens.is_empty() {
let event = Event::Node {
dataflow_id: self.dataflow_id,


+ 3
- 4
binaries/daemon/src/node_communication/shmem.rs View File

@@ -2,10 +2,9 @@ use std::{collections::BTreeMap, sync::Arc};

use super::{Connection, Listener};
use crate::Event;
use dora_core::{
config::DataId,
daemon_messages::{DaemonReply, DaemonRequest, Timestamped},
message::uhlc::HLC,
use dora_core::{config::DataId, uhlc::HLC};
use dora_message::{
common::Timestamped, daemon_to_node::DaemonReply, node_to_daemon::DaemonRequest,
};
use eyre::eyre;
use shared_memory_server::ShmemServer;


+ 3
- 4
binaries/daemon/src/node_communication/tcp.rs View File

@@ -5,10 +5,9 @@ use crate::{
socket_stream_utils::{socket_stream_receive, socket_stream_send},
Event,
};
use dora_core::{
config::DataId,
daemon_messages::{DaemonReply, DaemonRequest, Timestamped},
message::uhlc::HLC,
use dora_core::{config::DataId, uhlc::HLC};
use dora_message::{
common::Timestamped, daemon_to_node::DaemonReply, node_to_daemon::DaemonRequest,
};
use eyre::Context;
use tokio::{


+ 3
- 4
binaries/daemon/src/node_communication/unix_domain.rs View File

@@ -1,9 +1,8 @@
use std::{collections::BTreeMap, io::ErrorKind, sync::Arc};

use dora_core::{
config::DataId,
daemon_messages::{DaemonReply, DaemonRequest, Timestamped},
message::uhlc::HLC,
use dora_core::{config::DataId, uhlc::HLC};
use dora_message::{
common::Timestamped, daemon_to_node::DaemonReply, node_to_daemon::DaemonRequest,
};
use eyre::Context;
use tokio::{


+ 7
- 4
binaries/daemon/src/pending.rs View File

@@ -2,9 +2,12 @@ use std::collections::{BTreeSet, HashMap, HashSet};

use dora_core::{
config::NodeId,
coordinator_messages::{CoordinatorRequest, DaemonEvent, Level, LogMessage},
daemon_messages::{DaemonReply, DataflowId, Timestamped},
message::uhlc::{Timestamp, HLC},
uhlc::{Timestamp, HLC},
};
use dora_message::{
daemon_to_coordinator::{CoordinatorRequest, DaemonEvent, LogLevel, LogMessage, Timestamped},
daemon_to_node::DaemonReply,
DataflowId,
};
use eyre::{bail, Context};
use tokio::{net::TcpStream, sync::oneshot};
@@ -83,7 +86,7 @@ impl PendingNodes {
log.push(LogMessage {
dataflow_id: self.dataflow_id,
node_id: Some(node_id.clone()),
level: Level::Warn,
level: LogLevel::Warn,
target: None,
module_path: None,
file: None,


+ 8
- 4
binaries/daemon/src/spawn.rs View File

@@ -1,21 +1,25 @@
use crate::{
log, node_communication::spawn_listener_loop, node_inputs, DoraEvent, Event, NodeExitStatus,
OutputId, RunningNode,
log, node_communication::spawn_listener_loop, node_inputs, DoraEvent, Event, OutputId,
RunningNode,
};
use aligned_vec::{AVec, ConstAlign};
use crossbeam::queue::ArrayQueue;
use dora_arrow_convert::IntoArrow;
use dora_core::{
config::DataId,
daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped},
descriptor::{
resolve_path, source_is_url, Descriptor, OperatorDefinition, OperatorSource, PythonSource,
ResolvedNode, DYNAMIC_SOURCE, SHELL_SOURCE,
},
get_python_path,
message::uhlc::HLC,
uhlc::HLC,
};
use dora_download::download_file;
use dora_message::{
daemon_to_coordinator::{DataMessage, NodeExitStatus, Timestamped},
daemon_to_node::{NodeConfig, RuntimeConfig},
DataflowId,
};
use dora_node_api::{
arrow::array::ArrayData,
arrow_utils::{copy_array_into_sample, required_data_size},


+ 1
- 0
binaries/runtime/Cargo.toml View File

@@ -15,6 +15,7 @@ dora-operator-api-types = { workspace = true }
dora-core = { workspace = true }
dora-tracing = { workspace = true, optional = true }
dora-metrics = { workspace = true, optional = true }
dora-message = { workspace = true }
eyre = "0.6.8"
futures = "0.3.21"
futures-concurrency = "7.1.0"


+ 1
- 1
binaries/runtime/src/lib.rs View File

@@ -2,9 +2,9 @@

use dora_core::{
config::{DataId, OperatorId},
daemon_messages::{NodeConfig, RuntimeConfig},
descriptor::OperatorConfig,
};
use dora_message::daemon_to_node::{NodeConfig, RuntimeConfig};
use dora_metrics::init_meter_provider;
use dora_node_api::{DoraNode, Event};
use eyre::{bail, Context, Result};


+ 2
- 2
binaries/runtime/src/operator/mod.rs View File

@@ -1,9 +1,9 @@
use dora_core::{
config::{DataId, NodeId},
descriptor::{Descriptor, OperatorDefinition, OperatorSource},
message::{ArrowTypeInfo, MetadataParameters},
};
use dora_node_api::{DataSample, Event};
use dora_message::metadata::ArrowTypeInfo;
use dora_node_api::{DataSample, Event, MetadataParameters};
use eyre::{Context, Result};
use std::any::Any;
use tokio::sync::{mpsc::Sender, oneshot};


+ 1
- 1
binaries/runtime/src/operator/python.rs View File

@@ -290,7 +290,7 @@ mod callback_impl {
use super::SendOutputCallback;
use aligned_vec::{AVec, ConstAlign};
use arrow::{array::ArrayData, pyarrow::FromPyArrow};
use dora_core::message::ArrowTypeInfo;
use dora_message::metadata::ArrowTypeInfo;
use dora_node_api::{
arrow_utils::{copy_array_into_sample, required_data_size},
ZERO_COPY_THRESHOLD,


+ 8
- 5
examples/multiple-daemons/run.rs View File

@@ -1,10 +1,11 @@
use dora_coordinator::{ControlEvent, Event};
use dora_core::{
descriptor::Descriptor,
topics::{
ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
DORA_COORDINATOR_PORT_DEFAULT,
},
topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT},
};
use dora_message::{
cli_to_coordinator::ControlRequest,
coordinator_to_cli::{ControlRequestReply, DataflowIdAndName},
};
use dora_tracing::set_up_tracing;
use eyre::{bail, Context};
@@ -166,7 +167,9 @@ async fn connected_machines(
Ok(machines)
}

async fn running_dataflows(coordinator_events_tx: &Sender<Event>) -> eyre::Result<Vec<DataflowId>> {
async fn running_dataflows(
coordinator_events_tx: &Sender<Event>,
) -> eyre::Result<Vec<DataflowIdAndName>> {
let (reply_sender, reply) = oneshot::channel();
coordinator_events_tx
.send(Event::Control(ControlEvent::IncomingRequest {


+ 1
- 2
libraries/core/Cargo.toml View File

@@ -15,11 +15,10 @@ serde_yaml = "0.9.11"
once_cell = "1.13.0"
which = "5.0.0"
uuid = { version = "1.7", features = ["serde", "v7"] }
dora-message = { workspace = true }
tracing = "0.1"
serde-with-expand-env = "1.1.0"
tokio = { version = "1.24.1", features = ["fs", "process", "sync"] }
aligned-vec = { version = "0.5.0", features = ["serde"] }
schemars = "0.8.19"
serde_json = "1.0.117"
log = { version = "0.4.21", features = ["serde"] }
uhlc = "0.5.1"

+ 0
- 58
libraries/core/src/coordinator_messages.rs View File

@@ -1,58 +0,0 @@
use crate::{config::NodeId, daemon_messages::DataflowId, topics::DataflowDaemonResult};
use eyre::eyre;
pub use log::Level;

/// Messages a daemon sends to the coordinator over the control connection.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum CoordinatorRequest {
    /// Initial handshake announcing this daemon to the coordinator.
    Register {
        /// Version of the daemon binary (set from `CARGO_PKG_VERSION` at the
        /// call site) — NOTE(review): the compatibility check happens on the
        /// coordinator side; confirm there.
        dora_version: String,
        machine_id: String,
        /// Port on which this daemon listens for inter-daemon connections.
        listen_port: u16,
    },
    /// A runtime event forwarded to the coordinator.
    Event {
        machine_id: String,
        event: DaemonEvent,
    },
}

/// A log record forwarded to the coordinator; `#[must_use]` so records are
/// not silently dropped.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[must_use]
pub struct LogMessage {
    /// Dataflow this record belongs to.
    pub dataflow_id: DataflowId,
    /// Originating node, when the record is attributable to one.
    pub node_id: Option<NodeId>,
    pub level: log::Level,
    /// Optional source metadata, mirroring the fields of the `log` crate's
    /// `Record` type.
    pub target: Option<String>,
    pub module_path: Option<String>,
    pub file: Option<String>,
    pub line: Option<u32>,
    pub message: String,
}

/// Events a daemon reports to the coordinator (wrapped in
/// `CoordinatorRequest::Event`).
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum DaemonEvent {
    /// All local nodes of the dataflow are ready; lists nodes that exited
    /// before subscribing to their event stream.
    AllNodesReady {
        dataflow_id: DataflowId,
        exited_before_subscribe: Vec<NodeId>,
    },
    /// All local nodes of the dataflow exited; carries the per-node results.
    AllNodesFinished {
        dataflow_id: DataflowId,
        result: DataflowDaemonResult,
    },
    /// Periodic liveness signal.
    Heartbeat,
    /// Forwarded log record.
    Log(LogMessage),
}

/// Wire-level registration outcome — a plain enum instead of `Result` so it
/// round-trips through serde without ambiguity.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum RegisterResult {
    Ok,
    /// Registration was rejected; carries a human-readable reason.
    Err(String),
}

impl RegisterResult {
    /// Converts the wire-level outcome into an `eyre::Result`, turning the
    /// carried error string into an error report.
    pub fn to_result(self) -> eyre::Result<()> {
        if let RegisterResult::Err(message) = self {
            return Err(eyre!(message));
        }
        Ok(())
    }
}

+ 0
- 302
libraries/core/src/daemon_messages.rs View File

@@ -1,302 +0,0 @@
use std::{
collections::{BTreeMap, BTreeSet},
fmt,
net::SocketAddr,
path::PathBuf,
time::Duration,
};

use crate::{
config::{DataId, NodeId, NodeRunConfig, OperatorId},
descriptor::{Descriptor, OperatorDefinition, ResolvedNode},
};
use aligned_vec::{AVec, ConstAlign};
use dora_message::{uhlc, Metadata};
use uuid::{NoContext, Timestamp, Uuid};

/// Everything a node needs to connect to its local daemon.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct NodeConfig {
    pub dataflow_id: DataflowId,
    pub node_id: NodeId,
    pub run_config: NodeRunConfig,
    /// How to reach the daemon (shared memory, TCP, or unix domain socket).
    pub daemon_communication: DaemonCommunication,
    /// The full dataflow description this node is part of.
    pub dataflow_descriptor: Descriptor,
    /// Whether the node is started externally ("dynamic") rather than
    /// spawned by the daemon — NOTE(review): inferred from `DynamicNodeEvent`
    /// usage elsewhere; confirm.
    pub dynamic: bool,
}

/// Transport used between a node and its daemon.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum DaemonCommunication {
    /// Shared-memory channels; the IDs name the control, drop, events, and
    /// events-close regions.
    Shmem {
        daemon_control_region_id: SharedMemoryId,
        daemon_drop_region_id: SharedMemoryId,
        daemon_events_region_id: SharedMemoryId,
        daemon_events_close_region_id: SharedMemoryId,
    },
    /// TCP connection to the given address.
    Tcp {
        socket_addr: SocketAddr,
    },
    /// Unix domain socket (unix targets only).
    #[cfg(unix)]
    UnixDomain {
        socket_file: PathBuf,
    },
}

/// Configuration for a runtime process: the node's own config plus the
/// operators it should host.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeConfig {
    pub node: NodeConfig,
    pub operators: Vec<OperatorDefinition>,
}

/// Requests a node sends to its daemon.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum DaemonRequest {
    /// Handshake: identify the node and check API compatibility. The daemon
    /// rejects the connection if `dora_version` differs from its own
    /// `CARGO_PKG_VERSION`.
    Register {
        dataflow_id: DataflowId,
        node_id: NodeId,
        dora_version: String,
    },
    /// Subscribe to the node's input event stream.
    Subscribe,
    /// Publish an output message.
    SendMessage {
        output_id: DataId,
        metadata: Metadata,
        data: Option<DataMessage>,
    },
    /// Close the given outputs early.
    CloseOutputs(Vec<DataId>),
    /// Signals that the node is finished sending outputs and that it received all
    /// required drop tokens.
    OutputsDone,
    /// Request the next batch of events, acknowledging the given drop tokens.
    NextEvent {
        drop_tokens: Vec<DropToken>,
    },
    /// Acknowledge drop tokens without requesting events.
    ReportDropTokens {
        drop_tokens: Vec<DropToken>,
    },
    /// Subscribe to the drop-event stream.
    SubscribeDrop,
    /// Request the next batch of finished drop tokens.
    NextFinishedDropTokens,
    /// The node dropped its event stream.
    EventStreamDropped,
    /// A dynamic node asks the daemon for its `NodeConfig`.
    NodeConfig {
        node_id: NodeId,
    },
}

impl DaemonRequest {
    /// Whether the daemon must answer this request with a bincode-encoded
    /// reply when it arrives over TCP.
    pub fn expects_tcp_bincode_reply(&self) -> bool {
        // Written as an exhaustive match (not `matches!`) so adding a new
        // variant forces an explicit decision here.
        #[allow(clippy::match_like_matches_macro)]
        match self {
            DaemonRequest::SendMessage { .. }
            | DaemonRequest::NodeConfig { .. }
            | DaemonRequest::ReportDropTokens { .. } => false,
            DaemonRequest::Register { .. }
            | DaemonRequest::Subscribe
            | DaemonRequest::CloseOutputs(_)
            | DaemonRequest::OutputsDone
            | DaemonRequest::NextEvent { .. }
            | DaemonRequest::SubscribeDrop
            | DaemonRequest::NextFinishedDropTokens
            | DaemonRequest::EventStreamDropped => true,
        }
    }

    /// Whether the daemon must answer this request with a JSON-encoded reply
    /// when it arrives over TCP; only `NodeConfig` uses JSON.
    pub fn expects_tcp_json_reply(&self) -> bool {
        // Exhaustive for the same reason as above.
        #[allow(clippy::match_like_matches_macro)]
        match self {
            DaemonRequest::NodeConfig { .. } => true,
            DaemonRequest::Register { .. }
            | DaemonRequest::Subscribe
            | DaemonRequest::CloseOutputs(_)
            | DaemonRequest::OutputsDone
            | DaemonRequest::NextEvent { .. }
            | DaemonRequest::SubscribeDrop
            | DaemonRequest::NextFinishedDropTokens
            | DaemonRequest::ReportDropTokens { .. }
            | DaemonRequest::SendMessage { .. }
            | DaemonRequest::EventStreamDropped => false,
        }
    }
}

/// Payload of a message: either an inline aligned byte vector or a
/// reference into a shared-memory region.
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub enum DataMessage {
    /// Inline data, 128-byte aligned.
    Vec(AVec<u8, ConstAlign<128>>),
    /// Data living in a shared-memory region; `drop_token` identifies the
    /// region for later release.
    SharedMemory {
        shared_memory_id: String,
        len: usize,
        drop_token: DropToken,
    },
}

impl DataMessage {
    /// The drop token of a shared-memory message; inline `Vec` data carries
    /// no token.
    pub fn drop_token(&self) -> Option<DropToken> {
        if let DataMessage::SharedMemory { drop_token, .. } = self {
            Some(*drop_token)
        } else {
            None
        }
    }
}

// Manual `Debug`: inline payloads may be large, so only their length is
// printed instead of the full byte contents.
impl fmt::Debug for DataMessage {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // `finish_non_exhaustive` marks the elided payload with `..`.
            Self::Vec(v) => f
                .debug_struct("Vec")
                .field("len", &v.len())
                .finish_non_exhaustive(),
            Self::SharedMemory {
                shared_memory_id,
                len,
                drop_token,
            } => f
                .debug_struct("SharedMemory")
                .field("shared_memory_id", shared_memory_id)
                .field("len", len)
                .field("drop_token", drop_token)
                .finish(),
        }
    }
}

/// Identifier of a shared-memory region.
type SharedMemoryId = String;

/// Replies the daemon sends back to a node; `#[must_use]` because most
/// variants carry a result the caller has to check.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[must_use]
pub enum DaemonReply {
    /// Generic success/error reply.
    Result(Result<(), String>),
    /// A message buffer was prepared in the given shared-memory region.
    PreparedMessage { shared_memory_id: SharedMemoryId },
    /// Batch of input events.
    NextEvents(Vec<Timestamped<NodeEvent>>),
    /// Batch of drop events.
    NextDropEvents(Vec<Timestamped<NodeDropEvent>>),
    /// Node configuration lookup result.
    NodeConfig { result: Result<NodeConfig, String> },
    /// Reply carrying no information.
    Empty,
}

/// Wrapper attaching a hybrid-logical-clock (`uhlc`) timestamp to a message.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Timestamped<T> {
    pub inner: T,
    pub timestamp: uhlc::Timestamp,
}

/// Events the daemon delivers to a subscribed node.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum NodeEvent {
    /// The dataflow is being stopped; the node should shut down.
    Stop,
    /// Reload the node, or a single operator of it when `operator_id` is set.
    Reload {
        operator_id: Option<OperatorId>,
    },
    /// A new input message.
    Input {
        id: DataId,
        metadata: Metadata,
        /// `None` when the message carries no payload (metadata only).
        data: Option<DataMessage>,
    },
    /// The given input will receive no further messages.
    InputClosed {
        id: DataId,
    },
    /// All of the node's inputs are closed.
    AllInputsClosed,
}

/// Events delivered on a node's drop-event stream.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum NodeDropEvent {
    /// The output identified by `drop_token` has been dropped by all
    /// receivers.
    OutputDropped { drop_token: DropToken },
}

/// A batch of drop tokens.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct DropEvent {
    pub tokens: Vec<DropToken>,
}

/// Unique token tracking the lifetime of a shared-memory message.
#[derive(
    Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
pub struct DropToken(Uuid);

impl DropToken {
    /// Generates a fresh token from a v7 (timestamp-based) UUID.
    pub fn generate() -> Self {
        Self(Uuid::new_v7(Timestamp::now(NoContext)))
    }
}

/// Input payload as received by a node: shared memory or an inline vector.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum InputData {
    SharedMemory(SharedMemoryInput),
    Vec(Vec<u8>),
}

impl InputData {
    /// The drop token of shared-memory input; inline data carries none.
    pub fn drop_token(&self) -> Option<DropToken> {
        match self {
            InputData::SharedMemory(data) => Some(data.drop_token),
            InputData::Vec(_) => None,
        }
    }
}

/// Shared-memory input descriptor: region id, payload length, and the token
/// to report back once the data is no longer needed.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct SharedMemoryInput {
    pub shared_memory_id: SharedMemoryId,
    pub len: usize,
    pub drop_token: DropToken,
}

/// Commands the coordinator sends to a daemon.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum DaemonCoordinatorEvent {
    /// Spawn the nodes of a dataflow.
    Spawn(SpawnDataflowNodes),
    /// All nodes are ready; lists nodes that exited before subscribing.
    AllNodesReady {
        dataflow_id: DataflowId,
        exited_before_subscribe: Vec<NodeId>,
    },
    /// Stop a dataflow, optionally enforcing a kill grace period.
    StopDataflow {
        dataflow_id: DataflowId,
        grace_duration: Option<Duration>,
    },
    /// Reload a node, or a single operator of it.
    ReloadDataflow {
        dataflow_id: DataflowId,
        node_id: NodeId,
        operator_id: Option<OperatorId>,
    },
    /// Request the log output of a node.
    Logs {
        dataflow_id: DataflowId,
        node_id: NodeId,
    },
    /// Shut the daemon down.
    Destroy,
    /// Liveness probe.
    Heartbeat,
}

/// Events originating from dynamically started nodes.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum DynamicNodeEvent {
    /// The node asks the daemon for its `NodeConfig`.
    NodeConfig { node_id: NodeId },
}

/// Events exchanged between daemons on different machines.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum InterDaemonEvent {
    /// An output produced on one machine, forwarded to receivers on another.
    Output {
        dataflow_id: DataflowId,
        node_id: NodeId,
        output_id: DataId,
        metadata: Metadata,
        /// Raw 128-byte-aligned payload; `None` for metadata-only messages.
        data: Option<AVec<u8, ConstAlign<128>>>,
    },
    /// The given (node, input) pairs will receive no further messages.
    InputsClosed {
        dataflow_id: DataflowId,
        inputs: BTreeSet<(NodeId, DataId)>,
    },
}

/// Replies a daemon sends to coordinator commands.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum DaemonCoordinatorReply {
    SpawnResult(Result<(), String>),
    ReloadResult(Result<(), String>),
    StopResult(Result<(), String>),
    DestroyResult {
        result: Result<(), String>,
        /// Local-only oneshot channel, skipped during serialization —
        /// NOTE(review): presumably used to delay shutdown until the reply
        /// is flushed; confirm at the call site.
        #[serde(skip)]
        notify: Option<tokio::sync::oneshot::Sender<()>>,
    },
    /// Collected log bytes, or an error message.
    Logs(Result<Vec<u8>, String>),
}

/// Unique identifier of a running dataflow.
pub type DataflowId = Uuid;

/// Coordinator command describing the nodes a daemon should spawn.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct SpawnDataflowNodes {
    pub dataflow_id: DataflowId,
    /// Working directory for the spawned nodes.
    pub working_dir: PathBuf,
    /// The resolved nodes to spawn — NOTE(review): unclear from here whether
    /// this is all nodes of the dataflow or only this daemon's share; confirm.
    pub nodes: Vec<ResolvedNode>,
    /// Inter-daemon listen addresses keyed by machine id.
    pub machine_listen_ports: BTreeMap<String, SocketAddr>,
    pub dataflow_descriptor: Descriptor,
}

+ 1
- 3
libraries/core/src/lib.rs View File

@@ -5,11 +5,9 @@ use std::{
path::Path,
};

pub use dora_message as message;
pub use uhlc;

pub mod config;
pub mod coordinator_messages;
pub mod daemon_messages;
pub mod descriptor;
pub mod topics;



+ 1
- 246
libraries/core/src/topics.rs View File

@@ -1,18 +1,4 @@
use dora_message::uhlc;
use std::{
borrow::Cow,
collections::{BTreeMap, BTreeSet},
fmt::Display,
net::{IpAddr, Ipv4Addr},
path::PathBuf,
time::Duration,
};
use uuid::Uuid;

use crate::{
config::{NodeId, OperatorId},
descriptor::Descriptor,
};
use std::net::{IpAddr, Ipv4Addr};

pub const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
pub const DORA_COORDINATOR_PORT_DEFAULT: u16 = 0xD02A;
@@ -20,234 +6,3 @@ pub const DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT: u16 = 0xD02B;
pub const DORA_COORDINATOR_PORT_CONTROL_DEFAULT: u16 = 0x177C;

pub const MANUAL_STOP: &str = "dora/stop";

/// Requests the CLI sends to the coordinator's control port.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum ControlRequest {
    /// Start a dataflow from the given descriptor.
    Start {
        dataflow: Descriptor,
        /// Optional human-readable name for the dataflow.
        name: Option<String>,
        // TODO: remove this once we figure out deploying of node/operator
        // binaries from CLI to coordinator/daemon
        local_working_dir: PathBuf,
    },
    /// Reload a node, or a single operator of it.
    Reload {
        dataflow_id: Uuid,
        node_id: NodeId,
        operator_id: Option<OperatorId>,
    },
    /// Check the state of the given dataflow.
    Check {
        dataflow_uuid: Uuid,
    },
    /// Stop a dataflow by UUID, optionally with a kill grace period.
    Stop {
        dataflow_uuid: Uuid,
        grace_duration: Option<Duration>,
    },
    /// Stop a dataflow by its name.
    StopByName {
        name: String,
        grace_duration: Option<Duration>,
    },
    /// Fetch logs of a node; the dataflow is given by UUID or name.
    Logs {
        uuid: Option<Uuid>,
        name: Option<String>,
        node: String,
    },
    /// Shut down the coordinator.
    Destroy,
    /// List dataflows.
    List,
    /// Query whether a daemon is connected.
    DaemonConnected,
    /// List the machine ids of connected daemons.
    ConnectedMachines,
    /// Subscribe to log messages of a dataflow at the given level.
    LogSubscribe {
        dataflow_id: Uuid,
        level: log::LevelFilter,
    },
}

/// List of known dataflows together with their statuses.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DataflowList(pub Vec<DataflowListEntry>);

impl DataflowList {
    /// Returns the IDs of all entries that are currently running.
    pub fn get_active(&self) -> Vec<DataflowId> {
        let mut active = Vec::new();
        for entry in &self.0 {
            if entry.status == DataflowStatus::Running {
                active.push(entry.id.clone());
            }
        }
        active
    }
}

/// A dataflow together with its current status.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DataflowListEntry {
    pub id: DataflowId,
    pub status: DataflowStatus,
}

/// Lifecycle state of a dataflow.
#[derive(Debug, Clone, Copy, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
pub enum DataflowStatus {
    Running,
    Finished,
    Failed,
}

/// Replies the coordinator sends back to control requests.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum ControlRequestReply {
    /// Generic failure carrying a description.
    Error(String),
    CoordinatorStopped,
    DataflowStarted { uuid: Uuid },
    DataflowReloaded { uuid: Uuid },
    /// Stop finished; carries the per-node results of the dataflow.
    DataflowStopped { uuid: Uuid, result: DataflowResult },
    DataflowList(DataflowList),
    DestroyOk,
    /// Whether a daemon is connected.
    DaemonConnected(bool),
    /// Machine ids of connected daemons.
    ConnectedMachines(BTreeSet<String>),
    /// Raw log bytes of the requested node.
    Logs(Vec<u8>),
}

/// A dataflow identified by UUID plus its optional human-readable name.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DataflowId {
    pub uuid: Uuid,
    pub name: Option<String>,
}

impl Display for DataflowId {
    /// Formats as `[name] uuid`, using `<unnamed>` when no name is set.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match &self.name {
            Some(name) => write!(f, "[{name}] {}", self.uuid),
            None => write!(f, "[<unnamed>] {}", self.uuid),
        }
    }
}

/// Final result of a dataflow run.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DataflowResult {
    pub uuid: Uuid,
    pub timestamp: uhlc::Timestamp,
    /// Per-node outcome; `Err` carries the node's failure details.
    pub node_results: BTreeMap<NodeId, Result<(), NodeError>>,
}

impl DataflowResult {
    /// A successful result with no node entries.
    pub fn ok_empty(uuid: Uuid, timestamp: uhlc::Timestamp) -> Self {
        Self {
            uuid,
            timestamp,
            node_results: Default::default(),
        }
    }

    /// True if every node finished without error (vacuously true when empty).
    pub fn is_ok(&self) -> bool {
        self.node_results.values().all(|r| r.is_ok())
    }
}

/// Per-daemon result of a dataflow run — NOTE(review): presumably merged by
/// the coordinator into a `DataflowResult`; confirm.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DataflowDaemonResult {
    pub timestamp: uhlc::Timestamp,
    pub node_results: BTreeMap<NodeId, Result<(), NodeError>>,
}

impl DataflowDaemonResult {
    /// True if every node finished without error.
    pub fn is_ok(&self) -> bool {
        self.node_results.values().all(|r| r.is_ok())
    }
}

/// Failure details for a single node.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct NodeError {
    pub timestamp: uhlc::Timestamp,
    pub cause: NodeErrorCause,
    pub exit_status: NodeExitStatus,
}

impl std::fmt::Display for NodeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // First describe how the process exited ...
        match &self.exit_status {
            NodeExitStatus::Success => write!(f, "<success>"),
            NodeExitStatus::IoError(err) => write!(f, "I/O error while reading exit status: {err}"),
            NodeExitStatus::ExitCode(code) => write!(f, "exited with code {code}"),
            NodeExitStatus::Signal(signal) => {
                // Map common signal numbers to their conventional names.
                // NOTE(review): 22 maps to SIGABRT (its Windows value; POSIX
                // SIGABRT is 6, also listed) and 23 to "NSIG" — confirm these
                // are intentional.
                let signal_str: Cow<_> = match signal {
                    1 => "SIGHUP".into(),
                    2 => "SIGINT".into(),
                    3 => "SIGQUIT".into(),
                    4 => "SIGILL".into(),
                    6 => "SIGABRT".into(),
                    8 => "SIGFPE".into(),
                    9 => "SIGKILL".into(),
                    11 => "SIGSEGV".into(),
                    13 => "SIGPIPE".into(),
                    14 => "SIGALRM".into(),
                    15 => "SIGTERM".into(),
                    22 => "SIGABRT".into(),
                    23 => "NSIG".into(),
                    other => other.to_string().into(),
                };
                if matches!(self.cause, NodeErrorCause::GraceDuration) {
                    write!(f, "node was killed by dora because it didn't react to a stop message in time ({signal_str})")
                } else {
                    write!(f, "exited because of signal {signal_str}")
                }
            }
            NodeExitStatus::Unknown => write!(f, "unknown exit status"),
        }?;

        // ... then append the cause, unless it was already covered above or
        // there is no stderr output to show.
        match &self.cause {
            NodeErrorCause::GraceDuration => {}, // handled above
            NodeErrorCause::Cascading { caused_by_node } => write!(
                f,
                ". This error occurred because node `{caused_by_node}` exited before connecting to dora."
            )?,
            NodeErrorCause::Other { stderr } if stderr.is_empty() => {}
            NodeErrorCause::Other { stderr } => {
                // Visual separator around the captured stderr block.
                let line: &str = "---------------------------------------------------------------------------------\n";
                write!(f, " with stderr output:\n{line}{stderr}{line}")?
            },
        }

        Ok(())
    }
}

/// Why a node failed.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum NodeErrorCause {
    /// Node was killed because it didn't react to a stop message in time.
    GraceDuration,
    /// Node failed because another node failed before.
    Cascading {
        caused_by_node: NodeId,
    },
    /// Any other failure; carries the node's captured stderr output.
    Other {
        stderr: String,
    },
}

/// How a node process exited.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum NodeExitStatus {
    Success,
    /// The exit status could not be read; carries the I/O error text.
    IoError(String),
    ExitCode(i32),
    /// Terminated by the given signal number (unix).
    Signal(i32),
    Unknown,
}

impl From<Result<std::process::ExitStatus, std::io::Error>> for NodeExitStatus {
    /// Classifies the outcome of waiting on a child process.
    fn from(result: Result<std::process::ExitStatus, std::io::Error>) -> Self {
        match result {
            Ok(status) => {
                if status.success() {
                    NodeExitStatus::Success
                } else if let Some(code) = status.code() {
                    Self::ExitCode(code)
                } else {
                    // No exit code: on unix this usually means the process
                    // was terminated by a signal.
                    #[cfg(unix)]
                    {
                        use std::os::unix::process::ExitStatusExt;
                        if let Some(signal) = status.signal() {
                            return Self::Signal(signal);
                        }
                    }
                    Self::Unknown
                }
            }
            Err(err) => Self::IoError(err.to_string()),
        }
    }
}

+ 5
- 1
libraries/message/Cargo.toml View File

@@ -11,7 +11,11 @@ license.workspace = true

[dependencies]
arrow-data = { workspace = true }
uhlc = "0.5.1"
serde = { version = "1.0.136", features = ["derive"] }
eyre = "0.6.8"
arrow-schema = { workspace = true, features = ["serde"] }
tokio = "1.39.2"
dora-core = { workspace = true }
uuid = { version = "1.7", features = ["serde", "v7"] }
log = { version = "0.4.21", features = ["serde"] }
aligned-vec = { version = "0.5.0", features = ["serde"] }

+ 47
- 0
libraries/message/src/cli_to_coordinator.rs View File

@@ -0,0 +1,47 @@
use std::{path::PathBuf, time::Duration};

use dora_core::{
config::{NodeId, OperatorId},
descriptor::Descriptor,
};
use uuid::Uuid;

/// Requests the CLI sends to the coordinator's control port.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum ControlRequest {
    /// Start a dataflow from the given descriptor.
    Start {
        dataflow: Descriptor,
        /// Optional human-readable name for the dataflow.
        name: Option<String>,
        // TODO: remove this once we figure out deploying of node/operator
        // binaries from CLI to coordinator/daemon
        local_working_dir: PathBuf,
    },
    /// Reload a node, or a single operator of it.
    Reload {
        dataflow_id: Uuid,
        node_id: NodeId,
        operator_id: Option<OperatorId>,
    },
    /// Check the state of the given dataflow.
    Check {
        dataflow_uuid: Uuid,
    },
    /// Stop a dataflow by UUID, optionally with a kill grace period.
    Stop {
        dataflow_uuid: Uuid,
        grace_duration: Option<Duration>,
    },
    /// Stop a dataflow by its name.
    StopByName {
        name: String,
        grace_duration: Option<Duration>,
    },
    /// Fetch logs of a node; the dataflow is given by UUID or name.
    Logs {
        uuid: Option<Uuid>,
        name: Option<String>,
        node: String,
    },
    /// Shut down the coordinator.
    Destroy,
    /// List dataflows.
    List,
    /// Query whether a daemon is connected.
    DaemonConnected,
    /// List the machine ids of connected daemons.
    ConnectedMachines,
    /// Subscribe to log messages of a dataflow at the given level.
    LogSubscribe {
        dataflow_id: Uuid,
        level: log::LevelFilter,
    },
}

+ 184
- 0
libraries/message/src/common.rs View File

@@ -0,0 +1,184 @@
use core::fmt;
use std::borrow::Cow;

use aligned_vec::{AVec, ConstAlign};
use dora_core::{config::NodeId, uhlc};
use uuid::Uuid;

use crate::DataflowId;

pub use log::Level as LogLevel;

/// A log record produced by a node or daemon, forwarded towards the
/// coordinator (and from there to CLI log subscribers).
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[must_use]
pub struct LogMessage {
    pub dataflow_id: DataflowId,
    /// Originating node, if the message is attributable to a single node.
    pub node_id: Option<NodeId>,
    pub level: LogLevel,
    // The following optional fields mirror the metadata of a `log::Record`.
    pub target: Option<String>,
    pub module_path: Option<String>,
    pub file: Option<String>,
    pub line: Option<u32>,
    pub message: String,
}

/// Error report for a single failed node.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct NodeError {
    pub timestamp: uhlc::Timestamp,
    /// Why the node failed.
    pub cause: NodeErrorCause,
    /// How the node's process terminated.
    pub exit_status: NodeExitStatus,
}

/// Human-readable rendering: describes the exit status first, then appends
/// details about the error cause.
impl std::fmt::Display for NodeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match &self.exit_status {
            NodeExitStatus::Success => write!(f, "<success>"),
            NodeExitStatus::IoError(err) => write!(f, "I/O error while reading exit status: {err}"),
            NodeExitStatus::ExitCode(code) => write!(f, "exited with code {code}"),
            NodeExitStatus::Signal(signal) => {
                // Map well-known signal numbers to their conventional names.
                // NOTE(review): `22 => SIGABRT` matches the Windows value (on
                // Linux, 22 is SIGTTOU) and `23 => NSIG` is non-standard —
                // confirm these two mappings are intended.
                let signal_str: Cow<_> = match signal {
                    1 => "SIGHUP".into(),
                    2 => "SIGINT".into(),
                    3 => "SIGQUIT".into(),
                    4 => "SIGILL".into(),
                    6 => "SIGABRT".into(),
                    8 => "SIGFPE".into(),
                    9 => "SIGKILL".into(),
                    11 => "SIGSEGV".into(),
                    13 => "SIGPIPE".into(),
                    14 => "SIGALRM".into(),
                    15 => "SIGTERM".into(),
                    22 => "SIGABRT".into(),
                    23 => "NSIG".into(),
                    other => other.to_string().into(),
                };
                // A signal combined with the grace-duration cause means dora
                // killed the node itself.
                if matches!(self.cause, NodeErrorCause::GraceDuration) {
                    write!(f, "node was killed by dora because it didn't react to a stop message in time ({signal_str})")
                } else {
                    write!(f, "exited because of signal {signal_str}")
                }
            }
            NodeExitStatus::Unknown => write!(f, "unknown exit status"),
        }?;

        match &self.cause {
            NodeErrorCause::GraceDuration => {}, // handled above
            NodeErrorCause::Cascading { caused_by_node } => write!(
                f,
                ". This error occurred because node `{caused_by_node}` exited before connecting to dora."
            )?,
            // Suppress the stderr section if nothing was captured.
            NodeErrorCause::Other { stderr } if stderr.is_empty() => {}
            NodeErrorCause::Other { stderr } => {
                let line: &str = "---------------------------------------------------------------------------------\n";
                write!(f, " with stderr output:\n{line}{stderr}{line}")?
            },
        }

        Ok(())
    }
}

/// Reason why a node failed.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum NodeErrorCause {
    /// Node was killed because it didn't react to a stop message in time.
    GraceDuration,
    /// Node failed because another node failed before it.
    Cascading {
        /// The node whose earlier failure triggered this one.
        caused_by_node: NodeId,
    },
    /// Any other failure.
    Other {
        /// Captured stderr output of the node (may be empty).
        stderr: String,
    },
}

/// Exit status of a node process, as observed by the daemon.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum NodeExitStatus {
    /// Process exited successfully.
    Success,
    /// Waiting for the process failed with an I/O error (stored as text).
    IoError(String),
    /// Process exited with the given exit code.
    ExitCode(i32),
    /// Process was terminated by the given signal (set on Unix only).
    Signal(i32),
    /// Process terminated, but neither exit code nor signal could be determined.
    Unknown,
}

impl From<Result<std::process::ExitStatus, std::io::Error>> for NodeExitStatus {
fn from(result: Result<std::process::ExitStatus, std::io::Error>) -> Self {
match result {
Ok(status) => {
if status.success() {
NodeExitStatus::Success
} else if let Some(code) = status.code() {
Self::ExitCode(code)
} else {
#[cfg(unix)]
{
use std::os::unix::process::ExitStatusExt;
if let Some(signal) = status.signal() {
return Self::Signal(signal);
}
}
Self::Unknown
}
}
Err(err) => Self::IoError(err.to_string()),
}
}
}

/// Wraps a message together with the hybrid-logical-clock timestamp at which
/// it was sent.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Timestamped<T> {
    pub inner: T,
    pub timestamp: uhlc::Timestamp,
}

/// Identifier of a shared memory region.
pub type SharedMemoryId = String;

/// Payload of an output message, either sent inline or via shared memory.
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub enum DataMessage {
    /// Data sent inline as a 128-byte-aligned buffer.
    Vec(AVec<u8, ConstAlign<128>>),
    /// Data placed in a shared memory region; the receiver reports the
    /// `drop_token` back once it no longer accesses the region.
    SharedMemory {
        // Consistency fix: use the `SharedMemoryId` alias (declared above and
        // already used by `SharedMemoryInput`) instead of a bare `String`.
        // The alias is transparent, so the serialized format is unchanged.
        shared_memory_id: SharedMemoryId,
        len: usize,
        drop_token: DropToken,
    },
}

impl DataMessage {
    /// Returns the drop token for shared-memory payloads; inline data has none.
    pub fn drop_token(&self) -> Option<DropToken> {
        if let Self::SharedMemory { drop_token, .. } = self {
            Some(*drop_token)
        } else {
            None
        }
    }
}

// Custom `Debug` that never dumps payload bytes: inline data only reports its
// length.
impl fmt::Debug for DataMessage {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Vec(data) => {
                let mut s = f.debug_struct("Vec");
                s.field("len", &data.len());
                s.finish_non_exhaustive()
            }
            Self::SharedMemory {
                shared_memory_id,
                len,
                drop_token,
            } => {
                let mut s = f.debug_struct("SharedMemory");
                s.field("shared_memory_id", shared_memory_id);
                s.field("len", len);
                s.field("drop_token", drop_token);
                s.finish()
            }
        }
    }
}

/// Unique token identifying a shared-memory payload; the receiving side
/// reports it back once it finished reading, so the region can be reused.
#[derive(
    Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
pub struct DropToken(Uuid);

impl DropToken {
    /// Creates a new random token (UUID v7, i.e. timestamp-ordered).
    pub fn generate() -> Self {
        Self(Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext)))
    }
}

+ 85
- 0
libraries/message/src/coordinator_to_cli.rs View File

@@ -0,0 +1,85 @@
use std::collections::{BTreeMap, BTreeSet};

use dora_core::config::NodeId;
use dora_core::uhlc;
use uuid::Uuid;

pub use crate::common::LogMessage;
pub use crate::common::{NodeError, NodeErrorCause, NodeExitStatus};

/// Replies of the coordinator to CLI `ControlRequest`s.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum ControlRequestReply {
    /// The request failed; carries the error description.
    Error(String),
    CoordinatorStopped,
    DataflowStarted { uuid: Uuid },
    DataflowReloaded { uuid: Uuid },
    DataflowStopped { uuid: Uuid, result: DataflowResult },
    DataflowList(DataflowList),
    DestroyOk,
    /// Whether a daemon is connected to the coordinator.
    DaemonConnected(bool),
    /// IDs of all connected machines.
    ConnectedMachines(BTreeSet<String>),
    /// Raw log output of the requested node.
    Logs(Vec<u8>),
}

/// Per-node results of a finished dataflow.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DataflowResult {
    pub uuid: Uuid,
    pub timestamp: uhlc::Timestamp,
    /// Result of each node, keyed by node ID.
    pub node_results: BTreeMap<NodeId, Result<(), NodeError>>,
}

impl DataflowResult {
pub fn ok_empty(uuid: Uuid, timestamp: uhlc::Timestamp) -> Self {
Self {
uuid,
timestamp,
node_results: Default::default(),
}
}

pub fn is_ok(&self) -> bool {
self.node_results.values().all(|r| r.is_ok())
}
}

/// List of all dataflows known to the coordinator.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DataflowList(pub Vec<DataflowListEntry>);

impl DataflowList {
    /// Returns the IDs/names of all dataflows that are still running.
    pub fn get_active(&self) -> Vec<DataflowIdAndName> {
        self.0
            .iter()
            .filter_map(|entry| {
                (entry.status == DataflowStatus::Running).then(|| entry.id.clone())
            })
            .collect()
    }
}

/// A dataflow together with its current status.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DataflowListEntry {
    pub id: DataflowIdAndName,
    pub status: DataflowStatus,
}

/// Lifecycle state of a dataflow.
#[derive(Debug, Clone, Copy, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
pub enum DataflowStatus {
    Running,
    Finished,
    Failed,
}

/// Unique dataflow UUID plus its optional human-readable name.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DataflowIdAndName {
    pub uuid: Uuid,
    pub name: Option<String>,
}

// Renders as `[name] <uuid>`, falling back to `[<unnamed>]` if no name is set.
impl std::fmt::Display for DataflowIdAndName {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match &self.name {
            Some(name) => write!(f, "[{name}] {}", self.uuid),
            None => write!(f, "[<unnamed>] {}", self.uuid),
        }
    }
}

+ 59
- 0
libraries/message/src/coordinator_to_daemon.rs View File

@@ -0,0 +1,59 @@
use std::{collections::BTreeMap, net::SocketAddr, path::PathBuf, time::Duration};

use dora_core::{
config::{NodeId, OperatorId},
// TODO: how should we version these?
descriptor::{Descriptor, ResolvedNode},
};

use crate::DataflowId;

pub use crate::common::Timestamped;

/// Reply of the coordinator to a daemon's register request.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum RegisterResult {
    Ok,
    Err(String),
}

impl RegisterResult {
    /// Converts into a standard `Result`, turning the error text into an
    /// `eyre` report.
    pub fn to_result(self) -> eyre::Result<()> {
        if let RegisterResult::Err(message) = self {
            Err(eyre::eyre!(message))
        } else {
            Ok(())
        }
    }
}

/// Requests/events sent from the coordinator to a daemon.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum DaemonCoordinatorEvent {
    /// Spawn the given dataflow nodes.
    Spawn(SpawnDataflowNodes),
    /// All nodes of the dataflow are ready; lists nodes that exited before
    /// subscribing.
    AllNodesReady {
        dataflow_id: DataflowId,
        exited_before_subscribe: Vec<NodeId>,
    },
    /// Stop the given dataflow, with an optional grace period.
    StopDataflow {
        dataflow_id: DataflowId,
        grace_duration: Option<Duration>,
    },
    /// Reload a node, or a single operator of a node.
    ReloadDataflow {
        dataflow_id: DataflowId,
        node_id: NodeId,
        operator_id: Option<OperatorId>,
    },
    /// Retrieve the logs of the given node.
    Logs {
        dataflow_id: DataflowId,
        node_id: NodeId,
    },
    /// Shut the daemon down.
    Destroy,
    /// Liveness probe.
    Heartbeat,
}

/// Instruction to spawn the given nodes of a dataflow on a daemon.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct SpawnDataflowNodes {
    pub dataflow_id: DataflowId,
    /// Working directory for the spawned node processes.
    pub working_dir: PathBuf,
    /// The resolved nodes this daemon should spawn.
    pub nodes: Vec<ResolvedNode>,
    /// Listen addresses of the daemons of other machines, keyed by machine ID.
    pub machine_listen_ports: BTreeMap<String, SocketAddr>,
    pub dataflow_descriptor: Descriptor,
}

+ 86
- 0
libraries/message/src/daemon_to_coordinator.rs View File

@@ -0,0 +1,86 @@
use std::collections::BTreeMap;

use dora_core::{config::NodeId, uhlc};

pub use crate::common::{
DataMessage, LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus, Timestamped,
};
use crate::DataflowId;

/// Messages sent from a daemon to the coordinator.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum CoordinatorRequest {
    /// Register this daemon with the coordinator.
    Register(DaemonRegisterRequest),
    /// An event that occurred on the machine with the given ID.
    Event {
        machine_id: String,
        event: DaemonEvent,
    },
}

/// Registration message of a daemon; carries the dora version used for the
/// compatibility check in [`DaemonRegisterRequest::check_version`].
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct DaemonRegisterRequest {
    // Private on purpose: always set to this crate's version by `new`.
    dora_version: String,
    pub machine_id: String,
    pub listen_port: u16,
}

impl DaemonRegisterRequest {
    /// Creates a register request tagged with this crate's version.
    pub fn new(machine_id: String, listen_port: u16) -> Self {
        Self {
            machine_id,
            listen_port,
            dora_version: env!("CARGO_PKG_VERSION").to_owned(),
        }
    }

    /// Rejects the request if it was produced by a different message-format
    /// version than ours.
    pub fn check_version(&self) -> Result<(), String> {
        let crate_version = env!("CARGO_PKG_VERSION");
        if self.dora_version != crate_version {
            return Err(format!(
                "version mismatch: message format v{} is not compatible \
                 with expected message format v{crate_version}",
                self.dora_version
            ));
        }
        Ok(())
    }
}

/// Events a daemon reports to the coordinator.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum DaemonEvent {
    /// The dataflow's nodes are ready; lists nodes that exited before
    /// subscribing.
    AllNodesReady {
        dataflow_id: DataflowId,
        exited_before_subscribe: Vec<NodeId>,
    },
    /// All of this daemon's nodes of the dataflow finished, with results.
    AllNodesFinished {
        dataflow_id: DataflowId,
        result: DataflowDaemonResult,
    },
    /// Liveness signal.
    Heartbeat,
    /// A forwarded log message.
    Log(LogMessage),
}

/// Results of the nodes of a dataflow that ran on a single daemon.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DataflowDaemonResult {
    pub timestamp: uhlc::Timestamp,
    /// Result of each node, keyed by node ID.
    pub node_results: BTreeMap<NodeId, Result<(), NodeError>>,
}

impl DataflowDaemonResult {
    /// Returns `true` if no node on this daemon reported an error.
    pub fn is_ok(&self) -> bool {
        !self.node_results.values().any(|result| result.is_err())
    }
}

/// Replies of a daemon to `DaemonCoordinatorEvent` requests.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum DaemonCoordinatorReply {
    SpawnResult(Result<(), String>),
    ReloadResult(Result<(), String>),
    StopResult(Result<(), String>),
    DestroyResult {
        result: Result<(), String>,
        // Local-only oneshot channel, never serialized (`serde(skip)`).
        // NOTE(review): presumably used to signal locally that the reply was
        // handled — confirm against the daemon's destroy path.
        #[serde(skip)]
        notify: Option<tokio::sync::oneshot::Sender<()>>,
    },
    /// Raw log output of the requested node, or an error description.
    Logs(Result<Vec<u8>, String>),
}

+ 21
- 0
libraries/message/src/daemon_to_daemon.rs View File

@@ -0,0 +1,21 @@
use std::collections::BTreeSet;

use aligned_vec::{AVec, ConstAlign};
use dora_core::config::{DataId, NodeId};

use crate::{metadata::Metadata, DataflowId};

/// Events exchanged directly between the daemons of different machines.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum InterDaemonEvent {
    /// An output message of a node, forwarded to the remote daemon.
    Output {
        dataflow_id: DataflowId,
        node_id: NodeId,
        output_id: DataId,
        metadata: Metadata,
        /// 128-byte-aligned payload; `None` for metadata-only outputs.
        data: Option<AVec<u8, ConstAlign<128>>>,
    },
    /// The given inputs are closed and will receive no further messages.
    InputsClosed {
        dataflow_id: DataflowId,
        inputs: BTreeSet<(NodeId, DataId)>,
    },
}

+ 77
- 0
libraries/message/src/daemon_to_node.rs View File

@@ -0,0 +1,77 @@
use std::{net::SocketAddr, path::PathBuf};

use dora_core::{
config::{DataId, NodeId, NodeRunConfig, OperatorId},
descriptor::{Descriptor, OperatorDefinition},
};

use crate::{metadata::Metadata, DataflowId};

pub use crate::common::{DataMessage, DropToken, SharedMemoryId, Timestamped};

/// Complete configuration of a runtime node (node config plus its operators).
/// Passed via env variable.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeConfig {
    pub node: NodeConfig,
    pub operators: Vec<OperatorDefinition>,
}

/// Everything a node needs to identify itself and connect to its daemon.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct NodeConfig {
    pub dataflow_id: DataflowId,
    pub node_id: NodeId,
    /// The node's configured inputs and outputs.
    pub run_config: NodeRunConfig,
    /// How to reach the local daemon.
    pub daemon_communication: DaemonCommunication,
    pub dataflow_descriptor: Descriptor,
    /// Whether this is a dynamic node.
    pub dynamic: bool,
}

/// Transport used for node <-> daemon communication.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum DaemonCommunication {
    /// Communication via shared memory regions.
    Shmem {
        daemon_control_region_id: SharedMemoryId,
        daemon_drop_region_id: SharedMemoryId,
        daemon_events_region_id: SharedMemoryId,
        daemon_events_close_region_id: SharedMemoryId,
    },
    /// Communication via a TCP connection to the given address.
    Tcp {
        socket_addr: SocketAddr,
    },
    /// Communication via a Unix domain socket (Unix only).
    #[cfg(unix)]
    UnixDomain {
        socket_file: PathBuf,
    },
}

/// Replies sent from the daemon to a node.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[must_use]
pub enum DaemonReply {
    /// Generic success/error result.
    Result(Result<(), String>),
    /// The prepared output buffer is available in the given region.
    PreparedMessage { shared_memory_id: SharedMemoryId },
    /// Next batch of node events (reply to `NextEvent`).
    NextEvents(Vec<Timestamped<NodeEvent>>),
    /// Next batch of drop events (reply to `NextFinishedDropTokens`).
    NextDropEvents(Vec<Timestamped<NodeDropEvent>>),
    /// The requested node configuration, or an error description.
    NodeConfig { result: Result<NodeConfig, String> },
    /// Reply without any payload.
    Empty,
}

/// Events delivered from the daemon to a node.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum NodeEvent {
    /// The node should stop.
    Stop,
    /// The node (or one of its operators) should reload.
    Reload {
        operator_id: Option<OperatorId>,
    },
    /// A new input message.
    Input {
        id: DataId,
        metadata: Metadata,
        /// Payload; `None` if the input carries no data.
        data: Option<DataMessage>,
    },
    /// The given input stream is closed and yields no more messages.
    InputClosed {
        id: DataId,
    },
    /// All input streams of the node are closed.
    AllInputsClosed,
}

/// Drop-related events, delivered to nodes subscribed via `SubscribeDrop`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum NodeDropEvent {
    /// All readers of the output with this drop token finished with it.
    OutputDropped { drop_token: DropToken },
}

+ 10
- 138
libraries/message/src/lib.rs View File

@@ -3,146 +3,18 @@

#![allow(clippy::missing_safety_doc)]

use std::collections::BTreeMap;
pub mod common;
pub mod metadata;

use arrow_data::ArrayData;
use arrow_schema::DataType;
use eyre::Context;
use serde::{Deserialize, Serialize};
pub use uhlc;
pub mod coordinator_to_daemon;
pub mod daemon_to_coordinator;

pub type MetadataParameters = BTreeMap<String, Parameter>;
pub mod daemon_to_daemon;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Metadata {
metadata_version: u16,
timestamp: uhlc::Timestamp,
pub type_info: ArrowTypeInfo,
pub parameters: MetadataParameters,
}
pub mod daemon_to_node;
pub mod node_to_daemon;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ArrowTypeInfo {
pub data_type: DataType,
pub len: usize,
pub null_count: usize,
pub validity: Option<Vec<u8>>,
pub offset: usize,
pub buffer_offsets: Vec<BufferOffset>,
pub child_data: Vec<ArrowTypeInfo>,
}
pub mod cli_to_coordinator;
pub mod coordinator_to_cli;

impl ArrowTypeInfo {
pub const fn empty() -> Self {
Self {
data_type: DataType::Null,
len: 0,
null_count: 0,
validity: None,
offset: 0,
buffer_offsets: Vec::new(),
child_data: Vec::new(),
}
}

pub fn byte_array(data_len: usize) -> Self {
Self {
data_type: DataType::UInt8,
len: data_len,
null_count: 0,
validity: None,
offset: 0,
buffer_offsets: vec![BufferOffset {
offset: 0,
len: data_len,
}],
child_data: Vec::new(),
}
}

pub unsafe fn from_array(
array: &ArrayData,
region_start: *const u8,
region_len: usize,
) -> eyre::Result<Self> {
Ok(Self {
data_type: array.data_type().clone(),
len: array.len(),
null_count: array.null_count(),
validity: array.nulls().map(|b| b.validity().to_owned()),
offset: array.offset(),
buffer_offsets: array
.buffers()
.iter()
.map(|b| {
let ptr = b.as_ptr();
if ptr as usize <= region_start as usize {
eyre::bail!("ptr {ptr:p} starts before region {region_start:p}");
}
if ptr as usize >= region_start as usize + region_len {
eyre::bail!("ptr {ptr:p} starts after region {region_start:p}");
}
if ptr as usize + b.len() > region_start as usize + region_len {
eyre::bail!("ptr {ptr:p} ends after region {region_start:p}");
}
let offset = usize::try_from(unsafe { ptr.offset_from(region_start) })
.context("offset_from is negative")?;

Result::<_, eyre::Report>::Ok(BufferOffset {
offset,
len: b.len(),
})
})
.collect::<Result<_, _>>()?,
child_data: array
.child_data()
.iter()
.map(|c| unsafe { Self::from_array(c, region_start, region_len) })
.collect::<Result<_, _>>()?,
})
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BufferOffset {
pub offset: usize,
pub len: usize,
}

#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub enum Parameter {
Bool(bool),
Integer(i64),
String(String),
}

impl Metadata {
pub fn new(timestamp: uhlc::Timestamp, type_info: ArrowTypeInfo) -> Self {
Self::from_parameters(timestamp, type_info, Default::default())
}

pub fn from_parameters(
timestamp: uhlc::Timestamp,
type_info: ArrowTypeInfo,
parameters: MetadataParameters,
) -> Self {
Self {
metadata_version: 0,
timestamp,
parameters,
type_info,
}
}

pub fn timestamp(&self) -> uhlc::Timestamp {
self.timestamp
}

pub fn open_telemetry_context(&self) -> String {
if let Some(Parameter::String(otel)) = self.parameters.get("open_telemetry_context") {
otel.to_string()
} else {
"".to_string()
}
}
}
pub type DataflowId = uuid::Uuid;

+ 143
- 0
libraries/message/src/metadata.rs View File

@@ -0,0 +1,143 @@
use std::collections::BTreeMap;

use arrow_data::ArrayData;
use arrow_schema::DataType;
use dora_core::uhlc;
use eyre::Context;
use serde::{Deserialize, Serialize};

/// Metadata attached to every output message: format version, send timestamp,
/// Arrow type information, and free-form key/value parameters.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Metadata {
    metadata_version: u16,
    timestamp: uhlc::Timestamp,
    pub type_info: ArrowTypeInfo,
    pub parameters: MetadataParameters,
}

impl Metadata {
    /// Creates metadata without any parameters.
    pub fn new(timestamp: uhlc::Timestamp, type_info: ArrowTypeInfo) -> Self {
        Self::from_parameters(timestamp, type_info, MetadataParameters::new())
    }

    /// Creates metadata with the given parameters.
    pub fn from_parameters(
        timestamp: uhlc::Timestamp,
        type_info: ArrowTypeInfo,
        parameters: MetadataParameters,
    ) -> Self {
        Self {
            metadata_version: 0,
            timestamp,
            parameters,
            type_info,
        }
    }

    /// The hybrid-logical-clock timestamp at which the message was sent.
    pub fn timestamp(&self) -> uhlc::Timestamp {
        self.timestamp
    }

    /// Returns the serialized OpenTelemetry context parameter, or an empty
    /// string if none is set.
    pub fn open_telemetry_context(&self) -> String {
        match self.parameters.get("open_telemetry_context") {
            Some(Parameter::String(otel)) => otel.clone(),
            _ => String::new(),
        }
    }
}

/// Free-form key/value parameters attached to a message.
pub type MetadataParameters = BTreeMap<String, Parameter>;

/// Serializable description of an Arrow array, with buffer pointers replaced
/// by offsets into a single contiguous memory region.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ArrowTypeInfo {
    pub data_type: DataType,
    pub len: usize,
    pub null_count: usize,
    /// Validity bitmap bytes, if the array has one.
    pub validity: Option<Vec<u8>>,
    pub offset: usize,
    /// Offsets of the array's data buffers within the memory region.
    pub buffer_offsets: Vec<BufferOffset>,
    /// Type info of child arrays, for nested types.
    pub child_data: Vec<ArrowTypeInfo>,
}

impl ArrowTypeInfo {
    /// Type info of an empty `Null` array.
    pub const fn empty() -> Self {
        Self {
            data_type: DataType::Null,
            len: 0,
            null_count: 0,
            validity: None,
            offset: 0,
            buffer_offsets: Vec::new(),
            child_data: Vec::new(),
        }
    }

    /// Type info of a `UInt8` array of length `data_len`, stored as a single
    /// buffer at the start of the region.
    pub fn byte_array(data_len: usize) -> Self {
        Self {
            data_type: DataType::UInt8,
            len: data_len,
            null_count: 0,
            validity: None,
            offset: 0,
            buffer_offsets: vec![BufferOffset {
                offset: 0,
                len: data_len,
            }],
            child_data: Vec::new(),
        }
    }

    /// Describes `array` relative to the memory region starting at
    /// `region_start` with length `region_len`, converting buffer pointers to
    /// region offsets. Fails if any buffer lies outside the region.
    ///
    /// # Safety
    ///
    /// `region_start` must point into an allocation of at least `region_len`
    /// bytes that contains all of the array's buffers, so that `offset_from`
    /// between the buffer pointers and `region_start` is valid.
    pub unsafe fn from_array(
        array: &ArrayData,
        region_start: *const u8,
        region_len: usize,
    ) -> eyre::Result<Self> {
        Ok(Self {
            data_type: array.data_type().clone(),
            len: array.len(),
            null_count: array.null_count(),
            validity: array.nulls().map(|b| b.validity().to_owned()),
            offset: array.offset(),
            buffer_offsets: array
                .buffers()
                .iter()
                .map(|b| {
                    let ptr = b.as_ptr();
                    // NOTE(review): `<=` also rejects a buffer that starts
                    // exactly at `region_start` (offset 0) — confirm intended.
                    if ptr as usize <= region_start as usize {
                        eyre::bail!("ptr {ptr:p} starts before region {region_start:p}");
                    }
                    // These two compare against the region's end bound.
                    if ptr as usize >= region_start as usize + region_len {
                        eyre::bail!("ptr {ptr:p} starts after region {region_start:p}");
                    }
                    if ptr as usize + b.len() > region_start as usize + region_len {
                        eyre::bail!("ptr {ptr:p} ends after region {region_start:p}");
                    }
                    let offset = usize::try_from(unsafe { ptr.offset_from(region_start) })
                        .context("offset_from is negative")?;

                    Result::<_, eyre::Report>::Ok(BufferOffset {
                        offset,
                        len: b.len(),
                    })
                })
                .collect::<Result<_, _>>()?,
            // Recurse into child arrays (same region, same bounds).
            child_data: array
                .child_data()
                .iter()
                .map(|c| unsafe { Self::from_array(c, region_start, region_len) })
                .collect::<Result<_, _>>()?,
        })
    }
}

/// A single metadata parameter value.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub enum Parameter {
    Bool(bool),
    Integer(i64),
    String(String),
}

/// Position of an Arrow buffer within the shared memory region.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BufferOffset {
    /// Byte offset from the start of the region.
    pub offset: usize,
    /// Buffer length in bytes.
    pub len: usize,
}

+ 131
- 0
libraries/message/src/node_to_daemon.rs View File

@@ -0,0 +1,131 @@
pub use crate::common::{
DataMessage, DropToken, LogLevel, LogMessage, SharedMemoryId, Timestamped,
};
use crate::{metadata::Metadata, DataflowId};

use dora_core::config::{DataId, NodeId};

/// Requests sent from a node to its daemon.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum DaemonRequest {
    /// Register the node with the daemon.
    Register(NodeRegisterRequest),
    /// Subscribe to the node's event stream.
    Subscribe,
    /// Send an output message.
    SendMessage {
        output_id: DataId,
        metadata: Metadata,
        /// Payload; `None` if the output carries no data.
        data: Option<DataMessage>,
    },
    /// Report the given outputs as closed.
    CloseOutputs(Vec<DataId>),
    /// Signals that the node is finished sending outputs and that it received all
    /// required drop tokens.
    OutputsDone,
    /// Request the next batch of events; also reports finished drop tokens.
    NextEvent {
        drop_tokens: Vec<DropToken>,
    },
    /// Report drop tokens whose data the node no longer accesses.
    ReportDropTokens {
        drop_tokens: Vec<DropToken>,
    },
    /// Subscribe to the drop-event stream.
    SubscribeDrop,
    /// Request the next batch of finished drop tokens.
    NextFinishedDropTokens,
    /// Notify the daemon that the node dropped its event stream.
    EventStreamDropped,
    /// Request the configuration of the given node.
    NodeConfig {
        node_id: NodeId,
    },
}

impl DaemonRequest {
    /// Whether the daemon answers this request with a bincode-encoded reply
    /// when communicating over TCP.
    pub fn expects_tcp_bincode_reply(&self) -> bool {
        // Exhaustive on purpose: adding a variant must force a decision here.
        #[allow(clippy::match_like_matches_macro)]
        match self {
            Self::Register(NodeRegisterRequest { .. })
            | Self::Subscribe
            | Self::CloseOutputs(_)
            | Self::OutputsDone
            | Self::NextEvent { .. }
            | Self::SubscribeDrop
            | Self::NextFinishedDropTokens
            | Self::EventStreamDropped => true,
            // `SendMessage` and `ReportDropTokens` receive no bincode reply;
            // `NodeConfig` is answered with JSON instead (see below).
            Self::SendMessage { .. }
            | Self::NodeConfig { .. }
            | Self::ReportDropTokens { .. } => false,
        }
    }

    /// Whether the daemon answers this request with a JSON-encoded reply when
    /// communicating over TCP.
    pub fn expects_tcp_json_reply(&self) -> bool {
        #[allow(clippy::match_like_matches_macro)]
        match self {
            Self::NodeConfig { .. } => true,
            Self::Register(NodeRegisterRequest { .. })
            | Self::Subscribe
            | Self::CloseOutputs(_)
            | Self::OutputsDone
            | Self::NextEvent { .. }
            | Self::SubscribeDrop
            | Self::NextFinishedDropTokens
            | Self::ReportDropTokens { .. }
            | Self::SendMessage { .. }
            | Self::EventStreamDropped => false,
        }
    }
}

/// Registration message a node sends to its daemon; carries the dora version
/// for the compatibility check below.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct NodeRegisterRequest {
    pub dataflow_id: DataflowId,
    pub node_id: NodeId,
    // Private on purpose: always set to this crate's version by `new`.
    dora_version: String,
}

impl NodeRegisterRequest {
    /// Creates a register request tagged with this crate's version.
    pub fn new(dataflow_id: DataflowId, node_id: NodeId) -> Self {
        Self {
            dora_version: env!("CARGO_PKG_VERSION").to_owned(),
            dataflow_id,
            node_id,
        }
    }

    /// Rejects the request if it was produced by a different message-format
    /// version than ours.
    pub fn check_version(&self) -> Result<(), String> {
        let crate_version = env!("CARGO_PKG_VERSION");
        if self.dora_version != crate_version {
            return Err(format!(
                "version mismatch: message format v{} is not compatible \
                 with expected message format v{crate_version}",
                self.dora_version
            ));
        }
        Ok(())
    }
}

/// Notification that the given drop tokens are released.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct DropEvent {
    pub tokens: Vec<DropToken>,
}

/// Input payload, either stored inline or in shared memory.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum InputData {
    SharedMemory(SharedMemoryInput),
    Vec(Vec<u8>),
}

impl InputData {
    /// Returns the drop token for shared-memory inputs; inline data has none.
    pub fn drop_token(&self) -> Option<DropToken> {
        if let InputData::SharedMemory(data) = self {
            Some(data.drop_token)
        } else {
            None
        }
    }
}

/// Location of an input payload inside a shared memory region.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct SharedMemoryInput {
    /// ID of the shared memory region holding the data.
    pub shared_memory_id: SharedMemoryId,
    /// Length of the payload in bytes.
    pub len: usize,
    /// Token to report back once the input is no longer accessed.
    pub drop_token: DropToken,
}

/// Events concerning dynamically started nodes.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum DynamicNodeEvent {
    /// Request the node config for the given node.
    NodeConfig { node_id: NodeId },
}

Loading…
Cancel
Save