diff --git a/Cargo.lock b/Cargo.lock index b799d0db..b548e8c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -857,6 +857,7 @@ dependencies = [ "serde", "serde-with-expand-env", "serde_yaml 0.9.19", + "tokio", "tracing", "uuid", "which", diff --git a/binaries/cli/src/build.rs b/binaries/cli/src/build.rs index 4b308ba6..b235f3dd 100644 --- a/binaries/cli/src/build.rs +++ b/binaries/cli/src/build.rs @@ -1,10 +1,12 @@ -use crate::graph; -use dora_core::{config::OperatorId, descriptor::SINGLE_OPERATOR_DEFAULT_ID}; +use dora_core::{ + config::OperatorId, + descriptor::{Descriptor, SINGLE_OPERATOR_DEFAULT_ID}, +}; use eyre::{eyre, Context}; use std::{path::Path, process::Command}; pub fn build(dataflow: &Path) -> eyre::Result<()> { - let descriptor = graph::read_descriptor(dataflow)?; + let descriptor = Descriptor::blocking_read(dataflow)?; let dataflow_absolute = if dataflow.is_relative() { std::env::current_dir().unwrap().join(dataflow) } else { diff --git a/binaries/cli/src/graph/mod.rs b/binaries/cli/src/graph/mod.rs index 34ee73a3..c28f9eb4 100644 --- a/binaries/cli/src/graph/mod.rs +++ b/binaries/cli/src/graph/mod.rs @@ -1,8 +1,4 @@ -use std::{ - fs::{self, File}, - io::Write, - path::Path, -}; +use std::{fs::File, io::Write, path::Path}; use dora_core::descriptor::Descriptor; use eyre::Context; @@ -61,7 +57,7 @@ pub fn visualize_as_html(dataflow: &Path) -> eyre::Result { } pub fn visualize_as_mermaid(dataflow: &Path) -> eyre::Result { - let descriptor = read_descriptor(dataflow) + let descriptor = Descriptor::blocking_read(dataflow) .with_context(|| format!("failed to read dataflow at `{}`", dataflow.display()))?; let visualized = descriptor .visualize_as_mermaid() @@ -69,10 +65,3 @@ pub fn visualize_as_mermaid(dataflow: &Path) -> eyre::Result { Ok(visualized) } - -pub fn read_descriptor(file: &Path) -> eyre::Result { - let descriptor_file = fs::read(file).context("failed to open given file")?; - let descriptor: Descriptor = - serde_yaml::from_slice(&descriptor_file).context("failed to parse given descriptor")?; - Ok(descriptor) -} diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 4673fd0d..fde82a32 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -17,7 +17,7 @@ pub async fn spawn_dataflow( dataflow_path: &Path, daemon_connections: &mut HashMap, ) -> eyre::Result { - let descriptor = read_descriptor(dataflow_path).await.wrap_err_with(|| { + let descriptor = Descriptor::read(dataflow_path).await.wrap_err_with(|| { format!( "failed to read dataflow descriptor at {}", dataflow_path.display() @@ -83,12 +83,3 @@ pub struct SpawnedDataflow { pub communication_config: CommunicationConfig, pub machines: BTreeSet, } - -async fn read_descriptor(file: &Path) -> Result { - let descriptor_file = tokio::fs::read(file) - .await - .context("failed to open given file")?; - let descriptor: Descriptor = - serde_yaml::from_slice(&descriptor_file).context("failed to parse given descriptor")?; - Ok(descriptor) -} diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 8a58427e..d3e09a02 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -87,7 +87,7 @@ impl Daemon { .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))? .to_owned(); - let descriptor = read_descriptor(dataflow_path).await?; + let descriptor = Descriptor::read(dataflow_path).await?; let nodes = descriptor.resolve_aliases(); let spawn_command = SpawnDataflowNodes { @@ -1001,10 +1001,3 @@ enum RunStatus { Continue, Exit, } - -pub async fn read_descriptor(file: &Path) -> eyre::Result { - let descriptor_file = fs::read(file).await.wrap_err("failed to open given file")?; - let descriptor: Descriptor = - serde_yaml::from_slice(&descriptor_file).context("failed to parse given descriptor")?; - Ok(descriptor) -} diff --git a/libraries/core/Cargo.toml b/libraries/core/Cargo.toml index ddbdd04b..9567416f 100644 --- a/libraries/core/Cargo.toml +++ b/libraries/core/Cargo.toml @@ -18,3 +18,4 @@ uuid = { version = "1.2.1", features = ["serde"] } dora-message = { workspace = true } tracing = "0.1" serde-with-expand-env = "1.1.0" +tokio = { version = "1.24.1", features = ["fs"] } diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index b41ac666..b0efa622 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -2,7 +2,7 @@ use crate::{ config::{CommunicationConfig, DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId}, daemon_messages::DaemonCommunicationConfig, }; -use eyre::{bail, Result}; +use eyre::{bail, Context, Result}; use serde::{Deserialize, Serialize}; use serde_with_expand_env::with_expand_envs; use std::{ @@ -93,6 +93,22 @@ impl Descriptor { Ok(flowchart) } + + pub async fn read(file: &Path) -> eyre::Result { + let descriptor_file = tokio::fs::read(file) + .await + .context("failed to open given file")?; + let descriptor: Descriptor = + serde_yaml::from_slice(&descriptor_file).context("failed to parse given descriptor")?; + Ok(descriptor) + } + + pub fn blocking_read(file: &Path) -> eyre::Result { + let descriptor_file = std::fs::read(file).context("failed to open given file")?; + let descriptor: Descriptor = + serde_yaml::from_slice(&descriptor_file).context("failed to parse given descriptor")?; + Ok(descriptor) + } } #[derive(Debug, Clone, Serialize, Deserialize)]