diff --git a/.github/workflows/ci-python.yml b/.github/workflows/ci-python.yml index 0ab79c8a..d04d8e24 100644 --- a/.github/workflows/ci-python.yml +++ b/.github/workflows/ci-python.yml @@ -14,13 +14,13 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - - name: Install Cap'n Proto and libacl-dev + - name: Install libacl-dev run: | export DEBIAN_FRONTEND=noninteractive - sudo apt-get install -y capnproto libcapnp-dev libacl1-dev + sudo apt-get install -y libacl1-dev - uses: actions/setup-python@v2 with: - python-version: 3.8.10 + python-version: 3.8 - uses: r7kamura/rust-problem-matchers@v1.1.0 - run: cargo --version --verbose diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 108bbcfb..b9e5624a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,24 +16,11 @@ jobs: timeout-minutes: 30 steps: - uses: actions/checkout@v3 - - name: Install Cap'n Proto and libacl-dev (Linux) + - name: Install libacl-dev (Linux) if: runner.os == 'Linux' run: | export DEBIAN_FRONTEND=noninteractive - sudo apt-get install -y capnproto libcapnp-dev libacl1-dev - - name: Install Cap'n Proto (macOS) - if: runner.os == 'macOS' - run: brew install capnp - env: - HOMEBREW_NO_AUTO_UPDATE: 1 - HOMEBREW_NO_BOTTLE_SOURCE_FALLBACK: 1 - HOMEBREW_NO_INSTALL_CLEANUP: 1 - - name: Install Cap'n Proto (Windows) - if: runner.os == 'Windows' - shell: pwsh - run: | - choco install capnproto - echo "$Env:Programfiles\capnproto" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append + sudo apt-get install -y libacl1-dev - uses: r7kamura/rust-problem-matchers@v1.1.0 - run: cargo --version --verbose @@ -56,24 +43,11 @@ jobs: steps: - uses: actions/checkout@v3 - - name: Install Cap'n Proto and libacl-dev (Linux) + - name: Install libacl-dev (Linux) if: runner.os == 'Linux' run: | export DEBIAN_FRONTEND=noninteractive - sudo apt-get install -y capnproto libcapnp-dev libacl1-dev - - name: Install Cap'n Proto (macOS) - if: runner.os == 'macOS' - run: brew install capnp - env: - HOMEBREW_NO_AUTO_UPDATE: 1 - HOMEBREW_NO_BOTTLE_SOURCE_FALLBACK: 1 - HOMEBREW_NO_INSTALL_CLEANUP: 1 - - name: Install Cap'n Proto (Windows) - if: runner.os == 'Windows' - shell: pwsh - run: | - choco install capnproto - echo "$Env:Programfiles\capnproto" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append + sudo apt-get install -y libacl1-dev - uses: r7kamura/rust-problem-matchers@v1.1.0 - run: cargo --version --verbose @@ -108,11 +82,11 @@ jobs: steps: - uses: actions/checkout@v3 - - name: Install Cap'n Proto and libacl-dev (Linux) + - name: Install libacl-dev (Linux) if: runner.os == 'Linux' run: | export DEBIAN_FRONTEND=noninteractive - sudo apt-get install -y capnproto libcapnp-dev libacl1-dev + sudo apt-get install -y libacl1-dev - uses: r7kamura/rust-problem-matchers@v1.1.0 - run: cargo --version --verbose @@ -127,10 +101,10 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - - name: Install Cap'n Proto and libacl-dev + - name: Install libacl-dev run: | export DEBIAN_FRONTEND=noninteractive - sudo apt-get install -y capnproto libcapnp-dev libacl1-dev + sudo apt-get install -y libacl1-dev - uses: r7kamura/rust-problem-matchers@v1.1.0 - run: cargo --version --verbose diff --git a/.github/workflows/pip-release.yml b/.github/workflows/pip-release.yml new file mode 100644 index 00000000..5e2a5933 --- /dev/null +++ b/.github/workflows/pip-release.yml @@ -0,0 +1,52 @@ +name: Pypi Release + +permissions: + contents: write + +on: + release: + types: + - "published" + +jobs: + release: + name: "Pypi Release" + + strategy: + matrix: + platform: [ubuntu-latest, ubuntu-20.04] + python-version: ["3.7"] + fail-fast: false + runs-on: ${{ matrix.platform }} + + steps: + - uses: actions/checkout@v3 + + - name: Install libacl-dev (Linux) + if: runner.os == 'Linux' + run: | + export DEBIAN_FRONTEND=noninteractive + sudo apt-get install -y libacl1-dev + - uses: r7kamura/rust-problem-matchers@v1.1.0 + + # Publish Dora Node Python API + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install maturin==0.14 + pip install patchelf --upgrade + - name: Publish wheel + shell: bash + env: + MATURIN_PASSWORD: ${{ secrets.PYPI_PASS }} + run: | + cd apis/python/node + maturin publish \ + --skip-existing \ + -o wheels \ + --no-sdist \ + --username __token__ \ diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 5b37ec1e..05a29b61 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -14,7 +14,7 @@ jobs: strategy: matrix: - platform: [ubuntu-latest, macos-latest, windows-latest] + platform: [ubuntu-20.04, ubuntu-22.04, macos-12, windows-2022] python-version: ["3.7"] fail-fast: false runs-on: ${{ matrix.platform }} @@ -22,24 +22,11 @@ jobs: steps: - uses: actions/checkout@v3 - - name: Install Cap'n Proto and libacl-dev (Linux) + - name: Install libacl-dev (Linux) if: runner.os == 'Linux' run: | export DEBIAN_FRONTEND=noninteractive - sudo apt-get install -y capnproto libcapnp-dev libacl1-dev - - name: Install Cap'n Proto (macOS) - if: runner.os == 'macOS' - run: brew install capnp - env: - HOMEBREW_NO_AUTO_UPDATE: 1 - HOMEBREW_NO_BOTTLE_SOURCE_FALLBACK: 1 - HOMEBREW_NO_INSTALL_CLEANUP: 1 - - name: Install Cap'n Proto (Windows) - if: runner.os == 'Windows' - shell: pwsh - run: | - choco install capnproto - echo "$Env:Programfiles\capnproto" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append + sudo apt-get install -y libacl1-dev - uses: r7kamura/rust-problem-matchers@v1.1.0 @@ -78,27 +65,5 @@ jobs: with: upload_url: ${{ github.event.release.upload_url }} asset_path: archive.zip - asset_name: dora-${{ github.ref_name }}-x86_64-${{ runner.os }}.zip + asset_name: dora-${{ github.ref_name }}-x86_64-${{ matrix.platform }}.zip asset_content_type: application/zip - - # Publish Dora Node Python API - - name: Set up Python - uses: actions/setup-python@v2 - with: - python-version: ${{ matrix.python-version }} - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install maturin==0.13.2 - - name: Publish wheel - shell: bash - env: - MATURIN_PASSWORD: ${{ secrets.PYPI_PASS }} - run: | - cd apis/python/node - maturin publish \ - --no-sdist \ - --skip-existing \ - -o wheels \ - -i python \ - --username __token__ \ diff --git a/Cargo.lock b/Cargo.lock index 01cdbc55..618c71f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -541,7 +541,7 @@ dependencies = [ [[package]] name = "communication-layer-pub-sub" -version = "0.1.0" +version = "0.1.1" dependencies = [ "eyre", "iceoryx-rs", @@ -552,7 +552,7 @@ dependencies = [ [[package]] name = "communication-layer-request-reply" -version = "0.1.0" +version = "0.1.1" dependencies = [ "eyre", ] @@ -903,7 +903,7 @@ dependencies = [ [[package]] name = "dora-cli" -version = "0.1.0" +version = "0.1.1" dependencies = [ "atty", "clap 4.0.3", @@ -923,7 +923,7 @@ dependencies = [ [[package]] name = "dora-coordinator" -version = "0.1.0" +version = "0.1.1" dependencies = [ "bincode", "clap 3.2.20", @@ -953,7 +953,7 @@ dependencies = [ [[package]] name = "dora-core" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-message", "eyre", @@ -988,7 +988,7 @@ dependencies = [ [[package]] name = "dora-download" -version = "0.1.0" +version = "0.1.1" dependencies = [ "eyre", "reqwest", @@ -1009,7 +1009,7 @@ dependencies = [ [[package]] name = "dora-message" -version = "0.1.0" +version = "0.1.1" dependencies = [ "capnp", "capnpc", @@ -1019,7 +1019,7 @@ dependencies = [ [[package]] name = "dora-metrics" -version = "0.1.0" +version = "0.1.1" dependencies = [ "futures", "opentelemetry", @@ -1030,7 +1030,7 @@ dependencies = [ [[package]] name = "dora-node-api" -version = "0.1.0" +version = "0.1.1" dependencies = [ "capnp", "dora-core", @@ -1051,7 +1051,7 @@ dependencies = [ [[package]] name = "dora-node-api-c" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-node-api", "eyre", @@ -1061,7 +1061,7 @@ dependencies = [ [[package]] name = "dora-node-api-cxx" -version = "0.1.0" +version = "0.1.1" dependencies = [ "cxx", "cxx-build", @@ -1071,10 +1071,11 @@ dependencies = [ [[package]] name = "dora-node-api-python" -version = "0.1.0" +version = "0.1.1-2" dependencies = [ "dora-node-api", "dora-operator-api-python", + "dora-runtime", "eyre", "flume", "pyo3", @@ -1083,7 +1084,7 @@ dependencies = [ [[package]] name = "dora-operator-api" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-operator-api-macros", "dora-operator-api-types", @@ -1091,14 +1092,14 @@ dependencies = [ [[package]] name = "dora-operator-api-c" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-operator-api-types", ] [[package]] name = "dora-operator-api-cxx" -version = "0.1.0" +version = "0.1.1" dependencies = [ "cxx", "cxx-build", @@ -1111,7 +1112,7 @@ dependencies = [ [[package]] name = "dora-operator-api-macros" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-operator-api", "dora-operator-api-types", @@ -1122,7 +1123,7 @@ dependencies = [ [[package]] name = "dora-operator-api-python" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-node-api", "eyre", @@ -1133,14 +1134,14 @@ dependencies = [ [[package]] name = "dora-operator-api-types" -version = "0.1.0" +version = "0.1.1" dependencies = [ "safer-ffi", ] [[package]] name = "dora-runtime" -version = "0.1.0" +version = "0.1.1" dependencies = [ "clap 3.2.20", "dora-core", @@ -1171,7 +1172,7 @@ dependencies = [ [[package]] name = "dora-tracing" -version = "0.1.0" +version = "0.1.1" dependencies = [ "opentelemetry", "opentelemetry-jaeger", @@ -1717,7 +1718,7 @@ dependencies = [ [[package]] name = "iceoryx-example-node" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-node-api", "eyre", @@ -1726,14 +1727,14 @@ dependencies = [ [[package]] name = "iceoryx-example-operator" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-operator-api", ] [[package]] name = "iceoryx-example-sink" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-node-api", "eyre", @@ -2271,9 +2272,9 @@ dependencies = [ [[package]] name = "ndk-sys" -version = "0.4.0" +version = "0.4.1+23.1.7779620" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21d83ec9c63ec5bf950200a8e508bdad6659972187b625469f58ef8c08e29046" +checksum = "3cf2aae958bd232cac5069850591667ad422d263686d75b52a065f9badeee5a3" dependencies = [ "jni-sys", ] @@ -3309,7 +3310,7 @@ dependencies = [ [[package]] name = "rust-dataflow-example-node" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-node-api", "eyre", @@ -3320,14 +3321,14 @@ dependencies = [ [[package]] name = "rust-dataflow-example-operator" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-operator-api", ] [[package]] name = "rust-dataflow-example-sink" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-node-api", "eyre", @@ -4997,7 +4998,7 @@ dependencies = [ [[package]] name = "zenoh-logger" -version = "0.1.0" +version = "0.1.1" dependencies = [ "zenoh", "zenoh-config", diff --git a/README.md b/README.md index a96b4619..c36b2cfc 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,8 @@ Dataflow Oriented Robotic Architecture ⚡ This project is in early development, and many features have yet to be implemented with breaking changes. Please don't take for granted the current design. +`dora` primary support is with `Linux` ( Ubuntu 20.04 and Ubuntu 22.04 ) as it is the primary OS for both Cloud and small computers. If you wish to use `dora` with another OS, please compile from source. + --- ## 📖 Documentation @@ -28,34 +30,11 @@ For linux ```bash wget https://github.com/dora-rs/dora/releases/download//dora--x86_64-Linux.zip unzip dora--x86_64-Linux.zip +python3 -m pip install dora-rs== PATH=$PATH:$(pwd):$(pwd)/iceoryx dora --help ``` -
- For Macos - -```bash -wget https://github.com/dora-rs/dora/releases/download//dora--x86_64-macOS.zip -unzip dora--x86_64-macOS.zip -PATH=$PATH:$(pwd):$(pwd)/iceoryx -dora --help -``` - -
- -
- For Windows - -```bash -wget https://github.com/dora-rs/dora/releases/download//dora--x86_64-Windows.zip -unzip dora--x86_64-Windows.zip -PATH=$PATH:$(pwd):$(pwd)/iceoryx -dora --help -``` - -
- > This is `x86_64` only for the moment. 2. Create a new dataflow diff --git a/apis/c++/node/Cargo.toml b/apis/c++/node/Cargo.toml index 028d9b8c..897c7eac 100644 --- a/apis/c++/node/Cargo.toml +++ b/apis/c++/node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-node-api-cxx" -version = "0.1.0" +version = "0.1.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -10,7 +10,7 @@ crate-type = ["staticlib"] [dependencies] cxx = "1.0.73" -dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node", default-features = false } +dora-node-api = { version = "0.1.1", path = "../../../apis/rust/node", default-features = false } eyre = "0.6.8" [build-dependencies] diff --git a/apis/c++/operator/Cargo.toml b/apis/c++/operator/Cargo.toml index 980e07ca..9f359563 100644 --- a/apis/c++/operator/Cargo.toml +++ b/apis/c++/operator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-operator-api-cxx" -version = "0.1.0" +version = "0.1.1" edition = "2021" [lib] @@ -8,7 +8,7 @@ crate-type = ["staticlib"] [dependencies] cxx = "1.0.73" -dora-operator-api = { version = "0.1.0", path = "../../../apis/rust/operator" } +dora-operator-api = { version = "0.1.1", path = "../../../apis/rust/operator" } eyre = "0.6.8" futures = "0.3.21" rand = "0.8.5" diff --git a/apis/c/node/Cargo.toml b/apis/c/node/Cargo.toml index c38fc627..49912e64 100644 --- a/apis/c/node/Cargo.toml +++ b/apis/c/node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-node-api-c" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" diff --git a/apis/c/operator/Cargo.toml b/apis/c/operator/Cargo.toml index ca457efd..82d857f8 100644 --- a/apis/c/operator/Cargo.toml +++ b/apis/c/operator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-operator-api-c" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" description = "C API implemetation for Dora Operator" diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index ce870912..fdce146c 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-node-api-python" -version = "0.1.0" +version = "0.1.1-2" edition = "2021" license = "Apache-2.0" @@ -13,6 +13,7 @@ pyo3 = { version = "0.16", features = ["eyre", "abi3-py37"] } eyre = "0.6" serde_yaml = "0.8.23" flume = "0.10.14" +dora-runtime = { path = "../../../binaries/runtime" } [lib] name = "dora" diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 97f68014..b729b782 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -64,7 +64,7 @@ impl Node { data: &PyBytes, metadata: Option<&PyDict>, ) -> Result<()> { - let data = &data.as_bytes(); + let data = data.as_bytes(); let metadata = pydict_to_metadata(metadata)?; self.node .send_output(&output_id.into(), metadata, data.len(), |out| { @@ -78,8 +78,17 @@ impl Node { } } +#[pyfunction] +fn start_runtime() -> Result<()> { + dora_runtime::main() + .wrap_err("Python Dora Runtime failed.") + .unwrap(); + Ok(()) +} + #[pymodule] fn dora(_py: Python, m: &PyModule) -> PyResult<()> { + m.add_function(wrap_pyfunction!(start_runtime, m)?)?; m.add_class::().unwrap(); Ok(()) } diff --git a/apis/python/operator/Cargo.toml b/apis/python/operator/Cargo.toml index d51de4a0..fd3a42b7 100644 --- a/apis/python/operator/Cargo.toml +++ b/apis/python/operator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-operator-api-python" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" diff --git a/apis/rust/node/Cargo.toml b/apis/rust/node/Cargo.toml index 63d731b6..52166b1a 100644 --- a/apis/rust/node/Cargo.toml +++ b/apis/rust/node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-node-api" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" diff --git a/apis/rust/operator/Cargo.toml b/apis/rust/operator/Cargo.toml index 08c74a9b..233b42d8 100644 --- a/apis/rust/operator/Cargo.toml +++ b/apis/rust/operator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-operator-api" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" description = "Rust API implemetation for Dora Operator" diff --git a/apis/rust/operator/macros/Cargo.toml b/apis/rust/operator/macros/Cargo.toml index 2565aa4e..066d808c 100644 --- a/apis/rust/operator/macros/Cargo.toml +++ b/apis/rust/operator/macros/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-operator-api-macros" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" description = "Rust API Macros for Dora Operator" diff --git a/apis/rust/operator/types/Cargo.toml b/apis/rust/operator/types/Cargo.toml index 43cab511..2d6cbc21 100644 --- a/apis/rust/operator/types/Cargo.toml +++ b/apis/rust/operator/types/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-operator-api-types" -version = "0.1.0" +version = "0.1.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml index 6803514b..96c77e33 100644 --- a/binaries/cli/Cargo.toml +++ b/binaries/cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-cli" -version = "0.1.0" +version = "0.1.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index c0685454..64bf3e47 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -3,7 +3,6 @@ use dora_core::{ adjust_shared_library_path, config::{InputMapping, UserInputMapping}, descriptor::{self, source_is_url, CoreNodeKind, OperatorSource}, - topics::ControlRequest, }; use eyre::{bail, eyre, Context}; use std::{env::consts::EXE_EXTENSION, io::Write, path::Path}; diff --git a/binaries/cli/src/template/cxx/mod.rs b/binaries/cli/src/template/cxx/mod.rs index 6532bc85..6d96fc5e 100644 --- a/binaries/cli/src/template/cxx/mod.rs +++ b/binaries/cli/src/template/cxx/mod.rs @@ -64,7 +64,7 @@ fn create_operator(name: String, path: Option) -> Result<(), eyre::ErrR // create directories let root = path.as_deref().unwrap_or_else(|| Path::new(&name)); - fs::create_dir(&root) + fs::create_dir(root) .with_context(|| format!("failed to create directory `{}`", root.display()))?; let operator_path = root.join("operator.cc"); @@ -96,7 +96,7 @@ fn create_custom_node(name: String, path: Option) -> Result<(), eyre::E // create directories let root = path.as_deref().unwrap_or_else(|| Path::new(&name)); - fs::create_dir(&root) + fs::create_dir(root) .with_context(|| format!("failed to create directory `{}`", root.display()))?; let node_path = root.join("node.cc"); diff --git a/binaries/cli/src/template/rust/node/Cargo-template.toml b/binaries/cli/src/template/rust/node/Cargo-template.toml index 1493f3fd..518d4272 100644 --- a/binaries/cli/src/template/rust/node/Cargo-template.toml +++ b/binaries/cli/src/template/rust/node/Cargo-template.toml @@ -6,4 +6,4 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -dora-node-api = { git = "https://github.com/dora-rs/dora.git" } +dora-node-api = { git = "https://github.com/dora-rs/dora.git", tag = "v0.1.1" } diff --git a/binaries/cli/src/template/rust/node/main-template.rs b/binaries/cli/src/template/rust/node/main-template.rs index a7c01a66..f1fdfbbb 100644 --- a/binaries/cli/src/template/rust/node/main-template.rs +++ b/binaries/cli/src/template/rust/node/main-template.rs @@ -1,4 +1,4 @@ -use dora_node_api::{self, core::config::DataId, DoraNode}; +use dora_node_api::DoraNode; use std::error::Error; fn main() -> Result<(), Box> { diff --git a/binaries/cli/src/template/rust/operator/Cargo-template.toml b/binaries/cli/src/template/rust/operator/Cargo-template.toml index 0b27ecfe..32317f32 100644 --- a/binaries/cli/src/template/rust/operator/Cargo-template.toml +++ b/binaries/cli/src/template/rust/operator/Cargo-template.toml @@ -9,4 +9,4 @@ edition = "2021" crate-type = ["cdylib"] [dependencies] -dora-operator-api = { git = "https://github.com/dora-rs/dora.git" } +dora-operator-api = { git = "https://github.com/dora-rs/dora.git", tag = "v0.1.1" } diff --git a/binaries/coordinator/Cargo.toml b/binaries/coordinator/Cargo.toml index 0b035824..624e38fe 100644 --- a/binaries/coordinator/Cargo.toml +++ b/binaries/coordinator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-coordinator" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" @@ -20,7 +20,7 @@ clap = { version = "3.1.8", features = ["derive"] } uuid = { version = "1.2.1" } time = "0.3.9" rand = "0.8.5" -dora-core = { version = "0.1.0", path = "../../libraries/core" } +dora-core = { version = "0.1.1", path = "../../libraries/core" } dora-message = { path = "../../libraries/message" } tracing = "0.1.36" tracing-subscriber = "0.3.15" diff --git a/binaries/coordinator/src/control.rs b/binaries/coordinator/src/control.rs index a2066952..e0a0bd5a 100644 --- a/binaries/coordinator/src/control.rs +++ b/binaries/coordinator/src/control.rs @@ -81,14 +81,23 @@ fn handle_requests( Ok(reply) })); if let Err(err) = result { - if err.kind() == ErrorKind::Other { - let inner = err.into_inner().unwrap(); - let downcasted = inner.downcast_ref().unwrap(); - match downcasted { - HandlerError::ParseError(err) => { - tracing::warn!("failed to parse request: {err}"); + match err.kind() { + ErrorKind::UnexpectedEof => { + tracing::trace!("Control connection closed"); + break; + } + ErrorKind::Other => { + let inner = err.into_inner().unwrap(); + let downcasted = inner.downcast_ref().unwrap(); + match downcasted { + HandlerError::ParseError(err) => { + tracing::warn!("failed to parse request: {err}"); + } + HandlerError::ServerStopped => break, } - HandlerError::ServerStopped => break, + } + _ => { + tracing::warn!("I/O error while trying to receive control request: {err:?}"); } } } @@ -103,6 +112,7 @@ enum HandlerError { ServerStopped, } +#[derive(Debug)] pub enum ControlEvent { IncomingRequest { request: ControlRequest, diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 0143b5f8..dc12ef20 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -99,6 +99,7 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { let mut daemon_connections = HashMap::new(); while let Some(event) = events.next().await { + tracing::trace!("Handling event {event:?}"); match event { Event::NewDaemonConnection(connection) => { let events_tx = daemon_events_tx.clone(); @@ -369,6 +370,7 @@ async fn start_dataflow( }) } +#[derive(Debug)] pub enum Event { NewDaemonConnection(TcpStream), DaemonConnectError(eyre::Report), @@ -377,10 +379,12 @@ pub enum Event { Daemon(DaemonEvent), } +#[derive(Debug)] pub enum DataflowEvent { Finished { result: eyre::Result<()> }, } +#[derive(Debug)] pub enum DaemonEvent { Register { machine_id: String, diff --git a/binaries/coordinator/src/run/runtime.rs b/binaries/coordinator/src/run/runtime.rs index e6aba2da..2a3c5541 100644 --- a/binaries/coordinator/src/run/runtime.rs +++ b/binaries/coordinator/src/run/runtime.rs @@ -1,7 +1,7 @@ use super::command_init_common_env; use dora_core::{ config::NodeId, - descriptor::{self, EnvValue}, + descriptor::{self, EnvValue, OperatorSource}, }; use eyre::{eyre, WrapErr}; use std::{collections::BTreeMap, path::Path}; @@ -15,7 +15,30 @@ pub fn spawn_runtime_node( communication: &dora_core::config::CommunicationConfig, working_dir: &Path, ) -> eyre::Result>> { - let mut command = tokio::process::Command::new(runtime); + let has_python_operator = node + .operators + .iter() + .any(|x| matches!(x.config.source, OperatorSource::Python { .. })); + + let has_other_operator = node + .operators + .iter() + .any(|x| !matches!(x.config.source, OperatorSource::Python { .. })); + + let mut command = if has_python_operator && !has_other_operator { + // Use python to spawn runtime if there is a python operator + let mut command = tokio::process::Command::new("python3"); + command.args(["-c", "import dora; dora.start_runtime()"]); + command + } else if !has_python_operator && has_other_operator { + // Use default runtime if there is no python operator + tokio::process::Command::new(runtime) + } else { + return Err(eyre!( + "Runtime can not mix Python Operator with other type of operator." + )); + }; + command_init_common_env(&mut command, &node_id, communication)?; command.env( "DORA_OPERATORS", diff --git a/binaries/runtime/Cargo.toml b/binaries/runtime/Cargo.toml index 8d0debb6..66dc3511 100644 --- a/binaries/runtime/Cargo.toml +++ b/binaries/runtime/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-runtime" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" @@ -11,7 +11,7 @@ clap = { version = "3.1.12", features = ["derive"] } dora-node-api = { path = "../../apis/rust/node", default-features = false } dora-operator-api-python = { path = "../../apis/python/operator" } dora-operator-api-types = { path = "../../apis/rust/operator/types" } -dora-core = { version = "0.1.0", path = "../../libraries/core" } +dora-core = { version = "0.1.1", path = "../../libraries/core" } dora-tracing = { path = "../../libraries/extensions/telemetry/tracing", optional = true } dora-metrics = { path = "../../libraries/extensions/telemetry/metrics", optional = true } opentelemetry = { version = "0.17", features = [ @@ -29,7 +29,7 @@ tokio-stream = "0.1.8" zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", rev = "79a136e4fd90b11ff5d775ced981af53c4f1071b" } zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git", rev = "79a136e4fd90b11ff5d775ced981af53c4f1071b" } fern = "0.6.1" -pyo3 = { version = "0.16.5", features = ["auto-initialize", "eyre", "abi3"] } +pyo3 = { version = "0.16", features = ["eyre", "abi3-py37"] } # pyo3-abi3 flag allow simpler linking. See: https://pyo3.rs/v0.13.2/building_and_distribution.html flume = "0.10.14" dora-message = { path = "../../libraries/message" } diff --git a/binaries/runtime/src/lib.rs b/binaries/runtime/src/lib.rs new file mode 100644 index 00000000..fedf559b --- /dev/null +++ b/binaries/runtime/src/lib.rs @@ -0,0 +1,203 @@ +#![warn(unsafe_op_in_unsafe_fn)] + +use dora_core::{ + config::{CommunicationConfig, DataId, NodeId, OperatorId}, + descriptor::OperatorDefinition, +}; +use dora_node_api::{ + self, + communication::{self, CommunicationLayer, Publisher, STOP_TOPIC}, + manual_stop_publisher, +}; +use eyre::{bail, Context, Result}; +use futures::{Stream, StreamExt}; +use operator::{spawn_operator, OperatorEvent, StopReason}; + +use std::{ + collections::{BTreeSet, HashMap}, + mem, +}; +use tokio::{runtime::Builder, sync::mpsc}; +use tokio_stream::{wrappers::ReceiverStream, StreamMap}; + +mod operator; + +pub fn main() -> eyre::Result<()> { + set_up_tracing().context("failed to set up tracing subscriber")?; + + let node_id: NodeId = { + let raw = + std::env::var("DORA_NODE_ID").wrap_err("env variable DORA_NODE_ID must be set")?; + serde_yaml::from_str(&raw).context("failed to deserialize operator config")? + }; + let communication_config: CommunicationConfig = { + let raw = std::env::var("DORA_COMMUNICATION_CONFIG") + .wrap_err("env variable DORA_COMMUNICATION_CONFIG must be set")?; + serde_yaml::from_str(&raw).context("failed to deserialize communication config")? + }; + let operators: Vec = { + let raw = + std::env::var("DORA_OPERATORS").wrap_err("env variable DORA_OPERATORS must be set")?; + serde_yaml::from_str(&raw).context("failed to deserialize operator config")? + }; + + let mut communication: Box = + communication::init(&communication_config)?; + + let mut operator_events = StreamMap::new(); + let mut operator_stop_publishers = HashMap::new(); + let mut operator_events_tx = HashMap::new(); + + for operator_config in &operators { + let (events_tx, events) = mpsc::channel(1); + let stop_publisher = publisher( + &node_id, + operator_config.id.clone(), + STOP_TOPIC.to_owned().into(), + communication.as_mut(), + ) + .with_context(|| { + format!( + "failed to create stop publisher for operator {}", + operator_config.id + ) + })?; + operator_stop_publishers.insert(operator_config.id.clone(), stop_publisher); + + operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events)); + operator_events_tx.insert(operator_config.id.clone(), events_tx); + } + + let operator_events = operator_events.map(|(id, event)| Event::Operator { id, event }); + let node_id_clone = node_id.clone(); + let tokio_runtime = Builder::new_current_thread() + .enable_all() + .build() + .wrap_err("Could not build a tokio runtime.")?; + let manual_stop_publisher = manual_stop_publisher(communication.as_mut())?; + let stop_thread = std::thread::spawn(move || -> Result<()> { + tokio_runtime.block_on(run( + node_id_clone, + operator_events, + operator_stop_publishers, + manual_stop_publisher, + )) + }); + + for operator_config in &operators { + let events_tx = operator_events_tx.get(&operator_config.id).unwrap(); + spawn_operator( + &node_id, + operator_config.clone(), + events_tx.clone(), + communication.as_mut(), + ) + .wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?; + } + + stop_thread + .join() + .map_err(|err| eyre::eyre!("Stop thread failed with err: {err:#?}"))? + .wrap_err("Stop loop thread failed unexpectedly.")?; + Ok(()) +} + +async fn run( + node_id: NodeId, + mut events: impl Stream + Unpin, + mut operator_stop_publishers: HashMap>, + manual_stop_publisher: Box, +) -> eyre::Result<()> { + #[cfg(feature = "metrics")] + let _started = { + use dora_metrics::init_meter; + use opentelemetry::global; + use opentelemetry_system_metrics::init_process_observer; + + let _started = init_meter(); + let meter = global::meter(Box::leak(node_id.to_string().into_boxed_str())); + init_process_observer(meter); + _started + }; + + let mut stopped_operators = BTreeSet::new(); + + while let Some(event) = events.next().await { + match event { + Event::Operator { id, event } => { + match event { + OperatorEvent::Error(err) => { + bail!(err.wrap_err(format!("operator {id} failed"))) + } + OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload), + OperatorEvent::Finished { reason } => { + if let StopReason::ExplicitStopAll = reason { + let hlc = dora_message::uhlc::HLC::default(); + let metadata = dora_message::Metadata::new(hlc.new_timestamp()); + let data = metadata + .serialize() + .wrap_err("failed to serialize stop message")?; + manual_stop_publisher + .publish(&data) + .map_err(|err| eyre::eyre!(err)) + .wrap_err("failed to send stop message")?; + break; + } + if let Some(stop_publisher) = operator_stop_publishers.remove(&id) { + tracing::info!("operator {node_id}/{id} finished ({reason:?})"); + stopped_operators.insert(id.clone()); + // send stopped message + tokio::task::spawn_blocking(move || stop_publisher.publish(&[])) + .await + .wrap_err("failed to join stop publish task")? + .map_err(|err| eyre::eyre!(err)) + .with_context(|| { + format!( + "failed to send stop message for operator `{node_id}/{id}`" + ) + })?; + if operator_stop_publishers.is_empty() { + break; + } + } else { + tracing::warn!("no stop publisher for {id}"); + } + } + } + } + } + } + + mem::drop(events); + + Ok(()) +} + +fn publisher( + self_id: &NodeId, + operator_id: OperatorId, + output_id: DataId, + communication: &mut dyn CommunicationLayer, +) -> eyre::Result> { + let topic = format!("{self_id}/{operator_id}/{output_id}"); + communication + .publisher(&topic) + .map_err(|err| eyre::eyre!(err)) + .wrap_err_with(|| format!("failed to create publisher for output {output_id}")) +} + +enum Event { + Operator { + id: OperatorId, + event: OperatorEvent, + }, +} + +fn set_up_tracing() -> eyre::Result<()> { + use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt; + + let stdout_log = tracing_subscriber::fmt::layer().pretty(); + let subscriber = tracing_subscriber::Registry::default().with(stdout_log); + tracing::subscriber::set_global_default(subscriber) + .context("failed to set tracing global subscriber") +} diff --git a/binaries/runtime/src/main.rs b/binaries/runtime/src/main.rs index b8d38d84..98a70f34 100644 --- a/binaries/runtime/src/main.rs +++ b/binaries/runtime/src/main.rs @@ -1,178 +1,3 @@ -#![warn(unsafe_op_in_unsafe_fn)] - -use dora_core::{ - config::{CommunicationConfig, DataId, NodeId, OperatorId}, - descriptor::OperatorDefinition, -}; -use dora_node_api::{ - self, - communication::{self, CommunicationLayer, Publisher, STOP_TOPIC}, - manual_stop_publisher, -}; -use eyre::{bail, Context}; -use futures::{Stream, StreamExt}; -use operator::{spawn_operator, OperatorEvent, StopReason}; -use std::{ - collections::{BTreeSet, HashMap}, - mem, -}; -use tokio::sync::mpsc; -use tokio_stream::{wrappers::ReceiverStream, StreamMap}; - -mod operator; - -fn main() -> eyre::Result<()> { - set_up_tracing().context("failed to set up tracing subscriber")?; - - let node_id = { - let raw = - std::env::var("DORA_NODE_ID").wrap_err("env variable DORA_NODE_ID must be set")?; - serde_yaml::from_str(&raw).context("failed to deserialize operator config")? - }; - let communication_config: CommunicationConfig = { - let raw = std::env::var("DORA_COMMUNICATION_CONFIG") - .wrap_err("env variable DORA_COMMUNICATION_CONFIG must be set")?; - serde_yaml::from_str(&raw).context("failed to deserialize communication config")? - }; - let operators: Vec = { - let raw = - std::env::var("DORA_OPERATORS").wrap_err("env variable DORA_OPERATORS must be set")?; - serde_yaml::from_str(&raw).context("failed to deserialize operator config")? - }; - - let mut communication: Box = - communication::init(&communication_config)?; - - let mut operator_events = StreamMap::new(); - let mut operator_stop_publishers = HashMap::new(); - for operator_config in &operators { - let (events_tx, events) = mpsc::channel(1); - spawn_operator( - &node_id, - operator_config.clone(), - events_tx.clone(), - communication.as_mut(), - ) - .wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?; - operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events)); - - let stop_publisher = publisher( - &node_id, - operator_config.id.clone(), - STOP_TOPIC.to_owned().into(), - communication.as_mut(), - ) - .with_context(|| { - format!( - "failed to create stop publisher for operator {}", - operator_config.id - ) - })?; - operator_stop_publishers.insert(operator_config.id.clone(), stop_publisher); - } - - let operator_events = operator_events.map(|(id, event)| Event::Operator { id, event }); - - tokio::runtime::Runtime::new()?.block_on(run( - node_id, - operator_events, - operator_stop_publishers, - communication.as_mut(), - )) -} - -async fn run( - node_id: NodeId, - mut events: impl Stream + Unpin, - mut operator_stop_publishers: HashMap>, - communication: &mut dyn CommunicationLayer, -) -> eyre::Result<()> { - #[cfg(feature = "metrics")] - let _started = { - use dora_metrics::init_meter; - use opentelemetry::global; - use opentelemetry_system_metrics::init_process_observer; - - let _started = init_meter(); - let meter = global::meter(Box::leak(node_id.to_string().into_boxed_str())); - init_process_observer(meter); - _started - }; - - let mut stopped_operators = BTreeSet::new(); - - while let Some(event) = events.next().await { - match event { - Event::Operator { id, event } => { - match event { - OperatorEvent::Error(err) => { - bail!(err.wrap_err(format!("operator {id} failed"))) - } - OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload), - OperatorEvent::Finished { reason } => { - if let StopReason::ExplicitStopAll = reason { - let manual_stop_publisher = manual_stop_publisher(communication)?; - tokio::task::spawn_blocking(manual_stop_publisher) - .await - .wrap_err("failed to join stop publish task")? - .map_err(|err| eyre::eyre!(err)) - .wrap_err("failed to send stop message")?; - } - if let Some(stop_publisher) = operator_stop_publishers.remove(&id) { - tracing::info!("operator {node_id}/{id} finished ({reason:?})"); - stopped_operators.insert(id.clone()); - // send stopped message - tokio::task::spawn_blocking(move || stop_publisher.publish(&[])) - .await - .wrap_err("failed to join stop publish task")? - .map_err(|err| eyre::eyre!(err)) - .with_context(|| { - format!( - "failed to send stop message for operator `{node_id}/{id}`" - ) - })?; - if operator_stop_publishers.is_empty() { - break; - } - } else { - tracing::warn!("no stop publisher for {id}"); - } - } - } - } - } - } - - mem::drop(events); - - Ok(()) -} - -fn publisher( - self_id: &NodeId, - operator_id: OperatorId, - output_id: DataId, - communication: &mut dyn CommunicationLayer, -) -> eyre::Result> { - let topic = format!("{self_id}/{operator_id}/{output_id}"); - communication - .publisher(&topic) - .map_err(|err| eyre::eyre!(err)) - .wrap_err_with(|| format!("failed to create publisher for output {output_id}")) -} - -enum Event { - Operator { - id: OperatorId, - event: OperatorEvent, - }, -} - -fn set_up_tracing() -> eyre::Result<()> { - use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt; - - let stdout_log = tracing_subscriber::fmt::layer().pretty(); - let subscriber = tracing_subscriber::Registry::default().with(stdout_log); - tracing::subscriber::set_global_default(subscriber) - .context("failed to set tracing global subscriber") +fn main() -> Result<(), eyre::Report> { + dora_runtime::main() } diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 189346f0..b4849cca 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -23,7 +23,6 @@ use std::{ panic::{catch_unwind, AssertUnwindSafe}, path::Path, sync::Arc, - thread, }; use tokio::sync::mpsc::Sender; @@ -51,7 +50,7 @@ pub fn spawn( let path = if source_is_url(source) { let target_path = Path::new("build") .join(node_id.to_string()) - .join(format!("{}.py", operator_id.to_string())); + .join(format!("{}.py", operator_id)); // try to download the shared library let rt = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -170,24 +169,22 @@ pub fn spawn( Result::<_, eyre::Report>::Ok(reason) }; - thread::spawn(move || { - let closure = AssertUnwindSafe(|| { - python_runner() - .wrap_err_with(|| format!("error in Python module at {}", path_cloned.display())) - }); + let closure = AssertUnwindSafe(|| { + python_runner() + .wrap_err_with(|| format!("error in Python module at {}", path_cloned.display())) + }); - match catch_unwind(closure) { - Ok(Ok(reason)) => { - let _ = events_tx.blocking_send(OperatorEvent::Finished { reason }); - } - Ok(Err(err)) => { - let _ = events_tx.blocking_send(OperatorEvent::Error(err)); - } - Err(panic) => { - let _ = events_tx.blocking_send(OperatorEvent::Panic(panic)); - } + match catch_unwind(closure) { + Ok(Ok(reason)) => { + let _ = events_tx.blocking_send(OperatorEvent::Finished { reason }); } - }); + Ok(Err(err)) => { + let _ = events_tx.blocking_send(OperatorEvent::Error(err)); + } + Err(panic) => { + let _ = events_tx.blocking_send(OperatorEvent::Panic(panic)); + } + } Ok(()) } @@ -205,11 +202,10 @@ mod callback_impl { use super::SendOutputCallback; use dora_message::Metadata; use dora_operator_api_python::pydict_to_metadata; - use eyre::{eyre, Context}; + use eyre::{eyre, Context, Result}; use pyo3::{ pymethods, types::{PyBytes, PyDict}, - PyResult, }; #[pymethods] @@ -219,27 +215,27 @@ mod callback_impl { output: &str, data: &PyBytes, metadata: Option<&PyDict>, - ) -> PyResult<()> { + ) -> Result<()> { + let data = data.as_bytes(); + let parameters = pydict_to_metadata(metadata).wrap_err("Could not parse metadata.")?; + let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters); + let mut message = metadata + .serialize() + .context(format!("failed to serialize `{}` metadata", output))?; + match self.publishers.get(output) { Some(publisher) => { - let parameters = pydict_to_metadata(metadata)?; - let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters); - let message = metadata - .serialize() - .context(format!("failed to serialize `{}` metadata", output)); - message.and_then(|mut message| { - message.extend_from_slice(data.as_bytes()); - publisher - .publish(&message) - .map_err(|err| eyre::eyre!(err)) - .context("publish failed") - }) + message.extend_from_slice(data); + + publisher + .publish(&message) + .map_err(|err| eyre::eyre!(err)) + .context("publish failed") } None => Err(eyre!( "unexpected output {output} (not defined in dataflow config)" )), } - .map_err(|err| err.into()) } } } diff --git a/examples/iceoryx/node/Cargo.toml b/examples/iceoryx/node/Cargo.toml index dafe3892..66bea6e5 100644 --- a/examples/iceoryx/node/Cargo.toml +++ b/examples/iceoryx/node/Cargo.toml @@ -1,11 +1,11 @@ [package] name = "iceoryx-example-node" -version = "0.1.0" +version = "0.1.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } +dora-node-api = { version = "0.1.1", path = "../../../apis/rust/node" } eyre = "0.6.8" rand = "0.8.5" diff --git a/examples/iceoryx/operator/Cargo.toml b/examples/iceoryx/operator/Cargo.toml index 474d6103..eb5f3032 100644 --- a/examples/iceoryx/operator/Cargo.toml +++ b/examples/iceoryx/operator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iceoryx-example-operator" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" diff --git a/examples/iceoryx/sink/Cargo.toml b/examples/iceoryx/sink/Cargo.toml index fa48b258..4f3b049e 100644 --- a/examples/iceoryx/sink/Cargo.toml +++ b/examples/iceoryx/sink/Cargo.toml @@ -1,12 +1,12 @@ [package] name = "iceoryx-example-sink" -version = "0.1.0" +version = "0.1.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } +dora-node-api = { version = "0.1.1", path = "../../../apis/rust/node" } eyre = "0.6.8" futures = "0.3.21" tokio = { version = "1.20.1", features = ["macros"] } diff --git a/examples/python-dataflow/object_detection.py b/examples/python-dataflow/object_detection.py index efd0adc6..05c66b1b 100644 --- a/examples/python-dataflow/object_detection.py +++ b/examples/python-dataflow/object_detection.py @@ -34,7 +34,6 @@ class Operator: frame = np.frombuffer(dora_input["data"], dtype="uint8") frame = cv2.imdecode(frame, -1) frame = frame[:, :, ::-1] # OpenCV image (BGR to RGB) - results = self.model(frame) # includes NMS arrays = np.array(results.xyxy[0].cpu()).tobytes() send_output("bbox", arrays, dora_input["metadata"]) diff --git a/examples/python-dataflow/plot.py b/examples/python-dataflow/plot.py index 727494c6..57a2a293 100644 --- a/examples/python-dataflow/plot.py +++ b/examples/python-dataflow/plot.py @@ -24,6 +24,7 @@ class Operator: def __init__(self): self.image = [] + self.bboxs = [] def on_input( self, @@ -45,39 +46,39 @@ class Operator: elif dora_input["id"] == "bbox" and len(self.image) != 0: bboxs = np.frombuffer(dora_input["data"], dtype="float32") - bboxs = np.reshape(bboxs, (-1, 6)) - for bbox in bboxs: - [ - min_x, - min_y, - max_x, - max_y, - confidence, - label, - ] = bbox - cv2.rectangle( - self.image, - (int(min_x), int(min_y)), - (int(max_x), int(max_y)), - (0, 255, 0), - 2, - ) - - cv2.putText( - self.image, - LABELS[int(label)] + f", {confidence:0.2f}", - (int(max_x), int(max_y)), - font, - 0.75, - (0, 255, 0), - 2, - 1, - ) - - if CI != "true": - cv2.imshow("frame", self.image) - if cv2.waitKey(1) & 0xFF == ord("q"): - return DoraStatus.STOP + self.bboxs = np.reshape(bboxs, (-1, 6)) + for bbox in self.bboxs: + [ + min_x, + min_y, + max_x, + max_y, + confidence, + label, + ] = bbox + cv2.rectangle( + self.image, + (int(min_x), int(min_y)), + (int(max_x), int(max_y)), + (0, 255, 0), + 2, + ) + + cv2.putText( + self.image, + LABELS[int(label)] + f", {confidence:0.2f}", + (int(max_x), int(max_y)), + font, + 0.75, + (0, 255, 0), + 2, + 1, + ) + + if CI != "true": + cv2.imshow("frame", self.image) + if cv2.waitKey(1) & 0xFF == ord("q"): + return DoraStatus.STOP return DoraStatus.CONTINUE diff --git a/examples/python-dataflow/webcam.py b/examples/python-dataflow/webcam.py index f64849fa..a44d776a 100755 --- a/examples/python-dataflow/webcam.py +++ b/examples/python-dataflow/webcam.py @@ -13,7 +13,7 @@ video_capture = cv2.VideoCapture(0) start = time.time() # Run for 20 seconds -while time.time() - start < 20: +while time.time() - start < 10: # Wait next dora_input node.next() ret, frame = video_capture.read() diff --git a/examples/rust-dataflow-url/sink/Cargo.toml b/examples/rust-dataflow-url/sink/Cargo.toml index e80b5a61..f98c5d28 100644 --- a/examples/rust-dataflow-url/sink/Cargo.toml +++ b/examples/rust-dataflow-url/sink/Cargo.toml @@ -1,10 +1,10 @@ [package] name = "rust-dataflow-example-sink" -version = "0.1.0" +version = "0.1.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } +dora-node-api = { version = "0.1.1", path = "../../../apis/rust/node" } eyre = "0.6.8" diff --git a/examples/rust-dataflow/node/Cargo.toml b/examples/rust-dataflow/node/Cargo.toml index d7bcd656..71f91475 100644 --- a/examples/rust-dataflow/node/Cargo.toml +++ b/examples/rust-dataflow/node/Cargo.toml @@ -1,12 +1,12 @@ [package] name = "rust-dataflow-example-node" -version = "0.1.0" +version = "0.1.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } +dora-node-api = { version = "0.1.1", path = "../../../apis/rust/node" } eyre = "0.6.8" futures = "0.3.21" rand = "0.8.5" diff --git a/examples/rust-dataflow/operator/Cargo.toml b/examples/rust-dataflow/operator/Cargo.toml index edcac2b8..e18c676c 100644 --- a/examples/rust-dataflow/operator/Cargo.toml +++ b/examples/rust-dataflow/operator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rust-dataflow-example-operator" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" diff --git a/examples/rust-dataflow/sink/Cargo.toml b/examples/rust-dataflow/sink/Cargo.toml index e80b5a61..f98c5d28 100644 --- a/examples/rust-dataflow/sink/Cargo.toml +++ b/examples/rust-dataflow/sink/Cargo.toml @@ -1,10 +1,10 @@ [package] name = "rust-dataflow-example-sink" -version = "0.1.0" +version = "0.1.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } +dora-node-api = { version = "0.1.1", path = "../../../apis/rust/node" } eyre = "0.6.8" diff --git a/libraries/communication-layer/pub-sub/Cargo.toml b/libraries/communication-layer/pub-sub/Cargo.toml index b246998d..525e5093 100644 --- a/libraries/communication-layer/pub-sub/Cargo.toml +++ b/libraries/communication-layer/pub-sub/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "communication-layer-pub-sub" -version = "0.1.0" +version = "0.1.1" edition = "2021" [features] diff --git a/libraries/communication-layer/request-reply/Cargo.toml b/libraries/communication-layer/request-reply/Cargo.toml index 8f4a9392..e580b981 100644 --- a/libraries/communication-layer/request-reply/Cargo.toml +++ b/libraries/communication-layer/request-reply/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "communication-layer-request-reply" -version = "0.1.0" +version = "0.1.1" edition = "2021" [features] diff --git a/libraries/communication-layer/request-reply/src/tcp.rs b/libraries/communication-layer/request-reply/src/tcp.rs index 515b5684..a8ffb622 100644 --- a/libraries/communication-layer/request-reply/src/tcp.rs +++ b/libraries/communication-layer/request-reply/src/tcp.rs @@ -125,7 +125,7 @@ impl TcpConnection { fn send(&mut self, request: &[u8]) -> std::io::Result<()> { let len_raw = (request.len() as u64).to_le_bytes(); self.stream.write_all(&len_raw)?; - self.stream.write_all(&request)?; + self.stream.write_all(request)?; Ok(()) } diff --git a/libraries/core/Cargo.toml b/libraries/core/Cargo.toml index 4c5dcf61..01ccd3eb 100644 --- a/libraries/core/Cargo.toml +++ b/libraries/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-core" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" diff --git a/libraries/extensions/download/Cargo.toml b/libraries/extensions/download/Cargo.toml index 5ae30c93..ad851bc6 100644 --- a/libraries/extensions/download/Cargo.toml +++ b/libraries/extensions/download/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-download" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" diff --git a/libraries/extensions/telemetry/metrics/Cargo.toml b/libraries/extensions/telemetry/metrics/Cargo.toml index 00f83ca5..77c1620d 100644 --- a/libraries/extensions/telemetry/metrics/Cargo.toml +++ b/libraries/extensions/telemetry/metrics/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-metrics" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" diff --git a/libraries/extensions/telemetry/tracing/Cargo.toml b/libraries/extensions/telemetry/tracing/Cargo.toml index ee855913..ccc59e57 100644 --- a/libraries/extensions/telemetry/tracing/Cargo.toml +++ b/libraries/extensions/telemetry/tracing/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-tracing" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" diff --git a/libraries/extensions/zenoh-logger/Cargo.toml b/libraries/extensions/zenoh-logger/Cargo.toml index 1ce72e50..1782b11a 100644 --- a/libraries/extensions/zenoh-logger/Cargo.toml +++ b/libraries/extensions/zenoh-logger/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "zenoh-logger" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" diff --git a/libraries/message/Cargo.toml b/libraries/message/Cargo.toml index 462c4fd5..98c941ce 100644 --- a/libraries/message/Cargo.toml +++ b/libraries/message/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-message" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0"