
Make `dora run` error on distributed dataflows

tags/v0.3.12-rc0
Philipp Oppermann, 7 months ago
parent commit 829f483007
6 changed files with 50 additions and 27 deletions:

  1. binaries/cli/src/command/mod.rs             (+1 -1)
  2. binaries/coordinator/src/lib.rs             (+20 -11)
  3. binaries/coordinator/src/run/mod.rs         (+7 -4)
  4. binaries/daemon/src/lib.rs                  (+9 -0)
  5. libraries/core/src/descriptor/validate.rs   (+7 -5)
  6. libraries/message/src/descriptor.rs         (+6 -6)

binaries/cli/src/command/mod.rs  (+1 -1)

@@ -26,7 +26,7 @@ fn local_working_dir(
     if dataflow_descriptor
         .nodes
         .iter()
-        .all(|n| n.deploy.machine.is_none())
+        .all(|n| n.deploy.as_ref().map(|d| d.machine.as_ref()).is_none())
         && cli_and_daemon_on_same_machine(coordinator_session)?
     {
         Some(
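
For orientation, a minimal sketch of the Option semantics behind the new check; the `Deploy` type here is a simplified stand-in for the real descriptor type:

    // Demonstrates how the rewritten `all(...)` predicate behaves for the
    // possible node shapes.
    struct Deploy {
        machine: Option<String>,
    }

    fn main() {
        let no_deploy: Option<Deploy> = None;
        let deploy_without_machine = Some(Deploy { machine: None });

        // `map` keeps the nesting (Option<Option<&String>>), so `is_none()`
        // only holds when the node has no `deploy` section at all.
        assert!(no_deploy.as_ref().map(|d| d.machine.as_ref()).is_none());
        assert!(deploy_without_machine
            .as_ref()
            .map(|d| d.machine.as_ref())
            .is_some());

        // `and_then` flattens, so it would also treat a `deploy` section
        // that sets no machine as "local".
        assert!(deploy_without_machine
            .as_ref()
            .and_then(|d| d.machine.as_ref())
            .is_none());
    }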


binaries/coordinator/src/lib.rs  (+20 -11)

@@ -1197,7 +1197,7 @@ async fn retrieve_logs(
     let machine_ids: Vec<Option<String>> = nodes
         .values()
         .filter(|node| node.id == node_id)
-        .map(|node| node.deploy.machine.clone())
+        .map(|node| node.deploy.as_ref().and_then(|d| d.machine.clone()))
         .collect();
 
     let machine_id = if let [machine_id] = &machine_ids[..] {
@@ -1263,14 +1263,24 @@ async fn build_dataflow(

     let mut git_sources_by_daemon = git_sources
         .into_iter()
-        .into_grouping_map_by(|(id, _)| nodes.get(id).and_then(|n| n.deploy.machine.as_ref()))
+        .into_grouping_map_by(|(id, _)| {
+            nodes
+                .get(id)
+                .and_then(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()))
+        })
         .collect();
     let mut prev_git_sources_by_daemon = prev_git_sources
         .into_iter()
-        .into_grouping_map_by(|(id, _)| nodes.get(id).and_then(|n| n.deploy.machine.as_ref()))
+        .into_grouping_map_by(|(id, _)| {
+            nodes
+                .get(id)
+                .and_then(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()))
+        })
         .collect();
 
-    let nodes_by_daemon = nodes.values().into_group_map_by(|n| &n.deploy.machine);
+    let nodes_by_daemon = nodes
+        .values()
+        .into_group_map_by(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()));
 
     let mut daemons = BTreeSet::new();
     for (machine, nodes_on_machine) in &nodes_by_daemon {
@@ -1283,11 +1293,9 @@ async fn build_dataflow(
             build_id,
             session_id,
             local_working_dir: local_working_dir.clone(),
-            git_sources: git_sources_by_daemon
-                .remove(&machine.as_ref())
-                .unwrap_or_default(),
+            git_sources: git_sources_by_daemon.remove(machine).unwrap_or_default(),
             prev_git_sources: prev_git_sources_by_daemon
-                .remove(&machine.as_ref())
+                .remove(machine)
                 .unwrap_or_default(),
             dataflow_descriptor: dataflow.clone(),
             nodes_on_machine,
@@ -1298,9 +1306,10 @@ async fn build_dataflow(
             timestamp: clock.new_timestamp(),
         })?;
 
-        let daemon_id = build_dataflow_on_machine(daemon_connections, machine.as_deref(), &message)
-            .await
-            .wrap_err_with(|| format!("failed to build dataflow on machine `{machine:?}`"))?;
+        let daemon_id =
+            build_dataflow_on_machine(daemon_connections, machine.map(|s| s.as_str()), &message)
+                .await
+                .wrap_err_with(|| format!("failed to build dataflow on machine `{machine:?}`"))?;
         daemons.insert(daemon_id);
     }
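
A minimal sketch of the grouping these hunks introduce, assuming the itertools crate (which provides `into_group_map_by`) and simplified stand-ins for the node types:

    use itertools::Itertools;

    struct Deploy {
        machine: Option<String>,
    }

    struct Node {
        id: String,
        deploy: Option<Deploy>,
    }

    fn main() {
        let nodes = vec![
            Node {
                id: "camera".into(),
                deploy: Some(Deploy { machine: Some("robot".into()) }),
            },
            Node { id: "plot".into(), deploy: None },
        ];

        // Nodes without a `deploy` section (or without a machine) land in
        // the `None` group, which the default daemon serves.
        let by_daemon = nodes
            .iter()
            .into_group_map_by(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()));

        assert_eq!(by_daemon[&None].len(), 1);
        assert_eq!(by_daemon[&Some(&"robot".to_string())].len(), 1);
    }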



binaries/coordinator/src/run/mod.rs  (+7 -4)

@@ -33,7 +33,9 @@ pub(super) async fn spawn_dataflow(
     let nodes = dataflow.resolve_aliases_and_set_defaults()?;
     let uuid = Uuid::new_v7(Timestamp::now(NoContext));
 
-    let nodes_by_daemon = nodes.values().into_group_map_by(|n| &n.deploy.machine);
+    let nodes_by_daemon = nodes
+        .values()
+        .into_group_map_by(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()));
 
     let mut daemons = BTreeSet::new();
     for (machine, nodes_on_machine) in &nodes_by_daemon {
@@ -57,9 +59,10 @@ pub(super) async fn spawn_dataflow(
             timestamp: clock.new_timestamp(),
         })?;
 
-        let daemon_id = spawn_dataflow_on_machine(daemon_connections, machine.as_deref(), &message)
-            .await
-            .wrap_err_with(|| format!("failed to spawn dataflow on machine `{machine:?}`"))?;
+        let daemon_id =
+            spawn_dataflow_on_machine(daemon_connections, machine.map(|m| m.as_str()), &message)
+                .await
+                .wrap_err_with(|| format!("failed to spawn dataflow on machine `{machine:?}`"))?;
         daemons.insert(daemon_id);
     }
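
A minimal sketch of why this call site changed: the group-map key is now `Option<&String>` rather than `&Option<String>`, so `.as_deref()` no longer applies and `.map(|m| m.as_str())` performs the equivalent conversion to `Option<&str>`:

    fn main() {
        let owned: Option<String> = Some("robot".to_string());

        // Old key shape: &Option<String> -> Option<&str>
        let by_ref: &Option<String> = &owned;
        let a: Option<&str> = by_ref.as_deref();

        // New key shape: Option<&String> -> Option<&str>
        let borrowed: Option<&String> = owned.as_ref();
        let b: Option<&str> = borrowed.map(|m| m.as_str());

        assert_eq!(a, b);
    }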



binaries/daemon/src/lib.rs  (+9 -0)

@@ -174,6 +174,15 @@ impl Daemon {
             .to_owned();
 
         let descriptor = read_as_descriptor(dataflow_path).await?;
+
+        if let Some(node) = descriptor.nodes.iter().find(|n| n.deploy.is_some()) {
+            eyre::bail!(
+                "node {} has a `deploy` section, which is not supported in `dora run`\n\n\
+                Instead, you need to spawn a `dora coordinator` and one or more `dora daemon` \
+                instances and then use `dora start`.",
+                node.id
+            )
+        }
 
         descriptor.check(&working_dir)?;
         let nodes = descriptor.resolve_aliases_and_set_defaults()?;
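
A minimal sketch of the guard pattern added above, using eyre's `bail!` macro and a simplified stand-in node type:

    use eyre::bail;

    struct Node {
        id: String,
        deploy: Option<()>, // stand-in for Option<Deploy>
    }

    // Abort with a formatted error as soon as any node carries a `deploy`
    // section, mirroring the check that `dora run` now performs.
    fn reject_distributed(nodes: &[Node]) -> eyre::Result<()> {
        if let Some(node) = nodes.iter().find(|n| n.deploy.is_some()) {
            bail!(
                "node {} has a `deploy` section, which is not supported in `dora run`",
                node.id
            );
        }
        Ok(())
    }

    fn main() {
        let nodes = vec![Node { id: "camera".into(), deploy: Some(()) }];
        assert!(reject_distributed(&nodes).is_err());
    }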



libraries/core/src/descriptor/validate.rs  (+7 -5)

@@ -36,11 +36,13 @@ pub fn check_dataflow(
                 if source_is_url(source) {
                     info!("{source} is a URL."); // TODO: Implement url check.
                 } else if let Some(remote_daemon_id) = remote_daemon_id {
-                    if let Some(machine) = &node.deploy.machine {
-                        if remote_daemon_id.contains(&machine.as_str())
-                            || coordinator_is_remote
-                        {
-                            info!("skipping path check for remote node `{}`", node.id);
+                    if let Some(deploy) = &node.deploy {
+                        if let Some(machine) = &deploy.machine {
+                            if remote_daemon_id.contains(&machine.as_str())
+                                || coordinator_is_remote
+                            {
+                                info!("skipping path check for remote node `{}`", node.id);
+                            }
                         }
                     }
                 } else if custom.build.is_some() {
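
An equivalent, flatter formulation of the nested `if let` chain would use `and_then` to reach the optional machine in one step; a minimal sketch with stand-in types:

    struct Deploy {
        machine: Option<String>,
    }

    struct Node {
        deploy: Option<Deploy>,
    }

    // Collapses Option<Deploy> and Option<String> into a single lookup.
    fn machine_of(node: &Node) -> Option<&String> {
        node.deploy.as_ref().and_then(|d| d.machine.as_ref())
    }

    fn main() {
        let node = Node {
            deploy: Some(Deploy { machine: Some("robot".into()) }),
        };
        if let Some(machine) = machine_of(&node) {
            println!("skipping path check for machine `{machine}`");
        }
    }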


libraries/message/src/descriptor.rs  (+6 -6)

@@ -23,15 +23,15 @@ pub struct Descriptor {
     #[serde(default)]
     pub communication: CommunicationConfig,
     #[schemars(skip)]
-    #[serde(default, rename = "_unstable_deploy")]
-    pub deploy: Deploy,
+    #[serde(rename = "_unstable_deploy")]
+    pub deploy: Option<Deploy>,
     pub nodes: Vec<Node>,
     #[schemars(skip)]
     #[serde(default, rename = "_unstable_debug")]
     pub debug: Debug,
 }
 
-#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
+#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
 #[serde(deny_unknown_fields)]
 pub struct Deploy {
     pub machine: Option<String>,
@@ -58,8 +58,8 @@ pub struct Node {

     /// Unstable machine deployment configuration
     #[schemars(skip)]
-    #[serde(default, rename = "_unstable_deploy")]
-    pub deploy: Deploy,
+    #[serde(rename = "_unstable_deploy")]
+    pub deploy: Option<Deploy>,
 
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub operators: Option<RuntimeNode>,
@@ -99,7 +99,7 @@ pub struct ResolvedNode {
     pub env: Option<BTreeMap<String, EnvValue>>,
 
     #[serde(default)]
-    pub deploy: Deploy,
+    pub deploy: Option<Deploy>,
 
     #[serde(flatten)]
     pub kind: CoreNodeKind,
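
A minimal sketch of what the switch to `Option<Deploy>` means for parsing, using serde_json for brevity and simplified stand-in types. With serde, an `Option` field deserializes to `None` when its key is absent, which is what lets the daemon distinguish "no `_unstable_deploy` section" from a default-constructed `Deploy`:

    use serde::Deserialize;

    #[derive(Deserialize)]
    struct Deploy {
        machine: Option<String>,
    }

    #[derive(Deserialize)]
    struct Descriptor {
        #[serde(rename = "_unstable_deploy")]
        deploy: Option<Deploy>,
    }

    fn main() {
        // Absent key: the field is None rather than a default Deploy value,
        // which makes the new `dora run` check possible.
        let d: Descriptor = serde_json::from_str("{}").unwrap();
        assert!(d.deploy.is_none());

        let d: Descriptor =
            serde_json::from_str(r#"{ "_unstable_deploy": { "machine": "robot" } }"#).unwrap();
        assert_eq!(d.deploy.unwrap().machine.as_deref(), Some("robot"));
    }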

