Browse Source

Merge branch 'main' of https://github.com/dora-rs/dora into update_dora_new

tags/v0.3.5-rc0
XxChang 1 year ago
parent
commit
d9ed1dc251
26 changed files with 825 additions and 125 deletions
  1. +12
    -1
      .github/workflows/ci.yml
  2. +9
    -0
      Cargo.lock
  3. +1
    -0
      Cargo.toml
  4. +8
    -2
      apis/python/node/src/lib.rs
  5. +1
    -0
      apis/rust/node/Cargo.toml
  6. +22
    -6
      apis/rust/node/src/daemon_connection/tcp.rs
  7. +53
    -7
      apis/rust/node/src/node/mod.rs
  8. +36
    -11
      binaries/cli/src/main.rs
  9. +2
    -0
      binaries/cli/src/up.rs
  10. +1
    -1
      binaries/coordinator/src/lib.rs
  11. +115
    -46
      binaries/daemon/src/lib.rs
  12. +138
    -0
      binaries/daemon/src/local_listener.rs
  13. +8
    -3
      binaries/daemon/src/node_communication/mod.rs
  14. +31
    -25
      binaries/daemon/src/spawn.rs
  15. +10
    -4
      examples/multiple-daemons/run.rs
  16. +16
    -0
      examples/python-dataflow/dataflow_dynamic.yml
  17. +97
    -0
      examples/python-dataflow/plot_dynamic.py
  18. +31
    -0
      examples/rust-dataflow/dataflow_dynamic.yml
  19. +10
    -0
      examples/rust-dataflow/sink-dynamic/Cargo.toml
  20. +39
    -0
      examples/rust-dataflow/sink-dynamic/src/main.rs
  21. +32
    -3
      libraries/core/src/daemon_messages.rs
  22. +47
    -0
      libraries/core/src/descriptor/mod.rs
  23. +2
    -1
      libraries/core/src/descriptor/validate.rs
  24. +9
    -1
      libraries/core/src/topics.rs
  25. +39
    -14
      libraries/extensions/telemetry/tracing/src/lib.rs
  26. +56
    -0
      tool_nodes/dora-record/README.md

+ 12
- 1
.github/workflows/ci.yml View File

@@ -289,8 +289,14 @@ jobs:
dora start dataflow.yml --name ci-rust-test
sleep 10
dora stop --name ci-rust-test --grace-duration 5s
cd ..
dora build examples/rust-dataflow/dataflow_dynamic.yml
dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic
cargo run -p rust-dataflow-example-sink-dynamic
sleep 5
dora stop --name ci-rust-dynamic --grace-duration 5s
dora destroy
- name: "Test CLI (Python)"
timeout-minutes: 30
# fail-fast by using bash shell explictly
@@ -312,6 +318,11 @@ jobs:
dora start dataflow.yml --name ci-python-test
sleep 10
dora stop --name ci-python-test --grace-duration 5s
pip install opencv-python
dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic
python ../examples/python-dataflow/plot_dynamic.py
sleep 5
dora stop --name ci-python-test --grace-duration 5s
dora destroy

- name: "Test CLI (C)"


+ 9
- 0
Cargo.lock View File

@@ -2407,6 +2407,7 @@ dependencies = [
"futures",
"futures-concurrency",
"futures-timer",
"serde_json",
"serde_yaml 0.8.26",
"shared-memory-server",
"shared_memory_extended",
@@ -8074,6 +8075,14 @@ dependencies = [
"eyre",
]

[[package]]
name = "rust-dataflow-example-sink-dynamic"
version = "0.3.4"
dependencies = [
"dora-node-api",
"eyre",
]

[[package]]
name = "rust-dataflow-example-status-node"
version = "0.3.4"


+ 1
- 0
Cargo.toml View File

@@ -16,6 +16,7 @@ members = [
"examples/rust-dataflow/node",
"examples/rust-dataflow/status-node",
"examples/rust-dataflow/sink",
"examples/rust-dataflow/sink-dynamic",
"examples/rust-ros2-dataflow/node",
"examples/benchmark/node",
"examples/benchmark/sink",


+ 8
- 2
apis/python/node/src/lib.rs View File

@@ -3,6 +3,7 @@
use std::time::Duration;

use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use dora_node_api::dora_core::config::NodeId;
use dora_node_api::merged::{MergeExternalSend, MergedEvent};
use dora_node_api::{DoraNode, EventStream};
use dora_operator_api_python::{pydict_to_metadata, PyEvent};
@@ -32,8 +33,13 @@ pub struct Node {
#[pymethods]
impl Node {
#[new]
pub fn new() -> eyre::Result<Self> {
let (node, events) = DoraNode::init_from_env()?;
pub fn new(node_id: Option<String>) -> eyre::Result<Self> {
let (node, events) = if let Some(node_id) = node_id {
DoraNode::init_flexible(NodeId::from(node_id))
.context("Could not setup node from node id. Make sure to have a running dataflow with this dynamic node")?
} else {
DoraNode::init_from_env().context("Couldn not initiate node from environment variable. For dynamic node, please add a node id in the initialization function.")?
};

Ok(Node {
events: Events::Dora(events),


+ 1
- 0
apis/rust/node/Cargo.toml View File

@@ -26,6 +26,7 @@ futures-concurrency = "7.3.0"
futures-timer = "3.0.2"
dora-arrow-convert = { workspace = true }
aligned-vec = "0.5.0"
serde_json = "1.0.86"

[dev-dependencies]
tokio = { version = "1.24.2", features = ["rt"] }

+ 22
- 6
apis/rust/node/src/daemon_connection/tcp.rs View File

@@ -5,13 +5,21 @@ use std::{
net::TcpStream,
};

enum Serializer {
Bincode,
SerdeJson,
}
pub fn request(
connection: &mut TcpStream,
request: &Timestamped<DaemonRequest>,
) -> eyre::Result<DaemonReply> {
send_message(connection, request)?;
if request.inner.expects_tcp_reply() {
receive_reply(connection)
if request.inner.expects_tcp_bincode_reply() {
receive_reply(connection, Serializer::Bincode)
.and_then(|reply| reply.ok_or_else(|| eyre!("server disconnected unexpectedly")))
// Use serde json for message with variable length
} else if request.inner.expects_tcp_json_reply() {
receive_reply(connection, Serializer::SerdeJson)
.and_then(|reply| reply.ok_or_else(|| eyre!("server disconnected unexpectedly")))
} else {
Ok(DaemonReply::Empty)
@@ -27,7 +35,10 @@ fn send_message(
Ok(())
}

fn receive_reply(connection: &mut TcpStream) -> eyre::Result<Option<DaemonReply>> {
fn receive_reply(
connection: &mut TcpStream,
serializer: Serializer,
) -> eyre::Result<Option<DaemonReply>> {
let raw = match tcp_receive(connection) {
Ok(raw) => raw,
Err(err) => match err.kind() {
@@ -43,9 +54,14 @@ fn receive_reply(connection: &mut TcpStream) -> eyre::Result<Option<DaemonReply>
}
},
};
bincode::deserialize(&raw)
.wrap_err("failed to deserialize DaemonReply")
.map(Some)
match serializer {
Serializer::Bincode => bincode::deserialize(&raw)
.wrap_err("failed to deserialize DaemonReply")
.map(Some),
Serializer::SerdeJson => serde_json::from_slice(&raw)
.wrap_err("failed to deserialize DaemonReply")
.map(Some),
}
}

fn tcp_send(connection: &mut (impl Write + Unpin), message: &[u8]) -> std::io::Result<()> {


+ 53
- 7
apis/rust/node/src/node/mod.rs View File

@@ -1,4 +1,4 @@
use crate::EventStream;
use crate::{daemon_connection::DaemonChannel, EventStream};

use self::{
arrow_utils::{copy_array_into_sample, required_data_size},
@@ -9,10 +9,12 @@ use aligned_vec::{AVec, ConstAlign};
use arrow::array::Array;
use dora_core::{
config::{DataId, NodeId, NodeRunConfig},
daemon_messages::{DataMessage, DataflowId, DropToken, NodeConfig},
daemon_messages::{DaemonRequest, DataMessage, DataflowId, DropToken, NodeConfig, Timestamped},
descriptor::Descriptor,
message::{uhlc, ArrowTypeInfo, Metadata, MetadataParameters},
topics::{DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST},
};

use eyre::{bail, WrapErr};
use shared_memory_extended::{Shmem, ShmemConf};
use std::{
@@ -21,6 +23,7 @@ use std::{
sync::Arc,
time::Duration,
};
use tracing::info;

#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
@@ -56,8 +59,9 @@ impl DoraNode {
///
pub fn init_from_env() -> eyre::Result<(Self, EventStream)> {
let node_config: NodeConfig = {
let raw = std::env::var("DORA_NODE_CONFIG")
.wrap_err("env variable DORA_NODE_CONFIG must be set")?;
let raw = std::env::var("DORA_NODE_CONFIG").wrap_err(
"env variable DORA_NODE_CONFIG must be set. Are you sure your using `dora start`?",
)?;
serde_yaml::from_str(&raw).context("failed to deserialize operator config")?
};
#[cfg(feature = "tracing")]
@@ -66,6 +70,49 @@ impl DoraNode {
Self::init(node_config)
}

/// Initiate a node from a dataflow id and a node id.
///
/// ```no_run
/// use dora_node_api::DoraNode;
/// use dora_node_api::dora_core::config::NodeId;
///
/// let (mut node, mut events) = DoraNode::init_from_node_id(NodeId::from("plot".to_string())).expect("Could not init node plot");
/// ```
///
pub fn init_from_node_id(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
// Make sure that the node is initialized outside of dora start.
let daemon_address = (LOCALHOST, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into();

let mut channel =
DaemonChannel::new_tcp(daemon_address).context("Could not connect to the daemon")?;
let clock = Arc::new(uhlc::HLC::default());

let reply = channel
.request(&Timestamped {
inner: DaemonRequest::NodeConfig { node_id },
timestamp: clock.new_timestamp(),
})
.wrap_err("failed to request node config from daemon")?;
match reply {
dora_core::daemon_messages::DaemonReply::NodeConfig {
result: Ok(node_config),
} => Self::init(node_config),
dora_core::daemon_messages::DaemonReply::NodeConfig { result: Err(error) } => {
bail!("failed to get node config from daemon: {error}")
}
_ => bail!("unexpected reply from daemon"),
}
}

pub fn init_flexible(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
if std::env::var("DORA_NODE_CONFIG").is_ok() {
info!("Skipping {node_id} specified within the node initialization in favor of `DORA_NODE_CONFIG` specified by `dora start`");
Self::init_from_env()
} else {
Self::init_from_node_id(node_id)
}
}

#[tracing::instrument]
pub fn init(node_config: NodeConfig) -> eyre::Result<(Self, EventStream)> {
let NodeConfig {
@@ -74,8 +121,8 @@ impl DoraNode {
run_config,
daemon_communication,
dataflow_descriptor,
dynamic: _,
} = node_config;

let clock = Arc::new(uhlc::HLC::default());

let event_stream =
@@ -91,13 +138,12 @@ impl DoraNode {
let node = Self {
id: node_id,
dataflow_id,
node_config: run_config,
node_config: run_config.clone(),
control_channel,
clock,
sent_out_shared_memory: HashMap::new(),
drop_stream,
cache: VecDeque::new(),

dataflow_descriptor,
};
Ok((node, event_stream))


+ 36
- 11
binaries/cli/src/main.rs View File

@@ -6,12 +6,13 @@ use dora_core::{
descriptor::Descriptor,
topics::{
ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
DORA_COORDINATOR_PORT_DEFAULT,
DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT,
},
};
use dora_daemon::Daemon;
#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
use dora_tracing::set_up_tracing_opts;
use duration_str::parse;
use eyre::{bail, Context};
use std::net::SocketAddr;
@@ -174,14 +175,20 @@ enum Command {
/// Unique identifier for the machine (required for distributed dataflows)
#[clap(long)]
machine_id: Option<String>,
/// The IP address and port this daemon will bind to.
/// The inter daemon IP address and port this daemon will bind to.
#[clap(long, default_value_t = SocketAddr::new(LISTEN_WILDCARD, 0))]
addr: SocketAddr,
inter_daemon_addr: SocketAddr,
/// Local listen port for event such as dynamic node.
#[clap(long, default_value_t = DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT)]
local_listen_port: u16,
/// Address and port number of the dora coordinator
#[clap(long, default_value_t = SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT))]
coordinator_addr: SocketAddr,
#[clap(long, hide = true)]
run_dataflow: Option<PathBuf>,
/// Suppresses all log output to stdout.
#[clap(long)]
quiet: bool,
},
/// Run runtime
Runtime,
@@ -199,6 +206,9 @@ enum Command {
/// Port number to bind to for control communication
#[clap(long, default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
control_port: u16,
/// Suppresses all log output to stdout.
#[clap(long)]
quiet: bool,
},
}

@@ -243,15 +253,25 @@ fn run() -> eyre::Result<()> {
let args = Args::parse();

#[cfg(feature = "tracing")]
match args.command {
Command::Daemon { .. } => {
set_up_tracing("dora-daemon").context("failed to set up tracing subscriber")?;
match &args.command {
Command::Daemon {
quiet, machine_id, ..
} => {
let name = "dora-daemon";
let filename = machine_id
.as_ref()
.map(|id| format!("{name}-{id}"))
.unwrap_or(name.to_string());
set_up_tracing_opts(name, !quiet, Some(&filename))
.context("failed to set up tracing subscriber")?;
}
Command::Runtime => {
// Do not set the runtime in the cli.
}
Command::Coordinator { .. } => {
set_up_tracing("dora-coordinator").context("failed to set up tracing subscriber")?;
Command::Coordinator { quiet, .. } => {
let name = "dora-coordinator";
set_up_tracing_opts(name, !quiet, Some(name))
.context("failed to set up tracing subscriber")?;
}
_ => {
set_up_tracing("dora-cli").context("failed to set up tracing subscriber")?;
@@ -396,6 +416,7 @@ fn run() -> eyre::Result<()> {
port,
control_interface,
control_port,
quiet,
} => {
let rt = Builder::new_multi_thread()
.enable_all()
@@ -407,16 +428,20 @@ fn run() -> eyre::Result<()> {
let (port, task) =
dora_coordinator::start(bind, bind_control, futures::stream::empty::<Event>())
.await?;
println!("Listening for incoming daemon connection on {port}");
if !quiet {
println!("Listening for incoming daemon connection on {port}");
}
task.await
})
.context("failed to run dora-coordinator")?
}
Command::Daemon {
coordinator_addr,
addr,
inter_daemon_addr,
local_listen_port,
machine_id,
run_dataflow,
quiet: _,
} => {
let rt = Builder::new_multi_thread()
.enable_all()
@@ -439,7 +464,7 @@ fn run() -> eyre::Result<()> {
if coordinator_addr.ip() == LOCALHOST {
tracing::info!("Starting in local mode");
}
Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), addr).await
Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port).await
}
}
})


+ 2
- 0
binaries/cli/src/up.rs View File

@@ -85,6 +85,7 @@ fn start_coordinator() -> eyre::Result<()> {
let mut cmd =
Command::new(std::env::current_exe().wrap_err("failed to get current executable path")?);
cmd.arg("coordinator");
cmd.arg("--quiet");
cmd.spawn().wrap_err("failed to run `dora coordinator`")?;

println!("started dora coordinator");
@@ -96,6 +97,7 @@ fn start_daemon() -> eyre::Result<()> {
let mut cmd =
Command::new(std::env::current_exe().wrap_err("failed to get current executable path")?);
cmd.arg("daemon");
cmd.arg("--quiet");
cmd.spawn().wrap_err("failed to run `dora daemon`")?;

println!("started dora daemon");


+ 1
- 1
binaries/coordinator/src/lib.rs View File

@@ -655,7 +655,7 @@ async fn send_heartbeat_message(
inner: DaemonCoordinatorEvent::Heartbeat,
timestamp,
})
.unwrap();
.context("Could not serialize heartbeat message")?;

tcp_send(connection, &message)
.await


+ 115
- 46
binaries/daemon/src/lib.rs View File

@@ -2,9 +2,13 @@ use aligned_vec::{AVec, ConstAlign};
use coordinator::CoordinatorEvent;
use dora_core::config::{Input, OperatorId};
use dora_core::coordinator_messages::CoordinatorRequest;
use dora_core::daemon_messages::{DataMessage, InterDaemonEvent, Timestamped};
use dora_core::daemon_messages::{
DataMessage, DynamicNodeEvent, InterDaemonEvent, NodeConfig, Timestamped,
};
use dora_core::descriptor::runtime_node_inputs;
use dora_core::message::uhlc::{self, HLC};
use dora_core::message::{ArrowTypeInfo, Metadata, MetadataParameters};
use dora_core::topics::LOCALHOST;
use dora_core::{
config::{DataId, InputMapping, NodeId},
coordinator_messages::DaemonEvent,
@@ -15,10 +19,11 @@ use dora_core::{
descriptor::{CoreNodeKind, Descriptor, ResolvedNode},
};

use eyre::{bail, eyre, Context, ContextCompat};
use eyre::{bail, eyre, Context, ContextCompat, Result};
use futures::{future, stream, FutureExt, TryFutureExt};
use futures_concurrency::stream::Merge;
use inter_daemon::InterDaemonConnection;
use local_listener::DynamicNodeEventWrapper;
use pending::PendingNodes;
use shared_memory_server::ShmemConf;
use std::sync::Arc;
@@ -45,6 +50,7 @@ use uuid::{NoContext, Timestamp, Uuid};

mod coordinator;
mod inter_daemon;
mod local_listener;
mod log;
mod node_communication;
mod pending;
@@ -81,16 +87,18 @@ impl Daemon {
pub async fn run(
coordinator_addr: SocketAddr,
machine_id: String,
bind_addr: SocketAddr,
inter_daemon_addr: SocketAddr,
local_listen_port: u16,
) -> eyre::Result<()> {
let clock = Arc::new(HLC::default());

let ctrlc_events = set_up_ctrlc_handler(clock.clone())?;

// spawn listen loop
// spawn inter daemon listen loop
let (events_tx, events_rx) = flume::bounded(10);
let listen_port =
inter_daemon::spawn_listener_loop(bind_addr, machine_id.clone(), events_tx).await?;
inter_daemon::spawn_listener_loop(inter_daemon_addr, machine_id.clone(), events_tx)
.await?;
let daemon_events = events_rx.into_stream().map(|e| Timestamped {
inner: Event::Daemon(e.inner),
timestamp: e.timestamp,
@@ -111,8 +119,26 @@ impl Daemon {
},
);

// Spawn local listener loop
let (events_tx, events_rx) = flume::bounded(10);
let _listen_port = local_listener::spawn_listener_loop(
(LOCALHOST, local_listen_port).into(),
machine_id.clone(),
events_tx,
)
.await?;
let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped {
inner: Event::DynamicNode(e.inner),
timestamp: e.timestamp,
});
Self::run_general(
(coordinator_events, ctrlc_events, daemon_events).merge(),
(
coordinator_events,
ctrlc_events,
daemon_events,
dynamic_node_events,
)
.merge(),
Some(coordinator_addr),
machine_id,
None,
@@ -276,6 +302,7 @@ impl Daemon {
RunStatus::Continue => {}
RunStatus::Exit => break,
},
Event::DynamicNode(event) => self.handle_dynamic_node_event(event).await?,
Event::HeartbeatInterval => {
if let Some(connection) = &mut self.coordinator_connection {
let msg = serde_json::to_vec(&Timestamped {
@@ -437,7 +464,6 @@ impl Daemon {
.running
.get_mut(&dataflow_id)
.wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
// .stop_all(&self.clock.clone(), grace_duration);

let reply = DaemonCoordinatorReply::StopResult(Ok(()));
let _ = reply_tx
@@ -599,10 +625,8 @@ impl Daemon {
.await
.wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
{
Ok(pid) => {
dataflow
.running_nodes
.insert(node_id.clone(), RunningNode { pid });
Ok(running_node) => {
dataflow.running_nodes.insert(node_id, running_node);
}
Err(err) => {
tracing::error!("{err:?}");
@@ -624,6 +648,70 @@ impl Daemon {
Ok(())
}

async fn handle_dynamic_node_event(
&mut self,
event: DynamicNodeEventWrapper,
) -> eyre::Result<()> {
match event {
DynamicNodeEventWrapper {
event: DynamicNodeEvent::NodeConfig { node_id },
reply_tx,
} => {
let number_node_id = self
.running
.iter()
.filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
.count();

let node_config = match number_node_id {
2.. => {
let _ = reply_tx.send(Some(DaemonReply::NodeConfig {
result: Err(format!(
"multiple dataflows contains dynamic node id {}. Please only have one running dataflow with the specified node id if you want to use dynamic node",
node_id
)
.to_string()),
}));
return Ok(());
}
1 => self
.running
.iter()
.filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id))
.map(|(id, dataflow)| -> Result<NodeConfig> {
let node_config = dataflow
.running_nodes
.get(&node_id)
.context("no node with ID `{node_id}` within the given dataflow")?
.node_config
.clone();
if !node_config.dynamic {
bail!("node with ID `{node_id}` in {id} is not dynamic");
}
Ok(node_config)
})
.next()
.context("no node with ID `{node_id}`")?
.context("failed to get dynamic node config within given dataflow")?,
0 => {
let _ = reply_tx.send(Some(DaemonReply::NodeConfig {
result: Err("no node with ID `{node_id}`".to_string()),
}));
return Ok(());
}
};

let reply = DaemonReply::NodeConfig {
result: Ok(node_config),
};
let _ = reply_tx.send(Some(reply)).map_err(|_| {
error!("could not send node info reply from daemon to coordinator")
});
Ok(())
}
}
}

async fn handle_node_event(
&mut self,
event: DaemonNodeEvent,
@@ -920,7 +1008,11 @@ impl Daemon {
.await?;

dataflow.running_nodes.remove(node_id);
if dataflow.running_nodes.is_empty() {
if dataflow
.running_nodes
.iter()
.all(|(_id, n)| n.node_config.dynamic)
{
let result = match self.dataflow_errors.get(&dataflow.id) {
None => Ok(()),
Some(errors) => {
@@ -1227,33 +1319,6 @@ fn node_inputs(node: &ResolvedNode) -> BTreeMap<DataId, Input> {
}
}

fn runtime_node_inputs(n: &dora_core::descriptor::RuntimeNode) -> BTreeMap<DataId, Input> {
n.operators
.iter()
.flat_map(|operator| {
operator.config.inputs.iter().map(|(input_id, mapping)| {
(
DataId::from(format!("{}/{input_id}", operator.id)),
mapping.clone(),
)
})
})
.collect()
}

fn runtime_node_outputs(n: &dora_core::descriptor::RuntimeNode) -> BTreeSet<DataId> {
n.operators
.iter()
.flat_map(|operator| {
operator
.config
.outputs
.iter()
.map(|output_id| DataId::from(format!("{}/{output_id}", operator.id)))
})
.collect()
}

async fn send_input_closed_events<F>(
dataflow: &mut RunningDataflow,
inter_daemon_connections: &mut BTreeMap<String, InterDaemonConnection>,
@@ -1330,7 +1395,8 @@ fn close_input(

#[derive(Debug, Clone)]
struct RunningNode {
pid: u32,
pid: Option<u32>,
node_config: NodeConfig,
}

pub struct RunningDataflow {
@@ -1444,12 +1510,14 @@ impl RunningDataflow {
system.refresh_processes();

for (node, node_details) in running_nodes.iter() {
if let Some(process) = system.process(Pid::from(node_details.pid as usize)) {
process.kill();
warn!(
"{node} was killed due to not stopping within the {:#?} grace period",
duration
)
if let Some(pid) = node_details.pid {
if let Some(process) = system.process(Pid::from(pid as usize)) {
process.kill();
warn!(
"{node} was killed due to not stopping within the {:#?} grace period",
duration
)
}
}
}
});
@@ -1515,6 +1583,7 @@ pub enum Event {
Coordinator(CoordinatorEvent),
Daemon(InterDaemonEvent),
Dora(DoraEvent),
DynamicNode(DynamicNodeEventWrapper),
HeartbeatInterval,
CtrlC,
}


+ 138
- 0
binaries/daemon/src/local_listener.rs View File

@@ -0,0 +1,138 @@
use crate::tcp_utils::{tcp_receive, tcp_send};
use dora_core::daemon_messages::{DaemonReply, DaemonRequest, DynamicNodeEvent, Timestamped};
use eyre::Context;
use std::{io::ErrorKind, net::SocketAddr};
use tokio::{
net::{TcpListener, TcpStream},
sync::oneshot,
};

#[derive(Debug)]
pub struct DynamicNodeEventWrapper {
pub event: DynamicNodeEvent,
pub reply_tx: oneshot::Sender<Option<DaemonReply>>,
}

pub async fn spawn_listener_loop(
bind: SocketAddr,
machine_id: String,
events_tx: flume::Sender<Timestamped<DynamicNodeEventWrapper>>,
) -> eyre::Result<u16> {
let socket = match TcpListener::bind(bind).await {
Ok(socket) => socket,
Err(err) => {
return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener"))
}
};
let listen_port = socket
.local_addr()
.wrap_err("failed to get local addr of socket")?
.port();

tokio::spawn(async move {
listener_loop(socket, events_tx).await;
tracing::debug!("Local listener loop finished for machine `{machine_id}`");
});

Ok(listen_port)
}

async fn listener_loop(
listener: TcpListener,
events_tx: flume::Sender<Timestamped<DynamicNodeEventWrapper>>,
) {
loop {
match listener
.accept()
.await
.wrap_err("failed to accept new connection")
{
Err(err) => {
tracing::info!("{err}");
}
Ok((connection, _)) => {
tokio::spawn(handle_connection_loop(connection, events_tx.clone()));
}
}
}
}

async fn handle_connection_loop(
mut connection: TcpStream,
events_tx: flume::Sender<Timestamped<DynamicNodeEventWrapper>>,
) {
if let Err(err) = connection.set_nodelay(true) {
tracing::warn!("failed to set nodelay for connection: {err}");
}

loop {
match receive_message(&mut connection).await {
Ok(Some(Timestamped {
inner: DaemonRequest::NodeConfig { node_id },
timestamp,
})) => {
let (reply_tx, reply_rx) = oneshot::channel();
if events_tx
.send_async(Timestamped {
inner: DynamicNodeEventWrapper {
event: DynamicNodeEvent::NodeConfig { node_id },
reply_tx,
},
timestamp,
})
.await
.is_err()
{
break;
}
let Ok(reply) = reply_rx.await else {
tracing::warn!("daemon sent no reply");
continue;
};
if let Some(reply) = reply {
let serialized = match serde_json::to_vec(&reply)
.wrap_err("failed to serialize DaemonReply")
{
Ok(r) => r,
Err(err) => {
tracing::error!("{err:?}");
continue;
}
};
if let Err(err) = tcp_send(&mut connection, &serialized).await {
tracing::warn!("failed to send reply: {err}");
continue;
};
}
}
Ok(None) => break,
Err(err) => {
tracing::warn!("{err:?}");
break;
}
_ => tracing::warn!(
"Unexpected Daemon Request that is not yet by Additional local listener controls"
),
}
}
}

async fn receive_message(
connection: &mut TcpStream,
) -> eyre::Result<Option<Timestamped<DaemonRequest>>> {
let raw = match tcp_receive(connection).await {
Ok(raw) => raw,
Err(err) => match err.kind() {
ErrorKind::UnexpectedEof
| ErrorKind::ConnectionAborted
| ErrorKind::ConnectionReset => return Ok(None),
_other => {
return Err(err)
.context("unexpected I/O error while trying to receive DaemonRequest")
}
},
};
bincode::deserialize(&raw)
.wrap_err("failed to deserialize DaemonRequest")
.map(Some)
}

+ 8
- 3
binaries/daemon/src/node_communication/mod.rs View File

@@ -6,6 +6,7 @@ use dora_core::{
Timestamped,
},
message::uhlc,
topics::LOCALHOST,
};
use eyre::{eyre, Context};
use futures::{future, task, Future};
@@ -13,7 +14,6 @@ use shared_memory_server::{ShmemConf, ShmemServer};
use std::{
collections::{BTreeMap, VecDeque},
mem,
net::Ipv4Addr,
sync::Arc,
task::Poll,
};
@@ -39,8 +39,7 @@ pub async fn spawn_listener_loop(
) -> eyre::Result<DaemonCommunication> {
match config {
LocalCommunicationConfig::Tcp => {
let localhost = Ipv4Addr::new(127, 0, 0, 1);
let socket = match TcpListener::bind((localhost, 0)).await {
let socket = match TcpListener::bind((LOCALHOST, 0)).await {
Ok(socket) => socket,
Err(err) => {
return Err(
@@ -350,6 +349,12 @@ impl Listener {
.await
.wrap_err("failed to send register reply")?;
}
DaemonRequest::NodeConfig { .. } => {
let reply = DaemonReply::Result(Err("unexpected node config message".into()));
self.send_reply(reply, connection)
.await
.wrap_err("failed to send register reply")?;
}
DaemonRequest::OutputsDone => {
let (reply_sender, reply) = oneshot::channel();
self.process_daemon_event(


+ 31
- 25
binaries/daemon/src/spawn.rs View File

@@ -1,15 +1,15 @@
use crate::{
log, node_communication::spawn_listener_loop, node_inputs, runtime_node_inputs,
runtime_node_outputs, DoraEvent, Event, NodeExitStatus, OutputId,
log, node_communication::spawn_listener_loop, node_inputs, DoraEvent, Event, NodeExitStatus,
OutputId, RunningNode,
};
use aligned_vec::{AVec, ConstAlign};
use dora_arrow_convert::IntoArrow;
use dora_core::{
config::{DataId, NodeRunConfig},
config::DataId,
daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped},
descriptor::{
resolve_path, source_is_url, Descriptor, OperatorDefinition, OperatorSource, PythonSource,
ResolvedNode, SHELL_SOURCE,
ResolvedNode, DYNAMIC_SOURCE, SHELL_SOURCE,
},
get_python_path,
message::uhlc::HLC,
@@ -42,7 +42,7 @@ pub async fn spawn_node(
daemon_tx: mpsc::Sender<Timestamped<Event>>,
dataflow_descriptor: Descriptor,
clock: Arc<HLC>,
) -> eyre::Result<u32> {
) -> eyre::Result<RunningNode> {
let node_id = node.id.clone();
tracing::debug!("Spawning node `{dataflow_id}/{node_id}`");

@@ -63,9 +63,24 @@ pub async fn spawn_node(
.send_stdout_as()
.context("Could not resolve `send_stdout_as` configuration")?;

let node_config = NodeConfig {
dataflow_id,
node_id: node_id.clone(),
run_config: node.kind.run_config(),
daemon_communication,
dataflow_descriptor,
dynamic: node.kind.dynamic(),
};

let mut child = match node.kind {
dora_core::descriptor::CoreNodeKind::Custom(n) => {
let mut command = match n.source.as_str() {
DYNAMIC_SOURCE => {
return Ok(RunningNode {
pid: None,
node_config,
});
}
SHELL_SOURCE => {
if cfg!(target_os = "windows") {
let mut cmd = tokio::process::Command::new("cmd");
@@ -117,17 +132,11 @@ pub async fn spawn_node(

command.current_dir(working_dir);
command.stdin(Stdio::null());
let node_config = NodeConfig {
dataflow_id,
node_id: node_id.clone(),
run_config: n.run_config.clone(),
daemon_communication,
dataflow_descriptor,
};

command.env(
"DORA_NODE_CONFIG",
serde_yaml::to_string(&node_config).wrap_err("failed to serialize node config")?,
serde_yaml::to_string(&node_config.clone())
.wrap_err("failed to serialize node config")?,
);
// Injecting the env variable defined in the `yaml` into
// the node runtime.
@@ -223,16 +232,7 @@ pub async fn spawn_node(
command.current_dir(working_dir);

let runtime_config = RuntimeConfig {
node: NodeConfig {
dataflow_id,
node_id: node_id.clone(),
run_config: NodeRunConfig {
inputs: runtime_node_inputs(&n),
outputs: runtime_node_outputs(&n),
},
daemon_communication,
dataflow_descriptor,
},
node: node_config.clone(),
operators: n.operators,
};
command.env(
@@ -270,7 +270,13 @@ pub async fn spawn_node(
.expect("Failed to create log file");
let mut child_stdout =
tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout"));
let pid = child.id().unwrap();
let pid = child.id().context(
"Could not get the pid for the just spawned node and indicate that there is an error",
)?;
let running_node = RunningNode {
pid: Some(pid),
node_config,
};
let stdout_tx = tx.clone();

// Stdout listener stream
@@ -454,5 +460,5 @@ pub async fn spawn_node(
.send(())
.map_err(|_| error!("Could not inform that log file thread finished"));
});
Ok(pid)
Ok(running_node)
}

+ 10
- 4
examples/multiple-daemons/run.rs View File

@@ -52,8 +52,8 @@ async fn main() -> eyre::Result<()> {
)
.await?;
let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port);
let daemon_a = run_daemon(coordinator_addr.to_string(), "A");
let daemon_b = run_daemon(coordinator_addr.to_string(), "B");
let daemon_a = run_daemon(coordinator_addr.to_string(), "A", 9843); // Random port
let daemon_b = run_daemon(coordinator_addr.to_string(), "B", 9842);

tracing::info!("Spawning coordinator and daemons");
let mut tasks = JoinSet::new();
@@ -211,7 +211,11 @@ async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
Ok(())
}

async fn run_daemon(coordinator: String, machine_id: &str) -> eyre::Result<()> {
async fn run_daemon(
coordinator: String,
machine_id: &str,
local_listen_port: u16,
) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
@@ -221,7 +225,9 @@ async fn run_daemon(coordinator: String, machine_id: &str) -> eyre::Result<()> {
.arg("--machine-id")
.arg(machine_id)
.arg("--coordinator-addr")
.arg(coordinator);
.arg(coordinator)
.arg("--local-listen-port")
.arg(local_listen_port.to_string());
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};


+ 16
- 0
examples/python-dataflow/dataflow_dynamic.yml View File

@@ -0,0 +1,16 @@
nodes:
- id: webcam
custom:
source: ./webcam.py
inputs:
tick:
source: dora/timer/millis/50
queue_size: 1000
outputs:
- image

- id: plot
custom:
source: dynamic
inputs:
image: webcam/image

+ 97
- 0
examples/python-dataflow/plot_dynamic.py View File

@@ -0,0 +1,97 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
from dora import Node
from dora import DoraStatus

import cv2
import numpy as np
from utils import LABELS

CI = os.environ.get("CI")

font = cv2.FONT_HERSHEY_SIMPLEX


class Plotter:
"""
Plot image and bounding box
"""

def __init__(self):
self.image = []
self.bboxs = []

def on_input(
self,
dora_input,
) -> DoraStatus:
"""
Put image and bounding box on cv2 window.

Args:
dora_input["id"] (str): Id of the dora_input declared in the yaml configuration
dora_input["value"] (arrow array): message of the dora_input
"""
if dora_input["id"] == "image":
frame = dora_input["value"].to_numpy()
frame = cv2.imdecode(frame, -1)
self.image = frame

elif dora_input["id"] == "bbox" and len(self.image) != 0:
bboxs = dora_input["value"].to_numpy()
self.bboxs = np.reshape(bboxs, (-1, 6))
for bbox in self.bboxs:
[
min_x,
min_y,
max_x,
max_y,
confidence,
label,
] = bbox
cv2.rectangle(
self.image,
(int(min_x), int(min_y)),
(int(max_x), int(max_y)),
(0, 255, 0),
2,
)

cv2.putText(
self.image,
LABELS[int(label)] + f", {confidence:0.2f}",
(int(max_x), int(max_y)),
font,
0.75,
(0, 255, 0),
2,
1,
)

if CI != "true":
cv2.imshow("frame", self.image)
if cv2.waitKey(1) & 0xFF == ord("q"):
return DoraStatus.STOP

return DoraStatus.CONTINUE


plotter = Plotter()

node = Node("plot")

for event in node:
event_type = event["type"]
if event_type == "INPUT":
status = plotter.on_input(event)
if status == DoraStatus.CONTINUE:
pass
elif status == DoraStatus.STOP:
print("plotter returned stop status")
break
elif event_type == "STOP":
print("received stop")
else:
print("received unexpected event:", event_type)

+ 31
- 0
examples/rust-dataflow/dataflow_dynamic.yml View File

@@ -0,0 +1,31 @@
nodes:
- id: rust-node
custom:
build: cargo build -p rust-dataflow-example-node
source: ../../target/debug/rust-dataflow-example-node
inputs:
tick: dora/timer/millis/100
outputs:
- random
- id: rust-status-node
custom:
build: cargo build -p rust-dataflow-example-status-node
source: ../../target/debug/rust-dataflow-example-status-node
inputs:
tick: dora/timer/millis/100
random: rust-node/random
outputs:
- status
- id: rust-sink-dynamic
custom:
build: cargo build -p rust-dataflow-example-sink-dynamic
source: dynamic
inputs:
message: rust-status-node/status
- id: dora-record
custom:
build: cargo build -p dora-record
source: ../../target/debug/dora-record
inputs:
message: rust-status-node/status
random: rust-node/random

+ 10
- 0
examples/rust-dataflow/sink-dynamic/Cargo.toml View File

@@ -0,0 +1,10 @@
[package]
name = "rust-dataflow-example-sink-dynamic"
version.workspace = true
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6.8"

+ 39
- 0
examples/rust-dataflow/sink-dynamic/src/main.rs View File

@@ -0,0 +1,39 @@
use dora_node_api::{self, dora_core::config::NodeId, DoraNode, Event};
use eyre::{bail, Context};

fn main() -> eyre::Result<()> {
let (_node, mut events) =
DoraNode::init_from_node_id(NodeId::from("rust-sink-dynamic".to_string()))?;

while let Some(event) = events.recv() {
match event {
Event::Input {
id,
metadata: _,
data,
} => match id.as_str() {
"message" => {
let received_string: &str =
TryFrom::try_from(&data).context("expected string message")?;
println!("sink received message: {}", received_string);
if !received_string.starts_with("operator received random value ") {
bail!("unexpected message format (should start with 'operator received random value')")
}
if !received_string.ends_with(" ticks") {
bail!("unexpected message format (should end with 'ticks')")
}
}
other => eprintln!("Ignoring unexpected input `{other}`"),
},
Event::Stop => {
println!("Received manual stop");
}
Event::InputClosed { id } => {
println!("Input `{id}` was closed");
}
other => eprintln!("Received unexpected input: {other:?}"),
}
}

Ok(())
}

+ 32
- 3
libraries/core/src/daemon_messages.rs View File

@@ -14,13 +14,14 @@ use aligned_vec::{AVec, ConstAlign};
use dora_message::{uhlc, Metadata};
use uuid::{NoContext, Timestamp, Uuid};

#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct NodeConfig {
pub dataflow_id: DataflowId,
pub node_id: NodeId,
pub run_config: NodeRunConfig,
pub daemon_communication: DaemonCommunication,
pub dataflow_descriptor: Descriptor,
pub dynamic: bool,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
@@ -68,13 +69,18 @@ pub enum DaemonRequest {
SubscribeDrop,
NextFinishedDropTokens,
EventStreamDropped,
NodeConfig {
node_id: NodeId,
},
}

impl DaemonRequest {
pub fn expects_tcp_reply(&self) -> bool {
pub fn expects_tcp_bincode_reply(&self) -> bool {
#[allow(clippy::match_like_matches_macro)]
match self {
DaemonRequest::SendMessage { .. } | DaemonRequest::ReportDropTokens { .. } => false,
DaemonRequest::SendMessage { .. }
| DaemonRequest::NodeConfig { .. }
| DaemonRequest::ReportDropTokens { .. } => false,
DaemonRequest::Register { .. }
| DaemonRequest::Subscribe
| DaemonRequest::CloseOutputs(_)
@@ -85,6 +91,23 @@ impl DaemonRequest {
| DaemonRequest::EventStreamDropped => true,
}
}

pub fn expects_tcp_json_reply(&self) -> bool {
#[allow(clippy::match_like_matches_macro)]
match self {
DaemonRequest::NodeConfig { .. } => true,
DaemonRequest::Register { .. }
| DaemonRequest::Subscribe
| DaemonRequest::CloseOutputs(_)
| DaemonRequest::OutputsDone
| DaemonRequest::NextEvent { .. }
| DaemonRequest::SubscribeDrop
| DaemonRequest::NextFinishedDropTokens
| DaemonRequest::ReportDropTokens { .. }
| DaemonRequest::SendMessage { .. }
| DaemonRequest::EventStreamDropped => false,
}
}
}

#[derive(serde::Serialize, serde::Deserialize, Clone)]
@@ -136,6 +159,7 @@ pub enum DaemonReply {
PreparedMessage { shared_memory_id: SharedMemoryId },
NextEvents(Vec<Timestamped<NodeEvent>>),
NextDropEvents(Vec<Timestamped<NodeDropEvent>>),
NodeConfig { result: Result<NodeConfig, String> },
Empty,
}

@@ -229,6 +253,11 @@ pub enum DaemonCoordinatorEvent {
Heartbeat,
}

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum DynamicNodeEvent {
NodeConfig { node_id: NodeId },
}

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum InterDaemonEvent {
Output {


+ 47
- 0
libraries/core/src/descriptor/mod.rs View File

@@ -16,6 +16,7 @@ pub use visualize::collect_dora_timers;
mod validate;
mod visualize;
pub const SHELL_SOURCE: &str = "shell";
pub const DYNAMIC_SOURCE: &str = "dynamic";

/// Dataflow description
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
@@ -335,6 +336,52 @@ pub enum CoreNodeKind {
Custom(CustomNode),
}

pub fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap<DataId, Input> {
n.operators
.iter()
.flat_map(|operator| {
operator.config.inputs.iter().map(|(input_id, mapping)| {
(
DataId::from(format!("{}/{input_id}", operator.id)),
mapping.clone(),
)
})
})
.collect()
}

fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet<DataId> {
n.operators
.iter()
.flat_map(|operator| {
operator
.config
.outputs
.iter()
.map(|output_id| DataId::from(format!("{}/{output_id}", operator.id)))
})
.collect()
}

impl CoreNodeKind {
pub fn run_config(&self) -> NodeRunConfig {
match self {
CoreNodeKind::Runtime(n) => NodeRunConfig {
inputs: runtime_node_inputs(n),
outputs: runtime_node_outputs(n),
},
CoreNodeKind::Custom(n) => n.run_config.clone(),
}
}

pub fn dynamic(&self) -> bool {
match self {
CoreNodeKind::Runtime(_n) => false,
CoreNodeKind::Custom(n) => n.source == DYNAMIC_SOURCE,
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(transparent)]
pub struct RuntimeNode {


+ 2
- 1
libraries/core/src/descriptor/validate.rs View File

@@ -9,7 +9,7 @@ use eyre::{bail, eyre, Context};
use std::{path::Path, process::Command};
use tracing::info;

use super::{resolve_path, Descriptor, SHELL_SOURCE};
use super::{resolve_path, Descriptor, DYNAMIC_SOURCE, SHELL_SOURCE};
const VERSION: &str = env!("CARGO_PKG_VERSION");

pub fn check_dataflow(
@@ -26,6 +26,7 @@ pub fn check_dataflow(
match &node.kind {
descriptor::CoreNodeKind::Custom(custom) => match custom.source.as_str() {
SHELL_SOURCE => (),
DYNAMIC_SOURCE => (),
source => {
if source_is_url(source) {
info!("{source} is a URL."); // TODO: Implement url check.


+ 9
- 1
libraries/core/src/topics.rs View File

@@ -1,4 +1,10 @@
use std::{collections::BTreeSet, fmt::Display, path::PathBuf, time::Duration};
use std::{
collections::BTreeSet,
fmt::Display,
net::{IpAddr, Ipv4Addr},
path::PathBuf,
time::Duration,
};
use uuid::Uuid;

use crate::{
@@ -6,7 +12,9 @@ use crate::{
descriptor::Descriptor,
};

pub const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
pub const DORA_COORDINATOR_PORT_DEFAULT: u16 = 0xD02A;
pub const DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT: u16 = 0xD02B;
pub const DORA_COORDINATOR_PORT_CONTROL_DEFAULT: u16 = 0x177C;

pub const MANUAL_STOP: &str = "dora/stop";


+ 39
- 14
libraries/extensions/telemetry/tracing/src/lib.rs View File

@@ -3,6 +3,8 @@
//! This module init a tracing propagator for Rust code that requires tracing, and is
//! able to serialize and deserialize context that has been sent via the middleware.

use std::path::Path;

use eyre::Context as EyreContext;
use tracing::metadata::LevelFilter;
use tracing_subscriber::{
@@ -14,13 +16,38 @@ use tracing_subscriber::Registry;
pub mod telemetry;

pub fn set_up_tracing(name: &str) -> eyre::Result<()> {
// Filter log using `RUST_LOG`. More useful for CLI.
let filter = EnvFilter::from_default_env().or(LevelFilter::WARN);
let stdout_log = tracing_subscriber::fmt::layer()
.pretty()
.with_filter(filter);
set_up_tracing_opts(name, true, None)
}

pub fn set_up_tracing_opts(name: &str, stdout: bool, filename: Option<&str>) -> eyre::Result<()> {
let mut layers = Vec::new();

if stdout {
// Filter log using `RUST_LOG`. More useful for CLI.
let env_filter = EnvFilter::from_default_env().or(LevelFilter::WARN);
let layer = tracing_subscriber::fmt::layer()
.compact()
.with_filter(env_filter);
layers.push(layer.boxed());
}

if let Some(filename) = filename {
let out_dir = Path::new("out");
std::fs::create_dir_all(out_dir).context("failed to create `out` directory")?;
let path = out_dir.join(filename).with_extension("txt");
let file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)
.context("failed to create log file")?;
// Filter log using `RUST_LOG`. More useful for CLI.
let layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_writer(file)
.with_filter(LevelFilter::INFO);
layers.push(layer.boxed());
}

let registry = Registry::default().with(stdout_log);
if let Some(endpoint) = std::env::var_os("DORA_JAEGER_TRACING") {
let endpoint = endpoint
.to_str()
@@ -28,13 +55,11 @@ pub fn set_up_tracing(name: &str) -> eyre::Result<()> {
let tracer = crate::telemetry::init_jaeger_tracing(name, endpoint)
.wrap_err("Could not instantiate tracing")?;
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let subscriber = registry.with(telemetry);
tracing::subscriber::set_global_default(subscriber).context(format!(
"failed to set tracing global subscriber for {name}"
))
} else {
tracing::subscriber::set_global_default(registry).context(format!(
"failed to set tracing global subscriber for {name}"
))
layers.push(telemetry.boxed());
}

let registry = Registry::default().with(layers);
tracing::subscriber::set_global_default(registry).context(format!(
"failed to set tracing global subscriber for {name}"
))
}

+ 56
- 0
tool_nodes/dora-record/README.md View File

@@ -0,0 +1,56 @@
# dora-record

dora data recording using Apache Arrow Parquet.

This nodes is still experimental.

## Getting Started

```bash
cargo install dora-record --locked
```

## Adding to existing graph:

```yaml
- id: dora-record
custom:
source: dora-record
inputs:
image: webcam/image
text: webcam/text
# You can add any input and it is going to be logged.
```

## Output Files

Format: Parquet file

path: `out/<DATAFLOW_ID>/<INPUT>.parquet`

Columns:

- trace_id: String, representing the id of the current trace
- span_id: String, representing the unique span id
- timestamp_uhlc: u64, representing the timestamp in [Unique Hybrid Logical Clock time](https://github.com/atolab/uhlc-rs)
- timestamp_utc: DataType::Timestamp(Milliseconds), representing the timestamp in Coordinated Universal Time.
- `<INPUT>` : Column containing the input in its defined format.

Example:

```json
{
"trace_id": "2fd23ddf1b5d2aa38ddb86ceedb55928",
"span_id": "15aef03e0f052bbf",
"timestamp_uhlc": "7368873278370007008",
"timestamp_utc": 1715699508406,
"random": [1886295351360621740]
}
```

## merging multiple file

We can merge input files using the `trace_id` that is going to be shared when using opentelemetry features.

- `trace_id` can also be queried from UI such as jaeger UI, influxDB and so on...
- `trace_id` keep tracks of the logical flow of data, compared to timestamp based merging that might not reflect the actual logical flow of data.

Loading…
Cancel
Save