Browse Source

Remove duplicated `read_descriptor` function

tags/v0.2.2-rc
haixuanTao 2 years ago
parent
commit
db92865c3f
7 changed files with 28 additions and 35 deletions
  1. +1
    -0
      Cargo.lock
  2. +5
    -3
      binaries/cli/src/build.rs
  3. +2
    -13
      binaries/cli/src/graph/mod.rs
  4. +1
    -10
      binaries/coordinator/src/run/mod.rs
  5. +1
    -8
      binaries/daemon/src/lib.rs
  6. +1
    -0
      libraries/core/Cargo.toml
  7. +17
    -1
      libraries/core/src/descriptor/mod.rs

+ 1
- 0
Cargo.lock View File

@@ -857,6 +857,7 @@ dependencies = [
"serde",
"serde-with-expand-env",
"serde_yaml 0.9.19",
"tokio",
"tracing",
"uuid",
"which",


+ 5
- 3
binaries/cli/src/build.rs View File

@@ -1,10 +1,12 @@
use crate::graph;
use dora_core::{config::OperatorId, descriptor::SINGLE_OPERATOR_DEFAULT_ID};
use dora_core::{
config::OperatorId,
descriptor::{Descriptor, SINGLE_OPERATOR_DEFAULT_ID},
};
use eyre::{eyre, Context};
use std::{path::Path, process::Command};

pub fn build(dataflow: &Path) -> eyre::Result<()> {
let descriptor = graph::read_descriptor(dataflow)?;
let descriptor = Descriptor::blocking_read(dataflow)?;
let dataflow_absolute = if dataflow.is_relative() {
std::env::current_dir().unwrap().join(dataflow)
} else {


+ 2
- 13
binaries/cli/src/graph/mod.rs View File

@@ -1,8 +1,4 @@
use std::{
fs::{self, File},
io::Write,
path::Path,
};
use std::{fs::File, io::Write, path::Path};

use dora_core::descriptor::Descriptor;
use eyre::Context;
@@ -61,7 +57,7 @@ pub fn visualize_as_html(dataflow: &Path) -> eyre::Result<String> {
}

pub fn visualize_as_mermaid(dataflow: &Path) -> eyre::Result<String> {
let descriptor = read_descriptor(dataflow)
let descriptor = Descriptor::blocking_read(dataflow)
.with_context(|| format!("failed to read dataflow at `{}`", dataflow.display()))?;
let visualized = descriptor
.visualize_as_mermaid()
@@ -69,10 +65,3 @@ pub fn visualize_as_mermaid(dataflow: &Path) -> eyre::Result<String> {

Ok(visualized)
}

pub fn read_descriptor(file: &Path) -> eyre::Result<Descriptor> {
let descriptor_file = fs::read(file).context("failed to open given file")?;
let descriptor: Descriptor =
serde_yaml::from_slice(&descriptor_file).context("failed to parse given descriptor")?;
Ok(descriptor)
}

+ 1
- 10
binaries/coordinator/src/run/mod.rs View File

@@ -17,7 +17,7 @@ pub async fn spawn_dataflow(
dataflow_path: &Path,
daemon_connections: &mut HashMap<String, TcpStream>,
) -> eyre::Result<SpawnedDataflow> {
let descriptor = read_descriptor(dataflow_path).await.wrap_err_with(|| {
let descriptor = Descriptor::read(dataflow_path).await.wrap_err_with(|| {
format!(
"failed to read dataflow descriptor at {}",
dataflow_path.display()
@@ -83,12 +83,3 @@ pub struct SpawnedDataflow {
pub communication_config: CommunicationConfig,
pub machines: BTreeSet<String>,
}

async fn read_descriptor(file: &Path) -> Result<Descriptor, eyre::Error> {
let descriptor_file = tokio::fs::read(file)
.await
.context("failed to open given file")?;
let descriptor: Descriptor =
serde_yaml::from_slice(&descriptor_file).context("failed to parse given descriptor")?;
Ok(descriptor)
}

+ 1
- 8
binaries/daemon/src/lib.rs View File

@@ -87,7 +87,7 @@ impl Daemon {
.ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
.to_owned();

let descriptor = read_descriptor(dataflow_path).await?;
let descriptor = Descriptor::read(dataflow_path).await?;
let nodes = descriptor.resolve_aliases();

let spawn_command = SpawnDataflowNodes {
@@ -1001,10 +1001,3 @@ enum RunStatus {
Continue,
Exit,
}

pub async fn read_descriptor(file: &Path) -> eyre::Result<Descriptor> {
let descriptor_file = fs::read(file).await.wrap_err("failed to open given file")?;
let descriptor: Descriptor =
serde_yaml::from_slice(&descriptor_file).context("failed to parse given descriptor")?;
Ok(descriptor)
}

+ 1
- 0
libraries/core/Cargo.toml View File

@@ -18,3 +18,4 @@ uuid = { version = "1.2.1", features = ["serde"] }
dora-message = { workspace = true }
tracing = "0.1"
serde-with-expand-env = "1.1.0"
tokio = { version = "1.24.1", features = ["fs"] }

+ 17
- 1
libraries/core/src/descriptor/mod.rs View File

@@ -2,7 +2,7 @@ use crate::{
config::{CommunicationConfig, DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId},
daemon_messages::DaemonCommunicationConfig,
};
use eyre::{bail, Result};
use eyre::{bail, Context, Result};
use serde::{Deserialize, Serialize};
use serde_with_expand_env::with_expand_envs;
use std::{
@@ -93,6 +93,22 @@ impl Descriptor {

Ok(flowchart)
}

pub async fn read(file: &Path) -> eyre::Result<Descriptor> {
let descriptor_file = tokio::fs::read(file)
.await
.context("failed to open given file")?;
let descriptor: Descriptor =
serde_yaml::from_slice(&descriptor_file).context("failed to parse given descriptor")?;
Ok(descriptor)
}

pub fn blocking_read(file: &Path) -> eyre::Result<Descriptor> {
let descriptor_file = std::fs::read(file).context("failed to open given file")?;
let descriptor: Descriptor =
serde_yaml::from_slice(&descriptor_file).context("failed to parse given descriptor")?;
Ok(descriptor)
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]


Loading…
Cancel
Save