diff --git a/Cargo.lock b/Cargo.lock index 3d46e6c7..fbc31192 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2939,6 +2939,7 @@ dependencies = [ name = "dora-cli" version = "0.3.12" dependencies = [ + "arrow", "bat", "clap 4.5.32", "colored", @@ -2955,6 +2956,7 @@ dependencies = [ "dora-tracing", "dunce", "duration-str", + "enum_dispatch", "env_logger 0.11.6", "eyre", "futures", @@ -2977,6 +2979,7 @@ dependencies = [ "tracing-log 0.2.0", "uuid 1.16.0", "webbrowser 0.8.15", + "zenoh 1.3.0", ] [[package]] @@ -3022,6 +3025,7 @@ dependencies = [ "url", "uuid 1.16.0", "which", + "zenoh 1.3.0", ] [[package]] @@ -3882,6 +3886,18 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "enum_dispatch" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd" +dependencies = [ + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "enumflags2" version = "0.7.11" @@ -5138,9 +5154,9 @@ checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" [[package]] name = "hermit-abi" -version = "0.5.0" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbd780fe5cc30f81464441920d82ac8740e2e46b29a6fad543ddd075229ce37e" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" [[package]] name = "hex" @@ -5900,7 +5916,7 @@ version = "0.4.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9" dependencies = [ - "hermit-abi 0.5.0", + "hermit-abi 0.5.2", "libc", "windows-sys 0.59.0", ] @@ -7873,11 +7889,11 @@ checksum = "d75b0bedcc4fe52caa0e03d9f1151a323e4aa5e2d78ba3580400cd3c9e2bc4bc" [[package]] name = "onig" -version = "6.4.0" +version = "6.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"8c4b31c8722ad9171c6d77d3557db078cab2bd50afcc9d09c8b315c59df8ca4f" +checksum = "336b9c63443aceef14bea841b899035ae3abe89b7c486aaf4c5bd8aafedac3f0" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.9.0", "libc", "once_cell", "onig_sys", @@ -7885,9 +7901,9 @@ dependencies = [ [[package]] name = "onig_sys" -version = "69.8.1" +version = "69.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b829e3d7e9cc74c7e315ee8edb185bf4190da5acde74afd7fc59c35b1f086e7" +checksum = "c7f86c6eef3d6df15f23bcfb6af487cbd2fed4e5581d58d5bf1f5f8b7f6727dc" dependencies = [ "cc", "pkg-config", @@ -14413,9 +14429,9 @@ checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" [[package]] name = "value-bag" -version = "1.10.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ef4c4aa54d5d05a279399bfa921ec387b7aba77caf7a682ae8d86785b8fdad2" +checksum = "943ce29a8a743eb10d6082545d861b24f9d1b160b7d741e0f2cdf726bec909c5" [[package]] name = "variantly" diff --git a/Cargo.toml b/Cargo.toml index 9d705ba2..f105e875 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -95,6 +95,7 @@ pyo3 = { version = "0.23", features = [ pythonize = "0.23" git2 = { version = "0.18.0", features = ["vendored-openssl"] } serde_yaml = "0.9.33" +zenoh = "1.1.1" [package] name = "dora-examples" diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml index 797fb9e0..a527d6fe 100644 --- a/binaries/cli/Cargo.toml +++ b/binaries/cli/Cargo.toml @@ -20,9 +20,10 @@ tracing = ["dep:dora-tracing"] python = ["pyo3"] [dependencies] +arrow = { workspace = true } clap = { version = "4.0.3", features = ["derive"] } eyre = "0.6.8" -dora-core = { workspace = true } +dora-core = { workspace = true, features = ["zenoh"] } dora-message = { workspace = true } dora-node-api-c = { workspace = true } dora-operator-api-c = { workspace = true } @@ -65,6 +66,8 @@ pyo3 = { workspace = true, features = [ self-replace = "1.5.0" dunce = "1.0.5" git2 = { 
workspace = true } +zenoh = { workspace = true } +enum_dispatch = "0.3.13" [build-dependencies] pyo3-build-config = "0.23" diff --git a/binaries/cli/src/command/inspect.rs b/binaries/cli/src/command/inspect.rs new file mode 100644 index 00000000..0057a7ae --- /dev/null +++ b/binaries/cli/src/command/inspect.rs @@ -0,0 +1,254 @@ +use std::{net::IpAddr, ptr::NonNull, sync::Arc}; + +use clap::Args; +use colored::Colorize; +use dora_core::{ + config::InputMapping, + topics::{ + open_zenoh_session, zenoh_output_publish_topic, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, + }, +}; +use dora_message::{ + cli_to_coordinator::ControlRequest, + common::Timestamped, + coordinator_to_cli::ControlRequestReply, + daemon_to_daemon::InterDaemonEvent, + id::{DataId, NodeId}, + metadata::{ArrowTypeInfo, BufferOffset}, +}; +use eyre::{bail, eyre, Context}; +use tokio::{runtime::Builder, task::JoinSet}; +use uuid::Uuid; + +use crate::{ + command::{default_tracing, Executable}, + common::{connect_to_coordinator, resolve_dataflow_identifier}, + LOCALHOST, +}; + +/// Inspect data in terminal. +#[derive(Debug, Args)] +pub struct Inspect { + /// Identifier of the dataflow + #[clap(long, short, value_name = "UUID_OR_NAME")] + dataflow: Option, + /// Data to inspect, e.g. 
`node_id/output_id` + #[clap(value_name = "DATA")] + data: Vec, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, +} + +impl Executable for Inspect { + fn execute(self) -> eyre::Result<()> { + default_tracing()?; + + inspect( + self.dataflow, + self.data, + self.coordinator_addr, + self.coordinator_port, + ) + } +} + +fn inspect( + dataflow: Option, + data: Vec, + coordinator_addr: IpAddr, + coordinator_port: u16, +) -> eyre::Result<()> { + if data.is_empty() { + bail!("No data to inspect provided. Please provide at least one `node_id/output_id` pair."); + } + let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) + .wrap_err("failed to connect to dora coordinator")?; + let dataflow_id = resolve_dataflow_identifier(&mut *session, dataflow.as_deref())?; + let data = data + .into_iter() + .map(|s| { + match serde_json::from_value::(serde_json::Value::String(s.clone())) { + Ok(InputMapping::User(user)) => Ok((user.source, user.output)), + Ok(_) => { + bail!("Reserved input mapping cannot be inspected") + } + Err(e) => bail!("Invalid output id `{s}`: {e}"), + } + }) + .collect::>>()?; + let dataflow_descriptor = { + let reply_raw = session + .request( + &serde_json::to_vec(&ControlRequest::Info { + dataflow_uuid: dataflow_id, + }) + .unwrap(), + ) + .wrap_err("failed to send message")?; + let reply: ControlRequestReply = + serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + match reply { + ControlRequestReply::DataflowInfo { descriptor, .. 
} => descriptor, + ControlRequestReply::Error(err) => bail!("{err}"), + other => bail!("unexpected list dataflow reply: {other:?}"), + } + }; + if !dataflow_descriptor.debug.publish_all_messages_to_zenoh { + bail!( + "Dataflow `{dataflow_id}` does not have `publish_all_messages_to_zenoh` enabled. You should enable it in order to inspect data.\n\ + \n\ + Tip: Add the following snippet to your dataflow descriptor:\n\ + \n\ + ```\n\ + _unstable_debug:\n publish_all_messages_to_zenoh: true\n\ + ``` + " + ); + } + + let outputs = data.clone(); + + let rt = Builder::new_multi_thread() + .enable_all() + .build() + .context("tokio runtime failed")?; + rt.block_on(async move { + let zenoh_session = open_zenoh_session(Some(coordinator_addr)) + .await + .context("failed to open zenoh session")?; + + let mut join_set = JoinSet::new(); + for (node_id, output_id) in outputs { + join_set.spawn(log_to_terminal( + zenoh_session.clone(), + dataflow_id, + node_id, + output_id, + )); + } + while let Some(res) = join_set.join_next().await { + match res { + Ok(Ok(())) => {} + Ok(Err(e)) => { + eprintln!("Error while inspecting output: {e}"); + } + Err(e) => { + eprintln!("Join error: {e}"); + } + } + } + + Result::<_, eyre::Error>::Ok(()) + }) +} + +fn buffer_into_arrow_array( + raw_buffer: &arrow::buffer::Buffer, + type_info: &ArrowTypeInfo, +) -> eyre::Result<arrow::array::ArrayData> { + if raw_buffer.is_empty() { + return Ok(arrow::array::ArrayData::new_empty(&type_info.data_type)); + } + + let mut buffers = Vec::new(); + for BufferOffset { offset, len } in &type_info.buffer_offsets { + buffers.push(raw_buffer.slice_with_length(*offset, *len)); + } + + let mut child_data = Vec::new(); + for child_type_info in &type_info.child_data { + child_data.push(buffer_into_arrow_array(raw_buffer, child_type_info)?)
+ } + + arrow::array::ArrayData::try_new( + type_info.data_type.clone(), + type_info.len, + type_info + .validity + .clone() + .map(arrow::buffer::Buffer::from_vec), + type_info.offset, + buffers, + child_data, + ) + .context("Error creating Arrow array") +} + +async fn log_to_terminal( + zenoh_session: zenoh::Session, + dataflow_id: Uuid, + node_id: NodeId, + output_id: DataId, +) -> eyre::Result<()> { + let subscribe_topic = zenoh_output_publish_topic(dataflow_id, &node_id, &output_id); + let output_name = format!("{node_id}/{output_id}"); + let subscriber = zenoh_session + .declare_subscriber(subscribe_topic) + .await + .map_err(|e| eyre!(e)) + .wrap_err_with(|| format!("failed to subscribe to {output_name}"))?; + + let output_name = output_name.green(); + while let Ok(sample) = subscriber.recv_async().await { + let event = match Timestamped::deserialize_inter_daemon_event(&sample.payload().to_bytes()) + { + Ok(event) => event, + Err(_) => { + eprintln!("Received invalid event"); + continue; + } + }; + match event.inner { + InterDaemonEvent::Output { metadata, data, .. 
} => { + use std::fmt::Write; + + let mut output = format!("{output_name}\t"); + if let Some(data) = data { + let ptr = NonNull::new(data.as_ptr() as *mut u8).unwrap(); + let len = data.len(); + let buffer = unsafe { + arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) + }; + let array = match buffer_into_arrow_array(&buffer, &metadata.type_info) { + Ok(array) => array, + Err(e) => { + eprintln!("invalid data: {e}"); + continue; + } + }; + let display = if array.is_empty() { + "[]".to_owned() + } else { + let mut display = format!("{:?}", arrow::array::make_array(array)); + display = display + .split_once('\n') + .map(|(_, content)| content) + .unwrap_or(&display) + .replace("\n ", " "); + if display.ends_with(",\n]") { + display.truncate(display.len() - 3); + display += " ]"; + } + display + }; + + write!(output, " {}={display}", "data".bold()).unwrap(); + } + if !metadata.parameters.is_empty() { + write!(output, " {}={:?}", "metadata".bold(), metadata.parameters).unwrap(); + } + println!("{output}"); + } + InterDaemonEvent::OutputClosed { .. 
} => { + eprintln!("Output {node_id}/{output_id} closed"); + break; + } + } + } + + Ok(()) +} diff --git a/binaries/cli/src/command/logs.rs b/binaries/cli/src/command/logs.rs index 06496ad6..27cacad7 100644 --- a/binaries/cli/src/command/logs.rs +++ b/binaries/cli/src/command/logs.rs @@ -1,10 +1,12 @@ use super::{default_tracing, Executable}; -use crate::common::{connect_to_coordinator, query_running_dataflows}; +use crate::common::{connect_to_coordinator, resolve_dataflow_identifier}; use bat::{Input, PrettyPrinter}; use clap::Args; use communication_layer_request_reply::TcpRequestReplyConnection; use dora_core::topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST}; -use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply}; +use dora_message::{ + cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply, id::NodeId, +}; use eyre::{bail, Context, Result}; use uuid::Uuid; @@ -16,7 +18,7 @@ pub struct LogsArgs { pub dataflow: Option, /// Show logs for the given node #[clap(value_name = "NAME")] - pub node: String, + pub node: NodeId, /// Address of the dora coordinator #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] pub coordinator_addr: std::net::IpAddr, @@ -32,36 +34,17 @@ impl Executable for LogsArgs { let mut session = connect_to_coordinator((self.coordinator_addr, self.coordinator_port).into()) .wrap_err("failed to connect to dora coordinator")?; - let list = - query_running_dataflows(&mut *session).wrap_err("failed to query running dataflows")?; - if let Some(dataflow) = self.dataflow { - let uuid = Uuid::parse_str(&dataflow).ok(); - let name = if uuid.is_some() { None } else { Some(dataflow) }; - logs(&mut *session, uuid, name, self.node) - } else { - let active = list.get_active(); - let uuid = match &active[..] 
{ - [] => bail!("No dataflows are running"), - [uuid] => uuid.clone(), - _ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?, - }; - logs(&mut *session, Some(uuid.uuid), None, self.node) - } + let uuid = resolve_dataflow_identifier(&mut *session, self.dataflow.as_deref())?; + logs(&mut *session, uuid, self.node) } } -pub fn logs( - session: &mut TcpRequestReplyConnection, - uuid: Option, - name: Option, - node: String, -) -> Result<()> { +pub fn logs(session: &mut TcpRequestReplyConnection, uuid: Uuid, node: NodeId) -> Result<()> { let logs = { let reply_raw = session .request( &serde_json::to_vec(&ControlRequest::Logs { uuid, - name, node: node.clone(), }) .wrap_err("")?, diff --git a/binaries/cli/src/command/mod.rs b/binaries/cli/src/command/mod.rs index 617f9b05..30b9bee8 100644 --- a/binaries/cli/src/command/mod.rs +++ b/binaries/cli/src/command/mod.rs @@ -4,6 +4,7 @@ mod coordinator; mod daemon; mod destroy; mod graph; +mod inspect; mod list; mod logs; mod new; @@ -14,6 +15,7 @@ mod start; mod stop; mod up; +use enum_dispatch::enum_dispatch; pub use run::run_func; use build::Build; @@ -23,17 +25,19 @@ use daemon::Daemon; use destroy::Destroy; use eyre::Context; use graph::Graph; +use inspect::Inspect; use list::ListArgs; use logs::LogsArgs; use new::NewArgs; use run::Run; use runtime::Runtime; -use self_::SelfSubCommand; +use self_::Self_; use start::Start; use stop::Stop; use up::Up; /// dora-rs cli client +#[enum_dispatch(Executable)] #[derive(Debug, clap::Subcommand)] pub enum Command { Check(Check), @@ -57,11 +61,9 @@ pub enum Command { Daemon(Daemon), Runtime(Runtime), Coordinator(Coordinator), + Inspect(Inspect), - Self_ { - #[clap(subcommand)] - command: SelfSubCommand, - }, + Self_(Self_), } fn default_tracing() -> eyre::Result<()> { @@ -77,28 +79,7 @@ fn default_tracing() -> eyre::Result<()> { Ok(()) } +#[enum_dispatch] pub trait Executable { fn execute(self) -> eyre::Result<()>; } - -impl Executable for Command { - fn 
execute(self) -> eyre::Result<()> { - match self { - Command::Check(args) => args.execute(), - Command::Coordinator(args) => args.execute(), - Command::Graph(args) => args.execute(), - Command::Build(args) => args.execute(), - Command::New(args) => args.execute(), - Command::Run(args) => args.execute(), - Command::Up(args) => args.execute(), - Command::Destroy(args) => args.execute(), - Command::Start(args) => args.execute(), - Command::Stop(args) => args.execute(), - Command::List(args) => args.execute(), - Command::Logs(args) => args.execute(), - Command::Daemon(args) => args.execute(), - Command::Self_ { command } => command.execute(), - Command::Runtime(args) => args.execute(), - } - } -} diff --git a/binaries/cli/src/command/self_.rs b/binaries/cli/src/command/self_.rs index 1914e46b..bd9fe1b5 100644 --- a/binaries/cli/src/command/self_.rs +++ b/binaries/cli/src/command/self_.rs @@ -2,6 +2,12 @@ use super::{default_tracing, Executable}; use clap::Subcommand; use eyre::{bail, Context}; +#[derive(Debug, clap::Args)] +pub struct Self_ { + #[clap(subcommand)] + command: SelfSubCommand, +} + #[derive(Debug, Subcommand)] /// Dora CLI self-management commands pub enum SelfSubCommand { @@ -19,11 +25,11 @@ pub enum SelfSubCommand { }, } -impl Executable for SelfSubCommand { +impl Executable for Self_ { fn execute(self) -> eyre::Result<()> { default_tracing()?; - match self { + match self.command { SelfSubCommand::Update { check_only } => { println!("Checking for updates..."); diff --git a/binaries/cli/src/common.rs b/binaries/cli/src/common.rs index e02a1673..4c25fdc1 100644 --- a/binaries/cli/src/common.rs +++ b/binaries/cli/src/common.rs @@ -50,6 +50,33 @@ pub(crate) fn query_running_dataflows( Ok(ids) } +pub(crate) fn resolve_dataflow_identifier( + session: &mut TcpRequestReplyConnection, + name_or_uuid: Option<&str>, +) -> eyre::Result { + if let Some(uuid) = name_or_uuid.and_then(|s| Uuid::parse_str(s).ok()) { + return Ok(uuid); + } + + let list = 
query_running_dataflows(session).wrap_err("failed to query running dataflows")?; + let active: Vec = list.get_active(); + if let Some(name) = name_or_uuid { + let Some(dataflow) = active.iter().find(|it| it.name.as_deref() == Some(name)) else { + bail!("No dataflow with name `{name}` is running"); + }; + return Ok(dataflow.uuid); + } + Ok(match &active[..] { + [] => bail!("No dataflows are running"), + [entry] => entry.uuid, + _ => { + inquire::Select::new("Choose dataflow to show logs:", active) + .prompt()? + .uuid + } + }) +} + pub(crate) fn connect_to_coordinator( coordinator_addr: SocketAddr, ) -> std::io::Result> { diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 67ee69a4..2147ba54 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -625,32 +625,30 @@ async fn start_inner( let _ = reply_sender.send(Err(err)); } }, - ControlRequest::Logs { uuid, name, node } => { - let dataflow_uuid = if let Some(uuid) = uuid { - Ok(uuid) - } else if let Some(name) = name { - resolve_name(name, &running_dataflows, &archived_dataflows) + ControlRequest::Logs { uuid, node } => { + let reply = retrieve_logs( + &running_dataflows, + &archived_dataflows, + uuid, + node, + &mut daemon_connections, + clock.new_timestamp(), + ) + .await + .map(ControlRequestReply::Logs); + let _ = reply_sender.send(reply); + } + ControlRequest::Info { dataflow_uuid } => { + if let Some(dataflow) = running_dataflows.get(&dataflow_uuid) { + let _ = reply_sender.send(Ok(ControlRequestReply::DataflowInfo { + uuid: dataflow.uuid, + name: dataflow.name.clone(), + descriptor: dataflow.descriptor.clone(), + })); } else { - Err(eyre!("No uuid")) - }; - - match dataflow_uuid { - Ok(uuid) => { - let reply = retrieve_logs( - &running_dataflows, - &archived_dataflows, - uuid, - node.into(), - &mut daemon_connections, - clock.new_timestamp(), - ) - .await - .map(ControlRequestReply::Logs); - let _ = reply_sender.send(reply); - } - Err(err) => { - 
let _ = reply_sender.send(Err(err)); - } + let _ = reply_sender.send(Err(eyre!( + "No running dataflow with uuid `{dataflow_uuid}`" + ))); } } ControlRequest::Destroy => { @@ -1011,6 +1009,7 @@ struct RunningBuild { struct RunningDataflow { name: Option, uuid: Uuid, + descriptor: Descriptor, /// The IDs of the daemons that the dataflow is running on. daemons: BTreeSet, /// IDs of daemons that are waiting until all nodes are started. @@ -1406,7 +1405,7 @@ async fn start_dataflow( } = spawn_dataflow( build_id, session_id, - dataflow, + dataflow.clone(), local_working_dir, daemon_connections, clock, @@ -1416,6 +1415,7 @@ async fn start_dataflow( Ok(RunningDataflow { uuid, name, + descriptor: dataflow, pending_daemons: if daemons.len() > 1 { daemons.clone() } else { diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index c9218d59..b6143502 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -25,7 +25,7 @@ tracing = "0.1.36" tracing-opentelemetry = { version = "0.18.0", optional = true } futures-concurrency = "7.1.0" serde_json = "1.0.86" -dora-core = { workspace = true, features = ["build"] } +dora-core = { workspace = true, features = ["build", "zenoh"] } flume = "0.10.14" dora-download = { workspace = true } dora-tracing = { workspace = true, optional = true } @@ -44,7 +44,7 @@ which = "5.0.0" sysinfo = "0.30.11" crossbeam = "0.8.4" crossbeam-skiplist = "0.1.3" -zenoh = "1.1.1" +zenoh = { workspace = true } url = "2.5.4" git2 = { workspace = true } dunce = "1.0.5" diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 89642c7b..e7bdb49e 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -8,7 +8,7 @@ use dora_core::{ read_as_descriptor, CoreNodeKind, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode, DYNAMIC_SOURCE, }, - topics::LOCALHOST, + topics::{open_zenoh_session, zenoh_output_publish_topic, LOCALHOST}, uhlc::{self, HLC}, }; use dora_message::{ @@ -304,99 +304,9 @@ impl 
Daemon { None => None, }; - let zenoh_session = match std::env::var(zenoh::Config::DEFAULT_CONFIG_PATH_ENV) { - Ok(path) => { - let zenoh_config = zenoh::Config::from_file(&path) - .map_err(|e| eyre!(e)) - .wrap_err_with(|| format!("failed to read zenoh config from {path}"))?; - zenoh::open(zenoh_config) - .await - .map_err(|e| eyre!(e)) - .context("failed to open zenoh session")? - } - Err(std::env::VarError::NotPresent) => { - let mut zenoh_config = zenoh::Config::default(); - - if let Some(addr) = coordinator_addr { - // Linkstate make it possible to connect two daemons on different network through a public daemon - // TODO: There is currently a CI/CD Error in windows linkstate. - if cfg!(not(target_os = "windows")) { - zenoh_config - .insert_json5("routing/peer", r#"{ mode: "linkstate" }"#) - .unwrap(); - } - - zenoh_config - .insert_json5( - "connect/endpoints", - &format!( - r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#, - addr.ip() - ), - ) - .unwrap(); - zenoh_config - .insert_json5( - "listen/endpoints", - r#"{ router: ["tcp/[::]:7447"], peer: ["tcp/[::]:5456"] }"#, - ) - .unwrap(); - if cfg!(target_os = "macos") { - warn!("disabling multicast on macos systems. Enable it with the ZENOH_CONFIG env variable or file"); - zenoh_config - .insert_json5("scouting/multicast", r#"{ enabled: false }"#) - .unwrap(); - } - }; - if let Ok(zenoh_session) = zenoh::open(zenoh_config).await { - zenoh_session - } else { - warn!( - "failed to open zenoh session, retrying with default config + coordinator" - ); - let mut zenoh_config = zenoh::Config::default(); - // Linkstate make it possible to connect two daemons on different network through a public daemon - // TODO: There is currently a CI/CD Error in windows linkstate. 
- if cfg!(not(target_os = "windows")) { - zenoh_config - .insert_json5("routing/peer", r#"{ mode: "linkstate" }"#) - .unwrap(); - } - - if let Some(addr) = coordinator_addr { - zenoh_config - .insert_json5( - "connect/endpoints", - &format!( - r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#, - addr.ip() - ), - ) - .unwrap(); - if cfg!(target_os = "macos") { - warn!("disabling multicast on macos systems. Enable it with the ZENOH_CONFIG env variable or file"); - zenoh_config - .insert_json5("scouting/multicast", r#"{ enabled: false }"#) - .unwrap(); - } - } - if let Ok(zenoh_session) = zenoh::open(zenoh_config).await { - zenoh_session - } else { - warn!("failed to open zenoh session, retrying with default config"); - let zenoh_config = zenoh::Config::default(); - zenoh::open(zenoh_config) - .await - .map_err(|e| eyre!(e)) - .context("failed to open zenoh session")? - } - } - } - Err(std::env::VarError::NotUnicode(_)) => eyre::bail!( - "{} env variable is not valid unicode", - zenoh::Config::DEFAULT_CONFIG_PATH_ENV - ), - }; + let zenoh_session = open_zenoh_session(coordinator_addr.map(|addr| addr.ip())) + .await + .wrap_err("failed to open zenoh session")?; let (dora_events_tx, dora_events_rx) = mpsc::channel(5); let daemon = Self { logger: Logger { @@ -1256,7 +1166,8 @@ impl Daemon { .clone() .wrap_err("no remote_daemon_events_tx channel")?; let mut finished_rx = dataflow.finished_tx.subscribe(); - let subscribe_topic = dataflow.output_publish_topic(output_id); + let subscribe_topic = + zenoh_output_publish_topic(dataflow.id, &output_id.0, &output_id.1); tracing::debug!("declaring subscriber on {subscribe_topic}"); let subscriber = self .zenoh_session @@ -1712,7 +1623,8 @@ impl Daemon { let publisher = match dataflow.publishers.get(output_id) { Some(publisher) => publisher, None => { - let publish_topic = dataflow.output_publish_topic(output_id); + let publish_topic = + zenoh_output_publish_topic(dataflow.id, &output_id.0, &output_id.1); 
tracing::debug!("declaring publisher on {publish_topic}"); let publisher = self .zenoh_session @@ -2599,13 +2511,6 @@ impl RunningDataflow { Ok(()) } - - fn output_publish_topic(&self, output_id: &OutputId) -> String { - let network_id = "default"; - let dataflow_id = self.id; - let OutputId(node_id, output_id) = output_id; - format!("dora/{network_id}/{dataflow_id}/output/{node_id}/{output_id}") - } } fn empty_type_info() -> ArrowTypeInfo { diff --git a/libraries/core/Cargo.toml b/libraries/core/Cargo.toml index fca59d4c..2f95dbd9 100644 --- a/libraries/core/Cargo.toml +++ b/libraries/core/Cargo.toml @@ -12,6 +12,7 @@ repository.workspace = true [features] build = ["dep:git2", "dep:url"] +zenoh = ["dep:zenoh"] [dependencies] dora-message = { workspace = true } @@ -32,3 +33,4 @@ itertools = "0.14" url = { version = "2.5.4", optional = true } git2 = { workspace = true, optional = true } fs_extra = "1.3.0" +zenoh = { workspace = true, optional = true } diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 7ae0e705..8969767a 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -6,3 +6,112 @@ pub const DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT: u16 = 53291; pub const DORA_COORDINATOR_PORT_CONTROL_DEFAULT: u16 = 6012; pub const MANUAL_STOP: &str = "dora/stop"; + +#[cfg(feature = "zenoh")] +pub async fn open_zenoh_session(coordinator_addr: Option) -> eyre::Result { + use eyre::{eyre, Context}; + use tracing::warn; + + let zenoh_session = match std::env::var(zenoh::Config::DEFAULT_CONFIG_PATH_ENV) { + Ok(path) => { + let zenoh_config = zenoh::Config::from_file(&path) + .map_err(|e| eyre!(e)) + .wrap_err_with(|| format!("failed to read zenoh config from {path}"))?; + zenoh::open(zenoh_config) + .await + .map_err(|e| eyre!(e)) + .context("failed to open zenoh session")? 
+ } + Err(std::env::VarError::NotPresent) => { + let mut zenoh_config = zenoh::Config::default(); + + if let Some(addr) = coordinator_addr { + // Linkstate make it possible to connect two daemons on different network through a public daemon + // TODO: There is currently a CI/CD Error in windows linkstate. + if cfg!(not(target_os = "windows")) { + zenoh_config + .insert_json5("routing/peer", r#"{ mode: "linkstate" }"#) + .unwrap(); + } + + zenoh_config + .insert_json5( + "connect/endpoints", + &format!( + r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#, + addr + ), + ) + .unwrap(); + zenoh_config + .insert_json5( + "listen/endpoints", + r#"{ router: ["tcp/[::]:7447"], peer: ["tcp/[::]:5456"] }"#, + ) + .unwrap(); + if cfg!(target_os = "macos") { + warn!("disabling multicast on macos systems. Enable it with the ZENOH_CONFIG env variable or file"); + zenoh_config + .insert_json5("scouting/multicast", r#"{ enabled: false }"#) + .unwrap(); + } + }; + if let Ok(zenoh_session) = zenoh::open(zenoh_config).await { + zenoh_session + } else { + warn!("failed to open zenoh session, retrying with default config + coordinator"); + let mut zenoh_config = zenoh::Config::default(); + // Linkstate make it possible to connect two daemons on different network through a public daemon + // TODO: There is currently a CI/CD Error in windows linkstate. + if cfg!(not(target_os = "windows")) { + zenoh_config + .insert_json5("routing/peer", r#"{ mode: "linkstate" }"#) + .unwrap(); + } + + if let Some(addr) = coordinator_addr { + zenoh_config + .insert_json5( + "connect/endpoints", + &format!( + r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#, + addr + ), + ) + .unwrap(); + if cfg!(target_os = "macos") { + warn!("disabling multicast on macos systems. 
Enable it with the ZENOH_CONFIG env variable or file"); + zenoh_config + .insert_json5("scouting/multicast", r#"{ enabled: false }"#) + .unwrap(); + } + } + if let Ok(zenoh_session) = zenoh::open(zenoh_config).await { + zenoh_session + } else { + warn!("failed to open zenoh session, retrying with default config"); + let zenoh_config = zenoh::Config::default(); + zenoh::open(zenoh_config) + .await + .map_err(|e| eyre!(e)) + .context("failed to open zenoh session")? + } + } + } + Err(std::env::VarError::NotUnicode(_)) => eyre::bail!( + "{} env variable is not valid unicode", + zenoh::Config::DEFAULT_CONFIG_PATH_ENV + ), + }; + Ok(zenoh_session) +} + +#[cfg(feature = "zenoh")] +pub fn zenoh_output_publish_topic( + dataflow_id: uuid::Uuid, + node_id: &dora_message::id::NodeId, + output_id: &dora_message::id::DataId, +) -> String { + let network_id = "default"; + format!("dora/{network_id}/{dataflow_id}/output/{node_id}/{output_id}") +} diff --git a/libraries/message/src/cli_to_coordinator.rs b/libraries/message/src/cli_to_coordinator.rs index bf3d3a03..b76d49c8 100644 --- a/libraries/message/src/cli_to_coordinator.rs +++ b/libraries/message/src/cli_to_coordinator.rs @@ -64,12 +64,14 @@ pub enum ControlRequest { grace_duration: Option, }, Logs { - uuid: Option, - name: Option, - node: String, + uuid: Uuid, + node: NodeId, }, Destroy, List, + Info { + dataflow_uuid: Uuid, + }, DaemonConnected, ConnectedMachines, LogSubscribe { diff --git a/libraries/message/src/coordinator_to_cli.rs b/libraries/message/src/coordinator_to_cli.rs index 02243468..68e4b039 100644 --- a/libraries/message/src/coordinator_to_cli.rs +++ b/libraries/message/src/coordinator_to_cli.rs @@ -6,7 +6,7 @@ use std::{ use uuid::Uuid; pub use crate::common::{LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus}; -use crate::{common::DaemonId, id::NodeId, BuildId}; +use crate::{common::DaemonId, descriptor::Descriptor, id::NodeId, BuildId}; #[derive(Debug, Clone, serde::Deserialize, 
serde::Serialize)] pub enum ControlRequestReply { @@ -33,6 +33,11 @@ pub enum ControlRequestReply { result: DataflowResult, }, DataflowList(DataflowList), + DataflowInfo { + uuid: Uuid, + name: Option, + descriptor: Descriptor, + }, DestroyOk, DaemonConnected(bool), ConnectedDaemons(BTreeSet),