Browse Source

Make runtime pass dataflow descriptor to Python operators

tags/v0.2.4-rc
Philipp Oppermann 2 years ago
parent
commit
049e5e1e43
Failed to extract signature
5 changed files with 15 additions and 2 deletions
  1. +1
    -0
      Cargo.lock
  2. +1
    -0
      binaries/runtime/Cargo.toml
  3. +3
    -0
      binaries/runtime/src/lib.rs
  4. +3
    -1
      binaries/runtime/src/operator/mod.rs
  5. +7
    -1
      binaries/runtime/src/operator/python.rs

+ 1
- 0
Cargo.lock View File

@@ -1539,6 +1539,7 @@ dependencies = [
"opentelemetry 0.18.0",
"opentelemetry-system-metrics",
"pyo3",
"pythonize",
"serde_yaml 0.8.26",
"tokio",
"tokio-stream",


+ 1
- 0
binaries/runtime/Cargo.toml View File

@@ -35,6 +35,7 @@ dora-download = { workspace = true }
flume = "0.10.14"
clap = { version = "4.0.3", features = ["derive"] }
tracing-opentelemetry = { version = "0.18.0", optional = true }
pythonize = "0.18.0"

[features]
default = ["tracing"]


+ 3
- 0
binaries/runtime/src/lib.rs View File

@@ -38,6 +38,8 @@ pub fn main() -> eyre::Result<()> {
#[cfg(feature = "tracing")]
set_up_tracing(&node_id.to_string()).context("failed to set up tracing subscriber")?;

let dataflow_descriptor = config.dataflow_descriptor.clone();

let operator_definition = if operators.is_empty() {
bail!("no operators");
} else if operators.len() > 1 {
@@ -90,6 +92,7 @@ pub fn main() -> eyre::Result<()> {
incoming_events,
operator_events_tx,
init_done_tx,
&dataflow_descriptor,
)
.wrap_err_with(|| format!("failed to run operator {operator_id}"))?;



+ 3
- 1
binaries/runtime/src/operator/mod.rs View File

@@ -1,6 +1,6 @@
use dora_core::{
config::{DataId, NodeId},
descriptor::{OperatorDefinition, OperatorSource},
descriptor::{Descriptor, OperatorDefinition, OperatorSource},
message::MetadataParameters,
};
use dora_node_api::{DataSample, Event};
@@ -19,6 +19,7 @@ pub fn run_operator(
incoming_events: flume::Receiver<Event>,
events_tx: Sender<OperatorEvent>,
init_done: oneshot::Sender<Result<()>>,
dataflow_descriptor: &Descriptor,
) -> eyre::Result<()> {
match &operator_definition.config.source {
OperatorSource::SharedLibrary(source) => {
@@ -47,6 +48,7 @@ pub fn run_operator(
events_tx,
incoming_events,
init_done,
dataflow_descriptor,
)
.wrap_err_with(|| {
format!(


+ 7
- 1
binaries/runtime/src/operator/python.rs View File

@@ -3,7 +3,7 @@
use super::{OperatorEvent, StopReason};
use dora_core::{
config::{NodeId, OperatorId},
descriptor::source_is_url,
descriptor::{source_is_url, Descriptor},
};
use dora_download::download_file;
use dora_node_api::Event;
@@ -39,6 +39,7 @@ pub fn run(
events_tx: Sender<OperatorEvent>,
incoming_events: flume::Receiver<Event>,
init_done: oneshot::Sender<Result<()>>,
dataflow_descriptor: &Descriptor,
) -> eyre::Result<()> {
let path = if source_is_url(source) {
let target_path = Path::new("build")
@@ -98,6 +99,11 @@ pub fn run(
let operator = py
.eval("Operator()", None, Some(locals))
.map_err(traceback)?;
operator.setattr(
"dataflow_descriptor",
pythonize::pythonize(py, dataflow_descriptor)?,
)?;

Result::<_, eyre::Report>::Ok(Py::from(operator))
};



Loading…
Cancel
Save