Browse Source

Update dora-coordinator to start dataflows through dora-daemon

tags/v0.2.0-candidate
Philipp Oppermann 3 years ago
parent
commit
cc1dc971ed
Failed to extract signature
7 changed files with 91 additions and 167 deletions
  1. +6
    -2
      binaries/coordinator/src/lib.rs
  2. +0
    -96
      binaries/coordinator/src/run/custom.rs
  3. +52
    -37
      binaries/coordinator/src/run/mod.rs
  4. +3
    -26
      binaries/daemon/src/main.rs
  5. +2
    -4
      binaries/daemon/src/spawn.rs
  6. +27
    -1
      libraries/core/src/daemon_messages.rs
  7. +1
    -1
      libraries/core/src/descriptor/mod.rs

+ 6
- 2
binaries/coordinator/src/lib.rs View File

@@ -48,10 +48,12 @@ pub async fn run(args: Args) -> eyre::Result<()> {
.with_file_name("dora-runtime")
});

let daemon_connections = &mut HashMap::new(); // TODO

match run_dataflow {
Some(path) => {
// start the given dataflow directly
run::run_dataflow(&path, &runtime_path)
run::run_dataflow(&path, &runtime_path, daemon_connections)
.await
.wrap_err_with(|| format!("failed to run dataflow at {}", path.display()))?;
}
@@ -168,6 +170,7 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> {
name,
runtime_path,
&dataflow_events_tx,
&mut daemon_connections,
)
.await?;
Ok(dataflow)
@@ -331,6 +334,7 @@ async fn start_dataflow(
name: Option<String>,
runtime_path: &Path,
dataflow_events_tx: &Option<tokio::sync::mpsc::Sender<Event>>,
daemon_connections: &mut HashMap<String, TcpStream>,
) -> eyre::Result<RunningDataflow> {
// TODO: send Spawn message to daemon

@@ -343,7 +347,7 @@ async fn start_dataflow(
uuid,
communication_config,
tasks,
} = spawn_dataflow(&runtime_path, path).await?;
} = spawn_dataflow(&runtime_path, path, daemon_connections).await?;
let path = path.to_owned();
let task = async move {
let result = await_tasks(tasks)


+ 0
- 96
binaries/coordinator/src/run/custom.rs View File

@@ -1,96 +0,0 @@
use super::command_init_common_env;
use dora_core::{
config::NodeId,
descriptor::{self, resolve_path, source_is_url, EnvValue},
};
use dora_download::download_file;
use eyre::{bail, eyre, WrapErr};
use std::{collections::BTreeMap, env::consts::EXE_EXTENSION, path::Path};

const SHELL_SOURCE: &str = "shell";

#[tracing::instrument]
pub(super) async fn spawn_custom_node(
node_id: NodeId,
node: &descriptor::CustomNode,
envs: &Option<BTreeMap<String, EnvValue>>,
communication: &dora_core::config::CommunicationConfig,
working_dir: &Path,
) -> eyre::Result<tokio::task::JoinHandle<eyre::Result<(), eyre::Error>>> {
let resolved_path = if source_is_url(&node.source) {
// try to download the shared library
let target_path = Path::new("build")
.join(node_id.to_string())
.with_extension(EXE_EXTENSION);
download_file(&node.source, &target_path)
.await
.wrap_err("failed to download custom node")?;
Ok(target_path.clone())
} else {
resolve_path(&node.source, working_dir)
};

let mut command = if let Ok(path) = &resolved_path {
let mut command = tokio::process::Command::new(path);
if let Some(args) = &node.args {
command.args(args.split_ascii_whitespace());
}
command
} else if node.source == SHELL_SOURCE {
if cfg!(target_os = "windows") {
let mut cmd = tokio::process::Command::new("cmd");
cmd.args(["/C", &node.args.clone().unwrap_or_default()]);
cmd
} else {
let mut cmd = tokio::process::Command::new("sh");
cmd.args(["-c", &node.args.clone().unwrap_or_default()]);
cmd
}
} else {
bail!("could not understand node source: {}", node.source);
};

command_init_common_env(&mut command, &node_id, communication)?;
command.env(
"DORA_NODE_RUN_CONFIG",
serde_yaml::to_string(&node.run_config)
.wrap_err("failed to serialize custom node run config")?,
);
command.current_dir(working_dir);

// Injecting the env variable defined in the `yaml` into
// the node runtime.
if let Some(envs) = envs {
for (key, value) in envs {
command.env(key, value.to_string());
}
}

let mut child = command.spawn().wrap_err_with(|| {
if let Ok(path) = resolved_path {
format!(
"failed to run source path: `{}` with args `{}`",
path.display(),
node.args.as_deref().unwrap_or_default()
)
} else {
format!(
"failed to run command: `{}` with args `{}`",
node.source,
node.args.as_deref().unwrap_or_default()
)
}
})?;
let result = tokio::spawn(async move {
let status = child.wait().await.context("child process failed")?;
if status.success() {
tracing::info!("node {node_id} finished");
Ok(())
} else if let Some(code) = status.code() {
Err(eyre!("node {node_id} failed with exit code: {code}"))
} else {
Err(eyre!("node {node_id} failed (unknown exit code)"))
}
});
Ok(result)
}

+ 52
- 37
binaries/coordinator/src/run/mod.rs View File

@@ -1,23 +1,40 @@
use self::{custom::spawn_custom_node, runtime::spawn_runtime_node};
use crate::tcp_utils::tcp_send;

use self::runtime::spawn_runtime_node;
use dora_core::{
config::{format_duration, CommunicationConfig, NodeId},
daemon_messages::{DaemonCoordinatorEvent, SpawnDataflowNodes, SpawnNodeParams},
descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor},
};
use eyre::{bail, eyre, WrapErr};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use futures::{stream::FuturesUnordered, StreamExt};
use std::{env::consts::EXE_EXTENSION, path::Path};
use std::{
collections::{BTreeMap, HashMap},
env::consts::EXE_EXTENSION,
path::Path,
};
use tokio::net::TcpStream;
use tokio_stream::wrappers::IntervalStream;
use uuid::Uuid;

mod custom;
mod runtime;

pub async fn run_dataflow(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> {
let tasks = spawn_dataflow(runtime, dataflow_path).await?.tasks;
pub async fn run_dataflow(
dataflow_path: &Path,
runtime: &Path,
daemon_connections: &mut HashMap<String, TcpStream>,
) -> eyre::Result<()> {
let tasks = spawn_dataflow(runtime, dataflow_path, daemon_connections)
.await?
.tasks;
await_tasks(tasks).await
}

pub async fn spawn_dataflow(runtime: &Path, dataflow_path: &Path) -> eyre::Result<SpawnedDataflow> {
pub async fn spawn_dataflow(
runtime: &Path,
dataflow_path: &Path,
daemon_connections: &mut HashMap<String, TcpStream>,
) -> eyre::Result<SpawnedDataflow> {
let mut runtime = runtime.with_extension(EXE_EXTENSION);
let descriptor = read_descriptor(dataflow_path).await.wrap_err_with(|| {
format!(
@@ -57,39 +74,37 @@ pub async fn spawn_dataflow(runtime: &Path, dataflow_path: &Path) -> eyre::Resul
}
}
}
let tasks = FuturesUnordered::new();
for node in nodes {
let node_id = node.id.clone();

let mut custom_nodes = BTreeMap::new();
for node in nodes {
match node.kind {
descriptor::CoreNodeKind::Custom(custom) => {
let result = spawn_custom_node(
node_id.clone(),
&custom,
&node.env,
&communication_config,
&working_dir,
)
.await
.wrap_err_with(|| format!("failed to spawn custom node {node_id}"))?;
tasks.push(result);
}
descriptor::CoreNodeKind::Runtime(runtime_node) => {
if !runtime_node.operators.is_empty() {
let result = spawn_runtime_node(
&runtime,
node_id.clone(),
&runtime_node,
&node.env,
&communication_config,
&working_dir,
)
.wrap_err_with(|| format!("failed to spawn runtime node {node_id}"))?;
tasks.push(result);
}
CoreNodeKind::Runtime(_) => todo!(),
CoreNodeKind::Custom(n) => {
custom_nodes.insert(
node.id.clone(),
SpawnNodeParams {
node_id: node.id,
node: n,
working_dir: working_dir.clone(),
},
);
}
}
}

let spawn_command = SpawnDataflowNodes {
dataflow_id: uuid,
nodes: custom_nodes,
};
let message = serde_json::to_vec(&DaemonCoordinatorEvent::Spawn(spawn_command))?;
let daemon_connection = daemon_connections
.get_mut("")
.wrap_err("no daemon connection")?; // TODO: take from dataflow spec
tcp_send(daemon_connection, &message)
.await
.wrap_err("failed to send spawn message to daemon")?;

// TODO
for interval in dora_timers {
let communication_config = communication_config.clone();
// let mut communication =
@@ -112,12 +127,12 @@ pub async fn spawn_dataflow(runtime: &Path, dataflow_path: &Path) -> eyre::Resul
// .unwrap()
// .publish(&data)
// .expect("failed to publish timer tick message");
todo!()
// todo!()
}
});
}
Ok(SpawnedDataflow {
tasks,
tasks: FuturesUnordered::new(), // TODO
communication_config,
uuid,
})


+ 3
- 26
binaries/daemon/src/main.rs View File

@@ -1,7 +1,6 @@
use dora_core::{
config::{DataId, NodeId},
daemon_messages::{self, ControlReply},
descriptor,
daemon_messages::{self, ControlReply, DaemonCoordinatorEvent, DataflowId, SpawnDataflowNodes},
topics::DORA_COORDINATOR_PORT_DEFAULT,
};
use dora_message::{uhlc, Metadata};
@@ -9,9 +8,8 @@ use eyre::{bail, eyre, Context};
use futures_concurrency::stream::Merge;
use shared_memory::{Shmem, ShmemConf};
use std::{
collections::{BTreeMap, HashMap},
collections::HashMap,
net::{Ipv4Addr, SocketAddr},
path::PathBuf,
};
use tokio::{
net::TcpStream,
@@ -123,7 +121,7 @@ impl Daemon {
) -> eyre::Result<()> {
match event {
DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes { dataflow_id, nodes }) => {
let node_tasks = match self.node_tasks.entry(dataflow_id.clone()) {
let node_tasks = match self.node_tasks.entry(dataflow_id) {
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(Default::default())
}
@@ -245,27 +243,6 @@ pub enum DaemonNodeEvent {
},
}

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum DaemonCoordinatorEvent {
Spawn(SpawnDataflowNodes),
}

type DataflowId = String;

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct SpawnDataflowNodes {
pub dataflow_id: DataflowId,
pub nodes: BTreeMap<NodeId, SpawnNodeParams>,
}

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct SpawnNodeParams {
pub node_id: NodeId,
pub node: descriptor::CustomNode,
pub envs: Option<BTreeMap<String, descriptor::EnvValue>>,
pub working_dir: PathBuf,
}

type MessageId = String;

fn set_up_tracing() -> eyre::Result<()> {


+ 2
- 4
binaries/daemon/src/spawn.rs View File

@@ -1,6 +1,5 @@
use crate::SpawnNodeParams;
use dora_core::{
daemon_messages::NodeConfig,
daemon_messages::{NodeConfig, SpawnNodeParams},
descriptor::{resolve_path, source_is_url},
};
use dora_download::download_file;
@@ -15,7 +14,6 @@ pub async fn spawn_node(
let SpawnNodeParams {
node_id,
node,
envs,
working_dir,
} = params;

@@ -50,7 +48,7 @@ pub async fn spawn_node(

// Injecting the env variable defined in the `yaml` into
// the node runtime.
if let Some(envs) = envs {
if let Some(envs) = node.envs {
for (key, value) in envs {
command.env(key, value.to_string());
}


+ 27
- 1
libraries/core/src/daemon_messages.rs View File

@@ -1,7 +1,13 @@
use crate::config::{DataId, NodeId, NodeRunConfig};
use std::{collections::BTreeMap, path::PathBuf};

use crate::{
config::{DataId, NodeId, NodeRunConfig},
descriptor,
};
use dora_message::Metadata;
use eyre::Context;
use shared_memory::{Shmem, ShmemConf};
use uuid::Uuid;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct NodeConfig {
@@ -67,3 +73,23 @@ impl std::ops::Deref for MappedInputData {
unsafe { self.memory.as_slice() }
}
}

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum DaemonCoordinatorEvent {
Spawn(SpawnDataflowNodes),
}

pub type DataflowId = Uuid;

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct SpawnDataflowNodes {
pub dataflow_id: DataflowId,
pub nodes: BTreeMap<NodeId, SpawnNodeParams>,
}

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct SpawnNodeParams {
pub node_id: NodeId,
pub node: descriptor::CustomNode,
pub working_dir: PathBuf,
}

+ 1
- 1
libraries/core/src/descriptor/mod.rs View File

@@ -211,7 +211,7 @@ pub struct CustomNode {
pub source: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub args: Option<String>,
pub working_directory: Option<BTreeMap<String, EnvValue>>,
pub envs: Option<BTreeMap<String, EnvValue>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub build: Option<String>,



Loading…
Cancel
Save