Browse Source

Merge pull request #301 from dora-rs/pass-dataflow-to-nodes

Make dataflow descriptor available to Python nodes and operators
tags/v0.2.4-rc
Haixuan Xavier Tao GitHub 2 years ago
parent
commit
2e9108e894
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 74 additions and 18 deletions
  1. +12
    -0
      Cargo.lock
  2. +1
    -0
      apis/python/node/Cargo.toml
  3. +7
    -0
      apis/python/node/src/lib.rs
  4. +13
    -0
      apis/rust/node/src/node/mod.rs
  5. +1
    -1
      binaries/coordinator/src/run/mod.rs
  6. +7
    -7
      binaries/daemon/src/lib.rs
  7. +15
    -5
      binaries/daemon/src/spawn.rs
  8. +1
    -0
      binaries/runtime/Cargo.toml
  9. +3
    -0
      binaries/runtime/src/lib.rs
  10. +3
    -1
      binaries/runtime/src/operator/mod.rs
  11. +7
    -1
      binaries/runtime/src/operator/python.rs
  12. +4
    -3
      libraries/core/src/daemon_messages.rs

+ 12
- 0
Cargo.lock View File

@@ -1463,6 +1463,7 @@ dependencies = [
"eyre",
"flume",
"pyo3",
"pythonize",
"serde_yaml 0.8.26",
]

@@ -1538,6 +1539,7 @@ dependencies = [
"opentelemetry 0.18.0",
"opentelemetry-system-metrics",
"pyo3",
"pythonize",
"serde_yaml 0.8.26",
"tokio",
"tokio-stream",
@@ -3694,6 +3696,16 @@ dependencies = [
"syn 1.0.109",
]

[[package]]
name = "pythonize"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a0e1bbcd2a3856284bf4f4ef09ccb1157e9467847792754556f153ea3fe6b42"
dependencies = [
"pyo3",
"serde",
]

[[package]]
name = "quick-xml"
version = "0.28.2"


+ 1
- 0
apis/python/node/Cargo.toml View File

@@ -22,6 +22,7 @@ serde_yaml = "0.8.23"
flume = "0.10.14"
dora-runtime = { workspace = true, features = ["tracing", "python"] }
arrow = { version = "35.0.0", features = ["pyarrow"] }
pythonize = "0.18.0"

[lib]
name = "dora"


+ 7
- 0
apis/python/node/src/lib.rs View File

@@ -83,6 +83,13 @@ impl Node {
self.send_output_slice(output_id, data.len(), data, metadata)
})
}

/// Returns the full dataflow descriptor that this node is part of.
///
/// This method returns the parsed dataflow YAML file.
pub fn dataflow_descriptor(&self, py: Python) -> pythonize::Result<PyObject> {
pythonize::pythonize(py, self.node.dataflow_descriptor())
}
}

impl Node {


+ 13
- 0
apis/rust/node/src/node/mod.rs View File

@@ -4,6 +4,7 @@ use self::{control_channel::ControlChannel, drop_stream::DropStream};
use dora_core::{
config::{DataId, NodeId, NodeRunConfig},
daemon_messages::{Data, DropToken, NodeConfig},
descriptor::Descriptor,
message::{uhlc, Metadata, MetadataParameters},
};
use eyre::{bail, WrapErr};
@@ -31,6 +32,8 @@ pub struct DoraNode {
sent_out_shared_memory: HashMap<DropToken, ShmemHandle>,
drop_stream: DropStream,
cache: VecDeque<ShmemHandle>,

dataflow_descriptor: Descriptor,
}

impl DoraNode {
@@ -61,6 +64,7 @@ impl DoraNode {
node_id,
run_config,
daemon_communication,
dataflow_descriptor,
} = node_config;

let event_stream = EventStream::init(dataflow_id, &node_id, &daemon_communication)
@@ -78,6 +82,8 @@ impl DoraNode {
sent_out_shared_memory: HashMap::new(),
drop_stream,
cache: VecDeque::new(),

dataflow_descriptor,
};
Ok((node, event_stream))
}
@@ -239,6 +245,13 @@ impl DoraNode {
self.cache.pop_front();
}
}

/// Returns the full dataflow descriptor that this node is part of.
///
/// This method returns the parsed dataflow YAML file.
pub fn dataflow_descriptor(&self) -> &Descriptor {
&self.dataflow_descriptor
}
}

impl Drop for DoraNode {


+ 1
- 1
binaries/coordinator/src/run/mod.rs View File

@@ -40,8 +40,8 @@ pub(super) async fn spawn_dataflow(
dataflow_id: uuid,
working_dir,
nodes: nodes.clone(),
communication: dataflow.communication,
machine_listen_ports,
dataflow_descriptor: dataflow,
};
let message = serde_json::to_vec(&DaemonCoordinatorEvent::Spawn(spawn_command))?;



+ 7
- 7
binaries/daemon/src/lib.rs View File

@@ -1,5 +1,5 @@
use coordinator::CoordinatorEvent;
use dora_core::config::{Input, LocalCommunicationConfig, OperatorId};
use dora_core::config::{Input, OperatorId};
use dora_core::coordinator_messages::CoordinatorRequest;
use dora_core::daemon_messages::{Data, InterDaemonEvent};
use dora_core::message::uhlc::HLC;
@@ -116,8 +116,8 @@ impl Daemon {
dataflow_id: Uuid::new_v4(),
working_dir,
nodes,
communication: descriptor.communication,
machine_listen_ports: BTreeMap::new(),
dataflow_descriptor: descriptor,
};

let exit_when_done = spawn_command
@@ -271,10 +271,10 @@ impl Daemon {
dataflow_id,
working_dir,
nodes,
communication,
machine_listen_ports,
dataflow_descriptor,
}) => {
match communication.remote {
match dataflow_descriptor.communication.remote {
dora_core::config::RemoteCommunicationConfig::Tcp => {}
}
for (machine_id, socket) in machine_listen_ports {
@@ -291,7 +291,7 @@ impl Daemon {
}

let result = self
.spawn_dataflow(dataflow_id, working_dir, nodes, communication.local)
.spawn_dataflow(dataflow_id, working_dir, nodes, dataflow_descriptor)
.await;
if let Err(err) = &result {
tracing::error!("{err:?}");
@@ -466,7 +466,7 @@ impl Daemon {
dataflow_id: uuid::Uuid,
working_dir: PathBuf,
nodes: Vec<ResolvedNode>,
daemon_communication_config: LocalCommunicationConfig,
dataflow_descriptor: Descriptor,
) -> eyre::Result<()> {
let dataflow = RunningDataflow::new(dataflow_id, self.machine_id.clone());
let dataflow = match self.running.entry(dataflow_id) {
@@ -522,7 +522,7 @@ impl Daemon {
&working_dir,
node,
self.events_tx.clone(),
daemon_communication_config,
dataflow_descriptor.clone(),
)
.await
.wrap_err_with(|| format!("failed to spawn node `{node_id}`"))


+ 15
- 5
binaries/daemon/src/spawn.rs View File

@@ -3,9 +3,11 @@ use crate::{
runtime_node_outputs, DoraEvent, Event, NodeExitStatus,
};
use dora_core::{
config::{LocalCommunicationConfig, NodeRunConfig},
config::NodeRunConfig,
daemon_messages::{DataflowId, NodeConfig, RuntimeConfig},
descriptor::{resolve_path, source_is_url, OperatorSource, ResolvedNode, SHELL_SOURCE},
descriptor::{
resolve_path, source_is_url, Descriptor, OperatorSource, ResolvedNode, SHELL_SOURCE,
},
};
use dora_download::download_file;
use eyre::WrapErr;
@@ -26,7 +28,7 @@ pub async fn spawn_node(
working_dir: &Path,
node: ResolvedNode,
daemon_tx: mpsc::Sender<Event>,
config: LocalCommunicationConfig,
dataflow_descriptor: Descriptor,
) -> eyre::Result<()> {
let node_id = node.id.clone();
tracing::debug!("Spawning node `{dataflow_id}/{node_id}`");
@@ -35,8 +37,14 @@ pub async fn spawn_node(
.into_iter()
.map(|(k, v)| (k, v.queue_size.unwrap_or(10)))
.collect();
let daemon_communication =
spawn_listener_loop(&dataflow_id, &node_id, &daemon_tx, config, queue_sizes).await?;
let daemon_communication = spawn_listener_loop(
&dataflow_id,
&node_id,
&daemon_tx,
dataflow_descriptor.communication.local,
queue_sizes,
)
.await?;

let mut child = match node.kind {
dora_core::descriptor::CoreNodeKind::Custom(n) => {
@@ -84,6 +92,7 @@ pub async fn spawn_node(
node_id: node_id.clone(),
run_config: n.run_config.clone(),
daemon_communication,
dataflow_descriptor,
};

command.env(
@@ -149,6 +158,7 @@ pub async fn spawn_node(
outputs: runtime_node_outputs(&n),
},
daemon_communication,
dataflow_descriptor,
},
operators: n.operators,
};


+ 1
- 0
binaries/runtime/Cargo.toml View File

@@ -35,6 +35,7 @@ dora-download = { workspace = true }
flume = "0.10.14"
clap = { version = "4.0.3", features = ["derive"] }
tracing-opentelemetry = { version = "0.18.0", optional = true }
pythonize = "0.18.0"

[features]
default = ["tracing"]


+ 3
- 0
binaries/runtime/src/lib.rs View File

@@ -38,6 +38,8 @@ pub fn main() -> eyre::Result<()> {
#[cfg(feature = "tracing")]
set_up_tracing(&node_id.to_string()).context("failed to set up tracing subscriber")?;

let dataflow_descriptor = config.dataflow_descriptor.clone();

let operator_definition = if operators.is_empty() {
bail!("no operators");
} else if operators.len() > 1 {
@@ -90,6 +92,7 @@ pub fn main() -> eyre::Result<()> {
incoming_events,
operator_events_tx,
init_done_tx,
&dataflow_descriptor,
)
.wrap_err_with(|| format!("failed to run operator {operator_id}"))?;



+ 3
- 1
binaries/runtime/src/operator/mod.rs View File

@@ -1,6 +1,6 @@
use dora_core::{
config::{DataId, NodeId},
descriptor::{OperatorDefinition, OperatorSource},
descriptor::{Descriptor, OperatorDefinition, OperatorSource},
message::MetadataParameters,
};
use dora_node_api::{DataSample, Event};
@@ -19,6 +19,7 @@ pub fn run_operator(
incoming_events: flume::Receiver<Event>,
events_tx: Sender<OperatorEvent>,
init_done: oneshot::Sender<Result<()>>,
dataflow_descriptor: &Descriptor,
) -> eyre::Result<()> {
match &operator_definition.config.source {
OperatorSource::SharedLibrary(source) => {
@@ -47,6 +48,7 @@ pub fn run_operator(
events_tx,
incoming_events,
init_done,
dataflow_descriptor,
)
.wrap_err_with(|| {
format!(


+ 7
- 1
binaries/runtime/src/operator/python.rs View File

@@ -3,7 +3,7 @@
use super::{OperatorEvent, StopReason};
use dora_core::{
config::{NodeId, OperatorId},
descriptor::source_is_url,
descriptor::{source_is_url, Descriptor},
};
use dora_download::download_file;
use dora_node_api::Event;
@@ -39,6 +39,7 @@ pub fn run(
events_tx: Sender<OperatorEvent>,
incoming_events: flume::Receiver<Event>,
init_done: oneshot::Sender<Result<()>>,
dataflow_descriptor: &Descriptor,
) -> eyre::Result<()> {
let path = if source_is_url(source) {
let target_path = Path::new("build")
@@ -98,6 +99,11 @@ pub fn run(
let operator = py
.eval("Operator()", None, Some(locals))
.map_err(traceback)?;
operator.setattr(
"dataflow_descriptor",
pythonize::pythonize(py, dataflow_descriptor)?,
)?;

Result::<_, eyre::Report>::Ok(Py::from(operator))
};



+ 4
- 3
libraries/core/src/daemon_messages.rs View File

@@ -6,8 +6,8 @@ use std::{
};

use crate::{
config::{CommunicationConfig, DataId, NodeId, NodeRunConfig, OperatorId},
descriptor::{OperatorDefinition, ResolvedNode},
config::{DataId, NodeId, NodeRunConfig, OperatorId},
descriptor::{Descriptor, OperatorDefinition, ResolvedNode},
};
use dora_message::Metadata;
use uuid::Uuid;
@@ -18,6 +18,7 @@ pub struct NodeConfig {
pub node_id: NodeId,
pub run_config: NodeRunConfig,
pub daemon_communication: DaemonCommunication,
pub dataflow_descriptor: Descriptor,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
@@ -250,6 +251,6 @@ pub struct SpawnDataflowNodes {
pub dataflow_id: DataflowId,
pub working_dir: PathBuf,
pub nodes: Vec<ResolvedNode>,
pub communication: CommunicationConfig,
pub machine_listen_ports: BTreeMap<String, SocketAddr>,
pub dataflow_descriptor: Descriptor,
}

Loading…
Cancel
Save