diff --git a/Cargo.lock b/Cargo.lock index fb45da50..a763d686 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3030,7 +3030,7 @@ dependencies = [ "git2", "itertools 0.14.0", "serde_json", - "serde_yaml 0.8.26", + "serde_yaml 0.9.34+deprecated", "shared-memory-server", "sysinfo 0.30.13", "tokio", @@ -3161,7 +3161,7 @@ dependencies = [ "futures-concurrency", "futures-timer", "serde_json", - "serde_yaml 0.8.26", + "serde_yaml 0.9.34+deprecated", "shared-memory-server", "shared_memory_extended", "tokio", @@ -3212,7 +3212,7 @@ dependencies = [ "futures", "pyo3", "pythonize", - "serde_yaml 0.8.26", + "serde_yaml 0.9.34+deprecated", "tokio", ] @@ -3293,7 +3293,7 @@ dependencies = [ "futures", "futures-concurrency", "pyo3", - "serde_yaml 0.8.26", + "serde_yaml 0.9.34+deprecated", ] [[package]] @@ -3419,7 +3419,7 @@ dependencies = [ "libloading 0.7.4", "pyo3", "pythonize", - "serde_yaml 0.8.26", + "serde_yaml 0.9.34+deprecated", "tokio", "tokio-stream", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 541d3297..2b863fe5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -93,6 +93,7 @@ pyo3 = { version = "0.23", features = [ ] } pythonize = "0.23" git2 = { version = "0.18.0", features = ["vendored-openssl"] } +serde_yaml = "0.9.33" [package] name = "dora-examples" diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index c06fbbaa..063d8157 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -21,7 +21,7 @@ dora-node-api = { workspace = true } dora-operator-api-python = { workspace = true } pyo3.workspace = true eyre = "0.6" -serde_yaml = "0.8.23" +serde_yaml = { workspace = true } flume = "0.10.14" dora-runtime = { workspace = true, features = ["tracing", "metrics", "python"] } dora-cli = { workspace = true } diff --git a/apis/python/operator/Cargo.toml b/apis/python/operator/Cargo.toml index a96c5987..a65a929d 100644 --- a/apis/python/operator/Cargo.toml +++ b/apis/python/operator/Cargo.toml @@ -14,7 +14,7 @@ repository.workspace = true dora-node-api = { workspace = true } pyo3 = { workspace = true, features = ["eyre", "abi3-py37"] } eyre = "0.6" -serde_yaml = "0.8.23" +serde_yaml = { workspace = true } flume = "0.10.14" arrow = { workspace = true, features = ["pyarrow"] } arrow-schema = { workspace = true } diff --git a/apis/rust/node/Cargo.toml b/apis/rust/node/Cargo.toml index a96256f0..d1485b4b 100644 --- a/apis/rust/node/Cargo.toml +++ b/apis/rust/node/Cargo.toml @@ -17,7 +17,7 @@ dora-core = { workspace = true } dora-message = { workspace = true } shared-memory-server = { workspace = true } eyre = "0.6.7" -serde_yaml = "0.8.23" +serde_yaml = { workspace = true } tracing = "0.1.33" flume = "0.10.14" bincode = "1.3.3" diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 47890d46..af58e536 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -60,7 +60,7 @@ pub struct DoraNode { drop_stream: DropStream, cache: VecDeque, - dataflow_descriptor: Descriptor, + dataflow_descriptor: serde_yaml::Result, warned_unknown_output: BTreeSet, _rt: TokioRuntime, } @@ -200,7 +200,7 @@ impl DoraNode { sent_out_shared_memory: HashMap::new(), drop_stream, cache: VecDeque::new(), - dataflow_descriptor, + dataflow_descriptor: serde_yaml::from_value(dataflow_descriptor), warned_unknown_output: BTreeSet::new(), _rt: rt, }; @@ -449,8 +449,15 @@ impl DoraNode { /// Returns the full dataflow descriptor that this node is part of. /// /// This method returns the parsed dataflow YAML file. - pub fn dataflow_descriptor(&self) -> &Descriptor { - &self.dataflow_descriptor + pub fn dataflow_descriptor(&self) -> eyre::Result<&Descriptor> { + match &self.dataflow_descriptor { + Ok(d) => Ok(d), + Err(err) => eyre::bail!( + "failed to parse dataflow descriptor: {err}\n\n + This might be caused by mismatched version numbers of dora \ + daemon and the dora node API" + ), + } } } diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml index 51e07f54..e806dc60 100644 --- a/binaries/cli/Cargo.toml +++ b/binaries/cli/Cargo.toml @@ -27,7 +27,7 @@ dora-node-api-c = { workspace = true } dora-operator-api-c = { workspace = true } dora-download = { workspace = true } serde = { version = "1.0.136", features = ["derive"] } -serde_yaml = "0.9.11" +serde_yaml = { workspace = true } webbrowser = "0.8.3" serde_json = "1.0.86" termcolor = "1.1.3" diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index fdfd3596..04043b2f 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -31,7 +31,7 @@ dora-tracing = { workspace = true, optional = true } dora-arrow-convert = { workspace = true } dora-node-api = { workspace = true } dora-message = { workspace = true } -serde_yaml = "0.8.23" +serde_yaml = { workspace = true } uuid = { version = "1.7", features = ["v7"] } futures = "0.3.25" shared-memory-server = { workspace = true } diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index cf6a3092..7d75b755 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -89,7 +89,8 @@ impl Spawner { node_id: node_id.clone(), run_config: node.kind.run_config(), daemon_communication, - dataflow_descriptor: self.dataflow_descriptor.clone(), + dataflow_descriptor: serde_yaml::to_value(&self.dataflow_descriptor) + .context("failed to serialize dataflow descriptor to YAML")?, dynamic: node.kind.dynamic(), }; diff --git a/binaries/runtime/Cargo.toml b/binaries/runtime/Cargo.toml index 270ba264..73e6c615 100644 --- a/binaries/runtime/Cargo.toml +++ b/binaries/runtime/Cargo.toml @@ -21,7 +21,7 @@ eyre = "0.6.8" futures = "0.3.21" futures-concurrency = "7.1.0" libloading = "0.7.3" -serde_yaml = "0.8.23" +serde_yaml = { workspace = true } tokio = { version = "1.24.2", features = ["full"] } tokio-stream = "0.1.8" # pyo3-abi3 flag allow simpler linking. See: https://pyo3.rs/v0.13.2/building_and_distribution.html diff --git a/binaries/runtime/src/lib.rs b/binaries/runtime/src/lib.rs index ea949bf4..6b3311bc 100644 --- a/binaries/runtime/src/lib.rs +++ b/binaries/runtime/src/lib.rs @@ -43,7 +43,8 @@ pub fn main() -> eyre::Result<()> { .wrap_err("failed to set up tracing subscriber")?; } - let dataflow_descriptor = config.dataflow_descriptor.clone(); + let dataflow_descriptor = serde_yaml::from_value(config.dataflow_descriptor.clone()) + .context("failed to parse dataflow descriptor")?; let operator_definition = if operators.is_empty() { bail!("no operators"); diff --git a/libraries/core/Cargo.toml b/libraries/core/Cargo.toml index af467ffe..d50765ef 100644 --- a/libraries/core/Cargo.toml +++ b/libraries/core/Cargo.toml @@ -13,7 +13,7 @@ repository.workspace = true dora-message = { workspace = true } eyre = "0.6.8" serde = { version = "1.0.136", features = ["derive"] } -serde_yaml = "0.9.11" +serde_yaml = { workspace = true } once_cell = "1.13.0" which = "5.0.0" uuid = { version = "1.7", features = ["serde", "v7"] } diff --git a/libraries/message/Cargo.toml b/libraries/message/Cargo.toml index 7bb3f673..7e4e179f 100644 --- a/libraries/message/Cargo.toml +++ b/libraries/message/Cargo.toml @@ -23,7 +23,7 @@ aligned-vec = { version = "0.5.0", features = ["serde"] } semver = { version = "1.0.23", features = ["serde"] } schemars = "0.8.19" uhlc = "0.5.1" -serde_yaml = "0.9.11" +serde_yaml = { workspace = true } once_cell = "1.13.0" serde-with-expand-env = "1.1.0" bincode = "1.3.3" diff --git a/libraries/message/src/daemon_to_node.rs b/libraries/message/src/daemon_to_node.rs index acc1630e..e0ce466c 100644 --- a/libraries/message/src/daemon_to_node.rs +++ b/libraries/message/src/daemon_to_node.rs @@ -23,7 +23,7 @@ pub struct NodeConfig { pub node_id: NodeId, pub run_config: NodeRunConfig, pub daemon_communication: DaemonCommunication, - pub dataflow_descriptor: Descriptor, + pub dataflow_descriptor: serde_yaml::Value, pub dynamic: bool, }