diff --git a/Cargo.lock b/Cargo.lock index 109ab186..aa91ce42 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1463,6 +1463,7 @@ dependencies = [ "eyre", "flume", "pyo3", + "pythonize", "serde_yaml 0.8.26", ] @@ -1538,6 +1539,7 @@ dependencies = [ "opentelemetry 0.18.0", "opentelemetry-system-metrics", "pyo3", + "pythonize", "serde_yaml 0.8.26", "tokio", "tokio-stream", @@ -3694,6 +3696,16 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "pythonize" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a0e1bbcd2a3856284bf4f4ef09ccb1157e9467847792754556f153ea3fe6b42" +dependencies = [ + "pyo3", + "serde", +] + [[package]] name = "quick-xml" version = "0.28.2" diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index 8e92a8c9..ad676943 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -22,6 +22,7 @@ serde_yaml = "0.8.23" flume = "0.10.14" dora-runtime = { workspace = true, features = ["tracing", "python"] } arrow = { version = "35.0.0", features = ["pyarrow"] } +pythonize = "0.18.0" [lib] name = "dora" diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 32c7012a..af77bf08 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -83,6 +83,13 @@ impl Node { self.send_output_slice(output_id, data.len(), data, metadata) }) } + + /// Returns the full dataflow descriptor that this node is part of. + /// + /// This method returns the parsed dataflow YAML file. + pub fn dataflow_descriptor(&self, py: Python) -> pythonize::Result { + pythonize::pythonize(py, self.node.dataflow_descriptor()) + } } impl Node { diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index b677a72c..5e637489 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -4,6 +4,7 @@ use self::{control_channel::ControlChannel, drop_stream::DropStream}; use dora_core::{ config::{DataId, NodeId, NodeRunConfig}, daemon_messages::{Data, DropToken, NodeConfig}, + descriptor::Descriptor, message::{uhlc, Metadata, MetadataParameters}, }; use eyre::{bail, WrapErr}; @@ -31,6 +32,8 @@ pub struct DoraNode { sent_out_shared_memory: HashMap, drop_stream: DropStream, cache: VecDeque, + + dataflow_descriptor: Descriptor, } impl DoraNode { @@ -61,6 +64,7 @@ impl DoraNode { node_id, run_config, daemon_communication, + dataflow_descriptor, } = node_config; let event_stream = EventStream::init(dataflow_id, &node_id, &daemon_communication) @@ -78,6 +82,8 @@ impl DoraNode { sent_out_shared_memory: HashMap::new(), drop_stream, cache: VecDeque::new(), + + dataflow_descriptor, }; Ok((node, event_stream)) } @@ -239,6 +245,13 @@ impl DoraNode { self.cache.pop_front(); } } + + /// Returns the full dataflow descriptor that this node is part of. + /// + /// This method returns the parsed dataflow YAML file. + pub fn dataflow_descriptor(&self) -> &Descriptor { + &self.dataflow_descriptor + } } impl Drop for DoraNode { diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 4e2843a1..9e752a5f 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -40,8 +40,8 @@ pub(super) async fn spawn_dataflow( dataflow_id: uuid, working_dir, nodes: nodes.clone(), - communication: dataflow.communication, machine_listen_ports, + dataflow_descriptor: dataflow, }; let message = serde_json::to_vec(&DaemonCoordinatorEvent::Spawn(spawn_command))?; diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index e8c28361..3b9f2ce1 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -1,5 +1,5 @@ use coordinator::CoordinatorEvent; -use dora_core::config::{Input, LocalCommunicationConfig, OperatorId}; +use dora_core::config::{Input, OperatorId}; use dora_core::coordinator_messages::CoordinatorRequest; use dora_core::daemon_messages::{Data, InterDaemonEvent}; use dora_core::message::uhlc::HLC; @@ -116,8 +116,8 @@ impl Daemon { dataflow_id: Uuid::new_v4(), working_dir, nodes, - communication: descriptor.communication, machine_listen_ports: BTreeMap::new(), + dataflow_descriptor: descriptor, }; let exit_when_done = spawn_command @@ -271,10 +271,10 @@ impl Daemon { dataflow_id, working_dir, nodes, - communication, machine_listen_ports, + dataflow_descriptor, }) => { - match communication.remote { + match dataflow_descriptor.communication.remote { dora_core::config::RemoteCommunicationConfig::Tcp => {} } for (machine_id, socket) in machine_listen_ports { @@ -291,7 +291,7 @@ impl Daemon { } let result = self - .spawn_dataflow(dataflow_id, working_dir, nodes, communication.local) + .spawn_dataflow(dataflow_id, working_dir, nodes, dataflow_descriptor) .await; if let Err(err) = &result { tracing::error!("{err:?}"); @@ -466,7 +466,7 @@ impl Daemon { dataflow_id: uuid::Uuid, working_dir: PathBuf, nodes: Vec, - daemon_communication_config: LocalCommunicationConfig, + dataflow_descriptor: Descriptor, ) -> eyre::Result<()> { let dataflow = RunningDataflow::new(dataflow_id, self.machine_id.clone()); let dataflow = match self.running.entry(dataflow_id) { @@ -522,7 +522,7 @@ impl Daemon { &working_dir, node, self.events_tx.clone(), - daemon_communication_config, + dataflow_descriptor.clone(), ) .await .wrap_err_with(|| format!("failed to spawn node `{node_id}`")) diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 174e47f9..85e0638a 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -3,9 +3,11 @@ use crate::{ runtime_node_outputs, DoraEvent, Event, NodeExitStatus, }; use dora_core::{ - config::{LocalCommunicationConfig, NodeRunConfig}, + config::NodeRunConfig, daemon_messages::{DataflowId, NodeConfig, RuntimeConfig}, - descriptor::{resolve_path, source_is_url, OperatorSource, ResolvedNode, SHELL_SOURCE}, + descriptor::{ + resolve_path, source_is_url, Descriptor, OperatorSource, ResolvedNode, SHELL_SOURCE, + }, }; use dora_download::download_file; use eyre::WrapErr; @@ -26,7 +28,7 @@ pub async fn spawn_node( working_dir: &Path, node: ResolvedNode, daemon_tx: mpsc::Sender, - config: LocalCommunicationConfig, + dataflow_descriptor: Descriptor, ) -> eyre::Result<()> { let node_id = node.id.clone(); tracing::debug!("Spawning node `{dataflow_id}/{node_id}`"); @@ -35,8 +37,14 @@ pub async fn spawn_node( .into_iter() .map(|(k, v)| (k, v.queue_size.unwrap_or(10))) .collect(); - let daemon_communication = - spawn_listener_loop(&dataflow_id, &node_id, &daemon_tx, config, queue_sizes).await?; + let daemon_communication = spawn_listener_loop( + &dataflow_id, + &node_id, + &daemon_tx, + dataflow_descriptor.communication.local, + queue_sizes, + ) + .await?; let mut child = match node.kind { dora_core::descriptor::CoreNodeKind::Custom(n) => { @@ -84,6 +92,7 @@ pub async fn spawn_node( node_id: node_id.clone(), run_config: n.run_config.clone(), daemon_communication, + dataflow_descriptor, }; command.env( @@ -149,6 +158,7 @@ pub async fn spawn_node( outputs: runtime_node_outputs(&n), }, daemon_communication, + dataflow_descriptor, }, operators: n.operators, }; diff --git a/binaries/runtime/Cargo.toml b/binaries/runtime/Cargo.toml index bdc1976a..431fb2df 100644 --- a/binaries/runtime/Cargo.toml +++ b/binaries/runtime/Cargo.toml @@ -35,6 +35,7 @@ dora-download = { workspace = true } flume = "0.10.14" clap = { version = "4.0.3", features = ["derive"] } tracing-opentelemetry = { version = "0.18.0", optional = true } +pythonize = "0.18.0" [features] default = ["tracing"] diff --git a/binaries/runtime/src/lib.rs b/binaries/runtime/src/lib.rs index 2419c663..bb68a5c9 100644 --- a/binaries/runtime/src/lib.rs +++ b/binaries/runtime/src/lib.rs @@ -38,6 +38,8 @@ pub fn main() -> eyre::Result<()> { #[cfg(feature = "tracing")] set_up_tracing(&node_id.to_string()).context("failed to set up tracing subscriber")?; + let dataflow_descriptor = config.dataflow_descriptor.clone(); + let operator_definition = if operators.is_empty() { bail!("no operators"); } else if operators.len() > 1 { @@ -90,6 +92,7 @@ pub fn main() -> eyre::Result<()> { incoming_events, operator_events_tx, init_done_tx, + &dataflow_descriptor, ) .wrap_err_with(|| format!("failed to run operator {operator_id}"))?; diff --git a/binaries/runtime/src/operator/mod.rs b/binaries/runtime/src/operator/mod.rs index 12cd65bd..8f6725fd 100644 --- a/binaries/runtime/src/operator/mod.rs +++ b/binaries/runtime/src/operator/mod.rs @@ -1,6 +1,6 @@ use dora_core::{ config::{DataId, NodeId}, - descriptor::{OperatorDefinition, OperatorSource}, + descriptor::{Descriptor, OperatorDefinition, OperatorSource}, message::MetadataParameters, }; use dora_node_api::{DataSample, Event}; @@ -19,6 +19,7 @@ pub fn run_operator( incoming_events: flume::Receiver, events_tx: Sender, init_done: oneshot::Sender>, + dataflow_descriptor: &Descriptor, ) -> eyre::Result<()> { match &operator_definition.config.source { OperatorSource::SharedLibrary(source) => { @@ -47,6 +48,7 @@ pub fn run_operator( events_tx, incoming_events, init_done, + dataflow_descriptor, ) .wrap_err_with(|| { format!( diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 3414c959..52ae4267 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -3,7 +3,7 @@ use super::{OperatorEvent, StopReason}; use dora_core::{ config::{NodeId, OperatorId}, - descriptor::source_is_url, + descriptor::{source_is_url, Descriptor}, }; use dora_download::download_file; use dora_node_api::Event; @@ -39,6 +39,7 @@ pub fn run( events_tx: Sender, incoming_events: flume::Receiver, init_done: oneshot::Sender>, + dataflow_descriptor: &Descriptor, ) -> eyre::Result<()> { let path = if source_is_url(source) { let target_path = Path::new("build") @@ -98,6 +99,11 @@ pub fn run( let operator = py .eval("Operator()", None, Some(locals)) .map_err(traceback)?; + operator.setattr( + "dataflow_descriptor", + pythonize::pythonize(py, dataflow_descriptor)?, + )?; + Result::<_, eyre::Report>::Ok(Py::from(operator)) }; diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index 5929fb6a..af955f48 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -6,8 +6,8 @@ use std::{ }; use crate::{ - config::{CommunicationConfig, DataId, NodeId, NodeRunConfig, OperatorId}, - descriptor::{OperatorDefinition, ResolvedNode}, + config::{DataId, NodeId, NodeRunConfig, OperatorId}, + descriptor::{Descriptor, OperatorDefinition, ResolvedNode}, }; use dora_message::Metadata; use uuid::Uuid; @@ -18,6 +18,7 @@ pub struct NodeConfig { pub node_id: NodeId, pub run_config: NodeRunConfig, pub daemon_communication: DaemonCommunication, + pub dataflow_descriptor: Descriptor, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -250,6 +251,6 @@ pub struct SpawnDataflowNodes { pub dataflow_id: DataflowId, pub working_dir: PathBuf, pub nodes: Vec, - pub communication: CommunicationConfig, pub machine_listen_ports: BTreeMap, + pub dataflow_descriptor: Descriptor, }