
Make `dora build` command behave like `dora start` without spawning

Instead of running all the build commands directly, run them on their intended target machines through the coordinator.

This commit is a breaking change because a coordinator connection is now required for `dora build`.
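
In effect, `dora build` now issues the same `ControlRequest::Start` round trip as `dora start`, with a new `build_only` flag that tells each daemon to run its nodes' build commands and skip the spawn; the command also gains the same `--coordinator-addr`/`--coordinator-port` flags as `dora start` (see binaries/cli/src/lib.rs below). A condensed sketch of that round trip, with field and variant names taken from the diff; the import paths for `Descriptor`, `ControlRequestReply`, and `connect_to_coordinator` are assumptions here, not part of this commit:

use std::{net::SocketAddr, path::PathBuf};

use dora_message::cli_to_coordinator::ControlRequest; // gains `build_only` below
use eyre::{bail, WrapErr};

// Condensed from the new `start_dataflow` helper in binaries/cli/src/lib.rs.
// `Descriptor`, `ControlRequestReply`, and `connect_to_coordinator` are the
// items the CLI already uses; their exact paths are assumed here.
fn build_dataflow(
    dataflow_descriptor: Descriptor, // parsed dataflow YAML
    working_dir: PathBuf,            // directory the build commands run in
    coordinator: SocketAddr,         // a reachable coordinator is now required
) -> eyre::Result<()> {
    let mut session = connect_to_coordinator(coordinator)
        .wrap_err("failed to connect to dora coordinator")?;
    // Same request as `dora start`, except `build_only: true`: the coordinator
    // forwards it to the daemons that own each node.
    let reply_raw = session
        .request(&serde_json::to_vec(&ControlRequest::Start {
            dataflow: dataflow_descriptor,
            name: None,
            local_working_dir: working_dir,
            uv: false,
            build_only: true,
        })?)
        .wrap_err("failed to send start dataflow message")?;
    match serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")? {
        // The coordinator replies once every daemon has reported its result;
        // for a build-only run it then drops the entry from `running_dataflows`.
        ControlRequestReply::DataflowStarted { uuid: _ } => {
            eprintln!("dataflow build successful");
            Ok(())
        }
        ControlRequestReply::Error(err) => bail!("{err}"),
        other => bail!("unexpected start dataflow reply: {other:?}"),
    }
}

The breaking change falls out of the first step: if no coordinator is reachable, `dora build` fails immediately, whereas the old implementation (the deleted binaries/cli/src/build.rs below) ran every build command locally in the CLI process.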
tags/v0.3.12-rc0
Philipp Oppermann committed 9 months ago · commit bcb861932c
9 changed files with 145 additions and 123 deletions

  1. +0  -67  binaries/cli/src/build.rs
  2. +85 -48  binaries/cli/src/lib.rs
  3. +26 -2   binaries/coordinator/src/lib.rs
  4. +14 -3   binaries/coordinator/src/run/mod.rs
  5. +5  -0   binaries/daemon/src/lib.rs
  6. +12 -3   binaries/daemon/src/spawn.rs
  7. +1  -0   examples/multiple-daemons/run.rs
  8. +1  -0   libraries/message/src/cli_to_coordinator.rs
  9. +1  -0   libraries/message/src/coordinator_to_daemon.rs

+0 -67  binaries/cli/src/build.rs

@@ -1,67 +0,0 @@
-use dora_core::{
-    build::run_build_command,
-    config::OperatorId,
-    descriptor::{Descriptor, DescriptorExt, NodeExt, SINGLE_OPERATOR_DEFAULT_ID},
-};
-use eyre::Context;
-
-use crate::resolve_dataflow;
-
-pub fn build(dataflow: String, uv: bool) -> eyre::Result<()> {
-    let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
-    let descriptor = Descriptor::blocking_read(&dataflow)?;
-    let dataflow_absolute = if dataflow.is_relative() {
-        std::env::current_dir().unwrap().join(dataflow)
-    } else {
-        dataflow.to_owned()
-    };
-    let working_dir = dataflow_absolute.parent().unwrap();
-
-    let default_op_id = OperatorId::from(SINGLE_OPERATOR_DEFAULT_ID.to_string());
-
-    for node in descriptor.nodes {
-        match node.kind()? {
-            dora_core::descriptor::NodeKind::Standard(_) => {
-                if let Some(build) = &node.build {
-                    run_build_command(build, working_dir, uv, &node.env).with_context(|| {
-                        format!("build command failed for standard node `{}`", node.id)
-                    })?
-                }
-            }
-            dora_core::descriptor::NodeKind::Runtime(runtime_node) => {
-                for operator in &runtime_node.operators {
-                    if let Some(build) = &operator.config.build {
-                        run_build_command(build, working_dir, uv, &node.env).with_context(
-                            || {
-                                format!(
-                                    "build command failed for operator `{}/{}`",
-                                    node.id, operator.id
-                                )
-                            },
-                        )?;
-                    }
-                }
-            }
-            dora_core::descriptor::NodeKind::Custom(custom_node) => {
-                if let Some(build) = &custom_node.build {
-                    run_build_command(build, working_dir, uv, &node.env).with_context(|| {
-                        format!("build command failed for custom node `{}`", node.id)
-                    })?
-                }
-            }
-            dora_core::descriptor::NodeKind::Operator(operator) => {
-                if let Some(build) = &operator.config.build {
-                    run_build_command(build, working_dir, uv, &node.env).with_context(|| {
-                        format!(
-                            "build command failed for operator `{}/{}`",
-                            node.id,
-                            operator.id.as_ref().unwrap_or(&default_op_id)
-                        )
-                    })?
-                }
-            }
-        }
-    }
-
-    Ok(())
-}

+85 -48  binaries/cli/src/lib.rs

@@ -33,7 +33,6 @@ use tracing::level_filters::LevelFilter;
 use uuid::Uuid;

 mod attach;
-pub(crate) mod build;
 mod check;
 mod formatting;
 mod graph;
@@ -83,6 +82,12 @@ enum Command {
         /// Path to the dataflow descriptor file
         #[clap(value_name = "PATH")]
         dataflow: String,
+        /// Address of the dora coordinator
+        #[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
+        coordinator_addr: IpAddr,
+        /// Port number of the coordinator control server
+        #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
+        coordinator_port: u16,
         // Use UV to build nodes.
         #[clap(long, action)]
         uv: bool,
@@ -366,8 +371,13 @@ fn run(args: Args) -> eyre::Result<()> {
         } => {
             graph::create(dataflow, mermaid, open)?;
         }
-        Command::Build { dataflow, uv } => {
-            build::build(dataflow, uv)?;
+        Command::Build {
+            dataflow,
+            coordinator_addr,
+            coordinator_port,
+            uv,
+        } => {
+            start_dataflow(dataflow, None, coordinator_addr, coordinator_port, uv, true)?;
         }
         Command::New {
             args,
@@ -419,26 +429,15 @@ fn run(args: Args) -> eyre::Result<()> {
             hot_reload,
             uv,
         } => {
-            let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
-            let dataflow_descriptor =
-                Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?;
-            let working_dir = dataflow
-                .canonicalize()
-                .context("failed to canonicalize dataflow path")?
-                .parent()
-                .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
-                .to_owned();
-
-            let coordinator_socket = (coordinator_addr, coordinator_port).into();
-            let mut session = connect_to_coordinator(coordinator_socket)
-                .wrap_err("failed to connect to dora coordinator")?;
-            let dataflow_id = start_dataflow(
-                dataflow_descriptor.clone(),
-                name,
-                working_dir,
-                &mut *session,
-                uv,
-            )?;
+            let (dataflow, dataflow_descriptor, coordinator_socket, mut session, dataflow_id) =
+                start_dataflow(
+                    dataflow,
+                    name,
+                    coordinator_addr,
+                    coordinator_port,
+                    uv,
+                    false,
+                )?;

             let attach = match (attach, detach) {
                 (true, true) => eyre::bail!("both `--attach` and `--detach` are given"),
@@ -613,34 +612,72 @@ fn run(args: Args) -> eyre::Result<()> {
 }

 fn start_dataflow(
-    dataflow: Descriptor,
+    dataflow: String,
     name: Option<String>,
-    local_working_dir: PathBuf,
-    session: &mut TcpRequestReplyConnection,
+    coordinator_addr: IpAddr,
+    coordinator_port: u16,
     uv: bool,
-) -> Result<Uuid, eyre::ErrReport> {
-    let reply_raw = session
-        .request(
-            &serde_json::to_vec(&ControlRequest::Start {
-                dataflow,
-                name,
-                local_working_dir,
-                uv,
-            })
-            .unwrap(),
-        )
-        .wrap_err("failed to send start dataflow message")?;
-
-    let result: ControlRequestReply =
-        serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
-    match result {
-        ControlRequestReply::DataflowStarted { uuid } => {
-            eprintln!("{uuid}");
-            Ok(uuid)
+    build_only: bool,
+) -> Result<
+    (
+        PathBuf,
+        Descriptor,
+        SocketAddr,
+        Box<TcpRequestReplyConnection>,
+        Uuid,
+    ),
+    eyre::Error,
+> {
+    let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
+    let dataflow_descriptor =
+        Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?;
+    let working_dir = dataflow
+        .canonicalize()
+        .context("failed to canonicalize dataflow path")?
+        .parent()
+        .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
+        .to_owned();
+    let coordinator_socket = (coordinator_addr, coordinator_port).into();
+    let mut session = connect_to_coordinator(coordinator_socket)
+        .wrap_err("failed to connect to dora coordinator")?;
+    let dataflow_id = {
+        let dataflow = dataflow_descriptor.clone();
+        let session: &mut TcpRequestReplyConnection = &mut *session;
+        let reply_raw = session
+            .request(
+                &serde_json::to_vec(&ControlRequest::Start {
+                    dataflow,
+                    name,
+                    local_working_dir: working_dir,
+                    uv,
+                    build_only,
+                })
+                .unwrap(),
+            )
+            .wrap_err("failed to send start dataflow message")?;
+
+        let result: ControlRequestReply =
+            serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
+        match result {
+            ControlRequestReply::DataflowStarted { uuid } => {
+                if build_only {
+                    eprintln!("dataflow build successful");
+                } else {
+                    eprintln!("{uuid}");
+                }
+                uuid
+            }
+            ControlRequestReply::Error(err) => bail!("{err}"),
+            other => bail!("unexpected start dataflow reply: {other:?}"),
         }
-        ControlRequestReply::Error(err) => bail!("{err}"),
-        other => bail!("unexpected start dataflow reply: {other:?}"),
-    }
+    };
+    Ok((
+        dataflow,
+        dataflow_descriptor,
+        coordinator_socket,
+        session,
+        dataflow_id,
+    ))
 }

 fn stop_dataflow_interactive(


+26 -2  binaries/coordinator/src/lib.rs

@@ -394,6 +394,7 @@ async fn start_inner(
            name,
            local_working_dir,
            uv,
+           build_only,
        } => {
            let name = name.or_else(|| names::Generator::default().next());

@@ -414,6 +415,7 @@ async fn start_inner(
                    &mut daemon_connections,
                    &clock,
                    uv,
+                   build_only,
                )
                .await?;
                Ok(dataflow)
@@ -716,13 +718,23 @@ async fn start_inner(
                    match result {
                        Ok(()) => {
                            if dataflow.pending_spawn_results.is_empty() {
-                               tracing::info!("successfully spawned dataflow `{dataflow_id}`");
+                               tracing::info!(
+                                   "successfully {} dataflow `{dataflow_id}`",
+                                   if dataflow.build_only {
+                                       "built"
+                                   } else {
+                                       "spawned"
+                                   }
+                               );
                                if let Some(reply_tx) = dataflow.spawn_result_tx.take() {
                                    let _ =
                                        reply_tx.send(Ok(ControlRequestReply::DataflowStarted {
                                            uuid: dataflow_id,
                                        }));
                                }
+                               if dataflow.build_only {
+                                   running_dataflows.remove(&dataflow_id);
+                               }
                            }
                        }
                        Err(err) => {
@@ -846,6 +858,8 @@ struct RunningDataflow {

    pending_spawn_results: BTreeSet<DaemonId>,
    spawn_result_tx: Option<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,
+
+   build_only: bool,
 }

 struct ArchivedDataflow {
@@ -1045,12 +1059,21 @@ async fn start_dataflow(
    daemon_connections: &mut DaemonConnections,
    clock: &HLC,
    uv: bool,
+   build_only: bool,
 ) -> eyre::Result<RunningDataflow> {
    let SpawnedDataflow {
        uuid,
        daemons,
        nodes,
-   } = spawn_dataflow(dataflow, working_dir, daemon_connections, clock, uv).await?;
+   } = spawn_dataflow(
+       dataflow,
+       working_dir,
+       daemon_connections,
+       clock,
+       uv,
+       build_only,
+   )
+   .await?;
    Ok(RunningDataflow {
        uuid,
        name,
@@ -1066,6 +1089,7 @@ async fn start_dataflow(
        log_subscribers: Vec::new(),
        pending_spawn_results: daemons,
        spawn_result_tx: None,
+       build_only,
    })
 }



+14 -3  binaries/coordinator/src/run/mod.rs

@@ -26,6 +26,7 @@ pub(super) async fn spawn_dataflow(
     daemon_connections: &mut DaemonConnections,
     clock: &HLC,
     uv: bool,
+    build_only: bool,
 ) -> eyre::Result<SpawnedDataflow> {
     let nodes = dataflow.resolve_aliases_and_set_defaults()?;
     let uuid = Uuid::new_v7(Timestamp::now(NoContext));
@@ -36,7 +37,8 @@ pub(super) async fn spawn_dataflow(
     for (machine, nodes_on_machine) in &nodes_by_daemon {
         let spawn_nodes = nodes_on_machine.iter().map(|n| n.id.clone()).collect();
         tracing::debug!(
-            "Spawning dataflow `{uuid}` on machine `{machine:?}` (nodes: {spawn_nodes:?})"
+            "{} dataflow `{uuid}` on machine `{machine:?}` (nodes: {spawn_nodes:?})",
+            if build_only { "Building" } else { "Spawning" }
         );

         let spawn_command = SpawnDataflowNodes {
@@ -46,6 +48,7 @@ pub(super) async fn spawn_dataflow(
             dataflow_descriptor: dataflow.clone(),
             spawn_nodes,
             uv,
+            build_only,
         };
         let message = serde_json::to_vec(&Timestamped {
             inner: DaemonCoordinatorEvent::Spawn(spawn_command),
@@ -54,11 +57,19 @@ pub(super) async fn spawn_dataflow(

         let daemon_id = spawn_dataflow_on_machine(daemon_connections, machine.as_deref(), &message)
             .await
-            .wrap_err_with(|| format!("failed to spawn dataflow on machine `{machine:?}`"))?;
+            .wrap_err_with(|| {
+                format!(
+                    "failed to {} dataflow on machine `{machine:?}`",
+                    if build_only { "build" } else { "spawn" }
+                )
+            })?;
         daemons.insert(daemon_id);
     }

-    tracing::info!("successfully triggered dataflow spawn `{uuid}`");
+    tracing::info!(
+        "successfully triggered dataflow {} `{uuid}`",
+        if build_only { "build" } else { "spawn" }
+    );

     Ok(SpawnedDataflow {
         uuid,


+5 -0  binaries/daemon/src/lib.rs

@@ -167,6 +167,7 @@ impl Daemon {
             nodes,
             dataflow_descriptor: descriptor,
             uv,
+            build_only: false,
         };

         let clock = Arc::new(HLC::default());
@@ -470,6 +471,7 @@ impl Daemon {
                 dataflow_descriptor,
                 spawn_nodes,
                 uv,
+                build_only,
             }) => {
                 match dataflow_descriptor.communication.remote {
                     dora_core::config::RemoteCommunicationConfig::Tcp => {}
@@ -490,6 +492,7 @@ impl Daemon {
                         dataflow_descriptor,
                         spawn_nodes,
                         uv,
+                        build_only,
                     )
                     .await;
                 let (trigger_result, result_task) = match result {
@@ -762,6 +765,7 @@ impl Daemon {
         dataflow_descriptor: Descriptor,
         spawn_nodes: BTreeSet<NodeId>,
         uv: bool,
+        build_only: bool,
     ) -> eyre::Result<impl Future<Output = eyre::Result<()>>> {
         let mut logger = self.logger.for_dataflow(dataflow_id);
         let dataflow =
@@ -821,6 +825,7 @@ impl Daemon {
             dataflow_descriptor,
             clock: self.clock.clone(),
             uv,
+            build_only,
         };

         let mut tasks = Vec::new();


+12 -3  binaries/daemon/src/spawn.rs

@@ -56,6 +56,7 @@ pub struct Spawner {
     /// clock is required for generating timestamps when dropping messages early because queue is full
     pub clock: Arc<HLC>,
     pub uv: bool,
+    pub build_only: bool,
 }

 impl Spawner {
@@ -148,8 +149,12 @@ impl Spawner {
                    self.build_node(logger, &node.env, self.working_dir.clone(), build)
                        .await?;
                }
-               spawn_command_from_path(&self.working_dir, self.uv, logger, &n, true)
-                   .await?
+               if self.build_only {
+                   None
+               } else {
+                   spawn_command_from_path(&self.working_dir, self.uv, logger, &n, true)
+                       .await?
+               }
            }
            dora_message::descriptor::NodeSource::GitBranch { repo, rev } => {
                self.spawn_git_node(&n, repo, rev, logger, &node.env, prepared_git.unwrap())
@@ -680,7 +685,11 @@ impl Spawner {
            self.build_node(logger, node_env, clone_dir.clone(), build)
                .await?;
        }
-       spawn_command_from_path(&clone_dir, self.uv, logger, node, true).await
+       if self.build_only {
+           Ok(None)
+       } else {
+           spawn_command_from_path(&clone_dir, self.uv, logger, node, true).await
+       }
    }

    async fn build_node(


+1 -0  examples/multiple-daemons/run.rs

@@ -143,6 +143,7 @@ async fn start_dataflow(
                 local_working_dir: working_dir,
                 name: None,
                 uv: false,
+                build_only: false,
             },
             reply_sender,
         }))


+1 -0  libraries/message/src/cli_to_coordinator.rs

@@ -16,6 +16,7 @@ pub enum ControlRequest {
         // binaries from CLI to coordinator/daemon
         local_working_dir: PathBuf,
         uv: bool,
+        build_only: bool,
     },
     Reload {
         dataflow_id: Uuid,


+1 -0  libraries/message/src/coordinator_to_daemon.rs

@@ -63,4 +63,5 @@ pub struct SpawnDataflowNodes {
     pub dataflow_descriptor: Descriptor,
     pub spawn_nodes: BTreeSet<NodeId>,
     pub uv: bool,
+    pub build_only: bool,
 }
