Browse Source

Start creating a stdio-based client interface

tags/v0.0.0-test.4
Philipp Oppermann 3 years ago
parent
commit
eca7863978
6 changed files with 118 additions and 23 deletions
  1. +27
    -2
      Cargo.lock
  2. +5
    -2
      Cargo.toml
  3. +2
    -0
      examples/dataflow-example.yml
  4. +8
    -0
      examples/example_source.rs
  5. +11
    -10
      src/descriptor.rs
  6. +65
    -9
      src/main.rs

+ 27
- 2
Cargo.lock View File

@@ -515,11 +515,14 @@ dependencies = [
"envy",
"eyre",
"futures",
"futures-concurrency",
"log",
"pyo3",
"serde",
"serde_yaml",
"structopt",
"tokio",
"tokio-stream",
"zenoh",
]

@@ -620,6 +623,17 @@ dependencies = [
"futures-sink",
]

[[package]]
name = "futures-concurrency"
version = "2.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48e98b7b5aedee7c34a5cfb1ee1681af8faf46e2f30c0b8af5ea08eba517d61c"
dependencies = [
"async-trait",
"futures-core",
"pin-project",
]

[[package]]
name = "futures-core"
version = "0.3.21"
@@ -967,9 +981,9 @@ dependencies = [

[[package]]
name = "log"
version = "0.4.14"
version = "0.4.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8"
dependencies = [
"cfg-if",
"value-bag",
@@ -2160,6 +2174,17 @@ dependencies = [
"syn",
]

[[package]]
name = "tokio-stream"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]

[[package]]
name = "tracing"
version = "0.1.32"


+ 5
- 2
Cargo.toml View File

@@ -14,5 +14,8 @@ zenoh = { git="https://github.com/eclipse-zenoh/zenoh.git" }
env_logger = "0.9.0"
tokio = { version="1.17.0", features=["full"]}
pyo3 = "0.16.1"
futures = "0.3.12"
envy = "0.4.2"
futures = "0.3.21"
envy = "0.4.2"
log = "0.4.16"
tokio-stream = { version = "0.1.8", features = ["io-util"] }
futures-concurrency = "2.0.3"

+ 2
- 0
examples/dataflow-example.yml View File

@@ -6,8 +6,10 @@ sinks:
sources:
- id: source-1
output: C
run: target/debug/examples/example_source
- id: source-2
output: G
run: target/debug/examples/example_source
operators:
- id: op-1
inputs:


+ 8
- 0
examples/example_source.rs View File

@@ -0,0 +1,8 @@
use std::time::Duration;

fn main() {
for i in 0..100 {
println!("{i}");
std::thread::sleep(Duration::from_millis(100));
}
}

+ 11
- 10
src/descriptor.rs View File

@@ -4,11 +4,11 @@ use std::collections::{BTreeSet, HashMap, HashSet};
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Descriptor {
#[serde(default)]
sources: HashSet<Source>,
pub sources: HashSet<Source>,
#[serde(default)]
sinks: HashSet<Sink>,
pub sinks: HashSet<Sink>,
#[serde(default)]
operators: HashSet<Operator>,
pub operators: HashSet<Operator>,
}

impl Descriptor {
@@ -74,19 +74,20 @@ impl Descriptor {

#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Source {
id: String,
output: String,
pub id: String,
pub output: String,
pub run: String,
}

#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Sink {
id: String,
input: String,
pub id: String,
pub input: String,
}

#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Operator {
id: String,
inputs: BTreeSet<String>,
outputs: BTreeSet<String>,
pub id: String,
pub inputs: BTreeSet<String>,
pub outputs: BTreeSet<String>,
}

+ 65
- 9
src/main.rs View File

@@ -1,27 +1,31 @@
use dora_rs::descriptor::Descriptor;
use eyre::Context;
use std::{fs::File, path::PathBuf};
use eyre::{eyre, Context};
use futures::{stream::FuturesUnordered, StreamExt};
use futures_concurrency::Merge;
use std::path::{Path, PathBuf};
use structopt::StructOpt;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_stream::wrappers::LinesStream;

#[derive(Debug, Clone, StructOpt)]
#[structopt(about = "Dora control")]
enum Command {
#[structopt(about = "Print Graph")]
Graph { file: PathBuf },
Visualize { file: PathBuf },
#[structopt(about = "Run Python server")]
StartPython(dora_rs::python::server::PythonCommand),
#[structopt(about = "Run dataflow pipeline")]
Run { file: PathBuf },
}

fn main() -> eyre::Result<()> {
#[tokio::main]
async fn main() -> eyre::Result<()> {
env_logger::init();

let command = Command::from_args();
match command {
Command::Graph { file } => {
let descriptor_file = File::open(&file).context("failed to open given file")?;

let descriptor: Descriptor = serde_yaml::from_reader(descriptor_file)
.context("failed to parse given descriptor")?;
Command::Visualize { file } => {
let descriptor = read_descriptor(&file).await?;
let visualized = descriptor
.visualize_as_mermaid()
.context("failed to visualize descriptor")?;
@@ -34,7 +38,59 @@ fn main() -> eyre::Result<()> {
Command::StartPython(command) => {
dora_rs::python::server::run(command).context("python server failed")?;
}
Command::Run { file } => {
let descriptor = read_descriptor(&file).await?;

let mut outputs = Vec::new();
let tasks = FuturesUnordered::new();

for source in &descriptor.sources {
let mut command = tokio::process::Command::new(&source.run);
command.stdout(std::process::Stdio::piped());

let mut child = command
.spawn()
.with_context(|| format!("failed to spawn source {}", source.id))?;
let stdout = child
.stdout
.take()
.ok_or_else(|| eyre!("failed to take stdout handle of source"))?;
let reader = LinesStream::new(BufReader::new(stdout).lines());

let source_id = source.id.clone();
let result = tokio::spawn(async move {
let status = child.wait().await.context("child process failed")?;
if status.success() {
Ok(())
} else if let Some(code) = status.code() {
Err(eyre!("Source {source_id} failed with exit code: {code}"))
} else {
Err(eyre!("Source {source_id} failed (unknown exit code)"))
}
});

outputs.push(reader.map(|l| (source.output.clone(), l)));
tasks.push(result);
}

// print all output for now (the eventual goal is to pass it to operators)
let mut merged = outputs.merge();
while let Some((name, line)) = merged.next().await {
let output =
line.with_context(|| format!("failed to get next line of output {name}"))?;
println!("Output {name}: {output}");
}
}
}

Ok(())
}

async fn read_descriptor(file: &Path) -> Result<Descriptor, eyre::Error> {
let descriptor_file = tokio::fs::read(file)
.await
.context("failed to open given file")?;
let descriptor: Descriptor =
serde_yaml::from_slice(&descriptor_file).context("failed to parse given descriptor")?;
Ok(descriptor)
}

Loading…
Cancel
Save