Browse Source

Make coordinator usable in examples

Allow passing runtime path via coordinator
tags/v0.2.3-rc
Philipp Oppermann 2 years ago
parent
commit
188813cd57
Failed to extract signature
6 changed files with 30 additions and 13 deletions
  1. +1
    -0
      Cargo.lock
  2. +1
    -0
      Cargo.toml
  3. +12
    -9
      binaries/coordinator/src/lib.rs
  4. +4
    -2
      binaries/coordinator/src/run/mod.rs
  5. +11
    -2
      binaries/daemon/src/lib.rs
  6. +1
    -0
      libraries/core/src/daemon_messages.rs

+ 1
- 0
Cargo.lock View File

@@ -1152,6 +1152,7 @@ dependencies = [
"futures",
"serde_yaml 0.8.26",
"tokio",
"tokio-stream",
"tracing",
"tracing-subscriber",
"uuid",


+ 1
- 0
Cargo.toml View File

@@ -68,6 +68,7 @@ uuid = { version = "1.2.1", features = ["v4", "serde"] }
tracing = "0.1.36"
tracing-subscriber = "0.3.15"
futures = "0.3.25"
tokio-stream = "0.1.11"

[[example]]
name = "c-dataflow"


+ 12
- 9
binaries/coordinator/src/lib.rs View File

@@ -2,7 +2,7 @@ use crate::{
run::spawn_dataflow,
tcp_utils::{tcp_receive, tcp_send},
};
use control::ControlEvent;
pub use control::ControlEvent;
use dora_core::{
config::{CommunicationConfig, DataId, NodeId, OperatorId},
coordinator_messages::RegisterResult,
@@ -41,9 +41,6 @@ pub struct Args {
#[clap(long)]
pub port: Option<u16>,

#[clap(long)]
pub run_dataflow: Option<PathBuf>,

#[clap(long)]
pub dora_runtime_path: Option<PathBuf>,
}
@@ -70,7 +67,7 @@ pub async fn start(
.port();
let mut tasks = FuturesUnordered::new();
let future = async move {
start_inner(listener, &tasks, external_events).await?;
start_inner(listener, &tasks, external_events, args.dora_runtime_path).await?;

tracing::debug!("coordinator main loop finished, waiting on spawned tasks");
while let Some(join_result) = tasks.next().await {
@@ -88,6 +85,7 @@ async fn start_inner(
listener: TcpListener,
tasks: &FuturesUnordered<JoinHandle<()>>,
external_events: impl Stream<Item = Event> + Unpin,
runtime_path: Option<PathBuf>,
) -> eyre::Result<()> {
let new_daemon_connections = TcpListenerStream::new(listener).map(|c| {
c.map(Event::NewDaemonConnection)
@@ -278,9 +276,13 @@ async fn start_inner(
bail!("there is already a running dataflow with name `{name}`");
}
}
let dataflow =
start_dataflow(&dataflow_path, name, &mut daemon_connections)
.await?;
let dataflow = start_dataflow(
&dataflow_path,
name,
runtime_path.clone(),
&mut daemon_connections,
)
.await?;
Ok(dataflow)
};
inner.await.map(|dataflow| {
@@ -617,13 +619,14 @@ async fn reload_dataflow(
async fn start_dataflow(
path: &Path,
name: Option<String>,
runtime_path: Option<PathBuf>,
daemon_connections: &mut HashMap<String, TcpStream>,
) -> eyre::Result<RunningDataflow> {
let SpawnedDataflow {
uuid,
communication_config,
machines,
} = spawn_dataflow(path, daemon_connections).await?;
} = spawn_dataflow(path, runtime_path, daemon_connections).await?;
Ok(RunningDataflow {
uuid,
name,


+ 4
- 2
binaries/coordinator/src/run/mod.rs View File

@@ -8,7 +8,7 @@ use dora_core::{
use eyre::{bail, eyre, ContextCompat, WrapErr};
use std::{
collections::{BTreeSet, HashMap},
path::Path,
path::{Path, PathBuf},
};
use tokio::net::TcpStream;
use uuid::Uuid;
@@ -16,6 +16,7 @@ use uuid::Uuid;
#[tracing::instrument(skip(daemon_connections))]
pub async fn spawn_dataflow(
dataflow_path: &Path,
runtime_path: Option<PathBuf>,
daemon_connections: &mut HashMap<String, TcpStream>,
) -> eyre::Result<SpawnedDataflow> {
let descriptor = Descriptor::read(dataflow_path).await.wrap_err_with(|| {
@@ -24,7 +25,7 @@ pub async fn spawn_dataflow(
dataflow_path.display()
)
})?;
descriptor.check(dataflow_path, None)?;
descriptor.check(dataflow_path, runtime_path.clone())?;
let working_dir = dataflow_path
.canonicalize()
.context("failed to canoncialize dataflow path")?
@@ -48,6 +49,7 @@ pub async fn spawn_dataflow(
working_dir,
nodes,
daemon_communication: descriptor.daemon_config,
runtime_path,
};
let message = serde_json::to_vec(&DaemonCoordinatorEvent::Spawn(spawn_command))?;



+ 11
- 2
binaries/daemon/src/lib.rs View File

@@ -103,6 +103,7 @@ impl Daemon {
working_dir,
nodes,
daemon_communication: descriptor.daemon_config,
runtime_path: dora_runtime_path.clone(),
};

let exit_when_done = spawn_command
@@ -254,9 +255,16 @@ impl Daemon {
working_dir,
nodes,
daemon_communication,
runtime_path,
}) => {
let result = self
.spawn_dataflow(dataflow_id, working_dir, nodes, daemon_communication)
.spawn_dataflow(
dataflow_id,
working_dir,
nodes,
runtime_path.as_deref(),
daemon_communication,
)
.await;
if let Err(err) = &result {
tracing::error!("{err:?}");
@@ -350,6 +358,7 @@ impl Daemon {
dataflow_id: uuid::Uuid,
working_dir: PathBuf,
nodes: Vec<ResolvedNode>,
runtime_path: Option<&Path>,
daemon_communication_config: DaemonCommunicationConfig,
) -> eyre::Result<()> {
let dataflow = RunningDataflow::new(dataflow_id);
@@ -405,7 +414,7 @@ impl Daemon {
node,
self.events_tx.clone(),
daemon_communication_config,
self.dora_runtime_path.as_deref(),
runtime_path.or(self.dora_runtime_path.as_deref()),
)
.await
.wrap_err_with(|| format!("failed to spawn node `{node_id}`"))?;


+ 1
- 0
libraries/core/src/daemon_messages.rs View File

@@ -232,6 +232,7 @@ pub struct SpawnDataflowNodes {
pub working_dir: PathBuf,
pub nodes: Vec<ResolvedNode>,
pub daemon_communication: DaemonCommunicationConfig,
pub runtime_path: Option<PathBuf>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]


Loading…
Cancel
Save