From 6a3dcecf33b8a98073559d23801366ae2559abf2 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 7 Aug 2024 20:50:40 +0200 Subject: [PATCH 1/2] Refactor: Move message definitions to `dora-message` crate First step towards versioning the message definitions independently. --- Cargo.lock | 15 +- Cargo.toml | 9 +- apis/python/node/src/lib.rs | 3 +- apis/rust/node/Cargo.toml | 1 + apis/rust/node/src/daemon_connection/mod.rs | 17 +- apis/rust/node/src/daemon_connection/tcp.rs | 5 +- .../node/src/daemon_connection/unix_domain.rs | 5 +- apis/rust/node/src/event_stream/event.rs | 6 +- apis/rust/node/src/event_stream/mod.rs | 27 +- apis/rust/node/src/event_stream/thread.rs | 9 +- apis/rust/node/src/lib.rs | 7 +- apis/rust/node/src/node/arrow_utils.rs | 2 +- apis/rust/node/src/node/control_channel.rs | 15 +- apis/rust/node/src/node/drop_stream.rs | 16 +- apis/rust/node/src/node/mod.rs | 13 +- binaries/cli/Cargo.toml | 1 + binaries/cli/src/attach.rs | 9 +- binaries/cli/src/check.rs | 2 +- binaries/cli/src/formatting.rs | 2 +- binaries/cli/src/logs.rs | 2 +- binaries/cli/src/main.rs | 19 +- binaries/cli/src/up.rs | 3 +- binaries/coordinator/Cargo.toml | 1 + binaries/coordinator/src/control.rs | 2 +- binaries/coordinator/src/lib.rs | 54 ++-- binaries/coordinator/src/listener.rs | 31 +- binaries/coordinator/src/log_subscriber.rs | 2 +- binaries/coordinator/src/run/mod.rs | 9 +- binaries/daemon/Cargo.toml | 1 + binaries/daemon/src/coordinator.rs | 15 +- binaries/daemon/src/inter_daemon.rs | 2 +- binaries/daemon/src/lib.rs | 114 +++---- binaries/daemon/src/local_listener.rs | 5 +- binaries/daemon/src/node_communication/mod.rs | 35 +- .../daemon/src/node_communication/shmem.rs | 7 +- binaries/daemon/src/node_communication/tcp.rs | 7 +- .../src/node_communication/unix_domain.rs | 7 +- binaries/daemon/src/pending.rs | 11 +- binaries/daemon/src/spawn.rs | 12 +- binaries/runtime/Cargo.toml | 1 + binaries/runtime/src/lib.rs | 2 +- binaries/runtime/src/operator/mod.rs | 4 +- 
binaries/runtime/src/operator/python.rs | 2 +- examples/multiple-daemons/run.rs | 13 +- libraries/core/Cargo.toml | 3 +- libraries/core/src/coordinator_messages.rs | 58 ---- libraries/core/src/daemon_messages.rs | 302 ------------------ libraries/core/src/lib.rs | 4 +- libraries/core/src/topics.rs | 247 +------------- libraries/message/Cargo.toml | 6 +- libraries/message/src/cli_to_coordinator.rs | 47 +++ libraries/message/src/common.rs | 184 +++++++++++ libraries/message/src/coordinator_to_cli.rs | 85 +++++ .../message/src/coordinator_to_daemon.rs | 59 ++++ .../message/src/daemon_to_coordinator.rs | 86 +++++ libraries/message/src/daemon_to_daemon.rs | 21 ++ libraries/message/src/daemon_to_node.rs | 77 +++++ libraries/message/src/lib.rs | 148 +-------- libraries/message/src/metadata.rs | 143 +++++++++ libraries/message/src/node_to_daemon.rs | 131 ++++++++ 60 files changed, 1117 insertions(+), 1009 deletions(-) delete mode 100644 libraries/core/src/coordinator_messages.rs delete mode 100644 libraries/core/src/daemon_messages.rs create mode 100644 libraries/message/src/cli_to_coordinator.rs create mode 100644 libraries/message/src/common.rs create mode 100644 libraries/message/src/coordinator_to_cli.rs create mode 100644 libraries/message/src/coordinator_to_daemon.rs create mode 100644 libraries/message/src/daemon_to_coordinator.rs create mode 100644 libraries/message/src/daemon_to_daemon.rs create mode 100644 libraries/message/src/daemon_to_node.rs create mode 100644 libraries/message/src/metadata.rs create mode 100644 libraries/message/src/node_to_daemon.rs diff --git a/Cargo.lock b/Cargo.lock index 9dcf5003..b3ef061c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2284,6 +2284,7 @@ dependencies = [ "dora-coordinator", "dora-core", "dora-daemon", + "dora-message", "dora-node-api-c", "dora-operator-api-c", "dora-runtime", @@ -2313,6 +2314,7 @@ version = "0.3.5" dependencies = [ "ctrlc", "dora-core", + "dora-message", "dora-tracing", "eyre", "futures", @@ -2330,8 
+2332,6 @@ dependencies = [ name = "dora-core" version = "0.3.5" dependencies = [ - "aligned-vec", - "dora-message", "eyre", "log", "once_cell", @@ -2342,6 +2342,7 @@ dependencies = [ "serde_yaml 0.9.34+deprecated", "tokio", "tracing", + "uhlc", "uuid", "which", ] @@ -2359,6 +2360,7 @@ dependencies = [ "dora-arrow-convert", "dora-core", "dora-download", + "dora-message", "dora-node-api", "dora-tracing", "eyre", @@ -2394,6 +2396,7 @@ dependencies = [ "dora-coordinator", "dora-core", "dora-download", + "dora-message", "dora-tracing", "dunce", "eyre", @@ -2409,11 +2412,15 @@ dependencies = [ name = "dora-message" version = "0.3.5" dependencies = [ + "aligned-vec", "arrow-data", "arrow-schema", + "dora-core", "eyre", + "log", "serde", - "uhlc", + "tokio", + "uuid", ] [[package]] @@ -2436,6 +2443,7 @@ dependencies = [ "bincode", "dora-arrow-convert", "dora-core", + "dora-message", "dora-tracing", "eyre", "flume 0.10.14", @@ -2641,6 +2649,7 @@ dependencies = [ "arrow", "dora-core", "dora-download", + "dora-message", "dora-metrics", "dora-node-api", "dora-operator-api-python", diff --git a/Cargo.toml b/Cargo.toml index 41fc4c5b..e058dd81 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,18 +60,22 @@ dora-metrics = { version = "0.3.5", path = "libraries/extensions/telemetry/metri dora-download = { version = "0.3.5", path = "libraries/extensions/download" } shared-memory-server = { version = "0.3.5", path = "libraries/shared-memory-server" } communication-layer-request-reply = { version = "0.3.5", path = "libraries/communication-layer/request-reply" } -dora-message = { version = "0.3.5", path = "libraries/message" } dora-runtime = { version = "0.3.5", path = "binaries/runtime" } dora-daemon = { version = "0.3.5", path = "binaries/daemon" } dora-coordinator = { version = "0.3.5", path = "binaries/coordinator" } dora-ros2-bridge = { path = "libraries/extensions/ros2-bridge" } dora-ros2-bridge-msg-gen = { path = "libraries/extensions/ros2-bridge/msg-gen" } 
dora-ros2-bridge-python = { path = "libraries/extensions/ros2-bridge/python" } +dora-message = { version = "0.3.5", path = "libraries/message" } arrow = { version = "52" } arrow-schema = { version = "52" } arrow-data = { version = "52" } arrow-array = { version = "52" } -pyo3 = { version = "0.21", features = ["eyre", "abi3-py37", "multiple-pymethods"] } +pyo3 = { version = "0.21", features = [ + "eyre", + "abi3-py37", + "multiple-pymethods", +] } pythonize = "0.21" [package] @@ -90,6 +94,7 @@ eyre = "0.6.8" tokio = "1.24.2" dora-coordinator = { workspace = true } dora-core = { workspace = true } +dora-message = { workspace = true } dora-tracing = { workspace = true } dora-download = { workspace = true } dunce = "1.0.2" diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index bb9f5e0e..191af9d7 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -5,9 +5,8 @@ use std::time::Duration; use arrow::pyarrow::{FromPyArrow, ToPyArrow}; use dora_node_api::dora_core::config::NodeId; -use dora_node_api::dora_core::daemon_messages::DataflowId; use dora_node_api::merged::{MergeExternalSend, MergedEvent}; -use dora_node_api::{DoraNode, EventStream}; +use dora_node_api::{DataflowId, DoraNode, EventStream}; use dora_operator_api_python::{pydict_to_metadata, DelayedCleanup, NodeCleanupHandle, PyEvent}; use dora_ros2_bridge_python::Ros2Subscription; use eyre::Context; diff --git a/apis/rust/node/Cargo.toml b/apis/rust/node/Cargo.toml index 41d7fe6b..f6635fdb 100644 --- a/apis/rust/node/Cargo.toml +++ b/apis/rust/node/Cargo.toml @@ -12,6 +12,7 @@ tracing = ["dep:dora-tracing"] [dependencies] dora-core = { workspace = true } +dora-message = { workspace = true } shared-memory-server = { workspace = true } eyre = "0.6.7" serde_yaml = "0.8.23" diff --git a/apis/rust/node/src/daemon_connection/mod.rs b/apis/rust/node/src/daemon_connection/mod.rs index d21607f1..6a80452d 100644 --- a/apis/rust/node/src/daemon_connection/mod.rs +++ 
b/apis/rust/node/src/daemon_connection/mod.rs @@ -1,7 +1,8 @@ -use dora_core::{ - config::NodeId, - daemon_messages::{DaemonReply, DaemonRequest, DataflowId, Timestamped}, - message::uhlc::Timestamp, +use dora_core::{config::NodeId, uhlc::Timestamp}; +use dora_message::{ + daemon_to_node::DaemonReply, + node_to_daemon::{DaemonRequest, NodeRegisterRequest, Timestamped}, + DataflowId, }; use eyre::{bail, eyre, Context}; use shared_memory_server::{ShmemClient, ShmemConf}; @@ -58,11 +59,7 @@ impl DaemonChannel { timestamp: Timestamp, ) -> eyre::Result<()> { let msg = Timestamped { - inner: DaemonRequest::Register { - dataflow_id, - node_id, - dora_version: env!("CARGO_PKG_VERSION").to_owned(), - }, + inner: DaemonRequest::Register(NodeRegisterRequest::new(dataflow_id, node_id)), timestamp, }; let reply = self @@ -70,7 +67,7 @@ impl DaemonChannel { .wrap_err("failed to send register request to dora-daemon")?; match reply { - dora_core::daemon_messages::DaemonReply::Result(result) => result + DaemonReply::Result(result) => result .map_err(|e| eyre!(e)) .wrap_err("failed to register node with dora-daemon")?, other => bail!("unexpected register reply: {other:?}"), diff --git a/apis/rust/node/src/daemon_connection/tcp.rs b/apis/rust/node/src/daemon_connection/tcp.rs index 15fdf19d..1d5e1e2b 100644 --- a/apis/rust/node/src/daemon_connection/tcp.rs +++ b/apis/rust/node/src/daemon_connection/tcp.rs @@ -1,4 +1,7 @@ -use dora_core::daemon_messages::{DaemonReply, DaemonRequest, Timestamped}; +use dora_message::{ + daemon_to_node::DaemonReply, + node_to_daemon::{DaemonRequest, Timestamped}, +}; use eyre::{eyre, Context}; use std::{ io::{Read, Write}, diff --git a/apis/rust/node/src/daemon_connection/unix_domain.rs b/apis/rust/node/src/daemon_connection/unix_domain.rs index dfcb17a2..fd6d50d2 100644 --- a/apis/rust/node/src/daemon_connection/unix_domain.rs +++ b/apis/rust/node/src/daemon_connection/unix_domain.rs @@ -1,4 +1,7 @@ -use dora_core::daemon_messages::{DaemonReply, 
DaemonRequest, Timestamped}; +use dora_message::{ + daemon_to_node::DaemonReply, + node_to_daemon::{DaemonRequest, Timestamped}, +}; use eyre::{eyre, Context}; use std::{ io::{Read, Write}, diff --git a/apis/rust/node/src/event_stream/event.rs b/apis/rust/node/src/event_stream/event.rs index 75b3c595..b5d0e9b8 100644 --- a/apis/rust/node/src/event_stream/event.rs +++ b/apis/rust/node/src/event_stream/event.rs @@ -2,10 +2,8 @@ use std::{ptr::NonNull, sync::Arc}; use aligned_vec::{AVec, ConstAlign}; use dora_arrow_convert::{ArrowData, IntoArrow}; -use dora_core::{ - config::{DataId, OperatorId}, - message::{ArrowTypeInfo, BufferOffset, Metadata}, -}; +use dora_core::config::{DataId, OperatorId}; +use dora_message::metadata::{ArrowTypeInfo, BufferOffset, Metadata}; use eyre::{Context, Result}; use shared_memory_extended::{Shmem, ShmemConf}; diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 2a11503b..2901a766 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -1,5 +1,10 @@ use std::{sync::Arc, time::Duration}; +use dora_message::{ + daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent}, + node_to_daemon::{DaemonRequest, Timestamped}, + DataflowId, +}; pub use event::{Event, MappedInputData, RawData}; use futures::{ future::{select, Either}, @@ -12,13 +17,7 @@ use self::{ thread::{EventItem, EventStreamThreadHandle}, }; use crate::daemon_connection::DaemonChannel; -use dora_core::{ - config::NodeId, - daemon_messages::{ - self, DaemonCommunication, DaemonRequest, DataflowId, NodeEvent, Timestamped, - }, - message::uhlc, -}; +use dora_core::{config::NodeId, uhlc}; use eyre::{eyre, Context}; mod event; @@ -97,8 +96,8 @@ impl EventStream { .wrap_err("failed to create subscription with dora-daemon")?; match reply { - daemon_messages::DaemonReply::Result(Ok(())) => {} - daemon_messages::DaemonReply::Result(Err(err)) => { + DaemonReply::Result(Ok(())) => {} + 
DaemonReply::Result(Err(err)) => { eyre::bail!("subscribe failed: {err}") } other => eyre::bail!("unexpected subscribe reply: {other:?}"), @@ -151,8 +150,8 @@ impl EventStream { NodeEvent::Input { id, metadata, data } => { let data = match data { None => Ok(None), - Some(daemon_messages::DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))), - Some(daemon_messages::DataMessage::SharedMemory { + Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))), + Some(DataMessage::SharedMemory { shared_memory_id, len, drop_token: _, // handled in `event_stream_loop` @@ -225,10 +224,8 @@ impl Drop for EventStream { .map_err(|e| eyre!(e)) .wrap_err("failed to signal event stream closure to dora-daemon") .and_then(|r| match r { - daemon_messages::DaemonReply::Result(Ok(())) => Ok(()), - daemon_messages::DaemonReply::Result(Err(err)) => { - Err(eyre!("EventStreamClosed failed: {err}")) - } + DaemonReply::Result(Ok(())) => Ok(()), + DaemonReply::Result(Err(err)) => Err(eyre!("EventStreamClosed failed: {err}")), other => Err(eyre!("unexpected EventStreamClosed reply: {other:?}")), }); if let Err(err) = result { diff --git a/apis/rust/node/src/event_stream/thread.rs b/apis/rust/node/src/event_stream/thread.rs index bee8cc22..add58e3e 100644 --- a/apis/rust/node/src/event_stream/thread.rs +++ b/apis/rust/node/src/event_stream/thread.rs @@ -1,7 +1,10 @@ use dora_core::{ config::NodeId, - daemon_messages::{DaemonReply, DaemonRequest, DropToken, NodeEvent, Timestamped}, - message::uhlc::{self, Timestamp}, + uhlc::{self, Timestamp}, +}; +use dora_message::{ + daemon_to_node::{DaemonReply, NodeEvent}, + node_to_daemon::{DaemonRequest, DropToken, Timestamped}, }; use eyre::{eyre, Context}; use flume::RecvTimeoutError; @@ -263,7 +266,7 @@ fn report_drop_tokens( timestamp, }; match channel.request(&daemon_request)? 
{ - dora_core::daemon_messages::DaemonReply::Empty => Ok(()), + DaemonReply::Empty => Ok(()), other => Err(eyre!("unexpected ReportDropTokens reply: {other:?}")), } } diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index 7c61559e..9836ab7b 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -15,8 +15,11 @@ //! pub use arrow; pub use dora_arrow_convert::*; -pub use dora_core; -pub use dora_core::message::{uhlc, Metadata, MetadataParameters, Parameter}; +pub use dora_core::{self, uhlc}; +pub use dora_message::{ + metadata::{Metadata, MetadataParameters, Parameter}, + DataflowId, +}; pub use event_stream::{merged, Event, EventStream, MappedInputData, RawData}; pub use flume::Receiver; pub use node::{arrow_utils, DataSample, DoraNode, ZERO_COPY_THRESHOLD}; diff --git a/apis/rust/node/src/node/arrow_utils.rs b/apis/rust/node/src/node/arrow_utils.rs index 8deb3ca2..247c76ae 100644 --- a/apis/rust/node/src/node/arrow_utils.rs +++ b/apis/rust/node/src/node/arrow_utils.rs @@ -1,5 +1,5 @@ use arrow::array::{ArrayData, BufferSpec}; -use dora_core::message::{ArrowTypeInfo, BufferOffset}; +use dora_message::metadata::{ArrowTypeInfo, BufferOffset}; pub fn required_data_size(array: &ArrayData) -> usize { let mut next_offset = 0; diff --git a/apis/rust/node/src/node/control_channel.rs b/apis/rust/node/src/node/control_channel.rs index 28826e95..ce6d899f 100644 --- a/apis/rust/node/src/node/control_channel.rs +++ b/apis/rust/node/src/node/control_channel.rs @@ -3,8 +3,13 @@ use std::sync::Arc; use crate::daemon_connection::DaemonChannel; use dora_core::{ config::{DataId, NodeId}, - daemon_messages::{DaemonCommunication, DaemonRequest, DataMessage, DataflowId, Timestamped}, - message::{uhlc::HLC, Metadata}, + uhlc::HLC, +}; +use dora_message::{ + daemon_to_node::{DaemonCommunication, DaemonReply}, + metadata::Metadata, + node_to_daemon::{DaemonRequest, DataMessage, Timestamped}, + DataflowId, }; use eyre::{bail, eyre, Context}; @@ -60,7 +65,7 
@@ impl ControlChannel { }) .wrap_err("failed to report outputs done to dora-daemon")?; match reply { - dora_core::daemon_messages::DaemonReply::Result(result) => result + DaemonReply::Result(result) => result .map_err(|e| eyre!(e)) .wrap_err("failed to report outputs done event to dora-daemon")?, other => bail!("unexpected outputs done reply: {other:?}"), @@ -77,7 +82,7 @@ impl ControlChannel { }) .wrap_err("failed to report closed outputs to dora-daemon")?; match reply { - dora_core::daemon_messages::DaemonReply::Result(result) => result + DaemonReply::Result(result) => result .map_err(|e| eyre!(e)) .wrap_err("failed to receive closed outputs reply from dora-daemon")?, other => bail!("unexpected closed outputs reply: {other:?}"), @@ -104,7 +109,7 @@ impl ControlChannel { }) .wrap_err("failed to send SendMessage request to dora-daemon")?; match reply { - dora_core::daemon_messages::DaemonReply::Empty => Ok(()), + DaemonReply::Empty => Ok(()), other => bail!("unexpected SendMessage reply: {other:?}"), } } diff --git a/apis/rust/node/src/node/drop_stream.rs b/apis/rust/node/src/node/drop_stream.rs index 80947ac9..d62b0588 100644 --- a/apis/rust/node/src/node/drop_stream.rs +++ b/apis/rust/node/src/node/drop_stream.rs @@ -1,13 +1,11 @@ use std::{sync::Arc, time::Duration}; use crate::daemon_connection::DaemonChannel; -use dora_core::{ - config::NodeId, - daemon_messages::{ - self, DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, DropToken, - NodeDropEvent, Timestamped, - }, - message::uhlc, +use dora_core::{config::NodeId, uhlc}; +use dora_message::{ + daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent}, + node_to_daemon::{DaemonRequest, DropToken, Timestamped}, + DataflowId, }; use eyre::{eyre, Context}; use flume::RecvTimeoutError; @@ -64,8 +62,8 @@ impl DropStream { .wrap_err("failed to create subscription with dora-daemon")?; match reply { - daemon_messages::DaemonReply::Result(Ok(())) => {} - 
daemon_messages::DaemonReply::Result(Err(err)) => { + DaemonReply::Result(Ok(())) => {} + DaemonReply::Result(Err(err)) => { eyre::bail!("drop subscribe failed: {err}") } other => eyre::bail!("unexpected drop subscribe reply: {other:?}"), diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 8951deb5..e37c1afb 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -9,12 +9,17 @@ use aligned_vec::{AVec, ConstAlign}; use arrow::array::Array; use dora_core::{ config::{DataId, NodeId, NodeRunConfig}, - daemon_messages::{DaemonRequest, DataMessage, DataflowId, DropToken, NodeConfig, Timestamped}, descriptor::Descriptor, - message::{uhlc, ArrowTypeInfo, Metadata, MetadataParameters}, topics::{DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST}, + uhlc, }; +use dora_message::{ + daemon_to_node::{DaemonReply, NodeConfig}, + metadata::{ArrowTypeInfo, Metadata, MetadataParameters}, + node_to_daemon::{DaemonRequest, DataMessage, DropToken, Timestamped}, + DataflowId, +}; use eyre::{bail, WrapErr}; use shared_memory_extended::{Shmem, ShmemConf}; use std::{ @@ -94,10 +99,10 @@ impl DoraNode { }) .wrap_err("failed to request node config from daemon")?; match reply { - dora_core::daemon_messages::DaemonReply::NodeConfig { + DaemonReply::NodeConfig { result: Ok(node_config), } => Self::init(node_config), - dora_core::daemon_messages::DaemonReply::NodeConfig { result: Err(error) } => { + DaemonReply::NodeConfig { result: Err(error) } => { bail!("failed to get node config from daemon: {error}") } _ => bail!("unexpected reply from daemon"), diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml index 80b05987..059e4086 100644 --- a/binaries/cli/Cargo.toml +++ b/binaries/cli/Cargo.toml @@ -20,6 +20,7 @@ tracing = ["dep:dora-tracing"] clap = { version = "4.0.3", features = ["derive"] } eyre = "0.6.8" dora-core = { workspace = true } +dora-message = { workspace = true } dora-node-api-c = { workspace = true } 
dora-operator-api-c = { workspace = true } serde = { version = "1.0.136", features = ["derive"] } diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/attach.rs index d5edeffd..80eefd36 100644 --- a/binaries/cli/src/attach.rs +++ b/binaries/cli/src/attach.rs @@ -1,10 +1,9 @@ use colored::Colorize; use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection}; -use dora_core::{ - coordinator_messages::LogMessage, - descriptor::{resolve_path, CoreNodeKind, Descriptor}, - topics::{ControlRequest, ControlRequestReply}, -}; +use dora_core::descriptor::{resolve_path, CoreNodeKind, Descriptor}; +use dora_message::cli_to_coordinator::ControlRequest; +use dora_message::common::LogMessage; +use dora_message::coordinator_to_cli::ControlRequestReply; use eyre::Context; use notify::event::ModifyKind; use notify::{Config, Event as NotifyEvent, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index 49bc948e..86cd6f7c 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -1,6 +1,6 @@ use crate::connect_to_coordinator; use communication_layer_request_reply::TcpRequestReplyConnection; -use dora_core::topics::{ControlRequest, ControlRequestReply}; +use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply}; use eyre::{bail, Context}; use std::{ io::{IsTerminal, Write}, diff --git a/binaries/cli/src/formatting.rs b/binaries/cli/src/formatting.rs index f19e1599..592a8c1e 100644 --- a/binaries/cli/src/formatting.rs +++ b/binaries/cli/src/formatting.rs @@ -1,4 +1,4 @@ -use dora_core::topics::{DataflowResult, NodeErrorCause}; +use dora_message::{common::NodeErrorCause, coordinator_to_cli::DataflowResult}; pub struct FormatDataflowError<'a>(pub &'a DataflowResult); diff --git a/binaries/cli/src/logs.rs b/binaries/cli/src/logs.rs index a15f11b1..027f3793 100644 --- a/binaries/cli/src/logs.rs +++ b/binaries/cli/src/logs.rs @@ -1,5 +1,5 
@@ use communication_layer_request_reply::TcpRequestReplyConnection; -use dora_core::topics::{ControlRequest, ControlRequestReply}; +use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply}; use eyre::{bail, Context, Result}; use uuid::Uuid; diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 8dcb6590..ef809196 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -6,11 +6,15 @@ use dora_coordinator::Event; use dora_core::{ descriptor::Descriptor, topics::{ - ControlRequest, ControlRequestReply, DataflowList, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, - DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, + DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT, + DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, }, }; use dora_daemon::Daemon; +use dora_message::{ + cli_to_coordinator::ControlRequest, + coordinator_to_cli::{ControlRequestReply, DataflowList, DataflowResult, DataflowStatus}, +}; #[cfg(feature = "tracing")] use dora_tracing::set_up_tracing; use dora_tracing::set_up_tracing_opts; @@ -575,10 +579,7 @@ fn stop_dataflow( } } -fn handle_dataflow_result( - result: dora_core::topics::DataflowResult, - uuid: Option, -) -> Result<(), eyre::Error> { +fn handle_dataflow_result(result: DataflowResult, uuid: Option) -> Result<(), eyre::Error> { if result.is_ok() { Ok(()) } else { @@ -627,9 +628,9 @@ fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> let uuid = entry.id.uuid; let name = entry.id.name.unwrap_or_default(); let status = match entry.status { - dora_core::topics::DataflowStatus::Running => "Running", - dora_core::topics::DataflowStatus::Finished => "Succeeded", - dora_core::topics::DataflowStatus::Failed => "Failed", + DataflowStatus::Running => "Running", + DataflowStatus::Finished => "Succeeded", + DataflowStatus::Failed => "Failed", }; tw.write_all(format!("{uuid}\t{name}\t{status}\n").as_bytes())?; } diff --git 
a/binaries/cli/src/up.rs b/binaries/cli/src/up.rs index d1376d7c..411bedab 100644 --- a/binaries/cli/src/up.rs +++ b/binaries/cli/src/up.rs @@ -1,5 +1,6 @@ use crate::{check::daemon_running, connect_to_coordinator, LOCALHOST}; -use dora_core::topics::{ControlRequest, DORA_COORDINATOR_PORT_CONTROL_DEFAULT}; +use dora_core::topics::DORA_COORDINATOR_PORT_CONTROL_DEFAULT; +use dora_message::cli_to_coordinator::ControlRequest; use eyre::Context; use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration}; #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] diff --git a/binaries/coordinator/Cargo.toml b/binaries/coordinator/Cargo.toml index 4132fd66..d47e93f6 100644 --- a/binaries/coordinator/Cargo.toml +++ b/binaries/coordinator/Cargo.toml @@ -26,3 +26,4 @@ serde_json = "1.0.86" names = "0.14.0" ctrlc = "3.2.5" log = { version = "0.4.21", features = ["serde"] } +dora-message = { workspace = true } diff --git a/binaries/coordinator/src/control.rs b/binaries/coordinator/src/control.rs index d37804e4..c103bf45 100644 --- a/binaries/coordinator/src/control.rs +++ b/binaries/coordinator/src/control.rs @@ -2,7 +2,7 @@ use crate::{ tcp_utils::{tcp_receive, tcp_send}, Event, }; -use dora_core::topics::{ControlRequest, ControlRequestReply}; +use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply}; use eyre::{eyre, Context}; use futures::{ future::{self, Either}, diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 1c88d981..88e57290 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -5,14 +5,17 @@ use crate::{ pub use control::ControlEvent; use dora_core::{ config::{NodeId, OperatorId}, - coordinator_messages::{LogMessage, RegisterResult}, - daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped}, descriptor::{Descriptor, ResolvedNode}, - message::uhlc::{self, HLC}, - topics::{ - ControlRequest, ControlRequestReply, 
DataflowDaemonResult, DataflowId, DataflowListEntry, - DataflowResult, + uhlc::{self, HLC}, +}; +use dora_message::{ + cli_to_coordinator::ControlRequest, + coordinator_to_cli::{ + ControlRequestReply, DataflowIdAndName, DataflowList, DataflowListEntry, DataflowResult, + DataflowStatus, LogMessage, }, + coordinator_to_daemon::{DaemonCoordinatorEvent, RegisterResult, Timestamped}, + daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult}, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; @@ -169,26 +172,17 @@ async fn start_inner( tracing::warn!("{:?}", err.wrap_err("failed to connect to dora-daemon")); } Event::Daemon(event) => match event { - DaemonEvent::Register { + DaemonRequest::Register { machine_id, mut connection, - dora_version: daemon_version, + version_check_result, listen_port, } => { - let coordinator_version: &&str = &env!("CARGO_PKG_VERSION"); - let version_check = if &daemon_version == coordinator_version { - Ok(()) - } else { - Err(format!( - "version mismatch: daemon v{daemon_version} is \ - not compatible with coordinator v{coordinator_version}" - )) - }; let peer_ip = connection .peer_addr() .map(|addr| addr.ip()) .map_err(|err| format!("failed to get peer addr of connection: {err}")); - let register_result = version_check.and(peer_ip); + let register_result = version_check_result.and(peer_ip); let reply: Timestamped = Timestamped { inner: match ®ister_result { @@ -470,30 +464,28 @@ async fn start_inner( dataflows.sort_by_key(|d| (&d.name, d.uuid)); let running = dataflows.into_iter().map(|d| DataflowListEntry { - id: DataflowId { + id: DataflowIdAndName { uuid: d.uuid, name: d.name.clone(), }, - status: dora_core::topics::DataflowStatus::Running, + status: DataflowStatus::Running, }); let finished_failed = dataflow_results.iter().map(|(&uuid, results)| { let name = archived_dataflows.get(&uuid).and_then(|d| d.name.clone()); - let id = DataflowId { uuid, name }; + 
let id = DataflowIdAndName { uuid, name }; let status = if results.values().all(|r| r.is_ok()) { - dora_core::topics::DataflowStatus::Finished + DataflowStatus::Finished } else { - dora_core::topics::DataflowStatus::Failed + DataflowStatus::Failed }; DataflowListEntry { id, status } }); - let reply = Ok(ControlRequestReply::DataflowList( - dora_core::topics::DataflowList( - running.chain(finished_failed).collect(), - ), - )); + let reply = Ok(ControlRequestReply::DataflowList(DataflowList( + running.chain(finished_failed).collect(), + ))); let _ = reply_sender.send(reply); } ControlRequest::DaemonConnected => { @@ -965,7 +957,7 @@ pub enum Event { DaemonHeartbeat { machine_id: String }, Dataflow { uuid: Uuid, event: DataflowEvent }, Control(ControlEvent), - Daemon(DaemonEvent), + Daemon(DaemonRequest), DaemonHeartbeatInterval, CtrlC, Log(LogMessage), @@ -995,9 +987,9 @@ pub enum DataflowEvent { } #[derive(Debug)] -pub enum DaemonEvent { +pub enum DaemonRequest { Register { - dora_version: String, + version_check_result: Result<(), String>, machine_id: String, connection: TcpStream, listen_port: u16, diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index 8152d26e..8b7924e3 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -1,5 +1,6 @@ -use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event}; -use dora_core::{coordinator_messages, daemon_messages::Timestamped, message::uhlc::HLC}; +use crate::{tcp_utils::tcp_receive, DaemonRequest, DataflowEvent, Event}; +use dora_core::uhlc::HLC; +use dora_message::daemon_to_coordinator::{CoordinatorRequest, DaemonEvent, Timestamped}; use eyre::Context; use std::{io::ErrorKind, net::SocketAddr, sync::Arc}; use tokio::{ @@ -34,7 +35,7 @@ pub async fn handle_connection( continue; } }; - let message: Timestamped = + let message: Timestamped = match serde_json::from_slice(&raw).wrap_err("failed to deserialize node message") { Ok(e) => e, 
Err(err) => { @@ -49,22 +50,18 @@ pub async fn handle_connection( // handle the message and translate it to a DaemonEvent match message.inner { - coordinator_messages::CoordinatorRequest::Register { - machine_id, - dora_version, - listen_port, - } => { - let event = DaemonEvent::Register { - dora_version, - machine_id, + CoordinatorRequest::Register(register_request) => { + let event = DaemonRequest::Register { connection, - listen_port, + version_check_result: register_request.check_version(), + machine_id: register_request.machine_id, + listen_port: register_request.listen_port, }; let _ = events_tx.send(Event::Daemon(event)).await; break; } - coordinator_messages::CoordinatorRequest::Event { machine_id, event } => match event { - coordinator_messages::DaemonEvent::AllNodesReady { + CoordinatorRequest::Event { machine_id, event } => match event { + DaemonEvent::AllNodesReady { dataflow_id, exited_before_subscribe, } => { @@ -79,7 +76,7 @@ pub async fn handle_connection( break; } } - coordinator_messages::DaemonEvent::AllNodesFinished { + DaemonEvent::AllNodesFinished { dataflow_id, result, } => { @@ -91,13 +88,13 @@ pub async fn handle_connection( break; } } - coordinator_messages::DaemonEvent::Heartbeat => { + DaemonEvent::Heartbeat => { let event = Event::DaemonHeartbeat { machine_id }; if events_tx.send(event).await.is_err() { break; } } - coordinator_messages::DaemonEvent::Log(message) => { + DaemonEvent::Log(message) => { let event = Event::Log(message); if events_tx.send(event).await.is_err() { break; diff --git a/binaries/coordinator/src/log_subscriber.rs b/binaries/coordinator/src/log_subscriber.rs index c9520037..cb602d47 100644 --- a/binaries/coordinator/src/log_subscriber.rs +++ b/binaries/coordinator/src/log_subscriber.rs @@ -1,4 +1,4 @@ -use dora_core::coordinator_messages::LogMessage; +use dora_message::coordinator_to_cli::LogMessage; use eyre::{Context, ContextCompat}; use crate::tcp_utils::tcp_send; diff --git a/binaries/coordinator/src/run/mod.rs 
b/binaries/coordinator/src/run/mod.rs index 534b857b..1aec2c9c 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -4,11 +4,12 @@ use crate::{ }; use dora_core::{ - daemon_messages::{ - DaemonCoordinatorEvent, DaemonCoordinatorReply, SpawnDataflowNodes, Timestamped, - }, descriptor::{Descriptor, ResolvedNode}, - message::uhlc::HLC, + uhlc::HLC, +}; +use dora_message::{ + coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes, Timestamped}, + daemon_to_coordinator::DaemonCoordinatorReply, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; use std::{ diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index 2f1be890..d95b2e92 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -29,6 +29,7 @@ dora-download = { workspace = true } dora-tracing = { workspace = true, optional = true } dora-arrow-convert = { workspace = true } dora-node-api = { workspace = true } +dora-message = { workspace = true } serde_yaml = "0.8.23" uuid = { version = "1.7", features = ["v7"] } futures = "0.3.25" diff --git a/binaries/daemon/src/coordinator.rs b/binaries/daemon/src/coordinator.rs index 895bf49b..ad72ec02 100644 --- a/binaries/daemon/src/coordinator.rs +++ b/binaries/daemon/src/coordinator.rs @@ -2,10 +2,11 @@ use crate::{ socket_stream_utils::{socket_stream_receive, socket_stream_send}, DaemonCoordinatorEvent, }; -use dora_core::{ - coordinator_messages::{CoordinatorRequest, RegisterResult}, - daemon_messages::{DaemonCoordinatorReply, Timestamped}, - message::uhlc::HLC, +use dora_core::uhlc::HLC; +use dora_message::{ + common::Timestamped, + coordinator_to_daemon::RegisterResult, + daemon_to_coordinator::{CoordinatorRequest, DaemonCoordinatorReply, DaemonRegisterRequest}, }; use eyre::{eyre, Context}; use std::{io::ErrorKind, net::SocketAddr}; @@ -34,11 +35,7 @@ pub async fn register( .set_nodelay(true) .wrap_err("failed to set TCP_NODELAY")?; let register = serde_json::to_vec(&Timestamped { 
- inner: CoordinatorRequest::Register { - dora_version: env!("CARGO_PKG_VERSION").to_owned(), - machine_id, - listen_port, - }, + inner: CoordinatorRequest::Register(DaemonRegisterRequest::new(machine_id, listen_port)), timestamp: clock.new_timestamp(), })?; socket_stream_send(&mut stream, ®ister) diff --git a/binaries/daemon/src/inter_daemon.rs b/binaries/daemon/src/inter_daemon.rs index 21cce12a..bcc09671 100644 --- a/binaries/daemon/src/inter_daemon.rs +++ b/binaries/daemon/src/inter_daemon.rs @@ -1,5 +1,5 @@ use crate::socket_stream_utils::{socket_stream_receive, socket_stream_send}; -use dora_core::daemon_messages::{InterDaemonEvent, Timestamped}; +use dora_message::{common::Timestamped, daemon_to_daemon::InterDaemonEvent}; use eyre::{Context, ContextCompat}; use std::{collections::BTreeMap, io::ErrorKind, net::SocketAddr}; use tokio::net::{TcpListener, TcpStream}; diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index d8e19e3d..f5f4c13a 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -1,28 +1,25 @@ use aligned_vec::{AVec, ConstAlign}; use coordinator::CoordinatorEvent; use crossbeam::queue::ArrayQueue; -use dora_core::config::{Input, OperatorId}; -use dora_core::coordinator_messages::{CoordinatorRequest, Level, LogMessage}; -use dora_core::daemon_messages::{ - DataMessage, DynamicNodeEvent, InterDaemonEvent, NodeConfig, Timestamped, -}; -use dora_core::descriptor::runtime_node_inputs; -use dora_core::message::uhlc::{self, HLC}; -use dora_core::message::{ArrowTypeInfo, Metadata}; -use dora_core::topics::LOCALHOST; -use dora_core::topics::{ - DataflowDaemonResult, DataflowResult, NodeError, NodeErrorCause, NodeExitStatus, -}; use dora_core::{ - config::{DataId, InputMapping, NodeId}, - coordinator_messages::DaemonEvent, - daemon_messages::{ - self, DaemonCoordinatorEvent, DaemonCoordinatorReply, DaemonReply, DataflowId, DropToken, - SpawnDataflowNodes, + config::{DataId, Input, InputMapping, NodeId, OperatorId}, + 
descriptor::{runtime_node_inputs, CoreNodeKind, Descriptor, ResolvedNode}, + topics::LOCALHOST, + uhlc::{self, HLC}, +}; +use dora_message::{ + common::{DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus}, + coordinator_to_cli::DataflowResult, + coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes}, + daemon_to_coordinator::{ + CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult, LogMessage, }, - descriptor::{CoreNodeKind, Descriptor, ResolvedNode}, + daemon_to_daemon::InterDaemonEvent, + daemon_to_node::{DaemonReply, NodeConfig, NodeDropEvent, NodeEvent}, + metadata::{self, ArrowTypeInfo}, + node_to_daemon::{DynamicNodeEvent, Timestamped}, + DataflowId, }; - use dora_node_api::Parameter; use eyre::{bail, eyre, Context, ContextCompat, Result}; use futures::{future, stream, FutureExt, TryFutureExt}; @@ -32,21 +29,23 @@ use local_listener::DynamicNodeEventWrapper; use pending::PendingNodes; use shared_memory_server::ShmemConf; use socket_stream_utils::socket_stream_send; -use std::sync::Arc; -use std::time::Instant; use std::{ collections::{BTreeMap, BTreeSet, HashMap}, net::SocketAddr, path::{Path, PathBuf}, - time::Duration, + sync::Arc, + time::{Duration, Instant}, }; use sysinfo::Pid; -use tokio::fs::File; -use tokio::io::AsyncReadExt; -use tokio::net::TcpStream; -use tokio::sync::mpsc::UnboundedSender; -use tokio::sync::oneshot::Sender; -use tokio::sync::{mpsc, oneshot}; +use tokio::{ + fs::File, + io::AsyncReadExt, + net::TcpStream, + sync::{ + mpsc::{self, UnboundedSender}, + oneshot::{self, Sender}, + }, +}; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tracing::{error, warn}; use uuid::{NoContext, Timestamp, Uuid}; @@ -678,7 +677,7 @@ impl Daemon { log_messages.push(LogMessage { dataflow_id, node_id: Some(node_id.clone()), - level: Level::Error, + level: LogLevel::Error, target: None, module_path: None, file: None, @@ -923,11 +922,7 @@ impl Daemon { format!("Reload 
failed: no running dataflow with ID `{dataflow_id}`") })?; if let Some(channel) = dataflow.subscribe_channels.get(&node_id) { - match send_with_timestamp( - channel, - daemon_messages::NodeEvent::Reload { operator_id }, - &self.clock, - ) { + match send_with_timestamp(channel, NodeEvent::Reload { operator_id }, &self.clock) { Ok(()) => {} Err(_) => { dataflow.subscribe_channels.remove(&node_id); @@ -942,7 +937,7 @@ impl Daemon { dataflow_id: Uuid, node_id: NodeId, output_id: DataId, - metadata: dora_core::message::Metadata, + metadata: dora_message::metadata::Metadata, data: Option, ) -> Result<(), eyre::ErrReport> { let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| { @@ -990,7 +985,7 @@ impl Daemon { async fn subscribe( dataflow: &mut RunningDataflow, node_id: NodeId, - event_sender: UnboundedSender>, + event_sender: UnboundedSender>, clock: &HLC, ) { // some inputs might have been closed already -> report those events @@ -1010,24 +1005,20 @@ impl Daemon { for input_id in closed_inputs { let _ = send_with_timestamp( &event_sender, - daemon_messages::NodeEvent::InputClosed { + NodeEvent::InputClosed { id: input_id.clone(), }, clock, ); } if dataflow.open_inputs(&node_id).is_empty() { - let _ = send_with_timestamp( - &event_sender, - daemon_messages::NodeEvent::AllInputsClosed, - clock, - ); + let _ = send_with_timestamp(&event_sender, NodeEvent::AllInputsClosed, clock); } // if a stop event was already sent for the dataflow, send it to // the newly connected node too if dataflow.stop_sent { - let _ = send_with_timestamp(&event_sender, daemon_messages::NodeEvent::Stop, clock); + let _ = send_with_timestamp(&event_sender, NodeEvent::Stop, clock); } dataflow.subscribe_channels.insert(node_id, event_sender); @@ -1142,7 +1133,7 @@ impl Daemon { let send_result = send_with_timestamp( channel, - daemon_messages::NodeEvent::Input { + NodeEvent::Input { id: input_id.clone(), metadata: metadata.clone(), data: None, @@ -1189,7 +1180,7 @@ impl Daemon { let 
send_result = send_with_timestamp( channel, - daemon_messages::NodeEvent::Input { + NodeEvent::Input { id: input_id.clone(), metadata: metadata.clone(), data: Some(message.clone()), @@ -1263,9 +1254,9 @@ impl Daemon { dataflow_id, node_id: Some(node_id.clone()), level: if node_result.is_ok() { - Level::Info + LogLevel::Info } else { - Level::Error + LogLevel::Error }, target: None, module_path: None, @@ -1304,7 +1295,7 @@ async fn send_output_to_local_receivers( node_id: NodeId, output_id: DataId, dataflow: &mut RunningDataflow, - metadata: &dora_core::message::Metadata, + metadata: &metadata::Metadata, data: Option, clock: &HLC, ) -> Result>>, eyre::ErrReport> { @@ -1316,7 +1307,7 @@ async fn send_output_to_local_receivers( let mut closed = Vec::new(); for (receiver_id, input_id) in local_receivers { if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) { - let item = daemon_messages::NodeEvent::Input { + let item = NodeEvent::Input { id: input_id.clone(), metadata: metadata.clone(), data: data.clone(), @@ -1446,15 +1437,14 @@ fn close_input( if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) { let _ = send_with_timestamp( channel, - daemon_messages::NodeEvent::InputClosed { + NodeEvent::InputClosed { id: input_id.clone(), }, clock, ); if dataflow.open_inputs(receiver_id).is_empty() { - let _ = - send_with_timestamp(channel, daemon_messages::NodeEvent::AllInputsClosed, clock); + let _ = send_with_timestamp(channel, NodeEvent::AllInputsClosed, clock); } } } @@ -1470,8 +1460,8 @@ pub struct RunningDataflow { /// Local nodes that are not started yet pending_nodes: PendingNodes, - subscribe_channels: HashMap>>, - drop_channels: HashMap>>, + subscribe_channels: HashMap>>, + drop_channels: HashMap>>, mappings: HashMap>, timers: BTreeMap>, open_inputs: BTreeMap>, @@ -1553,7 +1543,7 @@ impl RunningDataflow { Parameter::String("".into()), ); - let metadata = dora_core::message::Metadata::from_parameters( + let metadata = 
metadata::Metadata::from_parameters( hlc.new_timestamp(), ArrowTypeInfo::empty(), parameters, @@ -1597,7 +1587,7 @@ impl RunningDataflow { .await?; for (_node_id, channel) in self.subscribe_channels.drain() { - let _ = send_with_timestamp(&channel, daemon_messages::NodeEvent::Stop, clock); + let _ = send_with_timestamp(&channel, NodeEvent::Stop, clock); } let running_nodes = self.running_nodes.clone(); @@ -1637,7 +1627,7 @@ impl RunningDataflow { let result = match self.drop_channels.get_mut(&info.owner) { Some(channel) => send_with_timestamp( channel, - daemon_messages::NodeDropEvent::OutputDropped { drop_token }, + NodeDropEvent::OutputDropped { drop_token }, clock, ) .wrap_err("send failed"), @@ -1701,11 +1691,11 @@ pub enum DaemonNodeEvent { reply_sender: oneshot::Sender, }, Subscribe { - event_sender: UnboundedSender>, + event_sender: UnboundedSender>, reply_sender: oneshot::Sender, }, SubscribeDrop { - event_sender: UnboundedSender>, + event_sender: UnboundedSender>, reply_sender: oneshot::Sender, }, CloseOutputs { @@ -1714,7 +1704,7 @@ pub enum DaemonNodeEvent { }, SendOut { output_id: DataId, - metadata: dora_core::message::Metadata, + metadata: metadata::Metadata, data: Option, }, ReportDrop { @@ -1730,13 +1720,13 @@ pub enum DoraEvent { Timer { dataflow_id: DataflowId, interval: Duration, - metadata: dora_core::message::Metadata, + metadata: metadata::Metadata, }, Logs { dataflow_id: DataflowId, output_id: OutputId, message: DataMessage, - metadata: Metadata, + metadata: metadata::Metadata, }, SpawnedNodeResult { dataflow_id: DataflowId, diff --git a/binaries/daemon/src/local_listener.rs b/binaries/daemon/src/local_listener.rs index d9ff858a..1d324747 100644 --- a/binaries/daemon/src/local_listener.rs +++ b/binaries/daemon/src/local_listener.rs @@ -1,5 +1,8 @@ use crate::socket_stream_utils::{socket_stream_receive, socket_stream_send}; -use dora_core::daemon_messages::{DaemonReply, DaemonRequest, DynamicNodeEvent, Timestamped}; +use dora_message::{ + 
daemon_to_node::DaemonReply, + node_to_daemon::{DaemonRequest, DynamicNodeEvent, Timestamped}, +}; use eyre::Context; use std::{io::ErrorKind, net::SocketAddr}; use tokio::{ diff --git a/binaries/daemon/src/node_communication/mod.rs b/binaries/daemon/src/node_communication/mod.rs index c05078a3..76a3c667 100644 --- a/binaries/daemon/src/node_communication/mod.rs +++ b/binaries/daemon/src/node_communication/mod.rs @@ -1,12 +1,14 @@ use crate::{DaemonNodeEvent, Event}; use dora_core::{ config::{DataId, LocalCommunicationConfig, NodeId}, - daemon_messages::{ - DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, NodeDropEvent, NodeEvent, - Timestamped, - }, - message::uhlc, topics::LOCALHOST, + uhlc, +}; +use dora_message::{ + common::{DropToken, Timestamped}, + daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent, NodeEvent}, + node_to_daemon::DaemonRequest, + DataflowId, }; use eyre::{eyre, Context}; use futures::{future, task, Future}; @@ -215,24 +217,14 @@ impl Listener { } match message.inner { - DaemonRequest::Register { - dataflow_id, - node_id, - dora_version: node_api_version, - } => { - let daemon_version = env!("CARGO_PKG_VERSION"); - let result = if node_api_version == daemon_version { - Ok(()) - } else { - Err(format!( - "version mismatch: node API v{node_api_version} is not compatible \ - with daemon v{daemon_version}" - )) - }; + DaemonRequest::Register(register_request) => { + let result = register_request.check_version(); let send_result = connection .send_reply(DaemonReply::Result(result.clone())) .await .wrap_err("failed to send register reply"); + let dataflow_id = register_request.dataflow_id; + let node_id = register_request.node_id; match (result, send_result) { (Ok(()), Ok(())) => { let mut listener = Listener { @@ -517,10 +509,7 @@ impl Listener { Ok(()) } - async fn report_drop_tokens( - &mut self, - drop_tokens: Vec, - ) -> eyre::Result<()> { + async fn report_drop_tokens(&mut self, drop_tokens: Vec) -> eyre::Result<()> { 
if !drop_tokens.is_empty() { let event = Event::Node { dataflow_id: self.dataflow_id, diff --git a/binaries/daemon/src/node_communication/shmem.rs b/binaries/daemon/src/node_communication/shmem.rs index 212ae19d..61133ada 100644 --- a/binaries/daemon/src/node_communication/shmem.rs +++ b/binaries/daemon/src/node_communication/shmem.rs @@ -2,10 +2,9 @@ use std::{collections::BTreeMap, sync::Arc}; use super::{Connection, Listener}; use crate::Event; -use dora_core::{ - config::DataId, - daemon_messages::{DaemonReply, DaemonRequest, Timestamped}, - message::uhlc::HLC, +use dora_core::{config::DataId, uhlc::HLC}; +use dora_message::{ + common::Timestamped, daemon_to_node::DaemonReply, node_to_daemon::DaemonRequest, }; use eyre::eyre; use shared_memory_server::ShmemServer; diff --git a/binaries/daemon/src/node_communication/tcp.rs b/binaries/daemon/src/node_communication/tcp.rs index 2d787d8d..20402960 100644 --- a/binaries/daemon/src/node_communication/tcp.rs +++ b/binaries/daemon/src/node_communication/tcp.rs @@ -5,10 +5,9 @@ use crate::{ socket_stream_utils::{socket_stream_receive, socket_stream_send}, Event, }; -use dora_core::{ - config::DataId, - daemon_messages::{DaemonReply, DaemonRequest, Timestamped}, - message::uhlc::HLC, +use dora_core::{config::DataId, uhlc::HLC}; +use dora_message::{ + common::Timestamped, daemon_to_node::DaemonReply, node_to_daemon::DaemonRequest, }; use eyre::Context; use tokio::{ diff --git a/binaries/daemon/src/node_communication/unix_domain.rs b/binaries/daemon/src/node_communication/unix_domain.rs index 1268821e..4348afe8 100644 --- a/binaries/daemon/src/node_communication/unix_domain.rs +++ b/binaries/daemon/src/node_communication/unix_domain.rs @@ -1,9 +1,8 @@ use std::{collections::BTreeMap, io::ErrorKind, sync::Arc}; -use dora_core::{ - config::DataId, - daemon_messages::{DaemonReply, DaemonRequest, Timestamped}, - message::uhlc::HLC, +use dora_core::{config::DataId, uhlc::HLC}; +use dora_message::{ + common::Timestamped, 
daemon_to_node::DaemonReply, node_to_daemon::DaemonRequest, }; use eyre::Context; use tokio::{ diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs index e4037fdc..9622fd8e 100644 --- a/binaries/daemon/src/pending.rs +++ b/binaries/daemon/src/pending.rs @@ -2,9 +2,12 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use dora_core::{ config::NodeId, - coordinator_messages::{CoordinatorRequest, DaemonEvent, Level, LogMessage}, - daemon_messages::{DaemonReply, DataflowId, Timestamped}, - message::uhlc::{Timestamp, HLC}, + uhlc::{Timestamp, HLC}, +}; +use dora_message::{ + daemon_to_coordinator::{CoordinatorRequest, DaemonEvent, LogLevel, LogMessage, Timestamped}, + daemon_to_node::DaemonReply, + DataflowId, }; use eyre::{bail, Context}; use tokio::{net::TcpStream, sync::oneshot}; @@ -83,7 +86,7 @@ impl PendingNodes { log.push(LogMessage { dataflow_id: self.dataflow_id, node_id: Some(node_id.clone()), - level: Level::Warn, + level: LogLevel::Warn, target: None, module_path: None, file: None, diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index f84fac75..0a0243e2 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -1,21 +1,25 @@ use crate::{ - log, node_communication::spawn_listener_loop, node_inputs, DoraEvent, Event, NodeExitStatus, - OutputId, RunningNode, + log, node_communication::spawn_listener_loop, node_inputs, DoraEvent, Event, OutputId, + RunningNode, }; use aligned_vec::{AVec, ConstAlign}; use crossbeam::queue::ArrayQueue; use dora_arrow_convert::IntoArrow; use dora_core::{ config::DataId, - daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped}, descriptor::{ resolve_path, source_is_url, Descriptor, OperatorDefinition, OperatorSource, PythonSource, ResolvedNode, DYNAMIC_SOURCE, SHELL_SOURCE, }, get_python_path, - message::uhlc::HLC, + uhlc::HLC, }; use dora_download::download_file; +use dora_message::{ + daemon_to_coordinator::{DataMessage, 
NodeExitStatus, Timestamped}, + daemon_to_node::{NodeConfig, RuntimeConfig}, + DataflowId, +}; use dora_node_api::{ arrow::array::ArrayData, arrow_utils::{copy_array_into_sample, required_data_size}, diff --git a/binaries/runtime/Cargo.toml b/binaries/runtime/Cargo.toml index 2f6b923d..b21a15bb 100644 --- a/binaries/runtime/Cargo.toml +++ b/binaries/runtime/Cargo.toml @@ -15,6 +15,7 @@ dora-operator-api-types = { workspace = true } dora-core = { workspace = true } dora-tracing = { workspace = true, optional = true } dora-metrics = { workspace = true, optional = true } +dora-message = { workspace = true } eyre = "0.6.8" futures = "0.3.21" futures-concurrency = "7.1.0" diff --git a/binaries/runtime/src/lib.rs b/binaries/runtime/src/lib.rs index 308d59f1..22b81f50 100644 --- a/binaries/runtime/src/lib.rs +++ b/binaries/runtime/src/lib.rs @@ -2,9 +2,9 @@ use dora_core::{ config::{DataId, OperatorId}, - daemon_messages::{NodeConfig, RuntimeConfig}, descriptor::OperatorConfig, }; +use dora_message::daemon_to_node::{NodeConfig, RuntimeConfig}; use dora_metrics::init_meter_provider; use dora_node_api::{DoraNode, Event}; use eyre::{bail, Context, Result}; diff --git a/binaries/runtime/src/operator/mod.rs b/binaries/runtime/src/operator/mod.rs index b78fb939..aa7cd87c 100644 --- a/binaries/runtime/src/operator/mod.rs +++ b/binaries/runtime/src/operator/mod.rs @@ -1,9 +1,9 @@ use dora_core::{ config::{DataId, NodeId}, descriptor::{Descriptor, OperatorDefinition, OperatorSource}, - message::{ArrowTypeInfo, MetadataParameters}, }; -use dora_node_api::{DataSample, Event}; +use dora_message::metadata::ArrowTypeInfo; +use dora_node_api::{DataSample, Event, MetadataParameters}; use eyre::{Context, Result}; use std::any::Any; use tokio::sync::{mpsc::Sender, oneshot}; diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 21e00ec6..a1784f72 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs 
@@ -290,7 +290,7 @@ mod callback_impl { use super::SendOutputCallback; use aligned_vec::{AVec, ConstAlign}; use arrow::{array::ArrayData, pyarrow::FromPyArrow}; - use dora_core::message::ArrowTypeInfo; + use dora_message::metadata::ArrowTypeInfo; use dora_node_api::{ arrow_utils::{copy_array_into_sample, required_data_size}, ZERO_COPY_THRESHOLD, diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index 7dcb1ab8..cdbfd1dc 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -1,10 +1,11 @@ use dora_coordinator::{ControlEvent, Event}; use dora_core::{ descriptor::Descriptor, - topics::{ - ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, - DORA_COORDINATOR_PORT_DEFAULT, - }, + topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT}, +}; +use dora_message::{ + cli_to_coordinator::ControlRequest, + coordinator_to_cli::{ControlRequestReply, DataflowIdAndName}, }; use dora_tracing::set_up_tracing; use eyre::{bail, Context}; @@ -166,7 +167,9 @@ async fn connected_machines( Ok(machines) } -async fn running_dataflows(coordinator_events_tx: &Sender) -> eyre::Result> { +async fn running_dataflows( + coordinator_events_tx: &Sender, +) -> eyre::Result> { let (reply_sender, reply) = oneshot::channel(); coordinator_events_tx .send(Event::Control(ControlEvent::IncomingRequest { diff --git a/libraries/core/Cargo.toml b/libraries/core/Cargo.toml index 24dc0e5a..5a916eba 100644 --- a/libraries/core/Cargo.toml +++ b/libraries/core/Cargo.toml @@ -15,11 +15,10 @@ serde_yaml = "0.9.11" once_cell = "1.13.0" which = "5.0.0" uuid = { version = "1.7", features = ["serde", "v7"] } -dora-message = { workspace = true } tracing = "0.1" serde-with-expand-env = "1.1.0" tokio = { version = "1.24.1", features = ["fs", "process", "sync"] } -aligned-vec = { version = "0.5.0", features = ["serde"] } schemars = "0.8.19" serde_json = "1.0.117" log = { version = "0.4.21", features = 
["serde"] } +uhlc = "0.5.1" diff --git a/libraries/core/src/coordinator_messages.rs b/libraries/core/src/coordinator_messages.rs deleted file mode 100644 index 695946ea..00000000 --- a/libraries/core/src/coordinator_messages.rs +++ /dev/null @@ -1,58 +0,0 @@ -use crate::{config::NodeId, daemon_messages::DataflowId, topics::DataflowDaemonResult}; -use eyre::eyre; -pub use log::Level; - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub enum CoordinatorRequest { - Register { - dora_version: String, - machine_id: String, - listen_port: u16, - }, - Event { - machine_id: String, - event: DaemonEvent, - }, -} - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -#[must_use] -pub struct LogMessage { - pub dataflow_id: DataflowId, - pub node_id: Option, - pub level: log::Level, - pub target: Option, - pub module_path: Option, - pub file: Option, - pub line: Option, - pub message: String, -} - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub enum DaemonEvent { - AllNodesReady { - dataflow_id: DataflowId, - exited_before_subscribe: Vec, - }, - AllNodesFinished { - dataflow_id: DataflowId, - result: DataflowDaemonResult, - }, - Heartbeat, - Log(LogMessage), -} - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub enum RegisterResult { - Ok, - Err(String), -} - -impl RegisterResult { - pub fn to_result(self) -> eyre::Result<()> { - match self { - RegisterResult::Ok => Ok(()), - RegisterResult::Err(err) => Err(eyre!(err)), - } - } -} diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs deleted file mode 100644 index 0bda5b42..00000000 --- a/libraries/core/src/daemon_messages.rs +++ /dev/null @@ -1,302 +0,0 @@ -use std::{ - collections::{BTreeMap, BTreeSet}, - fmt, - net::SocketAddr, - path::PathBuf, - time::Duration, -}; - -use crate::{ - config::{DataId, NodeId, NodeRunConfig, OperatorId}, - descriptor::{Descriptor, OperatorDefinition, ResolvedNode}, -}; -use aligned_vec::{AVec, ConstAlign}; -use 
dora_message::{uhlc, Metadata}; -use uuid::{NoContext, Timestamp, Uuid}; - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct NodeConfig { - pub dataflow_id: DataflowId, - pub node_id: NodeId, - pub run_config: NodeRunConfig, - pub daemon_communication: DaemonCommunication, - pub dataflow_descriptor: Descriptor, - pub dynamic: bool, -} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub enum DaemonCommunication { - Shmem { - daemon_control_region_id: SharedMemoryId, - daemon_drop_region_id: SharedMemoryId, - daemon_events_region_id: SharedMemoryId, - daemon_events_close_region_id: SharedMemoryId, - }, - Tcp { - socket_addr: SocketAddr, - }, - #[cfg(unix)] - UnixDomain { - socket_file: PathBuf, - }, -} - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct RuntimeConfig { - pub node: NodeConfig, - pub operators: Vec, -} - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub enum DaemonRequest { - Register { - dataflow_id: DataflowId, - node_id: NodeId, - dora_version: String, - }, - Subscribe, - SendMessage { - output_id: DataId, - metadata: Metadata, - data: Option, - }, - CloseOutputs(Vec), - /// Signals that the node is finished sending outputs and that it received all - /// required drop tokens. - OutputsDone, - NextEvent { - drop_tokens: Vec, - }, - ReportDropTokens { - drop_tokens: Vec, - }, - SubscribeDrop, - NextFinishedDropTokens, - EventStreamDropped, - NodeConfig { - node_id: NodeId, - }, -} - -impl DaemonRequest { - pub fn expects_tcp_bincode_reply(&self) -> bool { - #[allow(clippy::match_like_matches_macro)] - match self { - DaemonRequest::SendMessage { .. } - | DaemonRequest::NodeConfig { .. } - | DaemonRequest::ReportDropTokens { .. } => false, - DaemonRequest::Register { .. } - | DaemonRequest::Subscribe - | DaemonRequest::CloseOutputs(_) - | DaemonRequest::OutputsDone - | DaemonRequest::NextEvent { .. 
} - | DaemonRequest::SubscribeDrop - | DaemonRequest::NextFinishedDropTokens - | DaemonRequest::EventStreamDropped => true, - } - } - - pub fn expects_tcp_json_reply(&self) -> bool { - #[allow(clippy::match_like_matches_macro)] - match self { - DaemonRequest::NodeConfig { .. } => true, - DaemonRequest::Register { .. } - | DaemonRequest::Subscribe - | DaemonRequest::CloseOutputs(_) - | DaemonRequest::OutputsDone - | DaemonRequest::NextEvent { .. } - | DaemonRequest::SubscribeDrop - | DaemonRequest::NextFinishedDropTokens - | DaemonRequest::ReportDropTokens { .. } - | DaemonRequest::SendMessage { .. } - | DaemonRequest::EventStreamDropped => false, - } - } -} - -#[derive(serde::Serialize, serde::Deserialize, Clone)] -pub enum DataMessage { - Vec(AVec>), - SharedMemory { - shared_memory_id: String, - len: usize, - drop_token: DropToken, - }, -} - -impl DataMessage { - pub fn drop_token(&self) -> Option { - match self { - DataMessage::Vec(_) => None, - DataMessage::SharedMemory { drop_token, .. 
} => Some(*drop_token), - } - } -} - -impl fmt::Debug for DataMessage { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Vec(v) => f - .debug_struct("Vec") - .field("len", &v.len()) - .finish_non_exhaustive(), - Self::SharedMemory { - shared_memory_id, - len, - drop_token, - } => f - .debug_struct("SharedMemory") - .field("shared_memory_id", shared_memory_id) - .field("len", len) - .field("drop_token", drop_token) - .finish(), - } - } -} - -type SharedMemoryId = String; - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -#[must_use] -pub enum DaemonReply { - Result(Result<(), String>), - PreparedMessage { shared_memory_id: SharedMemoryId }, - NextEvents(Vec>), - NextDropEvents(Vec>), - NodeConfig { result: Result }, - Empty, -} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct Timestamped { - pub inner: T, - pub timestamp: uhlc::Timestamp, -} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub enum NodeEvent { - Stop, - Reload { - operator_id: Option, - }, - Input { - id: DataId, - metadata: Metadata, - data: Option, - }, - InputClosed { - id: DataId, - }, - AllInputsClosed, -} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub enum NodeDropEvent { - OutputDropped { drop_token: DropToken }, -} - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct DropEvent { - pub tokens: Vec, -} - -#[derive( - Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, -)] -pub struct DropToken(Uuid); - -impl DropToken { - pub fn generate() -> Self { - Self(Uuid::new_v7(Timestamp::now(NoContext))) - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub enum InputData { - SharedMemory(SharedMemoryInput), - Vec(Vec), -} - -impl InputData { - pub fn drop_token(&self) -> Option { - match self { - InputData::SharedMemory(data) => Some(data.drop_token), - InputData::Vec(_) => None, - } - } -} - -#[derive(Debug, 
serde::Serialize, serde::Deserialize)] -pub struct SharedMemoryInput { - pub shared_memory_id: SharedMemoryId, - pub len: usize, - pub drop_token: DropToken, -} - -#[derive(Debug, serde::Deserialize, serde::Serialize)] -pub enum DaemonCoordinatorEvent { - Spawn(SpawnDataflowNodes), - AllNodesReady { - dataflow_id: DataflowId, - exited_before_subscribe: Vec, - }, - StopDataflow { - dataflow_id: DataflowId, - grace_duration: Option, - }, - ReloadDataflow { - dataflow_id: DataflowId, - node_id: NodeId, - operator_id: Option, - }, - Logs { - dataflow_id: DataflowId, - node_id: NodeId, - }, - Destroy, - Heartbeat, -} - -#[derive(Debug, serde::Deserialize, serde::Serialize)] -pub enum DynamicNodeEvent { - NodeConfig { node_id: NodeId }, -} - -#[derive(Debug, serde::Deserialize, serde::Serialize)] -pub enum InterDaemonEvent { - Output { - dataflow_id: DataflowId, - node_id: NodeId, - output_id: DataId, - metadata: Metadata, - data: Option>>, - }, - InputsClosed { - dataflow_id: DataflowId, - inputs: BTreeSet<(NodeId, DataId)>, - }, -} - -#[derive(Debug, serde::Deserialize, serde::Serialize)] -pub enum DaemonCoordinatorReply { - SpawnResult(Result<(), String>), - ReloadResult(Result<(), String>), - StopResult(Result<(), String>), - DestroyResult { - result: Result<(), String>, - #[serde(skip)] - notify: Option>, - }, - Logs(Result, String>), -} - -pub type DataflowId = Uuid; - -#[derive(Debug, serde::Deserialize, serde::Serialize)] -pub struct SpawnDataflowNodes { - pub dataflow_id: DataflowId, - pub working_dir: PathBuf, - pub nodes: Vec, - pub machine_listen_ports: BTreeMap, - pub dataflow_descriptor: Descriptor, -} diff --git a/libraries/core/src/lib.rs b/libraries/core/src/lib.rs index 83ce0dc5..08f0a611 100644 --- a/libraries/core/src/lib.rs +++ b/libraries/core/src/lib.rs @@ -5,11 +5,9 @@ use std::{ path::Path, }; -pub use dora_message as message; +pub use uhlc; pub mod config; -pub mod coordinator_messages; -pub mod daemon_messages; pub mod descriptor; pub mod 
topics; diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 5448555d..3eef0030 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -1,18 +1,4 @@ -use dora_message::uhlc; -use std::{ - borrow::Cow, - collections::{BTreeMap, BTreeSet}, - fmt::Display, - net::{IpAddr, Ipv4Addr}, - path::PathBuf, - time::Duration, -}; -use uuid::Uuid; - -use crate::{ - config::{NodeId, OperatorId}, - descriptor::Descriptor, -}; +use std::net::{IpAddr, Ipv4Addr}; pub const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); pub const DORA_COORDINATOR_PORT_DEFAULT: u16 = 0xD02A; @@ -20,234 +6,3 @@ pub const DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT: u16 = 0xD02B; pub const DORA_COORDINATOR_PORT_CONTROL_DEFAULT: u16 = 0x177C; pub const MANUAL_STOP: &str = "dora/stop"; - -#[derive(Debug, serde::Deserialize, serde::Serialize)] -pub enum ControlRequest { - Start { - dataflow: Descriptor, - name: Option, - // TODO: remove this once we figure out deploying of node/operator - // binaries from CLI to coordinator/daemon - local_working_dir: PathBuf, - }, - Reload { - dataflow_id: Uuid, - node_id: NodeId, - operator_id: Option, - }, - Check { - dataflow_uuid: Uuid, - }, - Stop { - dataflow_uuid: Uuid, - grace_duration: Option, - }, - StopByName { - name: String, - grace_duration: Option, - }, - Logs { - uuid: Option, - name: Option, - node: String, - }, - Destroy, - List, - DaemonConnected, - ConnectedMachines, - LogSubscribe { - dataflow_id: Uuid, - level: log::LevelFilter, - }, -} - -#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] -pub struct DataflowList(pub Vec); - -impl DataflowList { - pub fn get_active(&self) -> Vec { - self.0 - .iter() - .filter(|d| d.status == DataflowStatus::Running) - .map(|d| d.id.clone()) - .collect() - } -} - -#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] -pub struct DataflowListEntry { - pub id: DataflowId, - pub status: DataflowStatus, -} - -#[derive(Debug, Clone, Copy, 
serde::Deserialize, serde::Serialize, PartialEq, Eq)] -pub enum DataflowStatus { - Running, - Finished, - Failed, -} - -#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] -pub enum ControlRequestReply { - Error(String), - CoordinatorStopped, - DataflowStarted { uuid: Uuid }, - DataflowReloaded { uuid: Uuid }, - DataflowStopped { uuid: Uuid, result: DataflowResult }, - DataflowList(DataflowList), - DestroyOk, - DaemonConnected(bool), - ConnectedMachines(BTreeSet), - Logs(Vec), -} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct DataflowId { - pub uuid: Uuid, - pub name: Option, -} - -impl Display for DataflowId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if let Some(name) = &self.name { - write!(f, "[{name}] {}", self.uuid) - } else { - write!(f, "[] {}", self.uuid) - } - } -} - -#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] -pub struct DataflowResult { - pub uuid: Uuid, - pub timestamp: uhlc::Timestamp, - pub node_results: BTreeMap>, -} - -impl DataflowResult { - pub fn ok_empty(uuid: Uuid, timestamp: uhlc::Timestamp) -> Self { - Self { - uuid, - timestamp, - node_results: Default::default(), - } - } - - pub fn is_ok(&self) -> bool { - self.node_results.values().all(|r| r.is_ok()) - } -} - -#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] -pub struct DataflowDaemonResult { - pub timestamp: uhlc::Timestamp, - pub node_results: BTreeMap>, -} - -impl DataflowDaemonResult { - pub fn is_ok(&self) -> bool { - self.node_results.values().all(|r| r.is_ok()) - } -} - -#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] -pub struct NodeError { - pub timestamp: uhlc::Timestamp, - pub cause: NodeErrorCause, - pub exit_status: NodeExitStatus, -} - -impl std::fmt::Display for NodeError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match &self.exit_status { - NodeExitStatus::Success => write!(f, ""), - NodeExitStatus::IoError(err) => write!(f, 
"I/O error while reading exit status: {err}"), - NodeExitStatus::ExitCode(code) => write!(f, "exited with code {code}"), - NodeExitStatus::Signal(signal) => { - let signal_str: Cow<_> = match signal { - 1 => "SIGHUP".into(), - 2 => "SIGINT".into(), - 3 => "SIGQUIT".into(), - 4 => "SIGILL".into(), - 6 => "SIGABRT".into(), - 8 => "SIGFPE".into(), - 9 => "SIGKILL".into(), - 11 => "SIGSEGV".into(), - 13 => "SIGPIPE".into(), - 14 => "SIGALRM".into(), - 15 => "SIGTERM".into(), - 22 => "SIGABRT".into(), - 23 => "NSIG".into(), - other => other.to_string().into(), - }; - if matches!(self.cause, NodeErrorCause::GraceDuration) { - write!(f, "node was killed by dora because it didn't react to a stop message in time ({signal_str})") - } else { - write!(f, "exited because of signal {signal_str}") - } - } - NodeExitStatus::Unknown => write!(f, "unknown exit status"), - }?; - - match &self.cause { - NodeErrorCause::GraceDuration => {}, // handled above - NodeErrorCause::Cascading { caused_by_node } => write!( - f, - ". This error occurred because node `{caused_by_node}` exited before connecting to dora." - )?, - NodeErrorCause::Other { stderr } if stderr.is_empty() => {} - NodeErrorCause::Other { stderr } => { - let line: &str = "---------------------------------------------------------------------------------\n"; - write!(f, " with stderr output:\n{line}{stderr}{line}")? - }, - } - - Ok(()) - } -} - -#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] -pub enum NodeErrorCause { - /// Node was killed because it didn't react to a stop message in time. 
- GraceDuration, - /// Node failed because another node failed before, - Cascading { - caused_by_node: NodeId, - }, - Other { - stderr: String, - }, -} - -#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] -pub enum NodeExitStatus { - Success, - IoError(String), - ExitCode(i32), - Signal(i32), - Unknown, -} - -impl From> for NodeExitStatus { - fn from(result: Result) -> Self { - match result { - Ok(status) => { - if status.success() { - NodeExitStatus::Success - } else if let Some(code) = status.code() { - Self::ExitCode(code) - } else { - #[cfg(unix)] - { - use std::os::unix::process::ExitStatusExt; - if let Some(signal) = status.signal() { - return Self::Signal(signal); - } - } - Self::Unknown - } - } - Err(err) => Self::IoError(err.to_string()), - } - } -} diff --git a/libraries/message/Cargo.toml b/libraries/message/Cargo.toml index a9dd3c04..983f7b86 100644 --- a/libraries/message/Cargo.toml +++ b/libraries/message/Cargo.toml @@ -11,7 +11,11 @@ license.workspace = true [dependencies] arrow-data = { workspace = true } -uhlc = "0.5.1" serde = { version = "1.0.136", features = ["derive"] } eyre = "0.6.8" arrow-schema = { workspace = true, features = ["serde"] } +tokio = "1.39.2" +dora-core = { workspace = true } +uuid = { version = "1.7", features = ["serde", "v7"] } +log = { version = "0.4.21", features = ["serde"] } +aligned-vec = { version = "0.5.0", features = ["serde"] } diff --git a/libraries/message/src/cli_to_coordinator.rs b/libraries/message/src/cli_to_coordinator.rs new file mode 100644 index 00000000..64482596 --- /dev/null +++ b/libraries/message/src/cli_to_coordinator.rs @@ -0,0 +1,47 @@ +use std::{path::PathBuf, time::Duration}; + +use dora_core::{ + config::{NodeId, OperatorId}, + descriptor::Descriptor, +}; +use uuid::Uuid; + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub enum ControlRequest { + Start { + dataflow: Descriptor, + name: Option, + // TODO: remove this once we figure out deploying of node/operator + // 
binaries from CLI to coordinator/daemon + local_working_dir: PathBuf, + }, + Reload { + dataflow_id: Uuid, + node_id: NodeId, + operator_id: Option, + }, + Check { + dataflow_uuid: Uuid, + }, + Stop { + dataflow_uuid: Uuid, + grace_duration: Option, + }, + StopByName { + name: String, + grace_duration: Option, + }, + Logs { + uuid: Option, + name: Option, + node: String, + }, + Destroy, + List, + DaemonConnected, + ConnectedMachines, + LogSubscribe { + dataflow_id: Uuid, + level: log::LevelFilter, + }, +} diff --git a/libraries/message/src/common.rs b/libraries/message/src/common.rs new file mode 100644 index 00000000..03a75e88 --- /dev/null +++ b/libraries/message/src/common.rs @@ -0,0 +1,184 @@ +use core::fmt; +use std::borrow::Cow; + +use aligned_vec::{AVec, ConstAlign}; +use dora_core::{config::NodeId, uhlc}; +use uuid::Uuid; + +use crate::DataflowId; + +pub use log::Level as LogLevel; + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[must_use] +pub struct LogMessage { + pub dataflow_id: DataflowId, + pub node_id: Option, + pub level: LogLevel, + pub target: Option, + pub module_path: Option, + pub file: Option, + pub line: Option, + pub message: String, +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct NodeError { + pub timestamp: uhlc::Timestamp, + pub cause: NodeErrorCause, + pub exit_status: NodeExitStatus, +} + +impl std::fmt::Display for NodeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &self.exit_status { + NodeExitStatus::Success => write!(f, ""), + NodeExitStatus::IoError(err) => write!(f, "I/O error while reading exit status: {err}"), + NodeExitStatus::ExitCode(code) => write!(f, "exited with code {code}"), + NodeExitStatus::Signal(signal) => { + let signal_str: Cow<_> = match signal { + 1 => "SIGHUP".into(), + 2 => "SIGINT".into(), + 3 => "SIGQUIT".into(), + 4 => "SIGILL".into(), + 6 => "SIGABRT".into(), + 8 => "SIGFPE".into(), + 9 => "SIGKILL".into(), + 11 => 
"SIGSEGV".into(), + 13 => "SIGPIPE".into(), + 14 => "SIGALRM".into(), + 15 => "SIGTERM".into(), + 22 => "SIGABRT".into(), + 23 => "NSIG".into(), + other => other.to_string().into(), + }; + if matches!(self.cause, NodeErrorCause::GraceDuration) { + write!(f, "node was killed by dora because it didn't react to a stop message in time ({signal_str})") + } else { + write!(f, "exited because of signal {signal_str}") + } + } + NodeExitStatus::Unknown => write!(f, "unknown exit status"), + }?; + + match &self.cause { + NodeErrorCause::GraceDuration => {}, // handled above + NodeErrorCause::Cascading { caused_by_node } => write!( + f, + ". This error occurred because node `{caused_by_node}` exited before connecting to dora." + )?, + NodeErrorCause::Other { stderr } if stderr.is_empty() => {} + NodeErrorCause::Other { stderr } => { + let line: &str = "---------------------------------------------------------------------------------\n"; + write!(f, " with stderr output:\n{line}{stderr}{line}")? + }, + } + + Ok(()) + } +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub enum NodeErrorCause { + /// Node was killed because it didn't react to a stop message in time. 
 + GraceDuration, + /// Node failed because another node failed before. + Cascading { + caused_by_node: NodeId, + }, + Other { + stderr: String, + }, +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub enum NodeExitStatus { + Success, + IoError(String), + ExitCode(i32), + Signal(i32), + Unknown, +} + +impl From> for NodeExitStatus { + fn from(result: Result) -> Self { + match result { + Ok(status) => { + if status.success() { + NodeExitStatus::Success + } else if let Some(code) = status.code() { + Self::ExitCode(code) + } else { + #[cfg(unix)] + { + use std::os::unix::process::ExitStatusExt; + if let Some(signal) = status.signal() { + return Self::Signal(signal); + } + } + Self::Unknown + } + } + Err(err) => Self::IoError(err.to_string()), + } + } +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct Timestamped { + pub inner: T, + pub timestamp: uhlc::Timestamp, +} + +pub type SharedMemoryId = String; + +#[derive(serde::Serialize, serde::Deserialize, Clone)] +pub enum DataMessage { + Vec(AVec>), + SharedMemory { + shared_memory_id: String, + len: usize, + drop_token: DropToken, + }, +} + +impl DataMessage { + pub fn drop_token(&self) -> Option { + match self { + DataMessage::Vec(_) => None, + DataMessage::SharedMemory { drop_token, .. 
} => Some(*drop_token), + } + } +} + +impl fmt::Debug for DataMessage { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Vec(v) => f + .debug_struct("Vec") + .field("len", &v.len()) + .finish_non_exhaustive(), + Self::SharedMemory { + shared_memory_id, + len, + drop_token, + } => f + .debug_struct("SharedMemory") + .field("shared_memory_id", shared_memory_id) + .field("len", len) + .field("drop_token", drop_token) + .finish(), + } + } +} + +#[derive( + Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, +)] +pub struct DropToken(Uuid); + +impl DropToken { + pub fn generate() -> Self { + Self(Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext))) + } +} diff --git a/libraries/message/src/coordinator_to_cli.rs b/libraries/message/src/coordinator_to_cli.rs new file mode 100644 index 00000000..9d61ae6e --- /dev/null +++ b/libraries/message/src/coordinator_to_cli.rs @@ -0,0 +1,85 @@ +use std::collections::{BTreeMap, BTreeSet}; + +use dora_core::config::NodeId; +use dora_core::uhlc; +use uuid::Uuid; + +pub use crate::common::LogMessage; +pub use crate::common::{NodeError, NodeErrorCause, NodeExitStatus}; + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub enum ControlRequestReply { + Error(String), + CoordinatorStopped, + DataflowStarted { uuid: Uuid }, + DataflowReloaded { uuid: Uuid }, + DataflowStopped { uuid: Uuid, result: DataflowResult }, + DataflowList(DataflowList), + DestroyOk, + DaemonConnected(bool), + ConnectedMachines(BTreeSet), + Logs(Vec), +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct DataflowResult { + pub uuid: Uuid, + pub timestamp: uhlc::Timestamp, + pub node_results: BTreeMap>, +} + +impl DataflowResult { + pub fn ok_empty(uuid: Uuid, timestamp: uhlc::Timestamp) -> Self { + Self { + uuid, + timestamp, + node_results: Default::default(), + } + } + + pub fn is_ok(&self) -> bool { + self.node_results.values().all(|r| r.is_ok()) 
+ } +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct DataflowList(pub Vec); + +impl DataflowList { + pub fn get_active(&self) -> Vec { + self.0 + .iter() + .filter(|d| d.status == DataflowStatus::Running) + .map(|d| d.id.clone()) + .collect() + } +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct DataflowListEntry { + pub id: DataflowIdAndName, + pub status: DataflowStatus, +} + +#[derive(Debug, Clone, Copy, serde::Deserialize, serde::Serialize, PartialEq, Eq)] +pub enum DataflowStatus { + Running, + Finished, + Failed, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DataflowIdAndName { + pub uuid: Uuid, + pub name: Option, +} + +impl std::fmt::Display for DataflowIdAndName { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Some(name) = &self.name { + write!(f, "[{name}] {}", self.uuid) + } else { + write!(f, "[] {}", self.uuid) + } + } +} diff --git a/libraries/message/src/coordinator_to_daemon.rs b/libraries/message/src/coordinator_to_daemon.rs new file mode 100644 index 00000000..e8741f62 --- /dev/null +++ b/libraries/message/src/coordinator_to_daemon.rs @@ -0,0 +1,59 @@ +use std::{collections::BTreeMap, net::SocketAddr, path::PathBuf, time::Duration}; + +use dora_core::{ + config::{NodeId, OperatorId}, + // TODO: how should we version these? 
+ descriptor::{Descriptor, ResolvedNode}, +}; + +use crate::DataflowId; + +pub use crate::common::Timestamped; + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub enum RegisterResult { + Ok, + Err(String), +} + +impl RegisterResult { + pub fn to_result(self) -> eyre::Result<()> { + match self { + RegisterResult::Ok => Ok(()), + RegisterResult::Err(err) => Err(eyre::eyre!(err)), + } + } +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub enum DaemonCoordinatorEvent { + Spawn(SpawnDataflowNodes), + AllNodesReady { + dataflow_id: DataflowId, + exited_before_subscribe: Vec, + }, + StopDataflow { + dataflow_id: DataflowId, + grace_duration: Option, + }, + ReloadDataflow { + dataflow_id: DataflowId, + node_id: NodeId, + operator_id: Option, + }, + Logs { + dataflow_id: DataflowId, + node_id: NodeId, + }, + Destroy, + Heartbeat, +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub struct SpawnDataflowNodes { + pub dataflow_id: DataflowId, + pub working_dir: PathBuf, + pub nodes: Vec, + pub machine_listen_ports: BTreeMap, + pub dataflow_descriptor: Descriptor, +} diff --git a/libraries/message/src/daemon_to_coordinator.rs b/libraries/message/src/daemon_to_coordinator.rs new file mode 100644 index 00000000..a3116911 --- /dev/null +++ b/libraries/message/src/daemon_to_coordinator.rs @@ -0,0 +1,86 @@ +use std::collections::BTreeMap; + +use dora_core::{config::NodeId, uhlc}; + +pub use crate::common::{ + DataMessage, LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus, Timestamped, +}; +use crate::DataflowId; + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub enum CoordinatorRequest { + Register(DaemonRegisterRequest), + Event { + machine_id: String, + event: DaemonEvent, + }, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct DaemonRegisterRequest { + dora_version: String, + pub machine_id: String, + pub listen_port: u16, +} + +impl DaemonRegisterRequest { + pub fn new(machine_id: String, 
listen_port: u16) -> Self { + Self { + dora_version: env!("CARGO_PKG_VERSION").to_owned(), + machine_id, + listen_port, + } + } + + pub fn check_version(&self) -> Result<(), String> { + let crate_version = env!("CARGO_PKG_VERSION"); + if self.dora_version == crate_version { + Ok(()) + } else { + Err(format!( + "version mismatch: message format v{} is not compatible \ + with expected message format v{crate_version}", + self.dora_version + )) + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub enum DaemonEvent { + AllNodesReady { + dataflow_id: DataflowId, + exited_before_subscribe: Vec, + }, + AllNodesFinished { + dataflow_id: DataflowId, + result: DataflowDaemonResult, + }, + Heartbeat, + Log(LogMessage), +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct DataflowDaemonResult { + pub timestamp: uhlc::Timestamp, + pub node_results: BTreeMap>, +} + +impl DataflowDaemonResult { + pub fn is_ok(&self) -> bool { + self.node_results.values().all(|r| r.is_ok()) + } +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub enum DaemonCoordinatorReply { + SpawnResult(Result<(), String>), + ReloadResult(Result<(), String>), + StopResult(Result<(), String>), + DestroyResult { + result: Result<(), String>, + #[serde(skip)] + notify: Option>, + }, + Logs(Result, String>), +} diff --git a/libraries/message/src/daemon_to_daemon.rs b/libraries/message/src/daemon_to_daemon.rs new file mode 100644 index 00000000..20fb5dd9 --- /dev/null +++ b/libraries/message/src/daemon_to_daemon.rs @@ -0,0 +1,21 @@ +use std::collections::BTreeSet; + +use aligned_vec::{AVec, ConstAlign}; +use dora_core::config::{DataId, NodeId}; + +use crate::{metadata::Metadata, DataflowId}; + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub enum InterDaemonEvent { + Output { + dataflow_id: DataflowId, + node_id: NodeId, + output_id: DataId, + metadata: Metadata, + data: Option>>, + }, + InputsClosed { + dataflow_id: DataflowId, + inputs: 
BTreeSet<(NodeId, DataId)>, + }, +} diff --git a/libraries/message/src/daemon_to_node.rs b/libraries/message/src/daemon_to_node.rs new file mode 100644 index 00000000..178b3377 --- /dev/null +++ b/libraries/message/src/daemon_to_node.rs @@ -0,0 +1,77 @@ +use std::{net::SocketAddr, path::PathBuf}; + +use dora_core::{ + config::{DataId, NodeId, NodeRunConfig, OperatorId}, + descriptor::{Descriptor, OperatorDefinition}, +}; + +use crate::{metadata::Metadata, DataflowId}; + +pub use crate::common::{DataMessage, DropToken, SharedMemoryId, Timestamped}; + +// Passed via env variable +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct RuntimeConfig { + pub node: NodeConfig, + pub operators: Vec, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct NodeConfig { + pub dataflow_id: DataflowId, + pub node_id: NodeId, + pub run_config: NodeRunConfig, + pub daemon_communication: DaemonCommunication, + pub dataflow_descriptor: Descriptor, + pub dynamic: bool, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum DaemonCommunication { + Shmem { + daemon_control_region_id: SharedMemoryId, + daemon_drop_region_id: SharedMemoryId, + daemon_events_region_id: SharedMemoryId, + daemon_events_close_region_id: SharedMemoryId, + }, + Tcp { + socket_addr: SocketAddr, + }, + #[cfg(unix)] + UnixDomain { + socket_file: PathBuf, + }, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[must_use] +pub enum DaemonReply { + Result(Result<(), String>), + PreparedMessage { shared_memory_id: SharedMemoryId }, + NextEvents(Vec>), + NextDropEvents(Vec>), + NodeConfig { result: Result }, + Empty, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum NodeEvent { + Stop, + Reload { + operator_id: Option, + }, + Input { + id: DataId, + metadata: Metadata, + data: Option, + }, + InputClosed { + id: DataId, + }, + AllInputsClosed, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] 
+pub enum NodeDropEvent { + OutputDropped { drop_token: DropToken }, +} diff --git a/libraries/message/src/lib.rs b/libraries/message/src/lib.rs index fa69c2fb..aa339aae 100644 --- a/libraries/message/src/lib.rs +++ b/libraries/message/src/lib.rs @@ -3,146 +3,18 @@ #![allow(clippy::missing_safety_doc)] -use std::collections::BTreeMap; +pub mod common; +pub mod metadata; -use arrow_data::ArrayData; -use arrow_schema::DataType; -use eyre::Context; -use serde::{Deserialize, Serialize}; -pub use uhlc; +pub mod coordinator_to_daemon; +pub mod daemon_to_coordinator; -pub type MetadataParameters = BTreeMap; +pub mod daemon_to_daemon; -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct Metadata { - metadata_version: u16, - timestamp: uhlc::Timestamp, - pub type_info: ArrowTypeInfo, - pub parameters: MetadataParameters, -} +pub mod daemon_to_node; +pub mod node_to_daemon; -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct ArrowTypeInfo { - pub data_type: DataType, - pub len: usize, - pub null_count: usize, - pub validity: Option>, - pub offset: usize, - pub buffer_offsets: Vec, - pub child_data: Vec, -} +pub mod cli_to_coordinator; +pub mod coordinator_to_cli; -impl ArrowTypeInfo { - pub const fn empty() -> Self { - Self { - data_type: DataType::Null, - len: 0, - null_count: 0, - validity: None, - offset: 0, - buffer_offsets: Vec::new(), - child_data: Vec::new(), - } - } - - pub fn byte_array(data_len: usize) -> Self { - Self { - data_type: DataType::UInt8, - len: data_len, - null_count: 0, - validity: None, - offset: 0, - buffer_offsets: vec![BufferOffset { - offset: 0, - len: data_len, - }], - child_data: Vec::new(), - } - } - - pub unsafe fn from_array( - array: &ArrayData, - region_start: *const u8, - region_len: usize, - ) -> eyre::Result { - Ok(Self { - data_type: array.data_type().clone(), - len: array.len(), - null_count: array.null_count(), - validity: array.nulls().map(|b| b.validity().to_owned()), - offset: 
array.offset(), - buffer_offsets: array - .buffers() - .iter() - .map(|b| { - let ptr = b.as_ptr(); - if ptr as usize <= region_start as usize { - eyre::bail!("ptr {ptr:p} starts before region {region_start:p}"); - } - if ptr as usize >= region_start as usize + region_len { - eyre::bail!("ptr {ptr:p} starts after region {region_start:p}"); - } - if ptr as usize + b.len() > region_start as usize + region_len { - eyre::bail!("ptr {ptr:p} ends after region {region_start:p}"); - } - let offset = usize::try_from(unsafe { ptr.offset_from(region_start) }) - .context("offset_from is negative")?; - - Result::<_, eyre::Report>::Ok(BufferOffset { - offset, - len: b.len(), - }) - }) - .collect::>()?, - child_data: array - .child_data() - .iter() - .map(|c| unsafe { Self::from_array(c, region_start, region_len) }) - .collect::>()?, - }) - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct BufferOffset { - pub offset: usize, - pub len: usize, -} - -#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] -pub enum Parameter { - Bool(bool), - Integer(i64), - String(String), -} - -impl Metadata { - pub fn new(timestamp: uhlc::Timestamp, type_info: ArrowTypeInfo) -> Self { - Self::from_parameters(timestamp, type_info, Default::default()) - } - - pub fn from_parameters( - timestamp: uhlc::Timestamp, - type_info: ArrowTypeInfo, - parameters: MetadataParameters, - ) -> Self { - Self { - metadata_version: 0, - timestamp, - parameters, - type_info, - } - } - - pub fn timestamp(&self) -> uhlc::Timestamp { - self.timestamp - } - - pub fn open_telemetry_context(&self) -> String { - if let Some(Parameter::String(otel)) = self.parameters.get("open_telemetry_context") { - otel.to_string() - } else { - "".to_string() - } - } -} +pub type DataflowId = uuid::Uuid; diff --git a/libraries/message/src/metadata.rs b/libraries/message/src/metadata.rs new file mode 100644 index 00000000..fe526dac --- /dev/null +++ b/libraries/message/src/metadata.rs @@ -0,0 +1,143 
@@
+use std::collections::BTreeMap;
+
+use arrow_data::ArrayData;
+use arrow_schema::DataType;
+use dora_core::uhlc;
+use eyre::Context;
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct Metadata {
+    metadata_version: u16,
+    timestamp: uhlc::Timestamp,
+    pub type_info: ArrowTypeInfo,
+    pub parameters: MetadataParameters,
+}
+
+impl Metadata {
+    pub fn new(timestamp: uhlc::Timestamp, type_info: ArrowTypeInfo) -> Self {
+        Self::from_parameters(timestamp, type_info, Default::default())
+    }
+
+    pub fn from_parameters(
+        timestamp: uhlc::Timestamp,
+        type_info: ArrowTypeInfo,
+        parameters: MetadataParameters,
+    ) -> Self {
+        Self {
+            metadata_version: 0,
+            timestamp,
+            parameters,
+            type_info,
+        }
+    }
+
+    pub fn timestamp(&self) -> uhlc::Timestamp {
+        self.timestamp
+    }
+
+    pub fn open_telemetry_context(&self) -> String {
+        if let Some(Parameter::String(otel)) = self.parameters.get("open_telemetry_context") {
+            otel.to_string()
+        } else {
+            "".to_string()
+        }
+    }
+}
+
+pub type MetadataParameters = BTreeMap<String, Parameter>;
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct ArrowTypeInfo {
+    pub data_type: DataType,
+    pub len: usize,
+    pub null_count: usize,
+    pub validity: Option<Vec<bool>>,
+    pub offset: usize,
+    pub buffer_offsets: Vec<BufferOffset>,
+    pub child_data: Vec<ArrowTypeInfo>,
+}
+
+impl ArrowTypeInfo {
+    pub const fn empty() -> Self {
+        Self {
+            data_type: DataType::Null,
+            len: 0,
+            null_count: 0,
+            validity: None,
+            offset: 0,
+            buffer_offsets: Vec::new(),
+            child_data: Vec::new(),
+        }
+    }
+
+    pub fn byte_array(data_len: usize) -> Self {
+        Self {
+            data_type: DataType::UInt8,
+            len: data_len,
+            null_count: 0,
+            validity: None,
+            offset: 0,
+            buffer_offsets: vec![BufferOffset {
+                offset: 0,
+                len: data_len,
+            }],
+            child_data: Vec::new(),
+        }
+    }
+
+    pub unsafe fn from_array(
+        array: &ArrayData,
+        region_start: *const u8,
+        region_len: usize,
+    ) -> eyre::Result<Self> {
+        Ok(Self {
+            data_type: array.data_type().clone(),
+            len: array.len(),
+            null_count: array.null_count(),
+            validity: array.nulls().map(|b| b.validity().to_owned()),
+            offset: array.offset(),
+            buffer_offsets: array
+                .buffers()
+                .iter()
+                .map(|b| {
+                    let ptr = b.as_ptr();
+                    if ptr as usize <= region_start as usize {
+                        eyre::bail!("ptr {ptr:p} starts before region {region_start:p}");
+                    }
+                    if ptr as usize >= region_start as usize + region_len {
+                        eyre::bail!("ptr {ptr:p} starts after region {region_start:p}");
+                    }
+                    if ptr as usize + b.len() > region_start as usize + region_len {
+                        eyre::bail!("ptr {ptr:p} ends after region {region_start:p}");
+                    }
+                    let offset = usize::try_from(unsafe { ptr.offset_from(region_start) })
+                        .context("offset_from is negative")?;
+
+                    Result::<_, eyre::Report>::Ok(BufferOffset {
+                        offset,
+                        len: b.len(),
+                    })
+                })
+                .collect::<Result<_, _>>()?,
+            child_data: array
+                .child_data()
+                .iter()
+                .map(|c| unsafe { Self::from_array(c, region_start, region_len) })
+                .collect::<Result<_, _>>()?,
+        })
+    }
+}
+
+#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
+pub enum Parameter {
+    Bool(bool),
+    Integer(i64),
+    String(String),
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct BufferOffset {
+    pub offset: usize,
+    pub len: usize,
+}
diff --git a/libraries/message/src/node_to_daemon.rs b/libraries/message/src/node_to_daemon.rs
new file mode 100644
index 00000000..0a761964
--- /dev/null
+++ b/libraries/message/src/node_to_daemon.rs
@@ -0,0 +1,131 @@
+pub use crate::common::{
+    DataMessage, DropToken, LogLevel, LogMessage, SharedMemoryId, Timestamped,
+};
+use crate::{metadata::Metadata, DataflowId};
+
+use dora_core::config::{DataId, NodeId};
+
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
+pub enum DaemonRequest {
+    Register(NodeRegisterRequest),
+    Subscribe,
+    SendMessage {
+        output_id: DataId,
+        metadata: Metadata,
+        data: Option<DataMessage>,
+    },
+    CloseOutputs(Vec<DataId>),
+    /// Signals that the node is finished sending outputs and that it received all
+    /// required drop tokens.
+    OutputsDone,
+    NextEvent {
+        drop_tokens: Vec<DropToken>,
+    },
+    ReportDropTokens {
+        drop_tokens: Vec<DropToken>,
+    },
+    SubscribeDrop,
+    NextFinishedDropTokens,
+    EventStreamDropped,
+    NodeConfig {
+        node_id: NodeId,
+    },
+}
+
+impl DaemonRequest {
+    pub fn expects_tcp_bincode_reply(&self) -> bool {
+        #[allow(clippy::match_like_matches_macro)]
+        match self {
+            DaemonRequest::SendMessage { .. }
+            | DaemonRequest::NodeConfig { .. }
+            | DaemonRequest::ReportDropTokens { .. } => false,
+            DaemonRequest::Register(NodeRegisterRequest { .. })
+            | DaemonRequest::Subscribe
+            | DaemonRequest::CloseOutputs(_)
+            | DaemonRequest::OutputsDone
+            | DaemonRequest::NextEvent { .. }
+            | DaemonRequest::SubscribeDrop
+            | DaemonRequest::NextFinishedDropTokens
+            | DaemonRequest::EventStreamDropped => true,
+        }
+    }
+
+    pub fn expects_tcp_json_reply(&self) -> bool {
+        #[allow(clippy::match_like_matches_macro)]
+        match self {
+            DaemonRequest::NodeConfig { .. } => true,
+            DaemonRequest::Register(NodeRegisterRequest { .. })
+            | DaemonRequest::Subscribe
+            | DaemonRequest::CloseOutputs(_)
+            | DaemonRequest::OutputsDone
+            | DaemonRequest::NextEvent { .. }
+            | DaemonRequest::SubscribeDrop
+            | DaemonRequest::NextFinishedDropTokens
+            | DaemonRequest::ReportDropTokens { .. }
+            | DaemonRequest::SendMessage { ..
}
+            | DaemonRequest::EventStreamDropped => false,
+        }
+    }
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
+pub struct NodeRegisterRequest {
+    pub dataflow_id: DataflowId,
+    pub node_id: NodeId,
+    dora_version: String,
+}
+
+impl NodeRegisterRequest {
+    pub fn new(dataflow_id: DataflowId, node_id: NodeId) -> Self {
+        Self {
+            dataflow_id,
+            node_id,
+            dora_version: env!("CARGO_PKG_VERSION").to_owned(),
+        }
+    }
+
+    pub fn check_version(&self) -> Result<(), String> {
+        let crate_version = env!("CARGO_PKG_VERSION");
+        if self.dora_version == crate_version {
+            Ok(())
+        } else {
+            Err(format!(
+                "version mismatch: message format v{} is not compatible \
+                with expected message format v{crate_version}",
+                self.dora_version
+            ))
+        }
+    }
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
+pub struct DropEvent {
+    pub tokens: Vec<DropToken>,
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
+pub enum InputData {
+    SharedMemory(SharedMemoryInput),
+    Vec(Vec<u8>),
+}
+
+impl InputData {
+    pub fn drop_token(&self) -> Option<DropToken> {
+        match self {
+            InputData::SharedMemory(data) => Some(data.drop_token),
+            InputData::Vec(_) => None,
+        }
+    }
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
+pub struct SharedMemoryInput {
+    pub shared_memory_id: SharedMemoryId,
+    pub len: usize,
+    pub drop_token: DropToken,
+}
+
+#[derive(Debug, serde::Deserialize, serde::Serialize)]
+pub enum DynamicNodeEvent {
+    NodeConfig { node_id: NodeId },
+}

From 85cccb6b9448d674fa1c36de14ffbacc00212916 Mon Sep 17 00:00:00 2001
From: Philipp Oppermann
Date: Wed, 7 Aug 2024 20:57:18 +0200
Subject: [PATCH 2/2] Publish `dora-message` after `dora-core`

The `dora-message` crate now depends on `dora-core`, instead of the
other way around.
--- .github/workflows/release.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index aa9b7ad5..a0819598 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -54,11 +54,11 @@ jobs: # workspace crates. # Publish libraries crates - cargo publish -p dora-message --token ${{ secrets.CARGO_REGISTRY_TOKEN }} cargo publish -p dora-tracing --token ${{ secrets.CARGO_REGISTRY_TOKEN }} cargo publish -p dora-metrics --token ${{ secrets.CARGO_REGISTRY_TOKEN }} cargo publish -p dora-download --token ${{ secrets.CARGO_REGISTRY_TOKEN }} cargo publish -p dora-core --token ${{ secrets.CARGO_REGISTRY_TOKEN }} + cargo publish -p dora-message --token ${{ secrets.CARGO_REGISTRY_TOKEN }} cargo publish -p communication-layer-pub-sub --token ${{ secrets.CARGO_REGISTRY_TOKEN }} cargo publish -p communication-layer-request-reply --token ${{ secrets.CARGO_REGISTRY_TOKEN }} cargo publish -p shared-memory-server --token ${{ secrets.CARGO_REGISTRY_TOKEN }}