From cc1dc971edf078b1c4fa2e68d1160ed7f74eb650 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 9 Dec 2022 17:24:59 +0100 Subject: [PATCH] Update dora-coordinator to start dataflows through dora-daemon --- binaries/coordinator/src/lib.rs | 8 ++- binaries/coordinator/src/run/custom.rs | 96 -------------------------- binaries/coordinator/src/run/mod.rs | 89 ++++++++++++++---------- binaries/daemon/src/main.rs | 29 +------- binaries/daemon/src/spawn.rs | 6 +- libraries/core/src/daemon_messages.rs | 28 +++++++- libraries/core/src/descriptor/mod.rs | 2 +- 7 files changed, 91 insertions(+), 167 deletions(-) delete mode 100644 binaries/coordinator/src/run/custom.rs diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 1549f991..0143b5f8 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -48,10 +48,12 @@ pub async fn run(args: Args) -> eyre::Result<()> { .with_file_name("dora-runtime") }); + let daemon_connections = &mut HashMap::new(); // TODO + match run_dataflow { Some(path) => { // start the given dataflow directly - run::run_dataflow(&path, &runtime_path) + run::run_dataflow(&path, &runtime_path, daemon_connections) .await .wrap_err_with(|| format!("failed to run dataflow at {}", path.display()))?; } @@ -168,6 +170,7 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { name, runtime_path, &dataflow_events_tx, + &mut daemon_connections, ) .await?; Ok(dataflow) @@ -331,6 +334,7 @@ async fn start_dataflow( name: Option, runtime_path: &Path, dataflow_events_tx: &Option>, + daemon_connections: &mut HashMap, ) -> eyre::Result { // TODO: send Spawn message to daemon @@ -343,7 +347,7 @@ async fn start_dataflow( uuid, communication_config, tasks, - } = spawn_dataflow(&runtime_path, path).await?; + } = spawn_dataflow(&runtime_path, path, daemon_connections).await?; let path = path.to_owned(); let task = async move { let result = await_tasks(tasks) diff --git a/binaries/coordinator/src/run/custom.rs b/binaries/coordinator/src/run/custom.rs deleted file mode 100644 index cc29ab01..00000000 --- a/binaries/coordinator/src/run/custom.rs +++ /dev/null @@ -1,96 +0,0 @@ -use super::command_init_common_env; -use dora_core::{ - config::NodeId, - descriptor::{self, resolve_path, source_is_url, EnvValue}, -}; -use dora_download::download_file; -use eyre::{bail, eyre, WrapErr}; -use std::{collections::BTreeMap, env::consts::EXE_EXTENSION, path::Path}; - -const SHELL_SOURCE: &str = "shell"; - -#[tracing::instrument] -pub(super) async fn spawn_custom_node( - node_id: NodeId, - node: &descriptor::CustomNode, - envs: &Option>, - communication: &dora_core::config::CommunicationConfig, - working_dir: &Path, -) -> eyre::Result>> { - let resolved_path = if source_is_url(&node.source) { - // try to download the shared library - let target_path = Path::new("build") - .join(node_id.to_string()) - .with_extension(EXE_EXTENSION); - download_file(&node.source, &target_path) - .await - .wrap_err("failed to download custom node")?; - Ok(target_path.clone()) - } else { - resolve_path(&node.source, working_dir) - }; - - let mut command = if let Ok(path) = &resolved_path { - let mut command = tokio::process::Command::new(path); - if let Some(args) = &node.args { - command.args(args.split_ascii_whitespace()); - } - command - } else if node.source == SHELL_SOURCE { - if cfg!(target_os = "windows") { - let mut cmd = tokio::process::Command::new("cmd"); - cmd.args(["/C", &node.args.clone().unwrap_or_default()]); - cmd - } else { - let mut cmd = tokio::process::Command::new("sh"); - cmd.args(["-c", &node.args.clone().unwrap_or_default()]); - cmd - } - } else { - bail!("could not understand node source: {}", node.source); - }; - - command_init_common_env(&mut command, &node_id, communication)?; - command.env( - "DORA_NODE_RUN_CONFIG", - serde_yaml::to_string(&node.run_config) - .wrap_err("failed to serialize custom node run config")?, - ); - command.current_dir(working_dir); - - // Injecting the env variable defined in the `yaml` into - // the node runtime. - if let Some(envs) = envs { - for (key, value) in envs { - command.env(key, value.to_string()); - } - } - - let mut child = command.spawn().wrap_err_with(|| { - if let Ok(path) = resolved_path { - format!( - "failed to run source path: `{}` with args `{}`", - path.display(), - node.args.as_deref().unwrap_or_default() - ) - } else { - format!( - "failed to run command: `{}` with args `{}`", - node.source, - node.args.as_deref().unwrap_or_default() - ) - } - })?; - let result = tokio::spawn(async move { - let status = child.wait().await.context("child process failed")?; - if status.success() { - tracing::info!("node {node_id} finished"); - Ok(()) - } else if let Some(code) = status.code() { - Err(eyre!("node {node_id} failed with exit code: {code}")) - } else { - Err(eyre!("node {node_id} failed (unknown exit code)")) - } - }); - Ok(result) -} diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 17b3c902..e7ef3022 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -1,23 +1,40 @@ -use self::{custom::spawn_custom_node, runtime::spawn_runtime_node}; +use crate::tcp_utils::tcp_send; + +use self::runtime::spawn_runtime_node; use dora_core::{ config::{format_duration, CommunicationConfig, NodeId}, + daemon_messages::{DaemonCoordinatorEvent, SpawnDataflowNodes, SpawnNodeParams}, descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor}, }; -use eyre::{bail, eyre, WrapErr}; +use eyre::{bail, eyre, ContextCompat, WrapErr}; use futures::{stream::FuturesUnordered, StreamExt}; -use std::{env::consts::EXE_EXTENSION, path::Path}; +use std::{ + collections::{BTreeMap, HashMap}, + env::consts::EXE_EXTENSION, + path::Path, +}; +use tokio::net::TcpStream; use tokio_stream::wrappers::IntervalStream; use uuid::Uuid; -mod custom; mod runtime; -pub async fn run_dataflow(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> { - let tasks = spawn_dataflow(runtime, dataflow_path).await?.tasks; +pub async fn run_dataflow( + dataflow_path: &Path, + runtime: &Path, + daemon_connections: &mut HashMap, +) -> eyre::Result<()> { + let tasks = spawn_dataflow(runtime, dataflow_path, daemon_connections) + .await? + .tasks; await_tasks(tasks).await } -pub async fn spawn_dataflow(runtime: &Path, dataflow_path: &Path) -> eyre::Result { +pub async fn spawn_dataflow( + runtime: &Path, + dataflow_path: &Path, + daemon_connections: &mut HashMap, +) -> eyre::Result { let mut runtime = runtime.with_extension(EXE_EXTENSION); let descriptor = read_descriptor(dataflow_path).await.wrap_err_with(|| { format!( @@ -57,39 +74,37 @@ pub async fn spawn_dataflow(runtime: &Path, dataflow_path: &Path) -> eyre::Resul } } } - let tasks = FuturesUnordered::new(); - for node in nodes { - let node_id = node.id.clone(); + let mut custom_nodes = BTreeMap::new(); + for node in nodes { match node.kind { - descriptor::CoreNodeKind::Custom(custom) => { - let result = spawn_custom_node( - node_id.clone(), - &custom, - &node.env, - &communication_config, - &working_dir, - ) - .await - .wrap_err_with(|| format!("failed to spawn custom node {node_id}"))?; - tasks.push(result); - } - descriptor::CoreNodeKind::Runtime(runtime_node) => { - if !runtime_node.operators.is_empty() { - let result = spawn_runtime_node( - &runtime, - node_id.clone(), - &runtime_node, - &node.env, - &communication_config, - &working_dir, - ) - .wrap_err_with(|| format!("failed to spawn runtime node {node_id}"))?; - tasks.push(result); - } + CoreNodeKind::Runtime(_) => todo!(), + CoreNodeKind::Custom(n) => { + custom_nodes.insert( + node.id.clone(), + SpawnNodeParams { + node_id: node.id, + node: n, + working_dir: working_dir.clone(), + }, + ); } } } + + let spawn_command = SpawnDataflowNodes { + dataflow_id: uuid, + nodes: custom_nodes, + }; + let message = serde_json::to_vec(&DaemonCoordinatorEvent::Spawn(spawn_command))?; + let daemon_connection = daemon_connections + .get_mut("") + .wrap_err("no daemon connection")?; // TODO: take from dataflow spec + tcp_send(daemon_connection, &message) + .await + .wrap_err("failed to send spawn message to daemon")?; + + // TODO for interval in dora_timers { let communication_config = communication_config.clone(); // let mut communication = @@ -112,12 +127,12 @@ pub async fn spawn_dataflow(runtime: &Path, dataflow_path: &Path) -> eyre::Resul // .unwrap() // .publish(&data) // .expect("failed to publish timer tick message"); - todo!() + // todo!() } }); } Ok(SpawnedDataflow { - tasks, + tasks: FuturesUnordered::new(), // TODO communication_config, uuid, }) diff --git a/binaries/daemon/src/main.rs b/binaries/daemon/src/main.rs index 3df07bbc..b8e62cd6 100644 --- a/binaries/daemon/src/main.rs +++ b/binaries/daemon/src/main.rs @@ -1,7 +1,6 @@ use dora_core::{ config::{DataId, NodeId}, - daemon_messages::{self, ControlReply}, - descriptor, + daemon_messages::{self, ControlReply, DaemonCoordinatorEvent, DataflowId, SpawnDataflowNodes}, topics::DORA_COORDINATOR_PORT_DEFAULT, }; use dora_message::{uhlc, Metadata}; @@ -9,9 +8,8 @@ use eyre::{bail, eyre, Context}; use futures_concurrency::stream::Merge; use shared_memory::{Shmem, ShmemConf}; use std::{ - collections::{BTreeMap, HashMap}, + collections::HashMap, net::{Ipv4Addr, SocketAddr}, - path::PathBuf, }; use tokio::{ net::TcpStream, @@ -123,7 +121,7 @@ impl Daemon { ) -> eyre::Result<()> { match event { DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes { dataflow_id, nodes }) => { - let node_tasks = match self.node_tasks.entry(dataflow_id.clone()) { + let node_tasks = match self.node_tasks.entry(dataflow_id) { std::collections::hash_map::Entry::Vacant(entry) => { entry.insert(Default::default()) } @@ -245,27 +243,6 @@ pub enum DaemonNodeEvent { }, } -#[derive(Debug, serde::Deserialize, serde::Serialize)] -pub enum DaemonCoordinatorEvent { - Spawn(SpawnDataflowNodes), -} - -type DataflowId = String; - -#[derive(Debug, serde::Deserialize, serde::Serialize)] -pub struct SpawnDataflowNodes { - pub dataflow_id: DataflowId, - pub nodes: BTreeMap, -} - -#[derive(Debug, serde::Deserialize, serde::Serialize)] -pub struct SpawnNodeParams { - pub node_id: NodeId, - pub node: descriptor::CustomNode, - pub envs: Option>, - pub working_dir: PathBuf, -} - type MessageId = String; fn set_up_tracing() -> eyre::Result<()> { diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 6f6e6b4b..832ee2a0 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -1,6 +1,5 @@ -use crate::SpawnNodeParams; use dora_core::{ - daemon_messages::NodeConfig, + daemon_messages::{NodeConfig, SpawnNodeParams}, descriptor::{resolve_path, source_is_url}, }; use dora_download::download_file; @@ -15,7 +14,6 @@ pub async fn spawn_node( let SpawnNodeParams { node_id, node, - envs, working_dir, } = params; @@ -50,7 +48,7 @@ pub async fn spawn_node( // Injecting the env variable defined in the `yaml` into // the node runtime. - if let Some(envs) = envs { + if let Some(envs) = node.envs { for (key, value) in envs { command.env(key, value.to_string()); } diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index df3a849a..526e97f8 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -1,7 +1,13 @@ -use crate::config::{DataId, NodeId, NodeRunConfig}; +use std::{collections::BTreeMap, path::PathBuf}; + +use crate::{ + config::{DataId, NodeId, NodeRunConfig}, + descriptor, +}; use dora_message::Metadata; use eyre::Context; use shared_memory::{Shmem, ShmemConf}; +use uuid::Uuid; #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct NodeConfig { @@ -67,3 +73,23 @@ impl std::ops::Deref for MappedInputData { unsafe { self.memory.as_slice() } } } + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub enum DaemonCoordinatorEvent { + Spawn(SpawnDataflowNodes), +} + +pub type DataflowId = Uuid; + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub struct SpawnDataflowNodes { + pub dataflow_id: DataflowId, + pub nodes: BTreeMap, +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub struct SpawnNodeParams { + pub node_id: NodeId, + pub node: descriptor::CustomNode, + pub working_dir: PathBuf, +} diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index d3ed0459..53f614a9 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -211,7 +211,7 @@ pub struct CustomNode { pub source: String, #[serde(default, skip_serializing_if = "Option::is_none")] pub args: Option, - pub working_directory: Option>, + pub envs: Option>, #[serde(default, skip_serializing_if = "Option::is_none")] pub build: Option,