Browse Source

Implement cli `destroy` command using zenoh-based control channel

tags/v0.0.0-test-pr-120
Philipp Oppermann 3 years ago
parent
commit
963f39fa99
Failed to extract signature
11 changed files with 459 additions and 247 deletions
  1. +56
    -23
      Cargo.lock
  2. +3
    -0
      binaries/cli/Cargo.toml
  3. +24
    -1
      binaries/cli/src/main.rs
  4. +4
    -1
      binaries/coordinator/Cargo.toml
  5. +112
    -0
      binaries/coordinator/src/control.rs
  6. +33
    -220
      binaries/coordinator/src/lib.rs
  7. +60
    -0
      binaries/coordinator/src/run/custom.rs
  8. +123
    -0
      binaries/coordinator/src/run/mod.rs
  9. +39
    -0
      binaries/coordinator/src/run/runtime.rs
  10. +2
    -2
      libraries/core/src/lib.rs
  11. +3
    -0
      libraries/core/src/topics.rs

+ 56
- 23
Cargo.lock View File

@@ -886,6 +886,7 @@ name = "dora-cli"
version = "0.1.0"
dependencies = [
"clap 4.0.3",
"communication-layer-pub-sub",
"dora-core",
"eyre",
"serde_yaml 0.9.11",
@@ -899,6 +900,7 @@ version = "0.1.0"
dependencies = [
"bincode",
"clap 3.2.20",
"communication-layer-pub-sub",
"dora-core",
"dora-message",
"dora-node-api",
@@ -1886,25 +1888,14 @@ dependencies = [

[[package]]
name = "mio"
version = "0.8.2"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52da4364ffb0e4fe33a9841a98a3f3014fb964045ce4f7a45a398243c8d6b0c9"
checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf"
dependencies = [
"libc",
"log",
"miow",
"ntapi",
"wasi 0.11.0+wasi-snapshot-preview1",
"winapi",
]

[[package]]
name = "miow"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21"
dependencies = [
"winapi",
"windows-sys 0.36.1",
]

[[package]]
@@ -2351,7 +2342,7 @@ dependencies = [
"libc",
"redox_syscall",
"smallvec",
"windows-sys",
"windows-sys 0.32.0",
]

[[package]]
@@ -3570,9 +3561,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"

[[package]]
name = "tokio"
version = "1.20.1"
version = "1.21.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a8325f63a7d4774dd041e363b2409ed1c5cbbd0f867795e661df066b2b0a581"
checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099"
dependencies = [
"autocfg 1.1.0",
"bytes",
@@ -3580,7 +3571,6 @@ dependencies = [
"memchr",
"mio",
"num_cpus",
"once_cell",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
@@ -4232,11 +4222,24 @@ version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3df6e476185f92a12c072be4a189a0210dcdcf512a1891d6dff9edb874deadc6"
dependencies = [
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_msvc",
"windows_aarch64_msvc 0.32.0",
"windows_i686_gnu 0.32.0",
"windows_i686_msvc 0.32.0",
"windows_x86_64_gnu 0.32.0",
"windows_x86_64_msvc 0.32.0",
]

[[package]]
name = "windows-sys"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2"
dependencies = [
"windows_aarch64_msvc 0.36.1",
"windows_i686_gnu 0.36.1",
"windows_i686_msvc 0.36.1",
"windows_x86_64_gnu 0.36.1",
"windows_x86_64_msvc 0.36.1",
]

[[package]]
@@ -4245,30 +4248,60 @@ version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8e92753b1c443191654ec532f14c199742964a061be25d77d7a96f09db20bf5"

[[package]]
name = "windows_aarch64_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47"

[[package]]
name = "windows_i686_gnu"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a711c68811799e017b6038e0922cb27a5e2f43a2ddb609fe0b6f3eeda9de615"

[[package]]
name = "windows_i686_gnu"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6"

[[package]]
name = "windows_i686_msvc"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "146c11bb1a02615db74680b32a68e2d61f553cc24c4eb5b4ca10311740e44172"

[[package]]
name = "windows_i686_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024"

[[package]]
name = "windows_x86_64_gnu"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c912b12f7454c6620635bbff3450962753834be2a594819bd5e945af18ec64bc"

[[package]]
name = "windows_x86_64_gnu"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1"

[[package]]
name = "windows_x86_64_msvc"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316"

[[package]]
name = "windows_x86_64_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"

[[package]]
name = "with_builtin_macros"
version = "0.0.3"


+ 3
- 0
binaries/cli/Cargo.toml View File

@@ -16,3 +16,6 @@ dora-core = { path = "../../libraries/core" }
serde_yaml = "0.9.11"
tempfile = "3.3.0"
webbrowser = "0.8.0"
communication-layer-pub-sub = { path = "../../libraries/communication-layer", default-features = false, features = [
"zenoh",
] }

+ 24
- 1
binaries/cli/src/main.rs View File

@@ -1,5 +1,7 @@
use clap::Parser;
use eyre::Context;
use communication_layer_pub_sub::CommunicationLayer;
use dora_core::topics::{ZENOH_CONTROL_PREFIX, ZENOH_CONTROL_STOP_ALL};
use eyre::{eyre, Context};
use std::{io::Write, path::PathBuf};
use tempfile::NamedTempFile;

@@ -36,6 +38,8 @@ enum Command {
args: CommandNew,
},
Dashboard,
Up,
Destroy,
Start,
Stop,
Logs,
@@ -73,6 +77,14 @@ enum Lang {
fn main() -> eyre::Result<()> {
let args = Args::parse();

let mut zenoh_control_session =
communication_layer_pub_sub::zenoh::ZenohCommunicationLayer::init(
Default::default(),
ZENOH_CONTROL_PREFIX.into(),
)
.map_err(|err| eyre!(err))
.wrap_err("failed to open zenoh control session")?;

match args.command {
Command::Check {
dataflow,
@@ -113,8 +125,19 @@ fn main() -> eyre::Result<()> {
}
Command::New { args } => template::create(args)?,
Command::Dashboard => todo!(),
Command::Up => todo!(),
Command::Start => todo!(),
Command::Stop => todo!(),
Command::Destroy => {
let publisher = zenoh_control_session
.publisher(ZENOH_CONTROL_STOP_ALL)
.map_err(|err| eyre!(err))
.wrap_err("failed to create publisher for stop message")?;
publisher
.publish(&[])
.map_err(|err| eyre!(err))
.wrap_err("failed to publish stop message")?;
}
Command::Logs => todo!(),
Command::Metrics => todo!(),
Command::Stats => todo!(),


+ 4
- 1
binaries/coordinator/Cargo.toml View File

@@ -13,7 +13,7 @@ eyre = "0.6.7"
futures = "0.3.21"
serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.8.23"
tokio = { version = "1.17.0", features = ["full"] }
tokio = { version = "1.21.2", features = ["full"] }
tokio-stream = { version = "0.1.8", features = ["io-util"] }
tokio-util = { version = "0.7.1", features = ["codec"] }
clap = { version = "3.1.8", features = ["derive"] }
@@ -25,3 +25,6 @@ dora-message = { path = "../../libraries/message" }
tracing = "0.1.36"
tracing-subscriber = "0.3.15"
futures-concurrency = "5.0.1"
communication-layer-pub-sub = { path = "../../libraries/communication-layer", default-features = false, features = [
"zenoh",
] }

+ 112
- 0
binaries/coordinator/src/control.rs View File

@@ -0,0 +1,112 @@
use crate::Event;
use communication_layer_pub_sub::{CommunicationLayer, Subscriber};
use dora_core::topics::{
ZENOH_CONTROL_PREFIX, ZENOH_CONTROL_START_DATAFLOW, ZENOH_CONTROL_STOP_ALL,
};
use eyre::{eyre, WrapErr};
use futures::{Stream, StreamExt};
use std::path::Path;
use tokio_stream::wrappers::ReceiverStream;

pub(crate) fn control_events() -> impl Stream<Item = Event> {
let (tx, rx) = tokio::sync::mpsc::channel(2);

tokio::task::spawn_blocking(move || {
let result = subscribe_control_sync(tx.clone());
match result {
Ok(()) => {}
Err(error) => {
let _ = tx.blocking_send(Event::ControlChannelError(error));
}
}
});

ReceiverStream::new(rx).chain(futures::stream::iter(std::iter::from_fn(|| {
tracing::info!("control channel closed");
None
})))
}

fn subscribe_control_sync(tx: tokio::sync::mpsc::Sender<Event>) -> eyre::Result<()> {
let mut zenoh_control_session =
communication_layer_pub_sub::zenoh::ZenohCommunicationLayer::init(
Default::default(),
ZENOH_CONTROL_PREFIX.into(),
)
.map_err(|err| eyre!(err))
.wrap_err("failed to open zenoh control session")?;

let start_dataflow = zenoh_control_session
.subscribe(ZENOH_CONTROL_START_DATAFLOW)
.map_err(|err| eyre!(err))
.wrap_err("failed to subscribe to start dataflow topic")?;
let start_tx = tx.downgrade();
let _start_dataflow_thread =
std::thread::spawn(move || start_dataflow_handler(start_dataflow, start_tx));

let stop_tx = tx;
let stop = zenoh_control_session
.subscribe(ZENOH_CONTROL_STOP_ALL)
.map_err(|err| eyre!(err))
.wrap_err("failed to subscribe to stop all topic")?;
let _stop_thread = tokio::task::spawn_blocking(move || {
stop_handler(stop, stop_tx);
});

Ok(())
}

fn stop_handler(mut stop: Box<dyn Subscriber>, stop_tx: tokio::sync::mpsc::Sender<Event>) {
let send_result = match stop.recv().map_err(|err| eyre!(err)) {
Ok(_) => stop_tx.blocking_send(Event::Stop),
Err(err) => stop_tx.blocking_send(Event::ControlChannelError(
err.wrap_err("failed to receive on control channel"),
)),
};
let _ = send_result;
}

fn start_dataflow_handler(
mut start_dataflow: Box<dyn Subscriber>,
start_tx: tokio::sync::mpsc::WeakSender<Event>,
) {
loop {
let recv_result = start_dataflow.recv();
let start_tx = match start_tx.upgrade() {
Some(tx) => tx,
None => {
// control channel was closed after receiving stop message
break;
}
};
let message = match recv_result {
Ok(Some(message)) => message,
Ok(None) => break,
Err(err) => {
let send_result = start_tx.blocking_send(Event::ControlChannelError(
eyre!(err).wrap_err("failed to receive on start_dataflow topic"),
));
match send_result {
Ok(()) => continue,
Err(_) => break,
}
}
};
let data = message.get();
let path = match std::str::from_utf8(&data) {
Ok(path) => Path::new(path),
Err(err) => {
let send_result = start_tx.blocking_send(Event::ParseError(
eyre!(err).wrap_err("failed to parse start_dataflow message"),
));
match send_result {
Ok(()) => continue,
Err(_) => break,
}
}
};
let _ = start_tx.blocking_send(Event::StartDataflow {
path: path.to_owned(),
});
}
}

+ 33
- 220
binaries/coordinator/src/lib.rs View File

@@ -1,17 +1,11 @@
use dora_core::descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor};
use dora_node_api::{
communication,
config::{format_duration, NodeId},
};
use eyre::{bail, eyre, WrapErr};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use eyre::WrapErr;
use futures::StreamExt;
use futures_concurrency::stream::Merge;
use std::{
env::consts::EXE_EXTENSION,
path::{Path, PathBuf},
time::Duration,
};
use tokio_stream::wrappers::{IntervalStream, ReceiverStream};
use std::path::{Path, PathBuf};
use tokio_stream::wrappers::ReceiverStream;

mod control;
mod run;

#[derive(Debug, Clone, clap::Parser)]
#[clap(about = "Dora coordinator")]
@@ -37,7 +31,7 @@ pub async fn run(args: Args) -> eyre::Result<()> {
match run_dataflow {
Some(path) => {
// start the given dataflow directly
self::run_dataflow(path.clone(), &runtime_path)
run::run_dataflow(path.clone(), &runtime_path)
.await
.wrap_err_with(|| format!("failed to run dataflow at {}", path.display()))?;
}
@@ -55,17 +49,25 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> {
let mut dataflow_errors_tx = Some(dataflow_errors_tx);
let dataflow_error_events = ReceiverStream::new(dataflow_errors).map(Event::DataflowError);

let stop_events = tokio::time::sleep(Duration::from_secs(5))
.into_stream()
.map(|()| Event::Stop);
let control_events = control::control_events();

let mut events = (dataflow_error_events, stop_events).merge();
let mut events = (dataflow_error_events, control_events).merge();

while let Some(event) = events.next().await {
let mut stop = false;
match event {
Event::DataflowError(err) => {
tracing::error!("{err:?}");
}
Event::ParseError(err) => {
let err = err.wrap_err("failed to parse message");
tracing::error!("{err:?}");
}
Event::ControlChannelError(err) => {
let err = err.wrap_err("Stopping because of fatal control channel error");
tracing::error!("{err:?}");
stop = true;
}
Event::StartDataflow { path } => {
let runtime_path = runtime_path.to_owned();
let dataflow_errors_tx = match &dataflow_errors_tx {
@@ -76,7 +78,7 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> {
}
};
let task = async move {
let result = run_dataflow(path.clone(), &runtime_path)
let result = run::run_dataflow(path.clone(), &runtime_path)
.await
.wrap_err_with(|| format!("failed to run dataflow at {}", path.display()));
match result {
@@ -90,216 +92,27 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> {
}
Event::Stop => {
tracing::info!("Received stop command");
// ensure that no new dataflows can be started
dataflow_errors_tx = None;
stop = true;
}
}
}

Ok(())
}

enum Event {
DataflowError(eyre::Report),
StartDataflow { path: PathBuf },
Stop,
}

async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()> {
let runtime = runtime.with_extension(EXE_EXTENSION);
let descriptor = read_descriptor(&dataflow_path).await.wrap_err_with(|| {
format!(
"failed to read dataflow descriptor at {}",
dataflow_path.display()
)
})?;

let nodes = descriptor.resolve_aliases();
let dora_timers = collect_dora_timers(&nodes);
let communication_config = {
let mut config = descriptor.communication;
// add uuid as prefix to ensure isolation
config.add_topic_prefix(&uuid::Uuid::new_v4().to_string());
config
};

if nodes
.iter()
.any(|n| matches!(n.kind, CoreNodeKind::Runtime(_)))
&& !runtime.is_file()
{
bail!(
"There is no runtime at {}, or it is not a file",
runtime.display()
);
}

let mut tasks = FuturesUnordered::new();
for node in nodes {
let node_id = node.id.clone();
if stop {
tracing::info!("stopping...");

match node.kind {
descriptor::CoreNodeKind::Custom(node) => {
let result = spawn_custom_node(node_id.clone(), &node, &communication_config)
.wrap_err_with(|| format!("failed to spawn custom node {node_id}"))?;
tasks.push(result);
}
descriptor::CoreNodeKind::Runtime(node) => {
if !node.operators.is_empty() {
let result =
spawn_runtime_node(&runtime, node_id.clone(), &node, &communication_config)
.wrap_err_with(|| format!("failed to spawn runtime node {node_id}"))?;
tasks.push(result);
}
}
// ensure that no new dataflows can be started
dataflow_errors_tx = None;
}
}

for interval in dora_timers {
let communication_config = communication_config.clone();
let mut communication =
tokio::task::spawn_blocking(move || communication::init(&communication_config))
.await
.wrap_err("failed to join communication layer init task")?
.wrap_err("failed to init communication layer")?;
tokio::spawn(async move {
let topic = {
let duration = format_duration(interval);
format!("dora/timer/{duration}")
};
let metadata = dora_message::Metadata::default();
let data = metadata.serialize().unwrap();
let mut stream = IntervalStream::new(tokio::time::interval(interval));
while (stream.next().await).is_some() {
communication
.publisher(&topic)
.unwrap()
.publish(&data)
.expect("failed to publish timer tick message");
}
});
}

while let Some(task_result) = tasks.next().await {
task_result
.wrap_err("failed to join async task")?
.wrap_err("custom node failed")?;
}
tracing::info!("stopped");

Ok(())
}

#[tracing::instrument]
fn spawn_custom_node(
node_id: NodeId,
node: &descriptor::CustomNode,
communication: &dora_node_api::config::CommunicationConfig,
) -> eyre::Result<tokio::task::JoinHandle<eyre::Result<(), eyre::Error>>> {
let mut args = node.run.split_ascii_whitespace();
let cmd = {
let raw = Path::new(
args.next()
.ok_or_else(|| eyre!("`run` field must not be empty"))?,
);
let path = if raw.extension().is_none() {
raw.with_extension(EXE_EXTENSION)
} else {
raw.to_owned()
};
path.canonicalize()
.wrap_err_with(|| format!("no node exists at `{}`", path.display()))?
};

let mut command = tokio::process::Command::new(cmd);
command.args(args);
command_init_common_env(&mut command, &node_id, communication)?;
command.env(
"DORA_NODE_RUN_CONFIG",
serde_yaml::to_string(&node.run_config)
.wrap_err("failed to serialize custom node run config")?,
);

// Injecting the env variable defined in the `yaml` into
// the node runtime.
if let Some(envs) = &node.env {
for (key, value) in envs {
command.env(key, value.to_string());
}
}

let mut child = command
.spawn()
.wrap_err_with(|| format!("failed to run command `{}`", &node.run))?;
let result = tokio::spawn(async move {
let status = child.wait().await.context("child process failed")?;
if status.success() {
tracing::info!("node {node_id} finished");
Ok(())
} else if let Some(code) = status.code() {
Err(eyre!("node {node_id} failed with exit code: {code}"))
} else {
Err(eyre!("node {node_id} failed (unknown exit code)"))
}
});
Ok(result)
}

#[tracing::instrument(skip(node))]
fn spawn_runtime_node(
runtime: &Path,
node_id: NodeId,
node: &descriptor::RuntimeNode,
communication: &dora_node_api::config::CommunicationConfig,
) -> eyre::Result<tokio::task::JoinHandle<eyre::Result<(), eyre::Error>>> {
let mut command = tokio::process::Command::new(runtime);
command_init_common_env(&mut command, &node_id, communication)?;
command.env(
"DORA_OPERATORS",
serde_yaml::to_string(&node.operators)
.wrap_err("failed to serialize custom node run config")?,
);

let mut child = command
.spawn()
.wrap_err_with(|| format!("failed to run runtime at `{}`", runtime.display()))?;
let result = tokio::spawn(async move {
let status = child.wait().await.context("child process failed")?;
if status.success() {
tracing::info!("runtime node {node_id} finished");
Ok(())
} else if let Some(code) = status.code() {
Err(eyre!(
"runtime node {node_id} failed with exit code: {code}"
))
} else {
Err(eyre!("runtime node {node_id} failed (unknown exit code)"))
}
});
Ok(result)
}

fn command_init_common_env(
command: &mut tokio::process::Command,
node_id: &NodeId,
communication: &dora_node_api::config::CommunicationConfig,
) -> Result<(), eyre::Error> {
command.env(
"DORA_NODE_ID",
serde_yaml::to_string(&node_id).wrap_err("failed to serialize custom node ID")?,
);
command.env(
"DORA_COMMUNICATION_CONFIG",
serde_yaml::to_string(communication)
.wrap_err("failed to serialize communication config")?,
);
Ok(())
}

async fn read_descriptor(file: &Path) -> Result<Descriptor, eyre::Error> {
let descriptor_file = tokio::fs::read(file)
.await
.context("failed to open given file")?;
let descriptor: Descriptor =
serde_yaml::from_slice(&descriptor_file).context("failed to parse given descriptor")?;
Ok(descriptor)
enum Event {
DataflowError(eyre::Report),
ControlChannelError(eyre::Report),
StartDataflow { path: PathBuf },
ParseError(eyre::Report),
Stop,
}

+ 60
- 0
binaries/coordinator/src/run/custom.rs View File

@@ -0,0 +1,60 @@
use super::command_init_common_env;
use dora_core::descriptor;
use dora_node_api::config::NodeId;
use eyre::{eyre, WrapErr};
use std::{env::consts::EXE_EXTENSION, path::Path};

#[tracing::instrument]
pub(super) fn spawn_custom_node(
node_id: NodeId,
node: &descriptor::CustomNode,
communication: &dora_node_api::config::CommunicationConfig,
) -> eyre::Result<tokio::task::JoinHandle<eyre::Result<(), eyre::Error>>> {
let mut args = node.run.split_ascii_whitespace();
let cmd = {
let raw = Path::new(
args.next()
.ok_or_else(|| eyre!("`run` field must not be empty"))?,
);
let path = if raw.extension().is_none() {
raw.with_extension(EXE_EXTENSION)
} else {
raw.to_owned()
};
path.canonicalize()
.wrap_err_with(|| format!("no node exists at `{}`", path.display()))?
};

let mut command = tokio::process::Command::new(cmd);
command.args(args);
command_init_common_env(&mut command, &node_id, communication)?;
command.env(
"DORA_NODE_RUN_CONFIG",
serde_yaml::to_string(&node.run_config)
.wrap_err("failed to serialize custom node run config")?,
);

// Injecting the env variable defined in the `yaml` into
// the node runtime.
if let Some(envs) = &node.env {
for (key, value) in envs {
command.env(key, value.to_string());
}
}

let mut child = command
.spawn()
.wrap_err_with(|| format!("failed to run command `{}`", &node.run))?;
let result = tokio::spawn(async move {
let status = child.wait().await.context("child process failed")?;
if status.success() {
tracing::info!("node {node_id} finished");
Ok(())
} else if let Some(code) = status.code() {
Err(eyre!("node {node_id} failed with exit code: {code}"))
} else {
Err(eyre!("node {node_id} failed (unknown exit code)"))
}
});
Ok(result)
}

+ 123
- 0
binaries/coordinator/src/run/mod.rs View File

@@ -0,0 +1,123 @@
use self::{custom::spawn_custom_node, runtime::spawn_runtime_node};
use dora_core::descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor, NodeId};
use dora_node_api::{communication, config::format_duration};
use eyre::{bail, WrapErr};
use futures::{stream::FuturesUnordered, StreamExt};
use std::{
env::consts::EXE_EXTENSION,
path::{Path, PathBuf},
};
use tokio_stream::wrappers::IntervalStream;

mod custom;
mod runtime;

pub async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()> {
let runtime = runtime.with_extension(EXE_EXTENSION);
let descriptor = read_descriptor(&dataflow_path).await.wrap_err_with(|| {
format!(
"failed to read dataflow descriptor at {}",
dataflow_path.display()
)
})?;

let nodes = descriptor.resolve_aliases();
let dora_timers = collect_dora_timers(&nodes);
let communication_config = {
let mut config = descriptor.communication;
// add uuid as prefix to ensure isolation
config.add_topic_prefix(&uuid::Uuid::new_v4().to_string());
config
};

if nodes
.iter()
.any(|n| matches!(n.kind, CoreNodeKind::Runtime(_)))
&& !runtime.is_file()
{
bail!(
"There is no runtime at {}, or it is not a file",
runtime.display()
);
}

let mut tasks = FuturesUnordered::new();
for node in nodes {
let node_id = node.id.clone();

match node.kind {
descriptor::CoreNodeKind::Custom(node) => {
let result = spawn_custom_node(node_id.clone(), &node, &communication_config)
.wrap_err_with(|| format!("failed to spawn custom node {node_id}"))?;
tasks.push(result);
}
descriptor::CoreNodeKind::Runtime(node) => {
if !node.operators.is_empty() {
let result =
spawn_runtime_node(&runtime, node_id.clone(), &node, &communication_config)
.wrap_err_with(|| format!("failed to spawn runtime node {node_id}"))?;
tasks.push(result);
}
}
}
}

for interval in dora_timers {
let communication_config = communication_config.clone();
let mut communication =
tokio::task::spawn_blocking(move || communication::init(&communication_config))
.await
.wrap_err("failed to join communication layer init task")?
.wrap_err("failed to init communication layer")?;
tokio::spawn(async move {
let topic = {
let duration = format_duration(interval);
format!("dora/timer/{duration}")
};
let metadata = dora_message::Metadata::default();
let data = metadata.serialize().unwrap();
let mut stream = IntervalStream::new(tokio::time::interval(interval));
while (stream.next().await).is_some() {
communication
.publisher(&topic)
.unwrap()
.publish(&data)
.expect("failed to publish timer tick message");
}
});
}

while let Some(task_result) = tasks.next().await {
task_result
.wrap_err("failed to join async task")?
.wrap_err("custom node failed")?;
}

Ok(())
}

async fn read_descriptor(file: &Path) -> Result<Descriptor, eyre::Error> {
let descriptor_file = tokio::fs::read(file)
.await
.context("failed to open given file")?;
let descriptor: Descriptor =
serde_yaml::from_slice(&descriptor_file).context("failed to parse given descriptor")?;
Ok(descriptor)
}

fn command_init_common_env(
command: &mut tokio::process::Command,
node_id: &NodeId,
communication: &dora_node_api::config::CommunicationConfig,
) -> Result<(), eyre::Error> {
command.env(
"DORA_NODE_ID",
serde_yaml::to_string(&node_id).wrap_err("failed to serialize custom node ID")?,
);
command.env(
"DORA_COMMUNICATION_CONFIG",
serde_yaml::to_string(communication)
.wrap_err("failed to serialize communication config")?,
);
Ok(())
}

+ 39
- 0
binaries/coordinator/src/run/runtime.rs View File

@@ -0,0 +1,39 @@
use super::command_init_common_env;
use dora_core::descriptor;
use dora_node_api::config::NodeId;
use eyre::{eyre, WrapErr};
use std::path::Path;

#[tracing::instrument(skip(node))]
pub fn spawn_runtime_node(
runtime: &Path,
node_id: NodeId,
node: &descriptor::RuntimeNode,
communication: &dora_node_api::config::CommunicationConfig,
) -> eyre::Result<tokio::task::JoinHandle<eyre::Result<(), eyre::Error>>> {
let mut command = tokio::process::Command::new(runtime);
command_init_common_env(&mut command, &node_id, communication)?;
command.env(
"DORA_OPERATORS",
serde_yaml::to_string(&node.operators)
.wrap_err("failed to serialize custom node run config")?,
);

let mut child = command
.spawn()
.wrap_err_with(|| format!("failed to run runtime at `{}`", runtime.display()))?;
let result = tokio::spawn(async move {
let status = child.wait().await.context("child process failed")?;
if status.success() {
tracing::info!("runtime node {node_id} finished");
Ok(())
} else if let Some(code) = status.code() {
Err(eyre!(
"runtime node {node_id} failed with exit code: {code}"
))
} else {
Err(eyre!("runtime node {node_id} failed (unknown exit code)"))
}
});
Ok(result)
}

+ 2
- 2
libraries/core/src/lib.rs View File

@@ -1,11 +1,11 @@
use eyre::{bail, eyre};
use std::{
env::consts::{DLL_PREFIX, DLL_SUFFIX},
path::Path,
};

use eyre::{bail, eyre};

pub mod descriptor;
pub mod topics;

pub fn adjust_shared_library_path(path: &Path) -> Result<std::path::PathBuf, eyre::ErrReport> {
let file_name = path


+ 3
- 0
libraries/core/src/topics.rs View File

@@ -0,0 +1,3 @@
pub const ZENOH_CONTROL_PREFIX: &str = "dora_control";
pub const ZENOH_CONTROL_STOP_ALL: &str = "stop_all";
pub const ZENOH_CONTROL_START_DATAFLOW: &str = "start_dataflow";

Loading…
Cancel
Save