diff --git a/Cargo.lock b/Cargo.lock index e1dd9b00..4436bb64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2368,6 +2368,7 @@ dependencies = [ "tokio-stream", "tracing", "uuid", + "xshell", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 09690ea9..08956d42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -97,6 +97,7 @@ uuid = { version = "1.7", features = ["v7", "serde"] } tracing = "0.1.36" futures = "0.3.25" tokio-stream = "0.1.11" +xshell = "0.2.6" [[example]] name = "c-dataflow" diff --git a/examples/rust-dataflow/run.rs b/examples/rust-dataflow/run.rs index f5e035a5..9cd668b6 100644 --- a/examples/rust-dataflow/run.rs +++ b/examples/rust-dataflow/run.rs @@ -1,46 +1,53 @@ -use dora_tracing::set_up_tracing; -use eyre::{bail, Context}; -use std::path::Path; +use eyre::ContextCompat; +use std::path::{Path, PathBuf}; +use xshell::{cmd, Shell}; -#[tokio::main] -async fn main() -> eyre::Result<()> { - set_up_tracing("rust-dataflow-runner").wrap_err("failed to set up tracing subscriber")?; +fn main() -> eyre::Result<()> { + // create a new shell in this folder + let sh = prepare_shell()?; + // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) + let dora = prepare_dora(&sh)?; - let root = Path::new(env!("CARGO_MANIFEST_DIR")); - std::env::set_current_dir(root.join(file!()).parent().unwrap()) - .wrap_err("failed to set working dir")?; + // build the dataflow using `dora build` + cmd!(sh, "{dora} build dataflow.yml").run()?; + + // start up the dora daemon and coordinator + cmd!(sh, "{dora} up").run()?; - let dataflow = Path::new("dataflow.yml"); - build_dataflow(dataflow).await?; + // start running the dataflow.yml -> outputs the UUID assigned to the dataflow + let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?; + let uuid = output.lines().next().context("no output")?; - run_dataflow(dataflow).await?; + // stop the dora daemon and coordinator again + cmd!(sh, "{dora} destroy").run()?; + + // verify that the node output was written to `out` + sh.change_dir("out"); + sh.change_dir(uuid); + let sink_output = sh.read_file("log_rust-sink.txt")?; + if sink_output.lines().count() < 50 { + eyre::bail!("sink did not receive the expected number of messages") + } Ok(()) } -async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { - let cargo = std::env::var("CARGO").unwrap(); - let mut cmd = tokio::process::Command::new(&cargo); - cmd.arg("run"); - cmd.arg("--package").arg("dora-cli"); - cmd.arg("--").arg("build").arg(dataflow); - if !cmd.status().await?.success() { - bail!("failed to build dataflow"); - }; - Ok(()) +/// Prepares a shell and set the working directory to the parent folder of this file. +/// +/// You can use your system shell instead (e.g. `bash`); +fn prepare_shell() -> Result { + let sh = Shell::new()?; + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + sh.change_dir(root.join(file!()).parent().unwrap()); + Ok(sh) } -async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { - let cargo = std::env::var("CARGO").unwrap(); - let mut cmd = tokio::process::Command::new(&cargo); - cmd.arg("run"); - cmd.arg("--package").arg("dora-cli"); - cmd.arg("--") - .arg("daemon") - .arg("--run-dataflow") - .arg(dataflow); - if !cmd.status().await?.success() { - bail!("failed to run dataflow"); - }; - Ok(()) +/// Build the `dora` command-line executable from this repo. +/// +/// You can skip this step and run `cargo install dora-cli --locked` instead. +fn prepare_dora(sh: &Shell) -> eyre::Result { + cmd!(sh, "cargo build --package dora-cli").run()?; + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + let dora = root.join("target").join("debug").join("dora"); + Ok(dora) }