Browse Source

Parse dataflow descriptor lazily in node APIs

The dataflow descriptor format still changes often, which led to parse errors. By doing the parsing lazily, this should only affect users of the `dataflow_descriptor` function from now on.
tags/v0.3.12-rc0
Philipp Oppermann 7 months ago
parent
commit
9d917bdaa9
Failed to extract signature
14 changed files with 30 additions and 20 deletions
  1. +5
    -5
      Cargo.lock
  2. +1
    -0
      Cargo.toml
  3. +1
    -1
      apis/python/node/Cargo.toml
  4. +1
    -1
      apis/python/operator/Cargo.toml
  5. +1
    -1
      apis/rust/node/Cargo.toml
  6. +11
    -4
      apis/rust/node/src/node/mod.rs
  7. +1
    -1
      binaries/cli/Cargo.toml
  8. +1
    -1
      binaries/daemon/Cargo.toml
  9. +2
    -1
      binaries/daemon/src/spawn.rs
  10. +1
    -1
      binaries/runtime/Cargo.toml
  11. +2
    -1
      binaries/runtime/src/lib.rs
  12. +1
    -1
      libraries/core/Cargo.toml
  13. +1
    -1
      libraries/message/Cargo.toml
  14. +1
    -1
      libraries/message/src/daemon_to_node.rs

+ 5
- 5
Cargo.lock View File

@@ -3030,7 +3030,7 @@ dependencies = [
"git2",
"itertools 0.14.0",
"serde_json",
"serde_yaml 0.8.26",
"serde_yaml 0.9.34+deprecated",
"shared-memory-server",
"sysinfo 0.30.13",
"tokio",
@@ -3161,7 +3161,7 @@ dependencies = [
"futures-concurrency",
"futures-timer",
"serde_json",
"serde_yaml 0.8.26",
"serde_yaml 0.9.34+deprecated",
"shared-memory-server",
"shared_memory_extended",
"tokio",
@@ -3212,7 +3212,7 @@ dependencies = [
"futures",
"pyo3",
"pythonize",
"serde_yaml 0.8.26",
"serde_yaml 0.9.34+deprecated",
"tokio",
]

@@ -3293,7 +3293,7 @@ dependencies = [
"futures",
"futures-concurrency",
"pyo3",
"serde_yaml 0.8.26",
"serde_yaml 0.9.34+deprecated",
]

[[package]]
@@ -3419,7 +3419,7 @@ dependencies = [
"libloading 0.7.4",
"pyo3",
"pythonize",
"serde_yaml 0.8.26",
"serde_yaml 0.9.34+deprecated",
"tokio",
"tokio-stream",
"tracing",


+ 1
- 0
Cargo.toml View File

@@ -93,6 +93,7 @@ pyo3 = { version = "0.23", features = [
] }
pythonize = "0.23"
git2 = { version = "0.18.0", features = ["vendored-openssl"] }
serde_yaml = "0.9.33"

[package]
name = "dora-examples"


+ 1
- 1
apis/python/node/Cargo.toml View File

@@ -21,7 +21,7 @@ dora-node-api = { workspace = true }
dora-operator-api-python = { workspace = true }
pyo3.workspace = true
eyre = "0.6"
serde_yaml = "0.8.23"
serde_yaml = { workspace = true }
flume = "0.10.14"
dora-runtime = { workspace = true, features = ["tracing", "metrics", "python"] }
dora-cli = { workspace = true }


+ 1
- 1
apis/python/operator/Cargo.toml View File

@@ -14,7 +14,7 @@ repository.workspace = true
dora-node-api = { workspace = true }
pyo3 = { workspace = true, features = ["eyre", "abi3-py37"] }
eyre = "0.6"
serde_yaml = "0.8.23"
serde_yaml = { workspace = true }
flume = "0.10.14"
arrow = { workspace = true, features = ["pyarrow"] }
arrow-schema = { workspace = true }


+ 1
- 1
apis/rust/node/Cargo.toml View File

@@ -17,7 +17,7 @@ dora-core = { workspace = true }
dora-message = { workspace = true }
shared-memory-server = { workspace = true }
eyre = "0.6.7"
serde_yaml = "0.8.23"
serde_yaml = { workspace = true }
tracing = "0.1.33"
flume = "0.10.14"
bincode = "1.3.3"


+ 11
- 4
apis/rust/node/src/node/mod.rs View File

@@ -60,7 +60,7 @@ pub struct DoraNode {
drop_stream: DropStream,
cache: VecDeque<ShmemHandle>,

dataflow_descriptor: Descriptor,
dataflow_descriptor: serde_yaml::Result<Descriptor>,
warned_unknown_output: BTreeSet<DataId>,
_rt: TokioRuntime,
}
@@ -200,7 +200,7 @@ impl DoraNode {
sent_out_shared_memory: HashMap::new(),
drop_stream,
cache: VecDeque::new(),
dataflow_descriptor,
dataflow_descriptor: serde_yaml::from_value(dataflow_descriptor),
warned_unknown_output: BTreeSet::new(),
_rt: rt,
};
@@ -449,8 +449,15 @@ impl DoraNode {
/// Returns the full dataflow descriptor that this node is part of.
///
/// This method returns the parsed dataflow YAML file.
pub fn dataflow_descriptor(&self) -> &Descriptor {
&self.dataflow_descriptor
pub fn dataflow_descriptor(&self) -> eyre::Result<&Descriptor> {
match &self.dataflow_descriptor {
Ok(d) => Ok(d),
Err(err) => eyre::bail!(
"failed to parse dataflow descriptor: {err}\n\n
This might be caused by mismatched version numbers of dora \
daemon and the dora node API"
),
}
}
}



+ 1
- 1
binaries/cli/Cargo.toml View File

@@ -27,7 +27,7 @@ dora-node-api-c = { workspace = true }
dora-operator-api-c = { workspace = true }
dora-download = { workspace = true }
serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.9.11"
serde_yaml = { workspace = true }
webbrowser = "0.8.3"
serde_json = "1.0.86"
termcolor = "1.1.3"


+ 1
- 1
binaries/daemon/Cargo.toml View File

@@ -31,7 +31,7 @@ dora-tracing = { workspace = true, optional = true }
dora-arrow-convert = { workspace = true }
dora-node-api = { workspace = true }
dora-message = { workspace = true }
serde_yaml = "0.8.23"
serde_yaml = { workspace = true }
uuid = { version = "1.7", features = ["v7"] }
futures = "0.3.25"
shared-memory-server = { workspace = true }


+ 2
- 1
binaries/daemon/src/spawn.rs View File

@@ -89,7 +89,8 @@ impl Spawner {
node_id: node_id.clone(),
run_config: node.kind.run_config(),
daemon_communication,
dataflow_descriptor: self.dataflow_descriptor.clone(),
dataflow_descriptor: serde_yaml::to_value(&self.dataflow_descriptor)
.context("failed to serialize dataflow descriptor to YAML")?,
dynamic: node.kind.dynamic(),
};



+ 1
- 1
binaries/runtime/Cargo.toml View File

@@ -21,7 +21,7 @@ eyre = "0.6.8"
futures = "0.3.21"
futures-concurrency = "7.1.0"
libloading = "0.7.3"
serde_yaml = "0.8.23"
serde_yaml = { workspace = true }
tokio = { version = "1.24.2", features = ["full"] }
tokio-stream = "0.1.8"
# pyo3-abi3 flag allow simpler linking. See: https://pyo3.rs/v0.13.2/building_and_distribution.html


+ 2
- 1
binaries/runtime/src/lib.rs View File

@@ -43,7 +43,8 @@ pub fn main() -> eyre::Result<()> {
.wrap_err("failed to set up tracing subscriber")?;
}

let dataflow_descriptor = config.dataflow_descriptor.clone();
let dataflow_descriptor = serde_yaml::from_value(config.dataflow_descriptor.clone())
.context("failed to parse dataflow descriptor")?;

let operator_definition = if operators.is_empty() {
bail!("no operators");


+ 1
- 1
libraries/core/Cargo.toml View File

@@ -13,7 +13,7 @@ repository.workspace = true
dora-message = { workspace = true }
eyre = "0.6.8"
serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.9.11"
serde_yaml = { workspace = true }
once_cell = "1.13.0"
which = "5.0.0"
uuid = { version = "1.7", features = ["serde", "v7"] }


+ 1
- 1
libraries/message/Cargo.toml View File

@@ -23,7 +23,7 @@ aligned-vec = { version = "0.5.0", features = ["serde"] }
semver = { version = "1.0.23", features = ["serde"] }
schemars = "0.8.19"
uhlc = "0.5.1"
serde_yaml = "0.9.11"
serde_yaml = { workspace = true }
once_cell = "1.13.0"
serde-with-expand-env = "1.1.0"
bincode = "1.3.3"

+ 1
- 1
libraries/message/src/daemon_to_node.rs View File

@@ -23,7 +23,7 @@ pub struct NodeConfig {
pub node_id: NodeId,
pub run_config: NodeRunConfig,
pub daemon_communication: DaemonCommunication,
pub dataflow_descriptor: Descriptor,
pub dataflow_descriptor: serde_yaml::Value,
pub dynamic: bool,
}



Loading…
Cancel
Save