Browse Source

Start working on a example that uses multiple daemons

tags/v0.2.3-rc
Philipp Oppermann 2 years ago
parent
commit
c515651264
Failed to extract signature
13 changed files with 379 additions and 19 deletions
  1. +29
    -0
      Cargo.lock
  2. +8
    -0
      Cargo.toml
  3. +1
    -0
      binaries/coordinator/Cargo.toml
  4. +55
    -18
      binaries/coordinator/src/lib.rs
  5. +1
    -1
      binaries/coordinator/src/main.rs
  6. +37
    -0
      examples/multiple-daemons/dataflow.yml
  7. +13
    -0
      examples/multiple-daemons/node/Cargo.toml
  8. +38
    -0
      examples/multiple-daemons/node/src/main.rs
  9. +13
    -0
      examples/multiple-daemons/operator/Cargo.toml
  10. +52
    -0
      examples/multiple-daemons/operator/src/lib.rs
  11. +83
    -0
      examples/multiple-daemons/run.rs
  12. +10
    -0
      examples/multiple-daemons/sink/Cargo.toml
  13. +39
    -0
      examples/multiple-daemons/sink/src/main.rs

+ 29
- 0
Cargo.lock View File

@@ -1068,6 +1068,7 @@ dependencies = [
name = "dora-coordinator"
version = "0.2.2"
dependencies = [
"clap 3.2.23",
"ctrlc",
"dora-core",
"dora-tracing",
@@ -1143,10 +1144,12 @@ dependencies = [
name = "dora-examples"
version = "0.0.0"
dependencies = [
"dora-coordinator",
"dora-core",
"dora-daemon",
"dunce",
"eyre",
"futures",
"serde_yaml 0.8.26",
"tokio",
"tracing",
@@ -2375,6 +2378,32 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"

[[package]]
name = "multiple-daemons-example-node"
version = "0.2.2"
dependencies = [
"dora-node-api",
"eyre",
"futures",
"rand",
"tokio",
]

[[package]]
name = "multiple-daemons-example-operator"
version = "0.2.2"
dependencies = [
"dora-operator-api",
]

[[package]]
name = "multiple-daemons-example-sink"
version = "0.2.2"
dependencies = [
"dora-node-api",
"eyre",
]

[[package]]
name = "nanorand"
version = "0.7.0"


+ 8
- 0
Cargo.toml View File

@@ -13,6 +13,7 @@ members = [
"binaries/runtime",
"examples/rust-dataflow/*",
"examples/benchmark/*",
"examples/multiple-daemons/*",
"libraries/communication-layer/*",
"libraries/core",
"libraries/message",
@@ -46,6 +47,7 @@ communication-layer-request-reply = { version = "0.2.2", path = "libraries/commu
dora-message = { version = "0.2.2", path = "libraries/message" }
dora-runtime = { version = "0.2.2", path = "binaries/runtime" }
dora-daemon = { version = "0.2.2", path = "binaries/daemon" }
dora-coordinator = { version = "0.2.2", path = "binaries/coordinator" }

[package]
name = "dora-examples"
@@ -58,12 +60,14 @@ license = "Apache-2.0"
eyre = "0.6.8"
tokio = "1.24.2"
dora-daemon = { workspace = true }
dora-coordinator = { workspace = true }
dora-core = { workspace = true }
dunce = "1.0.2"
serde_yaml = "0.8.23"
uuid = { version = "1.2.1", features = ["v4", "serde"] }
tracing = "0.1.36"
tracing-subscriber = "0.3.15"
futures = "0.3.25"

[[example]]
name = "c-dataflow"
@@ -92,3 +96,7 @@ path = "examples/python-operator-dataflow/run.rs"
[[example]]
name = "benchmark"
path = "examples/benchmark/run.rs"

[[example]]
name = "multiple-daemons"
path = "examples/multiple-daemons/run.rs"

+ 1
- 0
binaries/coordinator/Cargo.toml View File

@@ -30,3 +30,4 @@ serde_json = "1.0.86"
which = "4.3.0"
thiserror = "1.0.37"
ctrlc = "3.2.5"
clap = { version = "3.1.8", features = ["derive"] }

+ 55
- 18
binaries/coordinator/src/lib.rs View File

@@ -14,15 +14,19 @@ use dora_core::{
},
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use futures::{stream::FuturesUnordered, Stream, StreamExt};
use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};
use futures_concurrency::stream::Merge;
use run::SpawnedDataflow;
use std::{
collections::{BTreeSet, HashMap},
path::Path,
path::{Path, PathBuf},
time::Duration,
};
use tokio::{net::TcpStream, sync::mpsc, task::JoinHandle};
use tokio::{
net::{TcpListener, TcpStream},
sync::mpsc,
task::JoinHandle,
};
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
use uuid::Uuid;

@@ -31,27 +35,60 @@ mod listener;
mod run;
mod tcp_utils;

pub async fn run() -> eyre::Result<()> {
let mut tasks = FuturesUnordered::new();
#[derive(Debug, Clone, clap::Parser)]
#[clap(about = "Dora coordinator")]
pub struct Args {
#[clap(long)]
pub port: Option<u16>,

// start in daemon mode
start(&tasks).await?;
#[clap(long)]
pub run_dataflow: Option<PathBuf>,

tracing::debug!("coordinator main loop finished, waiting on spawned tasks");
while let Some(join_result) = tasks.next().await {
if let Err(err) = join_result {
tracing::error!("task panicked: {err}");
}
}
tracing::debug!("all spawned tasks finished, exiting..");
#[clap(long)]
pub dora_runtime_path: Option<PathBuf>,
}

pub async fn run(args: Args) -> eyre::Result<()> {
let ctrlc_events = set_up_ctrlc_handler()?;

let (_, task) = start(args, ctrlc_events).await?;

task.await?;

Ok(())
}

async fn start(tasks: &FuturesUnordered<JoinHandle<()>>) -> eyre::Result<()> {
let ctrlc_events = set_up_ctrlc_handler()?;
pub async fn start(
args: Args,
external_events: impl Stream<Item = Event> + Unpin,
) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> {
let port = args.port.unwrap_or(DORA_COORDINATOR_PORT_DEFAULT);
let listener = listener::create_listener(port).await?;
let port = listener
.local_addr()
.wrap_err("failed to get local addr of listener")?
.port();
let mut tasks = FuturesUnordered::new();
let future = async move {
start_inner(listener, &tasks, external_events).await?;

tracing::debug!("coordinator main loop finished, waiting on spawned tasks");
while let Some(join_result) = tasks.next().await {
if let Err(err) = join_result {
tracing::error!("task panicked: {err}");
}
}
tracing::debug!("all spawned tasks finished, exiting..");
Ok(())
};
Ok((port, future))
}

let listener = listener::create_listener(DORA_COORDINATOR_PORT_DEFAULT).await?;
async fn start_inner(
listener: TcpListener,
tasks: &FuturesUnordered<JoinHandle<()>>,
external_events: impl Stream<Item = Event> + Unpin,
) -> eyre::Result<()> {
let new_daemon_connections = TcpListenerStream::new(listener).map(|c| {
c.map(Event::NewDaemonConnection)
.wrap_err("failed to open connection")
@@ -75,7 +112,7 @@ async fn start(tasks: &FuturesUnordered<JoinHandle<()>>) -> eyre::Result<()> {
(
control_events,
new_daemon_connections,
ctrlc_events,
external_events,
daemon_watchdog_interval,
)
.merge(),


+ 1
- 1
binaries/coordinator/src/main.rs View File

@@ -8,5 +8,5 @@ async fn main() -> eyre::Result<()> {
#[cfg(feature = "tracing")]
set_up_tracing("dora-coordinator").context("failed to set up tracing subscriber")?;

dora_coordinator::run().await
dora_coordinator::run(clap::Parser::parse()).await
}

+ 37
- 0
examples/multiple-daemons/dataflow.yml View File

@@ -0,0 +1,37 @@
communication:
zenoh:
prefix: example-multiple-daemons

daemon_config: Tcp # or Shmem

nodes:
- id: rust-node
deploy:
machine: A
custom:
build: cargo build -p multiple-daemons-example-node
source: ../../target/debug/multiple-daemons-example-node
inputs:
tick: dora/timer/millis/10
outputs:
- random
- id: runtime-node
deploy:
machine: A
operators:
- id: rust-operator
build: cargo build -p multiple-daemons-example-operator
shared-library: ../../target/debug/rust_dataflow_example_operator
inputs:
tick: dora/timer/millis/100
random: rust-node/random
outputs:
- status
- id: rust-sink
deploy:
machine: B
custom:
build: cargo build -p multiple-daemons-example-sink
source: ../../target/debug/multiple-daemons-example-sink
inputs:
message: runtime-node/rust-operator/status

+ 13
- 0
examples/multiple-daemons/node/Cargo.toml View File

@@ -0,0 +1,13 @@
[package]
name = "multiple-daemons-example-node"
version.workspace = true
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6.8"
futures = "0.3.21"
rand = "0.8.5"
tokio = { version = "1.24.2", features = ["rt", "macros"] }

+ 38
- 0
examples/multiple-daemons/node/src/main.rs View File

@@ -0,0 +1,38 @@
use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event};

fn main() -> eyre::Result<()> {
println!("hello");

let output = DataId::from("random".to_owned());

let (mut node, mut events) = DoraNode::init_from_env()?;

for i in 0..100 {
let event = match events.recv() {
Some(input) => input,
None => break,
};

match event {
Event::Input {
id,
metadata,
data: _,
} => match id.as_str() {
"tick" => {
let random: u64 = rand::random();
println!("tick {i}, sending {random:#x}");
let data: &[u8] = &random.to_le_bytes();
node.send_output(output.clone(), metadata.parameters, data.len(), |out| {
out.copy_from_slice(data);
})?;
}
other => eprintln!("Ignoring unexpected input `{other}`"),
},
Event::Stop => println!("Received manual stop"),
other => eprintln!("Received unexpected input: {other:?}"),
}
}

Ok(())
}

+ 13
- 0
examples/multiple-daemons/operator/Cargo.toml View File

@@ -0,0 +1,13 @@
[package]
name = "multiple-daemons-example-operator"
version.workspace = true
edition = "2021"
license.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[lib]
crate-type = ["cdylib"]

[dependencies]
dora-operator-api = { workspace = true }

+ 52
- 0
examples/multiple-daemons/operator/src/lib.rs View File

@@ -0,0 +1,52 @@
#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event};

register_operator!(ExampleOperator);

#[derive(Debug, Default)]
struct ExampleOperator {
ticks: usize,
}

impl DoraOperator for ExampleOperator {
fn on_event(
&mut self,
event: &Event,
output_sender: &mut DoraOutputSender,
) -> Result<DoraStatus, String> {
match event {
Event::Input { id, data } => match *id {
"tick" => {
self.ticks += 1;
}
"random" => {
let parsed = {
let data: [u8; 8] =
(*data).try_into().map_err(|_| "unexpected random data")?;
u64::from_le_bytes(data)
};
let output = format!(
"operator received random value {parsed:#x} after {} ticks",
self.ticks
);
output_sender.send("status".into(), output.into_bytes())?;
}
other => eprintln!("ignoring unexpected input {other}"),
},
Event::Stop => {}
Event::InputClosed { id } => {
println!("input `{id}` was closed");
if *id == "random" {
println!("`random` input was closed -> exiting");
return Ok(DoraStatus::Stop);
}
}
other => {
println!("received unknown event {other:?}");
}
}

Ok(DoraStatus::Continue)
}
}

+ 83
- 0
examples/multiple-daemons/run.rs View File

@@ -0,0 +1,83 @@
use eyre::{bail, Context};
use futures::stream;
use std::{
net::{Ipv4Addr, SocketAddr},
path::Path,
};
use tokio::task::JoinSet;
use tracing::metadata::LevelFilter;
use tracing_subscriber::Layer;

#[tokio::main]
async fn main() -> eyre::Result<()> {
set_up_tracing().wrap_err("failed to set up tracing subscriber")?;

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
std::env::set_current_dir(root.join(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;

let dataflow = Path::new("dataflow.yml");
build_dataflow(dataflow).await?;

build_package("dora-runtime").await?;
let dora_runtime_path = Some(root.join("target").join("debug").join("dora-runtime"));

let (coordinator_port, coordinator) = dora_coordinator::start(
dora_coordinator::Args {
port: Some(0),
run_dataflow: Some(dataflow.to_path_buf()),
dora_runtime_path: dora_runtime_path.clone(),
},
stream::empty(),
)
.await?;
let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port);
let daemon_a =
dora_daemon::Daemon::run(coordinator_addr, "A".into(), dora_runtime_path.clone());
let daemon_b = dora_daemon::Daemon::run(coordinator_addr, "B".into(), dora_runtime_path);

let mut tasks = JoinSet::new();
tasks.spawn(coordinator);
tasks.spawn(daemon_a);
tasks.spawn(daemon_b);

while let Some(res) = tasks.join_next().await {
res.unwrap()?;
}

Ok(())
}

async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--").arg("build").arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to build dataflow");
};
Ok(())
}

async fn build_package(package: &str) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("build");
cmd.arg("--package").arg(package);
if !cmd.status().await?.success() {
bail!("failed to build {package}");
};
Ok(())
}

fn set_up_tracing() -> eyre::Result<()> {
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;

let stdout_log = tracing_subscriber::fmt::layer()
.pretty()
.with_filter(LevelFilter::DEBUG);
let subscriber = tracing_subscriber::Registry::default().with(stdout_log);
tracing::subscriber::set_global_default(subscriber)
.context("failed to set tracing global subscriber")
}

+ 10
- 0
examples/multiple-daemons/sink/Cargo.toml View File

@@ -0,0 +1,10 @@
[package]
name = "multiple-daemons-example-sink"
version.workspace = true
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6.8"

+ 39
- 0
examples/multiple-daemons/sink/src/main.rs View File

@@ -0,0 +1,39 @@
use dora_node_api::{self, DoraNode, Event};
use eyre::{bail, Context, ContextCompat};

fn main() -> eyre::Result<()> {
let (_node, mut events) = DoraNode::init_from_env()?;

while let Some(event) = events.recv() {
match event {
Event::Input {
id,
metadata: _,
data,
} => match id.as_str() {
"message" => {
let data = data.wrap_err("no data")?;
let received_string = std::str::from_utf8(&data)
.wrap_err("received message was not utf8-encoded")?;
println!("sink received message: {}", received_string);
if !received_string.starts_with("operator received random value ") {
bail!("unexpected message format (should start with 'operator received random value')")
}
if !received_string.ends_with(" ticks") {
bail!("unexpected message format (should end with 'ticks')")
}
}
other => eprintln!("Ignoring unexpected input `{other}`"),
},
Event::Stop => {
println!("Received manual stop");
}
Event::InputClosed { id } => {
println!("Input `{id}` was closed");
}
other => eprintln!("Received unexpected input: {other:?}"),
}
}

Ok(())
}

Loading…
Cancel
Save