Browse Source

Merge branch 'main' into multiple-daemons

tags/v0.2.3-rc
Philipp Oppermann 2 years ago
parent
commit
8bf124a014
Failed to extract signature
12 changed files with 213 additions and 104 deletions
  1. +42
    -0
      .github/labeler.yml
  2. +1
    -1
      .github/workflows/ci.yml
  3. +15
    -0
      .github/workflows/labeler.yml
  4. +1
    -1
      .github/workflows/pip-release.yml
  5. +39
    -6
      README.md
  6. +3
    -5
      binaries/cli/src/attach.rs
  7. +31
    -33
      binaries/cli/src/check.rs
  8. +44
    -31
      binaries/cli/src/main.rs
  9. +30
    -24
      binaries/cli/src/up.rs
  10. +4
    -1
      binaries/daemon/src/spawn.rs
  11. +1
    -1
      binaries/runtime/src/operator/python.rs
  12. +2
    -1
      libraries/extensions/telemetry/tracing/src/lib.rs

+ 42
- 0
.github/labeler.yml View File

@@ -0,0 +1,42 @@
# Add/remove 'critical' label if issue contains the words 'urgent' or 'critical'
critical:
- "(critical|urgent)"

cli:
- "/(cli|command)/i"

daemon:
- "daemon"

coordinator:
- "coordinator"

runtime:
- "runtime"

python:
- "python"

c:
- "/\bc\b/i"

c++:
- "cxx"

rust:
- "rust"

windows:
- "windows"

macos:
- "macos"

linux:
- "(linux|ubuntu)"

bug:
- "bug"

documentation:
- "(doc|documentation)"

+ 1
- 1
.github/workflows/ci.yml View File

@@ -71,7 +71,7 @@ jobs:
name: "CLI Test"
strategy:
matrix:
platform: [ubuntu-latest, macos-latest]
platform: [ubuntu-latest, macos-latest, windows-latest]
fail-fast: false
runs-on: ${{ matrix.platform }}
steps:


+ 15
- 0
.github/workflows/labeler.yml View File

@@ -0,0 +1,15 @@
name: "Issue Labeler"
on:
issues:
types: [opened, edited]

jobs:
triage:
runs-on: ubuntu-latest
steps:
- uses: github/issue-labeler@v3.1 #May not be the latest version
with:
repo-token: "${{ github.token }}"
configuration-path: .github/labeler.yml
enable-versioned-regex: 0
include-title: 1

+ 1
- 1
.github/workflows/pip-release.yml View File

@@ -14,7 +14,7 @@ jobs:

strategy:
matrix:
platform: [ubuntu-latest, ubuntu-20.04]
platform: [ubuntu-latest, ubuntu-20.04, macos-latest, windows-latest]
python-version: ["3.7"]
fail-fast: false
runs-on: ${{ matrix.platform }}


+ 39
- 6
README.md View File

@@ -115,22 +115,55 @@ nodes:

Composability as:
- [x] `YAML` declarative programming
- [x] polyglot:
- [x] Rust
- [x] C
- [x] C++
- [x] Python
- [x] Isolated operators and custom nodes that can be reused.
- [x] Hot Reloading for Python Operators

Low latency as:
- [x] written in <i>...Cough...blazingly fast ...Cough...</i> Rust.
- [x] PubSub communication with shared memory!
- [ ] Zero-copy on read!
- [x] Zero-copy!

Distributed as:
- [ ] PubSub communication between machines with [`zenoh`](https://github.com/eclipse-zenoh/zenoh)
- [x] Distributed telemetry with [`opentelemetry`](https://github.com/open-telemetry/opentelemetry-rust)

## Support matrix

### Programming Language API:

- [x] Rust
- [x] Python
- [x] C
- [x] C++
- [ ] WebAssembly (Wished for)

### OS:
- [x] Linux Ubuntu (tested)
- [x] MacOS (tested)
- [x] Windows (tested)

> Although, MacOS and Windows have a low priority for us now.

### Platform:
- [x] x86 (tested)
- [ ] aarch64

> Other platforms should also work although we haven't tested them yet.

### Data Format
- [x] Bytes
- [x] Arrow Array (Uint8) for Python
- [ ] Arrow Array (Uint16, Int32, ...) (Planned feature)
- [ ] Arrow Map (Wished feature)

### Local Communication
- [x] TCP
- [x] Shared Memory

### Remote Communication
- [x] TCP
- [ ] Zenoh

---




+ 3
- 5
binaries/cli/src/attach.rs View File

@@ -11,13 +11,11 @@ use std::{path::PathBuf, sync::mpsc, time::Duration};
use tracing::{error, info};
use uuid::Uuid;

use crate::control_connection;

pub fn attach_dataflow(
dataflow: Descriptor,
dataflow_path: PathBuf,
dataflow_id: Uuid,
session: &mut Option<Box<TcpRequestReplyConnection>>,
session: &mut TcpRequestReplyConnection,
hot_reload: bool,
) -> Result<(), eyre::ErrReport> {
let (tx, rx) = mpsc::sync_channel(2);
@@ -70,7 +68,7 @@ pub fn attach_dataflow(
if let Some((dataflow_id, node_id, operator_id)) = node_path_lookup.get(&path) {
watcher_tx
.send(ControlRequest::Reload {
dataflow_id: dataflow_id.clone(),
dataflow_id: *dataflow_id,
node_id: node_id.clone(),
operator_id: operator_id.clone(),
})
@@ -123,7 +121,7 @@ pub fn attach_dataflow(
Ok(reload_event) => reload_event,
};

let reply_raw = control_connection(session)?
let reply_raw = session
.request(&serde_json::to_vec(&control_request)?)
.wrap_err("failed to send request message to coordinator")?;
let result: ControlRequestReply =


+ 31
- 33
binaries/cli/src/check.rs View File

@@ -1,4 +1,5 @@
use crate::control_connection;
use crate::connect_to_coordinator;
use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::topics::{ControlRequest, ControlRequestReply};
use eyre::{bail, Context};
use std::io::Write;
@@ -16,19 +17,30 @@ pub fn check_environment() -> eyre::Result<()> {

// check whether coordinator is running
write!(stdout, "Dora Coordinator: ")?;
if coordinator_running()? {
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green)));
writeln!(stdout, "ok")?;
} else {
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Red)));
writeln!(stdout, "not running")?;
error_occured = true;
}
let mut session = match connect_to_coordinator() {
Ok(session) => {
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green)));
writeln!(stdout, "ok")?;
Some(session)
}
Err(_) => {
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Red)));
writeln!(stdout, "not running")?;
error_occured = true;
None
}
};

let _ = stdout.reset();

// check whether daemon is running
write!(stdout, "Dora Daemon: ")?;
if daemon_running()? {
if session
.as_deref_mut()
.map(daemon_running)
.transpose()?
.unwrap_or(false)
{
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green)));
writeln!(stdout, "ok")?;
} else {
@@ -47,30 +59,16 @@ pub fn check_environment() -> eyre::Result<()> {
Ok(())
}

pub fn coordinator_running() -> Result<bool, eyre::ErrReport> {
let mut control_session = None;
let connected = control_connection(&mut control_session).is_ok();
Ok(connected)
}

pub fn daemon_running() -> Result<bool, eyre::ErrReport> {
let mut control_session = None;
let running = match control_connection(&mut control_session) {
Ok(connection) => {
let reply_raw = connection
.request(&serde_json::to_vec(&ControlRequest::DaemonConnected).unwrap())
.wrap_err("failed to send DaemonConnected message")?;
pub fn daemon_running(session: &mut TcpRequestReplyConnection) -> Result<bool, eyre::ErrReport> {
let reply_raw = session
.request(&serde_json::to_vec(&ControlRequest::DaemonConnected).unwrap())
.wrap_err("failed to send DaemonConnected message")?;

let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match reply {
ControlRequestReply::DaemonConnected(running) => running,
other => bail!("unexpected reply to daemon connection check: {other:?}"),
}
}
Err(_) => {
// coordinator is not running
false
}
let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
let running = match reply {
ControlRequestReply::DaemonConnected(running) => running,
other => bail!("unexpected reply to daemon connection check: {other:?}"),
};

Ok(running)
}

+ 44
- 31
binaries/cli/src/main.rs View File

@@ -117,13 +117,18 @@ enum Lang {
Cxx,
}

fn main() -> eyre::Result<()> {
fn main() {
if let Err(err) = run() {
eprintln!("{err:#}");
std::process::exit(1);
}
}

fn run() -> eyre::Result<()> {
#[cfg(feature = "tracing")]
set_up_tracing("dora-cli").context("failed to set up tracing subscriber")?;
let args = Args::parse();

let mut session = None;

match args.command {
Command::Check { dataflow } => match dataflow {
Some(dataflow) => {
@@ -178,26 +183,41 @@ fn main() -> eyre::Result<()> {
dataflow_descriptor
.check(&working_dir)
.wrap_err("Could not validate yaml")?;
let dataflow_id =
start_dataflow(dataflow_descriptor.clone(), name, working_dir, &mut session)?;
let mut session =
connect_to_coordinator().wrap_err("failed to connect to dora coordinator")?;
let dataflow_id = start_dataflow(
dataflow_descriptor.clone(),
name,
working_dir,
&mut *session,
)?;

if attach {
attach_dataflow(
dataflow_descriptor,
dataflow,
dataflow_id,
&mut session,
&mut *session,
hot_reload,
)?
}
}
Command::List => list(&mut session)?,
Command::Stop { uuid, name } => match (uuid, name) {
(Some(uuid), _) => stop_dataflow(uuid, &mut session)?,
(None, Some(name)) => stop_dataflow_by_name(name, &mut session)?,
(None, None) => stop_dataflow_interactive(&mut session)?,
Command::List => match connect_to_coordinator() {
Ok(mut session) => list(&mut *session)?,
Err(_) => {
bail!("No dora coordinator seems to be running.");
}
},
Command::Destroy { config } => up::destroy(config.as_deref(), &mut session)?,
Command::Stop { uuid, name } => {
let mut session =
connect_to_coordinator().wrap_err("could not connect to dora coordinator")?;
match (uuid, name) {
(Some(uuid), _) => stop_dataflow(uuid, &mut *session)?,
(None, Some(name)) => stop_dataflow_by_name(name, &mut *session)?,
(None, None) => stop_dataflow_interactive(&mut *session)?,
}
}
Command::Destroy { config } => up::destroy(config.as_deref())?,
}

Ok(())
@@ -207,9 +227,9 @@ fn start_dataflow(
dataflow: Descriptor,
name: Option<String>,
local_working_dir: PathBuf,
session: &mut Option<Box<TcpRequestReplyConnection>>,
session: &mut TcpRequestReplyConnection,
) -> Result<Uuid, eyre::ErrReport> {
let reply_raw = control_connection(session)?
let reply_raw = session
.request(
&serde_json::to_vec(&ControlRequest::Start {
dataflow,
@@ -232,9 +252,7 @@ fn start_dataflow(
}
}

fn stop_dataflow_interactive(
session: &mut Option<Box<TcpRequestReplyConnection>>,
) -> eyre::Result<()> {
fn stop_dataflow_interactive(session: &mut TcpRequestReplyConnection) -> eyre::Result<()> {
let uuids = query_running_dataflows(session).wrap_err("failed to query running dataflows")?;
if uuids.is_empty() {
eprintln!("No dataflows are running");
@@ -248,9 +266,9 @@ fn stop_dataflow_interactive(

fn stop_dataflow(
uuid: Uuid,
session: &mut Option<Box<TcpRequestReplyConnection>>,
session: &mut TcpRequestReplyConnection,
) -> Result<(), eyre::ErrReport> {
let reply_raw = control_connection(session)?
let reply_raw = session
.request(
&serde_json::to_vec(&ControlRequest::Stop {
dataflow_uuid: uuid,
@@ -269,9 +287,9 @@ fn stop_dataflow(

fn stop_dataflow_by_name(
name: String,
session: &mut Option<Box<TcpRequestReplyConnection>>,
session: &mut TcpRequestReplyConnection,
) -> Result<(), eyre::ErrReport> {
let reply_raw = control_connection(session)?
let reply_raw = session
.request(&serde_json::to_vec(&ControlRequest::StopByName { name }).unwrap())
.wrap_err("failed to send dataflow stop_by_name message")?;
let result: ControlRequestReply =
@@ -283,7 +301,7 @@ fn stop_dataflow_by_name(
}
}

fn list(session: &mut Option<Box<TcpRequestReplyConnection>>) -> Result<(), eyre::ErrReport> {
fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> {
let ids = query_running_dataflows(session)?;

if ids.is_empty() {
@@ -299,9 +317,9 @@ fn list(session: &mut Option<Box<TcpRequestReplyConnection>>) -> Result<(), eyre
}

fn query_running_dataflows(
session: &mut Option<Box<TcpRequestReplyConnection>>,
session: &mut TcpRequestReplyConnection,
) -> Result<Vec<DataflowId>, eyre::ErrReport> {
let reply_raw = control_connection(session)?
let reply_raw = session
.request(&serde_json::to_vec(&ControlRequest::List).unwrap())
.wrap_err("failed to send list message")?;
let reply: ControlRequestReply =
@@ -315,11 +333,6 @@ fn query_running_dataflows(
Ok(ids)
}

fn control_connection(
session: &mut Option<Box<TcpRequestReplyConnection>>,
) -> eyre::Result<&mut Box<TcpRequestReplyConnection>> {
Ok(match session {
Some(session) => session,
None => session.insert(TcpLayer::new().connect(control_socket_addr())?),
})
fn connect_to_coordinator() -> std::io::Result<Box<TcpRequestReplyConnection>> {
TcpLayer::new().connect(control_socket_addr())
}

+ 30
- 24
binaries/cli/src/up.rs View File

@@ -1,8 +1,4 @@
use crate::{
check::{coordinator_running, daemon_running},
control_connection,
};
use communication_layer_request_reply::TcpRequestReplyConnection;
use crate::{check::daemon_running, connect_to_coordinator};
use dora_core::topics::ControlRequest;
use eyre::Context;
use std::{fs, path::Path, process::Command, time::Duration};
@@ -17,34 +13,44 @@ pub(crate) fn up(
) -> eyre::Result<()> {
let UpConfig {} = parse_dora_config(config_path)?;

if !coordinator_running()? {
start_coordinator(coordinator).wrap_err("failed to start dora-coordinator")?;
// sleep a bit until the coordinator accepts connections
while !coordinator_running()? {
std::thread::sleep(Duration::from_millis(50));
let mut session = match connect_to_coordinator() {
Ok(session) => session,
Err(_) => {
start_coordinator(coordinator).wrap_err("failed to start dora-coordinator")?;

loop {
match connect_to_coordinator() {
Ok(session) => break session,
Err(_) => {
// sleep a bit until the coordinator accepts connections
std::thread::sleep(Duration::from_millis(50));
}
}
}
}
}
if !daemon_running()? {
};

if !daemon_running(&mut *session)? {
start_daemon(daemon).wrap_err("failed to start dora-daemon")?;
}

Ok(())
}

pub(crate) fn destroy(
config_path: Option<&Path>,
session: &mut Option<Box<TcpRequestReplyConnection>>,
) -> Result<(), eyre::ErrReport> {
pub(crate) fn destroy(config_path: Option<&Path>) -> Result<(), eyre::ErrReport> {
let UpConfig {} = parse_dora_config(config_path)?;

if coordinator_running()? {
// send destroy command to dora-coordinator
control_connection(session)?
.request(&serde_json::to_vec(&ControlRequest::Destroy).unwrap())
.wrap_err("failed to send destroy message")?;
println!("Send destroy command to dora-coordinator");
} else {
eprintln!("The dora-coordinator is not running");
match connect_to_coordinator() {
Ok(mut session) => {
// send destroy command to dora-coordinator
session
.request(&serde_json::to_vec(&ControlRequest::Destroy).unwrap())
.wrap_err("failed to send destroy message")?;
println!("Send destroy command to dora-coordinator");
}
Err(_) => {
eprintln!("Could not connect to dora-coordinator");
}
}

Ok(())


+ 4
- 1
binaries/daemon/src/spawn.rs View File

@@ -110,7 +110,10 @@ pub async fn spawn_node(
let mut command = if has_python_operator && !has_other_operator {
// Use python to spawn runtime if there is a python operator
let mut command = tokio::process::Command::new("python3");
command.args(["-c", "import dora; dora.start_runtime()"]);
command.args([
"-c",
format!("import dora; dora.start_runtime() # {}", node.id).as_str(),
]);
command
} else if !has_python_operator && has_other_operator {
let mut cmd = tokio::process::Command::new(


+ 1
- 1
binaries/runtime/src/operator/python.rs View File

@@ -25,7 +25,7 @@ use tracing::{error, field, span, warn};
fn traceback(err: pyo3::PyErr) -> eyre::Report {
let traceback = Python::with_gil(|py| err.traceback(py).and_then(|t| t.format().ok()));
if let Some(traceback) = traceback {
eyre::eyre!("{err}{traceback}")
eyre::eyre!("{traceback}\n{err}")
} else {
eyre::eyre!("{err}")
}


+ 2
- 1
libraries/extensions/telemetry/tracing/src/lib.rs View File

@@ -4,6 +4,7 @@
//! able to serialize and deserialize context that has been sent via the middleware.

use eyre::Context as EyreContext;
use tracing::metadata::LevelFilter;
use tracing_subscriber::{prelude::__tracing_subscriber_SubscriberExt, EnvFilter, Layer};

use eyre::ContextCompat;
@@ -12,7 +13,7 @@ pub mod telemetry;

pub fn set_up_tracing(name: &str) -> eyre::Result<()> {
// Filter log using `RUST_LOG`. More useful for CLI.
let filter = EnvFilter::from_default_env();
let filter = EnvFilter::from_default_env().add_directive(LevelFilter::WARN.into());
let stdout_log = tracing_subscriber::fmt::layer()
.pretty()
.with_filter(filter);


Loading…
Cancel
Save