|
|
|
@@ -1,46 +1,53 @@ |
|
|
|
use dora_tracing::set_up_tracing; |
|
|
|
use eyre::{bail, Context}; |
|
|
|
use std::path::Path; |
|
|
|
use eyre::ContextCompat; |
|
|
|
use std::path::{Path, PathBuf}; |
|
|
|
use xshell::{cmd, Shell}; |
|
|
|
|
|
|
|
#[tokio::main] |
|
|
|
async fn main() -> eyre::Result<()> { |
|
|
|
set_up_tracing("rust-dataflow-runner").wrap_err("failed to set up tracing subscriber")?; |
|
|
|
fn main() -> eyre::Result<()> { |
|
|
|
// create a new shell in this folder |
|
|
|
let sh = prepare_shell()?; |
|
|
|
// build the `dora` binary (you can skip this if you use `cargo install dora-cli`) |
|
|
|
let dora = prepare_dora(&sh)?; |
|
|
|
|
|
|
|
let root = Path::new(env!("CARGO_MANIFEST_DIR")); |
|
|
|
std::env::set_current_dir(root.join(file!()).parent().unwrap()) |
|
|
|
.wrap_err("failed to set working dir")?; |
|
|
|
// build the dataflow using `dora build` |
|
|
|
cmd!(sh, "{dora} build dataflow.yml").run()?; |
|
|
|
|
|
|
|
// start up the dora daemon and coordinator |
|
|
|
cmd!(sh, "{dora} up").run()?; |
|
|
|
|
|
|
|
let dataflow = Path::new("dataflow.yml"); |
|
|
|
build_dataflow(dataflow).await?; |
|
|
|
// start running the dataflow.yml -> outputs the UUID assigned to the dataflow |
|
|
|
let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?; |
|
|
|
let uuid = output.lines().next().context("no output")?; |
|
|
|
|
|
|
|
run_dataflow(dataflow).await?; |
|
|
|
// stop the dora daemon and coordinator again |
|
|
|
cmd!(sh, "{dora} destroy").run()?; |
|
|
|
|
|
|
|
// verify that the node output was written to `out` |
|
|
|
sh.change_dir("out"); |
|
|
|
sh.change_dir(uuid); |
|
|
|
let sink_output = sh.read_file("log_rust-sink.txt")?; |
|
|
|
if sink_output.lines().count() < 50 { |
|
|
|
eyre::bail!("sink did not receive the expected number of messages") |
|
|
|
} |
|
|
|
|
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
|
|
|
|
async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { |
|
|
|
let cargo = std::env::var("CARGO").unwrap(); |
|
|
|
let mut cmd = tokio::process::Command::new(&cargo); |
|
|
|
cmd.arg("run"); |
|
|
|
cmd.arg("--package").arg("dora-cli"); |
|
|
|
cmd.arg("--").arg("build").arg(dataflow); |
|
|
|
if !cmd.status().await?.success() { |
|
|
|
bail!("failed to build dataflow"); |
|
|
|
}; |
|
|
|
Ok(()) |
|
|
|
/// Prepares a shell and set the working directory to the parent folder of this file. |
|
|
|
/// |
|
|
|
/// You can use your system shell instead (e.g. `bash`); |
|
|
|
fn prepare_shell() -> Result<Shell, eyre::Error> { |
|
|
|
let sh = Shell::new()?; |
|
|
|
let root = Path::new(env!("CARGO_MANIFEST_DIR")); |
|
|
|
sh.change_dir(root.join(file!()).parent().unwrap()); |
|
|
|
Ok(sh) |
|
|
|
} |
|
|
|
|
|
|
|
async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { |
|
|
|
let cargo = std::env::var("CARGO").unwrap(); |
|
|
|
let mut cmd = tokio::process::Command::new(&cargo); |
|
|
|
cmd.arg("run"); |
|
|
|
cmd.arg("--package").arg("dora-cli"); |
|
|
|
cmd.arg("--") |
|
|
|
.arg("daemon") |
|
|
|
.arg("--run-dataflow") |
|
|
|
.arg(dataflow); |
|
|
|
if !cmd.status().await?.success() { |
|
|
|
bail!("failed to run dataflow"); |
|
|
|
}; |
|
|
|
Ok(()) |
|
|
|
/// Build the `dora` command-line executable from this repo. |
|
|
|
/// |
|
|
|
/// You can skip this step and run `cargo install dora-cli --locked` instead. |
|
|
|
fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> { |
|
|
|
cmd!(sh, "cargo build --package dora-cli").run()?; |
|
|
|
let root = Path::new(env!("CARGO_MANIFEST_DIR")); |
|
|
|
let dora = root.join("target").join("debug").join("dora"); |
|
|
|
Ok(dora) |
|
|
|
} |