
Merge branch 'main' into cli-refactor

tags/v0.3.12-fix
sjfhsjfh 7 months ago
commit bd76e2a665
No known key found for this signature in database (GPG key ID: CA820EAE1C115AED)
78 changed files with 3955 additions and 1268 deletions
1. .github/workflows/ci.yml (+20, -7)
2. .github/workflows/pip-release.yml (+1, -0)
3. .gitignore (+2, -2)
4. Cargo.lock (+203, -258)
5. Cargo.toml (+9, -1)
6. apis/python/node/Cargo.toml (+5, -2)
7. apis/python/node/build.rs (+3, -0)
8. apis/python/node/pyproject.toml (+8, -0)
9. apis/python/node/src/lib.rs (+2, -15)
10. apis/python/operator/Cargo.toml (+1, -1)
11. apis/rust/node/Cargo.toml (+1, -1)
12. apis/rust/node/src/node/mod.rs (+12, -6)
13. binaries/cli/Cargo.toml (+7, -2)
14. binaries/cli/build.rs (+1, -0)
15. binaries/cli/pyproject.toml (+10, -2)
16. binaries/cli/src/commands/daemon.rs (+24, -18)
17. binaries/cli/src/commands/run.rs (+24, -3)
18. binaries/cli/src/commands/start.rs (+0, -127)
19. binaries/cli/src/commands/start/attach.rs (+5, -35)
20. binaries/cli/src/commands/start/mod.rs (+200, -0)
21. binaries/cli/src/common.rs (+49, -3)
22. binaries/cli/src/lib.rs (+2, -1)
23. binaries/cli/src/output.rs (+62, -0)
24. binaries/cli/src/session.rs (+98, -0)
25. binaries/cli/src/template/c/cmake-template.txt (+4, -4)
26. binaries/cli/src/template/cxx/cmake-template.txt (+4, -4)
27. binaries/coordinator/src/control.rs (+30, -4)
28. binaries/coordinator/src/lib.rs (+449, -34)
29. binaries/coordinator/src/listener.rs (+23, -0)
30. binaries/coordinator/src/log_subscriber.rs (+8, -2)
31. binaries/coordinator/src/run/mod.rs (+17, -8)
32. binaries/daemon/Cargo.toml (+6, -2)
33. binaries/daemon/src/lib.rs (+690, -98)
34. binaries/daemon/src/log.rs (+270, -56)
35. binaries/daemon/src/pending.rs (+4, -0)
36. binaries/daemon/src/spawn.rs (+616, -508)
37. binaries/runtime/Cargo.toml (+1, -1)
38. binaries/runtime/src/lib.rs (+2, -1)
39. examples/c++-arrow-dataflow/run.rs (+2, -0)
40. examples/c++-dataflow/.gitignore (+1, -0)
41. examples/c++-dataflow/run.rs (+2, -0)
42. examples/c++-ros2-dataflow/.gitignore (+1, -0)
43. examples/c++-ros2-dataflow/run.rs (+2, -0)
44. examples/c-dataflow/run.rs (+7, -0)
45. examples/camera/run.rs (+2, -0)
46. examples/cmake-dataflow/run.rs (+1, -0)
47. examples/multiple-daemons/run.rs (+32, -5)
48. examples/python-dataflow/run.rs (+2, -0)
49. examples/python-multi-env/run.rs (+2, -0)
50. examples/python-operator-dataflow/run.rs (+2, -0)
51. examples/python-ros2-dataflow/run.rs (+2, -0)
52. examples/rerun-viewer/run.rs (+2, -0)
53. examples/rust-dataflow-git/.gitignore (+4, -0)
54. examples/rust-dataflow-git/README.md (+7, -0)
55. examples/rust-dataflow-git/dataflow.yml (+29, -0)
56. examples/rust-dataflow-git/run.rs (+53, -0)
57. examples/rust-dataflow-url/.gitignore (+1, -0)
58. examples/rust-dataflow-url/run.rs (+2, -0)
59. examples/rust-dataflow/run.rs (+2, -1)
60. examples/rust-ros2-dataflow/run.rs (+2, -0)
61. examples/vlm/run.rs (+2, -0)
62. libraries/core/Cargo.toml (+10, -2)
63. libraries/core/src/build/build_command.rs (+83, -0)
64. libraries/core/src/build/git.rs (+374, -0)
65. libraries/core/src/build/logger.rs (+19, -0)
66. libraries/core/src/build/mod.rs (+148, -0)
67. libraries/core/src/descriptor/mod.rs (+37, -9)
68. libraries/core/src/descriptor/validate.rs (+29, -16)
69. libraries/core/src/lib.rs (+2, -0)
70. libraries/message/Cargo.toml (+2, -2)
71. libraries/message/src/cli_to_coordinator.rs (+40, -5)
72. libraries/message/src/common.rs (+27, -3)
73. libraries/message/src/coordinator_to_cli.rs (+29, -5)
74. libraries/message/src/coordinator_to_daemon.rs (+32, -3)
75. libraries/message/src/daemon_to_coordinator.rs (+13, -2)
76. libraries/message/src/daemon_to_node.rs (+2, -2)
77. libraries/message/src/descriptor.rs (+40, -7)
78. libraries/message/src/lib.rs (+33, -0)

.github/workflows/ci.yml (+20, -7)

@@ -118,6 +118,9 @@ jobs:
- name: "Rust Dataflow example"
timeout-minutes: 30
run: cargo run --example rust-dataflow
- name: "Rust Git Dataflow example"
timeout-minutes: 30
run: cargo run --example rust-dataflow-git
- name: "Multiple Daemons example"
timeout-minutes: 30
run: cargo run --example multiple-daemons
@@ -209,11 +212,11 @@ jobs:
source /opt/ros/humble/setup.bash && ros2 run turtlesim turtlesim_node &
source /opt/ros/humble/setup.bash && ros2 run examples_rclcpp_minimal_service service_main &
cargo run --example rust-ros2-dataflow --features="ros2-examples"
- uses: actions/setup-python@v2
- uses: actions/setup-python@v5
if: runner.os != 'Windows'
with:
python-version: "3.8"
- uses: actions/setup-python@v2
- uses: actions/setup-python@v5
if: runner.os == 'Windows'
with:
python-version: "3.10"
@@ -321,7 +324,7 @@ jobs:
dora stop --name ci-rust-dynamic --grace-duration 5s
dora destroy

- uses: actions/setup-python@v2
- uses: actions/setup-python@v5
with:
# TODO: Support Python 3.13 when https://github.com/pytorch/pytorch/issues/130249 is fixed
python-version: "3.12"
@@ -339,35 +342,42 @@ jobs:
# Test Python template Project
dora new test_python_project --lang python --internal-create-with-path-dependencies
cd test_python_project
uv venv --seed -p 3.11
uv venv --seed -p 3.12
uv pip install -e ../apis/python/node
dora build dataflow.yml --uv
uv pip install ruff pytest

echo "Running dora up"
dora up
echo "Running dora build"
dora build dataflow.yml --uv

# Check Compliancy
uv run ruff check .
uv run pytest

export OPERATING_MODE=SAVE
dora up
echo "Running dora list"
dora list
dora build dataflow.yml --uv
echo "Running CI Python Test"
dora start dataflow.yml --name ci-python-test --detach --uv
sleep 10
echo "Running dora stop"
dora stop --name ci-python-test --grace-duration 5s
dora destroy
sleep 5

cd ..

# Run Python Node Example
echo "Running Python Node Example"
dora up
uv venv --seed -p 3.11
uv venv --seed -p 3.12
uv pip install -e apis/python/node
dora build examples/python-dataflow/dataflow.yml --uv
dora start examples/python-dataflow/dataflow.yml --name ci-python --detach --uv
sleep 10
echo "Running dora stop"
dora stop --name ci-python --grace-duration 30s

# Run Python Dynamic Node Example
@@ -376,15 +386,18 @@ jobs:
dora start examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic --detach --uv
uv run opencv-plot --name plot
sleep 10
echo "Running dora stop"
dora stop --name ci-python-dynamic --grace-duration 30s

# Run Python Operator Example
echo "Running CI Operator Test"
dora start examples/python-operator-dataflow/dataflow.yml --name ci-python-operator --detach --uv
sleep 10
echo "Running dora stop"
dora stop --name ci-python-operator --grace-duration 30s

dora destroy
sleep 5

# Run Python queue latency test
echo "Running CI Queue Latency Test"


.github/workflows/pip-release.yml (+1, -0)

@@ -66,6 +66,7 @@ jobs:
args: --release --out dist --zig
manylinux: manylinux_2_28
working-directory: ${{ matrix.repository.path }}
before-script-linux: sudo apt-get install libatomic1-i386-cross libatomic1-armhf-cross && mkdir -p $HOME/.rustup/toolchains/1.84-x86_64-unknown-linux-gnu/lib/rustlib/i686-unknown-linux-gnu/lib/ && ln -s /usr/i686-linux-gnu/lib/libatomic.so.1 $HOME/.rustup/toolchains/1.84-x86_64-unknown-linux-gnu/lib/rustlib/i686-unknown-linux-gnu/lib/libatomic.so && ln -s /usr/i686-linux-gnu/lib/libatomic.so.1 $HOME/.rustup/toolchains/1.84-x86_64-unknown-linux-gnu/lib/rustlib/i686-unknown-linux-gnu/lib/libatomic.so.1 && ln -s /usr/i686-linux-gnu/lib/libatomic.so.1 /opt/hostedtoolcache/Python/3.8.18/x64/lib/libatomic.so.1 && mkdir -p $HOME/.rustup/toolchains/1.84-x86_64-unknown-linux-gnu/lib/rustlib/armv7-unknown-linux-gnueabihf/lib/ && ln -s /usr/arm-linux-gnueabihf/lib/libatomic.so.1 $HOME/.rustup/toolchains/1.84-x86_64-unknown-linux-gnu/lib/rustlib/armv7-unknown-linux-gnueabihf/lib/libatomic.so
- name: Upload wheels
if: github.event_name == 'release'
uses: actions/upload-artifact@v4


.gitignore (+2, -2)

@@ -35,7 +35,7 @@ __pycache__/

# Distribution / packaging
.Python
build/
/build/
develop-eggs/
dist/
downloads/
@@ -180,4 +180,4 @@ out/
#Miscellaneous
yolo.yml

~*
~*

Cargo.lock (+203, -258)
File diff suppressed because it is too large


Cargo.toml (+9, -1)

@@ -72,6 +72,7 @@ dora-metrics = { version = "0.3.11", path = "libraries/extensions/telemetry/metr
dora-download = { version = "0.3.11", path = "libraries/extensions/download" }
shared-memory-server = { version = "0.3.11", path = "libraries/shared-memory-server" }
communication-layer-request-reply = { version = "0.3.11", path = "libraries/communication-layer/request-reply" }
dora-cli = { version = "0.3.11", path = "binaries/cli" }
dora-runtime = { version = "0.3.11", path = "binaries/runtime" }
dora-daemon = { version = "0.3.11", path = "binaries/daemon" }
dora-coordinator = { version = "0.3.11", path = "binaries/coordinator" }
@@ -79,7 +80,7 @@ dora-ros2-bridge = { version = "0.3.11", path = "libraries/extensions/ros2-bridg
dora-ros2-bridge-msg-gen = { version = "0.3.11", path = "libraries/extensions/ros2-bridge/msg-gen" }
dora-ros2-bridge-python = { path = "libraries/extensions/ros2-bridge/python" }
# versioned independently from the other dora crates
dora-message = { version = "0.4.4", path = "libraries/message" }
dora-message = { version = "0.5.0-alpha", path = "libraries/message" }
arrow = { version = "54.2.1" }
arrow-schema = { version = "54.2.1" }
arrow-data = { version = "54.2.1" }
@@ -91,6 +92,8 @@ pyo3 = { version = "0.23", features = [
"multiple-pymethods",
] }
pythonize = "0.23"
git2 = { version = "0.18.0", features = ["vendored-openssl"] }
serde_yaml = "0.9.33"

[package]
name = "dora-examples"
@@ -107,6 +110,7 @@ ros2-examples = []
[dev-dependencies]
eyre = "0.6.8"
tokio = "1.24.2"
dora-cli = { workspace = true }
dora-coordinator = { workspace = true }
dora-core = { workspace = true }
dora-message = { workspace = true }
@@ -135,6 +139,10 @@ path = "examples/vlm/run.rs"
name = "rust-dataflow"
path = "examples/rust-dataflow/run.rs"

[[example]]
name = "rust-dataflow-git"
path = "examples/rust-dataflow-git/run.rs"

[[example]]
name = "rust-ros2-dataflow"
path = "examples/rust-ros2-dataflow/run.rs"


apis/python/node/Cargo.toml (+5, -2)

@@ -21,10 +21,10 @@ dora-node-api = { workspace = true }
dora-operator-api-python = { workspace = true }
pyo3.workspace = true
eyre = "0.6"
serde_yaml = "0.8.23"
serde_yaml = { workspace = true }
flume = "0.10.14"
dora-runtime = { workspace = true, features = ["tracing", "metrics", "python"] }
dora-daemon = { workspace = true }
dora-cli = { workspace = true }
dora-download = { workspace = true }
arrow = { workspace = true, features = ["pyarrow"] }
pythonize = { workspace = true }
@@ -33,6 +33,9 @@ dora-ros2-bridge-python = { workspace = true }
# pyo3_special_method_derive = "0.4.2"
tokio = { version = "1.24.2", features = ["rt"] }

[build-dependencies]
pyo3-build-config = "0.23"

[lib]
name = "dora"
crate-type = ["cdylib"]

apis/python/node/build.rs (+3, -0)

@@ -0,0 +1,3 @@
fn main() {
pyo3_build_config::add_extension_module_link_args();
}

apis/python/node/pyproject.toml (+8, -0)

@@ -22,3 +22,11 @@ extend-select = [
"D", # pydocstyle
"UP",
]

[tool.maturin.target.x86_64-apple-darwin]
# macOS deployment target SDK version
macos-deployment-target = "14.5"

[tool.maturin.target.aarch64-apple-darwin]
# macOS deployment target SDK version
macos-deployment-target = "14.5"

apis/python/node/src/lib.rs (+2, -15)

@@ -6,7 +6,6 @@ use std::sync::Arc;
use std::time::Duration;

use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use dora_daemon::Daemon;
use dora_download::download_file;
use dora_node_api::dora_core::config::NodeId;
use dora_node_api::dora_core::descriptor::source_is_url;
@@ -231,7 +230,7 @@ impl Node {
/// :rtype: dict
pub fn dataflow_descriptor(&mut self, py: Python) -> eyre::Result<PyObject> {
Ok(
pythonize::pythonize(py, &self.node.get_mut().dataflow_descriptor())
pythonize::pythonize(py, &self.node.get_mut().dataflow_descriptor()?)
.map(|x| x.unbind())?,
)
}
@@ -382,19 +381,7 @@ pub fn resolve_dataflow(dataflow: String) -> eyre::Result<PathBuf> {
#[pyfunction]
#[pyo3(signature = (dataflow_path, uv=None))]
pub fn run(dataflow_path: String, uv: Option<bool>) -> eyre::Result<()> {
let dataflow_path = resolve_dataflow(dataflow_path).context("could not resolve dataflow")?;
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv.unwrap_or_default()))?;
match result.is_ok() {
true => Ok(()),
false => Err(eyre::eyre!(
"Dataflow failed to run with error: {:?}",
result.node_results
)),
}
dora_cli::command::run(dataflow_path, uv.unwrap_or_default())
}

#[pymodule]


apis/python/operator/Cargo.toml (+1, -1)

@@ -14,7 +14,7 @@ repository.workspace = true
dora-node-api = { workspace = true }
pyo3 = { workspace = true, features = ["eyre", "abi3-py37"] }
eyre = "0.6"
serde_yaml = "0.8.23"
serde_yaml = { workspace = true }
flume = "0.10.14"
arrow = { workspace = true, features = ["pyarrow"] }
arrow-schema = { workspace = true }


apis/rust/node/Cargo.toml (+1, -1)

@@ -17,7 +17,7 @@ dora-core = { workspace = true }
dora-message = { workspace = true }
shared-memory-server = { workspace = true }
eyre = "0.6.7"
serde_yaml = "0.8.23"
serde_yaml = { workspace = true }
tracing = "0.1.33"
flume = "0.10.14"
bincode = "1.3.3"


apis/rust/node/src/node/mod.rs (+12, -6)

@@ -60,7 +60,7 @@ pub struct DoraNode {
drop_stream: DropStream,
cache: VecDeque<ShmemHandle>,

dataflow_descriptor: Descriptor,
dataflow_descriptor: serde_yaml::Result<Descriptor>,
warned_unknown_output: BTreeSet<DataId>,
_rt: TokioRuntime,
}
@@ -158,10 +158,9 @@ impl DoraNode {
),
};

let id = format!("{}/{}", dataflow_id, node_id);

#[cfg(feature = "metrics")]
{
let id = format!("{}/{}", dataflow_id, node_id);
let monitor_task = async move {
if let Err(e) = run_metrics_monitor(id.clone())
.await
@@ -200,7 +199,7 @@ impl DoraNode {
sent_out_shared_memory: HashMap::new(),
drop_stream,
cache: VecDeque::new(),
dataflow_descriptor,
dataflow_descriptor: serde_yaml::from_value(dataflow_descriptor),
warned_unknown_output: BTreeSet::new(),
_rt: rt,
};
@@ -449,8 +448,15 @@ impl DoraNode {
/// Returns the full dataflow descriptor that this node is part of.
///
/// This method returns the parsed dataflow YAML file.
pub fn dataflow_descriptor(&self) -> &Descriptor {
&self.dataflow_descriptor
pub fn dataflow_descriptor(&self) -> eyre::Result<&Descriptor> {
match &self.dataflow_descriptor {
Ok(d) => Ok(d),
Err(err) => eyre::bail!(
"failed to parse dataflow descriptor: {err}\n\n
This might be caused by mismatched version numbers of dora \
daemon and the dora node API"
),
}
}
}
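
Note: the change to `dataflow_descriptor()` above is a breaking API change for Rust nodes. The descriptor is now stored as a `serde_yaml::Result<Descriptor>` and only surfaced on access, so callers receive an `eyre::Result<&Descriptor>` instead of `&Descriptor`. A minimal caller-side sketch of the adaptation, assuming the usual `DoraNode::init_from_env()` entry point (the printed message is illustrative, not part of this commit):

use dora_node_api::DoraNode;
use eyre::Context;

fn main() -> eyre::Result<()> {
    let (node, _events) = DoraNode::init_from_env()?;

    // dataflow_descriptor() is now fallible; the error points at a possible
    // version mismatch between the dora daemon and the node API.
    let descriptor = node
        .dataflow_descriptor()
        .context("failed to read dataflow descriptor")?;
    println!("dataflow has {} nodes", descriptor.nodes.len());

    Ok(())
}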



binaries/cli/Cargo.toml (+7, -2)

@@ -27,7 +27,7 @@ dora-node-api-c = { workspace = true }
dora-operator-api-c = { workspace = true }
dora-download = { workspace = true }
serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.9.11"
serde_yaml = { workspace = true }
webbrowser = "0.8.3"
serde_json = "1.0.86"
termcolor = "1.1.3"
@@ -37,6 +37,7 @@ communication-layer-request-reply = { workspace = true }
notify = "5.1.0"
ctrlc = "3.2.5"
tracing = "0.1.36"
tracing-log = "0.2.0"
dora-tracing = { workspace = true, optional = true }
bat = "0.24.0"
dora-daemon = { workspace = true }
@@ -50,7 +51,7 @@ tabwriter = "1.4.0"
log = { version = "0.4.21", features = ["serde"] }
colored = "2.1.0"
env_logger = "0.11.3"
self_update = { version = "0.27.0", features = [
self_update = { version = "0.42.0", features = [
"rustls",
"archive-zip",
"archive-tar",
@@ -61,7 +62,11 @@ pyo3 = { workspace = true, features = [
"abi3",
], optional = true }
self-replace = "1.5.0"
dunce = "1.0.5"
git2 = { workspace = true }

[build-dependencies]
pyo3-build-config = "0.23"

[lib]
name = "dora_cli"


binaries/cli/build.rs (+1, -0)

@@ -1,4 +1,5 @@
fn main() {
pyo3_build_config::add_extension_module_link_args();
println!(
"cargo:rustc-env=TARGET={}",
std::env::var("TARGET").unwrap()


binaries/cli/pyproject.toml (+10, -2)

@@ -15,6 +15,14 @@ features = ["python", "pyo3/extension-module"]

[tool.ruff.lint]
extend-select = [
"D", # pydocstyle
"UP"
"D", # pydocstyle
"UP",
]

[tool.maturin.target.x86_64-apple-darwin]
# macOS deployment target SDK version
macos-deployment-target = "14.5"

[tool.maturin.target.aarch64-apple-darwin]
# macOS deployment target SDK version
macos-deployment-target = "14.5"

binaries/cli/src/commands/daemon.rs (+24, -18)

@@ -1,9 +1,10 @@
use super::Executable;
use crate::common::handle_dataflow_result;
use crate::{common::handle_dataflow_result, session::DataflowSession};
use dora_core::topics::{
DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST,
};

use dora_daemon::LogDestination;
#[cfg(feature = "tracing")]
use dora_tracing::TracingBuilder;

@@ -62,24 +63,29 @@ impl Executable for Daemon {
.build()
.context("tokio runtime failed")?;
rt.block_on(async {
match self.run_dataflow {
Some(dataflow_path) => {
tracing::info!("Starting dataflow `{}`", dataflow_path.display());
if self.coordinator_addr != LOCALHOST {
tracing::info!(
"Not using coordinator addr {} as `run_dataflow` is for local dataflow only. Please use the `start` command for remote coordinator",
self.coordinator_addr
);
}
match self.run_dataflow {
Some(dataflow_path) => {
tracing::info!("Starting dataflow `{}`", dataflow_path.display());
if self.coordinator_addr != LOCALHOST {
tracing::info!(
"Not using coordinator addr {} as `run_dataflow` is for local dataflow only. Please use the `start` command for remote coordinator",
self.coordinator_addr
);
}
let dataflow_session =
DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?;

let result = dora_daemon::Daemon::run_dataflow(&dataflow_path, false).await?;
handle_dataflow_result(result, None)
}
None => {
dora_daemon::Daemon::run(SocketAddr::new(self.coordinator_addr, self.coordinator_port), self.machine_id, self.local_listen_port).await
let result = dora_daemon::Daemon::run_dataflow(&dataflow_path,
dataflow_session.build_id, dataflow_session.local_build, dataflow_session.session_id, false,
LogDestination::Tracing,
).await?;
handle_dataflow_result(result, None)
}
None => {
dora_daemon::Daemon::run(SocketAddr::new(self.coordinator_addr, self.coordinator_port), self.machine_id, self.local_listen_port).await
}
}
}
})
.context("failed to run dora-daemon")
})
.context("failed to run dora-daemon")
}
}

binaries/cli/src/commands/run.rs (+24, -3)

@@ -1,6 +1,10 @@
use super::Executable;
use crate::common::{handle_dataflow_result, resolve_dataflow};
use dora_daemon::Daemon;
use crate::{
common::{handle_dataflow_result, resolve_dataflow},
output::print_log_message,
session::DataflowSession,
};
use dora_daemon::{flume, Daemon, LogDestination};
use dora_tracing::TracingBuilder;
use eyre::Context;
use tokio::runtime::Builder;
@@ -32,11 +36,28 @@ impl Executable for Run {

let dataflow_path =
resolve_dataflow(self.dataflow).context("could not resolve dataflow")?;
let dataflow_session = DataflowSession::read_session(&dataflow_path)
.context("failed to read DataflowSession")?;
let rt = Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, self.uv))?;

let (log_tx, log_rx) = flume::bounded(100);
std::thread::spawn(move || {
for message in log_rx {
print_log_message(message, false, false);
}
});

let result = rt.block_on(Daemon::run_dataflow(
&dataflow_path,
dataflow_session.build_id,
dataflow_session.local_build,
dataflow_session.session_id,
self.uv,
LogDestination::Channel { sender: log_tx },
))?;
handle_dataflow_result(result, None)
}
}

binaries/cli/src/commands/start.rs (+0, -127)

@@ -1,127 +0,0 @@
use super::{default_tracing, Executable};
use crate::{
attach::attach_dataflow,
common::{connect_to_coordinator, resolve_dataflow},
};
use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::{
descriptor::{Descriptor, DescriptorExt},
topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST},
};
use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply};
use eyre::{bail, Context};
use std::{net::IpAddr, path::PathBuf};
use uuid::Uuid;

#[derive(Debug, clap::Args)]
/// Start the given dataflow path. Attach a name to the running dataflow by using --name.
pub struct Start {
/// Path to the dataflow descriptor file
#[clap(value_name = "PATH")]
dataflow: String,
/// Assign a name to the dataflow
#[clap(long)]
name: Option<String>,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
/// Attach to the dataflow and wait for its completion
#[clap(long, action)]
attach: bool,
/// Run the dataflow in background
#[clap(long, action)]
detach: bool,
/// Enable hot reloading (Python only)
#[clap(long, action)]
hot_reload: bool,
// Use UV to run nodes.
#[clap(long, action)]
uv: bool,
}

impl Executable for Start {
fn execute(self) -> eyre::Result<()> {
default_tracing()?;
let dataflow = resolve_dataflow(self.dataflow).context("could not resolve dataflow")?;
let dataflow_descriptor =
Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?;
let working_dir = dataflow
.canonicalize()
.context("failed to canonicalize dataflow path")?
.parent()
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
.to_owned();

let coordinator_socket = (self.coordinator_addr, self.coordinator_port).into();
let mut session = connect_to_coordinator(coordinator_socket)
.wrap_err("failed to connect to dora coordinator")?;
let dataflow_id = start_dataflow(
dataflow_descriptor.clone(),
self.name,
working_dir,
&mut *session,
self.uv,
)?;

let attach = match (self.attach, self.detach) {
(true, true) => eyre::bail!("both `--attach` and `--detach` are given"),
(true, false) => true,
(false, true) => false,
(false, false) => {
println!("attaching to dataflow (use `--detach` to run in background)");
true
}
};

if attach {
attach_dataflow(
dataflow_descriptor,
dataflow,
dataflow_id,
&mut *session,
self.hot_reload,
coordinator_socket,
env_logger::Builder::new()
.filter_level(log::LevelFilter::Info)
.parse_default_env()
.build()
.filter(),
)?;
}
Ok(())
}
}

fn start_dataflow(
dataflow: Descriptor,
name: Option<String>,
local_working_dir: PathBuf,
session: &mut TcpRequestReplyConnection,
uv: bool,
) -> Result<Uuid, eyre::ErrReport> {
let reply_raw = session
.request(
&serde_json::to_vec(&ControlRequest::Start {
dataflow,
name,
local_working_dir,
uv,
})
.unwrap(),
)
.wrap_err("failed to send start dataflow message")?;

let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowStarted { uuid } => {
eprintln!("{uuid}");
Ok(uuid)
}
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
}
}

binaries/cli/src/attach.rs → binaries/cli/src/commands/start/attach.rs (+5, -35)

@@ -1,4 +1,3 @@
use colored::Colorize;
use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection};
use dora_core::descriptor::{resolve_path, CoreNodeKind, Descriptor, DescriptorExt};
use dora_message::cli_to_coordinator::ControlRequest;
@@ -16,6 +15,7 @@ use tracing::{error, info};
use uuid::Uuid;

use crate::common::handle_dataflow_result;
use crate::output::print_log_message;

pub fn attach_dataflow(
dataflow: Descriptor,
@@ -33,6 +33,8 @@ pub fn attach_dataflow(

let nodes = dataflow.resolve_aliases_and_set_defaults()?;

let print_daemon_name = nodes.values().any(|n| n.deploy.is_some());

let working_dir = dataflow_path
.canonicalize()
.context("failed to canonicalize dataflow path")?
@@ -155,39 +157,7 @@ pub fn attach_dataflow(
},
Ok(AttachEvent::Control(control_request)) => control_request,
Ok(AttachEvent::Log(Ok(log_message))) => {
let LogMessage {
dataflow_id,
node_id,
daemon_id,
level,
target,
module_path: _,
file: _,
line: _,
message,
} = log_message;
let level = match level {
log::Level::Error => "ERROR".red(),
log::Level::Warn => "WARN ".yellow(),
log::Level::Info => "INFO ".green(),
other => format!("{other:5}").normal(),
};
let dataflow = format!(" dataflow `{dataflow_id}`").cyan();
let daemon = match daemon_id {
Some(id) => format!(" on daemon `{id}`"),
None => " on default daemon".to_string(),
}
.bright_black();
let node = match node_id {
Some(node_id) => format!(" {node_id}").bold(),
None => "".normal(),
};
let target = match target {
Some(target) => format!(" {target}").dimmed(),
None => "".normal(),
};

println!("{level}{dataflow}{daemon}{node}{target}: {message}");
print_log_message(log_message, false, print_daemon_name);
continue;
}
Ok(AttachEvent::Log(Err(err))) => {
@@ -202,7 +172,7 @@ pub fn attach_dataflow(
let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowStarted { uuid: _ } => (),
ControlRequestReply::DataflowSpawned { uuid: _ } => (),
ControlRequestReply::DataflowStopped { uuid, result } => {
info!("dataflow {uuid} stopped");
break handle_dataflow_result(result, Some(uuid));

binaries/cli/src/commands/start/mod.rs (+200, -0)

@@ -0,0 +1,200 @@
use super::{default_tracing, Executable};
use crate::{
commands::start::attach::attach_dataflow,
common::{connect_to_coordinator, local_working_dir, resolve_dataflow},
output::print_log_message,
session::DataflowSession,
};
use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection};
use dora_core::{
descriptor::{Descriptor, DescriptorExt},
topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST},
};
use dora_message::{
cli_to_coordinator::ControlRequest, common::LogMessage, coordinator_to_cli::ControlRequestReply,
};
use eyre::{bail, Context};
use std::{
net::{IpAddr, SocketAddr, TcpStream},
path::PathBuf,
};
use uuid::Uuid;

mod attach;

#[derive(Debug, clap::Args)]
/// Start the given dataflow path. Attach a name to the running dataflow by using --name.
pub struct Start {
/// Path to the dataflow descriptor file
#[clap(value_name = "PATH")]
dataflow: String,
/// Assign a name to the dataflow
#[clap(long)]
name: Option<String>,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
/// Attach to the dataflow and wait for its completion
#[clap(long, action)]
attach: bool,
/// Run the dataflow in background
#[clap(long, action)]
detach: bool,
/// Enable hot reloading (Python only)
#[clap(long, action)]
hot_reload: bool,
// Use UV to run nodes.
#[clap(long, action)]
uv: bool,
}

impl Executable for Start {
fn execute(self) -> eyre::Result<()> {
default_tracing()?;
let coordinator_socket = (self.coordinator_addr, self.coordinator_port).into();

let (dataflow, dataflow_descriptor, mut session, dataflow_id) =
start_dataflow(self.dataflow, self.name, coordinator_socket, self.uv)?;

let attach = match (self.attach, self.detach) {
(true, true) => eyre::bail!("both `--attach` and `--detach` are given"),
(true, false) => true,
(false, true) => false,
(false, false) => {
println!("attaching to dataflow (use `--detach` to run in background)");
true
}
};

if attach {
let log_level = env_logger::Builder::new()
.filter_level(log::LevelFilter::Info)
.parse_default_env()
.build()
.filter();

attach_dataflow(
dataflow_descriptor,
dataflow,
dataflow_id,
&mut *session,
self.hot_reload,
coordinator_socket,
log_level,
)
} else {
let print_daemon_name = dataflow_descriptor.nodes.iter().any(|n| n.deploy.is_some());
// wait until dataflow is started
wait_until_dataflow_started(
dataflow_id,
&mut session,
coordinator_socket,
log::LevelFilter::Info,
print_daemon_name,
)
}
}
}

fn start_dataflow(
dataflow: String,
name: Option<String>,
coordinator_socket: SocketAddr,
uv: bool,
) -> Result<(PathBuf, Descriptor, Box<TcpRequestReplyConnection>, Uuid), eyre::Error> {
let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
let dataflow_descriptor =
Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?;
let dataflow_session =
DataflowSession::read_session(&dataflow).context("failed to read DataflowSession")?;

let mut session = connect_to_coordinator(coordinator_socket)
.wrap_err("failed to connect to dora coordinator")?;

let local_working_dir = local_working_dir(&dataflow, &dataflow_descriptor, &mut *session)?;

let dataflow_id = {
let dataflow = dataflow_descriptor.clone();
let session: &mut TcpRequestReplyConnection = &mut *session;
let reply_raw = session
.request(
&serde_json::to_vec(&ControlRequest::Start {
build_id: dataflow_session.build_id,
session_id: dataflow_session.session_id,
dataflow,
name,
local_working_dir,
uv,
})
.unwrap(),
)
.wrap_err("failed to send start dataflow message")?;

let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowStartTriggered { uuid } => {
eprintln!("dataflow start triggered: {uuid}");
uuid
}
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
}
};
Ok((dataflow, dataflow_descriptor, session, dataflow_id))
}

fn wait_until_dataflow_started(
dataflow_id: Uuid,
session: &mut Box<TcpRequestReplyConnection>,
coordinator_addr: SocketAddr,
log_level: log::LevelFilter,
print_daemon_id: bool,
) -> eyre::Result<()> {
// subscribe to log messages
let mut log_session = TcpConnection {
stream: TcpStream::connect(coordinator_addr)
.wrap_err("failed to connect to dora coordinator")?,
};
log_session
.send(
&serde_json::to_vec(&ControlRequest::LogSubscribe {
dataflow_id,
level: log_level,
})
.wrap_err("failed to serialize message")?,
)
.wrap_err("failed to send log subscribe request to coordinator")?;
std::thread::spawn(move || {
while let Ok(raw) = log_session.receive() {
let parsed: eyre::Result<LogMessage> =
serde_json::from_slice(&raw).context("failed to parse log message");
match parsed {
Ok(log_message) => {
print_log_message(log_message, false, print_daemon_id);
}
Err(err) => {
tracing::warn!("failed to parse log message: {err:?}")
}
}
}
});

let reply_raw = session
.request(&serde_json::to_vec(&ControlRequest::WaitForSpawn { dataflow_id }).unwrap())
.wrap_err("failed to send start dataflow message")?;

let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowSpawned { uuid } => {
eprintln!("dataflow started: {uuid}");
}
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
}
Ok(())
}

binaries/cli/src/common.rs (+49, -3)

@@ -1,13 +1,17 @@
use crate::formatting::FormatDataflowError;
use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection};
use dora_core::descriptor::source_is_url;
use dora_core::descriptor::{source_is_url, Descriptor};
use dora_download::download_file;
use dora_message::{
cli_to_coordinator::ControlRequest,
coordinator_to_cli::{ControlRequestReply, DataflowList, DataflowResult},
};
use eyre::{bail, Context};
use std::{env::current_dir, net::SocketAddr, path::PathBuf};
use eyre::{bail, Context, ContextCompat};
use std::{
env::current_dir,
net::SocketAddr,
path::{Path, PathBuf},
};
use tokio::runtime::Builder;
use uuid::Uuid;

@@ -67,3 +71,45 @@ pub(crate) fn resolve_dataflow(dataflow: String) -> eyre::Result<PathBuf> {
};
Ok(dataflow)
}

pub(crate) fn local_working_dir(
dataflow_path: &Path,
dataflow_descriptor: &Descriptor,
coordinator_session: &mut TcpRequestReplyConnection,
) -> eyre::Result<Option<PathBuf>> {
Ok(
if dataflow_descriptor
.nodes
.iter()
.all(|n| n.deploy.as_ref().map(|d| d.machine.as_ref()).is_none())
&& cli_and_daemon_on_same_machine(coordinator_session)?
{
Some(
dunce::canonicalize(dataflow_path)
.context("failed to canonicalize dataflow file path")?
.parent()
.context("dataflow path has no parent dir")?
.to_owned(),
)
} else {
None
},
)
}

pub(crate) fn cli_and_daemon_on_same_machine(session: &mut TcpRequestReplyConnection) -> eyre::Result<bool> {
let reply_raw = session
.request(&serde_json::to_vec(&ControlRequest::CliAndDefaultDaemonOnSameMachine).unwrap())
.wrap_err("failed to send start dataflow message")?;

let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::CliAndDefaultDaemonIps {
default_daemon,
cli,
} => Ok(default_daemon.is_some() && default_daemon == cli),
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
}
}

binaries/cli/src/lib.rs (+2, -1)

@@ -5,10 +5,11 @@ use std::{
path::PathBuf,
};

mod attach;
mod commands;
mod common;
mod formatting;
pub mod output;
pub mod session;
mod template;

const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));


binaries/cli/src/output.rs (+62, -0)

@@ -0,0 +1,62 @@
use colored::Colorize;
use dora_core::build::LogLevelOrStdout;
use dora_message::common::LogMessage;

pub fn print_log_message(
log_message: LogMessage,
print_dataflow_id: bool,
print_daemon_name: bool,
) {
let LogMessage {
build_id: _,
dataflow_id,
node_id,
daemon_id,
level,
target,
module_path: _,
file: _,
line: _,
message,
} = log_message;
let level = match level {
LogLevelOrStdout::LogLevel(level) => match level {
log::Level::Error => "ERROR ".red(),
log::Level::Warn => "WARN ".yellow(),
log::Level::Info => "INFO ".green(),
log::Level::Debug => "DEBUG ".bright_blue(),
log::Level::Trace => "TRACE ".dimmed(),
},
LogLevelOrStdout::Stdout => "stdout".bright_blue().italic().dimmed(),
};

let dataflow = match dataflow_id {
Some(dataflow_id) if print_dataflow_id => format!("dataflow `{dataflow_id}` ").cyan(),
_ => String::new().cyan(),
};
let daemon = match daemon_id {
Some(id) if print_daemon_name => match id.machine_id() {
Some(machine_id) => format!("on daemon `{machine_id}`"),
None => "on default daemon ".to_string(),
},
None if print_daemon_name => "on default daemon".to_string(),
_ => String::new(),
}
.bright_black();
let colon = ":".bright_black().bold();
let node = match node_id {
Some(node_id) => {
let node_id = node_id.to_string().dimmed().bold();
let padding = if daemon.is_empty() { "" } else { " " };
format!("{node_id}{padding}{daemon}{colon} ")
}
None if daemon.is_empty() => "".into(),
None => format!("{daemon}{colon} "),
};
let target = match target {
Some(target) => format!("{target} ").dimmed(),
None => "".normal(),
};

println!("{node}{level} {target}{dataflow} {message}");
}

binaries/cli/src/session.rs (+98, -0)

@@ -0,0 +1,98 @@
use std::{
collections::BTreeMap,
path::{Path, PathBuf},
};

use dora_core::build::BuildInfo;
use dora_message::{common::GitSource, id::NodeId, BuildId, SessionId};
use eyre::{Context, ContextCompat};

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DataflowSession {
pub build_id: Option<BuildId>,
pub session_id: SessionId,
pub git_sources: BTreeMap<NodeId, GitSource>,
pub local_build: Option<BuildInfo>,
}

impl Default for DataflowSession {
fn default() -> Self {
Self {
build_id: None,
session_id: SessionId::generate(),
git_sources: Default::default(),
local_build: Default::default(),
}
}
}

impl DataflowSession {
pub fn read_session(dataflow_path: &Path) -> eyre::Result<Self> {
let session_file = session_file_path(dataflow_path)?;
if session_file.exists() {
if let Ok(parsed) = deserialize(&session_file) {
return Ok(parsed);
} else {
tracing::warn!("failed to read dataflow session file, regenerating (you might need to run `dora build` again)");
}
}

let default_session = DataflowSession::default();
default_session.write_out_for_dataflow(dataflow_path)?;
Ok(default_session)
}

pub fn write_out_for_dataflow(&self, dataflow_path: &Path) -> eyre::Result<()> {
let session_file = session_file_path(dataflow_path)?;
let filename = session_file
.file_name()
.context("session file has no file name")?
.to_str()
.context("session file name is no utf8")?;
if let Some(parent) = session_file.parent() {
std::fs::create_dir_all(parent).context("failed to create out dir")?;
}
std::fs::write(&session_file, self.serialize()?)
.context("failed to write dataflow session file")?;
let gitignore = session_file.with_file_name(".gitignore");
if gitignore.exists() {
let existing =
std::fs::read_to_string(&gitignore).context("failed to read gitignore")?;
if !existing
.lines()
.any(|l| l.split_once('/') == Some(("", filename)))
{
let new = existing + &format!("\n/{filename}\n");
std::fs::write(gitignore, new).context("failed to update gitignore")?;
}
} else {
std::fs::write(gitignore, format!("/{filename}\n"))
.context("failed to write gitignore")?;
}
Ok(())
}

fn serialize(&self) -> eyre::Result<String> {
serde_yaml::to_string(&self).context("failed to serialize dataflow session file")
}
}

fn deserialize(session_file: &Path) -> eyre::Result<DataflowSession> {
std::fs::read_to_string(session_file)
.context("failed to read DataflowSession file")
.and_then(|s| {
serde_yaml::from_str(&s).context("failed to deserialize DataflowSession file")
})
}

fn session_file_path(dataflow_path: &Path) -> eyre::Result<PathBuf> {
let file_stem = dataflow_path
.file_stem()
.wrap_err("dataflow path has no file stem")?
.to_str()
.wrap_err("dataflow file stem is not valid utf-8")?;
let session_file = dataflow_path
.with_file_name("out")
.join(format!("{file_stem}.dora-session.yaml"));
Ok(session_file)
}
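
Note: the session file introduced here is written next to the dataflow as `out/<file stem>.dora-session.yaml`, and `dora run`, `dora start`, and `dora daemon --run-dataflow` read it back to recover the `build_id`, `session_id`, and any local build info. A minimal sketch of the consuming side, modeled on the `dora run` changes above (the function name is illustrative; the `Daemon::run_dataflow` argument order is taken from the diff):

use std::path::Path;

use dora_cli::session::DataflowSession;
use dora_daemon::{Daemon, LogDestination};
use eyre::Context;

fn run_locally(dataflow_path: &Path, uv: bool) -> eyre::Result<()> {
    // Loads the session file if present, otherwise generates a fresh session
    // (new SessionId, no build id) and writes it out alongside the dataflow.
    let session = DataflowSession::read_session(dataflow_path)
        .context("failed to read DataflowSession")?;

    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .context("tokio runtime failed")?;

    // Logs go to tracing here; `dora run` uses LogDestination::Channel instead
    // so it can pretty-print messages via print_log_message.
    let _result = rt.block_on(Daemon::run_dataflow(
        dataflow_path,
        session.build_id,
        session.local_build,
        session.session_id,
        uv,
        LogDestination::Tracing,
    ))?;
    Ok(())
}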

binaries/cli/src/template/c/cmake-template.txt (+4, -4)

@@ -64,16 +64,16 @@ link_directories(${dora_link_dirs})
add_executable(talker_1 talker_1/node.c)
add_dependencies(talker_1 Dora_c)
target_include_directories(talker_1 PRIVATE ${dora_c_include_dir})
target_link_libraries(talker_1 dora_node_api_c m)
target_link_libraries(talker_1 dora_node_api_c m z)

add_executable(talker_2 talker_2/node.c)
add_dependencies(talker_2 Dora_c)
target_include_directories(talker_2 PRIVATE ${dora_c_include_dir})
target_link_libraries(talker_2 dora_node_api_c m)
target_link_libraries(talker_2 dora_node_api_c m z)

add_executable(listener_1 listener_1/node.c)
add_dependencies(listener_1 Dora_c)
target_include_directories(listener_1 PRIVATE ${dora_c_include_dir})
target_link_libraries(listener_1 dora_node_api_c m)
target_link_libraries(listener_1 dora_node_api_c m z)

install(TARGETS listener_1 talker_1 talker_2 DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/bin)
install(TARGETS listener_1 talker_1 talker_2 DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/bin)

binaries/cli/src/template/cxx/cmake-template.txt (+4, -4)

@@ -70,16 +70,16 @@ link_directories(${dora_link_dirs})
add_executable(talker_1 talker_1/node.cc ${node_bridge})
add_dependencies(talker_1 Dora_cxx)
target_include_directories(talker_1 PRIVATE ${dora_cxx_include_dir})
target_link_libraries(talker_1 dora_node_api_cxx)
target_link_libraries(talker_1 dora_node_api_cxx z)

add_executable(talker_2 talker_2/node.cc ${node_bridge})
add_dependencies(talker_2 Dora_cxx)
target_include_directories(talker_2 PRIVATE ${dora_cxx_include_dir})
target_link_libraries(talker_2 dora_node_api_cxx)
target_link_libraries(talker_2 dora_node_api_cxx z)

add_executable(listener_1 listener_1/node.cc ${node_bridge})
add_dependencies(listener_1 Dora_cxx)
target_include_directories(listener_1 PRIVATE ${dora_cxx_include_dir})
target_link_libraries(listener_1 dora_node_api_cxx)
target_link_libraries(listener_1 dora_node_api_cxx z)

install(TARGETS listener_1 talker_1 talker_2 DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/bin)
install(TARGETS listener_1 talker_1 talker_2 DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/bin)

binaries/coordinator/src/control.rs (+30, -4)

@@ -2,7 +2,9 @@ use crate::{
tcp_utils::{tcp_receive, tcp_send},
Event,
};
use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply};
use dora_message::{
cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply, BuildId,
};
use eyre::{eyre, Context};
use futures::{
future::{self, Either},
@@ -79,6 +81,7 @@ async fn handle_requests(
tx: mpsc::Sender<ControlEvent>,
_finish_tx: mpsc::Sender<()>,
) {
let peer_addr = connection.peer_addr().ok();
loop {
let next_request = tcp_receive(&mut connection).map(Either::Left);
let coordinator_stopped = tx.closed().map(Either::Right);
@@ -114,11 +117,29 @@ async fn handle_requests(
break;
}

let result = match request {
if let Ok(ControlRequest::BuildLogSubscribe { build_id, level }) = request {
let _ = tx
.send(ControlEvent::BuildLogSubscribe {
build_id,
level,
connection,
})
.await;
break;
}

let mut result = match request {
Ok(request) => handle_request(request, &tx).await,
Err(err) => Err(err),
};

if let Ok(ControlRequestReply::CliAndDefaultDaemonIps { cli, .. }) = &mut result {
if cli.is_none() {
// fill cli IP address in reply
*cli = peer_addr.map(|s| s.ip());
}
}

let reply = result.unwrap_or_else(|err| ControlRequestReply::Error(format!("{err:?}")));
let serialized: Vec<u8> =
match serde_json::to_vec(&reply).wrap_err("failed to serialize ControlRequestReply") {
@@ -155,7 +176,7 @@ async fn handle_request(
) -> eyre::Result<ControlRequestReply> {
let (reply_tx, reply_rx) = oneshot::channel();
let event = ControlEvent::IncomingRequest {
request,
request: request.clone(),
reply_sender: reply_tx,
};

@@ -165,7 +186,7 @@ async fn handle_request(

reply_rx
.await
.unwrap_or(Ok(ControlRequestReply::CoordinatorStopped))
.wrap_err_with(|| format!("no coordinator reply to {request:?}"))?
}

#[derive(Debug)]
@@ -179,6 +200,11 @@ pub enum ControlEvent {
level: log::LevelFilter,
connection: TcpStream,
},
BuildLogSubscribe {
build_id: BuildId,
level: log::LevelFilter,
connection: TcpStream,
},
Error(eyre::Report),
}



binaries/coordinator/src/lib.rs (+449, -34)

@@ -5,22 +5,27 @@ use crate::{
pub use control::ControlEvent;
use dora_core::{
config::{NodeId, OperatorId},
descriptor::DescriptorExt,
uhlc::{self, HLC},
};
use dora_message::{
cli_to_coordinator::ControlRequest,
common::DaemonId,
common::{DaemonId, GitSource},
coordinator_to_cli::{
ControlRequestReply, DataflowIdAndName, DataflowList, DataflowListEntry, DataflowResult,
DataflowStatus, LogLevel, LogMessage,
},
coordinator_to_daemon::{DaemonCoordinatorEvent, RegisterResult, Timestamped},
coordinator_to_daemon::{
BuildDataflowNodes, DaemonCoordinatorEvent, RegisterResult, Timestamped,
},
daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult},
descriptor::{Descriptor, ResolvedNode},
BuildId, DataflowId, SessionId,
};
use eyre::{bail, eyre, ContextCompat, Result, WrapErr};
use futures::{future::join_all, stream::FuturesUnordered, Future, Stream, StreamExt};
use futures_concurrency::stream::Merge;
use itertools::Itertools;
use log_subscriber::LogSubscriber;
use run::SpawnedDataflow;
use std::{
@@ -30,7 +35,11 @@ use std::{
sync::Arc,
time::{Duration, Instant},
};
use tokio::{net::TcpStream, sync::mpsc, task::JoinHandle};
use tokio::{
net::TcpStream,
sync::{mpsc, oneshot},
task::JoinHandle,
};
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
use uuid::Uuid;

@@ -135,6 +144,10 @@ impl DaemonConnections {
}
}

fn get(&self, id: &DaemonId) -> Option<&DaemonConnection> {
self.daemons.get(id)
}

fn get_mut(&mut self, id: &DaemonId) -> Option<&mut DaemonConnection> {
self.daemons.get_mut(id)
}
@@ -157,10 +170,6 @@ impl DaemonConnections {
self.daemons.keys()
}

fn iter(&self) -> impl Iterator<Item = (&DaemonId, &DaemonConnection)> {
self.daemons.iter()
}

fn iter_mut(&mut self) -> impl Iterator<Item = (&DaemonId, &mut DaemonConnection)> {
self.daemons.iter_mut()
}
@@ -194,13 +203,20 @@ async fn start_inner(

let mut events = (abortable_events, daemon_events).merge();

let mut running_dataflows: HashMap<Uuid, RunningDataflow> = HashMap::new();
let mut dataflow_results: HashMap<Uuid, BTreeMap<DaemonId, DataflowDaemonResult>> =
let mut running_builds: HashMap<BuildId, RunningBuild> = HashMap::new();
let mut finished_builds: HashMap<BuildId, CachedResult> = HashMap::new();

let mut running_dataflows: HashMap<DataflowId, RunningDataflow> = HashMap::new();
let mut dataflow_results: HashMap<DataflowId, BTreeMap<DaemonId, DataflowDaemonResult>> =
HashMap::new();
let mut archived_dataflows: HashMap<Uuid, ArchivedDataflow> = HashMap::new();
let mut archived_dataflows: HashMap<DataflowId, ArchivedDataflow> = HashMap::new();
let mut daemon_connections = DaemonConnections::default();

while let Some(event) = events.next().await {
// used below for measuring the event handling duration
let start = Instant::now();
let event_kind = event.kind();

if event.log() {
tracing::trace!("Handling event {event:?}");
}
@@ -347,12 +363,13 @@ async fn start_inner(
let mut finished_dataflow = entry.remove();
let dataflow_id = finished_dataflow.uuid;
send_log_message(
&mut finished_dataflow,
&mut finished_dataflow.log_subscribers,
&LogMessage {
dataflow_id,
build_id: None,
dataflow_id: Some(dataflow_id),
node_id: None,
daemon_id: None,
level: LogLevel::Info,
level: LogLevel::Info.into(),
target: Some("coordinator".into()),
module_path: None,
file: None,
@@ -371,9 +388,15 @@ async fn start_inner(
DataflowResult::ok_empty(uuid, clock.new_timestamp())
}),
};
for sender in finished_dataflow.reply_senders {
for sender in finished_dataflow.stop_reply_senders {
let _ = sender.send(Ok(reply.clone()));
}
if !matches!(
finished_dataflow.spawn_result,
CachedResult::Cached { .. }
) {
log::error!("pending spawn result on dataflow finish");
}
}
}
std::collections::hash_map::Entry::Vacant(_) => {
@@ -389,7 +412,54 @@ async fn start_inner(
reply_sender,
} => {
match request {
ControlRequest::Build {
session_id,
dataflow,
git_sources,
prev_git_sources,
local_working_dir,
uv,
} => {
// assign a random build id
let build_id = BuildId::generate();

let result = build_dataflow(
build_id,
session_id,
dataflow,
git_sources,
prev_git_sources,
local_working_dir,
&clock,
uv,
&mut daemon_connections,
)
.await;
match result {
Ok(build) => {
running_builds.insert(build_id, build);
let _ = reply_sender.send(Ok(
ControlRequestReply::DataflowBuildTriggered { build_id },
));
}
Err(err) => {
let _ = reply_sender.send(Err(err));
}
}
}
ControlRequest::WaitForBuild { build_id } => {
if let Some(build) = running_builds.get_mut(&build_id) {
build.build_result.register(reply_sender);
} else if let Some(result) = finished_builds.get_mut(&build_id) {
result.register(reply_sender);
} else {
let _ =
reply_sender.send(Err(eyre!("unknown build id {build_id}")));
}
}
ControlRequest::Start {
build_id,
session_id,
dataflow,
name,
local_working_dir,
@@ -408,6 +478,8 @@ async fn start_inner(
}
}
let dataflow = start_dataflow(
build_id,
session_id,
dataflow,
local_working_dir,
name,
@@ -418,16 +490,30 @@ async fn start_inner(
.await?;
Ok(dataflow)
};
let reply = inner.await.map(|dataflow| {
let uuid = dataflow.uuid;
running_dataflows.insert(uuid, dataflow);
ControlRequestReply::DataflowStarted { uuid }
});
let _ = reply_sender.send(reply);
match inner.await {
Ok(dataflow) => {
let uuid = dataflow.uuid;
running_dataflows.insert(uuid, dataflow);
let _ = reply_sender.send(Ok(
ControlRequestReply::DataflowStartTriggered { uuid },
));
}
Err(err) => {
let _ = reply_sender.send(Err(err));
}
}
}
ControlRequest::WaitForSpawn { dataflow_id } => {
if let Some(dataflow) = running_dataflows.get_mut(&dataflow_id) {
dataflow.spawn_result.register(reply_sender);
} else {
let _ =
reply_sender.send(Err(eyre!("unknown dataflow {dataflow_id}")));
}
}
ControlRequest::Check { dataflow_uuid } => {
let status = match &running_dataflows.get(&dataflow_uuid) {
Some(_) => ControlRequestReply::DataflowStarted {
Some(_) => ControlRequestReply::DataflowSpawned {
uuid: dataflow_uuid,
},
None => ControlRequestReply::DataflowStopped {
@@ -495,7 +581,7 @@ async fn start_inner(

match dataflow {
Ok(dataflow) => {
dataflow.reply_senders.push(reply_sender);
dataflow.stop_reply_senders.push(reply_sender);
}
Err(err) => {
let _ = reply_sender.send(Err(err));
@@ -528,7 +614,7 @@ async fn start_inner(

match dataflow {
Ok(dataflow) => {
dataflow.reply_senders.push(reply_sender);
dataflow.stop_reply_senders.push(reply_sender);
}
Err(err) => {
let _ = reply_sender.send(Err(err));
@@ -626,6 +712,27 @@ async fn start_inner(
"LogSubscribe request should be handled separately"
)));
}
ControlRequest::BuildLogSubscribe { .. } => {
let _ = reply_sender.send(Err(eyre::eyre!(
"BuildLogSubscribe request should be handled separately"
)));
}
ControlRequest::CliAndDefaultDaemonOnSameMachine => {
let mut default_daemon_ip = None;
if let Some(default_id) = daemon_connections.unnamed().next() {
if let Some(connection) = daemon_connections.get(default_id) {
if let Ok(addr) = connection.stream.peer_addr() {
default_daemon_ip = Some(addr.ip());
}
}
}
let _ = reply_sender.send(Ok(
ControlRequestReply::CliAndDefaultDaemonIps {
default_daemon: default_daemon_ip,
cli: None, // filled later
},
));
}
}
}
ControlEvent::Error(err) => tracing::error!("{err:?}"),
@@ -640,6 +747,17 @@ async fn start_inner(
.push(LogSubscriber::new(level, connection));
}
}
ControlEvent::BuildLogSubscribe {
build_id,
level,
connection,
} => {
if let Some(build) = running_builds.get_mut(&build_id) {
build
.log_subscribers
.push(LogSubscriber::new(level, connection));
}
}
},
Event::DaemonHeartbeatInterval => {
let mut disconnected = BTreeSet::new();
@@ -695,14 +813,89 @@ async fn start_inner(
}
}
Event::Log(message) => {
if let Some(dataflow) = running_dataflows.get_mut(&message.dataflow_id) {
send_log_message(dataflow, &message).await;
if let Some(dataflow_id) = &message.dataflow_id {
if let Some(dataflow) = running_dataflows.get_mut(dataflow_id) {
send_log_message(&mut dataflow.log_subscribers, &message).await;
}
}
if let Some(build_id) = message.build_id {
if let Some(build) = running_builds.get_mut(&build_id) {
send_log_message(&mut build.log_subscribers, &message).await;
}
}
}
Event::DaemonExit { daemon_id } => {
tracing::info!("Daemon `{daemon_id}` exited");
daemon_connections.remove(&daemon_id);
}
Event::DataflowBuildResult {
build_id,
daemon_id,
result,
} => match running_builds.get_mut(&build_id) {
Some(build) => {
build.pending_build_results.remove(&daemon_id);
match result {
Ok(()) => {}
Err(err) => {
build.errors.push(format!("{err:?}"));
}
};
if build.pending_build_results.is_empty() {
tracing::info!("dataflow build finished: `{build_id}`");
let mut build = running_builds.remove(&build_id).unwrap();
let result = if build.errors.is_empty() {
Ok(())
} else {
Err(format!("build failed: {}", build.errors.join("\n\n")))
};

build.build_result.set_result(Ok(
ControlRequestReply::DataflowBuildFinished { build_id, result },
));

finished_builds.insert(build_id, build.build_result);
}
}
None => {
tracing::warn!("received DataflowSpawnResult, but no matching dataflow in `running_dataflows` map");
}
},
Event::DataflowSpawnResult {
dataflow_id,
daemon_id,
result,
} => match running_dataflows.get_mut(&dataflow_id) {
Some(dataflow) => {
dataflow.pending_spawn_results.remove(&daemon_id);
match result {
Ok(()) => {
if dataflow.pending_spawn_results.is_empty() {
tracing::info!("successfully spawned dataflow `{dataflow_id}`",);
dataflow.spawn_result.set_result(Ok(
ControlRequestReply::DataflowSpawned { uuid: dataflow_id },
));
}
}
Err(err) => {
tracing::warn!("error while spawning dataflow `{dataflow_id}`");
dataflow.spawn_result.set_result(Err(err));
}
};
}
None => {
tracing::warn!("received DataflowSpawnResult, but no matching dataflow in `running_dataflows` map");
}
},
}

// warn if event handling took too long -> the main loop should never be blocked for too long
let elapsed = start.elapsed();
if elapsed > Duration::from_millis(100) {
tracing::warn!(
"Coordinator took {}ms for handling event: {event_kind}",
elapsed.as_millis()
);
}
}

@@ -711,8 +904,8 @@ async fn start_inner(
Ok(())
}

async fn send_log_message(dataflow: &mut RunningDataflow, message: &LogMessage) {
for subscriber in &mut dataflow.log_subscribers {
async fn send_log_message(log_subscribers: &mut Vec<LogSubscriber>, message: &LogMessage) {
for subscriber in log_subscribers.iter_mut() {
let send_result =
tokio::time::timeout(Duration::from_millis(100), subscriber.send_message(message));

@@ -720,7 +913,7 @@ async fn send_log_message(dataflow: &mut RunningDataflow, message: &LogMessage)
subscriber.close();
}
}
dataflow.log_subscribers.retain(|s| !s.is_closed());
log_subscribers.retain(|s| !s.is_closed());
}

fn dataflow_result(
@@ -787,6 +980,15 @@ async fn send_heartbeat_message(
.wrap_err("failed to send heartbeat message to daemon")
}

struct RunningBuild {
errors: Vec<String>,
build_result: CachedResult,

log_subscribers: Vec<LogSubscriber>,

pending_build_results: BTreeSet<DaemonId>,
}

struct RunningDataflow {
name: Option<String>,
uuid: Uuid,
@@ -797,9 +999,66 @@ struct RunningDataflow {
exited_before_subscribe: Vec<NodeId>,
nodes: BTreeMap<NodeId, ResolvedNode>,

reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,
spawn_result: CachedResult,
stop_reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,

log_subscribers: Vec<LogSubscriber>,

pending_spawn_results: BTreeSet<DaemonId>,
}

pub enum CachedResult {
Pending {
result_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,
},
Cached {
result: eyre::Result<ControlRequestReply>,
},
}

impl Default for CachedResult {
fn default() -> Self {
Self::Pending {
result_senders: Vec::new(),
}
}
}

impl CachedResult {
fn register(
&mut self,
reply_sender: tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>,
) {
match self {
CachedResult::Pending { result_senders } => result_senders.push(reply_sender),
CachedResult::Cached { result } => {
Self::send_result_to(result, reply_sender);
}
}
}

fn set_result(&mut self, result: eyre::Result<ControlRequestReply>) {
match self {
CachedResult::Pending { result_senders } => {
for sender in result_senders.drain(..) {
Self::send_result_to(&result, sender);
}
*self = CachedResult::Cached { result };
}
CachedResult::Cached { .. } => {}
}
}

fn send_result_to(
result: &eyre::Result<ControlRequestReply>,
sender: oneshot::Sender<eyre::Result<ControlRequestReply>>,
) {
let result = match result {
Ok(r) => Ok(r.clone()),
Err(err) => Err(eyre!("{err:?}")),
};
let _ = sender.send(result);
}
}

struct ArchivedDataflow {
@@ -943,7 +1202,7 @@ async fn retrieve_logs(
let machine_ids: Vec<Option<String>> = nodes
.values()
.filter(|node| node.id == node_id)
.map(|node| node.deploy.machine.clone())
.map(|node| node.deploy.as_ref().and_then(|d| d.machine.clone()))
.collect();

let machine_id = if let [machine_id] = &machine_ids[..] {
@@ -992,9 +1251,127 @@ async fn retrieve_logs(
reply_logs.map_err(|err| eyre!(err))
}

#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip(daemon_connections, clock))]
async fn build_dataflow(
build_id: BuildId,
session_id: SessionId,
dataflow: Descriptor,
git_sources: BTreeMap<NodeId, GitSource>,
prev_git_sources: BTreeMap<NodeId, GitSource>,
local_working_dir: Option<PathBuf>,
clock: &HLC,
uv: bool,
daemon_connections: &mut DaemonConnections,
) -> eyre::Result<RunningBuild> {
let nodes = dataflow.resolve_aliases_and_set_defaults()?;

let mut git_sources_by_daemon = git_sources
.into_iter()
.into_grouping_map_by(|(id, _)| {
nodes
.get(id)
.and_then(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()))
})
.collect();
let mut prev_git_sources_by_daemon = prev_git_sources
.into_iter()
.into_grouping_map_by(|(id, _)| {
nodes
.get(id)
.and_then(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()))
})
.collect();

let nodes_by_daemon = nodes
.values()
.into_group_map_by(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()));

let mut daemons = BTreeSet::new();
for (machine, nodes_on_machine) in &nodes_by_daemon {
let nodes_on_machine = nodes_on_machine.iter().map(|n| n.id.clone()).collect();
tracing::debug!(
"Running dataflow build `{build_id}` on machine `{machine:?}` (nodes: {nodes_on_machine:?})"
);

let build_command = BuildDataflowNodes {
build_id,
session_id,
local_working_dir: local_working_dir.clone(),
git_sources: git_sources_by_daemon.remove(machine).unwrap_or_default(),
prev_git_sources: prev_git_sources_by_daemon
.remove(machine)
.unwrap_or_default(),
dataflow_descriptor: dataflow.clone(),
nodes_on_machine,
uv,
};
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::Build(build_command),
timestamp: clock.new_timestamp(),
})?;

let daemon_id =
build_dataflow_on_machine(daemon_connections, machine.map(|s| s.as_str()), &message)
.await
.wrap_err_with(|| format!("failed to build dataflow on machine `{machine:?}`"))?;
daemons.insert(daemon_id);
}

tracing::info!("successfully triggered dataflow build `{build_id}`",);

Ok(RunningBuild {
errors: Vec::new(),
build_result: CachedResult::default(),
log_subscribers: Vec::new(),
pending_build_results: daemons,
})
}

async fn build_dataflow_on_machine(
daemon_connections: &mut DaemonConnections,
machine: Option<&str>,
message: &[u8],
) -> Result<DaemonId, eyre::ErrReport> {
let daemon_id = match machine {
Some(machine) => daemon_connections
.get_matching_daemon_id(machine)
.wrap_err_with(|| format!("no matching daemon for machine id {machine:?}"))?
.clone(),
None => daemon_connections
.unnamed()
.next()
.wrap_err("no unnamed daemon connections")?
.clone(),
};

let daemon_connection = daemon_connections
.get_mut(&daemon_id)
.wrap_err_with(|| format!("no daemon connection for daemon `{daemon_id}`"))?;
tcp_send(&mut daemon_connection.stream, message)
.await
.wrap_err("failed to send build message to daemon")?;

let reply_raw = tcp_receive(&mut daemon_connection.stream)
.await
.wrap_err("failed to receive build reply from daemon")?;
match serde_json::from_slice(&reply_raw)
.wrap_err("failed to deserialize build reply from daemon")?
{
DaemonCoordinatorReply::TriggerBuildResult(result) => result
.map_err(|e| eyre!(e))
.wrap_err("daemon returned an error")?,
_ => bail!("unexpected reply"),
}
Ok(daemon_id)
}

#[allow(clippy::too_many_arguments)]
async fn start_dataflow(
build_id: Option<BuildId>,
session_id: SessionId,
dataflow: Descriptor,
working_dir: PathBuf,
local_working_dir: Option<PathBuf>,
name: Option<String>,
daemon_connections: &mut DaemonConnections,
clock: &HLC,
@@ -1004,7 +1381,16 @@ async fn start_dataflow(
uuid,
daemons,
nodes,
} = spawn_dataflow(dataflow, working_dir, daemon_connections, clock, uv).await?;
} = spawn_dataflow(
build_id,
session_id,
dataflow,
local_working_dir,
daemon_connections,
clock,
uv,
)
.await?;
Ok(RunningDataflow {
uuid,
name,
@@ -1014,10 +1400,12 @@ async fn start_dataflow(
BTreeSet::new()
},
exited_before_subscribe: Default::default(),
daemons,
daemons: daemons.clone(),
nodes,
reply_senders: Vec::new(),
spawn_result: CachedResult::default(),
stop_reply_senders: Vec::new(),
log_subscribers: Vec::new(),
pending_spawn_results: daemons,
})
}

@@ -1092,6 +1480,16 @@ pub enum Event {
DaemonExit {
daemon_id: dora_message::common::DaemonId,
},
DataflowBuildResult {
build_id: BuildId,
daemon_id: DaemonId,
result: eyre::Result<()>,
},
DataflowSpawnResult {
dataflow_id: uuid::Uuid,
daemon_id: DaemonId,
result: eyre::Result<()>,
},
}

impl Event {
@@ -1103,6 +1501,23 @@ impl Event {
_ => true,
}
}

fn kind(&self) -> &'static str {
match self {
Event::NewDaemonConnection(_) => "NewDaemonConnection",
Event::DaemonConnectError(_) => "DaemonConnectError",
Event::DaemonHeartbeat { .. } => "DaemonHeartbeat",
Event::Dataflow { .. } => "Dataflow",
Event::Control(_) => "Control",
Event::Daemon(_) => "Daemon",
Event::DaemonHeartbeatInterval => "DaemonHeartbeatInterval",
Event::CtrlC => "CtrlC",
Event::Log(_) => "Log",
Event::DaemonExit { .. } => "DaemonExit",
Event::DataflowBuildResult { .. } => "DataflowBuildResult",
Event::DataflowSpawnResult { .. } => "DataflowSpawnResult",
}
}
}
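
To connect the new event variants with the bookkeeping structs above, a hypothetical handler for a per-daemon build result could look like this (a sketch, not the actual event loop):

```rust
// Records one daemon's build outcome; returns true once every daemon has
// reported, at which point the caller can resolve `build.build_result`.
fn note_daemon_build_result(
    build: &mut RunningBuild,
    daemon_id: &DaemonId,
    result: eyre::Result<()>,
) -> bool {
    if let Err(err) = result {
        build.errors.push(format!("daemon `{daemon_id}`: {err:?}"));
    }
    build.pending_build_results.remove(daemon_id);
    build.pending_build_results.is_empty()
}
```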

#[derive(Debug)]


+ 23
- 0
binaries/coordinator/src/listener.rs View File

@@ -112,6 +112,29 @@ pub async fn handle_connection(
break;
}
}
DaemonEvent::BuildResult { build_id, result } => {
let event = Event::DataflowBuildResult {
build_id,
daemon_id,
result: result.map_err(|err| eyre::eyre!(err)),
};
if events_tx.send(event).await.is_err() {
break;
}
}
DaemonEvent::SpawnResult {
dataflow_id,
result,
} => {
let event = Event::DataflowSpawnResult {
dataflow_id,
daemon_id,
result: result.map_err(|err| eyre::eyre!(err)),
};
if events_tx.send(event).await.is_err() {
break;
}
}
},
};
}


+ 8
- 2
binaries/coordinator/src/log_subscriber.rs View File

@@ -17,9 +17,15 @@ impl LogSubscriber {
}

pub async fn send_message(&mut self, message: &LogMessage) -> eyre::Result<()> {
if message.level > self.level {
return Ok(());
match message.level {
dora_core::build::LogLevelOrStdout::LogLevel(level) => {
if level > self.level {
return Ok(());
}
}
dora_core::build::LogLevelOrStdout::Stdout => {}
}

let message = serde_json::to_vec(&message)?;
let connection = self.connection.as_mut().context("connection is closed")?;
tcp_send(connection, &message)


+ 17
- 8
binaries/coordinator/src/run/mod.rs View File

@@ -10,6 +10,7 @@ use dora_message::{
daemon_to_coordinator::DaemonCoordinatorReply,
descriptor::{Descriptor, ResolvedNode},
id::NodeId,
BuildId, SessionId,
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use itertools::Itertools;
@@ -21,8 +22,10 @@ use uuid::{NoContext, Timestamp, Uuid};

#[tracing::instrument(skip(daemon_connections, clock))]
pub(super) async fn spawn_dataflow(
build_id: Option<BuildId>,
session_id: SessionId,
dataflow: Descriptor,
working_dir: PathBuf,
local_working_dir: Option<PathBuf>,
daemon_connections: &mut DaemonConnections,
clock: &HLC,
uv: bool,
@@ -30,7 +33,9 @@ pub(super) async fn spawn_dataflow(
let nodes = dataflow.resolve_aliases_and_set_defaults()?;
let uuid = Uuid::new_v7(Timestamp::now(NoContext));

let nodes_by_daemon = nodes.values().into_group_map_by(|n| &n.deploy.machine);
let nodes_by_daemon = nodes
.values()
.into_group_map_by(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()));

let mut daemons = BTreeSet::new();
for (machine, nodes_on_machine) in &nodes_by_daemon {
@@ -40,8 +45,10 @@ pub(super) async fn spawn_dataflow(
);

let spawn_command = SpawnDataflowNodes {
build_id,
session_id,
dataflow_id: uuid,
working_dir: working_dir.clone(),
local_working_dir: local_working_dir.clone(),
nodes: nodes.clone(),
dataflow_descriptor: dataflow.clone(),
spawn_nodes,
@@ -52,13 +59,14 @@ pub(super) async fn spawn_dataflow(
timestamp: clock.new_timestamp(),
})?;

let daemon_id = spawn_dataflow_on_machine(daemon_connections, machine.as_deref(), &message)
.await
.wrap_err_with(|| format!("failed to spawn dataflow on machine `{machine:?}`"))?;
let daemon_id =
spawn_dataflow_on_machine(daemon_connections, machine.map(|m| m.as_str()), &message)
.await
.wrap_err_with(|| format!("failed to spawn dataflow on machine `{machine:?}`"))?;
daemons.insert(daemon_id);
}

tracing::info!("successfully spawned dataflow `{uuid}`");
tracing::info!("successfully triggered dataflow spawn `{uuid}`",);

Ok(SpawnedDataflow {
uuid,
@@ -90,13 +98,14 @@ async fn spawn_dataflow_on_machine(
tcp_send(&mut daemon_connection.stream, message)
.await
.wrap_err("failed to send spawn message to daemon")?;

let reply_raw = tcp_receive(&mut daemon_connection.stream)
.await
.wrap_err("failed to receive spawn reply from daemon")?;
match serde_json::from_slice(&reply_raw)
.wrap_err("failed to deserialize spawn reply from daemon")?
{
DaemonCoordinatorReply::SpawnResult(result) => result
DaemonCoordinatorReply::TriggerSpawnResult(result) => result
.map_err(|e| eyre!(e))
.wrap_err("daemon returned an error")?,
_ => bail!("unexpected reply"),


+ 6
- 2
binaries/daemon/Cargo.toml View File

@@ -24,14 +24,14 @@ tracing = "0.1.36"
tracing-opentelemetry = { version = "0.18.0", optional = true }
futures-concurrency = "7.1.0"
serde_json = "1.0.86"
dora-core = { workspace = true }
dora-core = { workspace = true, features = ["build"] }
flume = "0.10.14"
dora-download = { workspace = true }
dora-tracing = { workspace = true, optional = true }
dora-arrow-convert = { workspace = true }
dora-node-api = { workspace = true }
dora-message = { workspace = true }
serde_yaml = "0.8.23"
serde_yaml = { workspace = true }
uuid = { version = "1.7", features = ["v7"] }
futures = "0.3.25"
shared-memory-server = { workspace = true }
@@ -44,3 +44,7 @@ sysinfo = "0.30.11"
crossbeam = "0.8.4"
crossbeam-skiplist = "0.1.3"
zenoh = "1.1.1"
url = "2.5.4"
git2 = { workspace = true }
dunce = "1.0.5"
itertools = "0.14"

+ 690
- 98
binaries/daemon/src/lib.rs
File diff suppressed because it is too large
View File


+ 270
- 56
binaries/daemon/src/log.rs View File

@@ -1,14 +1,21 @@
use std::{
ops::{Deref, DerefMut},
path::{Path, PathBuf},
sync::Arc,
};

use dora_core::{config::NodeId, uhlc};
use dora_core::{
build::{BuildLogger, LogLevelOrStdout},
config::NodeId,
uhlc,
};
use dora_message::{
common::{DaemonId, LogLevel, LogMessage, Timestamped},
daemon_to_coordinator::{CoordinatorRequest, DaemonEvent},
BuildId,
};
use eyre::Context;
use flume::Sender;
use tokio::net::TcpStream;
use uuid::Uuid;

@@ -39,11 +46,18 @@ impl NodeLogger<'_> {
.log(level, Some(self.node_id.clone()), target, message)
.await
}

pub async fn try_clone(&self) -> eyre::Result<NodeLogger<'static>> {
Ok(NodeLogger {
node_id: self.node_id.clone(),
logger: self.logger.try_clone().await?,
})
}
}

pub struct DataflowLogger<'a> {
dataflow_id: Uuid,
logger: &'a mut DaemonLogger,
logger: CowMut<'a, DaemonLogger>,
}

impl<'a> DataflowLogger<'a> {
@@ -57,12 +71,12 @@ impl<'a> DataflowLogger<'a> {
pub fn reborrow(&mut self) -> DataflowLogger {
DataflowLogger {
dataflow_id: self.dataflow_id,
logger: self.logger,
logger: CowMut::Borrowed(&mut self.logger),
}
}

pub fn inner(&self) -> &DaemonLogger {
self.logger
&self.logger
}

pub async fn log(
@@ -73,9 +87,64 @@ impl<'a> DataflowLogger<'a> {
message: impl Into<String>,
) {
self.logger
.log(level, self.dataflow_id, node_id, target, message)
.log(level, Some(self.dataflow_id), node_id, target, message)
.await
}

pub async fn try_clone(&self) -> eyre::Result<DataflowLogger<'static>> {
Ok(DataflowLogger {
dataflow_id: self.dataflow_id,
logger: CowMut::Owned(self.logger.try_clone().await?),
})
}
}

pub struct NodeBuildLogger<'a> {
build_id: BuildId,
node_id: NodeId,
logger: CowMut<'a, DaemonLogger>,
}

impl NodeBuildLogger<'_> {
pub async fn log(
&mut self,
level: impl Into<LogLevelOrStdout> + Send,
message: impl Into<String>,
) {
self.logger
.log_build(
self.build_id,
level.into(),
None,
Some(self.node_id.clone()),
message,
)
.await
}

pub async fn try_clone_impl(&self) -> eyre::Result<NodeBuildLogger<'static>> {
Ok(NodeBuildLogger {
build_id: self.build_id,
node_id: self.node_id.clone(),
logger: CowMut::Owned(self.logger.try_clone().await?),
})
}
}

impl BuildLogger for NodeBuildLogger<'_> {
type Clone = NodeBuildLogger<'static>;

fn log_message(
&mut self,
level: impl Into<LogLevelOrStdout> + Send,
message: impl Into<String> + Send,
) -> impl std::future::Future<Output = ()> + Send {
self.log(level, message)
}

fn try_clone(&self) -> impl std::future::Future<Output = eyre::Result<Self::Clone>> + Send {
self.try_clone_impl()
}
}

pub struct DaemonLogger {
@@ -87,7 +156,15 @@ impl DaemonLogger {
pub fn for_dataflow(&mut self, dataflow_id: Uuid) -> DataflowLogger {
DataflowLogger {
dataflow_id,
logger: self,
logger: CowMut::Borrowed(self),
}
}

pub fn for_node_build(&mut self, build_id: BuildId, node_id: NodeId) -> NodeBuildLogger {
NodeBuildLogger {
build_id,
node_id,
logger: CowMut::Borrowed(self),
}
}

@@ -98,15 +175,39 @@ impl DaemonLogger {
pub async fn log(
&mut self,
level: LogLevel,
dataflow_id: Uuid,
dataflow_id: Option<Uuid>,
node_id: Option<NodeId>,
target: Option<String>,
message: impl Into<String>,
) {
let message = LogMessage {
build_id: None,
daemon_id: Some(self.daemon_id.clone()),
dataflow_id,
node_id,
level: level.into(),
target,
module_path: None,
file: None,
line: None,
message: message.into(),
};
self.logger.log(message).await
}

pub async fn log_build(
&mut self,
build_id: BuildId,
level: LogLevelOrStdout,
target: Option<String>,
node_id: Option<NodeId>,
message: impl Into<String>,
) {
let message = LogMessage {
build_id: Some(build_id),
daemon_id: Some(self.daemon_id.clone()),
dataflow_id: None,
node_id,
level,
target,
module_path: None,
@@ -120,10 +221,17 @@ impl DaemonLogger {
pub(crate) fn daemon_id(&self) -> &DaemonId {
&self.daemon_id
}

pub async fn try_clone(&self) -> eyre::Result<Self> {
Ok(Self {
daemon_id: self.daemon_id.clone(),
logger: self.logger.try_clone().await?,
})
}
}

pub struct Logger {
pub(super) coordinator_connection: Option<TcpStream>,
pub(super) destination: LogDestination,
pub(super) daemon_id: DaemonId,
pub(super) clock: Arc<uhlc::HLC>,
}
@@ -137,73 +245,179 @@ impl Logger {
}

pub async fn log(&mut self, message: LogMessage) {
if let Some(connection) = &mut self.coordinator_connection {
let msg = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Event {
daemon_id: self.daemon_id.clone(),
event: DaemonEvent::Log(message.clone()),
},
timestamp: self.clock.new_timestamp(),
})
.expect("failed to serialize log message");
match socket_stream_send(connection, &msg)
.await
.wrap_err("failed to send log message to dora-coordinator")
{
Ok(()) => return,
Err(err) => tracing::warn!("{err:?}"),
match &mut self.destination {
LogDestination::Coordinator {
coordinator_connection,
} => {
let message = Timestamped {
inner: CoordinatorRequest::Event {
daemon_id: self.daemon_id.clone(),
event: DaemonEvent::Log(message.clone()),
},
timestamp: self.clock.new_timestamp(),
};
Self::log_to_coordinator(message, coordinator_connection).await
}
}

// log message using tracing if reporting to coordinator is not possible
match message.level {
LogLevel::Error => {
if let Some(node_id) = message.node_id {
tracing::error!("{}/{} errored:", message.dataflow_id.to_string(), node_id);
}
for line in message.message.lines() {
tracing::error!(" {}", line);
}
LogDestination::Channel { sender } => {
let _ = sender.send_async(message).await;
}
LogLevel::Warn => {
if let Some(node_id) = message.node_id {
tracing::warn!("{}/{} warned:", message.dataflow_id.to_string(), node_id);
}
for line in message.message.lines() {
tracing::warn!(" {}", line);
LogDestination::Tracing => {
// log message using tracing if reporting to coordinator is not possible
match message.level {
LogLevelOrStdout::Stdout => {
tracing::info!(
build_id = ?message.build_id.map(|id| id.to_string()),
dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
node_id = ?message.node_id.map(|id| id.to_string()),
target = message.target,
module_path = message.module_path,
file = message.file,
line = message.line,
"{}",
Indent(&message.message)
)
}
LogLevelOrStdout::LogLevel(level) => match level {
LogLevel::Error => {
tracing::error!(
build_id = ?message.build_id.map(|id| id.to_string()),
dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
node_id = ?message.node_id.map(|id| id.to_string()),
target = message.target,
module_path = message.module_path,
file = message.file,
line = message.line,
"{}",
Indent(&message.message)
);
}
LogLevel::Warn => {
tracing::warn!(
build_id = ?message.build_id.map(|id| id.to_string()),
dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
node_id = ?message.node_id.map(|id| id.to_string()),
target = message.target,
module_path = message.module_path,
file = message.file,
line = message.line,
"{}",
Indent(&message.message)
);
}
LogLevel::Info => {
tracing::info!(
build_id = ?message.build_id.map(|id| id.to_string()),
dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
node_id = ?message.node_id.map(|id| id.to_string()),
target = message.target,
module_path = message.module_path,
file = message.file,
line = message.line,
"{}",
Indent(&message.message)
);
}
LogLevel::Debug => {
tracing::debug!(
build_id = ?message.build_id.map(|id| id.to_string()),
dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
node_id = ?message.node_id.map(|id| id.to_string()),
target = message.target,
module_path = message.module_path,
file = message.file,
line = message.line,
"{}",
Indent(&message.message)
);
}
_ => {}
},
}
}
LogLevel::Info => {
if let Some(node_id) = message.node_id {
tracing::info!("{}/{} info:", message.dataflow_id.to_string(), node_id);
}

for line in message.message.lines() {
tracing::info!(" {}", line);
}
}
_ => {}
}
}

pub async fn try_clone(&self) -> eyre::Result<Self> {
let coordinator_connection = match &self.coordinator_connection {
Some(c) => {
let addr = c
let destination = match &self.destination {
LogDestination::Coordinator {
coordinator_connection,
} => {
let addr = coordinator_connection
.peer_addr()
.context("failed to get coordinator peer addr")?;
let new_connection = TcpStream::connect(addr)
.await
.context("failed to connect to coordinator during logger clone")?;
Some(new_connection)
LogDestination::Coordinator {
coordinator_connection: new_connection,
}
}
None => None,
LogDestination::Channel { sender } => LogDestination::Channel {
sender: sender.clone(),
},
LogDestination::Tracing => LogDestination::Tracing,
};

Ok(Self {
coordinator_connection,
destination,
daemon_id: self.daemon_id.clone(),
clock: self.clock.clone(),
})
}

async fn log_to_coordinator(
message: Timestamped<CoordinatorRequest>,
connection: &mut TcpStream,
) {
let msg = serde_json::to_vec(&message).expect("failed to serialize log message");
match socket_stream_send(connection, &msg)
.await
.wrap_err("failed to send log message to dora-coordinator")
{
Ok(()) => return,
Err(err) => tracing::warn!("{err:?}"),
}
}
}

pub enum LogDestination {
Coordinator { coordinator_connection: TcpStream },
Channel { sender: Sender<LogMessage> },
Tracing,
}

enum CowMut<'a, T> {
Borrowed(&'a mut T),
Owned(T),
}

impl<T> Deref for CowMut<'_, T> {
type Target = T;

fn deref(&self) -> &Self::Target {
match self {
CowMut::Borrowed(v) => v,
CowMut::Owned(v) => v,
}
}
}

impl<T> DerefMut for CowMut<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
match self {
CowMut::Borrowed(v) => v,
CowMut::Owned(v) => v,
}
}
}

struct Indent<'a>(&'a str);

impl std::fmt::Display for Indent<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
for line in self.0.lines() {
write!(f, " {}", line)?;
}
Ok(())
}
}
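
The `CowMut` helper is what lets `for_dataflow` hand out a borrowing logger while `try_clone` produces an owning one. A minimal sketch of the idea, using only the type above:

```rust
// Both variants deref to the same target, so callers can treat borrowed
// and owned values uniformly.
fn append_marker(value: &mut CowMut<'_, String>) {
    value.push_str(" [done]"); // DerefMut forwards to the inner String
}

fn cow_mut_example() {
    let mut owned = CowMut::Owned(String::from("owned"));
    append_marker(&mut owned);

    let mut backing = String::from("borrowed");
    let mut borrowed = CowMut::Borrowed(&mut backing);
    append_marker(&mut borrowed);
}
```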

+ 4
- 0
binaries/daemon/src/pending.rs View File

@@ -59,6 +59,10 @@ impl PendingNodes {
self.external_nodes = value;
}

pub fn local_nodes_pending(&self) -> bool {
!self.local_nodes.is_empty()
}

pub async fn handle_node_subscription(
&mut self,
node_id: NodeId,


+ 616
- 508
binaries/daemon/src/spawn.rs
File diff suppressed because it is too large
View File


+ 1
- 1
binaries/runtime/Cargo.toml View File

@@ -21,7 +21,7 @@ eyre = "0.6.8"
futures = "0.3.21"
futures-concurrency = "7.1.0"
libloading = "0.7.3"
serde_yaml = "0.8.23"
serde_yaml = { workspace = true }
tokio = { version = "1.24.2", features = ["full"] }
tokio-stream = "0.1.8"
# pyo3-abi3 flag allow simpler linking. See: https://pyo3.rs/v0.13.2/building_and_distribution.html


+ 2
- 1
binaries/runtime/src/lib.rs View File

@@ -43,7 +43,8 @@ pub fn main() -> eyre::Result<()> {
.wrap_err("failed to set up tracing subscriber")?;
}

let dataflow_descriptor = config.dataflow_descriptor.clone();
let dataflow_descriptor = serde_yaml::from_value(config.dataflow_descriptor.clone())
.context("failed to parse dataflow descriptor")?;

let operator_definition = if operators.is_empty() {
bail!("no operators");


+ 2
- 0
examples/c++-arrow-dataflow/run.rs View File

@@ -112,6 +112,7 @@ async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
@@ -136,6 +137,7 @@ async fn build_cxx_node(
clang.arg("-l").arg("m");
clang.arg("-l").arg("rt");
clang.arg("-l").arg("dl");
clang.arg("-l").arg("z");
clang.arg("-pthread");
}
#[cfg(target_os = "windows")]


+ 1
- 0
examples/c++-dataflow/.gitignore View File

@@ -1 +1,2 @@
*.o
/build

+ 2
- 0
examples/c++-dataflow/run.rs View File

@@ -133,6 +133,7 @@ async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
@@ -157,6 +158,7 @@ async fn build_cxx_node(
clang.arg("-l").arg("m");
clang.arg("-l").arg("rt");
clang.arg("-l").arg("dl");
clang.arg("-l").arg("z");
clang.arg("-pthread");
}
#[cfg(target_os = "windows")]


+ 1
- 0
examples/c++-ros2-dataflow/.gitignore View File

@@ -1 +1,2 @@
*.o
/build

+ 2
- 0
examples/c++-ros2-dataflow/run.rs View File

@@ -90,6 +90,7 @@ async fn build_cxx_node(
clang.arg("-l").arg("m");
clang.arg("-l").arg("rt");
clang.arg("-l").arg("dl");
clang.arg("-l").arg("z");
clang.arg("-pthread");
}
#[cfg(target_os = "windows")]
@@ -154,6 +155,7 @@ async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")


+ 7
- 0
examples/c-dataflow/run.rs View File

@@ -44,6 +44,7 @@ async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
@@ -63,6 +64,7 @@ async fn build_c_node(root: &Path, name: &str, out_name: &str) -> eyre::Result<(
clang.arg("-l").arg("m");
clang.arg("-l").arg("rt");
clang.arg("-l").arg("dl");
clang.arg("-l").arg("z");
clang.arg("-pthread");
}
#[cfg(target_os = "windows")]
@@ -93,6 +95,8 @@ async fn build_c_node(root: &Path, name: &str, out_name: &str) -> eyre::Result<(
clang.arg("-lsynchronization");
clang.arg("-luser32");
clang.arg("-lwinspool");
clang.arg("-lwinhttp");
clang.arg("-lrpcrt4");

clang.arg("-Wl,-nodefaultlib:libcmt");
clang.arg("-D_DLL");
@@ -107,6 +111,7 @@ async fn build_c_node(root: &Path, name: &str, out_name: &str) -> eyre::Result<(
clang.arg("-l").arg("pthread");
clang.arg("-l").arg("c");
clang.arg("-l").arg("m");
clang.arg("-l").arg("z");
}
clang.arg("-L").arg(root.join("target").join("debug"));
clang
@@ -161,6 +166,8 @@ async fn build_c_operator(root: &Path) -> eyre::Result<()> {
link.arg("-lsynchronization");
link.arg("-luser32");
link.arg("-lwinspool");
link.arg("-lwinhttp");
link.arg("-lrpcrt4");

link.arg("-Wl,-nodefaultlib:libcmt");
link.arg("-D_DLL");


+ 2
- 0
examples/camera/run.rs View File

@@ -43,6 +43,7 @@ async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--").arg("build").arg(dataflow).arg("--uv");
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
@@ -51,6 +52,7 @@ async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--").arg("run").arg(dataflow).arg("--uv");
if !cmd.status().await?.success() {
bail!("failed to run dataflow");


+ 1
- 0
examples/cmake-dataflow/run.rs View File

@@ -61,6 +61,7 @@ async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")


+ 32
- 5
examples/multiple-daemons/run.rs View File

@@ -1,3 +1,4 @@
use dora_cli::session::DataflowSession;
use dora_coordinator::{ControlEvent, Event};
use dora_core::{
descriptor::{read_as_descriptor, DescriptorExt},
@@ -8,7 +9,7 @@ use dora_message::{
common::DaemonId,
coordinator_to_cli::{ControlRequestReply, DataflowIdAndName},
};
use dora_tracing::set_up_tracing;
use dora_tracing::TracingBuilder;
use eyre::{bail, Context};

use std::{
@@ -29,7 +30,9 @@ use uuid::Uuid;

#[tokio::main]
async fn main() -> eyre::Result<()> {
set_up_tracing("multiple-daemon-runner").wrap_err("failed to set up tracing subscriber")?;
TracingBuilder::new("multiple-daemon-runner")
.with_stdout("debug")
.build()?;

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
std::env::set_current_dir(root.join(file!()).parent().unwrap())
@@ -47,12 +50,15 @@ async fn main() -> eyre::Result<()> {
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
);
let (_coordinator_port, coordinator) = dora_coordinator::start(
let (coordinator_port, coordinator) = dora_coordinator::start(
coordinator_bind,
coordinator_control_bind,
ReceiverStream::new(coordinator_events_rx),
)
.await?;

tracing::info!("coordinator running on {coordinator_port}");

let coordinator_addr = Ipv4Addr::LOCALHOST;
let daemon_a = run_daemon(coordinator_addr.to_string(), "A");
let daemon_b = run_daemon(coordinator_addr.to_string(), "B");
@@ -135,12 +141,17 @@ async fn start_dataflow(
.check(&working_dir)
.wrap_err("could not validate yaml")?;

let dataflow_session =
DataflowSession::read_session(dataflow).context("failed to read DataflowSession")?;

let (reply_sender, reply) = oneshot::channel();
coordinator_events_tx
.send(Event::Control(ControlEvent::IncomingRequest {
request: ControlRequest::Start {
build_id: dataflow_session.build_id,
session_id: dataflow_session.session_id,
dataflow: dataflow_descriptor,
local_working_dir: working_dir,
local_working_dir: Some(working_dir),
name: None,
uv: false,
},
@@ -149,7 +160,21 @@ async fn start_dataflow(
.await?;
let result = reply.await??;
let uuid = match result {
ControlRequestReply::DataflowStarted { uuid } => uuid,
ControlRequestReply::DataflowStartTriggered { uuid } => uuid,
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
};

let (reply_sender, reply) = oneshot::channel();
coordinator_events_tx
.send(Event::Control(ControlEvent::IncomingRequest {
request: ControlRequest::WaitForSpawn { dataflow_id: uuid },
reply_sender,
}))
.await?;
let result = reply.await??;
let uuid = match result {
ControlRequestReply::DataflowSpawned { uuid } => uuid,
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
};
@@ -215,6 +240,7 @@ async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--").arg("build").arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to build dataflow");
@@ -227,6 +253,7 @@ async fn run_daemon(coordinator: String, machine_id: &str) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--")
.arg("daemon")
.arg("--machine-id")


+ 2
- 0
examples/python-dataflow/run.rs View File

@@ -44,6 +44,7 @@ async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--").arg("build").arg(dataflow).arg("--uv");
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
@@ -52,6 +53,7 @@ async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--").arg("run").arg(dataflow).arg("--uv");
if !cmd.status().await?.success() {
bail!("failed to run dataflow");


+ 2
- 0
examples/python-multi-env/run.rs View File

@@ -44,6 +44,7 @@ async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--").arg("build").arg(dataflow).arg("--uv");
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
@@ -52,6 +53,7 @@ async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--").arg("run").arg(dataflow).arg("--uv");
if !cmd.status().await?.success() {
bail!("failed to run dataflow");


+ 2
- 0
examples/python-operator-dataflow/run.rs View File

@@ -43,6 +43,7 @@ async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--").arg("build").arg(dataflow).arg("--uv");
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
@@ -51,6 +52,7 @@ async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--").arg("run").arg(dataflow).arg("--uv");
if !cmd.status().await?.success() {
bail!("failed to run dataflow");


+ 2
- 0
examples/python-ros2-dataflow/run.rs View File

@@ -44,6 +44,7 @@ async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--").arg("build").arg(dataflow).arg("--uv");
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
@@ -52,6 +53,7 @@ async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--").arg("run").arg(dataflow).arg("--uv");
if !cmd.status().await?.success() {
bail!("failed to run dataflow");


+ 2
- 0
examples/rerun-viewer/run.rs View File

@@ -43,6 +43,7 @@ async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--").arg("build").arg(dataflow).arg("--uv");
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
@@ -51,6 +52,7 @@ async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--").arg("run").arg(dataflow).arg("--uv");
if !cmd.status().await?.success() {
bail!("failed to run dataflow");


+ 4
- 0
examples/rust-dataflow-git/.gitignore View File

@@ -0,0 +1,4 @@
/build
/git

/dataflow.dora-session.yaml

+ 7
- 0
examples/rust-dataflow-git/README.md View File

@@ -0,0 +1,7 @@
# Git-based Rust example

To get started:

```bash
cargo run --example rust-dataflow-git
```

+ 29
- 0
examples/rust-dataflow-git/dataflow.yml View File

@@ -0,0 +1,29 @@
nodes:
- id: rust-node
git: https://github.com/dora-rs/dora.git
rev: 64ab0d7c # pinned commit, update this when changing the message crate
build: cargo build -p rust-dataflow-example-node
path: target/debug/rust-dataflow-example-node
inputs:
tick: dora/timer/millis/10
outputs:
- random

- id: rust-status-node
git: https://github.com/dora-rs/dora.git
rev: 64ab0d7c # pinned commit, update this when changing the message crate
build: cargo build -p rust-dataflow-example-status-node
path: target/debug/rust-dataflow-example-status-node
inputs:
tick: dora/timer/millis/100
random: rust-node/random
outputs:
- status

- id: rust-sink
git: https://github.com/dora-rs/dora.git
rev: 64ab0d7c # pinned commit, update this when changing the message crate
build: cargo build -p rust-dataflow-example-sink
path: target/debug/rust-dataflow-example-sink
inputs:
message: rust-status-node/status

+ 53
- 0
examples/rust-dataflow-git/run.rs View File

@@ -0,0 +1,53 @@
use dora_tracing::set_up_tracing;
use eyre::{bail, Context};
use std::path::Path;

#[tokio::main]
async fn main() -> eyre::Result<()> {
set_up_tracing("rust-dataflow-runner").wrap_err("failed to set up tracing subscriber")?;

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
std::env::set_current_dir(root.join(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;

let args: Vec<String> = std::env::args().collect();
let dataflow = if args.len() > 1 {
Path::new(&args[1])
} else {
Path::new("dataflow.yml")
};
build_dataflow(dataflow).await?;

run_dataflow(dataflow).await?;

Ok(())
}

async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--").arg("build").arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to build dataflow");
};
Ok(())
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
.arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};
Ok(())
}

+ 1
- 0
examples/rust-dataflow-url/.gitignore View File

@@ -0,0 +1 @@
/build

+ 2
- 0
examples/rust-dataflow-url/run.rs View File

@@ -23,6 +23,7 @@ async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--").arg("build").arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to build dataflow");
@@ -35,6 +36,7 @@ async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")


+ 2
- 1
examples/rust-dataflow/run.rs View File

@@ -16,7 +16,6 @@ async fn main() -> eyre::Result<()> {
} else {
Path::new("dataflow.yml")
};

build_dataflow(dataflow).await?;

run_dataflow(dataflow).await?;
@@ -29,6 +28,7 @@ async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--").arg("build").arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to build dataflow");
@@ -41,6 +41,7 @@ async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")


+ 2
- 0
examples/rust-ros2-dataflow/run.rs View File

@@ -23,6 +23,7 @@ async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--").arg("build").arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to build dataflow");
@@ -35,6 +36,7 @@ async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")


+ 2
- 0
examples/vlm/run.rs View File

@@ -43,6 +43,7 @@ async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--").arg("build").arg(dataflow).arg("--uv");
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
@@ -51,6 +52,7 @@ async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--").arg("run").arg(dataflow).arg("--uv");
if !cmd.status().await?.success() {
bail!("failed to run dataflow");


+ 10
- 2
libraries/core/Cargo.toml View File

@@ -9,17 +9,25 @@ repository.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
build = ["dep:git2", "dep:url"]

[dependencies]
dora-message = { workspace = true }
eyre = "0.6.8"
serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.9.11"
serde_yaml = { workspace = true }
once_cell = "1.13.0"
which = "5.0.0"
uuid = { version = "1.7", features = ["serde", "v7"] }
tracing = "0.1"
serde-with-expand-env = "1.1.0"
tokio = { version = "1.24.1", features = ["fs", "process", "sync"] }
tokio = { version = "1.24.1", features = ["fs", "process", "sync", "rt"] }
schemars = "0.8.19"
serde_json = "1.0.117"
log = { version = "0.4.21", features = ["serde"] }
dunce = "1.0.5"
itertools = "0.14"
url = { version = "2.5.4", optional = true }
git2 = { workspace = true, optional = true }
fs_extra = "1.3.0"

+ 83
- 0
libraries/core/src/build/build_command.rs View File

@@ -0,0 +1,83 @@
use std::{
collections::BTreeMap,
io::{BufRead, BufReader},
path::Path,
process::{Command, Stdio},
};

use dora_message::descriptor::EnvValue;
use eyre::{eyre, Context};

pub fn run_build_command(
build: &str,
working_dir: &Path,
uv: bool,
envs: &Option<BTreeMap<String, EnvValue>>,
stdout_tx: tokio::sync::mpsc::Sender<std::io::Result<String>>,
) -> eyre::Result<()> {
let lines = build.lines().collect::<Vec<_>>();
for build_line in lines {
let mut split = build_line.split_whitespace();

let program = split
.next()
.ok_or_else(|| eyre!("build command is empty"))?;
let mut cmd = if uv && (program == "pip" || program == "pip3") {
let mut cmd = Command::new("uv");
cmd.arg("pip");
cmd
} else {
Command::new(program)
};
cmd.args(split);

// Inject Environment Variables
if let Some(envs) = envs {
for (key, value) in envs {
let value = value.to_string();
cmd.env(key, value);
}
}

cmd.current_dir(dunce::simplified(working_dir));

cmd.stdin(Stdio::null());
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());

cmd.env("CLICOLOR", "1");
cmd.env("CLICOLOR_FORCE", "1");

let mut child = cmd
.spawn()
.wrap_err_with(|| format!("failed to spawn `{}`", build))?;

let child_stdout = BufReader::new(child.stdout.take().expect("failed to take stdout"));
let child_stderr = BufReader::new(child.stderr.take().expect("failed to take stderr"));
let stderr_tx = stdout_tx.clone();
let stdout_tx = stdout_tx.clone();

std::thread::spawn(move || {
for line in child_stdout.lines() {
if stdout_tx.blocking_send(line).is_err() {
break;
}
}
});
std::thread::spawn(move || {
for line in child_stderr.lines() {
if stderr_tx.blocking_send(line).is_err() {
break;
}
}
});

let exit_status = child
.wait()
.wrap_err_with(|| format!("failed to run `{}`", build))?;
if !exit_status.success() {
return Err(eyre!("build command `{build_line}` returned {exit_status}"));
}
}
Ok(())
}

+ 374
- 0
libraries/core/src/build/git.rs View File

@@ -0,0 +1,374 @@
use crate::build::{BuildLogger, PrevGitSource};
use dora_message::{common::LogLevel, DataflowId, SessionId};
use eyre::{bail, ContextCompat, WrapErr};
use git2::FetchOptions;
use itertools::Itertools;
use std::{
collections::{BTreeMap, BTreeSet},
path::{Path, PathBuf},
};
use url::Url;

#[derive(Default)]
pub struct GitManager {
/// Directories that are currently in use by running dataflows.
pub clones_in_use: BTreeMap<PathBuf, BTreeSet<DataflowId>>,
/// Builds that are prepared, but not done yet.
prepared_builds: BTreeMap<SessionId, PreparedBuild>,
reuse_for: BTreeMap<PathBuf, PathBuf>,
}

#[derive(Default)]
struct PreparedBuild {
/// Clone dirs that will be created during the build process.
///
/// This allows subsequent nodes to reuse the dirs.
planned_clone_dirs: BTreeSet<PathBuf>,
}

impl GitManager {
pub fn choose_clone_dir(
&mut self,
session_id: SessionId,
repo: String,
commit_hash: String,
prev_git: Option<PrevGitSource>,
target_dir: &Path,
) -> eyre::Result<GitFolder> {
let repo_url = Url::parse(&repo).context("failed to parse git repository URL")?;
let clone_dir = Self::clone_dir_path(target_dir, &repo_url, &commit_hash)?;

let prev_commit_hash = prev_git
.as_ref()
.filter(|p| p.git_source.repo == repo)
.map(|p| &p.git_source.commit_hash);

if let Some(using) = self.clones_in_use.get(&clone_dir) {
if !using.is_empty() {
// The directory is currently in use by another dataflow. Rebuilding
// while a dataflow is running could lead to unintended behavior.
eyre::bail!(
"the build directory is still in use by the following \
dataflows, please stop them before rebuilding: {}",
using.iter().join(", ")
)
}
}

let reuse = if self.clone_dir_ready(session_id, &clone_dir) {
// The directory already contains a checkout of the commit we're interested in.
// So we can simply reuse the directory without doing any additional git
// operations.
ReuseOptions::Reuse {
dir: clone_dir.clone(),
}
} else if let Some(previous_commit_hash) = prev_commit_hash {
// we might be able to update a previous clone
let prev_clone_dir = Self::clone_dir_path(target_dir, &repo_url, previous_commit_hash)?;

if prev_clone_dir.exists() {
let still_needed = prev_git
.map(|g| g.still_needed_for_this_build)
.unwrap_or(false);
let used_by_others = self
.clones_in_use
.get(&prev_clone_dir)
.map(|ids| !ids.is_empty())
.unwrap_or(false);
if still_needed || used_by_others {
// previous clone is still in use -> we cannot rename it, but we can copy it
ReuseOptions::CopyAndFetch {
from: prev_clone_dir,
target_dir: clone_dir.clone(),
commit_hash,
}
} else {
// there is an unused previous clone that is no longer needed -> rename it
ReuseOptions::RenameAndFetch {
from: prev_clone_dir,
target_dir: clone_dir.clone(),
commit_hash,
}
}
} else {
// no existing clone associated with previous build id
ReuseOptions::NewClone {
target_dir: clone_dir.clone(),
repo_url,
commit_hash,
}
}
} else {
// no previous build that we can reuse
ReuseOptions::NewClone {
target_dir: clone_dir.clone(),
repo_url,
commit_hash,
}
};
self.register_ready_clone_dir(session_id, clone_dir);

Ok(GitFolder { reuse })
}

pub fn in_use(&self, dir: &Path) -> bool {
self.clones_in_use
.get(dir)
.map(|ids| !ids.is_empty())
.unwrap_or(false)
}

pub fn clone_dir_ready(&self, session_id: SessionId, dir: &Path) -> bool {
self.prepared_builds
.get(&session_id)
.map(|p| p.planned_clone_dirs.contains(dir))
.unwrap_or(false)
|| dir.exists()
}

pub fn register_ready_clone_dir(&mut self, session_id: SessionId, dir: PathBuf) -> bool {
self.prepared_builds
.entry(session_id)
.or_default()
.planned_clone_dirs
.insert(dir)
}

fn clone_dir_path(
base_dir: &Path,
repo_url: &Url,
commit_hash: &String,
) -> eyre::Result<PathBuf> {
let mut path = base_dir.join(repo_url.host_str().context("git URL has no hostname")?);
path.extend(repo_url.path_segments().context("no path in git URL")?);
let path = path.join(commit_hash);
Ok(dunce::simplified(&path).to_owned())
}

pub fn clear_planned_builds(&mut self, session_id: SessionId) {
self.prepared_builds.remove(&session_id);
}
}

pub struct GitFolder {
/// Specifies whether an existing repo should be reused.
reuse: ReuseOptions,
}

impl GitFolder {
pub async fn prepare(self, logger: &mut impl BuildLogger) -> eyre::Result<PathBuf> {
let GitFolder { reuse } = self;

tracing::info!("reuse: {reuse:?}");
let clone_dir = match reuse {
ReuseOptions::NewClone {
target_dir,
repo_url,
commit_hash,
} => {
logger
.log_message(
LogLevel::Info,
format!(
"cloning {repo_url}#{commit_hash} into {}",
target_dir.display()
),
)
.await;
let clone_target = target_dir.clone();
let checkout_result = tokio::task::spawn_blocking(move || {
let repository = clone_into(repo_url.clone(), &clone_target)
.with_context(|| format!("failed to clone git repo from `{repo_url}`"))?;
checkout_tree(&repository, &commit_hash)
.with_context(|| format!("failed to checkout commit `{commit_hash}`"))
})
.await
.unwrap();

match checkout_result {
Ok(()) => target_dir,
Err(err) => {
logger
.log_message(LogLevel::Error, format!("{err:?}"))
.await;
// remove erroneous clone again
if let Err(err) = std::fs::remove_dir_all(target_dir) {
logger
.log_message(
LogLevel::Error,
format!(
"failed to remove clone dir after clone/checkout error: {}",
err.kind()
),
)
.await;
}
bail!(err)
}
}
}
ReuseOptions::CopyAndFetch {
from,
target_dir,
commit_hash,
} => {
let from_clone = from.clone();
let to = target_dir.clone();
tokio::task::spawn_blocking(move || {
std::fs::create_dir_all(&to)
.context("failed to create directory for copying git repo")?;
fs_extra::dir::copy(
&from_clone,
&to,
&fs_extra::dir::CopyOptions::new().content_only(true),
)
.with_context(|| {
format!(
"failed to copy repo clone from `{}` to `{}`",
from_clone.display(),
to.display()
)
})
})
.await??;

logger
.log_message(
LogLevel::Info,
format!("fetching changes after copying {}", from.display()),
)
.await;

let repository = fetch_changes(&target_dir, None).await?;
checkout_tree(&repository, &commit_hash)?;
target_dir
}
ReuseOptions::RenameAndFetch {
from,
target_dir,
commit_hash,
} => {
tokio::fs::rename(&from, &target_dir)
.await
.context("failed to rename repo clone")?;

logger
.log_message(
LogLevel::Info,
format!("fetching changes after renaming {}", from.display()),
)
.await;

let repository = fetch_changes(&target_dir, None).await?;
checkout_tree(&repository, &commit_hash)?;
target_dir
}
ReuseOptions::Reuse { dir } => {
logger
.log_message(
LogLevel::Info,
format!("reusing up-to-date {}", dir.display()),
)
.await;
dir
}
};
Ok(clone_dir)
}
}

#[derive(Debug)]
enum ReuseOptions {
/// Create a new clone of the repository.
NewClone {
target_dir: PathBuf,
repo_url: Url,
commit_hash: String,
},
/// Reuse an existing up-to-date clone of the repository.
Reuse { dir: PathBuf },
/// Copy an older clone of the repository and fetch changes, then reuse it.
CopyAndFetch {
from: PathBuf,
target_dir: PathBuf,
commit_hash: String,
},
/// Rename an older clone of the repository and fetch changes, then reuse it.
RenameAndFetch {
from: PathBuf,
target_dir: PathBuf,
commit_hash: String,
},
}

fn clone_into(repo_addr: Url, clone_dir: &Path) -> eyre::Result<git2::Repository> {
if let Some(parent) = clone_dir.parent() {
std::fs::create_dir_all(parent)
.context("failed to create parent directory for git clone")?;
}

let clone_dir = clone_dir.to_owned();

let mut builder = git2::build::RepoBuilder::new();
let mut fetch_options = git2::FetchOptions::new();
fetch_options.download_tags(git2::AutotagOption::All);
builder.fetch_options(fetch_options);
builder
.clone(repo_addr.as_str(), &clone_dir)
.context("failed to clone repo")
}

async fn fetch_changes(
repo_dir: &Path,
refname: Option<String>,
) -> Result<git2::Repository, eyre::Error> {
let repo_dir = repo_dir.to_owned();
let fetch_changes = tokio::task::spawn_blocking(move || {
let repository = git2::Repository::open(&repo_dir).context("failed to open git repo")?;

{
let mut remote = repository
.find_remote("origin")
.context("failed to find remote `origin` in repo")?;
remote
.connect(git2::Direction::Fetch)
.context("failed to connect to remote")?;
let default_branch = remote
.default_branch()
.context("failed to get default branch for remote")?;
let fetch = match &refname {
Some(refname) => refname,
None => default_branch
.as_str()
.context("failed to read default branch as string")?,
};
let mut fetch_options = FetchOptions::new();
fetch_options.download_tags(git2::AutotagOption::All);
remote
.fetch(&[&fetch], Some(&mut fetch_options), None)
.context("failed to fetch from git repo")?;
}
Result::<_, eyre::Error>::Ok(repository)
});
let repository = fetch_changes.await??;
Ok(repository)
}

fn checkout_tree(repository: &git2::Repository, commit_hash: &str) -> eyre::Result<()> {
let (object, reference) = repository
.revparse_ext(commit_hash)
.context("failed to parse ref")?;
repository
.checkout_tree(&object, None)
.context("failed to checkout ref")?;
match reference {
Some(reference) => repository
.set_head(reference.name().context("failed to get reference_name")?)
.context("failed to set head")?,
None => repository
.set_head_detached(object.id())
.context("failed to set detached head")?,
}

Ok(())
}
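
Putting the pieces together, a rough sketch of the intended call sequence; the repo URL, commit hash, and target directory here are placeholders:

```rust
use std::path::{Path, PathBuf};

async fn clone_for_build(
    git_manager: &mut GitManager,
    session_id: SessionId,
    logger: &mut impl BuildLogger,
) -> eyre::Result<PathBuf> {
    // Phase 1: decide whether to reuse, copy, rename, or freshly clone.
    let folder = git_manager.choose_clone_dir(
        session_id,
        "https://github.com/dora-rs/dora.git".into(),
        "64ab0d7c".into(),
        None, // no previous build to reuse
        Path::new("build/git"),
    )?;
    // Phase 2: perform the git operations, reporting progress via the logger.
    folder.prepare(logger).await
}
```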

+ 19
- 0
libraries/core/src/build/logger.rs View File

@@ -0,0 +1,19 @@
use std::future::Future;

pub use dora_message::common::LogLevelOrStdout;

pub trait BuildLogger: Send {
type Clone: BuildLogger + 'static;

fn log_message(
&mut self,
level: impl Into<LogLevelOrStdout> + Send,
message: impl Into<String> + Send,
) -> impl Future<Output = ()> + Send;

fn log_stdout(&mut self, message: impl Into<String> + Send) -> impl Future<Output = ()> + Send {
self.log_message(LogLevelOrStdout::Stdout, message)
}

fn try_clone(&self) -> impl Future<Output = eyre::Result<Self::Clone>> + Send;
}
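
A minimal implementor sketch, assuming nothing beyond the trait itself: it forwards every message to `tracing` and clones trivially.

```rust
use std::future::Future;

#[derive(Clone)]
struct TracingBuildLogger;

impl BuildLogger for TracingBuildLogger {
    type Clone = TracingBuildLogger;

    fn log_message(
        &mut self,
        level: impl Into<LogLevelOrStdout> + Send,
        message: impl Into<String> + Send,
    ) -> impl Future<Output = ()> + Send {
        let level = level.into();
        let message = message.into();
        async move { tracing::info!(?level, "{message}") }
    }

    fn try_clone(&self) -> impl Future<Output = eyre::Result<Self::Clone>> + Send {
        let clone = self.clone();
        async move { Ok(clone) }
    }
}
```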

+ 148
- 0
libraries/core/src/build/mod.rs View File

@@ -0,0 +1,148 @@
pub use git::GitManager;
pub use logger::{BuildLogger, LogLevelOrStdout};

use url::Url;

use std::{collections::BTreeMap, future::Future, path::PathBuf};

use crate::descriptor::ResolvedNode;
use dora_message::{
common::{GitSource, LogLevel},
descriptor::{CoreNodeKind, EnvValue},
id::NodeId,
SessionId,
};
use eyre::Context;

use build_command::run_build_command;
use git::GitFolder;

mod build_command;
mod git;
mod logger;

#[derive(Clone)]
pub struct Builder {
pub session_id: SessionId,
pub base_working_dir: PathBuf,
pub uv: bool,
}

impl Builder {
pub async fn build_node(
self,
node: ResolvedNode,
git: Option<GitSource>,
prev_git: Option<PrevGitSource>,
mut logger: impl BuildLogger,
git_manager: &mut GitManager,
) -> eyre::Result<impl Future<Output = eyre::Result<BuiltNode>>> {
let prepared_git = if let Some(GitSource { repo, commit_hash }) = git {
let target_dir = self.base_working_dir.join("git");
let git_folder = git_manager.choose_clone_dir(
self.session_id,
repo,
commit_hash,
prev_git,
&target_dir,
)?;
Some(git_folder)
} else {
None
};

let task = async move { self.build_node_inner(node, &mut logger, prepared_git).await };
Ok(task)
}

async fn build_node_inner(
self,
node: ResolvedNode,
logger: &mut impl BuildLogger,
git_folder: Option<GitFolder>,
) -> eyre::Result<BuiltNode> {
logger.log_message(LogLevel::Debug, "building node").await;
let node_working_dir = match &node.kind {
CoreNodeKind::Custom(n) => {
let node_working_dir = match git_folder {
Some(git_folder) => {
let clone_dir = git_folder.prepare(logger).await?;
tracing::warn!(
"using git clone directory as working dir: \
this behavior is unstable and might change \
(see https://github.com/dora-rs/dora/pull/901)"
);
clone_dir
}
None => self.base_working_dir,
};

if let Some(build) = &n.build {
build_node(logger, &node.env, node_working_dir.clone(), build, self.uv).await?;
}
node_working_dir
}
CoreNodeKind::Runtime(n) => {
// run build commands
for operator in &n.operators {
if let Some(build) = &operator.config.build {
build_node(
logger,
&node.env,
self.base_working_dir.clone(),
build,
self.uv,
)
.await?;
}
}
self.base_working_dir.clone()
}
};
Ok(BuiltNode { node_working_dir })
}
}

async fn build_node(
logger: &mut impl BuildLogger,
node_env: &Option<BTreeMap<String, EnvValue>>,
working_dir: PathBuf,
build: &String,
uv: bool,
) -> eyre::Result<()> {
logger
.log_message(LogLevel::Info, format!("running build command: `{build}`"))
.await;
let build = build.to_owned();
let node_env = node_env.clone();
let mut logger = logger.try_clone().await.context("failed to clone logger")?;
let (stdout_tx, mut stdout) = tokio::sync::mpsc::channel(10);
let task = tokio::task::spawn_blocking(move || {
run_build_command(&build, &working_dir, uv, &node_env, stdout_tx)
.context("build command failed")
});
tokio::spawn(async move {
while let Some(line) = stdout.recv().await {
logger
.log_stdout(line.unwrap_or_else(|err| format!("io err: {}", err.kind())))
.await;
}
});
task.await??;
Ok(())
}

pub struct BuiltNode {
pub node_working_dir: PathBuf,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BuildInfo {
pub node_working_dirs: BTreeMap<NodeId, PathBuf>,
}

pub struct PrevGitSource {
pub git_source: GitSource,
/// `True` if any nodes of this dataflow still require the source for building.
pub still_needed_for_this_build: bool,
}
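
For context, a rough sketch of how a daemon might drive the builder for one node, assuming a resolved node, an optional git source, and some `BuildLogger` implementation:

```rust
async fn build_one_node(
    builder: Builder,
    node: ResolvedNode,
    git: Option<GitSource>,
    logger: impl BuildLogger,
    git_manager: &mut GitManager,
) -> eyre::Result<BuiltNode> {
    // Planning phase: chooses/plans the git clone directory while holding
    // the GitManager borrow.
    let build_task = builder.build_node(node, git, None, logger, git_manager).await?;
    // Execution phase: the returned future clones/fetches and runs the
    // build commands; it no longer borrows the GitManager, so it can be
    // awaited independently of the planning step.
    build_task.await
}
```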

+ 37
- 9
libraries/core/src/descriptor/mod.rs View File

@@ -1,5 +1,6 @@
use dora_message::{
config::{Input, InputMapping, NodeRunConfig},
descriptor::{GitRepoRev, NodeSource},
id::{DataId, NodeId, OperatorId},
};
use eyre::{bail, Context, OptionExt, Result};
@@ -53,7 +54,7 @@ impl DescriptorExt for Descriptor {
// adjust input mappings
let mut node_kind = node_kind_mut(&mut node)?;
let input_mappings: Vec<_> = match &mut node_kind {
NodeKindMut::Standard { path: _, inputs } => inputs.values_mut().collect(),
NodeKindMut::Standard { inputs, .. } => inputs.values_mut().collect(),
NodeKindMut::Runtime(node) => node
.operators
.iter_mut()
@@ -76,8 +77,13 @@ impl DescriptorExt for Descriptor {

// resolve nodes
let kind = match node_kind {
NodeKindMut::Standard { path, inputs: _ } => CoreNodeKind::Custom(CustomNode {
source: path.clone(),
NodeKindMut::Standard {
path,
source,
inputs: _,
} => CoreNodeKind::Custom(CustomNode {
path: path.clone(),
source,
args: node.args,
build: node.build,
send_stdout_as: node.send_stdout_as,
@@ -149,14 +155,35 @@ pub async fn read_as_descriptor(path: &Path) -> eyre::Result<Descriptor> {

fn node_kind_mut(node: &mut Node) -> eyre::Result<NodeKindMut> {
match node.kind()? {
NodeKind::Standard(_) => node
.path
.as_ref()
.map(|path| NodeKindMut::Standard {
path,
NodeKind::Standard(_) => {
let source = match (&node.git, &node.branch, &node.tag, &node.rev) {
(None, None, None, None) => NodeSource::Local,
(Some(repo), branch, tag, rev) => {
let rev = match (branch, tag, rev) {
(None, None, None) => None,
(Some(branch), None, None) => Some(GitRepoRev::Branch(branch.clone())),
(None, Some(tag), None) => Some(GitRepoRev::Tag(tag.clone())),
(None, None, Some(rev)) => Some(GitRepoRev::Rev(rev.clone())),
other @ (_, _, _) => {
eyre::bail!("only one of `branch`, `tag`, and `rev` are allowed (got {other:?})")
}
};
NodeSource::GitBranch {
repo: repo.clone(),
rev,
}
}
(None, _, _, _) => {
eyre::bail!("`git` source required when using branch, tag, or rev")
}
};

Ok(NodeKindMut::Standard {
path: node.path.as_ref().ok_or_eyre("missing `path` attribute")?,
source,
inputs: &mut node.inputs,
})
.ok_or_eyre("no path"),
}
NodeKind::Runtime(_) => node
.operators
.as_mut()
@@ -249,6 +276,7 @@ pub enum NodeKind<'a> {
enum NodeKindMut<'a> {
Standard {
path: &'a String,
source: NodeSource,
inputs: &'a mut BTreeMap<DataId, Input>,
},
/// Dora runtime node


+ 29
- 16
libraries/core/src/descriptor/validate.rs View File

@@ -28,23 +28,34 @@ pub fn check_dataflow(
// check that nodes and operators exist
for node in nodes.values() {
match &node.kind {
descriptor::CoreNodeKind::Custom(custom) => match custom.source.as_str() {
SHELL_SOURCE => (),
DYNAMIC_SOURCE => (),
source => {
if source_is_url(source) {
info!("{source} is a URL."); // TODO: Implement url check.
} else if let Some(remote_daemon_id) = remote_daemon_id {
if let Some(machine) = &node.deploy.machine {
if remote_daemon_id.contains(&machine.as_str()) || coordinator_is_remote
{
info!("skipping path check for remote node `{}`", node.id);
descriptor::CoreNodeKind::Custom(custom) => match &custom.source {
dora_message::descriptor::NodeSource::Local => match custom.path.as_str() {
SHELL_SOURCE => (),
DYNAMIC_SOURCE => (),
source => {
if source_is_url(source) {
info!("{source} is a URL."); // TODO: Implement url check.
} else if let Some(remote_daemon_id) = remote_daemon_id {
if let Some(deploy) = &node.deploy {
if let Some(machine) = &deploy.machine {
if remote_daemon_id.contains(&machine.as_str())
|| coordinator_is_remote
{
info!("skipping path check for remote node `{}`", node.id);
}
}
}
}
} else {
resolve_path(source, working_dir)
.wrap_err_with(|| format!("Could not find source path `{}`", source))?;
};
} else if custom.build.is_some() {
info!("skipping path check for node with build command");
} else {
resolve_path(source, working_dir).wrap_err_with(|| {
format!("Could not find source path `{}`", source)
})?;
};
}
},
dora_message::descriptor::NodeSource::GitBranch { repo, rev } => {
info!("skipping check for node with git source");
}
},
descriptor::CoreNodeKind::Runtime(node) => {
@@ -53,6 +64,8 @@ pub fn check_dataflow(
OperatorSource::SharedLibrary(path) => {
if source_is_url(path) {
info!("{path} is a URL."); // TODO: Implement url check.
} else if operator_definition.config.build.is_some() {
info!("skipping path check for operator with build command");
} else {
let path = adjust_shared_library_path(Path::new(&path))?;
if !working_dir.join(&path).exists() {


+ 2
- 0
libraries/core/src/lib.rs

@@ -7,6 +7,8 @@ use std::{

pub use dora_message::{config, uhlc};

#[cfg(feature = "build")]
pub mod build;
pub mod descriptor;
pub mod metadata;
pub mod topics;


+ 2
- 2
libraries/message/Cargo.toml

@@ -1,7 +1,7 @@
[package]
name = "dora-message"
# versioned separately from the other dora crates
version = "0.4.4"
version = "0.5.0-alpha"
edition.workspace = true
documentation.workspace = true
description.workspace = true
@@ -23,7 +23,7 @@ aligned-vec = { version = "0.5.0", features = ["serde"] }
semver = { version = "1.0.23", features = ["serde"] }
schemars = "0.8.19"
uhlc = "0.5.1"
serde_yaml = "0.9.11"
serde_yaml = { workspace = true }
once_cell = "1.13.0"
serde-with-expand-env = "1.1.0"
bincode = "1.3.3"

+ 40
- 5
libraries/message/src/cli_to_coordinator.rs

@@ -1,22 +1,52 @@
use std::{path::PathBuf, time::Duration};
use std::{collections::BTreeMap, path::PathBuf, time::Duration};

use uuid::Uuid;

use crate::{
common::GitSource,
descriptor::Descriptor,
id::{NodeId, OperatorId},
BuildId, SessionId,
};

#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum ControlRequest {
Build {
session_id: SessionId,
dataflow: Descriptor,
git_sources: BTreeMap<NodeId, GitSource>,
prev_git_sources: BTreeMap<NodeId, GitSource>,
/// Allows overwriting the base working dir when CLI and daemon are
/// running on the same machine.
///
/// Must not be used for multi-machine dataflows.
///
/// Note that nodes with git sources still use a subdirectory of
/// the base working dir.
local_working_dir: Option<PathBuf>,
uv: bool,
},
WaitForBuild {
build_id: BuildId,
},
Start {
build_id: Option<BuildId>,
session_id: SessionId,
dataflow: Descriptor,
name: Option<String>,
// TODO: remove this once we figure out deploying of node/operator
// binaries from CLI to coordinator/daemon
local_working_dir: PathBuf,
/// Allows overwriting the base working dir when CLI and daemon are
/// running on the same machine.
///
/// Must not be used for multi-machine dataflows.
///
/// Note that nodes with git sources still use a subdirectory of
/// the base working dir.
local_working_dir: Option<PathBuf>,
uv: bool,
},
WaitForSpawn {
dataflow_id: Uuid,
},
Reload {
dataflow_id: Uuid,
node_id: NodeId,
@@ -46,4 +76,9 @@ pub enum ControlRequest {
dataflow_id: Uuid,
level: log::LevelFilter,
},
BuildLogSubscribe {
build_id: BuildId,
level: log::LevelFilter,
},
CliAndDefaultDaemonOnSameMachine,
}
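
A minimal sketch of how a client might assemble the new two-phase requests; the helper function and the empty git-source maps are illustrative assumptions, not code from this change:

use std::collections::BTreeMap;

use dora_message::{cli_to_coordinator::ControlRequest, descriptor::Descriptor, SessionId};

fn build_then_start(dataflow: Descriptor) -> (ControlRequest, ControlRequest) {
    let session_id = SessionId::generate();
    let build = ControlRequest::Build {
        session_id,
        dataflow: dataflow.clone(),
        git_sources: BTreeMap::new(),      // resolved commit per git-sourced node
        prev_git_sources: BTreeMap::new(), // sources of the previous build, if any
        local_working_dir: None,           // only when CLI and daemon share a machine
        uv: false,
    };
    let start = ControlRequest::Start {
        build_id: None, // filled with the id returned via WaitForBuild
        session_id,
        dataflow,
        name: None,
        local_working_dir: None,
        uv: false,
    };
    (build, start)
}

Splitting `Build` from `Start` lets the coordinator report build progress (see `BuildLogSubscribe` above) before any node is spawned.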

+ 27
- 3
libraries/message/src/common.rs

@@ -5,17 +5,18 @@ use aligned_vec::{AVec, ConstAlign};
use eyre::Context as _;
use uuid::Uuid;

use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, DataflowId};
use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, BuildId, DataflowId};

pub use log::Level as LogLevel;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[must_use]
pub struct LogMessage {
pub dataflow_id: DataflowId,
pub build_id: Option<BuildId>,
pub dataflow_id: Option<DataflowId>,
pub node_id: Option<NodeId>,
pub daemon_id: Option<DaemonId>,
pub level: LogLevel,
pub level: LogLevelOrStdout,
pub target: Option<String>,
pub module_path: Option<String>,
pub file: Option<String>,
@@ -23,6 +24,18 @@ pub struct LogMessage {
pub message: String,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum LogLevelOrStdout {
LogLevel(LogLevel),
Stdout,
}

impl From<LogLevel> for LogLevelOrStdout {
fn from(level: LogLevel) -> Self {
Self::LogLevel(level)
}
}

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct NodeError {
pub timestamp: uhlc::Timestamp,
@@ -32,6 +45,9 @@ pub struct NodeError {

impl std::fmt::Display for NodeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let NodeErrorCause::FailedToSpawn(err) = &self.cause {
return write!(f, "failed to spawn node: {err}");
}
match &self.exit_status {
NodeExitStatus::Success => write!(f, "<success>"),
NodeExitStatus::IoError(err) => write!(f, "I/O error while reading exit status: {err}"),
@@ -68,6 +84,7 @@ impl std::fmt::Display for NodeError {
f,
". This error occurred because node `{caused_by_node}` exited before connecting to dora."
)?,
NodeErrorCause::FailedToSpawn(_) => unreachable!(), // handled above
NodeErrorCause::Other { stderr } if stderr.is_empty() => {}
NodeErrorCause::Other { stderr } => {
let line: &str = "---------------------------------------------------------------------------------\n";
@@ -88,6 +105,7 @@ pub enum NodeErrorCause {
Cascading {
caused_by_node: NodeId,
},
FailedToSpawn(String),
Other {
stderr: String,
},
@@ -234,3 +252,9 @@ impl std::fmt::Display for DaemonId {
write!(f, "{}", self.uuid)
}
}

#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
pub struct GitSource {
pub repo: String,
pub commit_hash: String,
}
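
Taken together, the changes above let a single log-message type cover daemon-level, build-scoped, and dataflow-scoped output, including raw stdout lines. A formatting sketch (the `render` helper is ours, not part of the diff):

use dora_message::common::{LogLevelOrStdout, LogMessage};

fn render(msg: &LogMessage) -> String {
    let level = match &msg.level {
        LogLevelOrStdout::LogLevel(level) => level.to_string(), // "ERROR", "WARN", ...
        LogLevelOrStdout::Stdout => "stdout".to_string(),       // captured stdout lines
    };
    // A message is attributed to a build, a dataflow, or the daemon itself.
    let scope = match (&msg.build_id, &msg.dataflow_id) {
        (Some(build), _) => format!("build {build}"),
        (None, Some(dataflow)) => format!("dataflow {dataflow}"),
        (None, None) => "daemon".to_string(),
    };
    format!("[{scope}] {level}: {}", msg.message)
}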

+ 29
- 5
libraries/message/src/coordinator_to_cli.rs

@@ -1,22 +1,46 @@
use std::collections::{BTreeMap, BTreeSet};
use std::{
collections::{BTreeMap, BTreeSet},
net::IpAddr,
};

use uuid::Uuid;

pub use crate::common::{LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus};
use crate::{common::DaemonId, id::NodeId};
use crate::{common::DaemonId, id::NodeId, BuildId};

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum ControlRequestReply {
Error(String),
CoordinatorStopped,
DataflowStarted { uuid: Uuid },
DataflowReloaded { uuid: Uuid },
DataflowStopped { uuid: Uuid, result: DataflowResult },
DataflowBuildTriggered {
build_id: BuildId,
},
DataflowBuildFinished {
build_id: BuildId,
result: Result<(), String>,
},
DataflowStartTriggered {
uuid: Uuid,
},
DataflowSpawned {
uuid: Uuid,
},
DataflowReloaded {
uuid: Uuid,
},
DataflowStopped {
uuid: Uuid,
result: DataflowResult,
},
DataflowList(DataflowList),
DestroyOk,
DaemonConnected(bool),
ConnectedDaemons(BTreeSet<DaemonId>),
Logs(Vec<u8>),
CliAndDefaultDaemonIps {
default_daemon: Option<IpAddr>,
cli: Option<IpAddr>,
},
}
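
Driving a build against these replies might look roughly like the following sketch; the synchronous `send` closure stands in for whatever transport the CLI uses and is not part of this change:

use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply};

fn run_build(
    send: impl Fn(ControlRequest) -> ControlRequestReply,
    build_request: ControlRequest,
) -> eyre::Result<()> {
    // The first reply only acknowledges that the build was triggered ...
    let build_id = match send(build_request) {
        ControlRequestReply::DataflowBuildTriggered { build_id } => build_id,
        ControlRequestReply::Error(err) => eyre::bail!(err),
        other => eyre::bail!("unexpected reply: {other:?}"),
    };
    // ... the outcome arrives once the CLI explicitly waits for the build.
    match send(ControlRequest::WaitForBuild { build_id }) {
        ControlRequestReply::DataflowBuildFinished { result, .. } => {
            result.map_err(|err| eyre::eyre!(err))
        }
        other => eyre::bail!("unexpected reply: {other:?}"),
    }
}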

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]


+ 32
- 3
libraries/message/src/coordinator_to_daemon.rs

@@ -5,10 +5,10 @@ use std::{
};

use crate::{
common::DaemonId,
common::{DaemonId, GitSource},
descriptor::{Descriptor, ResolvedNode},
id::{NodeId, OperatorId},
DataflowId,
BuildId, DataflowId, SessionId,
};

pub use crate::common::Timestamped;
@@ -33,6 +33,7 @@ impl RegisterResult {

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum DaemonCoordinatorEvent {
Build(BuildDataflowNodes),
Spawn(SpawnDataflowNodes),
AllNodesReady {
dataflow_id: DataflowId,
@@ -55,10 +56,38 @@ pub enum DaemonCoordinatorEvent {
Heartbeat,
}

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct BuildDataflowNodes {
pub build_id: BuildId,
pub session_id: SessionId,
/// Allows overwriting the base working dir when CLI and daemon are
/// running on the same machine.
///
/// Must not be used for multi-machine dataflows.
///
/// Note that nodes with git sources still use a subdirectory of
/// the base working dir.
pub local_working_dir: Option<PathBuf>,
pub git_sources: BTreeMap<NodeId, GitSource>,
pub prev_git_sources: BTreeMap<NodeId, GitSource>,
pub dataflow_descriptor: Descriptor,
pub nodes_on_machine: BTreeSet<NodeId>,
pub uv: bool,
}
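
A daemon receiving these events might dispatch them along these lines (a hypothetical handler, shown only to contrast the build and spawn payloads):

use dora_message::coordinator_to_daemon::DaemonCoordinatorEvent;

fn describe(event: &DaemonCoordinatorEvent) -> String {
    match event {
        DaemonCoordinatorEvent::Build(build) => {
            format!("build {} (session {:?})", build.build_id, build.session_id)
        }
        DaemonCoordinatorEvent::Spawn(spawn) => {
            format!("spawn dataflow {}", spawn.dataflow_id)
        }
        other => format!("{other:?}"),
    }
}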

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct SpawnDataflowNodes {
pub build_id: Option<BuildId>,
pub session_id: SessionId,
pub dataflow_id: DataflowId,
pub working_dir: PathBuf,
/// Allows overwriting the base working dir when CLI and daemon are
/// running on the same machine.
///
/// Must not be used for multi-machine dataflows.
///
/// Note that nodes with git sources still use a subdirectory of
/// the base working dir.
pub local_working_dir: Option<PathBuf>,
pub nodes: BTreeMap<NodeId, ResolvedNode>,
pub dataflow_descriptor: Descriptor,
pub spawn_nodes: BTreeSet<NodeId>,


+ 13
- 2
libraries/message/src/daemon_to_coordinator.rs

@@ -3,7 +3,9 @@ use std::collections::BTreeMap;
pub use crate::common::{
DataMessage, LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus, Timestamped,
};
use crate::{common::DaemonId, current_crate_version, id::NodeId, versions_compatible, DataflowId};
use crate::{
common::DaemonId, current_crate_version, id::NodeId, versions_compatible, BuildId, DataflowId,
};

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum CoordinatorRequest {
@@ -46,6 +48,14 @@ impl DaemonRegisterRequest {

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum DaemonEvent {
BuildResult {
build_id: BuildId,
result: Result<(), String>,
},
SpawnResult {
dataflow_id: DataflowId,
result: Result<(), String>,
},
AllNodesReady {
dataflow_id: DataflowId,
exited_before_subscribe: Vec<NodeId>,
@@ -73,7 +83,8 @@ impl DataflowDaemonResult {

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum DaemonCoordinatorReply {
SpawnResult(Result<(), String>),
TriggerBuildResult(Result<(), String>),
TriggerSpawnResult(Result<(), String>),
ReloadResult(Result<(), String>),
StopResult(Result<(), String>),
DestroyResult {


+ 2
- 2
libraries/message/src/daemon_to_node.rs

@@ -2,7 +2,7 @@ use std::{net::SocketAddr, path::PathBuf};

use crate::{
config::NodeRunConfig,
descriptor::{Descriptor, OperatorDefinition},
descriptor::OperatorDefinition,
id::{DataId, NodeId, OperatorId},
metadata::Metadata,
DataflowId,
@@ -23,7 +23,7 @@ pub struct NodeConfig {
pub node_id: NodeId,
pub run_config: NodeRunConfig,
pub daemon_communication: DaemonCommunication,
pub dataflow_descriptor: Descriptor,
pub dataflow_descriptor: serde_yaml::Value,
pub dynamic: bool,
}
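
Because the node now receives the descriptor as an untyped `serde_yaml::Value` instead of a `Descriptor`, a node or API binding that wants the typed view can opt into it explicitly, and a schema mismatch in unrelated descriptor fields no longer breaks connection setup. A sketch, assuming only that the value round-trips through serde_yaml:

use dora_message::{daemon_to_node::NodeConfig, descriptor::Descriptor};

fn typed_descriptor(config: &NodeConfig) -> Option<Descriptor> {
    // Returns None instead of failing hard if this node was built against a
    // descriptor schema that differs from the daemon's.
    serde_yaml::from_value(config.dataflow_descriptor.clone()).ok()
}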



+ 40
- 7
libraries/message/src/descriptor.rs

@@ -23,18 +23,19 @@ pub struct Descriptor {
#[serde(default)]
pub communication: CommunicationConfig,
#[schemars(skip)]
#[serde(default, rename = "_unstable_deploy")]
pub deploy: Deploy,
#[serde(rename = "_unstable_deploy")]
pub deploy: Option<Deploy>,
pub nodes: Vec<Node>,
#[schemars(skip)]
#[serde(default, rename = "_unstable_debug")]
pub debug: Debug,
}

#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct Deploy {
pub machine: Option<String>,
pub working_dir: Option<PathBuf>,
}

#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
@@ -58,8 +59,8 @@ pub struct Node {

/// Unstable machine deployment configuration
#[schemars(skip)]
#[serde(default, rename = "_unstable_deploy")]
pub deploy: Deploy,
#[serde(rename = "_unstable_deploy")]
pub deploy: Option<Deploy>,

#[serde(default, skip_serializing_if = "Option::is_none")]
pub operators: Option<RuntimeNode>,
@@ -70,6 +71,15 @@ pub struct Node {

#[serde(default, skip_serializing_if = "Option::is_none")]
pub path: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub git: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub branch: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tag: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub rev: Option<String>,

#[serde(default, skip_serializing_if = "Option::is_none")]
pub args: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
@@ -90,7 +100,7 @@ pub struct ResolvedNode {
pub env: Option<BTreeMap<String, EnvValue>>,

#[serde(default)]
pub deploy: Deploy,
pub deploy: Option<Deploy>,

#[serde(flatten)]
pub kind: CoreNodeKind,
@@ -216,7 +226,8 @@ pub struct CustomNode {
/// args: some_node.py
///
/// Source can match any executable in PATH.
pub source: String,
pub path: String,
pub source: NodeSource,
/// Args for the executable.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub args: Option<String>,
@@ -234,6 +245,28 @@ pub struct CustomNode {
pub run_config: NodeRunConfig,
}

#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub enum NodeSource {
Local,
GitBranch {
repo: String,
rev: Option<GitRepoRev>,
},
}

#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub enum ResolvedNodeSource {
Local,
GitCommit { repo: String, commit_hash: String },
}

#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub enum GitRepoRev {
Branch(String),
Tag(String),
Rev(String),
}
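
The unresolved and resolved forms relate roughly as follows; the `resolve_commit` callback is a stand-in for whatever git lookup the build step performs (e.g. `git ls-remote`), not an API introduced by this change:

use dora_message::descriptor::{GitRepoRev, NodeSource, ResolvedNodeSource};

fn resolve(
    source: &NodeSource,
    resolve_commit: impl Fn(&str, Option<&GitRepoRev>) -> String,
) -> ResolvedNodeSource {
    match source {
        NodeSource::Local => ResolvedNodeSource::Local,
        NodeSource::GitBranch { repo, rev } => ResolvedNodeSource::GitCommit {
            repo: repo.clone(),
            // A branch or tag is pinned to a concrete commit at build time so that
            // every daemon checks out exactly the same revision.
            commit_hash: resolve_commit(repo, rev.as_ref()),
        },
    }
}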

#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(untagged)]
pub enum EnvValue {


+ 33
- 0
libraries/message/src/lib.rs

@@ -24,9 +24,42 @@ pub mod coordinator_to_cli;

pub use arrow_data;
pub use arrow_schema;
use uuid::{Timestamp, Uuid};

pub type DataflowId = uuid::Uuid;

#[derive(
Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash,
)]
pub struct SessionId(uuid::Uuid);

impl SessionId {
pub fn generate() -> Self {
Self(Uuid::new_v7(Timestamp::now(uuid::NoContext)))
}

pub fn uuid(&self) -> uuid::Uuid {
self.0
}
}

#[derive(
Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash,
)]
pub struct BuildId(uuid::Uuid);

impl BuildId {
pub fn generate() -> Self {
Self(Uuid::new_v7(Timestamp::now(uuid::NoContext)))
}
}

impl std::fmt::Display for BuildId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "BuildId({})", self.0)
}
}
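
Both new id types wrap UUIDv7 values, so they carry a creation timestamp and sort roughly by generation time. A small usage sketch:

use dora_message::{BuildId, SessionId};

fn main() {
    let session = SessionId::generate();
    let build = BuildId::generate();
    // SessionId exposes the inner UUID; BuildId's Display impl prints `BuildId(<uuid>)`.
    println!("session {} triggered {}", session.uuid(), build);
}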

fn current_crate_version() -> semver::Version {
let crate_version_raw = env!("CARGO_PKG_VERSION");


