Merge abdcc44a44 into ac3d0da130 (pull/1036/merge)
Commit b9ef3b2299 by Mivik, 6 months ago
No known key found for this signature in database (GPG key ID: B5690EEEBB952194)

15 changed files with 494 additions and 200 deletions
1. Cargo.lock (+26, -10)
2. Cargo.toml (+1, -0)
3. binaries/cli/Cargo.toml (+4, -1)
4. binaries/cli/src/command/inspect.rs (+254, -0)
5. binaries/cli/src/command/logs.rs (+8, -25)
6. binaries/cli/src/command/mod.rs (+8, -27)
7. binaries/cli/src/command/self_.rs (+8, -2)
8. binaries/cli/src/common.rs (+27, -0)
9. binaries/coordinator/src/lib.rs (+26, -26)
10. binaries/daemon/Cargo.toml (+2, -2)
11. binaries/daemon/src/lib.rs (+8, -103)
12. libraries/core/Cargo.toml (+2, -0)
13. libraries/core/src/topics.rs (+109, -0)
14. libraries/message/src/cli_to_coordinator.rs (+5, -3)
15. libraries/message/src/coordinator_to_cli.rs (+6, -1)

Cargo.lock (+26, -10)

@@ -2939,6 +2939,7 @@ dependencies = [
name = "dora-cli"
version = "0.3.12"
dependencies = [
"arrow",
"bat",
"clap 4.5.32",
"colored",
@@ -2955,6 +2956,7 @@ dependencies = [
"dora-tracing",
"dunce",
"duration-str",
"enum_dispatch",
"env_logger 0.11.6",
"eyre",
"futures",
@@ -2977,6 +2979,7 @@ dependencies = [
"tracing-log 0.2.0",
"uuid 1.16.0",
"webbrowser 0.8.15",
"zenoh 1.3.0",
]

[[package]]
@@ -3022,6 +3025,7 @@ dependencies = [
"url",
"uuid 1.16.0",
"which",
"zenoh 1.3.0",
]

[[package]]
@@ -3882,6 +3886,18 @@ dependencies = [
"syn 2.0.101",
]

+ [[package]]
+ name = "enum_dispatch"
+ version = "0.3.13"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd"
+ dependencies = [
+ "once_cell",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.101",
+ ]

[[package]]
name = "enumflags2"
version = "0.7.11"
@@ -5138,9 +5154,9 @@ checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc"

[[package]]
name = "hermit-abi"
version = "0.5.0"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbd780fe5cc30f81464441920d82ac8740e2e46b29a6fad543ddd075229ce37e"
checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c"

[[package]]
name = "hex"
@@ -5900,7 +5916,7 @@ version = "0.4.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9"
dependencies = [
"hermit-abi 0.5.0",
"hermit-abi 0.5.2",
"libc",
"windows-sys 0.59.0",
]
@@ -7873,11 +7889,11 @@ checksum = "d75b0bedcc4fe52caa0e03d9f1151a323e4aa5e2d78ba3580400cd3c9e2bc4bc"

[[package]]
name = "onig"
version = "6.4.0"
version = "6.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c4b31c8722ad9171c6d77d3557db078cab2bd50afcc9d09c8b315c59df8ca4f"
checksum = "336b9c63443aceef14bea841b899035ae3abe89b7c486aaf4c5bd8aafedac3f0"
dependencies = [
"bitflags 1.3.2",
"bitflags 2.9.0",
"libc",
"once_cell",
"onig_sys",
@@ -7885,9 +7901,9 @@ dependencies = [

[[package]]
name = "onig_sys"
version = "69.8.1"
version = "69.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b829e3d7e9cc74c7e315ee8edb185bf4190da5acde74afd7fc59c35b1f086e7"
checksum = "c7f86c6eef3d6df15f23bcfb6af487cbd2fed4e5581d58d5bf1f5f8b7f6727dc"
dependencies = [
"cc",
"pkg-config",
@@ -14413,9 +14429,9 @@ checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"

[[package]]
name = "value-bag"
version = "1.10.0"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ef4c4aa54d5d05a279399bfa921ec387b7aba77caf7a682ae8d86785b8fdad2"
checksum = "943ce29a8a743eb10d6082545d861b24f9d1b160b7d741e0f2cdf726bec909c5"

[[package]]
name = "variantly"


Cargo.toml (+1, -0)

@@ -95,6 +95,7 @@ pyo3 = { version = "0.23", features = [
pythonize = "0.23"
git2 = { version = "0.18.0", features = ["vendored-openssl"] }
serde_yaml = "0.9.33"
zenoh = "1.1.1"

[package]
name = "dora-examples"


binaries/cli/Cargo.toml (+4, -1)

@@ -20,9 +20,10 @@ tracing = ["dep:dora-tracing"]
python = ["pyo3"]

[dependencies]
+ arrow = { workspace = true }
clap = { version = "4.0.3", features = ["derive"] }
eyre = "0.6.8"
- dora-core = { workspace = true }
+ dora-core = { workspace = true, features = ["zenoh"] }
dora-message = { workspace = true }
dora-node-api-c = { workspace = true }
dora-operator-api-c = { workspace = true }
@@ -65,6 +66,8 @@ pyo3 = { workspace = true, features = [
self-replace = "1.5.0"
dunce = "1.0.5"
git2 = { workspace = true }
+ zenoh = { workspace = true }
+ enum_dispatch = "0.3.13"

[build-dependencies]
pyo3-build-config = "0.23"


binaries/cli/src/command/inspect.rs (+254, -0)

@@ -0,0 +1,254 @@
use std::{net::IpAddr, ptr::NonNull, sync::Arc};

use clap::Args;
use colored::Colorize;
use dora_core::{
config::InputMapping,
topics::{
open_zenoh_session, zenoh_output_publish_topic, DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
},
};
use dora_message::{
cli_to_coordinator::ControlRequest,
common::Timestamped,
coordinator_to_cli::ControlRequestReply,
daemon_to_daemon::InterDaemonEvent,
id::{DataId, NodeId},
metadata::{ArrowTypeInfo, BufferOffset},
};
use eyre::{bail, eyre, Context};
use tokio::{runtime::Builder, task::JoinSet};
use uuid::Uuid;

use crate::{
command::{default_tracing, Executable},
common::{connect_to_coordinator, resolve_dataflow_identifier},
LOCALHOST,
};

/// Inspect data in the terminal.
#[derive(Debug, Args)]
pub struct Inspect {
/// Identifier of the dataflow
#[clap(long, short, value_name = "UUID_OR_NAME")]
dataflow: Option<String>,
/// Data to inspect, e.g. `node_id/output_id`
#[clap(value_name = "DATA")]
data: Vec<String>,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
}

impl Executable for Inspect {
fn execute(self) -> eyre::Result<()> {
default_tracing()?;

inspect(
self.dataflow,
self.data,
self.coordinator_addr,
self.coordinator_port,
)
}
}

fn inspect(
dataflow: Option<String>,
data: Vec<String>,
coordinator_addr: IpAddr,
coordinator_port: u16,
) -> eyre::Result<()> {
if data.is_empty() {
bail!("No data to inspect provided. Please provide at least one `node_id/output_id` pair.");
}
let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
.wrap_err("failed to connect to dora coordinator")?;
let dataflow_id = resolve_dataflow_identifier(&mut *session, dataflow.as_deref())?;
let data = data
.into_iter()
.map(|s| {
match serde_json::from_value::<InputMapping>(serde_json::Value::String(s.clone())) {
Ok(InputMapping::User(user)) => Ok((user.source, user.output)),
Ok(_) => {
bail!("Reserved input mapping cannot be inspected")
}
Err(e) => bail!("Invalid output id `{s}`: {e}"),
}
})
.collect::<eyre::Result<Vec<_>>>()?;
let dataflow_descriptor = {
let reply_raw = session
.request(
&serde_json::to_vec(&ControlRequest::Info {
dataflow_uuid: dataflow_id,
})
.unwrap(),
)
.wrap_err("failed to send message")?;
let reply: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match reply {
ControlRequestReply::DataflowInfo { descriptor, .. } => descriptor,
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected list dataflow reply: {other:?}"),
}
};
if !dataflow_descriptor.debug.publish_all_messages_to_zenoh {
bail!(
"Dataflow `{dataflow_id}` does not have `publish_all_messages_to_zenoh` enabled. You should enable it in order to inspect data.\n\
\n\
Tip: Add the following snippet to your dataflow descriptor:\n\
\n\
```\n\
_unstable_debug:\n publish_all_messages_to_zenoh: true\n\
```
"
);
}

let outputs = data.clone();

let rt = Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
rt.block_on(async move {
let zenoh_session = open_zenoh_session(Some(coordinator_addr))
.await
.context("failed to open zenoh session")?;

let mut join_set = JoinSet::new();
for (node_id, output_id) in outputs {
join_set.spawn(log_to_terminal(
zenoh_session.clone(),
dataflow_id,
node_id,
output_id,
));
}
while let Some(res) = join_set.join_next().await {
match res {
Ok(Ok(())) => {}
Ok(Err(e)) => {
eprintln!("Error while inspecting output: {e}");
}
Err(e) => {
eprintln!("Join error: {e}");
}
}
}

Result::<_, eyre::Error>::Ok(())
})
}

fn buffer_into_arrow_array(
raw_buffer: &arrow::buffer::Buffer,
type_info: &ArrowTypeInfo,
) -> eyre::Result<arrow::array::ArrayData> {
if raw_buffer.is_empty() {
return Ok(arrow::array::ArrayData::new_empty(&type_info.data_type));
}

let mut buffers = Vec::new();
for BufferOffset { offset, len } in &type_info.buffer_offsets {
buffers.push(raw_buffer.slice_with_length(*offset, *len));
}

let mut child_data = Vec::new();
for child_type_info in &type_info.child_data {
child_data.push(buffer_into_arrow_array(raw_buffer, child_type_info)?)
}

arrow::array::ArrayData::try_new(
type_info.data_type.clone(),
type_info.len,
type_info
.validity
.clone()
.map(arrow::buffer::Buffer::from_vec),
type_info.offset,
buffers,
child_data,
)
.context("Error creating Arrow array")
}

async fn log_to_terminal(
zenoh_session: zenoh::Session,
dataflow_id: Uuid,
node_id: NodeId,
output_id: DataId,
) -> eyre::Result<()> {
let subscribe_topic = zenoh_output_publish_topic(dataflow_id, &node_id, &output_id);
let output_name = format!("{node_id}/{output_id}");
let subscriber = zenoh_session
.declare_subscriber(subscribe_topic)
.await
.map_err(|e| eyre!(e))
.wrap_err_with(|| format!("failed to subscribe to {output_name}"))?;

let output_name = output_name.green();
while let Ok(sample) = subscriber.recv_async().await {
let event = match Timestamped::deserialize_inter_daemon_event(&sample.payload().to_bytes())
{
Ok(event) => event,
Err(_) => {
eprintln!("Received invalid event");
continue;
}
};
match event.inner {
InterDaemonEvent::Output { metadata, data, .. } => {
use std::fmt::Write;

let mut output = format!("{output_name}\t");
if let Some(data) = data {
let ptr = NonNull::new(data.as_ptr() as *mut u8).unwrap();
let len = data.len();
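// SAFETY: `from_custom_allocation` creates a zero-copy view of `data`'s
// bytes; passing `Arc::new(data)` as the owner keeps the allocation
// alive for as long as the resulting `Buffer` is referenced.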
let buffer = unsafe {
arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data))
};
let array = match buffer_into_arrow_array(&buffer, &metadata.type_info) {
Ok(array) => array,
Err(e) => {
eprintln!("invalid data: {e}");
continue;
}
};
let display = if array.is_empty() {
"[]".to_owned()
} else {
let mut display = format!("{:?}", arrow::array::make_array(array));
display = display
.split_once('\n')
.map(|(_, content)| content)
.unwrap_or(&display)
.replace("\n ", " ");
if display.ends_with(",\n]") {
display.truncate(display.len() - 3);
display += " ]";
}
display
};

write!(output, " {}={display}", "data".bold()).unwrap();
}
if !metadata.parameters.is_empty() {
write!(output, " {}={:?}", "metadata".bold(), metadata.parameters).unwrap();
}
println!("{output}");
}
InterDaemonEvent::OutputClosed { .. } => {
eprintln!("Output {node_id}/{output_id} closed");
break;
}
}
}

Ok(())
}
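
In short, the new subcommand resolves the target dataflow, checks that it was started with zenoh publishing enabled, subscribes to one zenoh topic per requested output, and pretty-prints each sample as an Arrow array. A hypothetical invocation (the dataflow and node/output names below are made up):

```
# Inspect two outputs of a running dataflow. --dataflow accepts a UUID or a
# name; when omitted, the CLI falls back to the single running dataflow or
# an interactive prompt.
dora inspect --dataflow webcam-demo camera/image plot/status
```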

binaries/cli/src/command/logs.rs (+8, -25)

@@ -1,10 +1,12 @@
use super::{default_tracing, Executable};
- use crate::common::{connect_to_coordinator, query_running_dataflows};
+ use crate::common::{connect_to_coordinator, resolve_dataflow_identifier};
use bat::{Input, PrettyPrinter};
use clap::Args;
use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST};
- use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply};
+ use dora_message::{
+ cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply, id::NodeId,
+ };
use eyre::{bail, Context, Result};
use uuid::Uuid;

@@ -16,7 +18,7 @@ pub struct LogsArgs {
pub dataflow: Option<String>,
/// Show logs for the given node
#[clap(value_name = "NAME")]
- pub node: String,
+ pub node: NodeId,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
pub coordinator_addr: std::net::IpAddr,
@@ -32,36 +34,17 @@ impl Executable for LogsArgs {
let mut session =
connect_to_coordinator((self.coordinator_addr, self.coordinator_port).into())
.wrap_err("failed to connect to dora coordinator")?;
- let list =
- query_running_dataflows(&mut *session).wrap_err("failed to query running dataflows")?;
- if let Some(dataflow) = self.dataflow {
- let uuid = Uuid::parse_str(&dataflow).ok();
- let name = if uuid.is_some() { None } else { Some(dataflow) };
- logs(&mut *session, uuid, name, self.node)
- } else {
- let active = list.get_active();
- let uuid = match &active[..] {
- [] => bail!("No dataflows are running"),
- [uuid] => uuid.clone(),
- _ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?,
- };
- logs(&mut *session, Some(uuid.uuid), None, self.node)
- }
+ let uuid = resolve_dataflow_identifier(&mut *session, self.dataflow.as_deref())?;
+ logs(&mut *session, uuid, self.node)
}
}

- pub fn logs(
- session: &mut TcpRequestReplyConnection,
- uuid: Option<Uuid>,
- name: Option<String>,
- node: String,
- ) -> Result<()> {
+ pub fn logs(session: &mut TcpRequestReplyConnection, uuid: Uuid, node: NodeId) -> Result<()> {
let logs = {
let reply_raw = session
.request(
&serde_json::to_vec(&ControlRequest::Logs {
uuid,
- name,
node: node.clone(),
})
.wrap_err("")?,


binaries/cli/src/command/mod.rs (+8, -27)

@@ -4,6 +4,7 @@ mod coordinator;
mod daemon;
mod destroy;
mod graph;
+ mod inspect;
mod list;
mod logs;
mod new;
@@ -14,6 +15,7 @@ mod start;
mod stop;
mod up;

+ use enum_dispatch::enum_dispatch;
pub use run::run_func;

use build::Build;
@@ -23,17 +25,19 @@ use daemon::Daemon;
use destroy::Destroy;
use eyre::Context;
use graph::Graph;
+ use inspect::Inspect;
use list::ListArgs;
use logs::LogsArgs;
use new::NewArgs;
use run::Run;
use runtime::Runtime;
- use self_::SelfSubCommand;
+ use self_::Self_;
use start::Start;
use stop::Stop;
use up::Up;

/// dora-rs cli client
+ #[enum_dispatch(Executable)]
#[derive(Debug, clap::Subcommand)]
pub enum Command {
Check(Check),
@@ -57,11 +61,9 @@ pub enum Command {
Daemon(Daemon),
Runtime(Runtime),
Coordinator(Coordinator),
+ Inspect(Inspect),

- Self_ {
- #[clap(subcommand)]
- command: SelfSubCommand,
- },
+ Self_(Self_),
}

fn default_tracing() -> eyre::Result<()> {
@@ -77,28 +79,7 @@ fn default_tracing() -> eyre::Result<()> {
Ok(())
}

+ #[enum_dispatch]
pub trait Executable {
fn execute(self) -> eyre::Result<()>;
}

- impl Executable for Command {
- fn execute(self) -> eyre::Result<()> {
- match self {
- Command::Check(args) => args.execute(),
- Command::Coordinator(args) => args.execute(),
- Command::Graph(args) => args.execute(),
- Command::Build(args) => args.execute(),
- Command::New(args) => args.execute(),
- Command::Run(args) => args.execute(),
- Command::Up(args) => args.execute(),
- Command::Destroy(args) => args.execute(),
- Command::Start(args) => args.execute(),
- Command::Stop(args) => args.execute(),
- Command::List(args) => args.execute(),
- Command::Logs(args) => args.execute(),
- Command::Daemon(args) => args.execute(),
- Command::Self_ { command } => command.execute(),
- Command::Runtime(args) => args.execute(),
- }
- }
- }
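
The deleted `impl Executable for Command` above is exactly the boilerplate that `enum_dispatch` now generates at compile time. A minimal standalone sketch of the pattern, with a toy two-variant enum and a simplified error type rather than dora's actual definitions:

```
use enum_dispatch::enum_dispatch;

// Registering the trait lets enum_dispatch generate delegating impls.
#[enum_dispatch]
trait Executable {
    fn execute(self) -> Result<(), String>;
}

struct Check;
impl Executable for Check {
    fn execute(self) -> Result<(), String> {
        println!("running check");
        Ok(())
    }
}

struct List;
impl Executable for List {
    fn execute(self) -> Result<(), String> {
        println!("running list");
        Ok(())
    }
}

// Expands to `impl Executable for Command` that matches on the variant
// and forwards the call, plus `From<Check>` / `From<List>` conversions.
#[enum_dispatch(Executable)]
enum Command {
    Check(Check),
    List(List),
}

fn main() -> Result<(), String> {
    Command::from(Check).execute()
}
```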

binaries/cli/src/command/self_.rs (+8, -2)

@@ -2,6 +2,12 @@ use super::{default_tracing, Executable};
use clap::Subcommand;
use eyre::{bail, Context};

+ #[derive(Debug, clap::Args)]
+ pub struct Self_ {
+ #[clap(subcommand)]
+ command: SelfSubCommand,
+ }

#[derive(Debug, Subcommand)]
/// Dora CLI self-management commands
pub enum SelfSubCommand {
@@ -19,11 +25,11 @@ pub enum SelfSubCommand {
},
}

- impl Executable for SelfSubCommand {
+ impl Executable for Self_ {
fn execute(self) -> eyre::Result<()> {
default_tracing()?;

- match self {
+ match self.command {
SelfSubCommand::Update { check_only } => {
println!("Checking for updates...");



binaries/cli/src/common.rs (+27, -0)

@@ -50,6 +50,33 @@ pub(crate) fn query_running_dataflows(
Ok(ids)
}

pub(crate) fn resolve_dataflow_identifier(
session: &mut TcpRequestReplyConnection,
name_or_uuid: Option<&str>,
) -> eyre::Result<Uuid> {
if let Some(uuid) = name_or_uuid.and_then(|s| Uuid::parse_str(s).ok()) {
return Ok(uuid);
}

let list = query_running_dataflows(session).wrap_err("failed to query running dataflows")?;
let active: Vec<dora_message::coordinator_to_cli::DataflowIdAndName> = list.get_active();
if let Some(name) = name_or_uuid {
let Some(dataflow) = active.iter().find(|it| it.name.as_deref() == Some(name)) else {
bail!("No dataflow with name `{name}` is running");
};
return Ok(dataflow.uuid);
}
Ok(match &active[..] {
[] => bail!("No dataflows are running"),
[entry] => entry.uuid,
_ => {
inquire::Select::new("Choose dataflow to show logs:", active)
.prompt()?
.uuid
}
})
}
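
This helper centralizes the identifier handling that `logs` used to do inline and that `inspect` needs as well: an argument that parses as a UUID is returned as-is, otherwise it is matched by name against the active dataflows, and with no argument at all the single running dataflow is picked or an interactive prompt is shown.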

pub(crate) fn connect_to_coordinator(
coordinator_addr: SocketAddr,
) -> std::io::Result<Box<TcpRequestReplyConnection>> {


binaries/coordinator/src/lib.rs (+26, -26)

@@ -625,32 +625,30 @@ async fn start_inner(
let _ = reply_sender.send(Err(err));
}
},
- ControlRequest::Logs { uuid, name, node } => {
- let dataflow_uuid = if let Some(uuid) = uuid {
- Ok(uuid)
- } else if let Some(name) = name {
- resolve_name(name, &running_dataflows, &archived_dataflows)
- } else {
- Err(eyre!("No uuid"))
- };
-
- match dataflow_uuid {
- Ok(uuid) => {
- let reply = retrieve_logs(
- &running_dataflows,
- &archived_dataflows,
- uuid,
- node.into(),
- &mut daemon_connections,
- clock.new_timestamp(),
- )
- .await
- .map(ControlRequestReply::Logs);
- let _ = reply_sender.send(reply);
- }
- Err(err) => {
- let _ = reply_sender.send(Err(err));
- }
- }
- }
+ ControlRequest::Logs { uuid, node } => {
+ let reply = retrieve_logs(
+ &running_dataflows,
+ &archived_dataflows,
+ uuid,
+ node,
+ &mut daemon_connections,
+ clock.new_timestamp(),
+ )
+ .await
+ .map(ControlRequestReply::Logs);
+ let _ = reply_sender.send(reply);
+ }
+ ControlRequest::Info { dataflow_uuid } => {
+ if let Some(dataflow) = running_dataflows.get(&dataflow_uuid) {
+ let _ = reply_sender.send(Ok(ControlRequestReply::DataflowInfo {
+ uuid: dataflow.uuid,
+ name: dataflow.name.clone(),
+ descriptor: dataflow.descriptor.clone(),
+ }));
+ } else {
+ let _ = reply_sender.send(Err(eyre!(
+ "No running dataflow with uuid `{dataflow_uuid}`"
+ )));
+ }
+ }
ControlRequest::Destroy => {
@@ -1011,6 +1009,7 @@ struct RunningBuild {
struct RunningDataflow {
name: Option<String>,
uuid: Uuid,
+ descriptor: Descriptor,
/// The IDs of the daemons that the dataflow is running on.
daemons: BTreeSet<DaemonId>,
/// IDs of daemons that are waiting until all nodes are started.
@@ -1406,7 +1405,7 @@ async fn start_dataflow(
} = spawn_dataflow(
build_id,
session_id,
- dataflow,
+ dataflow.clone(),
local_working_dir,
daemon_connections,
clock,
@@ -1416,6 +1415,7 @@ async fn start_dataflow(
Ok(RunningDataflow {
uuid,
name,
+ descriptor: dataflow,
pending_daemons: if daemons.len() > 1 {
daemons.clone()
} else {


binaries/daemon/Cargo.toml (+2, -2)

@@ -25,7 +25,7 @@ tracing = "0.1.36"
tracing-opentelemetry = { version = "0.18.0", optional = true }
futures-concurrency = "7.1.0"
serde_json = "1.0.86"
dora-core = { workspace = true, features = ["build"] }
dora-core = { workspace = true, features = ["build", "zenoh"] }
flume = "0.10.14"
dora-download = { workspace = true }
dora-tracing = { workspace = true, optional = true }
@@ -44,7 +44,7 @@ which = "5.0.0"
sysinfo = "0.30.11"
crossbeam = "0.8.4"
crossbeam-skiplist = "0.1.3"
zenoh = "1.1.1"
zenoh = { workspace = true }
url = "2.5.4"
git2 = { workspace = true }
dunce = "1.0.5"


binaries/daemon/src/lib.rs (+8, -103)

@@ -8,7 +8,7 @@ use dora_core::{
read_as_descriptor, CoreNodeKind, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode,
DYNAMIC_SOURCE,
},
- topics::LOCALHOST,
+ topics::{open_zenoh_session, zenoh_output_publish_topic, LOCALHOST},
uhlc::{self, HLC},
};
use dora_message::{
@@ -304,99 +304,9 @@ impl Daemon {
None => None,
};

- let zenoh_session = match std::env::var(zenoh::Config::DEFAULT_CONFIG_PATH_ENV) {
- Ok(path) => {
- let zenoh_config = zenoh::Config::from_file(&path)
- .map_err(|e| eyre!(e))
- .wrap_err_with(|| format!("failed to read zenoh config from {path}"))?;
- zenoh::open(zenoh_config)
- .await
- .map_err(|e| eyre!(e))
- .context("failed to open zenoh session")?
- }
- Err(std::env::VarError::NotPresent) => {
- let mut zenoh_config = zenoh::Config::default();
-
- if let Some(addr) = coordinator_addr {
- // Linkstate make it possible to connect two daemons on different network through a public daemon
- // TODO: There is currently a CI/CD Error in windows linkstate.
- if cfg!(not(target_os = "windows")) {
- zenoh_config
- .insert_json5("routing/peer", r#"{ mode: "linkstate" }"#)
- .unwrap();
- }
-
- zenoh_config
- .insert_json5(
- "connect/endpoints",
- &format!(
- r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#,
- addr.ip()
- ),
- )
- .unwrap();
- zenoh_config
- .insert_json5(
- "listen/endpoints",
- r#"{ router: ["tcp/[::]:7447"], peer: ["tcp/[::]:5456"] }"#,
- )
- .unwrap();
- if cfg!(target_os = "macos") {
- warn!("disabling multicast on macos systems. Enable it with the ZENOH_CONFIG env variable or file");
- zenoh_config
- .insert_json5("scouting/multicast", r#"{ enabled: false }"#)
- .unwrap();
- }
- };
- if let Ok(zenoh_session) = zenoh::open(zenoh_config).await {
- zenoh_session
- } else {
- warn!(
- "failed to open zenoh session, retrying with default config + coordinator"
- );
- let mut zenoh_config = zenoh::Config::default();
- // Linkstate make it possible to connect two daemons on different network through a public daemon
- // TODO: There is currently a CI/CD Error in windows linkstate.
- if cfg!(not(target_os = "windows")) {
- zenoh_config
- .insert_json5("routing/peer", r#"{ mode: "linkstate" }"#)
- .unwrap();
- }
-
- if let Some(addr) = coordinator_addr {
- zenoh_config
- .insert_json5(
- "connect/endpoints",
- &format!(
- r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#,
- addr.ip()
- ),
- )
- .unwrap();
- if cfg!(target_os = "macos") {
- warn!("disabling multicast on macos systems. Enable it with the ZENOH_CONFIG env variable or file");
- zenoh_config
- .insert_json5("scouting/multicast", r#"{ enabled: false }"#)
- .unwrap();
- }
- }
- if let Ok(zenoh_session) = zenoh::open(zenoh_config).await {
- zenoh_session
- } else {
- warn!("failed to open zenoh session, retrying with default config");
- let zenoh_config = zenoh::Config::default();
- zenoh::open(zenoh_config)
- .await
- .map_err(|e| eyre!(e))
- .context("failed to open zenoh session")?
- }
- }
- }
- Err(std::env::VarError::NotUnicode(_)) => eyre::bail!(
- "{} env variable is not valid unicode",
- zenoh::Config::DEFAULT_CONFIG_PATH_ENV
- ),
- };
+ let zenoh_session = open_zenoh_session(coordinator_addr.map(|addr| addr.ip()))
+ .await
+ .wrap_err("failed to open zenoh session")?;
let (dora_events_tx, dora_events_rx) = mpsc::channel(5);
let daemon = Self {
logger: Logger {
@@ -1256,7 +1166,8 @@ impl Daemon {
.clone()
.wrap_err("no remote_daemon_events_tx channel")?;
let mut finished_rx = dataflow.finished_tx.subscribe();
- let subscribe_topic = dataflow.output_publish_topic(output_id);
+ let subscribe_topic =
+ zenoh_output_publish_topic(dataflow.id, &output_id.0, &output_id.1);
tracing::debug!("declaring subscriber on {subscribe_topic}");
let subscriber = self
.zenoh_session
@@ -1712,7 +1623,8 @@ impl Daemon {
let publisher = match dataflow.publishers.get(output_id) {
Some(publisher) => publisher,
None => {
- let publish_topic = dataflow.output_publish_topic(output_id);
+ let publish_topic =
+ zenoh_output_publish_topic(dataflow.id, &output_id.0, &output_id.1);
tracing::debug!("declaring publisher on {publish_topic}");
let publisher = self
.zenoh_session
@@ -2599,13 +2511,6 @@ impl RunningDataflow {

Ok(())
}

- fn output_publish_topic(&self, output_id: &OutputId) -> String {
- let network_id = "default";
- let dataflow_id = self.id;
- let OutputId(node_id, output_id) = output_id;
- format!("dora/{network_id}/{dataflow_id}/output/{node_id}/{output_id}")
- }
}

fn empty_type_info() -> ArrowTypeInfo {


libraries/core/Cargo.toml (+2, -0)

@@ -12,6 +12,7 @@ repository.workspace = true

[features]
build = ["dep:git2", "dep:url"]
zenoh = ["dep:zenoh"]

[dependencies]
dora-message = { workspace = true }
@@ -32,3 +33,4 @@ itertools = "0.14"
url = { version = "2.5.4", optional = true }
git2 = { workspace = true, optional = true }
fs_extra = "1.3.0"
+ zenoh = { workspace = true, optional = true }

libraries/core/src/topics.rs (+109, -0)

@@ -6,3 +6,112 @@ pub const DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT: u16 = 53291;
pub const DORA_COORDINATOR_PORT_CONTROL_DEFAULT: u16 = 6012;

pub const MANUAL_STOP: &str = "dora/stop";

#[cfg(feature = "zenoh")]
pub async fn open_zenoh_session(coordinator_addr: Option<IpAddr>) -> eyre::Result<zenoh::Session> {
use eyre::{eyre, Context};
use tracing::warn;

let zenoh_session = match std::env::var(zenoh::Config::DEFAULT_CONFIG_PATH_ENV) {
Ok(path) => {
let zenoh_config = zenoh::Config::from_file(&path)
.map_err(|e| eyre!(e))
.wrap_err_with(|| format!("failed to read zenoh config from {path}"))?;
zenoh::open(zenoh_config)
.await
.map_err(|e| eyre!(e))
.context("failed to open zenoh session")?
}
Err(std::env::VarError::NotPresent) => {
let mut zenoh_config = zenoh::Config::default();

if let Some(addr) = coordinator_addr {
// Linkstate makes it possible to connect two daemons on different networks through a public daemon
// TODO: there is currently a CI/CD error with linkstate on Windows.
if cfg!(not(target_os = "windows")) {
zenoh_config
.insert_json5("routing/peer", r#"{ mode: "linkstate" }"#)
.unwrap();
}

zenoh_config
.insert_json5(
"connect/endpoints",
&format!(
r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#,
addr
),
)
.unwrap();
zenoh_config
.insert_json5(
"listen/endpoints",
r#"{ router: ["tcp/[::]:7447"], peer: ["tcp/[::]:5456"] }"#,
)
.unwrap();
if cfg!(target_os = "macos") {
warn!("disabling multicast on macos systems. Enable it with the ZENOH_CONFIG env variable or file");
zenoh_config
.insert_json5("scouting/multicast", r#"{ enabled: false }"#)
.unwrap();
}
};
if let Ok(zenoh_session) = zenoh::open(zenoh_config).await {
zenoh_session
} else {
warn!("failed to open zenoh session, retrying with default config + coordinator");
let mut zenoh_config = zenoh::Config::default();
// Linkstate makes it possible to connect two daemons on different networks through a public daemon
// TODO: there is currently a CI/CD error with linkstate on Windows.
if cfg!(not(target_os = "windows")) {
zenoh_config
.insert_json5("routing/peer", r#"{ mode: "linkstate" }"#)
.unwrap();
}

if let Some(addr) = coordinator_addr {
zenoh_config
.insert_json5(
"connect/endpoints",
&format!(
r#"{{ router: ["tcp/[::]:7447"], peer: ["tcp/{}:5456"] }}"#,
addr
),
)
.unwrap();
if cfg!(target_os = "macos") {
warn!("disabling multicast on macos systems. Enable it with the ZENOH_CONFIG env variable or file");
zenoh_config
.insert_json5("scouting/multicast", r#"{ enabled: false }"#)
.unwrap();
}
}
if let Ok(zenoh_session) = zenoh::open(zenoh_config).await {
zenoh_session
} else {
warn!("failed to open zenoh session, retrying with default config");
let zenoh_config = zenoh::Config::default();
zenoh::open(zenoh_config)
.await
.map_err(|e| eyre!(e))
.context("failed to open zenoh session")?
}
}
}
Err(std::env::VarError::NotUnicode(_)) => eyre::bail!(
"{} env variable is not valid unicode",
zenoh::Config::DEFAULT_CONFIG_PATH_ENV
),
};
Ok(zenoh_session)
}
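
The function preserves the daemon's previous fallback ladder: an explicit config file named by the `ZENOH_CONFIG` env variable wins; otherwise a default config is pointed at the coordinator (with linkstate routing and fixed listen endpoints), retried without the listen endpoints, and finally retried as a plain default config. A hypothetical config file for the first branch, using the same JSON5 keys the code sets programmatically (the coordinator address is made up):

```
// Hypothetical file referenced by the ZENOH_CONFIG env variable; the keys
// mirror the insert_json5 calls above.
{
  routing: { peer: { mode: "linkstate" } },
  connect: { endpoints: { router: ["tcp/[::]:7447"], peer: ["tcp/192.0.2.10:5456"] } },
  listen: { endpoints: { router: ["tcp/[::]:7447"], peer: ["tcp/[::]:5456"] } },
  scouting: { multicast: { enabled: false } },
}
```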

#[cfg(feature = "zenoh")]
pub fn zenoh_output_publish_topic(
dataflow_id: uuid::Uuid,
node_id: &dora_message::id::NodeId,
output_id: &dora_message::id::DataId,
) -> String {
let network_id = "default";
format!("dora/{network_id}/{dataflow_id}/output/{node_id}/{output_id}")
}
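
The key layout is flat and purely string-based, so any zenoh client can reconstruct it. A standalone sketch with plain strings in place of `Uuid`, `NodeId`, and `DataId`:

```
// Standalone sketch of the key layout above; identifiers are plain
// strings here instead of dora's Uuid/NodeId/DataId types.
fn output_topic(dataflow_id: &str, node_id: &str, output_id: &str) -> String {
    let network_id = "default"; // currently hard-coded, as in the source
    format!("dora/{network_id}/{dataflow_id}/output/{node_id}/{output_id}")
}

fn main() {
    // hypothetical identifiers
    let topic = output_topic("0a1b2c3d", "camera", "image");
    assert_eq!(topic, "dora/default/0a1b2c3d/output/camera/image");
}
```

Since zenoh key expressions support `*` and `**` wildcards, a subscriber could in principle watch every output of a dataflow via `dora/default/<uuid>/output/**` (an assumption based on zenoh's key-expression semantics, not something this diff relies on).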

libraries/message/src/cli_to_coordinator.rs (+5, -3)

@@ -64,12 +64,14 @@ pub enum ControlRequest {
grace_duration: Option<Duration>,
},
Logs {
- uuid: Option<Uuid>,
- name: Option<String>,
- node: String,
+ uuid: Uuid,
+ node: NodeId,
},
Destroy,
List,
+ Info {
+ dataflow_uuid: Uuid,
+ },
DaemonConnected,
ConnectedMachines,
LogSubscribe {


libraries/message/src/coordinator_to_cli.rs (+6, -1)

@@ -6,7 +6,7 @@ use std::{
use uuid::Uuid;

pub use crate::common::{LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus};
- use crate::{common::DaemonId, id::NodeId, BuildId};
+ use crate::{common::DaemonId, descriptor::Descriptor, id::NodeId, BuildId};

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum ControlRequestReply {
@@ -33,6 +33,11 @@ pub enum ControlRequestReply {
result: DataflowResult,
},
DataflowList(DataflowList),
+ DataflowInfo {
+ uuid: Uuid,
+ name: Option<String>,
+ descriptor: Descriptor,
+ },
DestroyOk,
DaemonConnected(bool),
ConnectedDaemons(BTreeSet<DaemonId>),

