From 9d917bdaa9816106d7f266e4e86e52df73aee0a6 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 10 Jun 2025 13:04:33 +0200 Subject: [PATCH] Parse dataflow descriptor lazily in node APIs The dataflow descriptor format still changes often, which led to parse errors. By doing the parsing lazily, this should only affect users of the `dataflow_descriptor` function from now on. --- Cargo.lock | 10 +++++----- Cargo.toml | 1 + apis/python/node/Cargo.toml | 2 +- apis/python/operator/Cargo.toml | 2 +- apis/rust/node/Cargo.toml | 2 +- apis/rust/node/src/node/mod.rs | 15 +++++++++++---- binaries/cli/Cargo.toml | 2 +- binaries/daemon/Cargo.toml | 2 +- binaries/daemon/src/spawn.rs | 3 ++- binaries/runtime/Cargo.toml | 2 +- binaries/runtime/src/lib.rs | 3 ++- libraries/core/Cargo.toml | 2 +- libraries/message/Cargo.toml | 2 +- libraries/message/src/daemon_to_node.rs | 2 +- 14 files changed, 30 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fb45da50..a763d686 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3030,7 +3030,7 @@ dependencies = [ "git2", "itertools 0.14.0", "serde_json", - "serde_yaml 0.8.26", + "serde_yaml 0.9.34+deprecated", "shared-memory-server", "sysinfo 0.30.13", "tokio", @@ -3161,7 +3161,7 @@ dependencies = [ "futures-concurrency", "futures-timer", "serde_json", - "serde_yaml 0.8.26", + "serde_yaml 0.9.34+deprecated", "shared-memory-server", "shared_memory_extended", "tokio", @@ -3212,7 +3212,7 @@ dependencies = [ "futures", "pyo3", "pythonize", - "serde_yaml 0.8.26", + "serde_yaml 0.9.34+deprecated", "tokio", ] @@ -3293,7 +3293,7 @@ dependencies = [ "futures", "futures-concurrency", "pyo3", - "serde_yaml 0.8.26", + "serde_yaml 0.9.34+deprecated", ] [[package]] @@ -3419,7 +3419,7 @@ dependencies = [ "libloading 0.7.4", "pyo3", "pythonize", - "serde_yaml 0.8.26", + "serde_yaml 0.9.34+deprecated", "tokio", "tokio-stream", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 541d3297..2b863fe5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -93,6 +93,7 @@ pyo3 = { version = "0.23", features = [ ] } pythonize = "0.23" git2 = { version = "0.18.0", features = ["vendored-openssl"] } +serde_yaml = "0.9.33" [package] name = "dora-examples" diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index c06fbbaa..063d8157 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -21,7 +21,7 @@ dora-node-api = { workspace = true } dora-operator-api-python = { workspace = true } pyo3.workspace = true eyre = "0.6" -serde_yaml = "0.8.23" +serde_yaml = { workspace = true } flume = "0.10.14" dora-runtime = { workspace = true, features = ["tracing", "metrics", "python"] } dora-cli = { workspace = true } diff --git a/apis/python/operator/Cargo.toml b/apis/python/operator/Cargo.toml index a96c5987..a65a929d 100644 --- a/apis/python/operator/Cargo.toml +++ b/apis/python/operator/Cargo.toml @@ -14,7 +14,7 @@ repository.workspace = true dora-node-api = { workspace = true } pyo3 = { workspace = true, features = ["eyre", "abi3-py37"] } eyre = "0.6" -serde_yaml = "0.8.23" +serde_yaml = { workspace = true } flume = "0.10.14" arrow = { workspace = true, features = ["pyarrow"] } arrow-schema = { workspace = true } diff --git a/apis/rust/node/Cargo.toml b/apis/rust/node/Cargo.toml index a96256f0..d1485b4b 100644 --- a/apis/rust/node/Cargo.toml +++ b/apis/rust/node/Cargo.toml @@ -17,7 +17,7 @@ dora-core = { workspace = true } dora-message = { workspace = true } shared-memory-server = { workspace = true } eyre = "0.6.7" -serde_yaml = "0.8.23" +serde_yaml = { workspace = true } tracing = "0.1.33" flume = "0.10.14" bincode = "1.3.3" diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 47890d46..af58e536 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -60,7 +60,7 @@ pub struct DoraNode { drop_stream: DropStream, cache: VecDeque, - dataflow_descriptor: Descriptor, + dataflow_descriptor: serde_yaml::Result, warned_unknown_output: BTreeSet, _rt: TokioRuntime, } @@ -200,7 +200,7 @@ impl DoraNode { sent_out_shared_memory: HashMap::new(), drop_stream, cache: VecDeque::new(), - dataflow_descriptor, + dataflow_descriptor: serde_yaml::from_value(dataflow_descriptor), warned_unknown_output: BTreeSet::new(), _rt: rt, }; @@ -449,8 +449,15 @@ impl DoraNode { /// Returns the full dataflow descriptor that this node is part of. /// /// This method returns the parsed dataflow YAML file. - pub fn dataflow_descriptor(&self) -> &Descriptor { - &self.dataflow_descriptor + pub fn dataflow_descriptor(&self) -> eyre::Result<&Descriptor> { + match &self.dataflow_descriptor { + Ok(d) => Ok(d), + Err(err) => eyre::bail!( + "failed to parse dataflow descriptor: {err}\n\n + This might be caused by mismatched version numbers of dora \ + daemon and the dora node API" + ), + } } } diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml index 51e07f54..e806dc60 100644 --- a/binaries/cli/Cargo.toml +++ b/binaries/cli/Cargo.toml @@ -27,7 +27,7 @@ dora-node-api-c = { workspace = true } dora-operator-api-c = { workspace = true } dora-download = { workspace = true } serde = { version = "1.0.136", features = ["derive"] } -serde_yaml = "0.9.11" +serde_yaml = { workspace = true } webbrowser = "0.8.3" serde_json = "1.0.86" termcolor = "1.1.3" diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index fdfd3596..04043b2f 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -31,7 +31,7 @@ dora-tracing = { workspace = true, optional = true } dora-arrow-convert = { workspace = true } dora-node-api = { workspace = true } dora-message = { workspace = true } -serde_yaml = "0.8.23" +serde_yaml = { workspace = true } uuid = { version = "1.7", features = ["v7"] } futures = "0.3.25" shared-memory-server = { workspace = true } diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index cf6a3092..7d75b755 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -89,7 +89,8 @@ impl Spawner { node_id: node_id.clone(), run_config: node.kind.run_config(), daemon_communication, - dataflow_descriptor: self.dataflow_descriptor.clone(), + dataflow_descriptor: serde_yaml::to_value(&self.dataflow_descriptor) + .context("failed to serialize dataflow descriptor to YAML")?, dynamic: node.kind.dynamic(), }; diff --git a/binaries/runtime/Cargo.toml b/binaries/runtime/Cargo.toml index 270ba264..73e6c615 100644 --- a/binaries/runtime/Cargo.toml +++ b/binaries/runtime/Cargo.toml @@ -21,7 +21,7 @@ eyre = "0.6.8" futures = "0.3.21" futures-concurrency = "7.1.0" libloading = "0.7.3" -serde_yaml = "0.8.23" +serde_yaml = { workspace = true } tokio = { version = "1.24.2", features = ["full"] } tokio-stream = "0.1.8" # pyo3-abi3 flag allow simpler linking. See: https://pyo3.rs/v0.13.2/building_and_distribution.html diff --git a/binaries/runtime/src/lib.rs b/binaries/runtime/src/lib.rs index ea949bf4..6b3311bc 100644 --- a/binaries/runtime/src/lib.rs +++ b/binaries/runtime/src/lib.rs @@ -43,7 +43,8 @@ pub fn main() -> eyre::Result<()> { .wrap_err("failed to set up tracing subscriber")?; } - let dataflow_descriptor = config.dataflow_descriptor.clone(); + let dataflow_descriptor = serde_yaml::from_value(config.dataflow_descriptor.clone()) + .context("failed to parse dataflow descriptor")?; let operator_definition = if operators.is_empty() { bail!("no operators"); diff --git a/libraries/core/Cargo.toml b/libraries/core/Cargo.toml index af467ffe..d50765ef 100644 --- a/libraries/core/Cargo.toml +++ b/libraries/core/Cargo.toml @@ -13,7 +13,7 @@ repository.workspace = true dora-message = { workspace = true } eyre = "0.6.8" serde = { version = "1.0.136", features = ["derive"] } -serde_yaml = "0.9.11" +serde_yaml = { workspace = true } once_cell = "1.13.0" which = "5.0.0" uuid = { version = "1.7", features = ["serde", "v7"] } diff --git a/libraries/message/Cargo.toml b/libraries/message/Cargo.toml index 7bb3f673..7e4e179f 100644 --- a/libraries/message/Cargo.toml +++ b/libraries/message/Cargo.toml @@ -23,7 +23,7 @@ aligned-vec = { version = "0.5.0", features = ["serde"] } semver = { version = "1.0.23", features = ["serde"] } schemars = "0.8.19" uhlc = "0.5.1" -serde_yaml = "0.9.11" +serde_yaml = { workspace = true } once_cell = "1.13.0" serde-with-expand-env = "1.1.0" bincode = "1.3.3" diff --git a/libraries/message/src/daemon_to_node.rs b/libraries/message/src/daemon_to_node.rs index acc1630e..e0ce466c 100644 --- a/libraries/message/src/daemon_to_node.rs +++ b/libraries/message/src/daemon_to_node.rs @@ -23,7 +23,7 @@ pub struct NodeConfig { pub node_id: NodeId, pub run_config: NodeRunConfig, pub daemon_communication: DaemonCommunication, - pub dataflow_descriptor: Descriptor, + pub dataflow_descriptor: serde_yaml::Value, pub dynamic: bool, }