Philipp Oppermann 7 months ago
parent
commit
c7473182ad
Failed to extract signature
14 changed files with 254 additions and 91 deletions
  1. +80
    -13
      binaries/coordinator/src/lib.rs
  2. +15
    -0
      binaries/coordinator/src/listener.rs
  3. +6
    -4
      binaries/coordinator/src/run/mod.rs
  4. +7
    -10
      binaries/daemon/src/build/git.rs
  5. +14
    -8
      binaries/daemon/src/build/mod.rs
  6. +75
    -37
      binaries/daemon/src/lib.rs
  7. +4
    -4
      binaries/daemon/src/log.rs
  8. +0
    -1
      examples/multiple-daemons/run.rs
  9. +20
    -7
      libraries/message/src/cli_to_coordinator.rs
  10. +2
    -2
      libraries/message/src/common.rs
  11. +1
    -0
      libraries/message/src/coordinator_to_cli.rs
  12. +20
    -4
      libraries/message/src/coordinator_to_daemon.rs
  13. +9
    -1
      libraries/message/src/daemon_to_coordinator.rs
  14. +1
    -0
      libraries/message/src/lib.rs

+ 80
- 13
binaries/coordinator/src/lib.rs View File

@@ -20,7 +20,7 @@ use dora_message::{
},
daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult},
descriptor::{Descriptor, ResolvedNode},
BuildId,
BuildId, DataflowId, SessionId,
};
use eyre::{bail, eyre, ContextCompat, Result, WrapErr};
use futures::{future::join_all, stream::FuturesUnordered, Future, Stream, StreamExt};
@@ -33,6 +33,7 @@ use std::{
btree_map::{Entry, OccupiedEntry},
BTreeMap, BTreeSet, HashMap,
},
env::current_dir,
net::SocketAddr,
path::PathBuf,
sync::Arc,
@@ -202,13 +203,15 @@ async fn start_inner(

let mut events = (abortable_events, daemon_events).merge();

let mut running_dataflows: HashMap<Uuid, RunningDataflow> = HashMap::new();
let mut dataflow_results: HashMap<Uuid, BTreeMap<DaemonId, DataflowDaemonResult>> =
let mut running_builds: HashMap<BuildId, RunningBuild> = HashMap::new();

let mut running_dataflows: HashMap<DataflowId, RunningDataflow> = HashMap::new();
let mut dataflow_results: HashMap<DataflowId, BTreeMap<DaemonId, DataflowDaemonResult>> =
HashMap::new();
let mut archived_dataflows: HashMap<Uuid, ArchivedDataflow> = HashMap::new();
let mut archived_dataflows: HashMap<DataflowId, ArchivedDataflow> = HashMap::new();
let mut daemon_connections = DaemonConnections::default();

let mut build_log_subscribers: BTreeMap<BuildId, LogSubscriber> = Default::default();
let mut build_log_subscribers: BTreeMap<SessionId, LogSubscriber> = Default::default();

while let Some(event) = events.next().await {
// used below for measuring the event handling duration
@@ -411,6 +414,7 @@ async fn start_inner(
} => {
match request {
ControlRequest::Build {
session_id,
dataflow,
git_sources,
prev_git_sources,
@@ -418,9 +422,10 @@ async fn start_inner(
uv,
} => {
// assign a random build id
let session_id = BuildId::new_v4();
// `BuildId` and `SessionId` both alias `uuid::Uuid`; use the alias that
// matches the value being generated so the intent stays readable
let build_id = BuildId::new_v4();

let result = build_dataflow(
build_id,
session_id,
dataflow,
git_sources,
@@ -443,6 +448,7 @@ async fn start_inner(
}
}
ControlRequest::Start {
build_id,
session_id,
dataflow,
name,
@@ -462,6 +468,7 @@ async fn start_inner(
}
}
let dataflow = start_dataflow(
build_id,
session_id,
dataflow,
local_working_dir,
@@ -779,6 +786,42 @@ async fn start_inner(
tracing::info!("Daemon `{daemon_id}` exited");
daemon_connections.remove(&daemon_id);
}
Event::DataflowBuildResult {
    build_id,
    session_id,
    daemon_id,
    result,
} => match running_builds.get_mut(&build_id) {
    Some(build) => {
        // this daemon has reported back, so we no longer wait for it
        build.pending_build_results.remove(&daemon_id);
        if let Err(err) = result {
            build.errors.push(format!("{err:?}"));
        }

        // reply only after every pending daemon has reported its result
        if build.pending_build_results.is_empty() {
            tracing::info!("dataflow build finished: `{build_id}`");
            let build = running_builds
                .remove(&build_id)
                .expect("build entry was found by `get_mut` above");
            let result = if build.errors.is_empty() {
                Ok(())
            } else {
                Err(format!("build failed: {}", build.errors.join("\n\n")))
            };

            let reply = ControlRequestReply::DataflowBuildFinished {
                build_id,
                session_id,
                result,
            };
            // sending on a oneshot channel only fails if the receiver is gone;
            // report that instead of silently discarding the build result
            if build.build_result_sender.send(Ok(reply)).is_err() {
                tracing::warn!(
                    "failed to send result of build `{build_id}`: receiver was dropped"
                );
            }
        }
    }
    None => {
        tracing::warn!(
            "received DataflowBuildResult, but no matching build in `running_builds` map"
        );
    }
},
Event::DataflowSpawnResult {
dataflow_id,
daemon_id,
@@ -836,7 +879,7 @@ async fn send_log_message(dataflow: &mut RunningDataflow, message: &LogMessage)

async fn send_log_message_to_subscriber(
message: &LogMessage,
mut subscriber: OccupiedEntry<'_, BuildId, LogSubscriber>,
mut subscriber: OccupiedEntry<'_, SessionId, LogSubscriber>,
) {
let send_result = tokio::time::timeout(
Duration::from_millis(100),
@@ -916,6 +959,19 @@ async fn send_heartbeat_message(
.wrap_err("failed to send heartbeat message to daemon")
}

/// Coordinator-side bookkeeping for a build whose daemon results are still being
/// collected (stored in the `running_builds` map, keyed by `BuildId`).
struct RunningBuild {
// NOTE(review): presumably the same ID as the key in `running_builds` — confirm.
build_id: BuildId,
/// The IDs of the daemons that the build is running on.
daemons: BTreeSet<DaemonId>,

/// Debug-formatted errors, one per `DataflowBuildResult` event that carried an `Err`.
errors: Vec<String>,
/// Receives the `DataflowBuildFinished` reply once `pending_build_results` is empty.
build_result_sender: tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>,

// NOTE(review): not used in the visible code; presumably the subscribers to this
// build's log messages — confirm.
log_subscribers: Vec<LogSubscriber>,

/// Daemons that have not reported a build result yet. A daemon is removed when
/// its `DataflowBuildResult` event arrives.
pending_build_results: BTreeSet<DaemonId>,
}

struct RunningDataflow {
name: Option<String>,
uuid: Uuid,
@@ -1180,11 +1236,12 @@ async fn retrieve_logs(

#[tracing::instrument(skip(daemon_connections, clock))]
async fn build_dataflow(
session_id: BuildId,
build_id: BuildId,
session_id: SessionId,
dataflow: Descriptor,
git_sources: BTreeMap<NodeId, GitSource>,
prev_git_sources: BTreeMap<NodeId, GitSource>,
working_dir: PathBuf,
local_working_dir: Option<PathBuf>,
clock: &HLC,
uv: bool,
daemon_connections: &mut DaemonConnections,
@@ -1210,8 +1267,9 @@ async fn build_dataflow(
);

let build_command = BuildDataflowNodes {
build_id,
session_id,
working_dir: working_dir.clone(),
local_working_dir: local_working_dir.clone(),
nodes: nodes.clone(),
git_sources: git_sources_by_daemon
.remove(&machine.as_ref())
@@ -1278,9 +1336,10 @@ async fn build_dataflow_on_machine(
}

async fn start_dataflow(
session_id: Option<Uuid>,
build_id: Option<BuildId>,
session_id: SessionId,
dataflow: Descriptor,
working_dir: PathBuf,
local_working_dir: Option<PathBuf>,
name: Option<String>,
daemon_connections: &mut DaemonConnections,
clock: &HLC,
@@ -1291,9 +1350,10 @@ async fn start_dataflow(
daemons,
nodes,
} = spawn_dataflow(
build_id,
session_id,
dataflow,
working_dir,
local_working_dir,
daemon_connections,
clock,
uv,
@@ -1388,6 +1448,12 @@ pub enum Event {
DaemonExit {
daemon_id: dora_message::common::DaemonId,
},
DataflowBuildResult {
build_id: BuildId,
session_id: SessionId,
daemon_id: DaemonId,
result: eyre::Result<()>,
},
DataflowSpawnResult {
dataflow_id: uuid::Uuid,
daemon_id: DaemonId,
@@ -1417,6 +1483,7 @@ impl Event {
Event::CtrlC => "CtrlC",
Event::Log(_) => "Log",
Event::DaemonExit { .. } => "DaemonExit",
Event::DataflowBuildResult { .. } => "DataflowBuildResult",
Event::DataflowSpawnResult { .. } => "DataflowSpawnResult",
}
}


+ 15
- 0
binaries/coordinator/src/listener.rs View File

@@ -112,6 +112,21 @@ pub async fn handle_connection(
break;
}
}
// forward the daemon's build result to the coordinator event loop
DaemonEvent::BuildResult {
    build_id,
    session_id,
    result,
} => {
    // the daemon reports errors as strings; turn them into `eyre` reports
    let result = result.map_err(|err| eyre::eyre!(err));
    let build_result = Event::DataflowBuildResult {
        build_id,
        session_id,
        daemon_id,
        result,
    };
    // stop handling this connection once the event receiver is gone
    let receiver_closed = events_tx.send(build_result).await.is_err();
    if receiver_closed {
        break;
    }
}
DaemonEvent::SpawnResult {
dataflow_id,
result,


+ 6
- 4
binaries/coordinator/src/run/mod.rs View File

@@ -10,7 +10,7 @@ use dora_message::{
daemon_to_coordinator::DaemonCoordinatorReply,
descriptor::{Descriptor, ResolvedNode},
id::NodeId,
BuildId,
SessionId,
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use itertools::Itertools;
@@ -22,9 +22,10 @@ use uuid::{NoContext, Timestamp, Uuid};

#[tracing::instrument(skip(daemon_connections, clock))]
pub(super) async fn spawn_dataflow(
session_id: Option<BuildId>,
build_id: Option<SessionId>,
session_id: SessionId,
dataflow: Descriptor,
working_dir: PathBuf,
local_working_dir: Option<PathBuf>,
daemon_connections: &mut DaemonConnections,
clock: &HLC,
uv: bool,
@@ -42,9 +43,10 @@ pub(super) async fn spawn_dataflow(
);

let spawn_command = SpawnDataflowNodes {
build_id,
session_id,
dataflow_id: uuid,
working_dir: working_dir.clone(),
local_working_dir: local_working_dir.clone(),
nodes: nodes.clone(),
dataflow_descriptor: dataflow.clone(),
spawn_nodes,


+ 7
- 10
binaries/daemon/src/build/git.rs View File

@@ -1,5 +1,5 @@
use crate::log::NodeBuildLogger;
use dora_message::{common::LogLevel, descriptor::GitRepoRev, BuildId, DataflowId};
use dora_message::{common::LogLevel, descriptor::GitRepoRev, DataflowId, SessionId};
use eyre::{ContextCompat, WrapErr};
use git2::FetchOptions;
use itertools::Itertools;
@@ -15,7 +15,7 @@ pub struct GitManager {
/// Directories that are currently in use by running dataflows.
clones_in_use: BTreeMap<PathBuf, BTreeSet<DataflowId>>,
/// Builds that are prepared, but not done yet.
prepared_builds: BTreeMap<BuildId, PreparedBuild>,
prepared_builds: BTreeMap<SessionId, PreparedBuild>,
reuse_for: BTreeMap<PathBuf, PathBuf>,
}

@@ -36,7 +36,7 @@ impl GitManager {
prev_commit_hash: Option<String>,
target_dir: &Path,
) -> eyre::Result<GitFolder> {
let clone_dir = Self::clone_dir_path(&target_dir, session_id, &repo_url, &commit_hash)?;
let clone_dir = Self::clone_dir_path(&target_dir, &repo_url, &commit_hash)?;

if let Some(using) = self.clones_in_use.get(&clone_dir) {
if !using.is_empty() {
@@ -60,7 +60,7 @@ impl GitManager {
} else if let Some(previous_commit_hash) = prev_commit_hash {
// we might be able to update a previous clone
let prev_clone_dir =
Self::clone_dir_path(&target_dir, session_id, &repo_url, &previous_commit_hash)?;
Self::clone_dir_path(&target_dir, &repo_url, &previous_commit_hash)?;

if self
.clones_in_use
@@ -109,7 +109,7 @@ impl GitManager {
.unwrap_or(false)
}

pub fn clone_dir_ready(&self, session_id: BuildId, dir: &Path) -> bool {
pub fn clone_dir_ready(&self, session_id: SessionId, dir: &Path) -> bool {
self.prepared_builds
.get(&session_id)
.map(|p| p.planned_clone_dirs.contains(dir))
@@ -117,7 +117,7 @@ impl GitManager {
|| dir.exists()
}

pub fn register_ready_clone_dir(&mut self, session_id: BuildId, dir: PathBuf) -> bool {
pub fn register_ready_clone_dir(&mut self, session_id: SessionId, dir: PathBuf) -> bool {
self.prepared_builds
.entry(session_id)
.or_default()
@@ -127,13 +127,10 @@ impl GitManager {

fn clone_dir_path(
base_dir: &Path,
session_id: BuildId,
repo_url: &Url,
commit_hash: &String,
) -> eyre::Result<PathBuf> {
let mut path = base_dir
.join(&session_id.to_string())
.join(repo_url.host_str().context("git URL has no hostname")?);
let mut path = base_dir.join(repo_url.host_str().context("git URL has no hostname")?);
path.extend(repo_url.path_segments().context("no path in git URL")?);
let path = path.join(commit_hash);
Ok(dunce::simplified(&path).to_owned())


+ 14
- 8
binaries/daemon/src/build/mod.rs View File

@@ -11,7 +11,7 @@ use dora_core::{
use dora_message::{
common::{GitSource, LogLevel, Timestamped},
descriptor::EnvValue,
BuildId,
SessionId,
};
use eyre::Context;
use tokio::sync::mpsc;
@@ -22,8 +22,8 @@ mod git;

#[derive(Clone)]
pub struct Builder {
pub session_id: BuildId,
pub working_dir: PathBuf,
pub session_id: SessionId,
pub base_working_dir: PathBuf,
pub daemon_tx: mpsc::Sender<Timestamped<Event>>,
pub dataflow_descriptor: Descriptor,
/// clock is required for generating timestamps when dropping messages early because queue is full
@@ -44,7 +44,7 @@ impl Builder {

let prepared_git = if let Some(GitSource { repo, commit_hash }) = git {
let repo_url = Url::parse(&repo).context("failed to parse git repository URL")?;
let target_dir = self.working_dir.join("build");
let target_dir = self.base_working_dir.join("git");
let prev_hash = prev_git.filter(|p| p.repo == repo).map(|p| p.commit_hash);
let git_folder = git_manager.choose_clone_dir(
self.session_id,
@@ -79,7 +79,7 @@ impl Builder {
dora_core::descriptor::CoreNodeKind::Custom(n) => {
let node_working_dir = match git_folder {
Some(git_folder) => git_folder.prepare(logger).await?,
None => self.working_dir.join(self.session_id.to_string()),
None => self.base_working_dir,
};

if let Some(build) = &n.build {
@@ -91,11 +91,17 @@ impl Builder {
// run build commands
for operator in &n.operators {
if let Some(build) = &operator.config.build {
build_node(logger, &node.env, self.working_dir.clone(), build, self.uv)
.await?;
build_node(
logger,
&node.env,
self.base_working_dir.clone(),
build,
self.uv,
)
.await?;
}
}
self.working_dir.clone()
self.base_working_dir.clone()
}
};
Ok(PreparedNode {


+ 75
- 37
binaries/daemon/src/lib.rs View File

@@ -12,7 +12,8 @@ use dora_core::{
};
use dora_message::{
common::{
DaemonId, DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus,
DaemonId, DataMessage, DropToken, GitSource, LogLevel, NodeError, NodeErrorCause,
NodeExitStatus,
},
coordinator_to_cli::DataflowResult,
coordinator_to_daemon::{BuildDataflowNodes, DaemonCoordinatorEvent, SpawnDataflowNodes},
@@ -24,7 +25,7 @@ use dora_message::{
descriptor::NodeSource,
metadata::{self, ArrowTypeInfo},
node_to_daemon::{DynamicNodeEvent, Timestamped},
DataflowId,
BuildId, DataflowId, SessionId,
};
use dora_node_api::{arrow::datatypes::DataType, Parameter};
use eyre::{bail, eyre, Context, ContextCompat, Result};
@@ -38,6 +39,7 @@ use socket_stream_utils::socket_stream_send;
use spawn::Spawner;
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
env::current_dir,
future::Future,
net::SocketAddr,
path::{Path, PathBuf},
@@ -154,7 +156,13 @@ impl Daemon {
.map(|_| ())
}

pub async fn run_dataflow(dataflow_path: &Path, uv: bool) -> eyre::Result<DataflowResult> {
pub async fn run_dataflow(
dataflow_path: &Path,
build_id: Option<BuildId>,
session_id: SessionId,
local_working_dir: Option<PathBuf>,
uv: bool,
) -> eyre::Result<DataflowResult> {
let working_dir = dataflow_path
.canonicalize()
.context("failed to canonicalize dataflow path")?
@@ -168,9 +176,10 @@ impl Daemon {

let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
let spawn_command = SpawnDataflowNodes {
build_id,
session_id,
dataflow_id,
working_dir,
local_working_dir,
spawn_nodes: nodes.keys().cloned().collect(),
nodes,
dataflow_descriptor: descriptor,
@@ -419,21 +428,26 @@ impl Daemon {
.await?;
}
},
Event::BuildDataflowResult { session_id, result } => {
Event::BuildDataflowResult {
build_id,
session_id,
result,
} => {
if let Some(connection) = &mut self.coordinator_connection {
let msg = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Event {
daemon_id: self.daemon_id.clone(),
event: DaemonEvent::SpawnResult {
dataflow_id,
event: DaemonEvent::BuildResult {
build_id,
session_id,
result: result.map_err(|err| format!("{err:?}")),
},
},
timestamp: self.clock.new_timestamp(),
})?;
socket_stream_send(connection, &msg)
.await
.wrap_err("failed to send Exit message to dora-coordinator")?;
socket_stream_send(connection, &msg).await.wrap_err(
"failed to send BuildDataflowResult message to dora-coordinator",
)?;
}
}
Event::SpawnDataflowResult {
@@ -451,9 +465,9 @@ impl Daemon {
},
timestamp: self.clock.new_timestamp(),
})?;
socket_stream_send(connection, &msg)
.await
.wrap_err("failed to send Exit message to dora-coordinator")?;
socket_stream_send(connection, &msg).await.wrap_err(
"failed to send SpawnDataflowResult message to dora-coordinator",
)?;
}
}
}
@@ -491,8 +505,9 @@ impl Daemon {
) -> eyre::Result<RunStatus> {
let status = match event {
DaemonCoordinatorEvent::Build(BuildDataflowNodes {
build_id,
session_id,
working_dir,
local_working_dir,
nodes,
git_sources,
prev_git_sources,
@@ -504,17 +519,12 @@ impl Daemon {
dora_core::config::RemoteCommunicationConfig::Tcp => {}
}

// Use the working directory if it exists, otherwise use the working directory where the daemon is spawned
let working_dir = if working_dir.exists() {
working_dir
} else {
std::env::current_dir().wrap_err("failed to get current working dir")?
};
let base_working_dir = self.base_working_dir(local_working_dir, session_id)?;

let result = self
.build_dataflow(
session_id,
working_dir,
base_working_dir,
nodes,
git_sources,
prev_git_sources,
@@ -538,6 +548,7 @@ impl Daemon {
tokio::spawn(async move {
let message = Timestamped {
inner: Event::BuildDataflowResult {
build_id,
session_id,
result: result_task.await,
},
@@ -557,9 +568,10 @@ impl Daemon {
RunStatus::Continue
}
DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes {
build_id,
session_id,
dataflow_id,
working_dir,
local_working_dir,
nodes,
dataflow_descriptor,
spawn_nodes,
@@ -569,18 +581,13 @@ impl Daemon {
dora_core::config::RemoteCommunicationConfig::Tcp => {}
}

// Use the working directory if it exists, otherwise use the working directory where the daemon is spawned
let working_dir = if working_dir.exists() {
working_dir
} else {
std::env::current_dir().wrap_err("failed to get current working dir")?
};
let base_working_dir = self.base_working_dir(local_working_dir, session_id)?;

let result = self
.spawn_dataflow(
session_id,
build_id,
dataflow_id,
working_dir,
base_working_dir,
nodes,
dataflow_descriptor,
spawn_nodes,
@@ -852,7 +859,7 @@ impl Daemon {
async fn build_dataflow(
&mut self,
session_id: uuid::Uuid,
working_dir: PathBuf,
base_working_dir: PathBuf,
nodes: BTreeMap<NodeId, ResolvedNode>,
git_sources: BTreeMap<NodeId, GitSource>,
prev_git_sources: BTreeMap<NodeId, GitSource>,
@@ -862,7 +869,7 @@ impl Daemon {
) -> eyre::Result<impl Future<Output = eyre::Result<()>>> {
let builder = build::Builder {
session_id,
working_dir,
base_working_dir,
daemon_tx: self.events_tx.clone(),
dataflow_descriptor,
clock: self.clock.clone(),
@@ -945,9 +952,9 @@ impl Daemon {

async fn spawn_dataflow(
&mut self,
session_id: Option<uuid::Uuid>,
dataflow_id: uuid::Uuid,
working_dir: PathBuf,
build_id: Option<BuildId>,
dataflow_id: DataflowId,
base_working_dir: PathBuf,
nodes: BTreeMap<NodeId, ResolvedNode>,
dataflow_descriptor: Descriptor,
spawn_nodes: BTreeSet<NodeId>,
@@ -963,7 +970,8 @@ impl Daemon {
RunningDataflow::new(dataflow_id, self.daemon_id.clone(), &dataflow_descriptor);
let dataflow = match self.running.entry(dataflow_id) {
std::collections::hash_map::Entry::Vacant(entry) => {
self.working_dir.insert(dataflow_id, working_dir.clone());
self.working_dir
.insert(dataflow_id, base_working_dir.clone());
entry.insert(dataflow)
}
std::collections::hash_map::Entry::Occupied(_) => {
@@ -1971,6 +1979,34 @@ impl Daemon {
}
Ok(RunStatus::Continue)
}

/// Determines the base working directory for a build or spawn request.
///
/// If `local_working_dir` is given, it is returned as-is when it exists and
/// an error is returned when it does not. Otherwise, the result is the
/// `_work/<session_id>` subfolder of the daemon's current directory.
fn base_working_dir(
    &self,
    local_working_dir: Option<PathBuf>,
    session_id: Uuid,
) -> eyre::Result<PathBuf> {
    if let Some(requested_dir) = local_working_dir {
        // an explicitly requested working directory must already exist
        if !requested_dir.exists() {
            bail!(
                "working directory does not exist: {}",
                requested_dir.display(),
            )
        }
        return Ok(requested_dir);
    }

    // no override given: use a per-session subfolder of the daemon working dir
    let daemon_dir = current_dir().context("failed to get daemon working dir")?;
    let session_dir = daemon_dir.join("_work").join(session_id.to_string());
    Ok(session_dir)
}
}

async fn set_up_event_stream(
@@ -2445,7 +2481,8 @@ pub enum Event {
result: Result<RunningNode, NodeError>,
},
BuildDataflowResult {
session_id: Uuid,
build_id: BuildId,
session_id: SessionId,
result: eyre::Result<()>,
},
SpawnDataflowResult {
@@ -2473,6 +2510,7 @@ impl Event {
Event::SecondCtrlC => "SecondCtrlC",
Event::DaemonError(_) => "DaemonError",
Event::SpawnNodeResult { .. } => "SpawnNodeResult",
Event::BuildDataflowResult { .. } => "BuildDataflowResult",
Event::SpawnDataflowResult { .. } => "SpawnDataflowResult",
}
}


+ 4
- 4
binaries/daemon/src/log.rs View File

@@ -8,7 +8,7 @@ use dora_core::{config::NodeId, uhlc};
use dora_message::{
common::{DaemonId, LogLevel, LogMessage, Timestamped},
daemon_to_coordinator::{CoordinatorRequest, DaemonEvent},
BuildId,
SessionId,
};
use eyre::Context;
use tokio::net::TcpStream;
@@ -95,7 +95,7 @@ impl<'a> DataflowLogger<'a> {
}

pub struct NodeBuildLogger<'a> {
session_id: BuildId,
session_id: SessionId,
node_id: NodeId,
logger: CowMut<'a, DaemonLogger>,
}
@@ -133,7 +133,7 @@ impl DaemonLogger {
}
}

pub fn for_node_build(&mut self, session_id: BuildId, node_id: NodeId) -> NodeBuildLogger {
pub fn for_node_build(&mut self, session_id: SessionId, node_id: NodeId) -> NodeBuildLogger {
NodeBuildLogger {
session_id,
node_id,
@@ -170,7 +170,7 @@ impl DaemonLogger {

pub async fn log_build(
&mut self,
session_id: BuildId,
session_id: SessionId,
level: LogLevel,
node_id: Option<NodeId>,
message: impl Into<String>,


+ 0
- 1
examples/multiple-daemons/run.rs View File

@@ -146,7 +146,6 @@ async fn start_dataflow(
local_working_dir: working_dir,
name: None,
uv: false,
build_only: false,
},
reply_sender,
}))


+ 20
- 7
libraries/message/src/cli_to_coordinator.rs View File

@@ -6,26 +6,39 @@ use crate::{
common::GitSource,
descriptor::Descriptor,
id::{NodeId, OperatorId},
SessionId,
};

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum ControlRequest {
Build {
session_id: SessionId,
dataflow: Descriptor,
git_sources: BTreeMap<NodeId, GitSource>,
prev_git_sources: BTreeMap<NodeId, GitSource>,
// TODO: remove this once we figure out deploying of node/operator
// binaries from CLI to coordinator/daemon
local_working_dir: PathBuf,
/// Allows overwriting the base working dir when CLI and daemon are
/// running on the same machine.
///
/// Must not be used for multi-machine dataflows.
///
/// Note that nodes with git sources still use a subdirectory of
/// the base working dir.
local_working_dir: Option<PathBuf>,
uv: bool,
},
Start {
session_id: Option<Uuid>,
build_id: Option<Uuid>,
session_id: Uuid,
dataflow: Descriptor,
name: Option<String>,
// TODO: remove this once we figure out deploying of node/operator
// binaries from CLI to coordinator/daemon
local_working_dir: PathBuf,
/// Allows overwriting the base working dir when CLI and daemon are
/// running on the same machine.
///
/// Must not be used for multi-machine dataflows.
///
/// Note that nodes with git sources still use a subdirectory of
/// the base working dir.
local_working_dir: Option<PathBuf>,
uv: bool,
},
WaitForSpawn {


+ 2
- 2
libraries/message/src/common.rs View File

@@ -5,14 +5,14 @@ use aligned_vec::{AVec, ConstAlign};
use eyre::Context as _;
use uuid::Uuid;

use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, BuildId, DataflowId};
use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, SessionId, DataflowId};

pub use log::Level as LogLevel;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[must_use]
pub struct LogMessage {
pub session_id: Option<BuildId>,
pub session_id: Option<SessionId>,
pub dataflow_id: Option<DataflowId>,
pub node_id: Option<NodeId>,
pub daemon_id: Option<DaemonId>,


+ 1
- 0
libraries/message/src/coordinator_to_cli.rs View File

@@ -13,6 +13,7 @@ pub enum ControlRequestReply {
session_id: Uuid,
},
DataflowBuildFinished {
build_id: Uuid,
session_id: Uuid,
result: Result<(), String>,
},


+ 20
- 4
libraries/message/src/coordinator_to_daemon.rs View File

@@ -10,7 +10,7 @@ use crate::{
common::{DaemonId, GitSource},
descriptor::{Descriptor, ResolvedNode},
id::{NodeId, OperatorId},
DataflowId,
BuildId, DataflowId, SessionId,
};

pub use crate::common::Timestamped;
@@ -60,8 +60,16 @@ pub enum DaemonCoordinatorEvent {

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct BuildDataflowNodes {
pub build_id: Uuid,
pub session_id: Uuid,
pub working_dir: PathBuf,
/// Allows overwriting the base working dir when CLI and daemon are
/// running on the same machine.
///
/// Must not be used for multi-machine dataflows.
///
/// Note that nodes with git sources still use a subdirectory of
/// the base working dir.
pub local_working_dir: Option<PathBuf>,
pub nodes: BTreeMap<NodeId, ResolvedNode>,
pub git_sources: BTreeMap<NodeId, GitSource>,
pub prev_git_sources: BTreeMap<NodeId, GitSource>,
@@ -72,9 +80,17 @@ pub struct BuildDataflowNodes {

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct SpawnDataflowNodes {
pub session_id: Option<Uuid>,
pub build_id: Option<BuildId>,
pub session_id: SessionId,
pub dataflow_id: DataflowId,
pub working_dir: PathBuf,
/// Allows overwriting the base working dir when CLI and daemon are
/// running on the same machine.
///
/// Must not be used for multi-machine dataflows.
///
/// Note that nodes with git sources still use a subdirectory of
/// the base working dir.
pub local_working_dir: Option<PathBuf>,
pub nodes: BTreeMap<NodeId, ResolvedNode>,
pub dataflow_descriptor: Descriptor,
pub spawn_nodes: BTreeSet<NodeId>,


+ 9
- 1
libraries/message/src/daemon_to_coordinator.rs View File

@@ -3,7 +3,10 @@ use std::collections::BTreeMap;
pub use crate::common::{
DataMessage, LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus, Timestamped,
};
use crate::{common::DaemonId, current_crate_version, id::NodeId, versions_compatible, DataflowId};
use crate::{
common::DaemonId, current_crate_version, id::NodeId, versions_compatible, BuildId, DataflowId,
SessionId,
};

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum CoordinatorRequest {
@@ -46,6 +49,11 @@ impl DaemonRegisterRequest {

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum DaemonEvent {
BuildResult {
build_id: BuildId,
session_id: SessionId,
result: Result<(), String>,
},
SpawnResult {
dataflow_id: DataflowId,
result: Result<(), String>,


+ 1
- 0
libraries/message/src/lib.rs View File

@@ -26,6 +26,7 @@ pub use arrow_data;
pub use arrow_schema;

// NOTE: `DataflowId`, `SessionId`, and `BuildId` are plain aliases of the same
// `uuid::Uuid` type, so the compiler does not catch one of them being passed
// where another is expected.
pub type DataflowId = uuid::Uuid;
pub type SessionId = uuid::Uuid;
pub type BuildId = uuid::Uuid;

fn current_crate_version() -> semver::Version {


Loading…
Cancel
Save