From 0abd3a0280e007b9974efeaa8c5806785974c25d Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 6 Jun 2025 14:49:59 +0200 Subject: [PATCH] Wip --- binaries/cli/src/command/build/distributed.rs | 107 ++++++ binaries/cli/src/{ => command/build}/git.rs | 0 binaries/cli/src/command/build/local.rs | 16 + binaries/cli/src/command/build/mod.rs | 153 ++++++++ binaries/cli/src/{ => command}/check.rs | 0 binaries/cli/src/{ => command}/logs.rs | 0 binaries/cli/src/command/mod.rs | 55 +++ binaries/cli/src/command/run.rs | 22 ++ .../cli/src/{ => command/start}/attach.rs | 48 +-- binaries/cli/src/command/start/mod.rs | 167 +++++++++ binaries/cli/src/{ => command}/up.rs | 2 +- binaries/cli/src/lib.rs | 350 ++---------------- binaries/cli/src/output.rs | 48 +++ binaries/coordinator/src/control.rs | 10 +- binaries/coordinator/src/lib.rs | 20 + libraries/message/src/cli_to_coordinator.rs | 1 + libraries/message/src/coordinator_to_cli.rs | 9 +- 17 files changed, 638 insertions(+), 370 deletions(-) create mode 100644 binaries/cli/src/command/build/distributed.rs rename binaries/cli/src/{ => command/build}/git.rs (100%) create mode 100644 binaries/cli/src/command/build/local.rs create mode 100644 binaries/cli/src/command/build/mod.rs rename binaries/cli/src/{ => command}/check.rs (100%) rename binaries/cli/src/{ => command}/logs.rs (100%) create mode 100644 binaries/cli/src/command/mod.rs create mode 100644 binaries/cli/src/command/run.rs rename binaries/cli/src/{ => command/start}/attach.rs (84%) create mode 100644 binaries/cli/src/command/start/mod.rs rename binaries/cli/src/{ => command}/up.rs (98%) create mode 100644 binaries/cli/src/output.rs diff --git a/binaries/cli/src/command/build/distributed.rs b/binaries/cli/src/command/build/distributed.rs new file mode 100644 index 00000000..9e7fca67 --- /dev/null +++ b/binaries/cli/src/command/build/distributed.rs @@ -0,0 +1,107 @@ +use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection}; +use dora_core::descriptor::Descriptor; +use dora_message::{ + cli_to_coordinator::ControlRequest, + common::{GitSource, LogMessage}, + coordinator_to_cli::ControlRequestReply, + id::NodeId, + BuildId, +}; +use eyre::{bail, Context}; +use std::{ + collections::BTreeMap, + net::{SocketAddr, TcpStream}, +}; + +use crate::{output::print_log_message, session::DataflowSession}; + +pub fn build_distributed_dataflow( + session: &mut TcpRequestReplyConnection, + dataflow: Descriptor, + git_sources: &BTreeMap, + dataflow_session: &DataflowSession, + local_working_dir: Option, + uv: bool, +) -> eyre::Result { + let build_id = { + let reply_raw = session + .request( + &serde_json::to_vec(&ControlRequest::Build { + session_id: dataflow_session.session_id, + dataflow, + git_sources: git_sources.clone(), + prev_git_sources: dataflow_session.git_sources.clone(), + local_working_dir, + uv, + }) + .unwrap(), + ) + .wrap_err("failed to send start dataflow message")?; + + let result: ControlRequestReply = + serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + match result { + ControlRequestReply::DataflowBuildTriggered { build_id } => { + eprintln!("dataflow build triggered: {build_id}"); + build_id + } + ControlRequestReply::Error(err) => bail!("{err}"), + other => bail!("unexpected start dataflow reply: {other:?}"), + } + }; + Ok(build_id) +} + +pub fn wait_until_dataflow_built( + build_id: BuildId, + session: &mut TcpRequestReplyConnection, + coordinator_socket: SocketAddr, + log_level: log::LevelFilter, +) -> eyre::Result { + // subscribe to log messages + let mut log_session = TcpConnection { + stream: TcpStream::connect(coordinator_socket) + .wrap_err("failed to connect to dora coordinator")?, + }; + log_session + .send( + &serde_json::to_vec(&ControlRequest::BuildLogSubscribe { + build_id, + level: log_level, + }) + .wrap_err("failed to serialize message")?, + ) + .wrap_err("failed to send build log subscribe request to coordinator")?; + std::thread::spawn(move || { + while let Ok(raw) = log_session.receive() { + let parsed: eyre::Result = + serde_json::from_slice(&raw).context("failed to parse log message"); + match parsed { + Ok(log_message) => { + print_log_message(log_message); + } + Err(err) => { + tracing::warn!("failed to parse log message: {err:?}") + } + } + } + }); + + let reply_raw = session + .request(&serde_json::to_vec(&ControlRequest::WaitForBuild { build_id }).unwrap()) + .wrap_err("failed to send WaitForBuild message")?; + + let result: ControlRequestReply = + serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + match result { + ControlRequestReply::DataflowBuildFinished { build_id, result } => match result { + Ok(()) => { + eprintln!("dataflow build finished successfully"); + Ok(build_id) + } + Err(err) => bail!("{err}"), + }, + ControlRequestReply::Error(err) => bail!("{err}"), + other => bail!("unexpected start dataflow reply: {other:?}"), + } +} diff --git a/binaries/cli/src/git.rs b/binaries/cli/src/command/build/git.rs similarity index 100% rename from binaries/cli/src/git.rs rename to binaries/cli/src/command/build/git.rs diff --git a/binaries/cli/src/command/build/local.rs b/binaries/cli/src/command/build/local.rs new file mode 100644 index 00000000..3e18fc50 --- /dev/null +++ b/binaries/cli/src/command/build/local.rs @@ -0,0 +1,16 @@ +use std::{collections::BTreeMap, path::PathBuf}; + +use dora_core::descriptor::Descriptor; +use dora_message::{common::GitSource, id::NodeId}; + +use crate::session::DataflowSession; + +pub fn build_dataflow_locally( + dataflow: Descriptor, + git_sources: &BTreeMap, + dataflow_session: &DataflowSession, + working_dir: PathBuf, + uv: bool, +) -> eyre::Result<()> { + todo!() +} diff --git a/binaries/cli/src/command/build/mod.rs b/binaries/cli/src/command/build/mod.rs new file mode 100644 index 00000000..abf2ac32 --- /dev/null +++ b/binaries/cli/src/command/build/mod.rs @@ -0,0 +1,153 @@ +use communication_layer_request_reply::TcpRequestReplyConnection; +use dora_core::{ + descriptor::{CoreNodeKind, CustomNode, Descriptor, DescriptorExt}, + topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST}, +}; +use dora_message::descriptor::NodeSource; +use eyre::Context; +use std::collections::BTreeMap; + +use crate::{connect_to_coordinator, resolve_dataflow, session::DataflowSession}; + +use distributed::{build_distributed_dataflow, wait_until_dataflow_built}; +use local::build_dataflow_locally; + +mod distributed; +mod git; +mod local; + +pub fn build( + dataflow: String, + coordinator_addr: Option, + coordinator_port: Option, + uv: bool, + force_local: bool, +) -> eyre::Result<()> { + let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?; + let dataflow_descriptor = + Descriptor::blocking_read(&dataflow_path).wrap_err("Failed to read yaml dataflow")?; + let mut dataflow_session = + DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?; + + let mut git_sources = BTreeMap::new(); + let resolved_nodes = dataflow_descriptor + .resolve_aliases_and_set_defaults() + .context("failed to resolve nodes")?; + for (node_id, node) in resolved_nodes { + if let CoreNodeKind::Custom(CustomNode { + source: NodeSource::GitBranch { repo, rev }, + .. + }) = node.kind + { + let source = git::fetch_commit_hash(repo, rev) + .with_context(|| format!("failed to find commit hash for `{node_id}`"))?; + git_sources.insert(node_id, source); + } + } + + let session = connect_to_coordinator_with_defaults(coordinator_addr, coordinator_port); + + let build_kind = if force_local { + // user explicitly requested a local build + BuildKind::Local + } else if coordinator_addr.is_some() || coordinator_port.is_some() { + // explicit coordinator address or port set -> there should be a coordinator running + BuildKind::ThroughCoordinator { + coordinator_session: session.context("failed to connect to coordinator")?, + } + } else { + match session { + Ok(coordinator_session) => { + // we found a local coordinator instance at default port -> use it for building + BuildKind::ThroughCoordinator { + coordinator_session, + } + } + Err(_) => { + // no coordinator instance found -> do a local build + BuildKind::Local + } + } + }; + + match build_kind { + BuildKind::Local => { + println!("running local build"); + // use dataflow dir as base working dir + let local_working_dir = dunce::canonicalize(&dataflow_path) + .context("failed to canonicalize dataflow path")? + .parent() + .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? + .to_owned(); + build_dataflow_locally( + dataflow_descriptor, + &git_sources, + &dataflow_session, + local_working_dir, + uv, + )?; + + dataflow_session.git_sources = git_sources; + dataflow_session + .write_out_for_dataflow(&dataflow_path) + .context("failed to write out dataflow session file")?; + } + BuildKind::ThroughCoordinator { + mut coordinator_session, + } => { + let local_working_dir = super::local_working_dir( + &dataflow_path, + &dataflow_descriptor, + &mut *coordinator_session, + )?; + let build_id = build_distributed_dataflow( + &mut *coordinator_session, + dataflow_descriptor, + &git_sources, + &dataflow_session, + local_working_dir, + uv, + )?; + + dataflow_session.git_sources = git_sources; + dataflow_session + .write_out_for_dataflow(&dataflow_path) + .context("failed to write out dataflow session file")?; + + // wait until dataflow build is finished + + wait_until_dataflow_built( + build_id, + &mut *coordinator_session, + coordinator_socket(coordinator_addr, coordinator_port), + log::LevelFilter::Info, + )?; + } + } + + Ok(()) +} + +enum BuildKind { + Local, + ThroughCoordinator { + coordinator_session: Box, + }, +} + +fn connect_to_coordinator_with_defaults( + coordinator_addr: Option, + coordinator_port: Option, +) -> std::io::Result> { + let coordinator_socket = coordinator_socket(coordinator_addr, coordinator_port); + connect_to_coordinator(coordinator_socket) +} + +fn coordinator_socket( + coordinator_addr: Option, + coordinator_port: Option, +) -> std::net::SocketAddr { + let coordinator_addr = coordinator_addr.unwrap_or(LOCALHOST); + let coordinator_port = coordinator_port.unwrap_or(DORA_COORDINATOR_PORT_CONTROL_DEFAULT); + (coordinator_addr, coordinator_port).into() +} diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/command/check.rs similarity index 100% rename from binaries/cli/src/check.rs rename to binaries/cli/src/command/check.rs diff --git a/binaries/cli/src/logs.rs b/binaries/cli/src/command/logs.rs similarity index 100% rename from binaries/cli/src/logs.rs rename to binaries/cli/src/command/logs.rs diff --git a/binaries/cli/src/command/mod.rs b/binaries/cli/src/command/mod.rs new file mode 100644 index 00000000..fb80c1f6 --- /dev/null +++ b/binaries/cli/src/command/mod.rs @@ -0,0 +1,55 @@ +use std::path::{Path, PathBuf}; + +use communication_layer_request_reply::TcpRequestReplyConnection; +use dora_core::descriptor::Descriptor; +use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply}; +use eyre::{bail, Context, ContextCompat}; + +pub mod build; +pub mod check; +pub mod logs; +pub mod run; +pub mod start; +pub mod up; + +fn local_working_dir( + dataflow_path: &Path, + dataflow_descriptor: &Descriptor, + coordinator_session: &mut TcpRequestReplyConnection, +) -> eyre::Result> { + Ok( + if dataflow_descriptor + .nodes + .iter() + .all(|n| n.deploy.machine.is_none()) + && cli_and_daemon_on_same_machine(coordinator_session)? + { + Some( + dunce::canonicalize(dataflow_path) + .context("failed to canonicalize dataflow file path")? + .parent() + .context("dataflow path has no parent dir")? + .to_owned(), + ) + } else { + None + }, + ) +} + +fn cli_and_daemon_on_same_machine(session: &mut TcpRequestReplyConnection) -> eyre::Result { + let reply_raw = session + .request(&serde_json::to_vec(&ControlRequest::CliAndDefaultDaemonOnSameMachine).unwrap()) + .wrap_err("failed to send start dataflow message")?; + + let result: ControlRequestReply = + serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + match result { + ControlRequestReply::CliAndDefaultDaemonIps { + default_daemon, + cli, + } => Ok(default_daemon.is_some() && default_daemon == cli), + ControlRequestReply::Error(err) => bail!("{err}"), + other => bail!("unexpected start dataflow reply: {other:?}"), + } +} diff --git a/binaries/cli/src/command/run.rs b/binaries/cli/src/command/run.rs new file mode 100644 index 00000000..616a63b1 --- /dev/null +++ b/binaries/cli/src/command/run.rs @@ -0,0 +1,22 @@ +use dora_daemon::Daemon; +use eyre::Context; +use tokio::runtime::Builder; + +use crate::{handle_dataflow_result, resolve_dataflow, session::DataflowSession}; + +pub fn run(dataflow: String, uv: bool) -> Result<(), eyre::Error> { + let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?; + let dataflow_session = + DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?; + let rt = Builder::new_multi_thread() + .enable_all() + .build() + .context("tokio runtime failed")?; + let result = rt.block_on(Daemon::run_dataflow( + &dataflow_path, + dataflow_session.build_id, + dataflow_session.session_id, + uv, + ))?; + handle_dataflow_result(result, None) +} diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/command/start/attach.rs similarity index 84% rename from binaries/cli/src/attach.rs rename to binaries/cli/src/command/start/attach.rs index 8e9c3851..05d776e0 100644 --- a/binaries/cli/src/attach.rs +++ b/binaries/cli/src/command/start/attach.rs @@ -1,4 +1,3 @@ -use colored::Colorize; use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection}; use dora_core::descriptor::{resolve_path, CoreNodeKind, Descriptor, DescriptorExt}; use dora_message::cli_to_coordinator::ControlRequest; @@ -16,6 +15,7 @@ use tracing::{error, info}; use uuid::Uuid; use crate::handle_dataflow_result; +use crate::output::print_log_message; pub fn attach_dataflow( dataflow: Descriptor, @@ -183,52 +183,6 @@ pub fn attach_dataflow( } } -pub fn print_log_message(log_message: LogMessage) { - let LogMessage { - build_id, - dataflow_id, - node_id, - daemon_id, - level, - target, - module_path: _, - file: _, - line: _, - message, - } = log_message; - let level = match level { - log::Level::Error => "ERROR".red(), - log::Level::Warn => "WARN ".yellow(), - log::Level::Info => "INFO ".green(), - other => format!("{other:5}").normal(), - }; - let dataflow = if let Some(dataflow_id) = dataflow_id { - format!(" dataflow `{dataflow_id}`").cyan() - } else { - String::new().cyan() - }; - let build = if let Some(build_id) = build_id { - format!(" build `{build_id}`").cyan() - } else { - String::new().cyan() - }; - let daemon = match daemon_id { - Some(id) => format!(" on daemon `{id}`"), - None => " on default daemon".to_string(), - } - .bright_black(); - let node = match node_id { - Some(node_id) => format!(" {node_id}").bold(), - None => "".normal(), - }; - let target = match target { - Some(target) => format!(" {target}").dimmed(), - None => "".normal(), - }; - - println!("{level}{build}{dataflow}{daemon}{node}{target}: {message}"); -} - enum AttachEvent { Control(ControlRequest), Log(eyre::Result), diff --git a/binaries/cli/src/command/start/mod.rs b/binaries/cli/src/command/start/mod.rs new file mode 100644 index 00000000..5275a62d --- /dev/null +++ b/binaries/cli/src/command/start/mod.rs @@ -0,0 +1,167 @@ +use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection}; +use dora_core::descriptor::{Descriptor, DescriptorExt}; +use dora_message::{ + cli_to_coordinator::ControlRequest, common::LogMessage, coordinator_to_cli::ControlRequestReply, +}; +use eyre::{bail, Context}; +use std::{ + net::{SocketAddr, TcpStream}, + path::PathBuf, +}; +use uuid::Uuid; + +use crate::{ + connect_to_coordinator, output::print_log_message, resolve_dataflow, session::DataflowSession, +}; +use attach::attach_dataflow; + +mod attach; + +pub fn start( + dataflow: String, + name: Option, + coordinator_socket: SocketAddr, + attach: bool, + detach: bool, + hot_reload: bool, + uv: bool, +) -> eyre::Result<()> { + let (dataflow, dataflow_descriptor, mut session, dataflow_id) = + start_dataflow(dataflow, name, coordinator_socket, uv)?; + + let attach = match (attach, detach) { + (true, true) => eyre::bail!("both `--attach` and `--detach` are given"), + (true, false) => true, + (false, true) => false, + (false, false) => { + println!("attaching to dataflow (use `--detach` to run in background)"); + true + } + }; + + if attach { + let log_level = env_logger::Builder::new() + .filter_level(log::LevelFilter::Info) + .parse_default_env() + .build() + .filter(); + + attach_dataflow( + dataflow_descriptor, + dataflow, + dataflow_id, + &mut *session, + hot_reload, + coordinator_socket, + log_level, + ) + } else { + // wait until dataflow is started + wait_until_dataflow_started( + dataflow_id, + &mut session, + coordinator_socket, + log::LevelFilter::Info, + ) + } +} + +fn start_dataflow( + dataflow: String, + name: Option, + coordinator_socket: SocketAddr, + uv: bool, +) -> Result<(PathBuf, Descriptor, Box, Uuid), eyre::Error> { + let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?; + let dataflow_descriptor = + Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?; + let dataflow_session = + DataflowSession::read_session(&dataflow).context("failed to read DataflowSession")?; + + let mut session = connect_to_coordinator(coordinator_socket) + .wrap_err("failed to connect to dora coordinator")?; + + let local_working_dir = + super::local_working_dir(&dataflow, &dataflow_descriptor, &mut *session)?; + + let dataflow_id = { + let dataflow = dataflow_descriptor.clone(); + let session: &mut TcpRequestReplyConnection = &mut *session; + let reply_raw = session + .request( + &serde_json::to_vec(&ControlRequest::Start { + build_id: dataflow_session.build_id, + session_id: dataflow_session.session_id, + dataflow, + name, + local_working_dir, + uv, + }) + .unwrap(), + ) + .wrap_err("failed to send start dataflow message")?; + + let result: ControlRequestReply = + serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + match result { + ControlRequestReply::DataflowStartTriggered { uuid } => { + eprintln!("dataflow start triggered: {uuid}"); + uuid + } + ControlRequestReply::Error(err) => bail!("{err}"), + other => bail!("unexpected start dataflow reply: {other:?}"), + } + }; + Ok((dataflow, dataflow_descriptor, session, dataflow_id)) +} + +fn wait_until_dataflow_started( + dataflow_id: Uuid, + session: &mut Box, + coordinator_addr: SocketAddr, + log_level: log::LevelFilter, +) -> eyre::Result<()> { + // subscribe to log messages + let mut log_session = TcpConnection { + stream: TcpStream::connect(coordinator_addr) + .wrap_err("failed to connect to dora coordinator")?, + }; + log_session + .send( + &serde_json::to_vec(&ControlRequest::LogSubscribe { + dataflow_id, + level: log_level, + }) + .wrap_err("failed to serialize message")?, + ) + .wrap_err("failed to send log subscribe request to coordinator")?; + std::thread::spawn(move || { + while let Ok(raw) = log_session.receive() { + let parsed: eyre::Result = + serde_json::from_slice(&raw).context("failed to parse log message"); + match parsed { + Ok(log_message) => { + print_log_message(log_message); + } + Err(err) => { + tracing::warn!("failed to parse log message: {err:?}") + } + } + } + }); + + let reply_raw = session + .request(&serde_json::to_vec(&ControlRequest::WaitForSpawn { dataflow_id }).unwrap()) + .wrap_err("failed to send start dataflow message")?; + + let result: ControlRequestReply = + serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + match result { + ControlRequestReply::DataflowSpawned { uuid } => { + eprintln!("dataflow started: {uuid}"); + } + ControlRequestReply::Error(err) => bail!("{err}"), + other => bail!("unexpected start dataflow reply: {other:?}"), + } + Ok(()) +} diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/command/up.rs similarity index 98% rename from binaries/cli/src/up.rs rename to binaries/cli/src/command/up.rs index 16f1a4c1..03eead4a 100644 --- a/binaries/cli/src/up.rs +++ b/binaries/cli/src/command/up.rs @@ -1,4 +1,4 @@ -use crate::{check::daemon_running, connect_to_coordinator, LOCALHOST}; +use crate::{command::check::daemon_running, connect_to_coordinator, LOCALHOST}; use dora_core::topics::DORA_COORDINATOR_PORT_CONTROL_DEFAULT; use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply}; use eyre::{bail, Context, ContextCompat}; diff --git a/binaries/cli/src/lib.rs b/binaries/cli/src/lib.rs index 28a032a3..278f9e90 100644 --- a/binaries/cli/src/lib.rs +++ b/binaries/cli/src/lib.rs @@ -1,11 +1,8 @@ -use attach::{attach_dataflow, print_log_message}; use colored::Colorize; -use communication_layer_request_reply::{ - RequestReplyLayer, TcpConnection, TcpLayer, TcpRequestReplyConnection, -}; +use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection}; use dora_coordinator::Event; use dora_core::{ - descriptor::{source_is_url, CoreNodeKind, CustomNode, Descriptor, DescriptorExt}, + descriptor::{source_is_url, Descriptor, DescriptorExt}, topics::{ DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, @@ -15,10 +12,7 @@ use dora_daemon::Daemon; use dora_download::download_file; use dora_message::{ cli_to_coordinator::ControlRequest, - common::LogMessage, coordinator_to_cli::{ControlRequestReply, DataflowList, DataflowResult, DataflowStatus}, - descriptor::NodeSource, - BuildId, }; #[cfg(feature = "tracing")] use dora_tracing::set_up_tracing; @@ -26,12 +20,7 @@ use dora_tracing::{set_up_tracing_opts, FileLogging}; use duration_str::parse; use eyre::{bail, Context}; use formatting::FormatDataflowError; -use std::{ - collections::BTreeMap, - env::current_dir, - io::Write, - net::{SocketAddr, TcpStream}, -}; +use std::{env::current_dir, io::Write, net::SocketAddr}; use std::{ net::{IpAddr, Ipv4Addr}, path::PathBuf, @@ -42,16 +31,12 @@ use tokio::runtime::Builder; use tracing::level_filters::LevelFilter; use uuid::Uuid; -mod attach; -mod check; +pub mod command; mod formatting; -mod git; mod graph; -mod logs; -pub mod run; +pub mod output; pub mod session; mod template; -mod up; const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); const LISTEN_WILDCARD: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); @@ -96,14 +81,17 @@ enum Command { #[clap(value_name = "PATH")] dataflow: String, /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] - coordinator_addr: IpAddr, + #[clap(long, value_name = "IP")] + coordinator_addr: Option, /// Port number of the coordinator control server - #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - coordinator_port: u16, + #[clap(long, value_name = "PORT")] + coordinator_port: Option, // Use UV to build nodes. #[clap(long, action)] uv: bool, + // Run build on local machine + #[clap(long, action)] + local: bool, }, /// Generate a new project or node. Choose the language between Rust, Python, C or C++. New { @@ -353,12 +341,6 @@ fn run_cli(args: Args) -> eyre::Result<()> { } }; - let log_level = env_logger::Builder::new() - .filter_level(log::LevelFilter::Info) - .parse_default_env() - .build() - .filter(); - match args.command { Command::Check { dataflow, @@ -373,9 +355,9 @@ fn run_cli(args: Args) -> eyre::Result<()> { .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? .to_owned(); Descriptor::blocking_read(&dataflow)?.check(&working_dir)?; - check::check_environment((coordinator_addr, coordinator_port).into())? + command::check::check_environment((coordinator_addr, coordinator_port).into())? } - None => check::check_environment((coordinator_addr, coordinator_port).into())?, + None => command::check::check_environment((coordinator_addr, coordinator_port).into())?, }, Command::Graph { dataflow, @@ -389,24 +371,15 @@ fn run_cli(args: Args) -> eyre::Result<()> { coordinator_addr, coordinator_port, uv, - } => { - let coordinator_socket = (coordinator_addr, coordinator_port).into(); - let (mut session, build_id) = build_dataflow(dataflow, coordinator_socket, uv)?; - // wait until dataflow build is finished - wait_until_dataflow_built( - build_id, - &mut session, - coordinator_socket, - log::LevelFilter::Info, - )?; - } + local, + } => command::build::build(dataflow, coordinator_addr, coordinator_port, uv, local)?, Command::New { args, internal_create_with_path_dependencies, } => template::create(args, internal_create_with_path_dependencies)?, - Command::Run { dataflow, uv } => run::run(dataflow, uv)?, + Command::Run { dataflow, uv } => command::run::run(dataflow, uv)?, Command::Up { config } => { - up::up(config.as_deref())?; + command::up::up(config.as_deref())?; } Command::Logs { dataflow, @@ -421,7 +394,7 @@ fn run_cli(args: Args) -> eyre::Result<()> { if let Some(dataflow) = dataflow { let uuid = Uuid::parse_str(&dataflow).ok(); let name = if uuid.is_some() { None } else { Some(dataflow) }; - logs::logs(&mut *session, uuid, name, node)? + command::logs::logs(&mut *session, uuid, name, node)? } else { let active: Vec = list.get_active(); @@ -430,7 +403,7 @@ fn run_cli(args: Args) -> eyre::Result<()> { [uuid] => uuid.clone(), _ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?, }; - logs::logs(&mut *session, Some(uuid.uuid), None, node)? + command::logs::logs(&mut *session, Some(uuid.uuid), None, node)? } } Command::Start { @@ -444,38 +417,15 @@ fn run_cli(args: Args) -> eyre::Result<()> { uv, } => { let coordinator_socket = (coordinator_addr, coordinator_port).into(); - let (dataflow, dataflow_descriptor, mut session, dataflow_id) = - start_dataflow(dataflow, name, coordinator_socket, uv)?; - - let attach = match (attach, detach) { - (true, true) => eyre::bail!("both `--attach` and `--detach` are given"), - (true, false) => true, - (false, true) => false, - (false, false) => { - println!("attaching to dataflow (use `--detach` to run in background)"); - true - } - }; - - if attach { - attach_dataflow( - dataflow_descriptor, - dataflow, - dataflow_id, - &mut *session, - hot_reload, - coordinator_socket, - log_level, - )? - } else { - // wait until dataflow is started - wait_until_dataflow_started( - dataflow_id, - &mut session, - coordinator_socket, - log::LevelFilter::Info, - )?; - } + command::start::start( + dataflow, + name, + coordinator_socket, + attach, + detach, + hot_reload, + uv, + )? } Command::List { coordinator_addr, @@ -505,7 +455,7 @@ fn run_cli(args: Args) -> eyre::Result<()> { config, coordinator_addr, coordinator_port, - } => up::destroy( + } => command::up::destroy( config.as_deref(), (coordinator_addr, coordinator_port).into(), )?, @@ -629,246 +579,6 @@ fn run_cli(args: Args) -> eyre::Result<()> { Ok(()) } -fn build_dataflow( - dataflow: String, - coordinator_socket: SocketAddr, - uv: bool, -) -> eyre::Result<(Box, BuildId)> { - let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?; - let dataflow_descriptor = - Descriptor::blocking_read(&dataflow_path).wrap_err("Failed to read yaml dataflow")?; - let mut dataflow_session = - DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?; - - let mut git_sources = BTreeMap::new(); - let resolved_nodes = dataflow_descriptor - .resolve_aliases_and_set_defaults() - .context("failed to resolve nodes")?; - for (node_id, node) in resolved_nodes { - if let CoreNodeKind::Custom(CustomNode { - source: NodeSource::GitBranch { repo, rev }, - .. - }) = node.kind - { - let source = git::fetch_commit_hash(repo, rev) - .with_context(|| format!("failed to find commit hash for `{node_id}`"))?; - git_sources.insert(node_id, source); - } - } - - let cli_and_daemon_on_same_machine = false; // TODO FIXME set to true if on same machine - let local_working_dir = if cli_and_daemon_on_same_machine { - // use dataflow dir as base working dir - Some( - dunce::canonicalize(&dataflow_path) - .context("failed to canonicalize dataflow path")? - .parent() - .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? - .to_owned(), - ) - } else { - None - }; - let mut session = connect_to_coordinator(coordinator_socket) - .wrap_err("failed to connect to dora coordinator")?; - let build_id = { - let dataflow = dataflow_descriptor.clone(); - let session: &mut TcpRequestReplyConnection = &mut *session; - let reply_raw = session - .request( - &serde_json::to_vec(&ControlRequest::Build { - session_id: dataflow_session.session_id, - dataflow, - git_sources: git_sources.clone(), - prev_git_sources: dataflow_session.git_sources.clone(), - local_working_dir, - uv, - }) - .unwrap(), - ) - .wrap_err("failed to send start dataflow message")?; - - let result: ControlRequestReply = - serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; - match result { - ControlRequestReply::DataflowBuildTriggered { build_id } => { - eprintln!("dataflow build triggered: {build_id}"); - dataflow_session.git_sources = git_sources; - dataflow_session - .write_out_for_dataflow(&dataflow_path) - .context("failed to write out dataflow session file")?; - build_id - } - ControlRequestReply::Error(err) => bail!("{err}"), - other => bail!("unexpected start dataflow reply: {other:?}"), - } - }; - Ok((session, build_id)) -} - -fn wait_until_dataflow_built( - build_id: BuildId, - session: &mut Box, - coordinator_addr: SocketAddr, - log_level: log::LevelFilter, -) -> eyre::Result { - // subscribe to log messages - let mut log_session = TcpConnection { - stream: TcpStream::connect(coordinator_addr) - .wrap_err("failed to connect to dora coordinator")?, - }; - log_session - .send( - &serde_json::to_vec(&ControlRequest::BuildLogSubscribe { - build_id, - level: log_level, - }) - .wrap_err("failed to serialize message")?, - ) - .wrap_err("failed to send build log subscribe request to coordinator")?; - std::thread::spawn(move || { - while let Ok(raw) = log_session.receive() { - let parsed: eyre::Result = - serde_json::from_slice(&raw).context("failed to parse log message"); - match parsed { - Ok(log_message) => { - print_log_message(log_message); - } - Err(err) => { - tracing::warn!("failed to parse log message: {err:?}") - } - } - } - }); - - let reply_raw = session - .request(&serde_json::to_vec(&ControlRequest::WaitForBuild { build_id }).unwrap()) - .wrap_err("failed to send WaitForBuild message")?; - - let result: ControlRequestReply = - serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; - match result { - ControlRequestReply::DataflowBuildFinished { build_id, result } => match result { - Ok(()) => { - eprintln!("dataflow build finished successfully"); - Ok(build_id) - } - Err(err) => bail!("{err}"), - }, - ControlRequestReply::Error(err) => bail!("{err}"), - other => bail!("unexpected start dataflow reply: {other:?}"), - } -} - -fn start_dataflow( - dataflow: String, - name: Option, - coordinator_socket: SocketAddr, - uv: bool, -) -> Result<(PathBuf, Descriptor, Box, Uuid), eyre::Error> { - let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?; - let dataflow_descriptor = - Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?; - let dataflow_session = - DataflowSession::read_session(&dataflow).context("failed to read DataflowSession")?; - - let cli_and_daemon_on_same_machine = false; // TODO FIXME set to true if on same machine - let local_working_dir = if cli_and_daemon_on_same_machine { - // use dataflow dir as base working dir - Some( - dunce::canonicalize(&dataflow) - .context("failed to canonicalize dataflow path")? - .parent() - .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? - .to_owned(), - ) - } else { - None - }; - let mut session = connect_to_coordinator(coordinator_socket) - .wrap_err("failed to connect to dora coordinator")?; - let dataflow_id = { - let dataflow = dataflow_descriptor.clone(); - let session: &mut TcpRequestReplyConnection = &mut *session; - let reply_raw = session - .request( - &serde_json::to_vec(&ControlRequest::Start { - build_id: dataflow_session.build_id, - session_id: dataflow_session.session_id, - dataflow, - name, - local_working_dir, - uv, - }) - .unwrap(), - ) - .wrap_err("failed to send start dataflow message")?; - - let result: ControlRequestReply = - serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; - match result { - ControlRequestReply::DataflowStartTriggered { uuid } => { - eprintln!("dataflow start triggered: {uuid}"); - uuid - } - ControlRequestReply::Error(err) => bail!("{err}"), - other => bail!("unexpected start dataflow reply: {other:?}"), - } - }; - Ok((dataflow, dataflow_descriptor, session, dataflow_id)) -} - -fn wait_until_dataflow_started( - dataflow_id: Uuid, - session: &mut Box, - coordinator_addr: SocketAddr, - log_level: log::LevelFilter, -) -> eyre::Result<()> { - // subscribe to log messages - let mut log_session = TcpConnection { - stream: TcpStream::connect(coordinator_addr) - .wrap_err("failed to connect to dora coordinator")?, - }; - log_session - .send( - &serde_json::to_vec(&ControlRequest::LogSubscribe { - dataflow_id, - level: log_level, - }) - .wrap_err("failed to serialize message")?, - ) - .wrap_err("failed to send log subscribe request to coordinator")?; - std::thread::spawn(move || { - while let Ok(raw) = log_session.receive() { - let parsed: eyre::Result = - serde_json::from_slice(&raw).context("failed to parse log message"); - match parsed { - Ok(log_message) => { - print_log_message(log_message); - } - Err(err) => { - tracing::warn!("failed to parse log message: {err:?}") - } - } - } - }); - - let reply_raw = session - .request(&serde_json::to_vec(&ControlRequest::WaitForSpawn { dataflow_id }).unwrap()) - .wrap_err("failed to send start dataflow message")?; - - let result: ControlRequestReply = - serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; - match result { - ControlRequestReply::DataflowSpawned { uuid } => { - eprintln!("dataflow started: {uuid}"); - } - ControlRequestReply::Error(err) => bail!("{err}"), - other => bail!("unexpected start dataflow reply: {other:?}"), - } - Ok(()) -} - fn stop_dataflow_interactive( grace_duration: Option, session: &mut TcpRequestReplyConnection, diff --git a/binaries/cli/src/output.rs b/binaries/cli/src/output.rs new file mode 100644 index 00000000..ad35ad67 --- /dev/null +++ b/binaries/cli/src/output.rs @@ -0,0 +1,48 @@ +use colored::Colorize; +use dora_message::common::LogMessage; + +pub fn print_log_message(log_message: LogMessage) { + let LogMessage { + build_id, + dataflow_id, + node_id, + daemon_id, + level, + target, + module_path: _, + file: _, + line: _, + message, + } = log_message; + let level = match level { + log::Level::Error => "ERROR".red(), + log::Level::Warn => "WARN ".yellow(), + log::Level::Info => "INFO ".green(), + other => format!("{other:5}").normal(), + }; + let dataflow = if let Some(dataflow_id) = dataflow_id { + format!(" dataflow `{dataflow_id}`").cyan() + } else { + String::new().cyan() + }; + let build = if let Some(build_id) = build_id { + format!(" build `{build_id}`").cyan() + } else { + String::new().cyan() + }; + let daemon = match daemon_id { + Some(id) => format!(" on daemon `{id}`"), + None => " on default daemon".to_string(), + } + .bright_black(); + let node = match node_id { + Some(node_id) => format!(" {node_id}").bold(), + None => "".normal(), + }; + let target = match target { + Some(target) => format!(" {target}").dimmed(), + None => "".normal(), + }; + + println!("{level}{build}{dataflow}{daemon}{node}{target}: {message}"); +} diff --git a/binaries/coordinator/src/control.rs b/binaries/coordinator/src/control.rs index ace212e2..446233a8 100644 --- a/binaries/coordinator/src/control.rs +++ b/binaries/coordinator/src/control.rs @@ -81,6 +81,7 @@ async fn handle_requests( tx: mpsc::Sender, _finish_tx: mpsc::Sender<()>, ) { + let peer_addr = connection.peer_addr().ok(); loop { let next_request = tcp_receive(&mut connection).map(Either::Left); let coordinator_stopped = tx.closed().map(Either::Right); @@ -127,11 +128,18 @@ async fn handle_requests( break; } - let result = match request { + let mut result = match request { Ok(request) => handle_request(request, &tx).await, Err(err) => Err(err), }; + if let Ok(ControlRequestReply::CliAndDefaultDaemonIps { cli, .. }) = &mut result { + if cli.is_none() { + // fill cli IP address in reply + *cli = peer_addr.map(|s| s.ip()); + } + } + let reply = result.unwrap_or_else(|err| ControlRequestReply::Error(format!("{err:?}"))); let serialized: Vec = match serde_json::to_vec(&reply).wrap_err("failed to serialize ControlRequestReply") { diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index bfaa9799..e31b97a6 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -144,6 +144,10 @@ impl DaemonConnections { } } + fn get(&self, id: &DaemonId) -> Option<&DaemonConnection> { + self.daemons.get(id) + } + fn get_mut(&mut self, id: &DaemonId) -> Option<&mut DaemonConnection> { self.daemons.get_mut(id) } @@ -710,6 +714,22 @@ async fn start_inner( "BuildLogSubscribe request should be handled separately" ))); } + ControlRequest::CliAndDefaultDaemonOnSameMachine => { + let mut default_daemon_ip = None; + if let Some(default_id) = daemon_connections.unnamed().next() { + if let Some(connection) = daemon_connections.get(default_id) { + if let Ok(addr) = connection.stream.peer_addr() { + default_daemon_ip = Some(addr.ip()); + } + } + } + let _ = reply_sender.send(Ok( + ControlRequestReply::CliAndDefaultDaemonIps { + default_daemon: default_daemon_ip, + cli: None, // filled later + }, + )); + } } } ControlEvent::Error(err) => tracing::error!("{err:?}"), diff --git a/libraries/message/src/cli_to_coordinator.rs b/libraries/message/src/cli_to_coordinator.rs index 6bbdcde3..bf3d3a03 100644 --- a/libraries/message/src/cli_to_coordinator.rs +++ b/libraries/message/src/cli_to_coordinator.rs @@ -80,4 +80,5 @@ pub enum ControlRequest { build_id: BuildId, level: log::LevelFilter, }, + CliAndDefaultDaemonOnSameMachine, } diff --git a/libraries/message/src/coordinator_to_cli.rs b/libraries/message/src/coordinator_to_cli.rs index 79dc85ae..02243468 100644 --- a/libraries/message/src/coordinator_to_cli.rs +++ b/libraries/message/src/coordinator_to_cli.rs @@ -1,4 +1,7 @@ -use std::collections::{BTreeMap, BTreeSet}; +use std::{ + collections::{BTreeMap, BTreeSet}, + net::IpAddr, +}; use uuid::Uuid; @@ -34,6 +37,10 @@ pub enum ControlRequestReply { DaemonConnected(bool), ConnectedDaemons(BTreeSet), Logs(Vec), + CliAndDefaultDaemonIps { + default_daemon: Option, + cli: Option, + }, } #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]