diff --git a/.github/labeler.yml b/.github/labeler.yml new file mode 100644 index 00000000..8cb24d34 --- /dev/null +++ b/.github/labeler.yml @@ -0,0 +1,42 @@ +# Add/remove 'critical' label if issue contains the words 'urgent' or 'critical' +critical: + - "(critical|urgent)" + +cli: + - "/(cli|command)/i" + +daemon: + - "daemon" + +coordinator: + - "coordinator" + +runtime: + - "runtime" + +python: + - "python" + +c: + - '/\bc\b/i' + +c++: + - "cxx" + +rust: + - "rust" + +windows: + - "windows" + +macos: + - "macos" + +linux: + - "(linux|ubuntu)" + +bug: + - "bug" + +documentation: + - "(doc|documentation)" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ccf81303..260aa5b4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -71,7 +71,7 @@ jobs: name: "CLI Test" strategy: matrix: - platform: [ubuntu-latest, macos-latest] + platform: [ubuntu-latest, macos-latest, windows-latest] fail-fast: false runs-on: ${{ matrix.platform }} steps: diff --git a/.github/workflows/labeler.yml b/.github/workflows/labeler.yml new file mode 100644 index 00000000..1fe3e190 --- /dev/null +++ b/.github/workflows/labeler.yml @@ -0,0 +1,15 @@ +name: "Issue Labeler" +on: + issues: + types: [opened, edited] + +jobs: + triage: + runs-on: ubuntu-latest + steps: + - uses: github/issue-labeler@v3.1 #May not be the latest version + with: + repo-token: "${{ github.token }}" + configuration-path: .github/labeler.yml + enable-versioned-regex: 0 + include-title: 1 diff --git a/.github/workflows/pip-release.yml b/.github/workflows/pip-release.yml index 35879de0..314b5de1 100644 --- a/.github/workflows/pip-release.yml +++ b/.github/workflows/pip-release.yml @@ -14,7 +14,7 @@ jobs: strategy: matrix: - platform: [ubuntu-latest, ubuntu-20.04] + platform: [ubuntu-latest, ubuntu-20.04, macos-latest, windows-latest] python-version: ["3.7"] fail-fast: false runs-on: ${{ matrix.platform }} diff --git a/README.md b/README.md index 52e00ce4..45aaf622 100644 --- a/README.md 
+++ b/README.md @@ -115,22 +115,55 @@ nodes: Composability as: - [x] `YAML` declarative programming -- [x] polyglot: - - [x] Rust - - [x] C - - [x] C++ - - [x] Python - [x] Isolated operators and custom nodes that can be reused. +- [x] Hot Reloading for Python Operators Low latency as: - [x] written in ...Cough...blazingly fast ...Cough... Rust. - [x] PubSub communication with shared memory! -- [ ] Zero-copy on read! +- [x] Zero-copy! Distributed as: - [ ] PubSub communication between machines with [`zenoh`](https://github.com/eclipse-zenoh/zenoh) - [x] Distributed telemetry with [`opentelemetry`](https://github.com/open-telemetry/opentelemetry-rust) +## Support matrix + +### Programming Language API: + +- [x] Rust +- [x] Python +- [x] C +- [x] C++ +- [ ] WebAssembly (Wished for) + +### OS: +- [x] Linux Ubuntu (tested) +- [x] MacOS (tested) +- [x] Windows (tested) + +> Although, MacOS and Windows have a low priority for us now. + +### Platform: +- [x] x86 (tested) +- [ ] aarch64 + +> Other platforms should also work although we haven't tested them yet. + +### Data Format +- [x] Bytes +- [x] Arrow Array (Uint8) for Python +- [ ] Arrow Array (Uint16, Int32, ...) 
(Planned feature) +- [ ] Arrow Map (Wished feature) + +### Local Communication +- [x] TCP +- [x] Shared Memory + +### Remote Communication +- [x] TCP +- [ ] Zenoh + --- diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/attach.rs index f3e36d8f..a1e1bbc0 100644 --- a/binaries/cli/src/attach.rs +++ b/binaries/cli/src/attach.rs @@ -11,13 +11,11 @@ use std::{path::PathBuf, sync::mpsc, time::Duration}; use tracing::{error, info}; use uuid::Uuid; -use crate::control_connection; - pub fn attach_dataflow( dataflow: Descriptor, dataflow_path: PathBuf, dataflow_id: Uuid, - session: &mut Option>, + session: &mut TcpRequestReplyConnection, hot_reload: bool, ) -> Result<(), eyre::ErrReport> { let (tx, rx) = mpsc::sync_channel(2); @@ -70,7 +68,7 @@ pub fn attach_dataflow( if let Some((dataflow_id, node_id, operator_id)) = node_path_lookup.get(&path) { watcher_tx .send(ControlRequest::Reload { - dataflow_id: dataflow_id.clone(), + dataflow_id: *dataflow_id, node_id: node_id.clone(), operator_id: operator_id.clone(), }) @@ -123,7 +121,7 @@ pub fn attach_dataflow( Ok(reload_event) => reload_event, }; - let reply_raw = control_connection(session)? + let reply_raw = session .request(&serde_json::to_vec(&control_request)?) .wrap_err("failed to send request message to coordinator")?; let result: ControlRequestReply = diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index a364630f..4b77eb13 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -1,4 +1,5 @@ -use crate::control_connection; +use crate::connect_to_coordinator; +use communication_layer_request_reply::TcpRequestReplyConnection; use dora_core::topics::{ControlRequest, ControlRequestReply}; use eyre::{bail, Context}; use std::io::Write; @@ -16,19 +17,30 @@ pub fn check_environment() -> eyre::Result<()> { // check whether coordinator is running write!(stdout, "Dora Coordinator: ")?; - if coordinator_running()? 
{ - let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green))); - writeln!(stdout, "ok")?; - } else { - let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Red))); - writeln!(stdout, "not running")?; - error_occured = true; - } + let mut session = match connect_to_coordinator() { + Ok(session) => { + let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green))); + writeln!(stdout, "ok")?; + Some(session) + } + Err(_) => { + let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Red))); + writeln!(stdout, "not running")?; + error_occured = true; + None + } + }; + let _ = stdout.reset(); // check whether daemon is running write!(stdout, "Dora Daemon: ")?; - if daemon_running()? { + if session + .as_deref_mut() + .map(daemon_running) + .transpose()? + .unwrap_or(false) + { let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green))); writeln!(stdout, "ok")?; } else { @@ -47,30 +59,16 @@ pub fn check_environment() -> eyre::Result<()> { Ok(()) } -pub fn coordinator_running() -> Result { - let mut control_session = None; - let connected = control_connection(&mut control_session).is_ok(); - Ok(connected) -} - -pub fn daemon_running() -> Result { - let mut control_session = None; - let running = match control_connection(&mut control_session) { - Ok(connection) => { - let reply_raw = connection - .request(&serde_json::to_vec(&ControlRequest::DaemonConnected).unwrap()) - .wrap_err("failed to send DaemonConnected message")?; +pub fn daemon_running(session: &mut TcpRequestReplyConnection) -> Result { + let reply_raw = session + .request(&serde_json::to_vec(&ControlRequest::DaemonConnected).unwrap()) + .wrap_err("failed to send DaemonConnected message")?; - let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; - match reply { - ControlRequestReply::DaemonConnected(running) => running, - other => bail!("unexpected reply to daemon connection check: {other:?}"), - } - } - Err(_) => { - // coordinator is not 
running - false - } + let reply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + let running = match reply { + ControlRequestReply::DaemonConnected(running) => running, + other => bail!("unexpected reply to daemon connection check: {other:?}"), }; + Ok(running) } diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 13552802..6c7daf04 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -117,13 +117,18 @@ enum Lang { Cxx, } -fn main() -> eyre::Result<()> { +fn main() { + if let Err(err) = run() { + eprintln!("{err:#}"); + std::process::exit(1); + } +} + +fn run() -> eyre::Result<()> { #[cfg(feature = "tracing")] set_up_tracing("dora-cli").context("failed to set up tracing subscriber")?; let args = Args::parse(); - let mut session = None; - match args.command { Command::Check { dataflow } => match dataflow { Some(dataflow) => { @@ -178,26 +183,41 @@ fn main() -> eyre::Result<()> { dataflow_descriptor .check(&working_dir) .wrap_err("Could not validate yaml")?; - let dataflow_id = - start_dataflow(dataflow_descriptor.clone(), name, working_dir, &mut session)?; + let mut session = + connect_to_coordinator().wrap_err("failed to connect to dora coordinator")?; + let dataflow_id = start_dataflow( + dataflow_descriptor.clone(), + name, + working_dir, + &mut *session, + )?; if attach { attach_dataflow( dataflow_descriptor, dataflow, dataflow_id, - &mut session, + &mut *session, hot_reload, )? 
} } - Command::List => list(&mut session)?, - Command::Stop { uuid, name } => match (uuid, name) { - (Some(uuid), _) => stop_dataflow(uuid, &mut session)?, - (None, Some(name)) => stop_dataflow_by_name(name, &mut session)?, - (None, None) => stop_dataflow_interactive(&mut session)?, + Command::List => match connect_to_coordinator() { + Ok(mut session) => list(&mut *session)?, + Err(_) => { + bail!("No dora coordinator seems to be running."); + } }, - Command::Destroy { config } => up::destroy(config.as_deref(), &mut session)?, + Command::Stop { uuid, name } => { + let mut session = + connect_to_coordinator().wrap_err("could not connect to dora coordinator")?; + match (uuid, name) { + (Some(uuid), _) => stop_dataflow(uuid, &mut *session)?, + (None, Some(name)) => stop_dataflow_by_name(name, &mut *session)?, + (None, None) => stop_dataflow_interactive(&mut *session)?, + } + } + Command::Destroy { config } => up::destroy(config.as_deref())?, } Ok(()) @@ -207,9 +227,9 @@ fn start_dataflow( dataflow: Descriptor, name: Option, local_working_dir: PathBuf, - session: &mut Option>, + session: &mut TcpRequestReplyConnection, ) -> Result { - let reply_raw = control_connection(session)? + let reply_raw = session .request( &serde_json::to_vec(&ControlRequest::Start { dataflow, @@ -232,9 +252,7 @@ fn start_dataflow( } } -fn stop_dataflow_interactive( - session: &mut Option>, -) -> eyre::Result<()> { +fn stop_dataflow_interactive(session: &mut TcpRequestReplyConnection) -> eyre::Result<()> { let uuids = query_running_dataflows(session).wrap_err("failed to query running dataflows")?; if uuids.is_empty() { eprintln!("No dataflows are running"); @@ -248,9 +266,9 @@ fn stop_dataflow_interactive( fn stop_dataflow( uuid: Uuid, - session: &mut Option>, + session: &mut TcpRequestReplyConnection, ) -> Result<(), eyre::ErrReport> { - let reply_raw = control_connection(session)? 
+ let reply_raw = session .request( &serde_json::to_vec(&ControlRequest::Stop { dataflow_uuid: uuid, @@ -269,9 +287,9 @@ fn stop_dataflow( fn stop_dataflow_by_name( name: String, - session: &mut Option>, + session: &mut TcpRequestReplyConnection, ) -> Result<(), eyre::ErrReport> { - let reply_raw = control_connection(session)? + let reply_raw = session .request(&serde_json::to_vec(&ControlRequest::StopByName { name }).unwrap()) .wrap_err("failed to send dataflow stop_by_name message")?; let result: ControlRequestReply = @@ -283,7 +301,7 @@ fn stop_dataflow_by_name( } } -fn list(session: &mut Option>) -> Result<(), eyre::ErrReport> { +fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> { let ids = query_running_dataflows(session)?; if ids.is_empty() { @@ -299,9 +317,9 @@ fn list(session: &mut Option>) -> Result<(), eyre } fn query_running_dataflows( - session: &mut Option>, + session: &mut TcpRequestReplyConnection, ) -> Result, eyre::ErrReport> { - let reply_raw = control_connection(session)? 
+ let reply_raw = session .request(&serde_json::to_vec(&ControlRequest::List).unwrap()) .wrap_err("failed to send list message")?; let reply: ControlRequestReply = @@ -315,11 +333,6 @@ fn query_running_dataflows( Ok(ids) } -fn control_connection( - session: &mut Option>, -) -> eyre::Result<&mut Box> { - Ok(match session { - Some(session) => session, - None => session.insert(TcpLayer::new().connect(control_socket_addr())?), - }) +fn connect_to_coordinator() -> std::io::Result> { + TcpLayer::new().connect(control_socket_addr()) } diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/up.rs index 54cd6b5f..7d551a74 100644 --- a/binaries/cli/src/up.rs +++ b/binaries/cli/src/up.rs @@ -1,8 +1,4 @@ -use crate::{ - check::{coordinator_running, daemon_running}, - control_connection, -}; -use communication_layer_request_reply::TcpRequestReplyConnection; +use crate::{check::daemon_running, connect_to_coordinator}; use dora_core::topics::ControlRequest; use eyre::Context; use std::{fs, path::Path, process::Command, time::Duration}; @@ -17,34 +13,44 @@ pub(crate) fn up( ) -> eyre::Result<()> { let UpConfig {} = parse_dora_config(config_path)?; - if !coordinator_running()? { - start_coordinator(coordinator).wrap_err("failed to start dora-coordinator")?; - // sleep a bit until the coordinator accepts connections - while !coordinator_running()? { - std::thread::sleep(Duration::from_millis(50)); + let mut session = match connect_to_coordinator() { + Ok(session) => session, + Err(_) => { + start_coordinator(coordinator).wrap_err("failed to start dora-coordinator")?; + + loop { + match connect_to_coordinator() { + Ok(session) => break session, + Err(_) => { + // sleep a bit until the coordinator accepts connections + std::thread::sleep(Duration::from_millis(50)); + } + } + } } - } - if !daemon_running()? { + }; + + if !daemon_running(&mut *session)? 
{ start_daemon(daemon).wrap_err("failed to start dora-daemon")?; } Ok(()) } -pub(crate) fn destroy( - config_path: Option<&Path>, - session: &mut Option>, -) -> Result<(), eyre::ErrReport> { +pub(crate) fn destroy(config_path: Option<&Path>) -> Result<(), eyre::ErrReport> { let UpConfig {} = parse_dora_config(config_path)?; - if coordinator_running()? { - // send destroy command to dora-coordinator - control_connection(session)? - .request(&serde_json::to_vec(&ControlRequest::Destroy).unwrap()) - .wrap_err("failed to send destroy message")?; - println!("Send destroy command to dora-coordinator"); - } else { - eprintln!("The dora-coordinator is not running"); + match connect_to_coordinator() { + Ok(mut session) => { + // send destroy command to dora-coordinator + session + .request(&serde_json::to_vec(&ControlRequest::Destroy).unwrap()) + .wrap_err("failed to send destroy message")?; + println!("Send destroy command to dora-coordinator"); + } + Err(_) => { + eprintln!("Could not connect to dora-coordinator"); + } } Ok(()) diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index a46f655e..15581412 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -110,7 +110,10 @@ pub async fn spawn_node( let mut command = if has_python_operator && !has_other_operator { // Use python to spawn runtime if there is a python operator let mut command = tokio::process::Command::new("python3"); - command.args(["-c", "import dora; dora.start_runtime()"]); + command.args([ + "-c", + format!("import dora; dora.start_runtime() # {}", node.id).as_str(), + ]); command } else if !has_python_operator && has_other_operator { let mut cmd = tokio::process::Command::new( diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index b94001a5..3414c959 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -25,7 +25,7 @@ use tracing::{error, field, span, warn}; fn 
traceback(err: pyo3::PyErr) -> eyre::Report { let traceback = Python::with_gil(|py| err.traceback(py).and_then(|t| t.format().ok())); if let Some(traceback) = traceback { - eyre::eyre!("{err}{traceback}") + eyre::eyre!("{traceback}\n{err}") } else { eyre::eyre!("{err}") } diff --git a/libraries/extensions/telemetry/tracing/src/lib.rs b/libraries/extensions/telemetry/tracing/src/lib.rs index 4d069e55..75c10783 100644 --- a/libraries/extensions/telemetry/tracing/src/lib.rs +++ b/libraries/extensions/telemetry/tracing/src/lib.rs @@ -4,6 +4,7 @@ //! able to serialize and deserialize context that has been sent via the middleware. use eyre::Context as EyreContext; +use tracing::metadata::LevelFilter; use tracing_subscriber::{prelude::__tracing_subscriber_SubscriberExt, EnvFilter, Layer}; use eyre::ContextCompat; @@ -12,7 +13,7 @@ pub mod telemetry; pub fn set_up_tracing(name: &str) -> eyre::Result<()> { // Filter log using `RUST_LOG`. More useful for CLI. - let filter = EnvFilter::from_default_env(); + let filter = EnvFilter::from_default_env().add_directive(LevelFilter::WARN.into()); let stdout_log = tracing_subscriber::fmt::layer() .pretty() .with_filter(filter);