Browse Source

Merge pull request #410 from dora-rs/single-binary

tags/v0.3.2-rc2
Haixuan Xavier Tao GitHub 2 years ago
parent
commit
f0cb96571b
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
35 changed files with 392 additions and 416 deletions
  1. +0
    -2
      .github/workflows/ci.yml
  2. +3
    -16
      .github/workflows/release.yml
  3. +6
    -46
      Cargo.lock
  4. +0
    -3
      Cargo.toml
  5. +0
    -1
      apis/c/node/Cargo.toml
  6. +2
    -2
      apis/c/operator/build.rs
  7. +0
    -7
      apis/rust/node/Cargo.toml
  8. +6
    -0
      binaries/cli/Cargo.toml
  9. +88
    -18
      binaries/cli/src/main.rs
  10. +13
    -17
      binaries/cli/src/up.rs
  11. +1
    -8
      binaries/coordinator/Cargo.toml
  12. +30
    -43
      binaries/coordinator/src/lib.rs
  13. +0
    -12
      binaries/coordinator/src/main.rs
  14. +1
    -6
      binaries/daemon/Cargo.toml
  15. +6
    -0
      binaries/daemon/src/coordinator.rs
  16. +44
    -12
      binaries/daemon/src/lib.rs
  17. +0
    -98
      binaries/daemon/src/main.rs
  18. +1
    -1
      binaries/daemon/src/spawn.rs
  19. +0
    -3
      binaries/runtime/Cargo.toml
  20. +16
    -13
      examples/benchmark/run.rs
  21. +16
    -12
      examples/c++-dataflow/run.rs
  22. +16
    -13
      examples/c-dataflow/run.rs
  23. +16
    -12
      examples/cmake-dataflow/run.rs
  24. +22
    -20
      examples/multiple-daemons/run.rs
  25. +17
    -2
      examples/python-dataflow/run.rs
  26. +17
    -2
      examples/python-operator-dataflow/run.rs
  27. +17
    -2
      examples/python-ros2-dataflow/run.rs
  28. +16
    -12
      examples/rust-dataflow-url/run.rs
  29. +16
    -13
      examples/rust-dataflow/run.rs
  30. +16
    -13
      examples/rust-ros2-dataflow/run.rs
  31. +1
    -2
      libraries/core/Cargo.toml
  32. +5
    -1
      libraries/core/src/daemon_messages.rs
  33. +0
    -1
      libraries/extensions/ros2-bridge/python/Cargo.toml
  34. +0
    -2
      libraries/extensions/telemetry/metrics/Cargo.toml
  35. +0
    -1
      libraries/extensions/telemetry/tracing/Cargo.toml

+ 0
- 2
.github/workflows/ci.yml View File

@@ -236,8 +236,6 @@ jobs:
# fail-fast by using bash shell explictly
shell: bash
run: |
cargo install --path binaries/coordinator --locked
cargo install --path binaries/daemon --locked
cargo install --path binaries/cli --locked
- name: "Test CLI"
timeout-minutes: 30


+ 3
- 16
.github/workflows/release.yml View File

@@ -15,7 +15,6 @@ jobs:
strategy:
matrix:
platform: [ubuntu-20.04]
python-version: ["3.7"]
fail-fast: false
runs-on: ${{ matrix.platform }}

@@ -72,7 +71,6 @@ jobs:
strategy:
matrix:
platform: [windows-2022]
python-version: ["3.7"]
fail-fast: false
runs-on: ${{ matrix.platform }}

@@ -83,15 +81,13 @@ jobs:

- name: "Build binaries"
timeout-minutes: 60
run: "cargo build --release -p dora-coordinator -p dora-cli -p dora-daemon"
run: "cargo build --release -p dora-cli"

- name: Create Archive (Windows)
if: runner.os == 'Windows'
shell: powershell
run: |
New-Item -Path archive -ItemType Directory
Copy-Item target/release/dora-coordinator.exe -Destination archive
Copy-Item target/release/dora-daemon.exe -Destination archive
Copy-Item target/release/dora.exe -Destination archive/dora.exe
Compress-Archive -Path archive\* -DestinationPath archive.zip

@@ -111,7 +107,6 @@ jobs:
strategy:
matrix:
platform: [macos-12, ubuntu-20.04]
python-version: ["3.7"]
fail-fast: false
runs-on: ${{ matrix.platform }}

@@ -122,14 +117,12 @@ jobs:

- name: "Build binaries"
timeout-minutes: 60
run: "cargo build --release -p dora-coordinator -p dora-cli -p dora-daemon"
run: "cargo build --release -p dora-cli"

- name: "Create Archive (Unix)"
if: runner.os == 'Linux' || runner.os == 'macOS'
run: |
mkdir archive
cp target/release/dora-coordinator archive
cp target/release/dora-daemon archive
cp target/release/dora archive/dora
cd archive
zip -r ../archive.zip .
@@ -151,7 +144,6 @@ jobs:
strategy:
matrix:
platform: [ubuntu-20.04]
python-version: ["3.7"]
fail-fast: false
runs-on: ${{ matrix.platform }}

@@ -166,14 +158,12 @@ jobs:
with:
use-cross: true
command: build
args: --release --target aarch64-unknown-linux-gnu -p dora-coordinator -p dora-cli -p dora-daemon
args: --release --target aarch64-unknown-linux-gnu -p dora-cli

- name: "Archive Linux ARM64"
if: runner.os == 'Linux'
run: |
mkdir archive_aarch64
cp target/aarch64-unknown-linux-gnu/release/dora-coordinator archive_aarch64
cp target/aarch64-unknown-linux-gnu/release/dora-daemon archive_aarch64
cp target/aarch64-unknown-linux-gnu/release/dora archive_aarch64/dora
cd archive_aarch64
zip -r ../archive_aarch64.zip .
@@ -196,7 +186,6 @@ jobs:
strategy:
matrix:
platform: [macos-12]
python-version: ["3.7"]
fail-fast: false
runs-on: ${{ matrix.platform }}

@@ -219,8 +208,6 @@ jobs:
if: runner.os == 'macOS'
run: |
mkdir archive_aarch64
cp target/aarch64-apple-darwin/release/dora-coordinator archive_aarch64
cp target/aarch64-apple-darwin/release/dora-daemon archive_aarch64
cp target/aarch64-apple-darwin/release/dora archive_aarch64/dora
cd archive_aarch64
zip -r ../archive_aarch64.zip .


+ 6
- 46
Cargo.lock View File

@@ -814,12 +814,6 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "981520c98f422fcc584dc1a95c334e6953900b9106bc47a9839b81790009eb21"

[[package]]
name = "capnp"
version = "0.14.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dca085c2c7d9d65ad749d450b19b551efaa8e3476a439bdca07aca8533097f3"

[[package]]
name = "cc"
version = "1.0.83"
@@ -1417,17 +1411,23 @@ dependencies = [
"clap 4.4.6",
"communication-layer-request-reply",
"ctrlc",
"dora-coordinator",
"dora-core",
"dora-daemon",
"dora-node-api-c",
"dora-operator-api-c",
"dora-runtime",
"dora-tracing",
"eyre",
"futures",
"inquire",
"notify",
"serde",
"serde_json",
"serde_yaml 0.9.25",
"termcolor",
"tokio",
"tokio-stream",
"tracing",
"uuid",
"webbrowser",
@@ -1437,7 +1437,6 @@ dependencies = [
name = "dora-coordinator"
version = "0.3.1"
dependencies = [
"clap 4.4.6",
"ctrlc",
"dora-core",
"dora-tracing",
@@ -1445,17 +1444,11 @@ dependencies = [
"futures",
"futures-concurrency",
"names",
"rand",
"serde",
"serde_json",
"serde_yaml 0.8.26",
"thiserror",
"tokio",
"tokio-stream",
"tracing",
"uuid",
"which",
"zenoh",
]

[[package]]
@@ -1468,7 +1461,6 @@ dependencies = [
"once_cell",
"serde",
"serde-with-expand-env",
"serde_bytes",
"serde_yaml 0.9.25",
"tokio",
"tracing",
@@ -1481,20 +1473,16 @@ name = "dora-daemon"
version = "0.3.1"
dependencies = [
"aligned-vec",
"arrow-schema",
"async-trait",
"bincode",
"clap 4.4.6",
"ctrlc",
"dora-core",
"dora-download",
"dora-runtime",
"dora-tracing",
"eyre",
"flume",
"futures",
"futures-concurrency",
"serde",
"serde_json",
"serde_yaml 0.8.26",
"shared-memory-server",
@@ -1503,7 +1491,6 @@ dependencies = [
"tracing",
"tracing-opentelemetry",
"uuid",
"which",
]

[[package]]
@@ -1520,10 +1507,8 @@ dependencies = [
name = "dora-examples"
version = "0.0.0"
dependencies = [
"clap 4.4.6",
"dora-coordinator",
"dora-core",
"dora-daemon",
"dora-download",
"dora-tracing",
"dunce",
@@ -1533,7 +1518,6 @@ dependencies = [
"tokio",
"tokio-stream",
"tracing",
"tracing-subscriber",
"uuid",
]

@@ -1552,11 +1536,9 @@ dependencies = [
name = "dora-metrics"
version = "0.3.1"
dependencies = [
"futures",
"opentelemetry 0.21.0",
"opentelemetry-otlp",
"opentelemetry_sdk 0.21.1",
"tokio",
]

[[package]]
@@ -1565,9 +1547,7 @@ version = "0.3.1"
dependencies = [
"aligned-vec",
"arrow",
"arrow-schema",
"bincode",
"capnp",
"dora-arrow-convert",
"dora-core",
"dora-tracing",
@@ -1576,16 +1556,11 @@ dependencies = [
"futures",
"futures-concurrency",
"futures-timer",
"once_cell",
"serde",
"serde_json",
"serde_yaml 0.8.26",
"shared-memory-server",
"shared_memory_extended",
"thiserror",
"tokio",
"tracing",
"uuid",
]

[[package]]
@@ -1595,7 +1570,6 @@ dependencies = [
"arrow-array",
"dora-node-api",
"eyre",
"flume",
"tracing",
]

@@ -1750,7 +1724,6 @@ dependencies = [
"dora-ros2-bridge",
"dora-ros2-bridge-msg-gen",
"eyre",
"flume",
"futures",
"pyo3",
"serde",
@@ -1762,8 +1735,6 @@ version = "0.3.1"
dependencies = [
"aligned-vec",
"arrow",
"arrow-schema",
"clap 4.4.6",
"dora-core",
"dora-download",
"dora-metrics",
@@ -1785,7 +1756,6 @@ dependencies = [
"tokio-stream",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
]

[[package]]
@@ -1795,7 +1765,6 @@ dependencies = [
"eyre",
"opentelemetry 0.18.0",
"opentelemetry-jaeger",
"tokio",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
@@ -5007,15 +4976,6 @@ dependencies = [
"shellexpand 2.1.2",
]

[[package]]
name = "serde_bytes"
version = "0.11.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab33ec92f677585af6d88c65593ae2375adde54efdbf16d597f2cbc7a6d368ff"
dependencies = [
"serde",
]

[[package]]
name = "serde_derive"
version = "1.0.195"


+ 0
- 3
Cargo.toml View File

@@ -81,7 +81,6 @@ ros2-examples = []
[dev-dependencies]
eyre = "0.6.8"
tokio = "1.24.2"
dora-daemon = { workspace = true }
dora-coordinator = { workspace = true }
dora-core = { workspace = true }
dora-tracing = { workspace = true }
@@ -90,10 +89,8 @@ dunce = "1.0.2"
serde_yaml = "0.8.23"
uuid = { version = "1.2.1", features = ["v4", "serde"] }
tracing = "0.1.36"
tracing-subscriber = "0.3.15"
futures = "0.3.25"
tokio-stream = "0.1.11"
clap = { version = "4.0.3", features = ["derive"] }

[[example]]
name = "c-dataflow"


+ 0
- 1
apis/c/node/Cargo.toml View File

@@ -19,7 +19,6 @@ tracing = ["dora-node-api/tracing"]

[dependencies]
eyre = "0.6.8"
flume = "0.10.14"
tracing = "0.1.33"
arrow-array = { workspace = true }



+ 2
- 2
apis/c/operator/build.rs View File

@@ -2,10 +2,10 @@ use std::path::Path;

fn main() {
dora_operator_api_types::generate_headers(Path::new("operator_types.h"))
.expect("failed to create operator_api.h");
.expect("failed to create operator_types.h");

// don't rebuild on changes (otherwise we rebuild on every run as we're
// writing the `operator_types.h` file; cargo will still rerun this script
// when the `dora_operator_api_types` crate changes)
println!("cargo:rerun-if-changed=");
println!("cargo:rerun-if-changed=build.rs");
}

+ 0
- 7
apis/rust/node/Cargo.toml View File

@@ -14,20 +14,13 @@ tracing = ["dep:dora-tracing"]
dora-core = { workspace = true }
shared-memory-server = { workspace = true }
eyre = "0.6.7"
once_cell = "1.13.0"
serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.8.23"
serde_json = "1.0.89"
thiserror = "1.0.30"
tracing = "0.1.33"
flume = "0.10.14"
uuid = { version = "1.1.2", features = ["v4"] }
capnp = "0.14.11"
bincode = "1.3.3"
shared_memory_extended = "0.13.0"
dora-tracing = { workspace = true, optional = true }
arrow = { workspace = true }
arrow-schema = { workspace = true }
futures = "0.3.28"
futures-concurrency = "7.3.0"
futures-timer = "3.0.2"


+ 6
- 0
binaries/cli/Cargo.toml View File

@@ -35,3 +35,9 @@ ctrlc = "3.2.5"
tracing = "0.1.36"
dora-tracing = { workspace = true, optional = true }
bat = "0.23.0"
dora-daemon = { workspace = true }
dora-coordinator = { workspace = true }
dora-runtime = { workspace = true }
tokio = { version = "1.20.1", features = ["full"] }
tokio-stream = { version = "0.1.8", features = ["io-util", "net"] }
futures = "0.3.21"

+ 88
- 18
binaries/cli/src/main.rs View File

@@ -1,15 +1,22 @@
use std::path::PathBuf;
use std::{net::Ipv4Addr, path::PathBuf};

use attach::attach_dataflow;
use clap::Parser;
use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection};
use dora_coordinator::Event;
use dora_core::{
descriptor::Descriptor,
topics::{control_socket_addr, ControlRequest, ControlRequestReply, DataflowId},
topics::{
control_socket_addr, ControlRequest, ControlRequestReply, DataflowId,
DORA_COORDINATOR_PORT_DEFAULT,
},
};
use dora_daemon::Daemon;
#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
use eyre::{bail, Context};
use std::net::SocketAddr;
use tokio::runtime::Builder;
use uuid::Uuid;

mod attach;
@@ -56,10 +63,6 @@ enum Command {
Up {
#[clap(long)]
config: Option<PathBuf>,
#[clap(long)]
coordinator_path: Option<PathBuf>,
#[clap(long)]
daemon_path: Option<PathBuf>,
},
/// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first.
Destroy {
@@ -92,6 +95,20 @@ enum Command {
// Stats,
// Get,
// Upgrade,
/// Run daemon
Daemon {
#[clap(long)]
machine_id: Option<String>,
#[clap(long)]
coordinator_addr: Option<SocketAddr>,

#[clap(long)]
run_dataflow: Option<PathBuf>,
},
/// Run runtime
Runtime,
/// Run coordinator
Coordinator { port: Option<u16> },
}

#[derive(Debug, clap::Args)]
@@ -127,10 +144,24 @@ fn main() {
}

fn run() -> eyre::Result<()> {
#[cfg(feature = "tracing")]
set_up_tracing("dora-cli").context("failed to set up tracing subscriber")?;
let args = Args::parse();

#[cfg(feature = "tracing")]
match args.command {
Command::Daemon { .. } => {
set_up_tracing("dora-daemon").context("failed to set up tracing subscriber")?;
}
Command::Runtime => {
// Do not set the runtime in the cli.
}
Command::Coordinator { .. } => {
set_up_tracing("dora-coordinator").context("failed to set up tracing subscriber")?;
}
_ => {
set_up_tracing("dora-cli").context("failed to set up tracing subscriber")?;
}
};

match args.command {
Command::Check { dataflow } => match dataflow {
Some(dataflow) => {
@@ -159,15 +190,7 @@ fn run() -> eyre::Result<()> {
args,
internal_create_with_path_dependencies,
} => template::create(args, internal_create_with_path_dependencies)?,
Command::Up {
config,
coordinator_path,
daemon_path,
} => up::up(
config.as_deref(),
coordinator_path.as_deref(),
daemon_path.as_deref(),
)?,
Command::Up { config } => up::up(config.as_deref())?,
Command::Logs { dataflow, node } => {
let uuid = Uuid::parse_str(&dataflow).ok();
let name = if uuid.is_some() { None } else { Some(dataflow) };
@@ -227,7 +250,54 @@ fn run() -> eyre::Result<()> {
}
}
Command::Destroy { config } => up::destroy(config.as_deref())?,
}
Command::Coordinator { port } => {
let rt = Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
rt.block_on(async {
let (_port, task) =
dora_coordinator::start(port, futures::stream::empty::<Event>()).await?;
task.await
})
.context("failed to run dora-coordinator")?
}
Command::Daemon {
coordinator_addr,
machine_id,
run_dataflow,
} => {
let rt = Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
rt.block_on(async {
match run_dataflow {
Some(dataflow_path) => {
tracing::info!("Starting dataflow `{}`", dataflow_path.display());
if let Some(coordinator_addr) = coordinator_addr {
tracing::info!(
"Not using coordinator addr {} as `run_dataflow` is for local dataflow only. Please use the `start` command for remote coordinator",
coordinator_addr
);
}

Daemon::run_dataflow(&dataflow_path).await
}
None => {
let addr = coordinator_addr.unwrap_or_else(|| {
tracing::info!("Starting in local mode");
let localhost = Ipv4Addr::new(127, 0, 0, 1);
(localhost, DORA_COORDINATOR_PORT_DEFAULT).into()
});
Daemon::run(addr, machine_id.unwrap_or_default()).await
}
}
})
.context("failed to run dora-daemon")?
}
Command::Runtime => dora_runtime::main().context("Failed to run dora-runtime")?,
};

Ok(())
}


+ 13
- 17
binaries/cli/src/up.rs View File

@@ -6,17 +6,13 @@ use std::{fs, path::Path, process::Command, time::Duration};
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
struct UpConfig {}

pub(crate) fn up(
config_path: Option<&Path>,
coordinator: Option<&Path>,
daemon: Option<&Path>,
) -> eyre::Result<()> {
pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> {
let UpConfig {} = parse_dora_config(config_path)?;

let mut session = match connect_to_coordinator() {
Ok(session) => session,
Err(_) => {
start_coordinator(coordinator).wrap_err("failed to start dora-coordinator")?;
start_coordinator().wrap_err("failed to start dora-coordinator")?;

loop {
match connect_to_coordinator() {
@@ -31,7 +27,7 @@ pub(crate) fn up(
};

if !daemon_running(&mut *session)? {
start_daemon(daemon).wrap_err("failed to start dora-daemon")?;
start_daemon().wrap_err("failed to start dora-daemon")?;
}

Ok(())
@@ -70,24 +66,24 @@ fn parse_dora_config(config_path: Option<&Path>) -> Result<UpConfig, eyre::ErrRe
Ok(config)
}

fn start_coordinator(coordinator: Option<&Path>) -> eyre::Result<()> {
let coordinator = coordinator.unwrap_or_else(|| Path::new("dora-coordinator"));
let mut cmd = Command::new(coordinator);
fn start_coordinator() -> eyre::Result<()> {
let mut cmd =
Command::new(std::env::current_exe().wrap_err("failed to get current executable path")?);
cmd.arg("coordinator");
cmd.spawn()
.wrap_err_with(|| format!("failed to run {}", coordinator.display()))?;
.wrap_err_with(|| format!("failed to run `dora coordinator`"))?;

println!("started dora coordinator");

Ok(())
}

fn start_daemon(daemon: Option<&Path>) -> eyre::Result<()> {
let daemon = daemon.unwrap_or_else(|| Path::new("dora-daemon"));
let mut cmd = Command::new(daemon);
fn start_daemon() -> eyre::Result<()> {
let mut cmd =
Command::new(std::env::current_exe().wrap_err("failed to get current executable path")?);
cmd.arg("daemon");
cmd.spawn()
.wrap_err_with(|| format!("failed to run {}", daemon.display()))?;
.wrap_err_with(|| format!("failed to run `dora daemon`"))?;

println!("started dora daemon");



+ 1
- 8
binaries/coordinator/Cargo.toml View File

@@ -15,20 +15,13 @@ tracing = ["dep:dora-tracing"]
[dependencies]
eyre = "0.6.7"
futures = "0.3.21"
serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.8.23"
tokio = { version = "1.24.2", features = ["full"] }
tokio-stream = { version = "0.1.8", features = ["io-util", "net"] }
uuid = { version = "1.2.1" }
rand = "0.8.5"
dora-core = { workspace = true }
tracing = "0.1.36"
dora-tracing = { workspace = true, optional = true }
futures-concurrency = "7.1.0"
zenoh = "0.7.0-rc"
serde_json = "1.0.86"
which = "5.0.0"
thiserror = "1.0.37"
ctrlc = "3.2.5"
clap = { version = "4.0.3", features = ["derive"] }
names = "0.14.0"
ctrlc = "3.2.5"

+ 30
- 43
binaries/coordinator/src/lib.rs View File

@@ -38,36 +38,23 @@ mod listener;
mod run;
mod tcp_utils;

#[derive(Debug, Clone, clap::Parser)]
#[clap(about = "Dora coordinator")]
pub struct Args {
#[clap(long)]
pub port: Option<u16>,
}

pub async fn run(args: Args) -> eyre::Result<()> {
let ctrlc_events = set_up_ctrlc_handler()?;

let (_, task) = start(args, ctrlc_events).await?;

task.await?;

Ok(())
}

pub async fn start(
args: Args,
port: Option<u16>,
external_events: impl Stream<Item = Event> + Unpin,
) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> {
let port = args.port.unwrap_or(DORA_COORDINATOR_PORT_DEFAULT);
let port = port.unwrap_or(DORA_COORDINATOR_PORT_DEFAULT);
let listener = listener::create_listener(port).await?;
let port = listener
.local_addr()
.wrap_err("failed to get local addr of listener")?
.port();
let mut tasks = FuturesUnordered::new();

// Setup ctrl-c handler
let ctrlc_events = set_up_ctrlc_handler()?;

let future = async move {
start_inner(listener, &tasks, external_events).await?;
start_inner(listener, &tasks, (ctrlc_events, external_events).merge()).await?;

tracing::debug!("coordinator main loop finished, waiting on spawned tasks");
while let Some(join_result) = tasks.next().await {
@@ -604,28 +591,6 @@ struct DaemonConnection {
last_heartbeat: Instant,
}

fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> {
let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);

let mut ctrlc_sent = false;
ctrlc::set_handler(move || {
if ctrlc_sent {
tracing::warn!("received second ctrlc signal -> aborting immediately");
std::process::abort();
} else {
tracing::info!("received ctrlc signal");
if ctrlc_tx.blocking_send(Event::CtrlC).is_err() {
tracing::error!("failed to report ctrl-c event to dora-coordinator");
}

ctrlc_sent = true;
}
})
.wrap_err("failed to set ctrl-c handler")?;

Ok(ReceiverStream::new(ctrlc_rx))
}

async fn handle_destroy(
running_dataflows: &HashMap<Uuid, RunningDataflow>,
daemon_connections: &mut HashMap<String, DaemonConnection>,
@@ -887,7 +852,7 @@ async fn destroy_daemons(
match serde_json::from_slice(&reply_raw)
.wrap_err("failed to deserialize destroy reply from daemon")?
{
DaemonCoordinatorReply::DestroyResult(result) => result
DaemonCoordinatorReply::DestroyResult { result, .. } => result
.map_err(|e| eyre!(e))
.wrap_err("failed to destroy dataflow")?,
other => bail!("unexpected reply after sending `destroy`: {other:?}"),
@@ -943,3 +908,25 @@ pub enum DaemonEvent {
listen_socket: SocketAddr,
},
}

fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> {
let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);

let mut ctrlc_sent = false;
ctrlc::set_handler(move || {
if ctrlc_sent {
tracing::warn!("received second ctrlc signal -> aborting immediately");
std::process::abort();
} else {
tracing::info!("received ctrlc signal");
if ctrlc_tx.blocking_send(Event::CtrlC).is_err() {
tracing::error!("failed to report ctrl-c event to dora-coordinator");
}

ctrlc_sent = true;
}
})
.wrap_err("failed to set ctrl-c handler")?;

Ok(ReceiverStream::new(ctrlc_rx))
}

+ 0
- 12
binaries/coordinator/src/main.rs View File

@@ -1,12 +0,0 @@
#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
#[cfg(feature = "tracing")]
use eyre::Context;

#[tokio::main]
async fn main() -> eyre::Result<()> {
#[cfg(feature = "tracing")]
set_up_tracing("dora-coordinator").context("failed to set up tracing subscriber")?;

dora_coordinator::run(clap::Parser::parse()).await
}

+ 1
- 6
binaries/daemon/Cargo.toml View File

@@ -22,21 +22,16 @@ tokio-stream = { version = "0.1.11", features = ["net"] }
tracing = "0.1.36"
tracing-opentelemetry = { version = "0.18.0", optional = true }
futures-concurrency = "7.1.0"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.86"
dora-core = { workspace = true }
dora-runtime = { workspace = true }
flume = "0.10.14"
dora-download = { workspace = true }
dora-tracing = { workspace = true, optional = true }
serde_yaml = "0.8.23"
uuid = { version = "1.1.2", features = ["v4"] }
futures = "0.3.25"
clap = { version = "4.0.3", features = ["derive"] }
shared-memory-server = { workspace = true }
ctrlc = "3.2.5"
bincode = "1.3.3"
async-trait = "0.1.64"
arrow-schema = { workspace = true }
aligned-vec = "0.5.0"
which = "5.0.0"
ctrlc = "3.2.5"

+ 6
- 0
binaries/daemon/src/coordinator.rs View File

@@ -113,6 +113,12 @@ pub async fn register(
tracing::warn!("failed to send reply to coordinator: {err}");
continue;
};
if let DaemonCoordinatorReply::DestroyResult { notify, .. } = reply {
if let Some(notify) = notify {
let _ = notify.send(());
}
break;
}
}
}
});


+ 44
- 12
binaries/daemon/src/lib.rs View File

@@ -76,13 +76,11 @@ pub struct Daemon {
}

impl Daemon {
pub async fn run(
coordinator_addr: SocketAddr,
machine_id: String,
external_events: impl Stream<Item = Timestamped<Event>> + Unpin,
) -> eyre::Result<()> {
pub async fn run(coordinator_addr: SocketAddr, machine_id: String) -> eyre::Result<()> {
let clock = Arc::new(HLC::default());

let ctrlc_events = set_up_ctrlc_handler(clock.clone())?;

// spawn listen loop
let (events_tx, events_rx) = flume::bounded(10);
let listen_socket =
@@ -108,7 +106,7 @@ impl Daemon {
);

Self::run_general(
(coordinator_events, external_events, daemon_events).merge(),
(coordinator_events, ctrlc_events, daemon_events).merge(),
Some(coordinator_addr),
machine_id,
None,
@@ -159,7 +157,7 @@ impl Daemon {
let run_result = Self::run_general(
Box::pin(coordinator_events),
None,
"".into(),
"".to_string(),
Some(exit_when_done),
clock,
);
@@ -429,10 +427,18 @@ impl Daemon {
}
DaemonCoordinatorEvent::Destroy => {
tracing::info!("received destroy command -> exiting");
let reply = DaemonCoordinatorReply::DestroyResult(Ok(()));
let (notify_tx, notify_rx) = oneshot::channel();
let reply = DaemonCoordinatorReply::DestroyResult {
result: Ok(()),
notify: Some(notify_tx),
};
let _ = reply_tx
.send(Some(reply))
.map_err(|_| error!("could not send destroy reply from daemon to coordinator"));
// wait until the reply is sent out
if notify_rx.await.is_err() {
tracing::warn!("no confirmation received for DestroyReply");
}
RunStatus::Exit
}
DaemonCoordinatorEvent::Heartbeat => {
@@ -1062,10 +1068,6 @@ impl Daemon {
}
}

pub fn run_dora_runtime() -> eyre::Result<()> {
dora_runtime::main()
}

async fn send_output_to_local_receivers(
node_id: NodeId,
output_id: DataId,
@@ -1519,3 +1521,33 @@ fn send_with_timestamp<T>(
timestamp: clock.new_timestamp(),
})
}

fn set_up_ctrlc_handler(
clock: Arc<HLC>,
) -> Result<impl Stream<Item = Timestamped<Event>>, eyre::ErrReport> {
let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);

let mut ctrlc_sent = false;
ctrlc::set_handler(move || {
if ctrlc_sent {
tracing::warn!("received second ctrlc signal -> aborting immediately");
std::process::abort();
} else {
tracing::info!("received ctrlc signal");
if ctrlc_tx
.blocking_send(Timestamped {
inner: Event::CtrlC,
timestamp: clock.new_timestamp(),
})
.is_err()
{
tracing::error!("failed to report ctrl-c event to dora-coordinator");
}

ctrlc_sent = true;
}
})
.wrap_err("failed to set ctrl-c handler")?;

Ok(ReceiverStream::new(ctrlc_rx))
}

+ 0
- 98
binaries/daemon/src/main.rs View File

@@ -1,98 +0,0 @@
use dora_core::{
daemon_messages::Timestamped, message::uhlc::HLC, topics::DORA_COORDINATOR_PORT_DEFAULT,
};
use dora_daemon::{Daemon, Event};
#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
#[cfg(feature = "tracing")]
use eyre::Context;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

use std::{
net::{Ipv4Addr, SocketAddr},
path::PathBuf,
};

#[derive(Debug, Clone, clap::Parser)]
#[clap(about = "Dora daemon")]
pub struct Args {
#[clap(long)]
pub machine_id: Option<String>,

#[clap(long)]
pub coordinator_addr: Option<SocketAddr>,

#[clap(long)]
pub run_dataflow: Option<PathBuf>,

#[clap(long)]
pub run_dora_runtime: bool,
}

#[tokio::main]
async fn main() -> eyre::Result<()> {
// the tokio::main proc macro confuses some tools such as rust-analyzer, so
// directly invoke a "normal" async function
run().await
}

async fn run() -> eyre::Result<()> {
let Args {
run_dataflow,
machine_id,
coordinator_addr,
run_dora_runtime,
} = clap::Parser::parse();

if run_dora_runtime {
return tokio::task::block_in_place(dora_daemon::run_dora_runtime);
}

#[cfg(feature = "tracing")]
set_up_tracing("dora-daemon").wrap_err("failed to set up tracing subscriber")?;

let ctrl_c_events = {
let (ctrl_c_tx, ctrl_c_rx) = mpsc::channel(1);
let mut ctrlc_sent = false;
ctrlc::set_handler(move || {
let clock = HLC::default();
if ctrlc_sent {
tracing::warn!("received second ctrlc signal -> aborting immediately");
std::process::abort();
} else {
tracing::info!("received ctrlc signal");
let event = Timestamped {
inner: Event::CtrlC,
timestamp: clock.new_timestamp(),
};
if ctrl_c_tx.blocking_send(event).is_err() {
tracing::error!("failed to report ctrl-c event to dora-daemon");
}
ctrlc_sent = true;
}
})
.wrap_err("failed to set ctrl-c handler")?;
ReceiverStream::new(ctrl_c_rx)
};

match run_dataflow {
Some(dataflow_path) => {
tracing::info!("Starting dataflow `{}`", dataflow_path.display());

Daemon::run_dataflow(&dataflow_path).await
}
None => {
Daemon::run(
coordinator_addr.unwrap_or_else(|| {
tracing::info!("Starting in local mode");
let localhost = Ipv4Addr::new(127, 0, 0, 1);
(localhost, DORA_COORDINATOR_PORT_DEFAULT).into()
}),
machine_id.unwrap_or_default(),
ctrl_c_events,
)
.await
}
}
}

+ 1
- 1
binaries/daemon/src/spawn.rs View File

@@ -162,7 +162,7 @@ pub async fn spawn_node(
let mut cmd = tokio::process::Command::new(
std::env::current_exe().wrap_err("failed to get current executable path")?,
);
cmd.arg("--run-dora-runtime");
cmd.arg("runtime");
cmd
} else {
eyre::bail!("Runtime can not mix Python Operator with other type of operator.");


+ 0
- 3
binaries/runtime/Cargo.toml View File

@@ -27,13 +27,10 @@ tokio-stream = "0.1.8"
# pyo3-abi3 flag allow simpler linking. See: https://pyo3.rs/v0.13.2/building_and_distribution.html
pyo3 = { workspace = true, features = ["eyre", "abi3-py37"], optional = true }
tracing = "0.1.36"
tracing-subscriber = "0.3.15"
dora-download = { workspace = true }
flume = "0.10.14"
clap = { version = "4.0.3", features = ["derive"] }
tracing-opentelemetry = { version = "0.18.0", optional = true }
pythonize = { workspace = true, optional = true }
arrow-schema = { workspace = true }
arrow = { workspace = true, features = ["ffi"] }
aligned-vec = "0.5.0"



+ 16
- 13
examples/benchmark/run.rs View File

@@ -2,20 +2,8 @@ use dora_tracing::set_up_tracing;
use eyre::{bail, Context};
use std::path::Path;

#[derive(Debug, Clone, clap::Parser)]
pub struct Args {
#[clap(long)]
pub run_dora_runtime: bool,
}

#[tokio::main]
async fn main() -> eyre::Result<()> {
let Args { run_dora_runtime } = clap::Parser::parse();

if run_dora_runtime {
return tokio::task::block_in_place(dora_daemon::run_dora_runtime);
}

set_up_tracing("benchmark-runner").wrap_err("failed to set up tracing subscriber")?;

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
@@ -25,7 +13,7 @@ async fn main() -> eyre::Result<()> {
let dataflow = Path::new("dataflow.yml");
build_dataflow(dataflow).await?;

dora_daemon::Daemon::run_dataflow(dataflow).await?;
run_dataflow(dataflow).await?;

Ok(())
}
@@ -41,3 +29,18 @@ async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
};
Ok(())
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
.arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};
Ok(())
}

+ 16
- 12
examples/c++-dataflow/run.rs View File

@@ -5,19 +5,8 @@ use std::{
path::Path,
};

#[derive(Debug, Clone, clap::Parser)]
pub struct Args {
#[clap(long)]
pub run_dora_runtime: bool,
}

#[tokio::main]
async fn main() -> eyre::Result<()> {
let Args { run_dora_runtime } = clap::Parser::parse();

if run_dora_runtime {
return tokio::task::block_in_place(dora_daemon::run_dora_runtime);
}
set_up_tracing("c++-dataflow-runner").wrap_err("failed to set up tracing")?;

if cfg!(windows) {
@@ -123,7 +112,7 @@ async fn main() -> eyre::Result<()> {

let dataflow = Path::new("dataflow.yml").to_owned();
build_package("dora-runtime").await?;
dora_daemon::Daemon::run_dataflow(&dataflow).await?;
run_dataflow(&dataflow).await?;

Ok(())
}
@@ -139,6 +128,21 @@ async fn build_package(package: &str) -> eyre::Result<()> {
Ok(())
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
.arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};
Ok(())
}

async fn build_cxx_node(
root: &Path,
paths: &[&Path],


+ 16
- 13
examples/c-dataflow/run.rs View File

@@ -5,20 +5,8 @@ use std::{
path::Path,
};

#[derive(Debug, Clone, clap::Parser)]
pub struct Args {
#[clap(long)]
pub run_dora_runtime: bool,
}

#[tokio::main]
async fn main() -> eyre::Result<()> {
let Args { run_dora_runtime } = clap::Parser::parse();

if run_dora_runtime {
return tokio::task::block_in_place(dora_daemon::run_dora_runtime);
}

set_up_tracing("c-dataflow-runner").wrap_err("failed to set up tracing")?;

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
@@ -35,7 +23,7 @@ async fn main() -> eyre::Result<()> {
build_c_operator(root).await?;

let dataflow = Path::new("dataflow.yml").to_owned();
dora_daemon::Daemon::run_dataflow(&dataflow).await?;
run_dataflow(&dataflow).await?;

Ok(())
}
@@ -51,6 +39,21 @@ async fn build_package(package: &str) -> eyre::Result<()> {
Ok(())
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
.arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};
Ok(())
}

async fn build_c_node(root: &Path, name: &str, out_name: &str) -> eyre::Result<()> {
let mut clang = tokio::process::Command::new("clang");
clang.arg(name);


+ 16
- 12
examples/cmake-dataflow/run.rs View File

@@ -2,19 +2,8 @@ use dora_tracing::set_up_tracing;
use eyre::{bail, Context};
use std::path::Path;

#[derive(Debug, Clone, clap::Parser)]
pub struct Args {
#[clap(long)]
pub run_dora_runtime: bool,
}

#[tokio::main]
async fn main() -> eyre::Result<()> {
let Args { run_dora_runtime } = clap::Parser::parse();

if run_dora_runtime {
return tokio::task::block_in_place(dora_daemon::run_dora_runtime);
}
set_up_tracing("cmake-dataflow-runner").wrap_err("failed to set up tracing")?;

if cfg!(windows) {
@@ -51,7 +40,7 @@ async fn main() -> eyre::Result<()> {

let dataflow = Path::new("dataflow.yml").to_owned();
build_package("dora-runtime").await?;
dora_daemon::Daemon::run_dataflow(&dataflow).await?;
run_dataflow(&dataflow).await?;

Ok(())
}
@@ -66,3 +55,18 @@ async fn build_package(package: &str) -> eyre::Result<()> {
}
Ok(())
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
.arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};
Ok(())
}

+ 22
- 20
examples/multiple-daemons/run.rs View File

@@ -5,7 +5,7 @@ use dora_core::{
};
use dora_tracing::set_up_tracing;
use eyre::{bail, Context};
use futures::stream;
use std::{
collections::BTreeSet,
net::{Ipv4Addr, SocketAddr},
@@ -22,19 +22,8 @@ use tokio::{
use tokio_stream::wrappers::ReceiverStream;
use uuid::Uuid;

#[derive(Debug, Clone, clap::Parser)]
pub struct Args {
#[clap(long)]
pub run_dora_runtime: bool,
}

#[tokio::main]
async fn main() -> eyre::Result<()> {
let Args { run_dora_runtime } = clap::Parser::parse();
if run_dora_runtime {
return tokio::task::block_in_place(dora_daemon::run_dora_runtime);
}

set_up_tracing("multiple-daemon-runner").wrap_err("failed to set up tracing subscriber")?;

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
@@ -45,14 +34,11 @@ async fn main() -> eyre::Result<()> {
build_dataflow(dataflow).await?;

let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1);
let (coordinator_port, coordinator) = dora_coordinator::start(
dora_coordinator::Args { port: Some(0) },
ReceiverStream::new(coordinator_events_rx),
)
.await?;
let (coordinator_port, coordinator) =
dora_coordinator::start(None, ReceiverStream::new(coordinator_events_rx)).await?;
let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port);
let daemon_a = dora_daemon::Daemon::run(coordinator_addr, "A".into(), stream::empty());
let daemon_b = dora_daemon::Daemon::run(coordinator_addr, "B".into(), stream::empty());
let daemon_a = run_daemon(coordinator_addr.to_string(), "A".into());
let daemon_b = run_daemon(coordinator_addr.to_string(), "B".into());

tracing::info!("Spawning coordinator and daemons");
let mut tasks = JoinSet::new();
@@ -60,7 +46,6 @@ async fn main() -> eyre::Result<()> {
tasks.spawn(daemon_a);
tasks.spawn(daemon_b);

// wait until both daemons are connected
tracing::info!("waiting until daemons are connected to coordinator");
let mut retries = 0;
loop {
@@ -210,3 +195,20 @@ async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
};
Ok(())
}

async fn run_daemon(coordinator: String, machine_id: &str) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--")
.arg("daemon")
.arg("--machine-id")
.arg(machine_id)
.arg("--coordinator-addr")
.arg(coordinator);
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};
Ok(())
}

+ 17
- 2
examples/python-dataflow/run.rs View File

@@ -1,7 +1,7 @@
use dora_core::{get_pip_path, get_python_path, run};
use dora_download::download_file;
use dora_tracing::set_up_tracing;
use eyre::{ContextCompat, WrapErr};
use eyre::{bail, ContextCompat, WrapErr};
use std::path::Path;

#[tokio::main]
@@ -81,7 +81,22 @@ async fn main() -> eyre::Result<()> {
.context("Could not download weights.")?;

let dataflow = Path::new("dataflow.yml");
dora_daemon::Daemon::run_dataflow(dataflow).await?;
run_dataflow(dataflow).await?;

Ok(())
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
.arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};
Ok(())
}

+ 17
- 2
examples/python-operator-dataflow/run.rs View File

@@ -1,7 +1,7 @@
use dora_core::{get_pip_path, get_python_path, run};
use dora_download::download_file;
use dora_tracing::set_up_tracing;
use eyre::{ContextCompat, WrapErr};
use eyre::{bail, ContextCompat, WrapErr};
use std::path::Path;

#[tokio::main]
@@ -81,7 +81,22 @@ async fn main() -> eyre::Result<()> {
.await
.context("Could not download weights.")?;
let dataflow = Path::new("dataflow.yml");
dora_daemon::Daemon::run_dataflow(dataflow).await?;
run_dataflow(dataflow).await?;

Ok(())
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
.arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};
Ok(())
}

+ 17
- 2
examples/python-ros2-dataflow/run.rs View File

@@ -1,6 +1,6 @@
use dora_core::{get_pip_path, get_python_path, run};
use dora_tracing::set_up_tracing;
use eyre::{ContextCompat, WrapErr};
use eyre::{bail, ContextCompat, WrapErr};
use std::path::Path;

#[tokio::main]
@@ -74,7 +74,22 @@ async fn main() -> eyre::Result<()> {
.context("maturin develop failed")?;

let dataflow = Path::new("dataflow.yml");
dora_daemon::Daemon::run_dataflow(dataflow).await?;
run_dataflow(dataflow).await?;

Ok(())
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
.arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};
Ok(())
}

+ 16
- 12
examples/rust-dataflow-url/run.rs View File

@@ -2,19 +2,8 @@ use dora_tracing::set_up_tracing;
use eyre::{bail, Context};
use std::path::Path;

#[derive(Debug, Clone, clap::Parser)]
pub struct Args {
#[clap(long)]
pub run_dora_runtime: bool,
}

#[tokio::main]
async fn main() -> eyre::Result<()> {
let Args { run_dora_runtime } = clap::Parser::parse();
if run_dora_runtime {
return tokio::task::block_in_place(dora_daemon::run_dora_runtime);
}

set_up_tracing("rust-dataflow-url-runner").wrap_err("failed to set up tracing")?;

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
@@ -24,7 +13,7 @@ async fn main() -> eyre::Result<()> {
let dataflow = Path::new("dataflow.yml");
build_dataflow(dataflow).await?;

dora_daemon::Daemon::run_dataflow(dataflow).await?;
run_dataflow(dataflow).await?;

Ok(())
}
@@ -40,3 +29,18 @@ async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
};
Ok(())
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
.arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};
Ok(())
}

+ 16
- 13
examples/rust-dataflow/run.rs View File

@@ -2,20 +2,8 @@ use dora_tracing::set_up_tracing;
use eyre::{bail, Context};
use std::path::Path;

#[derive(Debug, Clone, clap::Parser)]
pub struct Args {
#[clap(long)]
pub run_dora_runtime: bool,
}

#[tokio::main]
async fn main() -> eyre::Result<()> {
let Args { run_dora_runtime } = clap::Parser::parse();

if run_dora_runtime {
return tokio::task::block_in_place(dora_daemon::run_dora_runtime);
}

set_up_tracing("rust-dataflow-runner").wrap_err("failed to set up tracing subscriber")?;

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
@@ -25,7 +13,7 @@ async fn main() -> eyre::Result<()> {
let dataflow = Path::new("dataflow.yml");
build_dataflow(dataflow).await?;

dora_daemon::Daemon::run_dataflow(dataflow).await?;
run_dataflow(dataflow).await?;

Ok(())
}
@@ -41,3 +29,18 @@ async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
};
Ok(())
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
.arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};
Ok(())
}

+ 16
- 13
examples/rust-ros2-dataflow/run.rs View File

@@ -2,20 +2,8 @@ use dora_tracing::set_up_tracing;
use eyre::{bail, Context};
use std::path::Path;

#[derive(Debug, Clone, clap::Parser)]
pub struct Args {
#[clap(long)]
pub run_dora_runtime: bool,
}

#[tokio::main]
async fn main() -> eyre::Result<()> {
let Args { run_dora_runtime } = clap::Parser::parse();

if run_dora_runtime {
return tokio::task::block_in_place(dora_daemon::run_dora_runtime);
}

set_up_tracing("rust-ros2-dataflow-runner").wrap_err("failed to set up tracing subscriber")?;

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
@@ -25,7 +13,7 @@ async fn main() -> eyre::Result<()> {
let dataflow = Path::new("dataflow.yml");
build_dataflow(dataflow).await?;

dora_daemon::Daemon::run_dataflow(dataflow).await?;
run_dataflow(dataflow).await?;

Ok(())
}
@@ -41,3 +29,18 @@ async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
};
Ok(())
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
.arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};
Ok(())
}

+ 1
- 2
libraries/core/Cargo.toml View File

@@ -12,12 +12,11 @@ license.workspace = true
eyre = "0.6.8"
serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.9.11"
serde_bytes = "0.11.12"
once_cell = "1.13.0"
which = "5.0.0"
uuid = { version = "1.2.1", features = ["serde"] }
dora-message = { workspace = true }
tracing = "0.1"
serde-with-expand-env = "1.1.0"
tokio = { version = "1.24.1", features = ["fs", "process"] }
tokio = { version = "1.24.1", features = ["fs", "process", "sync"] }
aligned-vec = { version = "0.5.0", features = ["serde"] }

+ 5
- 1
libraries/core/src/daemon_messages.rs View File

@@ -247,7 +247,11 @@ pub enum DaemonCoordinatorReply {
SpawnResult(Result<(), String>),
ReloadResult(Result<(), String>),
StopResult(Result<(), String>),
DestroyResult(Result<(), String>),
DestroyResult {
result: Result<(), String>,
#[serde(skip)]
notify: Option<tokio::sync::oneshot::Sender<()>>,
},
Logs(Result<Vec<u8>, String>),
}



+ 0
- 1
libraries/extensions/ros2-bridge/python/Cargo.toml View File

@@ -10,6 +10,5 @@ dora-ros2-bridge-msg-gen = { path = "../msg-gen" }
pyo3 = { workspace = true, features = ["eyre", "abi3-py37", "serde"] }
eyre = "0.6"
serde = "1.0.166"
flume = "0.10.14"
arrow = { workspace = true, features = ["pyarrow"] }
futures = "0.3.28"

+ 0
- 2
libraries/extensions/telemetry/metrics/Cargo.toml View File

@@ -9,8 +9,6 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
futures = "0.3.21"
opentelemetry = { version = "0.21", features = ["metrics"] }
opentelemetry-otlp = { version = "0.14.0", features = ["tonic", "metrics"] }
tokio = { version = "1.24.2", features = ["full"] }
opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio", "metrics"] }

+ 0
- 1
libraries/extensions/telemetry/tracing/Cargo.toml View File

@@ -11,7 +11,6 @@ license.workspace = true
[features]

[dependencies]
tokio = { version = "1.24.2", features = ["full"] }
tracing-subscriber = { version = "0.3.15", features = ["env-filter"] }
tracing-opentelemetry = { version = "0.18.0" }
eyre = "0.6.8"


Loading…
Cancel
Save