Browse Source

Comment out uses of communication layer in coordinator for now

tags/v0.2.0-candidate
Philipp Oppermann 3 years ago
parent
commit
1d2d71b15a
Failed to extract signature
3 changed files with 26 additions and 28 deletions
  1. +14
    -13
      binaries/coordinator/src/lib.rs
  2. +11
    -11
      binaries/coordinator/src/run/mod.rs
  3. +1
    -4
      binaries/runtime/Cargo.toml

+ 14
- 13
binaries/coordinator/src/lib.rs View File

@@ -7,7 +7,6 @@ use dora_core::{
StopDataflowResult,
},
};
use dora_node_api::{communication, manual_stop_publisher};
use eyre::{bail, eyre, WrapErr};
use futures::StreamExt;
use futures_concurrency::stream::Merge;
@@ -259,18 +258,20 @@ async fn stop_dataflow(
Some(dataflow) => dataflow.communication_config.clone(),
None => bail!("No running dataflow found with UUID `{uuid}`"),
};
let mut communication =
tokio::task::spawn_blocking(move || communication::init(&communication_config))
.await
.wrap_err("failed to join communication layer init task")?
.wrap_err("failed to init communication layer")?;
tracing::info!("sending stop message to dataflow `{uuid}`");
let manual_stop_publisher = manual_stop_publisher(communication.as_mut())?;
tokio::task::spawn_blocking(move || manual_stop_publisher())
.await
.wrap_err("failed to join stop publish task")?
.map_err(|err| eyre!(err))
.wrap_err("failed to send stop message")?;

todo!();
// let mut communication =
// tokio::task::spawn_blocking(move || communication::init(&communication_config))
// .await
// .wrap_err("failed to join communication layer init task")?
// .wrap_err("failed to init communication layer")?;
// tracing::info!("sending stop message to dataflow `{uuid}`");
// let manual_stop_publisher = manual_stop_publisher(communication.as_mut())?;
// tokio::task::spawn_blocking(move || manual_stop_publisher())
// .await
// .wrap_err("failed to join stop publish task")?
// .map_err(|err| eyre!(err))
// .wrap_err("failed to send stop message")?;
Ok(())
}



+ 11
- 11
binaries/coordinator/src/run/mod.rs View File

@@ -3,7 +3,6 @@ use dora_core::{
config::{format_duration, CommunicationConfig, NodeId},
descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor},
};
use dora_node_api::communication;
use eyre::{bail, eyre, WrapErr};
use futures::{stream::FuturesUnordered, StreamExt};
use std::{env::consts::EXE_EXTENSION, path::Path};
@@ -93,11 +92,11 @@ pub async fn spawn_dataflow(runtime: &Path, dataflow_path: &Path) -> eyre::Resul
}
for interval in dora_timers {
let communication_config = communication_config.clone();
let mut communication =
tokio::task::spawn_blocking(move || communication::init(&communication_config))
.await
.wrap_err("failed to join communication layer init task")?
.wrap_err("failed to init communication layer")?;
// let mut communication =
// tokio::task::spawn_blocking(move || communication::init(&communication_config))
// .await
// .wrap_err("failed to join communication layer init task")?
// .wrap_err("failed to init communication layer")?;
tokio::spawn(async move {
let topic = {
let duration = format_duration(interval);
@@ -108,11 +107,12 @@ pub async fn spawn_dataflow(runtime: &Path, dataflow_path: &Path) -> eyre::Resul
while (stream.next().await).is_some() {
let metadata = dora_message::Metadata::new(hlc.new_timestamp());
let data = metadata.serialize().unwrap();
communication
.publisher(&topic)
.unwrap()
.publish(&data)
.expect("failed to publish timer tick message");
// communication
// .publisher(&topic)
// .unwrap()
// .publish(&data)
// .expect("failed to publish timer tick message");
todo!()
}
});
}


+ 1
- 4
binaries/runtime/Cargo.toml View File

@@ -8,10 +8,7 @@ license = "Apache-2.0"

[dependencies]
clap = { version = "3.1.12", features = ["derive"] }
dora-node-api = { path = "../../apis/rust/node", default-features = false, features = [
"zenoh",
"iceoryx",
] }
dora-node-api = { path = "../../apis/rust/node", default-features = false }
dora-operator-api-python = { path = "../../apis/python/operator" }
dora-operator-api-types = { path = "../../apis/rust/operator/types" }
dora-core = { version = "0.1.0", path = "../../libraries/core" }


Loading…
Cancel
Save