diff --git a/Cargo.lock b/Cargo.lock index 7d0ebed2..0a05d4eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -983,14 +983,11 @@ dependencies = [ name = "dora-core" version = "0.1.2" dependencies = [ - "bincode", "dora-message", "eyre", "once_cell", - "raw_sync", "serde", "serde_yaml 0.9.11", - "shared_memory", "tracing", "uuid 1.2.1", "which", @@ -1012,7 +1009,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml 0.8.23", - "shared_memory", + "shared-memory-server", "tokio", "tokio-stream", "tracing", @@ -1077,11 +1074,10 @@ dependencies = [ "eyre", "flume", "once_cell", - "raw_sync", "serde", "serde_json", "serde_yaml 0.8.23", - "shared_memory", + "shared-memory-server", "thiserror", "tokio", "tracing", @@ -2871,9 +2867,9 @@ checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" [[package]] name = "proc-macro2" -version = "1.0.43" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a2ca2c61bc9f3d74d2886294ab7b9853abd9c1ad903a3ac7815c58989bb7bab" +checksum = "6ef7d57beacfaf2d8aee5937dab7b7f28de3cb8b1828479bb5de2a7106f2bae2" dependencies = [ "unicode-ident", ] @@ -3482,18 +3478,18 @@ checksum = "d65bd28f48be7196d222d95b9243287f48d27aca604e08497513019ff0502cc4" [[package]] name = "serde" -version = "1.0.144" +version = "1.0.152" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f747710de3dcd43b88c9168773254e809d8ddbdf9653b84e2554ab219f17860" +checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.144" +version = "1.0.152" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94ed3a816fb1d101812f83e789f888322c34e291f894f19590dc310963e87a00" +checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" dependencies = [ "proc-macro2", "quote", @@ -3581,6 +3577,18 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shared-memory-server" +version = "0.1.2" +dependencies = [ + "bincode", + "eyre", + "raw_sync", + "serde", + "shared_memory", + "tracing", +] + [[package]] name = "shared_memory" version = "0.12.0" @@ -3711,9 +3719,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.99" +version = "1.0.107" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58dbef6ec655055e20b86b15a8cc6d439cca19b667537ac6a1369572d151ab13" +checksum = "1f4064b5b16e03ae50984a5a8ed5d4f8803e6bc1fd170a3cda91a1be4b18e3f5" dependencies = [ "proc-macro2", "quote", @@ -4049,9 +4057,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" -version = "0.1.36" +version = "0.1.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fce9567bd60a67d08a16488756721ba392f24f29006402881e43b19aac64307" +checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if 1.0.0", "log", @@ -4062,9 +4070,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.22" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11c75893af559bc8e10716548bdef5cb2b983f8e637db9d0e15126b61b484ee2" +checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" dependencies = [ "proc-macro2", "quote", @@ -4073,9 +4081,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.29" +version = "0.1.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aeea4303076558a00714b823f9ad67d58a3bbda1df83d8827d21193156e22f7" +checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" dependencies = [ "once_cell", "valuable", diff --git a/Cargo.toml b/Cargo.toml index 595fdc34..0aef9722 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ members = [ "libraries/communication-layer/*", "libraries/core", "libraries/message", + "libraries/shared-memory-server", "libraries/extensions/download", "libraries/extensions/telemetry/*", "libraries/extensions/zenoh-logger", diff --git a/apis/rust/node/Cargo.toml b/apis/rust/node/Cargo.toml index d685f34d..e62185f4 100644 --- a/apis/rust/node/Cargo.toml +++ b/apis/rust/node/Cargo.toml @@ -22,8 +22,7 @@ uuid = { version = "1.1.2", features = ["v4"] } capnp = "0.14.11" dora-message = { path = "../../../libraries/message" } dora-core = { path = "../../../libraries/core" } -shared_memory = "0.12.0" -raw_sync = "0.1.5" +shared-memory-server = { path = "../../../libraries/shared-memory-server" } [dev-dependencies] tokio = { version = "1.17.0", features = ["rt"] } diff --git a/apis/rust/node/src/daemon.rs b/apis/rust/node/src/daemon.rs index 424c3ac4..4f6a7a3c 100644 --- a/apis/rust/node/src/daemon.rs +++ b/apis/rust/node/src/daemon.rs @@ -1,12 +1,15 @@ use dora_core::{ config::{DataId, NodeId}, daemon_messages::{DaemonReply, DaemonRequest, DataflowId, NodeEvent}, - shared_memory::ShmemClient, }; use dora_message::Metadata; use eyre::{bail, eyre, Context}; -use shared_memory::{Shmem, ShmemConf}; -use std::{marker::PhantomData, thread::JoinHandle, time::Duration}; +use shared_memory_server::{Shmem, ShmemClient, ShmemConf}; +use std::{ + marker::PhantomData, + thread::JoinHandle, + time::{Duration, Instant}, +}; pub struct DaemonConnection { pub control_channel: ControlChannel, diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index 3ce43f6e..8f7087db 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -9,7 +9,7 @@ use dora_core::{ pub use dora_message::{uhlc, Metadata, MetadataParameters}; use eyre::WrapErr; pub use flume::Receiver; -use shared_memory::ShmemConf; +use shared_memory_server::ShmemConf; pub mod daemon; diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index e6f75685..6be78b6a 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -14,7 +14,6 @@ tracing-subscriber = "0.3.15" futures-concurrency = "7.0.0" serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.86" -shared_memory = "0.12.0" dora-core = { path = "../../libraries/core" } dora-message = { path = "../../libraries/message" } flume = "0.10.14" @@ -23,3 +22,4 @@ serde_yaml = "0.8.23" uuid = { version = "1.1.2", features = ["v4"] } futures = "0.3.25" clap = { version = "3.1.8", features = ["derive"] } +shared-memory-server = { path = "../../libraries/shared-memory-server" } diff --git a/binaries/daemon/src/listener.rs b/binaries/daemon/src/listener.rs index c1012c1c..5f4fd949 100644 --- a/binaries/daemon/src/listener.rs +++ b/binaries/daemon/src/listener.rs @@ -2,9 +2,9 @@ use crate::{shared_mem_handler, DaemonNodeEvent, Event}; use dora_core::{ config::NodeId, daemon_messages::{DaemonReply, DaemonRequest, DataflowId, DropEvent, NodeEvent}, - shared_memory::ShmemServer, }; use eyre::{eyre, Context}; +use shared_memory_server::ShmemServer; use tokio::sync::{mpsc, oneshot}; #[tracing::instrument(skip(server, daemon_tx, shmem_handler_tx))] diff --git a/binaries/daemon/src/shared_mem_handler.rs b/binaries/daemon/src/shared_mem_handler.rs index 973fe4b1..20cfa9cf 100644 --- a/binaries/daemon/src/shared_mem_handler.rs +++ b/binaries/daemon/src/shared_mem_handler.rs @@ -13,7 +13,7 @@ use eyre::{eyre, Context}; use flume::{Receiver, Sender}; use futures::StreamExt; use futures_concurrency::stream::Merge; -use shared_memory::{Shmem, ShmemConf}; +use shared_memory_server::{Shmem, ShmemConf}; use tokio::sync::oneshot; use uuid::Uuid; diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 47d929bd..bda1f97e 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -2,11 +2,10 @@ use crate::{listener::listener_loop, shared_mem_handler, DoraEvent, Event}; use dora_core::{ daemon_messages::{DataflowId, NodeConfig, SpawnNodeParams}, descriptor::{resolve_path, source_is_url}, - shared_memory::ShmemServer, }; use dora_download::download_file; use eyre::{eyre, WrapErr}; -use shared_memory::ShmemConf; +use shared_memory_server::{ShmemConf, ShmemServer}; use std::{env::consts::EXE_EXTENSION, path::Path, process::Stdio}; use tokio::sync::mpsc; diff --git a/libraries/core/Cargo.toml b/libraries/core/Cargo.toml index 6c7b347e..b4651fec 100644 --- a/libraries/core/Cargo.toml +++ b/libraries/core/Cargo.toml @@ -15,7 +15,4 @@ zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git", rev = "79a1 which = "4.3.0" uuid = { version = "1.2.1", features = ["serde"] } dora-message = { path = "../message" } -shared_memory = "0.12.0" -bincode = "1.3.3" -raw_sync = "0.1.5" tracing = "0.1" diff --git a/libraries/core/src/lib.rs b/libraries/core/src/lib.rs index 52e82393..a96517dc 100644 --- a/libraries/core/src/lib.rs +++ b/libraries/core/src/lib.rs @@ -8,7 +8,6 @@ pub mod config; pub mod coordinator_messages; pub mod daemon_messages; pub mod descriptor; -pub mod shared_memory; pub mod topics; pub fn adjust_shared_library_path(path: &Path) -> Result { diff --git a/libraries/shared-memory-server/Cargo.toml b/libraries/shared-memory-server/Cargo.toml new file mode 100644 index 00000000..3fefc942 --- /dev/null +++ b/libraries/shared-memory-server/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "shared-memory-server" +version.workspace = true +edition = "2021" +license = "Apache-2.0" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +eyre = "0.6.8" +serde = { version = "1.0.152", features = ["derive"] } +shared_memory = "0.12.0" +raw_sync = "0.1.5" +bincode = "1.3.3" +tracing = "0.1.37" diff --git a/libraries/core/src/shared_memory/channel.rs b/libraries/shared-memory-server/src/channel.rs similarity index 96% rename from libraries/core/src/shared_memory/channel.rs rename to libraries/shared-memory-server/src/channel.rs index 10643122..6f911b8e 100644 --- a/libraries/core/src/shared_memory/channel.rs +++ b/libraries/shared-memory-server/src/channel.rs @@ -5,7 +5,7 @@ use shared_memory::Shmem; use std::{ mem, slice, sync::atomic::{AtomicBool, AtomicU64}, - time::Duration, + time::{Duration, Instant}, }; pub struct ShmemChannel { @@ -81,12 +81,17 @@ impl ShmemChannel { }) } - pub fn send(&mut self, value: &T) -> eyre::Result<()> + pub fn send(&mut self, value: &T, start: Instant) -> eyre::Result<()> where T: Serialize + std::fmt::Debug, { let msg = bincode::serialize(value).wrap_err("failed to serialize value")?; + let elapsed = start.elapsed(); + if elapsed.as_micros() > 1 { + tracing::debug!("before send: {elapsed:?}"); + } + self.send_raw(&msg) } diff --git a/libraries/core/src/shared_memory/mod.rs b/libraries/shared-memory-server/src/lib.rs similarity index 88% rename from libraries/core/src/shared_memory/mod.rs rename to libraries/shared-memory-server/src/lib.rs index d230f535..0e482c8b 100644 --- a/libraries/core/src/shared_memory/mod.rs +++ b/libraries/shared-memory-server/src/lib.rs @@ -1,9 +1,9 @@ use self::channel::ShmemChannel; use eyre::{eyre, Context}; use serde::{Deserialize, Serialize}; -use shared_memory::Shmem; +pub use shared_memory::{Shmem, ShmemConf}; use std::marker::PhantomData; -use std::time::Duration; +use std::time::{Duration, Instant}; mod channel; @@ -40,7 +40,7 @@ impl ShmemServer { U: Serialize + std::fmt::Debug, { assert!(self.reply_expected); - self.channel.send(value)?; + self.channel.send(value, Instant::now())?; self.reply_expected = false; Ok(()) } @@ -61,13 +61,13 @@ impl ShmemClient { }) } - pub fn request(&mut self, value: &T) -> eyre::Result + pub fn request(&mut self, value: &T, start: Instant) -> eyre::Result where T: Serialize + std::fmt::Debug, U: for<'a> Deserialize<'a> + std::fmt::Debug, { self.channel - .send(value) + .send(value, start) .wrap_err("failed to send request")?; self.channel .receive(self.timeout)