| @@ -14,13 +14,13 @@ jobs: | |||
| runs-on: ubuntu-latest | |||
| steps: | |||
| - uses: actions/checkout@v3 | |||
| - name: Install Cap'n Proto and libacl-dev | |||
| - name: Install libacl-dev | |||
| run: | | |||
| export DEBIAN_FRONTEND=noninteractive | |||
| sudo apt-get install -y capnproto libcapnp-dev libacl1-dev | |||
| sudo apt-get install -y libacl1-dev | |||
| - uses: actions/setup-python@v2 | |||
| with: | |||
| python-version: 3.8.10 | |||
| python-version: 3.8 | |||
| - uses: r7kamura/rust-problem-matchers@v1.1.0 | |||
| - run: cargo --version --verbose | |||
| @@ -16,24 +16,11 @@ jobs: | |||
| timeout-minutes: 30 | |||
| steps: | |||
| - uses: actions/checkout@v3 | |||
| - name: Install Cap'n Proto and libacl-dev (Linux) | |||
| - name: Install libacl-dev (Linux) | |||
| if: runner.os == 'Linux' | |||
| run: | | |||
| export DEBIAN_FRONTEND=noninteractive | |||
| sudo apt-get install -y capnproto libcapnp-dev libacl1-dev | |||
| - name: Install Cap'n Proto (macOS) | |||
| if: runner.os == 'macOS' | |||
| run: brew install capnp | |||
| env: | |||
| HOMEBREW_NO_AUTO_UPDATE: 1 | |||
| HOMEBREW_NO_BOTTLE_SOURCE_FALLBACK: 1 | |||
| HOMEBREW_NO_INSTALL_CLEANUP: 1 | |||
| - name: Install Cap'n Proto (Windows) | |||
| if: runner.os == 'Windows' | |||
| shell: pwsh | |||
| run: | | |||
| choco install capnproto | |||
| echo "$Env:Programfiles\capnproto" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append | |||
| sudo apt-get install -y libacl1-dev | |||
| - uses: r7kamura/rust-problem-matchers@v1.1.0 | |||
| - run: cargo --version --verbose | |||
| @@ -56,24 +43,11 @@ jobs: | |||
| steps: | |||
| - uses: actions/checkout@v3 | |||
| - name: Install Cap'n Proto and libacl-dev (Linux) | |||
| - name: Install libacl-dev (Linux) | |||
| if: runner.os == 'Linux' | |||
| run: | | |||
| export DEBIAN_FRONTEND=noninteractive | |||
| sudo apt-get install -y capnproto libcapnp-dev libacl1-dev | |||
| - name: Install Cap'n Proto (macOS) | |||
| if: runner.os == 'macOS' | |||
| run: brew install capnp | |||
| env: | |||
| HOMEBREW_NO_AUTO_UPDATE: 1 | |||
| HOMEBREW_NO_BOTTLE_SOURCE_FALLBACK: 1 | |||
| HOMEBREW_NO_INSTALL_CLEANUP: 1 | |||
| - name: Install Cap'n Proto (Windows) | |||
| if: runner.os == 'Windows' | |||
| shell: pwsh | |||
| run: | | |||
| choco install capnproto | |||
| echo "$Env:Programfiles\capnproto" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append | |||
| sudo apt-get install -y libacl1-dev | |||
| - uses: r7kamura/rust-problem-matchers@v1.1.0 | |||
| - run: cargo --version --verbose | |||
| @@ -108,11 +82,11 @@ jobs: | |||
| steps: | |||
| - uses: actions/checkout@v3 | |||
| - name: Install Cap'n Proto and libacl-dev (Linux) | |||
| - name: Install libacl-dev (Linux) | |||
| if: runner.os == 'Linux' | |||
| run: | | |||
| export DEBIAN_FRONTEND=noninteractive | |||
| sudo apt-get install -y capnproto libcapnp-dev libacl1-dev | |||
| sudo apt-get install -y libacl1-dev | |||
| - uses: r7kamura/rust-problem-matchers@v1.1.0 | |||
| - run: cargo --version --verbose | |||
| @@ -127,10 +101,10 @@ jobs: | |||
| runs-on: ubuntu-latest | |||
| steps: | |||
| - uses: actions/checkout@v3 | |||
| - name: Install Cap'n Proto and libacl-dev | |||
| - name: Install libacl-dev | |||
| run: | | |||
| export DEBIAN_FRONTEND=noninteractive | |||
| sudo apt-get install -y capnproto libcapnp-dev libacl1-dev | |||
| sudo apt-get install -y libacl1-dev | |||
| - uses: r7kamura/rust-problem-matchers@v1.1.0 | |||
| - run: cargo --version --verbose | |||
| @@ -0,0 +1,52 @@ | |||
| name: Pypi Release | |||
| permissions: | |||
| contents: write | |||
| on: | |||
| release: | |||
| types: | |||
| - "published" | |||
| jobs: | |||
| release: | |||
| name: "Pypi Release" | |||
| strategy: | |||
| matrix: | |||
| platform: [ubuntu-latest, ubuntu-20.04] | |||
| python-version: ["3.7"] | |||
| fail-fast: false | |||
| runs-on: ${{ matrix.platform }} | |||
| steps: | |||
| - uses: actions/checkout@v3 | |||
| - name: Install libacl-dev (Linux) | |||
| if: runner.os == 'Linux' | |||
| run: | | |||
| export DEBIAN_FRONTEND=noninteractive | |||
| sudo apt-get install -y libacl1-dev | |||
| - uses: r7kamura/rust-problem-matchers@v1.1.0 | |||
| # Publish Dora Node Python API | |||
| - name: Set up Python | |||
| uses: actions/setup-python@v2 | |||
| with: | |||
| python-version: ${{ matrix.python-version }} | |||
| - name: Install dependencies | |||
| run: | | |||
| python -m pip install --upgrade pip | |||
| pip install maturin==0.14 | |||
| pip install patchelf --upgrade | |||
| - name: Publish wheel | |||
| shell: bash | |||
| env: | |||
| MATURIN_PASSWORD: ${{ secrets.PYPI_PASS }} | |||
| run: | | |||
| cd apis/python/node | |||
| maturin publish \ | |||
| --skip-existing \ | |||
| -o wheels \ | |||
| --no-sdist \ | |||
| --username __token__ \ | |||
| @@ -14,7 +14,7 @@ jobs: | |||
| strategy: | |||
| matrix: | |||
| platform: [ubuntu-latest, macos-latest, windows-latest] | |||
| platform: [ubuntu-20.04, ubuntu-22.04, macos-12, windows-2022] | |||
| python-version: ["3.7"] | |||
| fail-fast: false | |||
| runs-on: ${{ matrix.platform }} | |||
| @@ -22,24 +22,11 @@ jobs: | |||
| steps: | |||
| - uses: actions/checkout@v3 | |||
| - name: Install Cap'n Proto and libacl-dev (Linux) | |||
| - name: Install libacl-dev (Linux) | |||
| if: runner.os == 'Linux' | |||
| run: | | |||
| export DEBIAN_FRONTEND=noninteractive | |||
| sudo apt-get install -y capnproto libcapnp-dev libacl1-dev | |||
| - name: Install Cap'n Proto (macOS) | |||
| if: runner.os == 'macOS' | |||
| run: brew install capnp | |||
| env: | |||
| HOMEBREW_NO_AUTO_UPDATE: 1 | |||
| HOMEBREW_NO_BOTTLE_SOURCE_FALLBACK: 1 | |||
| HOMEBREW_NO_INSTALL_CLEANUP: 1 | |||
| - name: Install Cap'n Proto (Windows) | |||
| if: runner.os == 'Windows' | |||
| shell: pwsh | |||
| run: | | |||
| choco install capnproto | |||
| echo "$Env:Programfiles\capnproto" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append | |||
| sudo apt-get install -y libacl1-dev | |||
| - uses: r7kamura/rust-problem-matchers@v1.1.0 | |||
| @@ -78,27 +65,5 @@ jobs: | |||
| with: | |||
| upload_url: ${{ github.event.release.upload_url }} | |||
| asset_path: archive.zip | |||
| asset_name: dora-${{ github.ref_name }}-x86_64-${{ runner.os }}.zip | |||
| asset_name: dora-${{ github.ref_name }}-x86_64-${{ matrix.platform }}.zip | |||
| asset_content_type: application/zip | |||
| # Publish Dora Node Python API | |||
| - name: Set up Python | |||
| uses: actions/setup-python@v2 | |||
| with: | |||
| python-version: ${{ matrix.python-version }} | |||
| - name: Install dependencies | |||
| run: | | |||
| python -m pip install --upgrade pip | |||
| pip install maturin==0.13.2 | |||
| - name: Publish wheel | |||
| shell: bash | |||
| env: | |||
| MATURIN_PASSWORD: ${{ secrets.PYPI_PASS }} | |||
| run: | | |||
| cd apis/python/node | |||
| maturin publish \ | |||
| --no-sdist \ | |||
| --skip-existing \ | |||
| -o wheels \ | |||
| -i python \ | |||
| --username __token__ \ | |||
| @@ -541,7 +541,7 @@ dependencies = [ | |||
| [[package]] | |||
| name = "communication-layer-pub-sub" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "eyre", | |||
| "iceoryx-rs", | |||
| @@ -552,7 +552,7 @@ dependencies = [ | |||
| [[package]] | |||
| name = "communication-layer-request-reply" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "eyre", | |||
| ] | |||
| @@ -903,7 +903,7 @@ dependencies = [ | |||
| [[package]] | |||
| name = "dora-cli" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "atty", | |||
| "clap 4.0.3", | |||
| @@ -923,7 +923,7 @@ dependencies = [ | |||
| [[package]] | |||
| name = "dora-coordinator" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "bincode", | |||
| "clap 3.2.20", | |||
| @@ -953,7 +953,7 @@ dependencies = [ | |||
| [[package]] | |||
| name = "dora-core" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "dora-message", | |||
| "eyre", | |||
| @@ -988,7 +988,7 @@ dependencies = [ | |||
| [[package]] | |||
| name = "dora-download" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "eyre", | |||
| "reqwest", | |||
| @@ -1009,7 +1009,7 @@ dependencies = [ | |||
| [[package]] | |||
| name = "dora-message" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "capnp", | |||
| "capnpc", | |||
| @@ -1019,7 +1019,7 @@ dependencies = [ | |||
| [[package]] | |||
| name = "dora-metrics" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "futures", | |||
| "opentelemetry", | |||
| @@ -1030,7 +1030,7 @@ dependencies = [ | |||
| [[package]] | |||
| name = "dora-node-api" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "capnp", | |||
| "dora-core", | |||
| @@ -1051,7 +1051,7 @@ dependencies = [ | |||
| [[package]] | |||
| name = "dora-node-api-c" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "dora-node-api", | |||
| "eyre", | |||
| @@ -1061,7 +1061,7 @@ dependencies = [ | |||
| [[package]] | |||
| name = "dora-node-api-cxx" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "cxx", | |||
| "cxx-build", | |||
| @@ -1071,10 +1071,11 @@ dependencies = [ | |||
| [[package]] | |||
| name = "dora-node-api-python" | |||
| version = "0.1.0" | |||
| version = "0.1.1-2" | |||
| dependencies = [ | |||
| "dora-node-api", | |||
| "dora-operator-api-python", | |||
| "dora-runtime", | |||
| "eyre", | |||
| "flume", | |||
| "pyo3", | |||
| @@ -1083,7 +1084,7 @@ dependencies = [ | |||
| [[package]] | |||
| name = "dora-operator-api" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "dora-operator-api-macros", | |||
| "dora-operator-api-types", | |||
| @@ -1091,14 +1092,14 @@ dependencies = [ | |||
| [[package]] | |||
| name = "dora-operator-api-c" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "dora-operator-api-types", | |||
| ] | |||
| [[package]] | |||
| name = "dora-operator-api-cxx" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "cxx", | |||
| "cxx-build", | |||
| @@ -1111,7 +1112,7 @@ dependencies = [ | |||
| [[package]] | |||
| name = "dora-operator-api-macros" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "dora-operator-api", | |||
| "dora-operator-api-types", | |||
| @@ -1122,7 +1123,7 @@ dependencies = [ | |||
| [[package]] | |||
| name = "dora-operator-api-python" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "dora-node-api", | |||
| "eyre", | |||
| @@ -1133,14 +1134,14 @@ dependencies = [ | |||
| [[package]] | |||
| name = "dora-operator-api-types" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "safer-ffi", | |||
| ] | |||
| [[package]] | |||
| name = "dora-runtime" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "clap 3.2.20", | |||
| "dora-core", | |||
| @@ -1171,7 +1172,7 @@ dependencies = [ | |||
| [[package]] | |||
| name = "dora-tracing" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "opentelemetry", | |||
| "opentelemetry-jaeger", | |||
| @@ -1717,7 +1718,7 @@ dependencies = [ | |||
| [[package]] | |||
| name = "iceoryx-example-node" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "dora-node-api", | |||
| "eyre", | |||
| @@ -1726,14 +1727,14 @@ dependencies = [ | |||
| [[package]] | |||
| name = "iceoryx-example-operator" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "dora-operator-api", | |||
| ] | |||
| [[package]] | |||
| name = "iceoryx-example-sink" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "dora-node-api", | |||
| "eyre", | |||
| @@ -2271,9 +2272,9 @@ dependencies = [ | |||
| [[package]] | |||
| name = "ndk-sys" | |||
| version = "0.4.0" | |||
| version = "0.4.1+23.1.7779620" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "21d83ec9c63ec5bf950200a8e508bdad6659972187b625469f58ef8c08e29046" | |||
| checksum = "3cf2aae958bd232cac5069850591667ad422d263686d75b52a065f9badeee5a3" | |||
| dependencies = [ | |||
| "jni-sys", | |||
| ] | |||
| @@ -3309,7 +3310,7 @@ dependencies = [ | |||
| [[package]] | |||
| name = "rust-dataflow-example-node" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "dora-node-api", | |||
| "eyre", | |||
| @@ -3320,14 +3321,14 @@ dependencies = [ | |||
| [[package]] | |||
| name = "rust-dataflow-example-operator" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "dora-operator-api", | |||
| ] | |||
| [[package]] | |||
| name = "rust-dataflow-example-sink" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "dora-node-api", | |||
| "eyre", | |||
| @@ -4997,7 +4998,7 @@ dependencies = [ | |||
| [[package]] | |||
| name = "zenoh-logger" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| dependencies = [ | |||
| "zenoh", | |||
| "zenoh-config", | |||
| @@ -12,6 +12,8 @@ Dataflow Oriented Robotic Architecture ⚡ | |||
| This project is in early development, and many features have yet to be implemented with breaking changes. Please don't take for granted the current design. | |||
| `dora` primary support is with `Linux` ( Ubuntu 20.04 and Ubuntu 22.04 ) as it is the primary OS for both Cloud and small computers. If you wish to use `dora` with another OS, please compile from source. | |||
| --- | |||
| ## 📖 Documentation | |||
| @@ -28,34 +30,11 @@ For linux | |||
| ```bash | |||
| wget https://github.com/dora-rs/dora/releases/download/<version>/dora-<version>-x86_64-Linux.zip | |||
| unzip dora-<version>-x86_64-Linux.zip | |||
| python3 -m pip install dora-rs==<version> | |||
| PATH=$PATH:$(pwd):$(pwd)/iceoryx | |||
| dora --help | |||
| ``` | |||
| <details> | |||
| <summary> For Macos </summary> | |||
| ```bash | |||
| wget https://github.com/dora-rs/dora/releases/download/<version>/dora-<version>-x86_64-macOS.zip | |||
| unzip dora-<version>-x86_64-macOS.zip | |||
| PATH=$PATH:$(pwd):$(pwd)/iceoryx | |||
| dora --help | |||
| ``` | |||
| </details> | |||
| <details> | |||
| <summary> For Windows </summary> | |||
| ```bash | |||
| wget https://github.com/dora-rs/dora/releases/download/<version>/dora-<version>-x86_64-Windows.zip | |||
| unzip dora-<version>-x86_64-Windows.zip | |||
| PATH=$PATH:$(pwd):$(pwd)/iceoryx | |||
| dora --help | |||
| ``` | |||
| </details> | |||
| > This is `x86_64` only for the moment. | |||
| 2. Create a new dataflow | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "dora-node-api-cxx" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |||
| @@ -10,7 +10,7 @@ crate-type = ["staticlib"] | |||
| [dependencies] | |||
| cxx = "1.0.73" | |||
| dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node", default-features = false } | |||
| dora-node-api = { version = "0.1.1", path = "../../../apis/rust/node", default-features = false } | |||
| eyre = "0.6.8" | |||
| [build-dependencies] | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "dora-operator-api-cxx" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| [lib] | |||
| @@ -8,7 +8,7 @@ crate-type = ["staticlib"] | |||
| [dependencies] | |||
| cxx = "1.0.73" | |||
| dora-operator-api = { version = "0.1.0", path = "../../../apis/rust/operator" } | |||
| dora-operator-api = { version = "0.1.1", path = "../../../apis/rust/operator" } | |||
| eyre = "0.6.8" | |||
| futures = "0.3.21" | |||
| rand = "0.8.5" | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "dora-node-api-c" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| license = "Apache-2.0" | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "dora-operator-api-c" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| license = "Apache-2.0" | |||
| description = "C API implemetation for Dora Operator" | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "dora-node-api-python" | |||
| version = "0.1.0" | |||
| version = "0.1.1-2" | |||
| edition = "2021" | |||
| license = "Apache-2.0" | |||
| @@ -13,6 +13,7 @@ pyo3 = { version = "0.16", features = ["eyre", "abi3-py37"] } | |||
| eyre = "0.6" | |||
| serde_yaml = "0.8.23" | |||
| flume = "0.10.14" | |||
| dora-runtime = { path = "../../../binaries/runtime" } | |||
| [lib] | |||
| name = "dora" | |||
| @@ -64,7 +64,7 @@ impl Node { | |||
| data: &PyBytes, | |||
| metadata: Option<&PyDict>, | |||
| ) -> Result<()> { | |||
| let data = &data.as_bytes(); | |||
| let data = data.as_bytes(); | |||
| let metadata = pydict_to_metadata(metadata)?; | |||
| self.node | |||
| .send_output(&output_id.into(), metadata, data.len(), |out| { | |||
| @@ -78,8 +78,17 @@ impl Node { | |||
| } | |||
| } | |||
| #[pyfunction] | |||
| fn start_runtime() -> Result<()> { | |||
| dora_runtime::main() | |||
| .wrap_err("Python Dora Runtime failed.") | |||
| .unwrap(); | |||
| Ok(()) | |||
| } | |||
| #[pymodule] | |||
| fn dora(_py: Python, m: &PyModule) -> PyResult<()> { | |||
| m.add_function(wrap_pyfunction!(start_runtime, m)?)?; | |||
| m.add_class::<Node>().unwrap(); | |||
| Ok(()) | |||
| } | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "dora-operator-api-python" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| license = "Apache-2.0" | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "dora-node-api" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| license = "Apache-2.0" | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "dora-operator-api" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| license = "Apache-2.0" | |||
| description = "Rust API implemetation for Dora Operator" | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "dora-operator-api-macros" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| license = "Apache-2.0" | |||
| description = "Rust API Macros for Dora Operator" | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "dora-operator-api-types" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "dora-cli" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |||
| @@ -3,7 +3,6 @@ use dora_core::{ | |||
| adjust_shared_library_path, | |||
| config::{InputMapping, UserInputMapping}, | |||
| descriptor::{self, source_is_url, CoreNodeKind, OperatorSource}, | |||
| topics::ControlRequest, | |||
| }; | |||
| use eyre::{bail, eyre, Context}; | |||
| use std::{env::consts::EXE_EXTENSION, io::Write, path::Path}; | |||
| @@ -64,7 +64,7 @@ fn create_operator(name: String, path: Option<PathBuf>) -> Result<(), eyre::ErrR | |||
| // create directories | |||
| let root = path.as_deref().unwrap_or_else(|| Path::new(&name)); | |||
| fs::create_dir(&root) | |||
| fs::create_dir(root) | |||
| .with_context(|| format!("failed to create directory `{}`", root.display()))?; | |||
| let operator_path = root.join("operator.cc"); | |||
| @@ -96,7 +96,7 @@ fn create_custom_node(name: String, path: Option<PathBuf>) -> Result<(), eyre::E | |||
| // create directories | |||
| let root = path.as_deref().unwrap_or_else(|| Path::new(&name)); | |||
| fs::create_dir(&root) | |||
| fs::create_dir(root) | |||
| .with_context(|| format!("failed to create directory `{}`", root.display()))?; | |||
| let node_path = root.join("node.cc"); | |||
| @@ -6,4 +6,4 @@ edition = "2021" | |||
| # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |||
| [dependencies] | |||
| dora-node-api = { git = "https://github.com/dora-rs/dora.git" } | |||
| dora-node-api = { git = "https://github.com/dora-rs/dora.git", tag = "v0.1.1" } | |||
| @@ -1,4 +1,4 @@ | |||
| use dora_node_api::{self, core::config::DataId, DoraNode}; | |||
| use dora_node_api::DoraNode; | |||
| use std::error::Error; | |||
| fn main() -> Result<(), Box<dyn Error>> { | |||
| @@ -9,4 +9,4 @@ edition = "2021" | |||
| crate-type = ["cdylib"] | |||
| [dependencies] | |||
| dora-operator-api = { git = "https://github.com/dora-rs/dora.git" } | |||
| dora-operator-api = { git = "https://github.com/dora-rs/dora.git", tag = "v0.1.1" } | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "dora-coordinator" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| license = "Apache-2.0" | |||
| @@ -20,7 +20,7 @@ clap = { version = "3.1.8", features = ["derive"] } | |||
| uuid = { version = "1.2.1" } | |||
| time = "0.3.9" | |||
| rand = "0.8.5" | |||
| dora-core = { version = "0.1.0", path = "../../libraries/core" } | |||
| dora-core = { version = "0.1.1", path = "../../libraries/core" } | |||
| dora-message = { path = "../../libraries/message" } | |||
| tracing = "0.1.36" | |||
| tracing-subscriber = "0.3.15" | |||
| @@ -81,14 +81,23 @@ fn handle_requests( | |||
| Ok(reply) | |||
| })); | |||
| if let Err(err) = result { | |||
| if err.kind() == ErrorKind::Other { | |||
| let inner = err.into_inner().unwrap(); | |||
| let downcasted = inner.downcast_ref().unwrap(); | |||
| match downcasted { | |||
| HandlerError::ParseError(err) => { | |||
| tracing::warn!("failed to parse request: {err}"); | |||
| match err.kind() { | |||
| ErrorKind::UnexpectedEof => { | |||
| tracing::trace!("Control connection closed"); | |||
| break; | |||
| } | |||
| ErrorKind::Other => { | |||
| let inner = err.into_inner().unwrap(); | |||
| let downcasted = inner.downcast_ref().unwrap(); | |||
| match downcasted { | |||
| HandlerError::ParseError(err) => { | |||
| tracing::warn!("failed to parse request: {err}"); | |||
| } | |||
| HandlerError::ServerStopped => break, | |||
| } | |||
| HandlerError::ServerStopped => break, | |||
| } | |||
| _ => { | |||
| tracing::warn!("I/O error while trying to receive control request: {err:?}"); | |||
| } | |||
| } | |||
| } | |||
| @@ -103,6 +112,7 @@ enum HandlerError { | |||
| ServerStopped, | |||
| } | |||
| #[derive(Debug)] | |||
| pub enum ControlEvent { | |||
| IncomingRequest { | |||
| request: ControlRequest, | |||
| @@ -99,6 +99,7 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { | |||
| let mut daemon_connections = HashMap::new(); | |||
| while let Some(event) = events.next().await { | |||
| tracing::trace!("Handling event {event:?}"); | |||
| match event { | |||
| Event::NewDaemonConnection(connection) => { | |||
| let events_tx = daemon_events_tx.clone(); | |||
| @@ -369,6 +370,7 @@ async fn start_dataflow( | |||
| }) | |||
| } | |||
| #[derive(Debug)] | |||
| pub enum Event { | |||
| NewDaemonConnection(TcpStream), | |||
| DaemonConnectError(eyre::Report), | |||
| @@ -377,10 +379,12 @@ pub enum Event { | |||
| Daemon(DaemonEvent), | |||
| } | |||
| #[derive(Debug)] | |||
| pub enum DataflowEvent { | |||
| Finished { result: eyre::Result<()> }, | |||
| } | |||
| #[derive(Debug)] | |||
| pub enum DaemonEvent { | |||
| Register { | |||
| machine_id: String, | |||
| @@ -1,7 +1,7 @@ | |||
| use super::command_init_common_env; | |||
| use dora_core::{ | |||
| config::NodeId, | |||
| descriptor::{self, EnvValue}, | |||
| descriptor::{self, EnvValue, OperatorSource}, | |||
| }; | |||
| use eyre::{eyre, WrapErr}; | |||
| use std::{collections::BTreeMap, path::Path}; | |||
| @@ -15,7 +15,30 @@ pub fn spawn_runtime_node( | |||
| communication: &dora_core::config::CommunicationConfig, | |||
| working_dir: &Path, | |||
| ) -> eyre::Result<tokio::task::JoinHandle<eyre::Result<(), eyre::Error>>> { | |||
| let mut command = tokio::process::Command::new(runtime); | |||
| let has_python_operator = node | |||
| .operators | |||
| .iter() | |||
| .any(|x| matches!(x.config.source, OperatorSource::Python { .. })); | |||
| let has_other_operator = node | |||
| .operators | |||
| .iter() | |||
| .any(|x| !matches!(x.config.source, OperatorSource::Python { .. })); | |||
| let mut command = if has_python_operator && !has_other_operator { | |||
| // Use python to spawn runtime if there is a python operator | |||
| let mut command = tokio::process::Command::new("python3"); | |||
| command.args(["-c", "import dora; dora.start_runtime()"]); | |||
| command | |||
| } else if !has_python_operator && has_other_operator { | |||
| // Use default runtime if there is no python operator | |||
| tokio::process::Command::new(runtime) | |||
| } else { | |||
| return Err(eyre!( | |||
| "Runtime can not mix Python Operator with other type of operator." | |||
| )); | |||
| }; | |||
| command_init_common_env(&mut command, &node_id, communication)?; | |||
| command.env( | |||
| "DORA_OPERATORS", | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "dora-runtime" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| license = "Apache-2.0" | |||
| @@ -11,7 +11,7 @@ clap = { version = "3.1.12", features = ["derive"] } | |||
| dora-node-api = { path = "../../apis/rust/node", default-features = false } | |||
| dora-operator-api-python = { path = "../../apis/python/operator" } | |||
| dora-operator-api-types = { path = "../../apis/rust/operator/types" } | |||
| dora-core = { version = "0.1.0", path = "../../libraries/core" } | |||
| dora-core = { version = "0.1.1", path = "../../libraries/core" } | |||
| dora-tracing = { path = "../../libraries/extensions/telemetry/tracing", optional = true } | |||
| dora-metrics = { path = "../../libraries/extensions/telemetry/metrics", optional = true } | |||
| opentelemetry = { version = "0.17", features = [ | |||
| @@ -29,7 +29,7 @@ tokio-stream = "0.1.8" | |||
| zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", rev = "79a136e4fd90b11ff5d775ced981af53c4f1071b" } | |||
| zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git", rev = "79a136e4fd90b11ff5d775ced981af53c4f1071b" } | |||
| fern = "0.6.1" | |||
| pyo3 = { version = "0.16.5", features = ["auto-initialize", "eyre", "abi3"] } | |||
| pyo3 = { version = "0.16", features = ["eyre", "abi3-py37"] } | |||
| # pyo3-abi3 flag allow simpler linking. See: https://pyo3.rs/v0.13.2/building_and_distribution.html | |||
| flume = "0.10.14" | |||
| dora-message = { path = "../../libraries/message" } | |||
| @@ -0,0 +1,203 @@ | |||
| #![warn(unsafe_op_in_unsafe_fn)] | |||
| use dora_core::{ | |||
| config::{CommunicationConfig, DataId, NodeId, OperatorId}, | |||
| descriptor::OperatorDefinition, | |||
| }; | |||
| use dora_node_api::{ | |||
| self, | |||
| communication::{self, CommunicationLayer, Publisher, STOP_TOPIC}, | |||
| manual_stop_publisher, | |||
| }; | |||
| use eyre::{bail, Context, Result}; | |||
| use futures::{Stream, StreamExt}; | |||
| use operator::{spawn_operator, OperatorEvent, StopReason}; | |||
| use std::{ | |||
| collections::{BTreeSet, HashMap}, | |||
| mem, | |||
| }; | |||
| use tokio::{runtime::Builder, sync::mpsc}; | |||
| use tokio_stream::{wrappers::ReceiverStream, StreamMap}; | |||
| mod operator; | |||
| pub fn main() -> eyre::Result<()> { | |||
| set_up_tracing().context("failed to set up tracing subscriber")?; | |||
| let node_id: NodeId = { | |||
| let raw = | |||
| std::env::var("DORA_NODE_ID").wrap_err("env variable DORA_NODE_ID must be set")?; | |||
| serde_yaml::from_str(&raw).context("failed to deserialize operator config")? | |||
| }; | |||
| let communication_config: CommunicationConfig = { | |||
| let raw = std::env::var("DORA_COMMUNICATION_CONFIG") | |||
| .wrap_err("env variable DORA_COMMUNICATION_CONFIG must be set")?; | |||
| serde_yaml::from_str(&raw).context("failed to deserialize communication config")? | |||
| }; | |||
| let operators: Vec<OperatorDefinition> = { | |||
| let raw = | |||
| std::env::var("DORA_OPERATORS").wrap_err("env variable DORA_OPERATORS must be set")?; | |||
| serde_yaml::from_str(&raw).context("failed to deserialize operator config")? | |||
| }; | |||
| let mut communication: Box<dyn CommunicationLayer> = | |||
| communication::init(&communication_config)?; | |||
| let mut operator_events = StreamMap::new(); | |||
| let mut operator_stop_publishers = HashMap::new(); | |||
| let mut operator_events_tx = HashMap::new(); | |||
| for operator_config in &operators { | |||
| let (events_tx, events) = mpsc::channel(1); | |||
| let stop_publisher = publisher( | |||
| &node_id, | |||
| operator_config.id.clone(), | |||
| STOP_TOPIC.to_owned().into(), | |||
| communication.as_mut(), | |||
| ) | |||
| .with_context(|| { | |||
| format!( | |||
| "failed to create stop publisher for operator {}", | |||
| operator_config.id | |||
| ) | |||
| })?; | |||
| operator_stop_publishers.insert(operator_config.id.clone(), stop_publisher); | |||
| operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events)); | |||
| operator_events_tx.insert(operator_config.id.clone(), events_tx); | |||
| } | |||
| let operator_events = operator_events.map(|(id, event)| Event::Operator { id, event }); | |||
| let node_id_clone = node_id.clone(); | |||
| let tokio_runtime = Builder::new_current_thread() | |||
| .enable_all() | |||
| .build() | |||
| .wrap_err("Could not build a tokio runtime.")?; | |||
| let manual_stop_publisher = manual_stop_publisher(communication.as_mut())?; | |||
| let stop_thread = std::thread::spawn(move || -> Result<()> { | |||
| tokio_runtime.block_on(run( | |||
| node_id_clone, | |||
| operator_events, | |||
| operator_stop_publishers, | |||
| manual_stop_publisher, | |||
| )) | |||
| }); | |||
| for operator_config in &operators { | |||
| let events_tx = operator_events_tx.get(&operator_config.id).unwrap(); | |||
| spawn_operator( | |||
| &node_id, | |||
| operator_config.clone(), | |||
| events_tx.clone(), | |||
| communication.as_mut(), | |||
| ) | |||
| .wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?; | |||
| } | |||
| stop_thread | |||
| .join() | |||
| .map_err(|err| eyre::eyre!("Stop thread failed with err: {err:#?}"))? | |||
| .wrap_err("Stop loop thread failed unexpectedly.")?; | |||
| Ok(()) | |||
| } | |||
| async fn run( | |||
| node_id: NodeId, | |||
| mut events: impl Stream<Item = Event> + Unpin, | |||
| mut operator_stop_publishers: HashMap<OperatorId, Box<dyn Publisher>>, | |||
| manual_stop_publisher: Box<dyn Publisher>, | |||
| ) -> eyre::Result<()> { | |||
| #[cfg(feature = "metrics")] | |||
| let _started = { | |||
| use dora_metrics::init_meter; | |||
| use opentelemetry::global; | |||
| use opentelemetry_system_metrics::init_process_observer; | |||
| let _started = init_meter(); | |||
| let meter = global::meter(Box::leak(node_id.to_string().into_boxed_str())); | |||
| init_process_observer(meter); | |||
| _started | |||
| }; | |||
| let mut stopped_operators = BTreeSet::new(); | |||
| while let Some(event) = events.next().await { | |||
| match event { | |||
| Event::Operator { id, event } => { | |||
| match event { | |||
| OperatorEvent::Error(err) => { | |||
| bail!(err.wrap_err(format!("operator {id} failed"))) | |||
| } | |||
| OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload), | |||
| OperatorEvent::Finished { reason } => { | |||
| if let StopReason::ExplicitStopAll = reason { | |||
| let hlc = dora_message::uhlc::HLC::default(); | |||
| let metadata = dora_message::Metadata::new(hlc.new_timestamp()); | |||
| let data = metadata | |||
| .serialize() | |||
| .wrap_err("failed to serialize stop message")?; | |||
| manual_stop_publisher | |||
| .publish(&data) | |||
| .map_err(|err| eyre::eyre!(err)) | |||
| .wrap_err("failed to send stop message")?; | |||
| break; | |||
| } | |||
| if let Some(stop_publisher) = operator_stop_publishers.remove(&id) { | |||
| tracing::info!("operator {node_id}/{id} finished ({reason:?})"); | |||
| stopped_operators.insert(id.clone()); | |||
| // send stopped message | |||
| tokio::task::spawn_blocking(move || stop_publisher.publish(&[])) | |||
| .await | |||
| .wrap_err("failed to join stop publish task")? | |||
| .map_err(|err| eyre::eyre!(err)) | |||
| .with_context(|| { | |||
| format!( | |||
| "failed to send stop message for operator `{node_id}/{id}`" | |||
| ) | |||
| })?; | |||
| if operator_stop_publishers.is_empty() { | |||
| break; | |||
| } | |||
| } else { | |||
| tracing::warn!("no stop publisher for {id}"); | |||
| } | |||
| } | |||
| } | |||
| } | |||
| } | |||
| } | |||
| mem::drop(events); | |||
| Ok(()) | |||
| } | |||
| fn publisher( | |||
| self_id: &NodeId, | |||
| operator_id: OperatorId, | |||
| output_id: DataId, | |||
| communication: &mut dyn CommunicationLayer, | |||
| ) -> eyre::Result<Box<dyn Publisher>> { | |||
| let topic = format!("{self_id}/{operator_id}/{output_id}"); | |||
| communication | |||
| .publisher(&topic) | |||
| .map_err(|err| eyre::eyre!(err)) | |||
| .wrap_err_with(|| format!("failed to create publisher for output {output_id}")) | |||
| } | |||
/// Events consumed by the runtime's main event loop.
enum Event {
    /// An event emitted by one of the spawned operators.
    Operator {
        id: OperatorId,
        event: OperatorEvent,
    },
}
| fn set_up_tracing() -> eyre::Result<()> { | |||
| use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt; | |||
| let stdout_log = tracing_subscriber::fmt::layer().pretty(); | |||
| let subscriber = tracing_subscriber::Registry::default().with(stdout_log); | |||
| tracing::subscriber::set_global_default(subscriber) | |||
| .context("failed to set tracing global subscriber") | |||
| } | |||
| @@ -1,178 +1,3 @@ | |||
| #![warn(unsafe_op_in_unsafe_fn)] | |||
| use dora_core::{ | |||
| config::{CommunicationConfig, DataId, NodeId, OperatorId}, | |||
| descriptor::OperatorDefinition, | |||
| }; | |||
| use dora_node_api::{ | |||
| self, | |||
| communication::{self, CommunicationLayer, Publisher, STOP_TOPIC}, | |||
| manual_stop_publisher, | |||
| }; | |||
| use eyre::{bail, Context}; | |||
| use futures::{Stream, StreamExt}; | |||
| use operator::{spawn_operator, OperatorEvent, StopReason}; | |||
| use std::{ | |||
| collections::{BTreeSet, HashMap}, | |||
| mem, | |||
| }; | |||
| use tokio::sync::mpsc; | |||
| use tokio_stream::{wrappers::ReceiverStream, StreamMap}; | |||
| mod operator; | |||
| fn main() -> eyre::Result<()> { | |||
| set_up_tracing().context("failed to set up tracing subscriber")?; | |||
| let node_id = { | |||
| let raw = | |||
| std::env::var("DORA_NODE_ID").wrap_err("env variable DORA_NODE_ID must be set")?; | |||
| serde_yaml::from_str(&raw).context("failed to deserialize operator config")? | |||
| }; | |||
| let communication_config: CommunicationConfig = { | |||
| let raw = std::env::var("DORA_COMMUNICATION_CONFIG") | |||
| .wrap_err("env variable DORA_COMMUNICATION_CONFIG must be set")?; | |||
| serde_yaml::from_str(&raw).context("failed to deserialize communication config")? | |||
| }; | |||
| let operators: Vec<OperatorDefinition> = { | |||
| let raw = | |||
| std::env::var("DORA_OPERATORS").wrap_err("env variable DORA_OPERATORS must be set")?; | |||
| serde_yaml::from_str(&raw).context("failed to deserialize operator config")? | |||
| }; | |||
| let mut communication: Box<dyn CommunicationLayer> = | |||
| communication::init(&communication_config)?; | |||
| let mut operator_events = StreamMap::new(); | |||
| let mut operator_stop_publishers = HashMap::new(); | |||
| for operator_config in &operators { | |||
| let (events_tx, events) = mpsc::channel(1); | |||
| spawn_operator( | |||
| &node_id, | |||
| operator_config.clone(), | |||
| events_tx.clone(), | |||
| communication.as_mut(), | |||
| ) | |||
| .wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?; | |||
| operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events)); | |||
| let stop_publisher = publisher( | |||
| &node_id, | |||
| operator_config.id.clone(), | |||
| STOP_TOPIC.to_owned().into(), | |||
| communication.as_mut(), | |||
| ) | |||
| .with_context(|| { | |||
| format!( | |||
| "failed to create stop publisher for operator {}", | |||
| operator_config.id | |||
| ) | |||
| })?; | |||
| operator_stop_publishers.insert(operator_config.id.clone(), stop_publisher); | |||
| } | |||
| let operator_events = operator_events.map(|(id, event)| Event::Operator { id, event }); | |||
| tokio::runtime::Runtime::new()?.block_on(run( | |||
| node_id, | |||
| operator_events, | |||
| operator_stop_publishers, | |||
| communication.as_mut(), | |||
| )) | |||
| } | |||
| async fn run( | |||
| node_id: NodeId, | |||
| mut events: impl Stream<Item = Event> + Unpin, | |||
| mut operator_stop_publishers: HashMap<OperatorId, Box<dyn Publisher>>, | |||
| communication: &mut dyn CommunicationLayer, | |||
| ) -> eyre::Result<()> { | |||
| #[cfg(feature = "metrics")] | |||
| let _started = { | |||
| use dora_metrics::init_meter; | |||
| use opentelemetry::global; | |||
| use opentelemetry_system_metrics::init_process_observer; | |||
| let _started = init_meter(); | |||
| let meter = global::meter(Box::leak(node_id.to_string().into_boxed_str())); | |||
| init_process_observer(meter); | |||
| _started | |||
| }; | |||
| let mut stopped_operators = BTreeSet::new(); | |||
| while let Some(event) = events.next().await { | |||
| match event { | |||
| Event::Operator { id, event } => { | |||
| match event { | |||
| OperatorEvent::Error(err) => { | |||
| bail!(err.wrap_err(format!("operator {id} failed"))) | |||
| } | |||
| OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload), | |||
| OperatorEvent::Finished { reason } => { | |||
| if let StopReason::ExplicitStopAll = reason { | |||
| let manual_stop_publisher = manual_stop_publisher(communication)?; | |||
| tokio::task::spawn_blocking(manual_stop_publisher) | |||
| .await | |||
| .wrap_err("failed to join stop publish task")? | |||
| .map_err(|err| eyre::eyre!(err)) | |||
| .wrap_err("failed to send stop message")?; | |||
| } | |||
| if let Some(stop_publisher) = operator_stop_publishers.remove(&id) { | |||
| tracing::info!("operator {node_id}/{id} finished ({reason:?})"); | |||
| stopped_operators.insert(id.clone()); | |||
| // send stopped message | |||
| tokio::task::spawn_blocking(move || stop_publisher.publish(&[])) | |||
| .await | |||
| .wrap_err("failed to join stop publish task")? | |||
| .map_err(|err| eyre::eyre!(err)) | |||
| .with_context(|| { | |||
| format!( | |||
| "failed to send stop message for operator `{node_id}/{id}`" | |||
| ) | |||
| })?; | |||
| if operator_stop_publishers.is_empty() { | |||
| break; | |||
| } | |||
| } else { | |||
| tracing::warn!("no stop publisher for {id}"); | |||
| } | |||
| } | |||
| } | |||
| } | |||
| } | |||
| } | |||
| mem::drop(events); | |||
| Ok(()) | |||
| } | |||
| fn publisher( | |||
| self_id: &NodeId, | |||
| operator_id: OperatorId, | |||
| output_id: DataId, | |||
| communication: &mut dyn CommunicationLayer, | |||
| ) -> eyre::Result<Box<dyn Publisher>> { | |||
| let topic = format!("{self_id}/{operator_id}/{output_id}"); | |||
| communication | |||
| .publisher(&topic) | |||
| .map_err(|err| eyre::eyre!(err)) | |||
| .wrap_err_with(|| format!("failed to create publisher for output {output_id}")) | |||
| } | |||
/// Events consumed by the runtime's main event loop.
enum Event {
    /// An event emitted by one of the spawned operators.
    Operator {
        id: OperatorId,
        event: OperatorEvent,
    },
}
| fn set_up_tracing() -> eyre::Result<()> { | |||
| use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt; | |||
| let stdout_log = tracing_subscriber::fmt::layer().pretty(); | |||
| let subscriber = tracing_subscriber::Registry::default().with(stdout_log); | |||
| tracing::subscriber::set_global_default(subscriber) | |||
| .context("failed to set tracing global subscriber") | |||
| fn main() -> Result<(), eyre::Report> { | |||
| dora_runtime::main() | |||
| } | |||
| @@ -23,7 +23,6 @@ use std::{ | |||
| panic::{catch_unwind, AssertUnwindSafe}, | |||
| path::Path, | |||
| sync::Arc, | |||
| thread, | |||
| }; | |||
| use tokio::sync::mpsc::Sender; | |||
| @@ -51,7 +50,7 @@ pub fn spawn( | |||
| let path = if source_is_url(source) { | |||
| let target_path = Path::new("build") | |||
| .join(node_id.to_string()) | |||
| .join(format!("{}.py", operator_id.to_string())); | |||
| .join(format!("{}.py", operator_id)); | |||
| // try to download the shared library | |||
| let rt = tokio::runtime::Builder::new_current_thread() | |||
| .enable_all() | |||
| @@ -170,24 +169,22 @@ pub fn spawn( | |||
| Result::<_, eyre::Report>::Ok(reason) | |||
| }; | |||
| thread::spawn(move || { | |||
| let closure = AssertUnwindSafe(|| { | |||
| python_runner() | |||
| .wrap_err_with(|| format!("error in Python module at {}", path_cloned.display())) | |||
| }); | |||
| let closure = AssertUnwindSafe(|| { | |||
| python_runner() | |||
| .wrap_err_with(|| format!("error in Python module at {}", path_cloned.display())) | |||
| }); | |||
| match catch_unwind(closure) { | |||
| Ok(Ok(reason)) => { | |||
| let _ = events_tx.blocking_send(OperatorEvent::Finished { reason }); | |||
| } | |||
| Ok(Err(err)) => { | |||
| let _ = events_tx.blocking_send(OperatorEvent::Error(err)); | |||
| } | |||
| Err(panic) => { | |||
| let _ = events_tx.blocking_send(OperatorEvent::Panic(panic)); | |||
| } | |||
| match catch_unwind(closure) { | |||
| Ok(Ok(reason)) => { | |||
| let _ = events_tx.blocking_send(OperatorEvent::Finished { reason }); | |||
| } | |||
| }); | |||
| Ok(Err(err)) => { | |||
| let _ = events_tx.blocking_send(OperatorEvent::Error(err)); | |||
| } | |||
| Err(panic) => { | |||
| let _ = events_tx.blocking_send(OperatorEvent::Panic(panic)); | |||
| } | |||
| } | |||
| Ok(()) | |||
| } | |||
| @@ -205,11 +202,10 @@ mod callback_impl { | |||
| use super::SendOutputCallback; | |||
| use dora_message::Metadata; | |||
| use dora_operator_api_python::pydict_to_metadata; | |||
| use eyre::{eyre, Context}; | |||
| use eyre::{eyre, Context, Result}; | |||
| use pyo3::{ | |||
| pymethods, | |||
| types::{PyBytes, PyDict}, | |||
| PyResult, | |||
| }; | |||
| #[pymethods] | |||
| @@ -219,27 +215,27 @@ mod callback_impl { | |||
| output: &str, | |||
| data: &PyBytes, | |||
| metadata: Option<&PyDict>, | |||
| ) -> PyResult<()> { | |||
| ) -> Result<()> { | |||
| let data = data.as_bytes(); | |||
| let parameters = pydict_to_metadata(metadata).wrap_err("Could not parse metadata.")?; | |||
| let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters); | |||
| let mut message = metadata | |||
| .serialize() | |||
| .context(format!("failed to serialize `{}` metadata", output))?; | |||
| match self.publishers.get(output) { | |||
| Some(publisher) => { | |||
| let parameters = pydict_to_metadata(metadata)?; | |||
| let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters); | |||
| let message = metadata | |||
| .serialize() | |||
| .context(format!("failed to serialize `{}` metadata", output)); | |||
| message.and_then(|mut message| { | |||
| message.extend_from_slice(data.as_bytes()); | |||
| publisher | |||
| .publish(&message) | |||
| .map_err(|err| eyre::eyre!(err)) | |||
| .context("publish failed") | |||
| }) | |||
| message.extend_from_slice(data); | |||
| publisher | |||
| .publish(&message) | |||
| .map_err(|err| eyre::eyre!(err)) | |||
| .context("publish failed") | |||
| } | |||
| None => Err(eyre!( | |||
| "unexpected output {output} (not defined in dataflow config)" | |||
| )), | |||
| } | |||
| .map_err(|err| err.into()) | |||
| } | |||
| } | |||
| } | |||
| @@ -1,11 +1,11 @@ | |||
| [package] | |||
| name = "iceoryx-example-node" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |||
| [dependencies] | |||
| dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } | |||
| dora-node-api = { version = "0.1.1", path = "../../../apis/rust/node" } | |||
| eyre = "0.6.8" | |||
| rand = "0.8.5" | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "iceoryx-example-operator" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| license = "Apache-2.0" | |||
| @@ -1,12 +1,12 @@ | |||
| [package] | |||
| name = "iceoryx-example-sink" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |||
| [dependencies] | |||
| dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } | |||
| dora-node-api = { version = "0.1.1", path = "../../../apis/rust/node" } | |||
| eyre = "0.6.8" | |||
| futures = "0.3.21" | |||
| tokio = { version = "1.20.1", features = ["macros"] } | |||
| @@ -34,7 +34,6 @@ class Operator: | |||
| frame = np.frombuffer(dora_input["data"], dtype="uint8") | |||
| frame = cv2.imdecode(frame, -1) | |||
| frame = frame[:, :, ::-1] # OpenCV image (BGR to RGB) | |||
| results = self.model(frame) # includes NMS | |||
| arrays = np.array(results.xyxy[0].cpu()).tobytes() | |||
| send_output("bbox", arrays, dora_input["metadata"]) | |||
| @@ -24,6 +24,7 @@ class Operator: | |||
    def __init__(self):
        # Most recently received frame; starts as an empty list until the
        # first image input arrives (callers check `len(self.image) != 0`).
        self.image = []
        # Latest bounding boxes; later reshaped to (-1, 6) rows of
        # [min_x, min_y, max_x, max_y, confidence, label] so boxes persist
        # across frames.
        self.bboxs = []
| def on_input( | |||
| self, | |||
| @@ -45,39 +46,39 @@ class Operator: | |||
| elif dora_input["id"] == "bbox" and len(self.image) != 0: | |||
| bboxs = np.frombuffer(dora_input["data"], dtype="float32") | |||
| bboxs = np.reshape(bboxs, (-1, 6)) | |||
| for bbox in bboxs: | |||
| [ | |||
| min_x, | |||
| min_y, | |||
| max_x, | |||
| max_y, | |||
| confidence, | |||
| label, | |||
| ] = bbox | |||
| cv2.rectangle( | |||
| self.image, | |||
| (int(min_x), int(min_y)), | |||
| (int(max_x), int(max_y)), | |||
| (0, 255, 0), | |||
| 2, | |||
| ) | |||
| cv2.putText( | |||
| self.image, | |||
| LABELS[int(label)] + f", {confidence:0.2f}", | |||
| (int(max_x), int(max_y)), | |||
| font, | |||
| 0.75, | |||
| (0, 255, 0), | |||
| 2, | |||
| 1, | |||
| ) | |||
| if CI != "true": | |||
| cv2.imshow("frame", self.image) | |||
| if cv2.waitKey(1) & 0xFF == ord("q"): | |||
| return DoraStatus.STOP | |||
| self.bboxs = np.reshape(bboxs, (-1, 6)) | |||
| for bbox in self.bboxs: | |||
| [ | |||
| min_x, | |||
| min_y, | |||
| max_x, | |||
| max_y, | |||
| confidence, | |||
| label, | |||
| ] = bbox | |||
| cv2.rectangle( | |||
| self.image, | |||
| (int(min_x), int(min_y)), | |||
| (int(max_x), int(max_y)), | |||
| (0, 255, 0), | |||
| 2, | |||
| ) | |||
| cv2.putText( | |||
| self.image, | |||
| LABELS[int(label)] + f", {confidence:0.2f}", | |||
| (int(max_x), int(max_y)), | |||
| font, | |||
| 0.75, | |||
| (0, 255, 0), | |||
| 2, | |||
| 1, | |||
| ) | |||
| if CI != "true": | |||
| cv2.imshow("frame", self.image) | |||
| if cv2.waitKey(1) & 0xFF == ord("q"): | |||
| return DoraStatus.STOP | |||
| return DoraStatus.CONTINUE | |||
| @@ -13,7 +13,7 @@ video_capture = cv2.VideoCapture(0) | |||
| start = time.time() | |||
| # Run for 20 seconds | |||
| while time.time() - start < 20: | |||
| while time.time() - start < 10: | |||
| # Wait next dora_input | |||
| node.next() | |||
| ret, frame = video_capture.read() | |||
| @@ -1,10 +1,10 @@ | |||
| [package] | |||
| name = "rust-dataflow-example-sink" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |||
| [dependencies] | |||
| dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } | |||
| dora-node-api = { version = "0.1.1", path = "../../../apis/rust/node" } | |||
| eyre = "0.6.8" | |||
| @@ -1,12 +1,12 @@ | |||
| [package] | |||
| name = "rust-dataflow-example-node" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |||
| [dependencies] | |||
| dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } | |||
| dora-node-api = { version = "0.1.1", path = "../../../apis/rust/node" } | |||
| eyre = "0.6.8" | |||
| futures = "0.3.21" | |||
| rand = "0.8.5" | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "rust-dataflow-example-operator" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| license = "Apache-2.0" | |||
| @@ -1,10 +1,10 @@ | |||
| [package] | |||
| name = "rust-dataflow-example-sink" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |||
| [dependencies] | |||
| dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } | |||
| dora-node-api = { version = "0.1.1", path = "../../../apis/rust/node" } | |||
| eyre = "0.6.8" | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "communication-layer-pub-sub" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| [features] | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "communication-layer-request-reply" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| [features] | |||
| @@ -125,7 +125,7 @@ impl TcpConnection { | |||
| fn send(&mut self, request: &[u8]) -> std::io::Result<()> { | |||
| let len_raw = (request.len() as u64).to_le_bytes(); | |||
| self.stream.write_all(&len_raw)?; | |||
| self.stream.write_all(&request)?; | |||
| self.stream.write_all(request)?; | |||
| Ok(()) | |||
| } | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "dora-core" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| license = "Apache-2.0" | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "dora-download" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| license = "Apache-2.0" | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "dora-metrics" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| license = "Apache-2.0" | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "dora-tracing" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| license = "Apache-2.0" | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "zenoh-logger" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| license = "Apache-2.0" | |||
| @@ -1,6 +1,6 @@ | |||
| [package] | |||
| name = "dora-message" | |||
| version = "0.1.0" | |||
| version = "0.1.1" | |||
| edition = "2021" | |||
| license = "Apache-2.0" | |||