Philipp Oppermann · 7 months ago
commit 0abd3a0280
17 changed files with 638 additions and 370 deletions
 1. binaries/cli/src/command/build/distributed.rs   +107  -0
 2. binaries/cli/src/command/build/git.rs            +0    -0
 3. binaries/cli/src/command/build/local.rs          +16   -0
 4. binaries/cli/src/command/build/mod.rs            +153  -0
 5. binaries/cli/src/command/check.rs                +0    -0
 6. binaries/cli/src/command/logs.rs                 +0    -0
 7. binaries/cli/src/command/mod.rs                  +55   -0
 8. binaries/cli/src/command/run.rs                  +22   -0
 9. binaries/cli/src/command/start/attach.rs         +1    -47
10. binaries/cli/src/command/start/mod.rs            +167  -0
11. binaries/cli/src/command/up.rs                   +1    -1
12. binaries/cli/src/lib.rs                          +30   -320
13. binaries/cli/src/output.rs                       +48   -0
14. binaries/coordinator/src/control.rs              +9    -1
15. binaries/coordinator/src/lib.rs                  +20   -0
16. libraries/message/src/cli_to_coordinator.rs      +1    -0
17. libraries/message/src/coordinator_to_cli.rs      +8    -1

binaries/cli/src/command/build/distributed.rs  (+107, -0)

@@ -0,0 +1,107 @@
use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection};
use dora_core::descriptor::Descriptor;
use dora_message::{
    cli_to_coordinator::ControlRequest,
    common::{GitSource, LogMessage},
    coordinator_to_cli::ControlRequestReply,
    id::NodeId,
    BuildId,
};
use eyre::{bail, Context};
use std::{
    collections::BTreeMap,
    net::{SocketAddr, TcpStream},
};

use crate::{output::print_log_message, session::DataflowSession};

pub fn build_distributed_dataflow(
    session: &mut TcpRequestReplyConnection,
    dataflow: Descriptor,
    git_sources: &BTreeMap<NodeId, GitSource>,
    dataflow_session: &DataflowSession,
    local_working_dir: Option<std::path::PathBuf>,
    uv: bool,
) -> eyre::Result<BuildId> {
    let build_id = {
        let reply_raw = session
            .request(
                &serde_json::to_vec(&ControlRequest::Build {
                    session_id: dataflow_session.session_id,
                    dataflow,
                    git_sources: git_sources.clone(),
                    prev_git_sources: dataflow_session.git_sources.clone(),
                    local_working_dir,
                    uv,
                })
                .unwrap(),
            )
            .wrap_err("failed to send build dataflow message")?;

        let result: ControlRequestReply =
            serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
        match result {
            ControlRequestReply::DataflowBuildTriggered { build_id } => {
                eprintln!("dataflow build triggered: {build_id}");
                build_id
            }
            ControlRequestReply::Error(err) => bail!("{err}"),
            other => bail!("unexpected build dataflow reply: {other:?}"),
        }
    };
    Ok(build_id)
}

pub fn wait_until_dataflow_built(
    build_id: BuildId,
    session: &mut TcpRequestReplyConnection,
    coordinator_socket: SocketAddr,
    log_level: log::LevelFilter,
) -> eyre::Result<BuildId> {
    // subscribe to log messages on a dedicated connection
    let mut log_session = TcpConnection {
        stream: TcpStream::connect(coordinator_socket)
            .wrap_err("failed to connect to dora coordinator")?,
    };
    log_session
        .send(
            &serde_json::to_vec(&ControlRequest::BuildLogSubscribe {
                build_id,
                level: log_level,
            })
            .wrap_err("failed to serialize message")?,
        )
        .wrap_err("failed to send build log subscribe request to coordinator")?;
    std::thread::spawn(move || {
        while let Ok(raw) = log_session.receive() {
            let parsed: eyre::Result<LogMessage> =
                serde_json::from_slice(&raw).context("failed to parse log message");
            match parsed {
                Ok(log_message) => {
                    print_log_message(log_message);
                }
                Err(err) => {
                    tracing::warn!("failed to parse log message: {err:?}")
                }
            }
        }
    });

    let reply_raw = session
        .request(&serde_json::to_vec(&ControlRequest::WaitForBuild { build_id }).unwrap())
        .wrap_err("failed to send WaitForBuild message")?;

    let result: ControlRequestReply =
        serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
    match result {
        ControlRequestReply::DataflowBuildFinished { build_id, result } => match result {
            Ok(()) => {
                eprintln!("dataflow build finished successfully");
                Ok(build_id)
            }
            Err(err) => bail!("{err}"),
        },
        ControlRequestReply::Error(err) => bail!("{err}"),
        other => bail!("unexpected WaitForBuild reply: {other:?}"),
    }
}
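The log-subscription half of `wait_until_dataflow_built` is a pattern worth calling out: a second, dedicated TCP connection streams `LogMessage`s in a background thread while the main request/reply connection blocks on `WaitForBuild`. A minimal, self-contained sketch of the same pattern, assuming serde/serde_json as dependencies, a stream of plain JSON values instead of whatever framing `TcpConnection::receive` actually implements, and a simplified `LogLine` mirror type:

// Hypothetical, simplified version of the background log printer above.
use std::io::BufReader;
use std::net::TcpStream;

#[derive(serde::Deserialize, Debug)]
struct LogLine {
    level: String,
    message: String,
}

fn subscribe_logs(addr: &str) -> std::io::Result<std::thread::JoinHandle<()>> {
    let stream = TcpStream::connect(addr)?;
    Ok(std::thread::spawn(move || {
        // Stream JSON log records until the coordinator closes the connection.
        let reader = BufReader::new(stream);
        for record in serde_json::Deserializer::from_reader(reader).into_iter::<LogLine>() {
            match record {
                Ok(log) => println!("{}: {}", log.level, log.message),
                Err(err) => eprintln!("failed to parse log message: {err}"),
            }
        }
    }))
}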

binaries/cli/src/git.rs → binaries/cli/src/command/build/git.rs  (moved, no changes)


binaries/cli/src/command/build/local.rs  (+16, -0)

@@ -0,0 +1,16 @@
use std::{collections::BTreeMap, path::PathBuf};

use dora_core::descriptor::Descriptor;
use dora_message::{common::GitSource, id::NodeId};

use crate::session::DataflowSession;

pub fn build_dataflow_locally(
    dataflow: Descriptor,
    git_sources: &BTreeMap<NodeId, GitSource>,
    dataflow_session: &DataflowSession,
    working_dir: PathBuf,
    uv: bool,
) -> eyre::Result<()> {
    todo!()
}

binaries/cli/src/command/build/mod.rs  (+153, -0)

@@ -0,0 +1,153 @@
use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::{
    descriptor::{CoreNodeKind, CustomNode, Descriptor, DescriptorExt},
    topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST},
};
use dora_message::descriptor::NodeSource;
use eyre::Context;
use std::collections::BTreeMap;

use crate::{connect_to_coordinator, resolve_dataflow, session::DataflowSession};

use distributed::{build_distributed_dataflow, wait_until_dataflow_built};
use local::build_dataflow_locally;

mod distributed;
mod git;
mod local;

pub fn build(
    dataflow: String,
    coordinator_addr: Option<std::net::IpAddr>,
    coordinator_port: Option<u16>,
    uv: bool,
    force_local: bool,
) -> eyre::Result<()> {
    let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
    let dataflow_descriptor =
        Descriptor::blocking_read(&dataflow_path).wrap_err("Failed to read yaml dataflow")?;
    let mut dataflow_session =
        DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?;

    let mut git_sources = BTreeMap::new();
    let resolved_nodes = dataflow_descriptor
        .resolve_aliases_and_set_defaults()
        .context("failed to resolve nodes")?;
    for (node_id, node) in resolved_nodes {
        if let CoreNodeKind::Custom(CustomNode {
            source: NodeSource::GitBranch { repo, rev },
            ..
        }) = node.kind
        {
            let source = git::fetch_commit_hash(repo, rev)
                .with_context(|| format!("failed to find commit hash for `{node_id}`"))?;
            git_sources.insert(node_id, source);
        }
    }

    let session = connect_to_coordinator_with_defaults(coordinator_addr, coordinator_port);

    let build_kind = if force_local {
        // user explicitly requested a local build
        BuildKind::Local
    } else if coordinator_addr.is_some() || coordinator_port.is_some() {
        // explicit coordinator address or port set -> there should be a coordinator running
        BuildKind::ThroughCoordinator {
            coordinator_session: session.context("failed to connect to coordinator")?,
        }
    } else {
        match session {
            Ok(coordinator_session) => {
                // we found a local coordinator instance at default port -> use it for building
                BuildKind::ThroughCoordinator {
                    coordinator_session,
                }
            }
            Err(_) => {
                // no coordinator instance found -> do a local build
                BuildKind::Local
            }
        }
    };

    match build_kind {
        BuildKind::Local => {
            println!("running local build");
            // use dataflow dir as base working dir
            let local_working_dir = dunce::canonicalize(&dataflow_path)
                .context("failed to canonicalize dataflow path")?
                .parent()
                .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
                .to_owned();
            build_dataflow_locally(
                dataflow_descriptor,
                &git_sources,
                &dataflow_session,
                local_working_dir,
                uv,
            )?;

            dataflow_session.git_sources = git_sources;
            dataflow_session
                .write_out_for_dataflow(&dataflow_path)
                .context("failed to write out dataflow session file")?;
        }
        BuildKind::ThroughCoordinator {
            mut coordinator_session,
        } => {
            let local_working_dir = super::local_working_dir(
                &dataflow_path,
                &dataflow_descriptor,
                &mut *coordinator_session,
            )?;
            let build_id = build_distributed_dataflow(
                &mut *coordinator_session,
                dataflow_descriptor,
                &git_sources,
                &dataflow_session,
                local_working_dir,
                uv,
            )?;

            dataflow_session.git_sources = git_sources;
            dataflow_session
                .write_out_for_dataflow(&dataflow_path)
                .context("failed to write out dataflow session file")?;

            // wait until dataflow build is finished
            wait_until_dataflow_built(
                build_id,
                &mut *coordinator_session,
                coordinator_socket(coordinator_addr, coordinator_port),
                log::LevelFilter::Info,
            )?;
        }
    }

    Ok(())
}

enum BuildKind {
    Local,
    ThroughCoordinator {
        coordinator_session: Box<TcpRequestReplyConnection>,
    },
}

fn connect_to_coordinator_with_defaults(
    coordinator_addr: Option<std::net::IpAddr>,
    coordinator_port: Option<u16>,
) -> std::io::Result<Box<TcpRequestReplyConnection>> {
    let coordinator_socket = coordinator_socket(coordinator_addr, coordinator_port);
    connect_to_coordinator(coordinator_socket)
}

fn coordinator_socket(
    coordinator_addr: Option<std::net::IpAddr>,
    coordinator_port: Option<u16>,
) -> std::net::SocketAddr {
    let coordinator_addr = coordinator_addr.unwrap_or(LOCALHOST);
    let coordinator_port = coordinator_port.unwrap_or(DORA_COORDINATOR_PORT_CONTROL_DEFAULT);
    (coordinator_addr, coordinator_port).into()
}
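The `BuildKind` selection above encodes a three-way fallback: `--local` forces a local build; an explicit coordinator address or port makes a missing coordinator a hard error; otherwise the CLI probes the default control socket and silently falls back to a local build. A condensed, self-contained restatement of just that decision (names hypothetical, `probe` standing in for `connect_to_coordinator_with_defaults`):

use std::io;

enum Kind {
    Local,
    ThroughCoordinator, // would carry the connection in the real code
}

fn choose(force_local: bool, explicit_coordinator: bool, probe: io::Result<()>) -> io::Result<Kind> {
    if force_local {
        Ok(Kind::Local) // user explicitly requested a local build
    } else if explicit_coordinator {
        probe.map(|()| Kind::ThroughCoordinator) // coordinator must be reachable
    } else {
        // probe the default socket; fall back to a local build on any error
        Ok(probe.map_or(Kind::Local, |()| Kind::ThroughCoordinator))
    }
}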

binaries/cli/src/check.rs → binaries/cli/src/command/check.rs  (moved, no changes)


binaries/cli/src/logs.rs → binaries/cli/src/command/logs.rs  (moved, no changes)


binaries/cli/src/command/mod.rs  (+55, -0)

@@ -0,0 +1,55 @@
use std::path::{Path, PathBuf};

use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::descriptor::Descriptor;
use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply};
use eyre::{bail, Context, ContextCompat};

pub mod build;
pub mod check;
pub mod logs;
pub mod run;
pub mod start;
pub mod up;

fn local_working_dir(
    dataflow_path: &Path,
    dataflow_descriptor: &Descriptor,
    coordinator_session: &mut TcpRequestReplyConnection,
) -> eyre::Result<Option<PathBuf>> {
    Ok(
        if dataflow_descriptor
            .nodes
            .iter()
            .all(|n| n.deploy.machine.is_none())
            && cli_and_daemon_on_same_machine(coordinator_session)?
        {
            Some(
                dunce::canonicalize(dataflow_path)
                    .context("failed to canonicalize dataflow file path")?
                    .parent()
                    .context("dataflow path has no parent dir")?
                    .to_owned(),
            )
        } else {
            None
        },
    )
}

fn cli_and_daemon_on_same_machine(session: &mut TcpRequestReplyConnection) -> eyre::Result<bool> {
    let reply_raw = session
        .request(&serde_json::to_vec(&ControlRequest::CliAndDefaultDaemonOnSameMachine).unwrap())
        .wrap_err("failed to send CliAndDefaultDaemonOnSameMachine message")?;

    let result: ControlRequestReply =
        serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
    match result {
        ControlRequestReply::CliAndDefaultDaemonIps {
            default_daemon,
            cli,
        } => Ok(default_daemon.is_some() && default_daemon == cli),
        ControlRequestReply::Error(err) => bail!("{err}"),
        other => bail!("unexpected CliAndDefaultDaemonOnSameMachine reply: {other:?}"),
    }
}
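The same-machine predicate is intentionally conservative: it reports `true` only when the coordinator returned both IPs and they match, so a missing default daemon (or unknown CLI address) always disables the local-working-dir shortcut. A test-style sketch of just that comparison:

#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr};

    // Mirrors the comparison in `cli_and_daemon_on_same_machine`.
    fn same_machine(default_daemon: Option<IpAddr>, cli: Option<IpAddr>) -> bool {
        default_daemon.is_some() && default_daemon == cli
    }

    #[test]
    fn same_machine_requires_both_ips() {
        let ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
        assert!(same_machine(Some(ip), Some(ip)));
        assert!(!same_machine(None, None)); // both unknown: not "same machine"
        assert!(!same_machine(Some(ip), None));
    }
}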

binaries/cli/src/command/run.rs  (+22, -0)

@@ -0,0 +1,22 @@
use dora_daemon::Daemon;
use eyre::Context;
use tokio::runtime::Builder;

use crate::{handle_dataflow_result, resolve_dataflow, session::DataflowSession};

pub fn run(dataflow: String, uv: bool) -> Result<(), eyre::Error> {
    let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
    let dataflow_session =
        DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?;
    let rt = Builder::new_multi_thread()
        .enable_all()
        .build()
        .context("tokio runtime failed")?;
    let result = rt.block_on(Daemon::run_dataflow(
        &dataflow_path,
        dataflow_session.build_id,
        dataflow_session.session_id,
        uv,
    ))?;
    handle_dataflow_result(result, None)
}
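`run` is a thin synchronous wrapper over an async entry point: it builds a multi-threaded tokio runtime and blocks on a single future. The same pattern in isolation, with a placeholder async function standing in for `Daemon::run_dataflow`:

use tokio::runtime::Builder;

async fn run_dataflow_stub() -> Result<&'static str, std::io::Error> {
    Ok("dataflow finished") // placeholder for Daemon::run_dataflow
}

fn main() -> Result<(), std::io::Error> {
    // enable_all() turns on both the I/O and time drivers.
    let rt = Builder::new_multi_thread().enable_all().build()?;
    let result = rt.block_on(run_dataflow_stub())?;
    println!("{result}");
    Ok(())
}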

binaries/cli/src/attach.rs → binaries/cli/src/command/start/attach.rs  (+1, -47)

@@ -1,4 +1,3 @@
-use colored::Colorize;
 use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection};
 use dora_core::descriptor::{resolve_path, CoreNodeKind, Descriptor, DescriptorExt};
 use dora_message::cli_to_coordinator::ControlRequest;
@@ -16,6 +15,7 @@ use tracing::{error, info};
 use uuid::Uuid;

 use crate::handle_dataflow_result;
+use crate::output::print_log_message;

 pub fn attach_dataflow(
     dataflow: Descriptor,
@@ -183,52 +183,6 @@ pub fn attach_dataflow(
     }
 }

-pub fn print_log_message(log_message: LogMessage) {
-    let LogMessage {
-        build_id,
-        dataflow_id,
-        node_id,
-        daemon_id,
-        level,
-        target,
-        module_path: _,
-        file: _,
-        line: _,
-        message,
-    } = log_message;
-    let level = match level {
-        log::Level::Error => "ERROR".red(),
-        log::Level::Warn => "WARN ".yellow(),
-        log::Level::Info => "INFO ".green(),
-        other => format!("{other:5}").normal(),
-    };
-    let dataflow = if let Some(dataflow_id) = dataflow_id {
-        format!(" dataflow `{dataflow_id}`").cyan()
-    } else {
-        String::new().cyan()
-    };
-    let build = if let Some(build_id) = build_id {
-        format!(" build `{build_id}`").cyan()
-    } else {
-        String::new().cyan()
-    };
-    let daemon = match daemon_id {
-        Some(id) => format!(" on daemon `{id}`"),
-        None => " on default daemon".to_string(),
-    }
-    .bright_black();
-    let node = match node_id {
-        Some(node_id) => format!(" {node_id}").bold(),
-        None => "".normal(),
-    };
-    let target = match target {
-        Some(target) => format!(" {target}").dimmed(),
-        None => "".normal(),
-    };
-
-    println!("{level}{build}{dataflow}{daemon}{node}{target}: {message}");
-}
-
 enum AttachEvent {
     Control(ControlRequest),
     Log(eyre::Result<LogMessage>),

binaries/cli/src/command/start/mod.rs  (+167, -0)

@@ -0,0 +1,167 @@
use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection};
use dora_core::descriptor::{Descriptor, DescriptorExt};
use dora_message::{
    cli_to_coordinator::ControlRequest, common::LogMessage, coordinator_to_cli::ControlRequestReply,
};
use eyre::{bail, Context};
use std::{
    net::{SocketAddr, TcpStream},
    path::PathBuf,
};
use uuid::Uuid;

use crate::{
    connect_to_coordinator, output::print_log_message, resolve_dataflow, session::DataflowSession,
};
use attach::attach_dataflow;

mod attach;

pub fn start(
    dataflow: String,
    name: Option<String>,
    coordinator_socket: SocketAddr,
    attach: bool,
    detach: bool,
    hot_reload: bool,
    uv: bool,
) -> eyre::Result<()> {
    let (dataflow, dataflow_descriptor, mut session, dataflow_id) =
        start_dataflow(dataflow, name, coordinator_socket, uv)?;

    let attach = match (attach, detach) {
        (true, true) => eyre::bail!("both `--attach` and `--detach` are given"),
        (true, false) => true,
        (false, true) => false,
        (false, false) => {
            println!("attaching to dataflow (use `--detach` to run in background)");
            true
        }
    };

    if attach {
        let log_level = env_logger::Builder::new()
            .filter_level(log::LevelFilter::Info)
            .parse_default_env()
            .build()
            .filter();

        attach_dataflow(
            dataflow_descriptor,
            dataflow,
            dataflow_id,
            &mut *session,
            hot_reload,
            coordinator_socket,
            log_level,
        )
    } else {
        // wait until dataflow is started
        wait_until_dataflow_started(
            dataflow_id,
            &mut session,
            coordinator_socket,
            log::LevelFilter::Info,
        )
    }
}

fn start_dataflow(
    dataflow: String,
    name: Option<String>,
    coordinator_socket: SocketAddr,
    uv: bool,
) -> Result<(PathBuf, Descriptor, Box<TcpRequestReplyConnection>, Uuid), eyre::Error> {
    let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
    let dataflow_descriptor =
        Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?;
    let dataflow_session =
        DataflowSession::read_session(&dataflow).context("failed to read DataflowSession")?;

    let mut session = connect_to_coordinator(coordinator_socket)
        .wrap_err("failed to connect to dora coordinator")?;

    let local_working_dir =
        super::local_working_dir(&dataflow, &dataflow_descriptor, &mut *session)?;

    let dataflow_id = {
        let dataflow = dataflow_descriptor.clone();
        let session: &mut TcpRequestReplyConnection = &mut *session;
        let reply_raw = session
            .request(
                &serde_json::to_vec(&ControlRequest::Start {
                    build_id: dataflow_session.build_id,
                    session_id: dataflow_session.session_id,
                    dataflow,
                    name,
                    local_working_dir,
                    uv,
                })
                .unwrap(),
            )
            .wrap_err("failed to send start dataflow message")?;

        let result: ControlRequestReply =
            serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
        match result {
            ControlRequestReply::DataflowStartTriggered { uuid } => {
                eprintln!("dataflow start triggered: {uuid}");
                uuid
            }
            ControlRequestReply::Error(err) => bail!("{err}"),
            other => bail!("unexpected start dataflow reply: {other:?}"),
        }
    };
    Ok((dataflow, dataflow_descriptor, session, dataflow_id))
}

fn wait_until_dataflow_started(
    dataflow_id: Uuid,
    session: &mut Box<TcpRequestReplyConnection>,
    coordinator_addr: SocketAddr,
    log_level: log::LevelFilter,
) -> eyre::Result<()> {
    // subscribe to log messages
    let mut log_session = TcpConnection {
        stream: TcpStream::connect(coordinator_addr)
            .wrap_err("failed to connect to dora coordinator")?,
    };
    log_session
        .send(
            &serde_json::to_vec(&ControlRequest::LogSubscribe {
                dataflow_id,
                level: log_level,
            })
            .wrap_err("failed to serialize message")?,
        )
        .wrap_err("failed to send log subscribe request to coordinator")?;
    std::thread::spawn(move || {
        while let Ok(raw) = log_session.receive() {
            let parsed: eyre::Result<LogMessage> =
                serde_json::from_slice(&raw).context("failed to parse log message");
            match parsed {
                Ok(log_message) => {
                    print_log_message(log_message);
                }
                Err(err) => {
                    tracing::warn!("failed to parse log message: {err:?}")
                }
            }
        }
    });

    let reply_raw = session
        .request(&serde_json::to_vec(&ControlRequest::WaitForSpawn { dataflow_id }).unwrap())
        .wrap_err("failed to send WaitForSpawn message")?;

    let result: ControlRequestReply =
        serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
    match result {
        ControlRequestReply::DataflowSpawned { uuid } => {
            eprintln!("dataflow started: {uuid}");
        }
        ControlRequestReply::Error(err) => bail!("{err}"),
        other => bail!("unexpected WaitForSpawn reply: {other:?}"),
    }
    Ok(())
}
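One detail that moved here from `lib.rs`: the log level for `--attach` is no longer computed eagerly for every subcommand. The builder dance only parses `RUST_LOG` (defaulting to `Info`) and extracts the resulting filter; no logger is actually installed. The same snippet in isolation:

use log::LevelFilter;

// Parse RUST_LOG the way env_logger would, without installing a logger;
// the filter is forwarded to the coordinator's log subscription instead.
fn log_level_from_env() -> LevelFilter {
    env_logger::Builder::new()
        .filter_level(LevelFilter::Info) // default when RUST_LOG is unset
        .parse_default_env()
        .build()
        .filter()
}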

binaries/cli/src/up.rs → binaries/cli/src/command/up.rs  (+1, -1)

@@ -1,4 +1,4 @@
-use crate::{check::daemon_running, connect_to_coordinator, LOCALHOST};
+use crate::{command::check::daemon_running, connect_to_coordinator, LOCALHOST};
 use dora_core::topics::DORA_COORDINATOR_PORT_CONTROL_DEFAULT;
 use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply};
 use eyre::{bail, Context, ContextCompat};

binaries/cli/src/lib.rs  (+30, -320)

@@ -1,11 +1,8 @@
-use attach::{attach_dataflow, print_log_message};
 use colored::Colorize;
-use communication_layer_request_reply::{
-    RequestReplyLayer, TcpConnection, TcpLayer, TcpRequestReplyConnection,
-};
+use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection};
 use dora_coordinator::Event;
 use dora_core::{
-    descriptor::{source_is_url, CoreNodeKind, CustomNode, Descriptor, DescriptorExt},
+    descriptor::{source_is_url, Descriptor, DescriptorExt},
     topics::{
         DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT,
         DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT,
@@ -15,10 +12,7 @@ use dora_daemon::Daemon;
 use dora_download::download_file;
 use dora_message::{
     cli_to_coordinator::ControlRequest,
-    common::LogMessage,
     coordinator_to_cli::{ControlRequestReply, DataflowList, DataflowResult, DataflowStatus},
-    descriptor::NodeSource,
-    BuildId,
 };
 #[cfg(feature = "tracing")]
 use dora_tracing::set_up_tracing;
@@ -26,12 +20,7 @@ use dora_tracing::{set_up_tracing_opts, FileLogging};
 use duration_str::parse;
 use eyre::{bail, Context};
 use formatting::FormatDataflowError;
-use std::{
-    collections::BTreeMap,
-    env::current_dir,
-    io::Write,
-    net::{SocketAddr, TcpStream},
-};
+use std::{env::current_dir, io::Write, net::SocketAddr};
 use std::{
     net::{IpAddr, Ipv4Addr},
     path::PathBuf,
@@ -42,16 +31,12 @@ use tokio::runtime::Builder;
 use tracing::level_filters::LevelFilter;
 use uuid::Uuid;

-mod attach;
-mod check;
+pub mod command;
 mod formatting;
-mod git;
 mod graph;
-mod logs;
-pub mod run;
+pub mod output;
 pub mod session;
 mod template;
-mod up;

 const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
 const LISTEN_WILDCARD: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
@@ -96,14 +81,17 @@ enum Command {
#[clap(value_name = "PATH")]
dataflow: String,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
#[clap(long, value_name = "IP")]
coordinator_addr: Option<IpAddr>,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
#[clap(long, value_name = "PORT")]
coordinator_port: Option<u16>,
// Use UV to build nodes.
#[clap(long, action)]
uv: bool,
// Run build on local machine
#[clap(long, action)]
local: bool,
},
/// Generate a new project or node. Choose the language between Rust, Python, C or C++.
New {
@@ -353,12 +341,6 @@ fn run_cli(args: Args) -> eyre::Result<()> {
         }
     };

-    let log_level = env_logger::Builder::new()
-        .filter_level(log::LevelFilter::Info)
-        .parse_default_env()
-        .build()
-        .filter();
-
     match args.command {
         Command::Check {
             dataflow,
@@ -373,9 +355,9 @@ fn run_cli(args: Args) -> eyre::Result<()> {
                 .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
                 .to_owned();
             Descriptor::blocking_read(&dataflow)?.check(&working_dir)?;
-            check::check_environment((coordinator_addr, coordinator_port).into())?
+            command::check::check_environment((coordinator_addr, coordinator_port).into())?
         }
-        None => check::check_environment((coordinator_addr, coordinator_port).into())?,
+        None => command::check::check_environment((coordinator_addr, coordinator_port).into())?,
     },
     Command::Graph {
         dataflow,
@@ -389,24 +371,15 @@ fn run_cli(args: Args) -> eyre::Result<()> {
             coordinator_addr,
             coordinator_port,
             uv,
-        } => {
-            let coordinator_socket = (coordinator_addr, coordinator_port).into();
-            let (mut session, build_id) = build_dataflow(dataflow, coordinator_socket, uv)?;
-            // wait until dataflow build is finished
-            wait_until_dataflow_built(
-                build_id,
-                &mut session,
-                coordinator_socket,
-                log::LevelFilter::Info,
-            )?;
-        }
+            local,
+        } => command::build::build(dataflow, coordinator_addr, coordinator_port, uv, local)?,
         Command::New {
             args,
             internal_create_with_path_dependencies,
         } => template::create(args, internal_create_with_path_dependencies)?,
-        Command::Run { dataflow, uv } => run::run(dataflow, uv)?,
+        Command::Run { dataflow, uv } => command::run::run(dataflow, uv)?,
         Command::Up { config } => {
-            up::up(config.as_deref())?;
+            command::up::up(config.as_deref())?;
         }
         Command::Logs {
             dataflow,
@@ -421,7 +394,7 @@ fn run_cli(args: Args) -> eyre::Result<()> {
             if let Some(dataflow) = dataflow {
                 let uuid = Uuid::parse_str(&dataflow).ok();
                 let name = if uuid.is_some() { None } else { Some(dataflow) };
-                logs::logs(&mut *session, uuid, name, node)?
+                command::logs::logs(&mut *session, uuid, name, node)?
             } else {
                 let active: Vec<dora_message::coordinator_to_cli::DataflowIdAndName> =
                     list.get_active();
@@ -430,7 +403,7 @@ fn run_cli(args: Args) -> eyre::Result<()> {
                     [uuid] => uuid.clone(),
                     _ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?,
                 };
-                logs::logs(&mut *session, Some(uuid.uuid), None, node)?
+                command::logs::logs(&mut *session, Some(uuid.uuid), None, node)?
             }
         }
         Command::Start {
@@ -444,38 +417,15 @@ fn run_cli(args: Args) -> eyre::Result<()> {
             uv,
         } => {
             let coordinator_socket = (coordinator_addr, coordinator_port).into();
-            let (dataflow, dataflow_descriptor, mut session, dataflow_id) =
-                start_dataflow(dataflow, name, coordinator_socket, uv)?;
-
-            let attach = match (attach, detach) {
-                (true, true) => eyre::bail!("both `--attach` and `--detach` are given"),
-                (true, false) => true,
-                (false, true) => false,
-                (false, false) => {
-                    println!("attaching to dataflow (use `--detach` to run in background)");
-                    true
-                }
-            };
-
-            if attach {
-                attach_dataflow(
-                    dataflow_descriptor,
-                    dataflow,
-                    dataflow_id,
-                    &mut *session,
-                    hot_reload,
-                    coordinator_socket,
-                    log_level,
-                )?
-            } else {
-                // wait until dataflow is started
-                wait_until_dataflow_started(
-                    dataflow_id,
-                    &mut session,
-                    coordinator_socket,
-                    log::LevelFilter::Info,
-                )?;
-            }
+            command::start::start(
+                dataflow,
+                name,
+                coordinator_socket,
+                attach,
+                detach,
+                hot_reload,
+                uv,
+            )?
         }
         Command::List {
             coordinator_addr,
@@ -505,7 +455,7 @@ fn run_cli(args: Args) -> eyre::Result<()> {
             config,
             coordinator_addr,
             coordinator_port,
-        } => up::destroy(
+        } => command::up::destroy(
             config.as_deref(),
             (coordinator_addr, coordinator_port).into(),
         )?,
@@ -629,246 +579,6 @@ fn run_cli(args: Args) -> eyre::Result<()> {
     Ok(())
 }

-fn build_dataflow(
-    dataflow: String,
-    coordinator_socket: SocketAddr,
-    uv: bool,
-) -> eyre::Result<(Box<TcpRequestReplyConnection>, BuildId)> {
-    let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
-    let dataflow_descriptor =
-        Descriptor::blocking_read(&dataflow_path).wrap_err("Failed to read yaml dataflow")?;
-    let mut dataflow_session =
-        DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?;
-
-    let mut git_sources = BTreeMap::new();
-    let resolved_nodes = dataflow_descriptor
-        .resolve_aliases_and_set_defaults()
-        .context("failed to resolve nodes")?;
-    for (node_id, node) in resolved_nodes {
-        if let CoreNodeKind::Custom(CustomNode {
-            source: NodeSource::GitBranch { repo, rev },
-            ..
-        }) = node.kind
-        {
-            let source = git::fetch_commit_hash(repo, rev)
-                .with_context(|| format!("failed to find commit hash for `{node_id}`"))?;
-            git_sources.insert(node_id, source);
-        }
-    }
-
-    let cli_and_daemon_on_same_machine = false; // TODO FIXME set to true if on same machine
-    let local_working_dir = if cli_and_daemon_on_same_machine {
-        // use dataflow dir as base working dir
-        Some(
-            dunce::canonicalize(&dataflow_path)
-                .context("failed to canonicalize dataflow path")?
-                .parent()
-                .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
-                .to_owned(),
-        )
-    } else {
-        None
-    };
-    let mut session = connect_to_coordinator(coordinator_socket)
-        .wrap_err("failed to connect to dora coordinator")?;
-    let build_id = {
-        let dataflow = dataflow_descriptor.clone();
-        let session: &mut TcpRequestReplyConnection = &mut *session;
-        let reply_raw = session
-            .request(
-                &serde_json::to_vec(&ControlRequest::Build {
-                    session_id: dataflow_session.session_id,
-                    dataflow,
-                    git_sources: git_sources.clone(),
-                    prev_git_sources: dataflow_session.git_sources.clone(),
-                    local_working_dir,
-                    uv,
-                })
-                .unwrap(),
-            )
-            .wrap_err("failed to send start dataflow message")?;
-
-        let result: ControlRequestReply =
-            serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
-        match result {
-            ControlRequestReply::DataflowBuildTriggered { build_id } => {
-                eprintln!("dataflow build triggered: {build_id}");
-                dataflow_session.git_sources = git_sources;
-                dataflow_session
-                    .write_out_for_dataflow(&dataflow_path)
-                    .context("failed to write out dataflow session file")?;
-                build_id
-            }
-            ControlRequestReply::Error(err) => bail!("{err}"),
-            other => bail!("unexpected start dataflow reply: {other:?}"),
-        }
-    };
-    Ok((session, build_id))
-}
-
-fn wait_until_dataflow_built(
-    build_id: BuildId,
-    session: &mut Box<TcpRequestReplyConnection>,
-    coordinator_addr: SocketAddr,
-    log_level: log::LevelFilter,
-) -> eyre::Result<BuildId> {
-    // subscribe to log messages
-    let mut log_session = TcpConnection {
-        stream: TcpStream::connect(coordinator_addr)
-            .wrap_err("failed to connect to dora coordinator")?,
-    };
-    log_session
-        .send(
-            &serde_json::to_vec(&ControlRequest::BuildLogSubscribe {
-                build_id,
-                level: log_level,
-            })
-            .wrap_err("failed to serialize message")?,
-        )
-        .wrap_err("failed to send build log subscribe request to coordinator")?;
-    std::thread::spawn(move || {
-        while let Ok(raw) = log_session.receive() {
-            let parsed: eyre::Result<LogMessage> =
-                serde_json::from_slice(&raw).context("failed to parse log message");
-            match parsed {
-                Ok(log_message) => {
-                    print_log_message(log_message);
-                }
-                Err(err) => {
-                    tracing::warn!("failed to parse log message: {err:?}")
-                }
-            }
-        }
-    });
-
-    let reply_raw = session
-        .request(&serde_json::to_vec(&ControlRequest::WaitForBuild { build_id }).unwrap())
-        .wrap_err("failed to send WaitForBuild message")?;
-
-    let result: ControlRequestReply =
-        serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
-    match result {
-        ControlRequestReply::DataflowBuildFinished { build_id, result } => match result {
-            Ok(()) => {
-                eprintln!("dataflow build finished successfully");
-                Ok(build_id)
-            }
-            Err(err) => bail!("{err}"),
-        },
-        ControlRequestReply::Error(err) => bail!("{err}"),
-        other => bail!("unexpected start dataflow reply: {other:?}"),
-    }
-}
-
-fn start_dataflow(
-    dataflow: String,
-    name: Option<String>,
-    coordinator_socket: SocketAddr,
-    uv: bool,
-) -> Result<(PathBuf, Descriptor, Box<TcpRequestReplyConnection>, Uuid), eyre::Error> {
-    let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
-    let dataflow_descriptor =
-        Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?;
-    let dataflow_session =
-        DataflowSession::read_session(&dataflow).context("failed to read DataflowSession")?;
-
-    let cli_and_daemon_on_same_machine = false; // TODO FIXME set to true if on same machine
-    let local_working_dir = if cli_and_daemon_on_same_machine {
-        // use dataflow dir as base working dir
-        Some(
-            dunce::canonicalize(&dataflow)
-                .context("failed to canonicalize dataflow path")?
-                .parent()
-                .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
-                .to_owned(),
-        )
-    } else {
-        None
-    };
-    let mut session = connect_to_coordinator(coordinator_socket)
-        .wrap_err("failed to connect to dora coordinator")?;
-    let dataflow_id = {
-        let dataflow = dataflow_descriptor.clone();
-        let session: &mut TcpRequestReplyConnection = &mut *session;
-        let reply_raw = session
-            .request(
-                &serde_json::to_vec(&ControlRequest::Start {
-                    build_id: dataflow_session.build_id,
-                    session_id: dataflow_session.session_id,
-                    dataflow,
-                    name,
-                    local_working_dir,
-                    uv,
-                })
-                .unwrap(),
-            )
-            .wrap_err("failed to send start dataflow message")?;
-
-        let result: ControlRequestReply =
-            serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
-        match result {
-            ControlRequestReply::DataflowStartTriggered { uuid } => {
-                eprintln!("dataflow start triggered: {uuid}");
-                uuid
-            }
-            ControlRequestReply::Error(err) => bail!("{err}"),
-            other => bail!("unexpected start dataflow reply: {other:?}"),
-        }
-    };
-    Ok((dataflow, dataflow_descriptor, session, dataflow_id))
-}
-
-fn wait_until_dataflow_started(
-    dataflow_id: Uuid,
-    session: &mut Box<TcpRequestReplyConnection>,
-    coordinator_addr: SocketAddr,
-    log_level: log::LevelFilter,
-) -> eyre::Result<()> {
-    // subscribe to log messages
-    let mut log_session = TcpConnection {
-        stream: TcpStream::connect(coordinator_addr)
-            .wrap_err("failed to connect to dora coordinator")?,
-    };
-    log_session
-        .send(
-            &serde_json::to_vec(&ControlRequest::LogSubscribe {
-                dataflow_id,
-                level: log_level,
-            })
-            .wrap_err("failed to serialize message")?,
-        )
-        .wrap_err("failed to send log subscribe request to coordinator")?;
-    std::thread::spawn(move || {
-        while let Ok(raw) = log_session.receive() {
-            let parsed: eyre::Result<LogMessage> =
-                serde_json::from_slice(&raw).context("failed to parse log message");
-            match parsed {
-                Ok(log_message) => {
-                    print_log_message(log_message);
-                }
-                Err(err) => {
-                    tracing::warn!("failed to parse log message: {err:?}")
-                }
-            }
-        }
-    });
-
-    let reply_raw = session
-        .request(&serde_json::to_vec(&ControlRequest::WaitForSpawn { dataflow_id }).unwrap())
-        .wrap_err("failed to send start dataflow message")?;
-
-    let result: ControlRequestReply =
-        serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
-    match result {
-        ControlRequestReply::DataflowSpawned { uuid } => {
-            eprintln!("dataflow started: {uuid}");
-        }
-        ControlRequestReply::Error(err) => bail!("{err}"),
-        other => bail!("unexpected start dataflow reply: {other:?}"),
-    }
-    Ok(())
-}

 fn stop_dataflow_interactive(
     grace_duration: Option<Duration>,
     session: &mut TcpRequestReplyConnection,


binaries/cli/src/output.rs  (+48, -0)

@@ -0,0 +1,48 @@
use colored::Colorize;
use dora_message::common::LogMessage;

pub fn print_log_message(log_message: LogMessage) {
    let LogMessage {
        build_id,
        dataflow_id,
        node_id,
        daemon_id,
        level,
        target,
        module_path: _,
        file: _,
        line: _,
        message,
    } = log_message;
    let level = match level {
        log::Level::Error => "ERROR".red(),
        log::Level::Warn => "WARN ".yellow(),
        log::Level::Info => "INFO ".green(),
        other => format!("{other:5}").normal(),
    };
    let dataflow = if let Some(dataflow_id) = dataflow_id {
        format!(" dataflow `{dataflow_id}`").cyan()
    } else {
        String::new().cyan()
    };
    let build = if let Some(build_id) = build_id {
        format!(" build `{build_id}`").cyan()
    } else {
        String::new().cyan()
    };
    let daemon = match daemon_id {
        Some(id) => format!(" on daemon `{id}`"),
        None => " on default daemon".to_string(),
    }
    .bright_black();
    let node = match node_id {
        Some(node_id) => format!(" {node_id}").bold(),
        None => "".normal(),
    };
    let target = match target {
        Some(target) => format!(" {target}").dimmed(),
        None => "".normal(),
    };

    println!("{level}{build}{dataflow}{daemon}{node}{target}: {message}");
}

binaries/coordinator/src/control.rs  (+9, -1)

@@ -81,6 +81,7 @@ async fn handle_requests(
     tx: mpsc::Sender<ControlEvent>,
     _finish_tx: mpsc::Sender<()>,
 ) {
+    let peer_addr = connection.peer_addr().ok();
     loop {
         let next_request = tcp_receive(&mut connection).map(Either::Left);
         let coordinator_stopped = tx.closed().map(Either::Right);
@@ -127,11 +128,18 @@ async fn handle_requests(
             break;
         }

-        let result = match request {
+        let mut result = match request {
             Ok(request) => handle_request(request, &tx).await,
             Err(err) => Err(err),
         };

+        if let Ok(ControlRequestReply::CliAndDefaultDaemonIps { cli, .. }) = &mut result {
+            if cli.is_none() {
+                // fill cli IP address in reply
+                *cli = peer_addr.map(|s| s.ip());
+            }
+        }
+
         let reply = result.unwrap_or_else(|err| ControlRequestReply::Error(format!("{err:?}")));
         let serialized: Vec<u8> =
             match serde_json::to_vec(&reply).wrap_err("failed to serialize ControlRequestReply") {


binaries/coordinator/src/lib.rs  (+20, -0)

@@ -144,6 +144,10 @@ impl DaemonConnections {
         }
     }

+    fn get(&self, id: &DaemonId) -> Option<&DaemonConnection> {
+        self.daemons.get(id)
+    }
+
     fn get_mut(&mut self, id: &DaemonId) -> Option<&mut DaemonConnection> {
         self.daemons.get_mut(id)
     }
@@ -710,6 +714,22 @@ async fn start_inner(
"BuildLogSubscribe request should be handled separately"
)));
}
ControlRequest::CliAndDefaultDaemonOnSameMachine => {
let mut default_daemon_ip = None;
if let Some(default_id) = daemon_connections.unnamed().next() {
if let Some(connection) = daemon_connections.get(default_id) {
if let Ok(addr) = connection.stream.peer_addr() {
default_daemon_ip = Some(addr.ip());
}
}
}
let _ = reply_sender.send(Ok(
ControlRequestReply::CliAndDefaultDaemonIps {
default_daemon: default_daemon_ip,
cli: None, // filled later
},
));
}
}
}
ControlEvent::Error(err) => tracing::error!("{err:?}"),
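A small design note on the nested `if let` chain above: each step is an `Option`/`Result` lookup, so the same logic flattens naturally with the `?` operator in a helper. A self-contained sketch with hypothetical stand-in types (the real `DaemonConnections` holds `TcpStream`s, not bare addresses):

use std::collections::BTreeMap;
use std::net::{IpAddr, SocketAddr};

struct Connection {
    peer: SocketAddr, // stand-in for TcpStream::peer_addr()
}

struct Daemons {
    unnamed: Vec<String>,
    connections: BTreeMap<String, Connection>,
}

// Equivalent to the three nested if-lets: stop at the first missing step.
fn default_daemon_ip(daemons: &Daemons) -> Option<IpAddr> {
    let id = daemons.unnamed.first()?;
    let connection = daemons.connections.get(id)?;
    Some(connection.peer.ip())
}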


libraries/message/src/cli_to_coordinator.rs  (+1, -0)

@@ -80,4 +80,5 @@ pub enum ControlRequest {
         build_id: BuildId,
         level: log::LevelFilter,
     },
+    CliAndDefaultDaemonOnSameMachine,
 }

libraries/message/src/coordinator_to_cli.rs  (+8, -1)

@@ -1,4 +1,7 @@
-use std::collections::{BTreeMap, BTreeSet};
+use std::{
+    collections::{BTreeMap, BTreeSet},
+    net::IpAddr,
+};

 use uuid::Uuid;

@@ -34,6 +37,10 @@ pub enum ControlRequestReply {
     DaemonConnected(bool),
     ConnectedDaemons(BTreeSet<DaemonId>),
     Logs(Vec<u8>),
+    CliAndDefaultDaemonIps {
+        default_daemon: Option<IpAddr>,
+        cli: Option<IpAddr>,
+    },
 }

 #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
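For reference, the wire shape of the new reply variant, assuming serde's default externally tagged enum representation (sketched with a mirror type, since the real enum has many more variants):

use std::net::{IpAddr, Ipv4Addr};

#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)]
enum Reply {
    CliAndDefaultDaemonIps {
        default_daemon: Option<IpAddr>,
        cli: Option<IpAddr>,
    },
}

fn main() {
    let reply = Reply::CliAndDefaultDaemonIps {
        default_daemon: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
        cli: None, // filled in by the control connection handler
    };
    let json = serde_json::to_string(&reply).unwrap();
    // {"CliAndDefaultDaemonIps":{"default_daemon":"127.0.0.1","cli":null}}
    assert_eq!(serde_json::from_str::<Reply>(&json).unwrap(), reply);
}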

