diff --git a/Cargo.lock b/Cargo.lock index e5c95481..aa91ce42 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1539,6 +1539,7 @@ dependencies = [ "opentelemetry 0.18.0", "opentelemetry-system-metrics", "pyo3", + "pythonize", "serde_yaml 0.8.26", "tokio", "tokio-stream", diff --git a/binaries/runtime/Cargo.toml b/binaries/runtime/Cargo.toml index bdc1976a..431fb2df 100644 --- a/binaries/runtime/Cargo.toml +++ b/binaries/runtime/Cargo.toml @@ -35,6 +35,7 @@ dora-download = { workspace = true } flume = "0.10.14" clap = { version = "4.0.3", features = ["derive"] } tracing-opentelemetry = { version = "0.18.0", optional = true } +pythonize = "0.18.0" [features] default = ["tracing"] diff --git a/binaries/runtime/src/lib.rs b/binaries/runtime/src/lib.rs index 2419c663..bb68a5c9 100644 --- a/binaries/runtime/src/lib.rs +++ b/binaries/runtime/src/lib.rs @@ -38,6 +38,8 @@ pub fn main() -> eyre::Result<()> { #[cfg(feature = "tracing")] set_up_tracing(&node_id.to_string()).context("failed to set up tracing subscriber")?; + let dataflow_descriptor = config.dataflow_descriptor.clone(); + let operator_definition = if operators.is_empty() { bail!("no operators"); } else if operators.len() > 1 { @@ -90,6 +92,7 @@ pub fn main() -> eyre::Result<()> { incoming_events, operator_events_tx, init_done_tx, + &dataflow_descriptor, ) .wrap_err_with(|| format!("failed to run operator {operator_id}"))?; diff --git a/binaries/runtime/src/operator/mod.rs b/binaries/runtime/src/operator/mod.rs index 12cd65bd..8f6725fd 100644 --- a/binaries/runtime/src/operator/mod.rs +++ b/binaries/runtime/src/operator/mod.rs @@ -1,6 +1,6 @@ use dora_core::{ config::{DataId, NodeId}, - descriptor::{OperatorDefinition, OperatorSource}, + descriptor::{Descriptor, OperatorDefinition, OperatorSource}, message::MetadataParameters, }; use dora_node_api::{DataSample, Event}; @@ -19,6 +19,7 @@ pub fn run_operator( incoming_events: flume::Receiver, events_tx: Sender, init_done: oneshot::Sender>, + dataflow_descriptor: &Descriptor, ) -> eyre::Result<()> { match &operator_definition.config.source { OperatorSource::SharedLibrary(source) => { @@ -47,6 +48,7 @@ pub fn run_operator( events_tx, incoming_events, init_done, + dataflow_descriptor, ) .wrap_err_with(|| { format!( diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 3414c959..52ae4267 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -3,7 +3,7 @@ use super::{OperatorEvent, StopReason}; use dora_core::{ config::{NodeId, OperatorId}, - descriptor::source_is_url, + descriptor::{source_is_url, Descriptor}, }; use dora_download::download_file; use dora_node_api::Event; @@ -39,6 +39,7 @@ pub fn run( events_tx: Sender, incoming_events: flume::Receiver, init_done: oneshot::Sender>, + dataflow_descriptor: &Descriptor, ) -> eyre::Result<()> { let path = if source_is_url(source) { let target_path = Path::new("build") @@ -98,6 +99,11 @@ pub fn run( let operator = py .eval("Operator()", None, Some(locals)) .map_err(traceback)?; + operator.setattr( + "dataflow_descriptor", + pythonize::pythonize(py, dataflow_descriptor)?, + )?; + Result::<_, eyre::Report>::Ok(Py::from(operator)) };