Run dynamic node (tags/v0.3.5-rc0)
| @@ -289,8 +289,14 @@ jobs: | |||
| dora start dataflow.yml --name ci-rust-test | |||
| sleep 10 | |||
| dora stop --name ci-rust-test --grace-duration 5s | |||
| cd .. | |||
| dora build examples/rust-dataflow/dataflow_dynamic.yml | |||
| dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic | |||
| cargo run -p rust-dataflow-example-sink-dynamic | |||
| sleep 5 | |||
| dora stop --name ci-rust-dynamic --grace-duration 5s | |||
| dora destroy | |||
| - name: "Test CLI (Python)" | |||
| timeout-minutes: 30 | |||
| # fail-fast by using bash shell explicitly | |||
| @@ -312,6 +318,11 @@ jobs: | |||
| dora start dataflow.yml --name ci-python-test | |||
| sleep 10 | |||
| dora stop --name ci-python-test --grace-duration 5s | |||
| pip install opencv-python | |||
| dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic | |||
| python ../examples/python-dataflow/plot_dynamic.py | |||
| sleep 5 | |||
| dora stop --name ci-python-dynamic --grace-duration 5s | |||
| dora destroy | |||
| clippy: | |||
| @@ -2407,6 +2407,7 @@ dependencies = [ | |||
| "futures", | |||
| "futures-concurrency", | |||
| "futures-timer", | |||
| "serde_json", | |||
| "serde_yaml 0.8.26", | |||
| "shared-memory-server", | |||
| "shared_memory_extended", | |||
| @@ -8074,6 +8075,14 @@ dependencies = [ | |||
| "eyre", | |||
| ] | |||
| [[package]] | |||
| name = "rust-dataflow-example-sink-dynamic" | |||
| version = "0.3.4" | |||
| dependencies = [ | |||
| "dora-node-api", | |||
| "eyre", | |||
| ] | |||
| [[package]] | |||
| name = "rust-dataflow-example-status-node" | |||
| version = "0.3.4" | |||
| @@ -16,6 +16,7 @@ members = [ | |||
| "examples/rust-dataflow/node", | |||
| "examples/rust-dataflow/status-node", | |||
| "examples/rust-dataflow/sink", | |||
| "examples/rust-dataflow/sink-dynamic", | |||
| "examples/rust-ros2-dataflow/node", | |||
| "examples/benchmark/node", | |||
| "examples/benchmark/sink", | |||
| @@ -3,6 +3,7 @@ | |||
| use std::time::Duration; | |||
| use arrow::pyarrow::{FromPyArrow, ToPyArrow}; | |||
| use dora_node_api::dora_core::config::NodeId; | |||
| use dora_node_api::merged::{MergeExternalSend, MergedEvent}; | |||
| use dora_node_api::{DoraNode, EventStream}; | |||
| use dora_operator_api_python::{pydict_to_metadata, PyEvent}; | |||
| @@ -32,8 +33,13 @@ pub struct Node { | |||
| #[pymethods] | |||
| impl Node { | |||
| #[new] | |||
| pub fn new() -> eyre::Result<Self> { | |||
| let (node, events) = DoraNode::init_from_env()?; | |||
| pub fn new(node_id: Option<String>) -> eyre::Result<Self> { | |||
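| // With a node id the node can attach dynamically to a running dataflow; `init_flexible` still prefers `DORA_NODE_CONFIG` when the node was spawned by `dora start`. | |||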
| let (node, events) = if let Some(node_id) = node_id { | |||
| DoraNode::init_flexible(NodeId::from(node_id)) | |||
| .context("Could not setup node from node id. Make sure to have a running dataflow with this dynamic node")? | |||
| } else { | |||
| DoraNode::init_from_env().context("Could not initiate node from environment variable. For a dynamic node, please pass a node id in the initialization function.")? | |||
| }; | |||
| Ok(Node { | |||
| events: Events::Dora(events), | |||
| @@ -26,6 +26,7 @@ futures-concurrency = "7.3.0" | |||
| futures-timer = "3.0.2" | |||
| dora-arrow-convert = { workspace = true } | |||
| aligned-vec = "0.5.0" | |||
| serde_json = "1.0.86" | |||
| [dev-dependencies] | |||
| tokio = { version = "1.24.2", features = ["rt"] } | |||
| @@ -5,13 +5,21 @@ use std::{ | |||
| net::TcpStream, | |||
| }; | |||
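| /// Wire format used to decode the daemon's TCP reply. | |||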
| enum Serializer { | |||
| Bincode, | |||
| SerdeJson, | |||
| } | |||
| pub fn request( | |||
| connection: &mut TcpStream, | |||
| request: &Timestamped<DaemonRequest>, | |||
| ) -> eyre::Result<DaemonReply> { | |||
| send_message(connection, request)?; | |||
| if request.inner.expects_tcp_reply() { | |||
| receive_reply(connection) | |||
| if request.inner.expects_tcp_bincode_reply() { | |||
| receive_reply(connection, Serializer::Bincode) | |||
| .and_then(|reply| reply.ok_or_else(|| eyre!("server disconnected unexpectedly"))) | |||
| // Use serde_json for replies with a variable length | |||
| } else if request.inner.expects_tcp_json_reply() { | |||
| receive_reply(connection, Serializer::SerdeJson) | |||
| .and_then(|reply| reply.ok_or_else(|| eyre!("server disconnected unexpectedly"))) | |||
| } else { | |||
| Ok(DaemonReply::Empty) | |||
| @@ -27,7 +35,10 @@ fn send_message( | |||
| Ok(()) | |||
| } | |||
| fn receive_reply(connection: &mut TcpStream) -> eyre::Result<Option<DaemonReply>> { | |||
| fn receive_reply( | |||
| connection: &mut TcpStream, | |||
| serializer: Serializer, | |||
| ) -> eyre::Result<Option<DaemonReply>> { | |||
| let raw = match tcp_receive(connection) { | |||
| Ok(raw) => raw, | |||
| Err(err) => match err.kind() { | |||
| @@ -43,9 +54,14 @@ fn receive_reply(connection: &mut TcpStream) -> eyre::Result<Option<DaemonReply> | |||
| } | |||
| }, | |||
| }; | |||
| bincode::deserialize(&raw) | |||
| .wrap_err("failed to deserialize DaemonReply") | |||
| .map(Some) | |||
| match serializer { | |||
| Serializer::Bincode => bincode::deserialize(&raw) | |||
| .wrap_err("failed to deserialize DaemonReply") | |||
| .map(Some), | |||
| Serializer::SerdeJson => serde_json::from_slice(&raw) | |||
| .wrap_err("failed to deserialize DaemonReply") | |||
| .map(Some), | |||
| } | |||
| } | |||
| fn tcp_send(connection: &mut (impl Write + Unpin), message: &[u8]) -> std::io::Result<()> { | |||
| @@ -1,4 +1,4 @@ | |||
| use crate::EventStream; | |||
| use crate::{daemon_connection::DaemonChannel, EventStream}; | |||
| use self::{ | |||
| arrow_utils::{copy_array_into_sample, required_data_size}, | |||
| @@ -9,10 +9,12 @@ use aligned_vec::{AVec, ConstAlign}; | |||
| use arrow::array::Array; | |||
| use dora_core::{ | |||
| config::{DataId, NodeId, NodeRunConfig}, | |||
| daemon_messages::{DataMessage, DataflowId, DropToken, NodeConfig}, | |||
| daemon_messages::{DaemonRequest, DataMessage, DataflowId, DropToken, NodeConfig, Timestamped}, | |||
| descriptor::Descriptor, | |||
| message::{uhlc, ArrowTypeInfo, Metadata, MetadataParameters}, | |||
| topics::{DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST}, | |||
| }; | |||
| use eyre::{bail, WrapErr}; | |||
| use shared_memory_extended::{Shmem, ShmemConf}; | |||
| use std::{ | |||
| @@ -21,6 +23,7 @@ use std::{ | |||
| sync::Arc, | |||
| time::Duration, | |||
| }; | |||
| use tracing::info; | |||
| #[cfg(feature = "tracing")] | |||
| use dora_tracing::set_up_tracing; | |||
| @@ -56,8 +59,9 @@ impl DoraNode { | |||
| /// | |||
| pub fn init_from_env() -> eyre::Result<(Self, EventStream)> { | |||
| let node_config: NodeConfig = { | |||
| let raw = std::env::var("DORA_NODE_CONFIG") | |||
| .wrap_err("env variable DORA_NODE_CONFIG must be set")?; | |||
| let raw = std::env::var("DORA_NODE_CONFIG").wrap_err( | |||
| "env variable DORA_NODE_CONFIG must be set. Are you sure your using `dora start`?", | |||
| )?; | |||
| serde_yaml::from_str(&raw).context("failed to deserialize operator config")? | |||
| }; | |||
| #[cfg(feature = "tracing")] | |||
| @@ -66,6 +70,49 @@ impl DoraNode { | |||
| Self::init(node_config) | |||
| } | |||
| /// Initiate a node from a node id by requesting its config from the local daemon. | |||
| /// | |||
| /// ```no_run | |||
| /// use dora_node_api::DoraNode; | |||
| /// use dora_node_api::dora_core::config::NodeId; | |||
| /// | |||
| /// let (mut node, mut events) = DoraNode::init_from_node_id(NodeId::from("plot".to_string())).expect("Could not init node plot"); | |||
| /// ``` | |||
| /// | |||
| pub fn init_from_node_id(node_id: NodeId) -> eyre::Result<(Self, EventStream)> { | |||
| // This path is used when the node is started outside of `dora start` (as a dynamic node). | |||
| let daemon_address = (LOCALHOST, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into(); | |||
| let mut channel = | |||
| DaemonChannel::new_tcp(daemon_address).context("Could not connect to the daemon")?; | |||
| let clock = Arc::new(uhlc::HLC::default()); | |||
| let reply = channel | |||
| .request(&Timestamped { | |||
| inner: DaemonRequest::NodeConfig { node_id }, | |||
| timestamp: clock.new_timestamp(), | |||
| }) | |||
| .wrap_err("failed to request node config from daemon")?; | |||
| match reply { | |||
| dora_core::daemon_messages::DaemonReply::NodeConfig { | |||
| result: Ok(node_config), | |||
| } => Self::init(node_config), | |||
| dora_core::daemon_messages::DaemonReply::NodeConfig { result: Err(error) } => { | |||
| bail!("failed to get node config from daemon: {error}") | |||
| } | |||
| _ => bail!("unexpected reply from daemon"), | |||
| } | |||
| } | |||
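| /// Initiate the node from the `DORA_NODE_CONFIG` environment variable when it is set (i.e. the node was spawned by `dora start`), and otherwise fall back to [`Self::init_from_node_id`] for dynamic nodes. | |||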
| pub fn init_flexible(node_id: NodeId) -> eyre::Result<(Self, EventStream)> { | |||
| if std::env::var("DORA_NODE_CONFIG").is_ok() { | |||
| info!("Skipping {node_id} specified within the node initialization in favor of `DORA_NODE_CONFIG` specified by `dora start`"); | |||
| Self::init_from_env() | |||
| } else { | |||
| Self::init_from_node_id(node_id) | |||
| } | |||
| } | |||
| #[tracing::instrument] | |||
| pub fn init(node_config: NodeConfig) -> eyre::Result<(Self, EventStream)> { | |||
| let NodeConfig { | |||
| @@ -74,8 +121,8 @@ impl DoraNode { | |||
| run_config, | |||
| daemon_communication, | |||
| dataflow_descriptor, | |||
| dynamic: _, | |||
| } = node_config; | |||
| let clock = Arc::new(uhlc::HLC::default()); | |||
| let event_stream = | |||
| @@ -91,13 +138,12 @@ impl DoraNode { | |||
| let node = Self { | |||
| id: node_id, | |||
| dataflow_id, | |||
| node_config: run_config, | |||
| node_config: run_config.clone(), | |||
| control_channel, | |||
| clock, | |||
| sent_out_shared_memory: HashMap::new(), | |||
| drop_stream, | |||
| cache: VecDeque::new(), | |||
| dataflow_descriptor, | |||
| }; | |||
| Ok((node, event_stream)) | |||
| @@ -6,7 +6,7 @@ use dora_core::{ | |||
| descriptor::Descriptor, | |||
| topics::{ | |||
| ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, | |||
| DORA_COORDINATOR_PORT_DEFAULT, | |||
| DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, | |||
| }, | |||
| }; | |||
| use dora_daemon::Daemon; | |||
| @@ -175,9 +175,12 @@ enum Command { | |||
| /// Unique identifier for the machine (required for distributed dataflows) | |||
| #[clap(long)] | |||
| machine_id: Option<String>, | |||
| /// The IP address and port this daemon will bind to. | |||
| /// The inter-daemon IP address and port this daemon will bind to. | |||
| #[clap(long, default_value_t = SocketAddr::new(LISTEN_WILDCARD, 0))] | |||
| addr: SocketAddr, | |||
| inter_daemon_addr: SocketAddr, | |||
| /// Local listen port for events such as dynamic node connections. | |||
| #[clap(long, default_value_t = DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT)] | |||
| local_listen_port: u16, | |||
| /// Address and port number of the dora coordinator | |||
| #[clap(long, default_value_t = SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT))] | |||
| coordinator_addr: SocketAddr, | |||
| @@ -434,7 +437,8 @@ fn run() -> eyre::Result<()> { | |||
| } | |||
| Command::Daemon { | |||
| coordinator_addr, | |||
| addr, | |||
| inter_daemon_addr, | |||
| local_listen_port, | |||
| machine_id, | |||
| run_dataflow, | |||
| quiet: _, | |||
| @@ -460,7 +464,7 @@ fn run() -> eyre::Result<()> { | |||
| if coordinator_addr.ip() == LOCALHOST { | |||
| tracing::info!("Starting in local mode"); | |||
| } | |||
| Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), addr).await | |||
| Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port).await | |||
| } | |||
| } | |||
| }) | |||
| @@ -655,7 +655,7 @@ async fn send_heartbeat_message( | |||
| inner: DaemonCoordinatorEvent::Heartbeat, | |||
| timestamp, | |||
| }) | |||
| .unwrap(); | |||
| .context("Could not serialize heartbeat message")?; | |||
| tcp_send(connection, &message) | |||
| .await | |||
| @@ -2,9 +2,13 @@ use aligned_vec::{AVec, ConstAlign}; | |||
| use coordinator::CoordinatorEvent; | |||
| use dora_core::config::{Input, OperatorId}; | |||
| use dora_core::coordinator_messages::CoordinatorRequest; | |||
| use dora_core::daemon_messages::{DataMessage, InterDaemonEvent, Timestamped}; | |||
| use dora_core::daemon_messages::{ | |||
| DataMessage, DynamicNodeEvent, InterDaemonEvent, NodeConfig, Timestamped, | |||
| }; | |||
| use dora_core::descriptor::runtime_node_inputs; | |||
| use dora_core::message::uhlc::{self, HLC}; | |||
| use dora_core::message::{ArrowTypeInfo, Metadata, MetadataParameters}; | |||
| use dora_core::topics::LOCALHOST; | |||
| use dora_core::{ | |||
| config::{DataId, InputMapping, NodeId}, | |||
| coordinator_messages::DaemonEvent, | |||
| @@ -15,10 +19,11 @@ use dora_core::{ | |||
| descriptor::{CoreNodeKind, Descriptor, ResolvedNode}, | |||
| }; | |||
| use eyre::{bail, eyre, Context, ContextCompat}; | |||
| use eyre::{bail, eyre, Context, ContextCompat, Result}; | |||
| use futures::{future, stream, FutureExt, TryFutureExt}; | |||
| use futures_concurrency::stream::Merge; | |||
| use inter_daemon::InterDaemonConnection; | |||
| use local_listener::DynamicNodeEventWrapper; | |||
| use pending::PendingNodes; | |||
| use shared_memory_server::ShmemConf; | |||
| use std::sync::Arc; | |||
| @@ -45,6 +50,7 @@ use uuid::{NoContext, Timestamp, Uuid}; | |||
| mod coordinator; | |||
| mod inter_daemon; | |||
| mod local_listener; | |||
| mod log; | |||
| mod node_communication; | |||
| mod pending; | |||
| @@ -81,16 +87,18 @@ impl Daemon { | |||
| pub async fn run( | |||
| coordinator_addr: SocketAddr, | |||
| machine_id: String, | |||
| bind_addr: SocketAddr, | |||
| inter_daemon_addr: SocketAddr, | |||
| local_listen_port: u16, | |||
| ) -> eyre::Result<()> { | |||
| let clock = Arc::new(HLC::default()); | |||
| let ctrlc_events = set_up_ctrlc_handler(clock.clone())?; | |||
| // spawn listen loop | |||
| // spawn inter daemon listen loop | |||
| let (events_tx, events_rx) = flume::bounded(10); | |||
| let listen_port = | |||
| inter_daemon::spawn_listener_loop(bind_addr, machine_id.clone(), events_tx).await?; | |||
| inter_daemon::spawn_listener_loop(inter_daemon_addr, machine_id.clone(), events_tx) | |||
| .await?; | |||
| let daemon_events = events_rx.into_stream().map(|e| Timestamped { | |||
| inner: Event::Daemon(e.inner), | |||
| timestamp: e.timestamp, | |||
| @@ -111,8 +119,26 @@ impl Daemon { | |||
| }, | |||
| ); | |||
| // Spawn local listener loop | |||
| let (events_tx, events_rx) = flume::bounded(10); | |||
| let _listen_port = local_listener::spawn_listener_loop( | |||
| (LOCALHOST, local_listen_port).into(), | |||
| machine_id.clone(), | |||
| events_tx, | |||
| ) | |||
| .await?; | |||
| let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped { | |||
| inner: Event::DynamicNode(e.inner), | |||
| timestamp: e.timestamp, | |||
| }); | |||
| Self::run_general( | |||
| (coordinator_events, ctrlc_events, daemon_events).merge(), | |||
| ( | |||
| coordinator_events, | |||
| ctrlc_events, | |||
| daemon_events, | |||
| dynamic_node_events, | |||
| ) | |||
| .merge(), | |||
| Some(coordinator_addr), | |||
| machine_id, | |||
| None, | |||
| @@ -276,6 +302,7 @@ impl Daemon { | |||
| RunStatus::Continue => {} | |||
| RunStatus::Exit => break, | |||
| }, | |||
| Event::DynamicNode(event) => self.handle_dynamic_node_event(event).await?, | |||
| Event::HeartbeatInterval => { | |||
| if let Some(connection) = &mut self.coordinator_connection { | |||
| let msg = serde_json::to_vec(&Timestamped { | |||
| @@ -437,7 +464,6 @@ impl Daemon { | |||
| .running | |||
| .get_mut(&dataflow_id) | |||
| .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?; | |||
| // .stop_all(&self.clock.clone(), grace_duration); | |||
| let reply = DaemonCoordinatorReply::StopResult(Ok(())); | |||
| let _ = reply_tx | |||
| @@ -599,10 +625,8 @@ impl Daemon { | |||
| .await | |||
| .wrap_err_with(|| format!("failed to spawn node `{node_id}`")) | |||
| { | |||
| Ok(pid) => { | |||
| dataflow | |||
| .running_nodes | |||
| .insert(node_id.clone(), RunningNode { pid }); | |||
| Ok(running_node) => { | |||
| dataflow.running_nodes.insert(node_id, running_node); | |||
| } | |||
| Err(err) => { | |||
| tracing::error!("{err:?}"); | |||
| @@ -624,6 +648,70 @@ impl Daemon { | |||
| Ok(()) | |||
| } | |||
| async fn handle_dynamic_node_event( | |||
| &mut self, | |||
| event: DynamicNodeEventWrapper, | |||
| ) -> eyre::Result<()> { | |||
| match event { | |||
| DynamicNodeEventWrapper { | |||
| event: DynamicNodeEvent::NodeConfig { node_id }, | |||
| reply_tx, | |||
| } => { | |||
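| // Count how many running dataflows contain a node with the requested id. | |||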
| let number_node_id = self | |||
| .running | |||
| .iter() | |||
| .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id)) | |||
| .count(); | |||
| let node_config = match number_node_id { | |||
| 2.. => { | |||
| let _ = reply_tx.send(Some(DaemonReply::NodeConfig { | |||
| result: Err(format!( | |||
| "multiple dataflows contain dynamic node id {}. Please have only one running dataflow with the specified node id if you want to use a dynamic node", | |||
| node_id | |||
| )), | |||
| })); | |||
| return Ok(()); | |||
| } | |||
| 1 => self | |||
| .running | |||
| .iter() | |||
| .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id)) | |||
| .map(|(id, dataflow)| -> Result<NodeConfig> { | |||
| let node_config = dataflow | |||
| .running_nodes | |||
| .get(&node_id) | |||
| .context("no node with ID `{node_id}` within the given dataflow")? | |||
| .node_config | |||
| .clone(); | |||
| if !node_config.dynamic { | |||
| bail!("node with ID `{node_id}` in {id} is not dynamic"); | |||
| } | |||
| Ok(node_config) | |||
| }) | |||
| .next() | |||
| .context("no node with ID `{node_id}`")? | |||
| .context("failed to get dynamic node config within given dataflow")?, | |||
| 0 => { | |||
| let _ = reply_tx.send(Some(DaemonReply::NodeConfig { | |||
| result: Err("no node with ID `{node_id}`".to_string()), | |||
| })); | |||
| return Ok(()); | |||
| } | |||
| }; | |||
| let reply = DaemonReply::NodeConfig { | |||
| result: Ok(node_config), | |||
| }; | |||
| let _ = reply_tx.send(Some(reply)).map_err(|_| { | |||
| error!("could not send node info reply from daemon to coordinator") | |||
| }); | |||
| Ok(()) | |||
| } | |||
| } | |||
| } | |||
| async fn handle_node_event( | |||
| &mut self, | |||
| event: DaemonNodeEvent, | |||
| @@ -920,7 +1008,11 @@ impl Daemon { | |||
| .await?; | |||
| dataflow.running_nodes.remove(node_id); | |||
| if dataflow.running_nodes.is_empty() { | |||
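| // A dataflow counts as finished once only dynamic nodes (which are not spawned by the daemon) remain. | |||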
| if dataflow | |||
| .running_nodes | |||
| .iter() | |||
| .all(|(_id, n)| n.node_config.dynamic) | |||
| { | |||
| let result = match self.dataflow_errors.get(&dataflow.id) { | |||
| None => Ok(()), | |||
| Some(errors) => { | |||
| @@ -1227,33 +1319,6 @@ fn node_inputs(node: &ResolvedNode) -> BTreeMap<DataId, Input> { | |||
| } | |||
| } | |||
| fn runtime_node_inputs(n: &dora_core::descriptor::RuntimeNode) -> BTreeMap<DataId, Input> { | |||
| n.operators | |||
| .iter() | |||
| .flat_map(|operator| { | |||
| operator.config.inputs.iter().map(|(input_id, mapping)| { | |||
| ( | |||
| DataId::from(format!("{}/{input_id}", operator.id)), | |||
| mapping.clone(), | |||
| ) | |||
| }) | |||
| }) | |||
| .collect() | |||
| } | |||
| fn runtime_node_outputs(n: &dora_core::descriptor::RuntimeNode) -> BTreeSet<DataId> { | |||
| n.operators | |||
| .iter() | |||
| .flat_map(|operator| { | |||
| operator | |||
| .config | |||
| .outputs | |||
| .iter() | |||
| .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id))) | |||
| }) | |||
| .collect() | |||
| } | |||
| async fn send_input_closed_events<F>( | |||
| dataflow: &mut RunningDataflow, | |||
| inter_daemon_connections: &mut BTreeMap<String, InterDaemonConnection>, | |||
| @@ -1330,7 +1395,8 @@ fn close_input( | |||
| #[derive(Debug, Clone)] | |||
| struct RunningNode { | |||
| pid: u32, | |||
| pid: Option<u32>, | |||
| node_config: NodeConfig, | |||
| } | |||
| pub struct RunningDataflow { | |||
| @@ -1444,12 +1510,14 @@ impl RunningDataflow { | |||
| system.refresh_processes(); | |||
| for (node, node_details) in running_nodes.iter() { | |||
| if let Some(process) = system.process(Pid::from(node_details.pid as usize)) { | |||
| process.kill(); | |||
| warn!( | |||
| "{node} was killed due to not stopping within the {:#?} grace period", | |||
| duration | |||
| ) | |||
| if let Some(pid) = node_details.pid { | |||
| if let Some(process) = system.process(Pid::from(pid as usize)) { | |||
| process.kill(); | |||
| warn!( | |||
| "{node} was killed due to not stopping within the {:#?} grace period", | |||
| duration | |||
| ) | |||
| } | |||
| } | |||
| } | |||
| }); | |||
| @@ -1515,6 +1583,7 @@ pub enum Event { | |||
| Coordinator(CoordinatorEvent), | |||
| Daemon(InterDaemonEvent), | |||
| Dora(DoraEvent), | |||
| DynamicNode(DynamicNodeEventWrapper), | |||
| HeartbeatInterval, | |||
| CtrlC, | |||
| } | |||
| @@ -0,0 +1,138 @@ | |||
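| //! Local TCP listener through which dynamic nodes request their `NodeConfig` from the daemon. | |||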
| use crate::tcp_utils::{tcp_receive, tcp_send}; | |||
| use dora_core::daemon_messages::{DaemonReply, DaemonRequest, DynamicNodeEvent, Timestamped}; | |||
| use eyre::Context; | |||
| use std::{io::ErrorKind, net::SocketAddr}; | |||
| use tokio::{ | |||
| net::{TcpListener, TcpStream}, | |||
| sync::oneshot, | |||
| }; | |||
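| /// A dynamic node event together with a channel for sending the daemon's reply back to the requesting connection. | |||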
| #[derive(Debug)] | |||
| pub struct DynamicNodeEventWrapper { | |||
| pub event: DynamicNodeEvent, | |||
| pub reply_tx: oneshot::Sender<Option<DaemonReply>>, | |||
| } | |||
| pub async fn spawn_listener_loop( | |||
| bind: SocketAddr, | |||
| machine_id: String, | |||
| events_tx: flume::Sender<Timestamped<DynamicNodeEventWrapper>>, | |||
| ) -> eyre::Result<u16> { | |||
| let socket = match TcpListener::bind(bind).await { | |||
| Ok(socket) => socket, | |||
| Err(err) => { | |||
| return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener")) | |||
| } | |||
| }; | |||
| let listen_port = socket | |||
| .local_addr() | |||
| .wrap_err("failed to get local addr of socket")? | |||
| .port(); | |||
| tokio::spawn(async move { | |||
| listener_loop(socket, events_tx).await; | |||
| tracing::debug!("Local listener loop finished for machine `{machine_id}`"); | |||
| }); | |||
| Ok(listen_port) | |||
| } | |||
| async fn listener_loop( | |||
| listener: TcpListener, | |||
| events_tx: flume::Sender<Timestamped<DynamicNodeEventWrapper>>, | |||
| ) { | |||
| loop { | |||
| match listener | |||
| .accept() | |||
| .await | |||
| .wrap_err("failed to accept new connection") | |||
| { | |||
| Err(err) => { | |||
| tracing::info!("{err}"); | |||
| } | |||
| Ok((connection, _)) => { | |||
| tokio::spawn(handle_connection_loop(connection, events_tx.clone())); | |||
| } | |||
| } | |||
| } | |||
| } | |||
| async fn handle_connection_loop( | |||
| mut connection: TcpStream, | |||
| events_tx: flume::Sender<Timestamped<DynamicNodeEventWrapper>>, | |||
| ) { | |||
| if let Err(err) = connection.set_nodelay(true) { | |||
| tracing::warn!("failed to set nodelay for connection: {err}"); | |||
| } | |||
| loop { | |||
| match receive_message(&mut connection).await { | |||
| Ok(Some(Timestamped { | |||
| inner: DaemonRequest::NodeConfig { node_id }, | |||
| timestamp, | |||
| })) => { | |||
| let (reply_tx, reply_rx) = oneshot::channel(); | |||
| if events_tx | |||
| .send_async(Timestamped { | |||
| inner: DynamicNodeEventWrapper { | |||
| event: DynamicNodeEvent::NodeConfig { node_id }, | |||
| reply_tx, | |||
| }, | |||
| timestamp, | |||
| }) | |||
| .await | |||
| .is_err() | |||
| { | |||
| break; | |||
| } | |||
| let Ok(reply) = reply_rx.await else { | |||
| tracing::warn!("daemon sent no reply"); | |||
| continue; | |||
| }; | |||
| if let Some(reply) = reply { | |||
| let serialized = match serde_json::to_vec(&reply) | |||
| .wrap_err("failed to serialize DaemonReply") | |||
| { | |||
| Ok(r) => r, | |||
| Err(err) => { | |||
| tracing::error!("{err:?}"); | |||
| continue; | |||
| } | |||
| }; | |||
| if let Err(err) = tcp_send(&mut connection, &serialized).await { | |||
| tracing::warn!("failed to send reply: {err}"); | |||
| continue; | |||
| }; | |||
| } | |||
| } | |||
| Ok(None) => break, | |||
| Err(err) => { | |||
| tracing::warn!("{err:?}"); | |||
| break; | |||
| } | |||
| _ => tracing::warn!( | |||
| "Unexpected Daemon Request that is not yet by Additional local listener controls" | |||
| ), | |||
| } | |||
| } | |||
| } | |||
| async fn receive_message( | |||
| connection: &mut TcpStream, | |||
| ) -> eyre::Result<Option<Timestamped<DaemonRequest>>> { | |||
| let raw = match tcp_receive(connection).await { | |||
| Ok(raw) => raw, | |||
| Err(err) => match err.kind() { | |||
| ErrorKind::UnexpectedEof | |||
| | ErrorKind::ConnectionAborted | |||
| | ErrorKind::ConnectionReset => return Ok(None), | |||
| _other => { | |||
| return Err(err) | |||
| .context("unexpected I/O error while trying to receive DaemonRequest") | |||
| } | |||
| }, | |||
| }; | |||
| bincode::deserialize(&raw) | |||
| .wrap_err("failed to deserialize DaemonRequest") | |||
| .map(Some) | |||
| } | |||
| @@ -6,6 +6,7 @@ use dora_core::{ | |||
| Timestamped, | |||
| }, | |||
| message::uhlc, | |||
| topics::LOCALHOST, | |||
| }; | |||
| use eyre::{eyre, Context}; | |||
| use futures::{future, task, Future}; | |||
| @@ -13,7 +14,6 @@ use shared_memory_server::{ShmemConf, ShmemServer}; | |||
| use std::{ | |||
| collections::{BTreeMap, VecDeque}, | |||
| mem, | |||
| net::Ipv4Addr, | |||
| sync::Arc, | |||
| task::Poll, | |||
| }; | |||
| @@ -39,8 +39,7 @@ pub async fn spawn_listener_loop( | |||
| ) -> eyre::Result<DaemonCommunication> { | |||
| match config { | |||
| LocalCommunicationConfig::Tcp => { | |||
| let localhost = Ipv4Addr::new(127, 0, 0, 1); | |||
| let socket = match TcpListener::bind((localhost, 0)).await { | |||
| let socket = match TcpListener::bind((LOCALHOST, 0)).await { | |||
| Ok(socket) => socket, | |||
| Err(err) => { | |||
| return Err( | |||
| @@ -350,6 +349,12 @@ impl Listener { | |||
| .await | |||
| .wrap_err("failed to send register reply")?; | |||
| } | |||
| DaemonRequest::NodeConfig { .. } => { | |||
| let reply = DaemonReply::Result(Err("unexpected node config message".into())); | |||
| self.send_reply(reply, connection) | |||
| .await | |||
| .wrap_err("failed to send register reply")?; | |||
| } | |||
| DaemonRequest::OutputsDone => { | |||
| let (reply_sender, reply) = oneshot::channel(); | |||
| self.process_daemon_event( | |||
| @@ -1,15 +1,15 @@ | |||
| use crate::{ | |||
| log, node_communication::spawn_listener_loop, node_inputs, runtime_node_inputs, | |||
| runtime_node_outputs, DoraEvent, Event, NodeExitStatus, OutputId, | |||
| log, node_communication::spawn_listener_loop, node_inputs, DoraEvent, Event, NodeExitStatus, | |||
| OutputId, RunningNode, | |||
| }; | |||
| use aligned_vec::{AVec, ConstAlign}; | |||
| use dora_arrow_convert::IntoArrow; | |||
| use dora_core::{ | |||
| config::{DataId, NodeRunConfig}, | |||
| config::DataId, | |||
| daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped}, | |||
| descriptor::{ | |||
| resolve_path, source_is_url, Descriptor, OperatorDefinition, OperatorSource, PythonSource, | |||
| ResolvedNode, SHELL_SOURCE, | |||
| ResolvedNode, DYNAMIC_SOURCE, SHELL_SOURCE, | |||
| }, | |||
| get_python_path, | |||
| message::uhlc::HLC, | |||
| @@ -42,7 +42,7 @@ pub async fn spawn_node( | |||
| daemon_tx: mpsc::Sender<Timestamped<Event>>, | |||
| dataflow_descriptor: Descriptor, | |||
| clock: Arc<HLC>, | |||
| ) -> eyre::Result<u32> { | |||
| ) -> eyre::Result<RunningNode> { | |||
| let node_id = node.id.clone(); | |||
| tracing::debug!("Spawning node `{dataflow_id}/{node_id}`"); | |||
| @@ -63,9 +63,24 @@ pub async fn spawn_node( | |||
| .send_stdout_as() | |||
| .context("Could not resolve `send_stdout_as` configuration")?; | |||
| let node_config = NodeConfig { | |||
| dataflow_id, | |||
| node_id: node_id.clone(), | |||
| run_config: node.kind.run_config(), | |||
| daemon_communication, | |||
| dataflow_descriptor, | |||
| dynamic: node.kind.dynamic(), | |||
| }; | |||
| let mut child = match node.kind { | |||
| dora_core::descriptor::CoreNodeKind::Custom(n) => { | |||
| let mut command = match n.source.as_str() { | |||
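| // Dynamic nodes are not spawned by the daemon; they connect later on their own, so only their config is recorded here. | |||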
| DYNAMIC_SOURCE => { | |||
| return Ok(RunningNode { | |||
| pid: None, | |||
| node_config, | |||
| }); | |||
| } | |||
| SHELL_SOURCE => { | |||
| if cfg!(target_os = "windows") { | |||
| let mut cmd = tokio::process::Command::new("cmd"); | |||
| @@ -117,17 +132,11 @@ pub async fn spawn_node( | |||
| command.current_dir(working_dir); | |||
| command.stdin(Stdio::null()); | |||
| let node_config = NodeConfig { | |||
| dataflow_id, | |||
| node_id: node_id.clone(), | |||
| run_config: n.run_config.clone(), | |||
| daemon_communication, | |||
| dataflow_descriptor, | |||
| }; | |||
| command.env( | |||
| "DORA_NODE_CONFIG", | |||
| serde_yaml::to_string(&node_config).wrap_err("failed to serialize node config")?, | |||
| serde_yaml::to_string(&node_config.clone()) | |||
| .wrap_err("failed to serialize node config")?, | |||
| ); | |||
| // Injecting the env variable defined in the `yaml` into | |||
| // the node runtime. | |||
| @@ -223,16 +232,7 @@ pub async fn spawn_node( | |||
| command.current_dir(working_dir); | |||
| let runtime_config = RuntimeConfig { | |||
| node: NodeConfig { | |||
| dataflow_id, | |||
| node_id: node_id.clone(), | |||
| run_config: NodeRunConfig { | |||
| inputs: runtime_node_inputs(&n), | |||
| outputs: runtime_node_outputs(&n), | |||
| }, | |||
| daemon_communication, | |||
| dataflow_descriptor, | |||
| }, | |||
| node: node_config.clone(), | |||
| operators: n.operators, | |||
| }; | |||
| command.env( | |||
| @@ -270,7 +270,13 @@ pub async fn spawn_node( | |||
| .expect("Failed to create log file"); | |||
| let mut child_stdout = | |||
| tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout")); | |||
| let pid = child.id().unwrap(); | |||
| let pid = child.id().context( | |||
| "Could not get the pid for the just spawned node and indicate that there is an error", | |||
| )?; | |||
| let running_node = RunningNode { | |||
| pid: Some(pid), | |||
| node_config, | |||
| }; | |||
| let stdout_tx = tx.clone(); | |||
| // Stdout listener stream | |||
| @@ -454,5 +460,5 @@ pub async fn spawn_node( | |||
| .send(()) | |||
| .map_err(|_| error!("Could not inform that log file thread finished")); | |||
| }); | |||
| Ok(pid) | |||
| Ok(running_node) | |||
| } | |||
| @@ -52,8 +52,8 @@ async fn main() -> eyre::Result<()> { | |||
| ) | |||
| .await?; | |||
| let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port); | |||
| let daemon_a = run_daemon(coordinator_addr.to_string(), "A"); | |||
| let daemon_b = run_daemon(coordinator_addr.to_string(), "B"); | |||
| let daemon_a = run_daemon(coordinator_addr.to_string(), "A", 9843); // Arbitrary fixed port for the test | |||
| let daemon_b = run_daemon(coordinator_addr.to_string(), "B", 9842); | |||
| tracing::info!("Spawning coordinator and daemons"); | |||
| let mut tasks = JoinSet::new(); | |||
| @@ -211,7 +211,11 @@ async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||
| Ok(()) | |||
| } | |||
| async fn run_daemon(coordinator: String, machine_id: &str) -> eyre::Result<()> { | |||
| async fn run_daemon( | |||
| coordinator: String, | |||
| machine_id: &str, | |||
| local_listen_port: u16, | |||
| ) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("run"); | |||
| @@ -221,7 +225,9 @@ async fn run_daemon(coordinator: String, machine_id: &str) -> eyre::Result<()> { | |||
| .arg("--machine-id") | |||
| .arg(machine_id) | |||
| .arg("--coordinator-addr") | |||
| .arg(coordinator); | |||
| .arg(coordinator) | |||
| .arg("--local-listen-port") | |||
| .arg(local_listen_port.to_string()); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to run dataflow"); | |||
| }; | |||
| @@ -0,0 +1,16 @@ | |||
| nodes: | |||
| - id: webcam | |||
| custom: | |||
| source: ./webcam.py | |||
| inputs: | |||
| tick: | |||
| source: dora/timer/millis/50 | |||
| queue_size: 1000 | |||
| outputs: | |||
| - image | |||
| - id: plot | |||
| custom: | |||
| source: dynamic | |||
| inputs: | |||
| image: webcam/image | |||
| @@ -0,0 +1,97 @@ | |||
| #!/usr/bin/env python3 | |||
| # -*- coding: utf-8 -*- | |||
| import os | |||
| from dora import Node | |||
| from dora import DoraStatus | |||
| import cv2 | |||
| import numpy as np | |||
| from utils import LABELS | |||
| CI = os.environ.get("CI") | |||
| font = cv2.FONT_HERSHEY_SIMPLEX | |||
| class Plotter: | |||
| """ | |||
| Plot image and bounding box | |||
| """ | |||
| def __init__(self): | |||
| self.image = [] | |||
| self.bboxs = [] | |||
| def on_input( | |||
| self, | |||
| dora_input, | |||
| ) -> DoraStatus: | |||
| """ | |||
| Put image and bounding box on cv2 window. | |||
| Args: | |||
| dora_input["id"] (str): Id of the dora_input declared in the yaml configuration | |||
| dora_input["value"] (arrow array): message of the dora_input | |||
| """ | |||
| if dora_input["id"] == "image": | |||
| frame = dora_input["value"].to_numpy() | |||
| frame = cv2.imdecode(frame, -1) | |||
| self.image = frame | |||
| elif dora_input["id"] == "bbox" and len(self.image) != 0: | |||
| bboxs = dora_input["value"].to_numpy() | |||
| self.bboxs = np.reshape(bboxs, (-1, 6)) | |||
| for bbox in self.bboxs: | |||
| [ | |||
| min_x, | |||
| min_y, | |||
| max_x, | |||
| max_y, | |||
| confidence, | |||
| label, | |||
| ] = bbox | |||
| cv2.rectangle( | |||
| self.image, | |||
| (int(min_x), int(min_y)), | |||
| (int(max_x), int(max_y)), | |||
| (0, 255, 0), | |||
| 2, | |||
| ) | |||
| cv2.putText( | |||
| self.image, | |||
| LABELS[int(label)] + f", {confidence:0.2f}", | |||
| (int(max_x), int(max_y)), | |||
| font, | |||
| 0.75, | |||
| (0, 255, 0), | |||
| 2, | |||
| 1, | |||
| ) | |||
| if CI != "true": | |||
| cv2.imshow("frame", self.image) | |||
| if cv2.waitKey(1) & 0xFF == ord("q"): | |||
| return DoraStatus.STOP | |||
| return DoraStatus.CONTINUE | |||
| plotter = Plotter() | |||
| node = Node("plot") | |||
| for event in node: | |||
| event_type = event["type"] | |||
| if event_type == "INPUT": | |||
| status = plotter.on_input(event) | |||
| if status == DoraStatus.CONTINUE: | |||
| pass | |||
| elif status == DoraStatus.STOP: | |||
| print("plotter returned stop status") | |||
| break | |||
| elif event_type == "STOP": | |||
| print("received stop") | |||
| else: | |||
| print("received unexpected event:", event_type) | |||
| @@ -0,0 +1,31 @@ | |||
| nodes: | |||
| - id: rust-node | |||
| custom: | |||
| build: cargo build -p rust-dataflow-example-node | |||
| source: ../../target/debug/rust-dataflow-example-node | |||
| inputs: | |||
| tick: dora/timer/millis/100 | |||
| outputs: | |||
| - random | |||
| - id: rust-status-node | |||
| custom: | |||
| build: cargo build -p rust-dataflow-example-status-node | |||
| source: ../../target/debug/rust-dataflow-example-status-node | |||
| inputs: | |||
| tick: dora/timer/millis/100 | |||
| random: rust-node/random | |||
| outputs: | |||
| - status | |||
| - id: rust-sink-dynamic | |||
| custom: | |||
| build: cargo build -p rust-dataflow-example-sink-dynamic | |||
| source: dynamic | |||
| inputs: | |||
| message: rust-status-node/status | |||
| - id: dora-record | |||
| custom: | |||
| build: cargo build -p dora-record | |||
| source: ../../target/debug/dora-record | |||
| inputs: | |||
| message: rust-status-node/status | |||
| random: rust-node/random | |||
| @@ -0,0 +1,10 @@ | |||
| [package] | |||
| name = "rust-dataflow-example-sink-dynamic" | |||
| version.workspace = true | |||
| edition = "2021" | |||
| # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |||
| [dependencies] | |||
| dora-node-api = { workspace = true, features = ["tracing"] } | |||
| eyre = "0.6.8" | |||
| @@ -0,0 +1,39 @@ | |||
| use dora_node_api::{self, dora_core::config::NodeId, DoraNode, Event}; | |||
| use eyre::{bail, Context}; | |||
| fn main() -> eyre::Result<()> { | |||
| let (_node, mut events) = | |||
| DoraNode::init_from_node_id(NodeId::from("rust-sink-dynamic".to_string()))?; | |||
| while let Some(event) = events.recv() { | |||
| match event { | |||
| Event::Input { | |||
| id, | |||
| metadata: _, | |||
| data, | |||
| } => match id.as_str() { | |||
| "message" => { | |||
| let received_string: &str = | |||
| TryFrom::try_from(&data).context("expected string message")?; | |||
| println!("sink received message: {}", received_string); | |||
| if !received_string.starts_with("operator received random value ") { | |||
| bail!("unexpected message format (should start with 'operator received random value')") | |||
| } | |||
| if !received_string.ends_with(" ticks") { | |||
| bail!("unexpected message format (should end with 'ticks')") | |||
| } | |||
| } | |||
| other => eprintln!("Ignoring unexpected input `{other}`"), | |||
| }, | |||
| Event::Stop => { | |||
| println!("Received manual stop"); | |||
| } | |||
| Event::InputClosed { id } => { | |||
| println!("Input `{id}` was closed"); | |||
| } | |||
| other => eprintln!("Received unexpected input: {other:?}"), | |||
| } | |||
| } | |||
| Ok(()) | |||
| } | |||
| @@ -14,13 +14,14 @@ use aligned_vec::{AVec, ConstAlign}; | |||
| use dora_message::{uhlc, Metadata}; | |||
| use uuid::{NoContext, Timestamp, Uuid}; | |||
| #[derive(Debug, serde::Serialize, serde::Deserialize)] | |||
| #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] | |||
| pub struct NodeConfig { | |||
| pub dataflow_id: DataflowId, | |||
| pub node_id: NodeId, | |||
| pub run_config: NodeRunConfig, | |||
| pub daemon_communication: DaemonCommunication, | |||
| pub dataflow_descriptor: Descriptor, | |||
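| /// Whether this node is a dynamic node that connects on its own instead of being spawned by the daemon. | |||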
| pub dynamic: bool, | |||
| } | |||
| #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] | |||
| @@ -68,13 +69,18 @@ pub enum DaemonRequest { | |||
| SubscribeDrop, | |||
| NextFinishedDropTokens, | |||
| EventStreamDropped, | |||
| NodeConfig { | |||
| node_id: NodeId, | |||
| }, | |||
| } | |||
| impl DaemonRequest { | |||
| pub fn expects_tcp_reply(&self) -> bool { | |||
| pub fn expects_tcp_bincode_reply(&self) -> bool { | |||
| #[allow(clippy::match_like_matches_macro)] | |||
| match self { | |||
| DaemonRequest::SendMessage { .. } | DaemonRequest::ReportDropTokens { .. } => false, | |||
| DaemonRequest::SendMessage { .. } | |||
| | DaemonRequest::NodeConfig { .. } | |||
| | DaemonRequest::ReportDropTokens { .. } => false, | |||
| DaemonRequest::Register { .. } | |||
| | DaemonRequest::Subscribe | |||
| | DaemonRequest::CloseOutputs(_) | |||
| @@ -85,6 +91,23 @@ impl DaemonRequest { | |||
| | DaemonRequest::EventStreamDropped => true, | |||
| } | |||
| } | |||
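| /// Replies that carry a variable-length payload (currently only `NodeConfig`) are serialized as JSON instead of bincode. | |||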
| pub fn expects_tcp_json_reply(&self) -> bool { | |||
| #[allow(clippy::match_like_matches_macro)] | |||
| match self { | |||
| DaemonRequest::NodeConfig { .. } => true, | |||
| DaemonRequest::Register { .. } | |||
| | DaemonRequest::Subscribe | |||
| | DaemonRequest::CloseOutputs(_) | |||
| | DaemonRequest::OutputsDone | |||
| | DaemonRequest::NextEvent { .. } | |||
| | DaemonRequest::SubscribeDrop | |||
| | DaemonRequest::NextFinishedDropTokens | |||
| | DaemonRequest::ReportDropTokens { .. } | |||
| | DaemonRequest::SendMessage { .. } | |||
| | DaemonRequest::EventStreamDropped => false, | |||
| } | |||
| } | |||
| } | |||
| #[derive(serde::Serialize, serde::Deserialize, Clone)] | |||
| @@ -136,6 +159,7 @@ pub enum DaemonReply { | |||
| PreparedMessage { shared_memory_id: SharedMemoryId }, | |||
| NextEvents(Vec<Timestamped<NodeEvent>>), | |||
| NextDropEvents(Vec<Timestamped<NodeDropEvent>>), | |||
| NodeConfig { result: Result<NodeConfig, String> }, | |||
| Empty, | |||
| } | |||
| @@ -229,6 +253,11 @@ pub enum DaemonCoordinatorEvent { | |||
| Heartbeat, | |||
| } | |||
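| /// Events sent by dynamic nodes to their local daemon. | |||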
| #[derive(Debug, serde::Deserialize, serde::Serialize)] | |||
| pub enum DynamicNodeEvent { | |||
| NodeConfig { node_id: NodeId }, | |||
| } | |||
| #[derive(Debug, serde::Deserialize, serde::Serialize)] | |||
| pub enum InterDaemonEvent { | |||
| Output { | |||
| @@ -16,6 +16,7 @@ pub use visualize::collect_dora_timers; | |||
| mod validate; | |||
| mod visualize; | |||
| pub const SHELL_SOURCE: &str = "shell"; | |||
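| /// Marker `source` value for nodes that are started manually (dynamic nodes) rather than spawned by the daemon. | |||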
| pub const DYNAMIC_SOURCE: &str = "dynamic"; | |||
| /// Dataflow description | |||
| #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] | |||
| @@ -335,6 +336,52 @@ pub enum CoreNodeKind { | |||
| Custom(CustomNode), | |||
| } | |||
| pub fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap<DataId, Input> { | |||
| n.operators | |||
| .iter() | |||
| .flat_map(|operator| { | |||
| operator.config.inputs.iter().map(|(input_id, mapping)| { | |||
| ( | |||
| DataId::from(format!("{}/{input_id}", operator.id)), | |||
| mapping.clone(), | |||
| ) | |||
| }) | |||
| }) | |||
| .collect() | |||
| } | |||
| fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet<DataId> { | |||
| n.operators | |||
| .iter() | |||
| .flat_map(|operator| { | |||
| operator | |||
| .config | |||
| .outputs | |||
| .iter() | |||
| .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id))) | |||
| }) | |||
| .collect() | |||
| } | |||
| impl CoreNodeKind { | |||
| pub fn run_config(&self) -> NodeRunConfig { | |||
| match self { | |||
| CoreNodeKind::Runtime(n) => NodeRunConfig { | |||
| inputs: runtime_node_inputs(n), | |||
| outputs: runtime_node_outputs(n), | |||
| }, | |||
| CoreNodeKind::Custom(n) => n.run_config.clone(), | |||
| } | |||
| } | |||
| pub fn dynamic(&self) -> bool { | |||
| match self { | |||
| CoreNodeKind::Runtime(_n) => false, | |||
| CoreNodeKind::Custom(n) => n.source == DYNAMIC_SOURCE, | |||
| } | |||
| } | |||
| } | |||
| #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] | |||
| #[serde(transparent)] | |||
| pub struct RuntimeNode { | |||
| @@ -9,7 +9,7 @@ use eyre::{bail, eyre, Context}; | |||
| use std::{path::Path, process::Command}; | |||
| use tracing::info; | |||
| use super::{resolve_path, Descriptor, SHELL_SOURCE}; | |||
| use super::{resolve_path, Descriptor, DYNAMIC_SOURCE, SHELL_SOURCE}; | |||
| const VERSION: &str = env!("CARGO_PKG_VERSION"); | |||
| pub fn check_dataflow( | |||
| @@ -26,6 +26,7 @@ pub fn check_dataflow( | |||
| match &node.kind { | |||
| descriptor::CoreNodeKind::Custom(custom) => match custom.source.as_str() { | |||
| SHELL_SOURCE => (), | |||
| DYNAMIC_SOURCE => (), | |||
| source => { | |||
| if source_is_url(source) { | |||
| info!("{source} is a URL."); // TODO: Implement url check. | |||
| @@ -1,4 +1,10 @@ | |||
| use std::{collections::BTreeSet, fmt::Display, path::PathBuf, time::Duration}; | |||
| use std::{ | |||
| collections::BTreeSet, | |||
| fmt::Display, | |||
| net::{IpAddr, Ipv4Addr}, | |||
| path::PathBuf, | |||
| time::Duration, | |||
| }; | |||
| use uuid::Uuid; | |||
| use crate::{ | |||
| @@ -6,7 +12,9 @@ use crate::{ | |||
| descriptor::Descriptor, | |||
| }; | |||
| pub const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); | |||
| pub const DORA_COORDINATOR_PORT_DEFAULT: u16 = 0xD02A; | |||
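| /// Default port on which each daemon listens locally for connections from dynamic nodes. | |||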
| pub const DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT: u16 = 0xD02B; | |||
| pub const DORA_COORDINATOR_PORT_CONTROL_DEFAULT: u16 = 0x177C; | |||
| pub const MANUAL_STOP: &str = "dora/stop"; | |||
| @@ -0,0 +1,56 @@ | |||
| # dora-record | |||
| dora data recording using Apache Arrow Parquet. | |||
| This node is still experimental. | |||
| ## Getting Started | |||
| ```bash | |||
| cargo install dora-record --locked | |||
| ``` | |||
| ## Adding to an existing graph | |||
| ```yaml | |||
| - id: dora-record | |||
| custom: | |||
| source: dora-record | |||
| inputs: | |||
| image: webcam/image | |||
| text: webcam/text | |||
| # You can add any input and it is going to be logged. | |||
| ``` | |||
| ## Output Files | |||
| Format: Parquet file | |||
| Path: `out/<DATAFLOW_ID>/<INPUT>.parquet` | |||
| Columns: | |||
| - trace_id: String, representing the id of the current trace | |||
| - span_id: String, representing the unique span id | |||
| - timestamp_uhlc: u64, representing the timestamp in [Unique Hybrid Logical Clock time](https://github.com/atolab/uhlc-rs) | |||
| - timestamp_utc: DataType::Timestamp(Milliseconds), representing the timestamp in Coordinated Universal Time. | |||
| - `<INPUT>` : Column containing the input in its defined format. | |||
| Example: | |||
| ```json | |||
| { | |||
| "trace_id": "2fd23ddf1b5d2aa38ddb86ceedb55928", | |||
| "span_id": "15aef03e0f052bbf", | |||
| "timestamp_uhlc": "7368873278370007008", | |||
| "timestamp_utc": 1715699508406, | |||
| "random": [1886295351360621740] | |||
| } | |||
| ``` | |||
| ## Merging multiple files | |||
| We can merge input files using the `trace_id`, which is shared across inputs when the opentelemetry features are enabled. | |||
| - `trace_id` can also be queried from UIs such as the Jaeger UI, InfluxDB, and so on. | |||
| - `trace_id` keeps track of the logical flow of data, whereas timestamp-based merging might not reflect the actual logical flow. | |||
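| A minimal sketch of such a merge, assuming `pandas` with Parquet support (e.g. `pyarrow`) is installed and two recorded inputs named `image` and `text` as in the graph above; the dataflow directory is a placeholder: | |||
| ```python | |||
| # Hypothetical sketch: join two recorded inputs on their shared trace_id. | |||
| import pandas as pd | |||
| dataflow_dir = "out/<DATAFLOW_ID>"  # placeholder: use the actual dataflow id | |||
| image = pd.read_parquet(f"{dataflow_dir}/image.parquet") | |||
| text = pd.read_parquet(f"{dataflow_dir}/text.parquet") | |||
| # Rows that belong to the same logical flow of data share a trace_id. | |||
| merged = image.merge(text, on="trace_id", suffixes=("_image", "_text")) | |||
| print(merged.head()) | |||
| ``` | |||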