Browse Source

adding daemon, coordinator and runtime command in the cli

tags/v0.3.2-rc0
haixuanTao 2 years ago
parent
commit
6dfd4b17b3
5 changed files with 144 additions and 10 deletions
  1. +6
    -1
      Cargo.lock
  2. +6
    -0
      binaries/cli/Cargo.toml
  3. +129
    -5
      binaries/cli/src/main.rs
  4. +3
    -3
      binaries/coordinator/src/lib.rs
  5. +0
    -1
      binaries/daemon/Cargo.toml

+ 6
- 1
Cargo.lock View File

@@ -1417,17 +1417,23 @@ dependencies = [
"clap 4.4.6",
"communication-layer-request-reply",
"ctrlc",
"dora-coordinator",
"dora-core",
"dora-daemon",
"dora-node-api-c",
"dora-operator-api-c",
"dora-runtime",
"dora-tracing",
"eyre",
"futures",
"inquire",
"notify",
"serde",
"serde_json",
"serde_yaml 0.9.25",
"termcolor",
"tokio",
"tokio-stream",
"tracing",
"uuid",
"webbrowser",
@@ -1488,7 +1494,6 @@ dependencies = [
"ctrlc",
"dora-core",
"dora-download",
"dora-runtime",
"dora-tracing",
"eyre",
"flume",


+ 6
- 0
binaries/cli/Cargo.toml View File

@@ -35,3 +35,9 @@ ctrlc = "3.2.5"
tracing = "0.1.36"
dora-tracing = { workspace = true, optional = true }
bat = "0.23.0"
dora-daemon = { workspace = true }
dora-coordinator = { workspace = true }
dora-runtime = { workspace = true }
tokio = { version = "1.20.1", features = ["full"] }
tokio-stream = { version = "0.1.8", features = ["io-util", "net"] }
futures = "0.3.21"

+ 129
- 5
binaries/cli/src/main.rs View File

@@ -1,15 +1,25 @@
use std::path::PathBuf;
use std::{net::Ipv4Addr, path::PathBuf};

use attach::attach_dataflow;
use clap::Parser;
use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection};
use dora_core::{
daemon_messages::Timestamped,
descriptor::Descriptor,
topics::{control_socket_addr, ControlRequest, ControlRequestReply, DataflowId},
message::uhlc::HLC,
topics::{
control_socket_addr, ControlRequest, ControlRequestReply, DataflowId,
DORA_COORDINATOR_PORT_DEFAULT,
},
};
use dora_daemon::Daemon;
#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
use eyre::{bail, Context};
use futures::{Stream, StreamExt};
use std::net::SocketAddr;
use tokio::{runtime::Builder, sync::mpsc};
use tokio_stream::wrappers::ReceiverStream;
use uuid::Uuid;

mod attach;
@@ -92,6 +102,24 @@ enum Command {
// Stats,
// Get,
// Upgrade,
/// Run daemon
Daemon {
#[clap(long)]
machine_id: Option<String>,
#[clap(long)]
coordinator_addr: Option<SocketAddr>,

#[clap(long)]
run_dataflow: Option<PathBuf>,
},
/// Run runtime
Runtime,
/// Run coordinator
Coordinator { port: Option<u16> },
}

enum Event {
CtrlC,
}

#[derive(Debug, clap::Args)]
@@ -127,10 +155,26 @@ fn main() {
}

fn run() -> eyre::Result<()> {
#[cfg(feature = "tracing")]
set_up_tracing("dora-cli").context("failed to set up tracing subscriber")?;
let args = Args::parse();

#[cfg(feature = "tracing")]
match args.command {
Command::Daemon { .. } => {
set_up_tracing("dora-daemon").context("failed to set up tracing subscriber")?;
}
Command::Runtime => {
set_up_tracing("dora-runtime").context("failed to set up tracing subscriber")?;
}
Command::Coordinator { .. } => {
set_up_tracing("dora-coordinator").context("failed to set up tracing subscriber")?;
}
_ => {
set_up_tracing("dora-cli").context("failed to set up tracing subscriber")?;
}
};

let ctrlc_events = set_up_ctrlc_handler()?;

match args.command {
Command::Check { dataflow } => match dataflow {
Some(dataflow) => {
@@ -227,7 +271,65 @@ fn run() -> eyre::Result<()> {
}
}
Command::Destroy { config } => up::destroy(config.as_deref())?,
}
Command::Coordinator { port } => {
let rt = Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
rt.block_on(async {
let (_, task) = dora_coordinator::start(
port,
ctrlc_events.map(|event| match event {
Event::CtrlC => dora_coordinator::Event::CtrlC,
}),
)
.await?;
task.await
})
.context("failed to run dora-coordinator")?
}
Command::Daemon {
coordinator_addr,
machine_id,
run_dataflow,
} => {
let rt = Builder::new_multi_thread()
.enable_io()
.build()
.context("tokio runtime failed")?;
rt.block_on(async {
match run_dataflow {
Some(dataflow_path) => {
tracing::info!("Starting dataflow `{}`", dataflow_path.display());

Daemon::run_dataflow(&dataflow_path).await
}
None => {
Daemon::run(
coordinator_addr.unwrap_or_else(|| {
tracing::info!("Starting in local mode");
let localhost = Ipv4Addr::new(127, 0, 0, 1);
(localhost, DORA_COORDINATOR_PORT_DEFAULT).into()
}),
machine_id.unwrap_or_default(),
ctrlc_events.map(|event| {
let clock = HLC::default();
match event {
Event::CtrlC => Timestamped {
inner: dora_daemon::Event::CtrlC,
timestamp: clock.new_timestamp(),
},
}
}),
)
.await
}
}
})
.context("failed to run dora-daemon")?
}
Command::Runtime => dora_runtime::main().context("Failed to run dora-runtime")?,
};

Ok(())
}
@@ -349,3 +451,25 @@ fn query_running_dataflows(
fn connect_to_coordinator() -> std::io::Result<Box<TcpRequestReplyConnection>> {
TcpLayer::new().connect(control_socket_addr())
}

fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> {
let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);

let mut ctrlc_sent = false;
ctrlc::set_handler(move || {
if ctrlc_sent {
tracing::warn!("received second ctrlc signal -> aborting immediately");
std::process::abort();
} else {
tracing::info!("received ctrlc signal");
if ctrlc_tx.blocking_send(Event::CtrlC).is_err() {
tracing::error!("failed to report ctrl-c event to dora-coordinator");
}

ctrlc_sent = true;
}
})
.wrap_err("failed to set ctrl-c handler")?;

Ok(ReceiverStream::new(ctrlc_rx))
}

+ 3
- 3
binaries/coordinator/src/lib.rs View File

@@ -48,7 +48,7 @@ pub struct Args {
pub async fn run(args: Args) -> eyre::Result<()> {
let ctrlc_events = set_up_ctrlc_handler()?;

let (_, task) = start(args, ctrlc_events).await?;
let (_, task) = start(None, ctrlc_events).await?;

task.await?;

@@ -56,10 +56,10 @@ pub async fn run(args: Args) -> eyre::Result<()> {
}

pub async fn start(
args: Args,
port: Option<u16>,
external_events: impl Stream<Item = Event> + Unpin,
) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> {
let port = args.port.unwrap_or(DORA_COORDINATOR_PORT_DEFAULT);
let port = port.unwrap_or(DORA_COORDINATOR_PORT_DEFAULT);
let listener = listener::create_listener(port).await?;
let port = listener
.local_addr()


+ 0
- 1
binaries/daemon/Cargo.toml View File

@@ -25,7 +25,6 @@ futures-concurrency = "7.1.0"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.86"
dora-core = { workspace = true }
dora-runtime = { workspace = true }
flume = "0.10.14"
dora-download = { workspace = true }
dora-tracing = { workspace = true, optional = true }


Loading…
Cancel
Save