Browse Source

Merge branch 'main' into xshell

xshell
Philipp Oppermann 1 year ago
parent
commit
cd41356e76
Failed to extract signature
19 changed files with 1923 additions and 1133 deletions
  1. +13
    -0
      .github/dependabot.yml
  2. +18
    -0
      .github/workflows/ci.yml
  3. +2
    -2
      .github/workflows/pip-release.yml
  4. +1227
    -1062
      Cargo.lock
  5. +20
    -20
      Cargo.toml
  6. +24
    -0
      Changelog.md
  7. +14
    -14
      README.md
  8. +3
    -0
      apis/python/node/dora/__init__.pyi
  9. +7
    -0
      apis/python/node/src/lib.rs
  10. +5
    -0
      apis/python/operator/src/lib.rs
  11. +1
    -0
      apis/rust/node/src/event_stream/merged.rs
  12. +2
    -0
      libraries/core/Cargo.toml
  13. +31
    -0
      libraries/core/README.md
  14. +441
    -0
      libraries/core/dora-schema.json
  15. +42
    -0
      libraries/core/src/bin/generate_schema.rs
  16. +34
    -11
      libraries/core/src/config.rs
  17. +38
    -16
      libraries/core/src/descriptor/mod.rs
  18. +0
    -7
      libraries/core/src/descriptor/validate.rs
  19. +1
    -1
      libraries/extensions/download/Cargo.toml

+ 13
- 0
.github/dependabot.yml View File

@@ -0,0 +1,13 @@
version: 2
updates:
- package-ecosystem: "cargo"
directory: "/"
versioning-strategy: "lockfile-only"
allow:
- dependency-type: "all"
groups:
version-updates:
patterns:
- "*"
schedule:
interval: "weekly"

+ 18
- 0
.github/workflows/ci.yml View File

@@ -159,6 +159,24 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: r7kamura/rust-problem-matchers@v1.1.0

- name: Free Disk Space (Ubuntu)
uses: jlumbroso/free-disk-space@main
if: runner.os == 'Linux'
with:
# this might remove tools that are actually needed,
# if set to "true" but frees about 6 GB
tool-cache: false

# all of these default to true, but feel free to set to
# "false" if necessary for your workflow
android: true
dotnet: true
haskell: true
large-packages: false
docker-images: true
swap-storage: false

- run: cargo --version --verbose
- uses: Swatinem/rust-cache@v2
with:


+ 2
- 2
.github/workflows/pip-release.yml View File

@@ -26,7 +26,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.7"
python-version: "3.8"
- name: Install dependencies
run: |
pip install patchelf --upgrade
@@ -91,7 +91,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.7"
python-version: "3.8"
- name: Build wheels
uses: PyO3/maturin-action@v1
with:


+ 1227
- 1062
Cargo.lock
File diff suppressed because it is too large
View File


+ 20
- 20
Cargo.toml View File

@@ -38,31 +38,31 @@ members = [

[workspace.package]
# Make sure to also bump `apis/node/python/__init__.py` version.
version = "0.3.3"
version = "0.3.4"
description = "`dora` goal is to be a low latency, composable, and distributed data flow."
documentation = "https://dora.carsmos.ai"
license = "Apache-2.0"

[workspace.dependencies]
dora-node-api = { version = "0.3.3", path = "apis/rust/node", default-features = false }
dora-node-api-python = { version = "0.3.3", path = "apis/python/node", default-features = false }
dora-operator-api = { version = "0.3.3", path = "apis/rust/operator", default-features = false }
dora-operator-api-macros = { version = "0.3.3", path = "apis/rust/operator/macros" }
dora-operator-api-types = { version = "0.3.3", path = "apis/rust/operator/types" }
dora-operator-api-python = { version = "0.3.3", path = "apis/python/operator" }
dora-operator-api-c = { version = "0.3.3", path = "apis/c/operator" }
dora-node-api-c = { version = "0.3.3", path = "apis/c/node" }
dora-core = { version = "0.3.3", path = "libraries/core" }
dora-arrow-convert = { version = "0.3.3", path = "libraries/arrow-convert" }
dora-tracing = { version = "0.3.3", path = "libraries/extensions/telemetry/tracing" }
dora-metrics = { version = "0.3.3", path = "libraries/extensions/telemetry/metrics" }
dora-download = { version = "0.3.3", path = "libraries/extensions/download" }
shared-memory-server = { version = "0.3.3", path = "libraries/shared-memory-server" }
communication-layer-request-reply = { version = "0.3.3", path = "libraries/communication-layer/request-reply" }
dora-message = { version = "0.3.3", path = "libraries/message" }
dora-runtime = { version = "0.3.3", path = "binaries/runtime" }
dora-daemon = { version = "0.3.3", path = "binaries/daemon" }
dora-coordinator = { version = "0.3.3", path = "binaries/coordinator" }
dora-node-api = { version = "0.3.4", path = "apis/rust/node", default-features = false }
dora-node-api-python = { version = "0.3.4", path = "apis/python/node", default-features = false }
dora-operator-api = { version = "0.3.4", path = "apis/rust/operator", default-features = false }
dora-operator-api-macros = { version = "0.3.4", path = "apis/rust/operator/macros" }
dora-operator-api-types = { version = "0.3.4", path = "apis/rust/operator/types" }
dora-operator-api-python = { version = "0.3.4", path = "apis/python/operator" }
dora-operator-api-c = { version = "0.3.4", path = "apis/c/operator" }
dora-node-api-c = { version = "0.3.4", path = "apis/c/node" }
dora-core = { version = "0.3.4", path = "libraries/core" }
dora-arrow-convert = { version = "0.3.4", path = "libraries/arrow-convert" }
dora-tracing = { version = "0.3.4", path = "libraries/extensions/telemetry/tracing" }
dora-metrics = { version = "0.3.4", path = "libraries/extensions/telemetry/metrics" }
dora-download = { version = "0.3.4", path = "libraries/extensions/download" }
shared-memory-server = { version = "0.3.4", path = "libraries/shared-memory-server" }
communication-layer-request-reply = { version = "0.3.4", path = "libraries/communication-layer/request-reply" }
dora-message = { version = "0.3.4", path = "libraries/message" }
dora-runtime = { version = "0.3.4", path = "binaries/runtime" }
dora-daemon = { version = "0.3.4", path = "binaries/daemon" }
dora-coordinator = { version = "0.3.4", path = "binaries/coordinator" }
dora-ros2-bridge = { path = "libraries/extensions/ros2-bridge" }
dora-ros2-bridge-msg-gen = { path = "libraries/extensions/ros2-bridge/msg-gen" }
dora-ros2-bridge-python = { path = "libraries/extensions/ros2-bridge/python" }


+ 24
- 0
Changelog.md View File

@@ -1,5 +1,29 @@
# Changelog

## v0.3.4 (2024-05-17)

## What's Changed

- Remove `cxx_build` call, which is no longer used by @phil-opp in https://github.com/dora-rs/dora/pull/470
- Update `ros2-client` to latest version by @phil-opp in https://github.com/dora-rs/dora/pull/457
- Configurable bind addrs by @Michael-J-Ward in https://github.com/dora-rs/dora/pull/471
- Simple warning fixes by @Michael-J-Ward in https://github.com/dora-rs/dora/pull/477
- Adding `dora-rerun` as a visualization tool by @haixuanTao in https://github.com/dora-rs/dora/pull/479
- Fix Clippy and RERUN_MEMORY_LIMIT env variable default by @haixuanTao in https://github.com/dora-rs/dora/pull/490
- Fix CI build errors by @phil-opp in https://github.com/dora-rs/dora/pull/491
- Use `resolver = 2` for in workspace in Rust template by @phil-opp in https://github.com/dora-rs/dora/pull/492
- Add grace duration and kill process by @haixuanTao in https://github.com/dora-rs/dora/pull/487
- Simplify parsing of `AMENT_PREFIX_PATH` by @haixuanTao in https://github.com/dora-rs/dora/pull/489
- Convert rust example to node by @Michael-J-Ward in https://github.com/dora-rs/dora/pull/494
- Adding python IDE typing by @haixuanTao in https://github.com/dora-rs/dora/pull/493
- Fix: Wait until dora daemon is connected to coordinator on `dora up` by @phil-opp in https://github.com/dora-rs/dora/pull/496

## New Contributors

- @Michael-J-Ward made their first contribution in https://github.com/dora-rs/dora/pull/471

**Full Changelog**: https://github.com/dora-rs/dora/compare/v0.3.3...v0.3.4

## v0.3.3 (2024-04-08)

## What's Changed


+ 14
- 14
README.md View File

@@ -3,13 +3,13 @@
</p>

<h2 align="center">
<a href="https://dora.carsmos.ai">Website</a>
<a href="https://www.dora-rs.ai">Website</a>
|
<a href="https://dora.carsmos.ai/docs/api/python-api">Python API</a>
<a href="https://www.dora-rs.ai/docs/api/python-api">Python API</a>
-
<a href="https://docs.rs/dora-node-api/latest/dora_node_api/">Rust API</a>
|
<a href="https://dora.carsmos.ai/docs/guides/">Guide</a>
<a href="https://www.dora-rs.ai/docs/guides/">Guide</a>
|
<a href="https://discord.gg/6eMGGutkfE">Discord</a>
</h2>
@@ -46,7 +46,7 @@ dora-rs is still experimental and you might experience bugs, but we're working v

dora-rs can show impressive performance, up to 17x faster compared to current status quo ROS2 in Python! This is the result of using our own shared memory server and Apache Arrow to achieve zero copy data passing.

<a href="https://dora.carsmos.ai/">
<a href="https://www.dora-rs.ai/">
<img src="./docs/src/latency.png" align="center" width="600">
</a>

@@ -91,7 +91,7 @@ Nodes can either be:

The dataflow paradigm has the advantage of creating an abstraction layer that makes robotic applications modular and easily configurable.

<a href="https://dora.carsmos.ai/">
<a href="https://www.dora-rs.ai/">
<img src="https://raw.githubusercontent.com/dora-rs/dora-rs.github.io/main/static/img/overview.svg" align="center" width="600">
</a>

@@ -153,24 +153,24 @@ pip install dora-rs # For Python API
dora --help
```

For more info on installation, check out [our guide](https://dora.carsmos.ai/docs/guides/Installation/installing).
For more info on installation, check out [our guide](https://www.dora-rs.ai/docs/guides/Installation/installing).

## Getting Started

1. Install the example python dependencies:

```bash
pip install -r https://raw.githubusercontent.com/dora-rs/dora/v0.3.3/examples/python-operator-dataflow/requirements.txt
pip install -r https://raw.githubusercontent.com/dora-rs/dora/v0.3.4/examples/python-operator-dataflow/requirements.txt
```

2. Get some example operators:

```bash
wget https://raw.githubusercontent.com/dora-rs/dora/v0.3.3/examples/python-operator-dataflow/webcam.py
wget https://raw.githubusercontent.com/dora-rs/dora/v0.3.3/examples/python-operator-dataflow/plot.py
wget https://raw.githubusercontent.com/dora-rs/dora/v0.3.3/examples/python-operator-dataflow/utils.py
wget https://raw.githubusercontent.com/dora-rs/dora/v0.3.3/examples/python-operator-dataflow/object_detection.py
wget https://raw.githubusercontent.com/dora-rs/dora/v0.3.3/examples/python-operator-dataflow/dataflow.yml
wget https://raw.githubusercontent.com/dora-rs/dora/v0.3.4/examples/python-operator-dataflow/webcam.py
wget https://raw.githubusercontent.com/dora-rs/dora/v0.3.4/examples/python-operator-dataflow/plot.py
wget https://raw.githubusercontent.com/dora-rs/dora/v0.3.4/examples/python-operator-dataflow/utils.py
wget https://raw.githubusercontent.com/dora-rs/dora/v0.3.4/examples/python-operator-dataflow/object_detection.py
wget https://raw.githubusercontent.com/dora-rs/dora/v0.3.4/examples/python-operator-dataflow/dataflow.yml
```

3. Start the dataflow
@@ -184,7 +184,7 @@ dora start dataflow.yml --attach --hot-reload

To stop your dataflow, you can use <kbd>ctrl</kbd>+<kbd>c</kbd>

To go further, you can add a yolov8 operator, check out our getting started here: https://dora.carsmos.ai/docs/guides/getting-started/yolov8/
To go further, you can add a yolov8 operator, check out our getting started here: https://www.dora-rs.ai/docs/guides/getting-started/yolov8/

## ROS2 Bridge

@@ -233,7 +233,7 @@ Cool hardware that we think might be good fit to try out dora-rs 🙋 We are not

## Documentation

The full documentation is available on [our website](https://dora.carsmos.ai)
The full documentation is available on [our website](https://www.dora-rs.ai)

## Discussions



+ 3
- 0
apis/python/node/dora/__init__.pyi View File

@@ -39,6 +39,9 @@ node = Node()

This method returns the parsed dataflow YAML file."""

def dataflow_id(self) -> str:
"""Returns the dataflow id."""

def merge_external_events(self, subscription: dora.Ros2Subscription) -> None:
"""Merge an external event stream with dora main loop.
This currently only work with ROS2."""


+ 7
- 0
apis/python/node/src/lib.rs View File

@@ -154,6 +154,13 @@ impl Node {
pythonize::pythonize(py, self.node.dataflow_descriptor())
}

/// Returns the dataflow id.
///
/// :rtype: str
pub fn dataflow_id(&self) -> String {
self.node.dataflow_id().to_string()
}

/// Merge an external event stream with dora main loop.
/// This currently only work with ROS2.
///


+ 5
- 0
apis/python/operator/src/lib.rs View File

@@ -5,6 +5,7 @@ use pyo3::{exceptions::PyLookupError, prelude::*, types::PyDict};

/// Dora Event
#[pyclass]
#[derive(Debug)]
pub struct PyEvent {
event: MergedEvent<PyObject>,
data: Option<ArrayRef>,
@@ -56,6 +57,10 @@ impl PyEvent {
MergedEvent::External(event) => Some(event),
}
}

fn __str__(&self) -> PyResult<String> {
Ok(format!("{:#?}", &self.event))
}
}

impl PyEvent {


+ 1
- 0
apis/rust/node/src/event_stream/merged.rs View File

@@ -1,6 +1,7 @@
use futures::{Stream, StreamExt};
use futures_concurrency::stream::Merge;

#[derive(Debug)]
pub enum MergedEvent<E> {
Dora(super::Event),
External(E),


+ 2
- 0
libraries/core/Cargo.toml View File

@@ -20,3 +20,5 @@ tracing = "0.1"
serde-with-expand-env = "1.1.0"
tokio = { version = "1.24.1", features = ["fs", "process", "sync"] }
aligned-vec = { version = "0.5.0", features = ["serde"] }
schemars = "0.8.19"
serde_json = "1.0.117"

+ 31
- 0
libraries/core/README.md View File

@@ -0,0 +1,31 @@
# Core library for dora

## Generating dora schema

```bash
cargo run -p dora-core generate_schemas
```

## VSCode YAML Dataflow Support

We can pass the JSON Schema to VSCode [`redhat.vscode-yaml`](https://marketplace.visualstudio.com/items?itemName=redhat.vscode-yaml) to enables features such as:

- Type validation
- Suggestions
- Documentation

### Getting started

1. Install [`redhat.vscode-yaml`](https://marketplace.visualstudio.com/items?itemName=redhat.vscode-yaml)

2. Open User Settings(JSON) in VSCode within `ctrl`+ `shift` + `p` search bar.

3. Add the following:

```json
"yaml.schemas": {
"https://raw.githubusercontent.com/dora-rs/dora/main/libraries/core/dora-schema.json": "/*"
},
```

And you should be set! 🔥

+ 441
- 0
libraries/core/dora-schema.json View File

@@ -0,0 +1,441 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "dora-rs specification",
"description": "Dataflow description",
"type": "object",
"required": [
"nodes"
],
"properties": {
"nodes": {
"type": "array",
"items": {
"$ref": "#/definitions/Node"
}
}
},
"additionalProperties": true,
"definitions": {
"CustomNode": {
"type": "object",
"required": [
"source"
],
"properties": {
"args": {
"description": "Args for the executable.",
"type": [
"string",
"null"
]
},
"build": {
"type": [
"string",
"null"
]
},
"envs": {
"description": "Environment variables for the custom nodes",
"type": [
"object",
"null"
],
"additionalProperties": {
"$ref": "#/definitions/EnvValue"
}
},
"inputs": {
"description": "Inputs for the nodes as a map from input ID to `<node_id>/<output_id>`.\n\ne.g.\n\ninputs:\n\nexample_input: example_node/example_output1",
"default": {},
"type": "object",
"additionalProperties": true
},
"outputs": {
"description": "Outputs as a list of outputs.\n\ne.g.\n\noutputs:\n\n- output_1\n\n- output_2",
"default": [],
"type": "array",
"items": {
"$ref": "#/definitions/DataId"
},
"uniqueItems": true
},
"send_stdout_as": {
"description": "Send stdout and stderr to another node",
"type": [
"string",
"null"
]
},
"source": {
"description": "Path of the source code\n\nIf you want to use a specific `conda` environment. Provide the python path within the source.\n\nsource: /home/peter/miniconda3/bin/python\n\nargs: some_node.py\n\nSource can match any executable in PATH.",
"type": "string"
}
}
},
"DataId": {
"type": "string"
},
"Duration": {
"type": "object",
"required": [
"nanos",
"secs"
],
"properties": {
"nanos": {
"type": "integer",
"format": "uint32",
"minimum": 0.0
},
"secs": {
"type": "integer",
"format": "uint64",
"minimum": 0.0
}
}
},
"EnvValue": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "integer",
"format": "uint64",
"minimum": 0.0
},
{
"type": "string"
}
]
},
"Input": {
"type": "object",
"required": [
"mapping"
],
"properties": {
"mapping": {
"$ref": "#/definitions/InputMapping"
},
"queue_size": {
"type": [
"integer",
"null"
],
"format": "uint",
"minimum": 0.0
}
},
"additionalProperties": true
},
"InputMapping": {
"oneOf": [
{
"type": "object",
"required": [
"Timer"
],
"properties": {
"Timer": {
"type": "object",
"required": [
"interval"
],
"properties": {
"interval": {
"$ref": "#/definitions/Duration"
}
}
}
},
"additionalProperties": true
},
{
"type": "object",
"required": [
"User"
],
"properties": {
"User": {
"$ref": "#/definitions/UserInputMapping"
}
},
"additionalProperties": true
}
]
},
"Node": {
"description": "Dora Node",
"type": "object",
"oneOf": [
{
"description": "Dora runtime node",
"type": "object",
"required": [
"operators"
],
"properties": {
"operators": {
"type": "array",
"items": {
"$ref": "#/definitions/OperatorDefinition"
}
}
},
"additionalProperties": true
},
{
"type": "object",
"required": [
"custom"
],
"properties": {
"custom": {
"$ref": "#/definitions/CustomNode"
}
},
"additionalProperties": true
},
{
"type": "object",
"required": [
"operator"
],
"properties": {
"operator": {
"$ref": "#/definitions/SingleOperatorDefinition"
}
},
"additionalProperties": true
}
],
"required": [
"id"
],
"properties": {
"description": {
"description": "Description of the node",
"type": [
"string",
"null"
]
},
"env": {
"description": "Environment variables",
"type": [
"object",
"null"
],
"additionalProperties": {
"$ref": "#/definitions/EnvValue"
}
},
"id": {
"description": "Node identifier",
"allOf": [
{
"$ref": "#/definitions/NodeId"
}
]
},
"name": {
"description": "Node name",
"type": [
"string",
"null"
]
}
}
},
"NodeId": {
"type": "string"
},
"OperatorDefinition": {
"type": "object",
"oneOf": [
{
"type": "object",
"required": [
"shared-library"
],
"properties": {
"shared-library": {
"type": "string"
}
},
"additionalProperties": true
},
{
"type": "object",
"required": [
"python"
],
"properties": {
},
"additionalProperties": true
}
],
"required": [
"id"
],
"properties": {
"build": {
"type": [
"string",
"null"
]
},
"description": {
"type": [
"string",
"null"
]
},
"id": {
"$ref": "#/definitions/OperatorId"
},
"inputs": {
"default": {},
"type": "object",
"additionalProperties": true
},
"name": {
"type": [
"string",
"null"
]
},
"outputs": {
"default": [],
"type": "array",
"items": {
"$ref": "#/definitions/DataId"
},
"uniqueItems": true
},
"send_stdout_as": {
"type": [
"string",
"null"
]
}
}
},
"OperatorId": {
"type": "string"
},
"PythonSource": {
"type": "object",
"required": [
"source"
],
"properties": {
"conda_env": {
"type": [
"string",
"null"
]
},
"source": {
"type": "string"
}
},
"additionalProperties": true
},
"SingleOperatorDefinition": {
"type": "object",
"oneOf": [
{
"type": "object",
"required": [
"shared-library"
],
"properties": {
"shared-library": {
"type": "string"
}
},
"additionalProperties": true
},
{
"type": "object",
"required": [
"python"
],
"properties": {
},
"additionalProperties": true
}
],
"properties": {
"build": {
"type": [
"string",
"null"
]
},
"description": {
"type": [
"string",
"null"
]
},
"id": {
"description": "ID is optional if there is only a single operator.",
"anyOf": [
{
"$ref": "#/definitions/OperatorId"
},
{
"type": "null"
}
]
},
"inputs": {
"default": {},
"type": "object",
"additionalProperties": true
},
"name": {
"type": [
"string",
"null"
]
},
"outputs": {
"default": [],
"type": "array",
"items": {
"$ref": "#/definitions/DataId"
},
"uniqueItems": true
},
"send_stdout_as": {
"type": [
"string",
"null"
]
}
}
},
"UserInputMapping": {
"type": "object",
"required": [
"output",
"source"
],
"properties": {
"output": {
"$ref": "#/definitions/DataId"
},
"source": {
"$ref": "#/definitions/NodeId"
}
}
}
}
}

+ 42
- 0
libraries/core/src/bin/generate_schema.rs View File

@@ -0,0 +1,42 @@
use std::{env, path::Path};

use dora_core::descriptor::Descriptor;
use schemars::schema_for;

fn main() -> () {
let schema = schema_for!(Descriptor);
let raw_schema =
serde_json::to_string_pretty(&schema).expect("Could not serialize schema to json");

// Add additional properties to True, as #[derive(transparent)] of enums are not well handled.
//
// 'OneOf' such as Custom Nodes, Operators and Single Operators overwrite property values of the initial struct `Nodes`.`
// which make the original properties such as `id` and `name` not validated by IDE extensions.
let raw_schema = raw_schema.replace(
"\"additionalProperties\": false",
"\"additionalProperties\": true",
);

// Remove `serde(from=` nested field as they are not handled properly by `schemars`
let raw_schema = raw_schema.replace(
"\"python\": {
\"$ref\": \"#/definitions/PythonSource\"
}",
"",
);
let raw_schema = raw_schema.replace(
"{
\"$ref\": \"#/definitions/Input\"
}",
"true",
);

// Get the Cargo root manifest directory
let manifest_dir = env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR is not set");

// Create the path for the new file next to Cargo.toml
let new_file_path = Path::new(&manifest_dir).join("dora-schema.json");

// write to file
std::fs::write(new_file_path, raw_schema).expect("Could not write schema to file");
}

+ 34
- 11
libraries/core/src/config.rs View File

@@ -1,4 +1,5 @@
use once_cell::sync::OnceCell;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::{
borrow::Borrow,
@@ -9,7 +10,9 @@ use std::{
time::Duration,
};

#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
#[derive(
Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, JsonSchema,
)]
pub struct NodeId(String);

impl FromStr for NodeId {
@@ -32,7 +35,9 @@ impl std::fmt::Display for NodeId {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
#[derive(
Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, JsonSchema,
)]
pub struct OperatorId(String);

impl FromStr for OperatorId {
@@ -61,7 +66,9 @@ impl AsRef<str> for OperatorId {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
#[derive(
Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, JsonSchema,
)]
pub struct DataId(String);

impl From<DataId> for String {
@@ -114,7 +121,7 @@ impl Borrow<str> for DataId {
}
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, JsonSchema)]
pub enum InputMapping {
Timer { interval: Duration },
User(UserInputMapping),
@@ -214,7 +221,7 @@ impl<'de> Deserialize<'de> for InputMapping {
}
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, JsonSchema)]
pub struct UserInputMapping {
pub source: NodeId,
pub output: DataId,
@@ -236,15 +243,32 @@ pub fn format_duration(interval: Duration) -> FormattedDuration {
FormattedDuration(interval)
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct NodeRunConfig {
/// Inputs for the nodes as a map from input ID to `node_id/output_id`.
///
/// e.g.
///
/// inputs:
///
/// example_input: example_node/example_output1
///
#[serde(default)]
pub inputs: BTreeMap<DataId, Input>,
/// List of output IDs.
///
/// e.g.
///
/// outputs:
///
/// - output_1
///
/// - output_2
#[serde(default)]
pub outputs: BTreeSet<DataId>,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields, from = "InputDef", into = "InputDef")]
pub struct Input {
pub mapping: InputMapping,
@@ -294,7 +318,7 @@ impl From<InputDef> for Input {
}
}

#[derive(Debug, Default, Serialize, Deserialize, Clone)]
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields, rename_all = "lowercase")]
pub struct CommunicationConfig {
// see https://github.com/dtolnay/serde-yaml/issues/298
@@ -303,16 +327,15 @@ pub struct CommunicationConfig {
with = "serde_yaml::with::singleton_map",
rename = "_unstable_local"
)]
#[schemars(with = "String")]
pub local: LocalCommunicationConfig,
#[serde(
default,
with = "serde_yaml::with::singleton_map",
rename = "_unstable_remote"
)]
#[schemars(with = "String")]
pub remote: RemoteCommunicationConfig,

// deprecated
pub zenoh: Option<serde_yaml::Value>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]


+ 38
- 16
libraries/core/src/descriptor/mod.rs View File

@@ -2,6 +2,7 @@ use crate::config::{
CommunicationConfig, DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId,
};
use eyre::{bail, eyre, Context, Result};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_with_expand_env::with_expand_envs;
use std::{
@@ -12,19 +13,19 @@ use std::{
};
use tracing::warn;
pub use visualize::collect_dora_timers;

mod validate;
mod visualize;
pub const SHELL_SOURCE: &str = "shell";

/// Dataflow description
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
#[schemars(title = "dora-rs specification")]
pub struct Descriptor {
#[schemars(skip)]
#[serde(default)]
pub communication: CommunicationConfig,
// deprecated
pub daemon_config: Option<serde_yaml::Value>,
#[schemars(skip)]
#[serde(default, rename = "_unstable_deploy")]
pub deploy: Deploy,
pub nodes: Vec<Node>,
@@ -122,19 +123,26 @@ impl Descriptor {
}
}

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct Deploy {
pub machine: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
/// Dora Node
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct Node {
/// Node identifier
pub id: NodeId,
/// Node name
pub name: Option<String>,
/// Description of the node
pub description: Option<String>,
/// Environment variables
pub env: Option<BTreeMap<String, EnvValue>>,

/// Unstable machine deployment configuration
#[schemars(skip)]
#[serde(default, rename = "_unstable_deploy")]
pub deploy: Deploy,

@@ -142,7 +150,7 @@ pub struct Node {
pub kind: NodeKind,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum NodeKind {
/// Dora runtime node
@@ -217,20 +225,20 @@ pub enum CoreNodeKind {
Custom(CustomNode),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(transparent)]
pub struct RuntimeNode {
pub operators: Vec<OperatorDefinition>,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
pub struct OperatorDefinition {
pub id: OperatorId,
#[serde(flatten)]
pub config: OperatorConfig,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
pub struct SingleOperatorDefinition {
/// ID is optional if there is only a single operator.
pub id: Option<OperatorId>,
@@ -238,7 +246,7 @@ pub struct SingleOperatorDefinition {
pub config: OperatorConfig,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
pub struct OperatorConfig {
pub name: Option<String>,
pub description: Option<String>,
@@ -257,14 +265,15 @@ pub struct OperatorConfig {
pub send_stdout_as: Option<String>,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
#[serde(rename_all = "kebab-case")]
pub enum OperatorSource {
SharedLibrary(String),
Python(PythonSource),
#[schemars(skip)]
Wasm(String),
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(
deny_unknown_fields,
from = "PythonSourceDef",
@@ -275,7 +284,7 @@ pub struct PythonSource {
pub conda_env: Option<String>,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(untagged)]
pub enum PythonSourceDef {
SourceOnly(String),
@@ -342,14 +351,27 @@ pub struct PythonOperatorConfig {
pub outputs: BTreeSet<DataId>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct CustomNode {
/// Path of the source code
///
/// If you want to use a specific `conda` environment.
/// Provide the python path within the source.
///
/// source: /home/peter/miniconda3/bin/python
///
/// args: some_node.py
///
/// Source can match any executable in PATH.
pub source: String,
/// Args for the executable.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub args: Option<String>,
/// Environment variables for the custom nodes
pub envs: Option<BTreeMap<String, EnvValue>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub build: Option<String>,
/// Send stdout and stderr to another node
#[serde(skip_serializing_if = "Option::is_none")]
pub send_stdout_as: Option<String>,

@@ -357,7 +379,7 @@ pub struct CustomNode {
pub run_config: NodeRunConfig,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(untagged)]
pub enum EnvValue {
#[serde(deserialize_with = "with_expand_envs")]


+ 0
- 7
libraries/core/src/descriptor/validate.rs View File

@@ -13,13 +13,6 @@ use super::{resolve_path, Descriptor, SHELL_SOURCE};
const VERSION: &str = env!("CARGO_PKG_VERSION");

pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result<()> {
if dataflow.daemon_config.is_some() {
tracing::warn!("ignoring deprecated `daemon_config` key in dataflow config");
}
if dataflow.communication.zenoh.is_some() {
tracing::warn!("ignoring deprecated `communication.zenoh` key in dataflow config");
}

let nodes = dataflow.resolve_aliases_and_set_defaults();
let mut has_python_operator = false;



+ 1
- 1
libraries/extensions/download/Cargo.toml View File

@@ -10,7 +10,7 @@ license.workspace = true

[dependencies]
eyre = "0.6.8"
reqwest = { version = "0.11.12", default-features = false, features = [
reqwest = { version = "0.12.4", default-features = false, features = [
"rustls-tls",
] }
tokio = { version = "1.24.2", features = ["fs"] }


Loading…
Cancel
Save