You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

1 year ago
1 year ago

  1. use dora_coordinator::{ControlEvent, Event};
  2. use dora_core::{
  3. descriptor::Descriptor,
  4. topics::{
  5. ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
  6. DORA_COORDINATOR_PORT_DEFAULT,
  7. },
  8. };
  9. use dora_tracing::set_up_tracing;
  10. use eyre::{bail, Context};
  11. use std::{
  12. collections::BTreeSet,
  13. net::{IpAddr, Ipv4Addr, SocketAddr},
  14. path::Path,
  15. time::Duration,
  16. };
  17. use tokio::{
  18. sync::{
  19. mpsc::{self, Sender},
  20. oneshot,
  21. },
  22. task::JoinSet,
  23. };
  24. use tokio_stream::wrappers::ReceiverStream;
  25. use uuid::Uuid;
  26. #[tokio::main]
  27. async fn main() -> eyre::Result<()> {
  28. set_up_tracing("multiple-daemon-runner").wrap_err("failed to set up tracing subscriber")?;
  29. let root = Path::new(env!("CARGO_MANIFEST_DIR"));
  30. std::env::set_current_dir(root.join(file!()).parent().unwrap())
  31. .wrap_err("failed to set working dir")?;
  32. let dataflow = Path::new("dataflow.yml");
  33. build_dataflow(dataflow).await?;
  34. let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1);
  35. let coordinator_bind = SocketAddr::new(
  36. IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
  37. DORA_COORDINATOR_PORT_DEFAULT,
  38. );
  39. let coordinator_control_bind = SocketAddr::new(
  40. IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
  41. DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
  42. );
  43. let (coordinator_port, coordinator) = dora_coordinator::start(
  44. coordinator_bind,
  45. coordinator_control_bind,
  46. ReceiverStream::new(coordinator_events_rx),
  47. )
  48. .await?;
  49. let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port);
  50. let daemon_a = run_daemon(coordinator_addr.to_string(), "A", 9843); // Random port
  51. let daemon_b = run_daemon(coordinator_addr.to_string(), "B", 9842);
  52. tracing::info!("Spawning coordinator and daemons");
  53. let mut tasks = JoinSet::new();
  54. tasks.spawn(coordinator);
  55. tasks.spawn(daemon_a);
  56. tasks.spawn(daemon_b);
  57. tracing::info!("waiting until daemons are connected to coordinator");
  58. let mut retries = 0;
  59. loop {
  60. let connected_machines = connected_machines(&coordinator_events_tx).await?;
  61. if connected_machines.contains("A") && connected_machines.contains("B") {
  62. break;
  63. } else if retries > 20 {
  64. bail!("daemon not connected after {retries} retries");
  65. } else {
  66. std::thread::sleep(Duration::from_millis(500));
  67. retries += 1
  68. }
  69. }
  70. tracing::info!("starting dataflow");
  71. let uuid = start_dataflow(dataflow, &coordinator_events_tx).await?;
  72. tracing::info!("started dataflow under ID `{uuid}`");
  73. let running = running_dataflows(&coordinator_events_tx).await?;
  74. if !running.iter().map(|d| d.uuid).any(|id| id == uuid) {
  75. bail!("dataflow `{uuid}` is not running");
  76. }
  77. tracing::info!("waiting for dataflow `{uuid}` to finish");
  78. let mut retries = 0;
  79. loop {
  80. let running = running_dataflows(&coordinator_events_tx).await?;
  81. if running.is_empty() {
  82. break;
  83. } else if retries > 100 {
  84. bail!("dataflow not finished after {retries} retries");
  85. } else {
  86. tracing::debug!("not done yet");
  87. std::thread::sleep(Duration::from_millis(500));
  88. retries += 1
  89. }
  90. }
  91. tracing::info!("dataflow `{uuid}` finished, destroying coordinator");
  92. destroy(&coordinator_events_tx).await?;
  93. tracing::info!("joining tasks");
  94. while let Some(res) = tasks.join_next().await {
  95. res.unwrap()?;
  96. }
  97. tracing::info!("done");
  98. Ok(())
  99. }
  100. async fn start_dataflow(
  101. dataflow: &Path,
  102. coordinator_events_tx: &Sender<Event>,
  103. ) -> eyre::Result<Uuid> {
  104. let dataflow_descriptor = Descriptor::read(dataflow)
  105. .await
  106. .wrap_err("failed to read yaml dataflow")?;
  107. let working_dir = dataflow
  108. .canonicalize()
  109. .context("failed to canonicalize dataflow path")?
  110. .parent()
  111. .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
  112. .to_owned();
  113. dataflow_descriptor
  114. .check(&working_dir)
  115. .wrap_err("could not validate yaml")?;
  116. let (reply_sender, reply) = oneshot::channel();
  117. coordinator_events_tx
  118. .send(Event::Control(ControlEvent::IncomingRequest {
  119. request: ControlRequest::Start {
  120. dataflow: dataflow_descriptor,
  121. local_working_dir: working_dir,
  122. name: None,
  123. },
  124. reply_sender,
  125. }))
  126. .await?;
  127. let result = reply.await??;
  128. let uuid = match result {
  129. ControlRequestReply::DataflowStarted { uuid } => uuid,
  130. ControlRequestReply::Error(err) => bail!("{err}"),
  131. other => bail!("unexpected start dataflow reply: {other:?}"),
  132. };
  133. Ok(uuid)
  134. }
  135. async fn connected_machines(
  136. coordinator_events_tx: &Sender<Event>,
  137. ) -> eyre::Result<BTreeSet<String>> {
  138. let (reply_sender, reply) = oneshot::channel();
  139. coordinator_events_tx
  140. .send(Event::Control(ControlEvent::IncomingRequest {
  141. request: ControlRequest::ConnectedMachines,
  142. reply_sender,
  143. }))
  144. .await?;
  145. let result = reply.await??;
  146. let machines = match result {
  147. ControlRequestReply::ConnectedMachines(machines) => machines,
  148. ControlRequestReply::Error(err) => bail!("{err}"),
  149. other => bail!("unexpected start dataflow reply: {other:?}"),
  150. };
  151. Ok(machines)
  152. }
  153. async fn running_dataflows(coordinator_events_tx: &Sender<Event>) -> eyre::Result<Vec<DataflowId>> {
  154. let (reply_sender, reply) = oneshot::channel();
  155. coordinator_events_tx
  156. .send(Event::Control(ControlEvent::IncomingRequest {
  157. request: ControlRequest::List,
  158. reply_sender,
  159. }))
  160. .await?;
  161. let result = reply.await??;
  162. let dataflows = match result {
  163. ControlRequestReply::DataflowList(list) => list.get_active(),
  164. ControlRequestReply::Error(err) => bail!("{err}"),
  165. other => bail!("unexpected start dataflow reply: {other:?}"),
  166. };
  167. Ok(dataflows)
  168. }
  169. async fn destroy(coordinator_events_tx: &Sender<Event>) -> eyre::Result<()> {
  170. let (reply_sender, reply) = oneshot::channel();
  171. coordinator_events_tx
  172. .send(Event::Control(ControlEvent::IncomingRequest {
  173. request: ControlRequest::Destroy,
  174. reply_sender,
  175. }))
  176. .await?;
  177. let result = reply.await??;
  178. match result {
  179. ControlRequestReply::DestroyOk => Ok(()),
  180. ControlRequestReply::Error(err) => bail!("{err}"),
  181. other => bail!("unexpected start dataflow reply: {other:?}"),
  182. }
  183. }
  184. async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
  185. let cargo = std::env::var("CARGO").unwrap();
  186. let mut cmd = tokio::process::Command::new(&cargo);
  187. cmd.arg("run");
  188. cmd.arg("--package").arg("dora-cli");
  189. cmd.arg("--").arg("build").arg(dataflow);
  190. if !cmd.status().await?.success() {
  191. bail!("failed to build dataflow");
  192. };
  193. Ok(())
  194. }
  195. async fn run_daemon(
  196. coordinator: String,
  197. machine_id: &str,
  198. local_listen_port: u16,
  199. ) -> eyre::Result<()> {
  200. let cargo = std::env::var("CARGO").unwrap();
  201. let mut cmd = tokio::process::Command::new(&cargo);
  202. cmd.arg("run");
  203. cmd.arg("--package").arg("dora-cli");
  204. cmd.arg("--")
  205. .arg("daemon")
  206. .arg("--machine-id")
  207. .arg(machine_id)
  208. .arg("--coordinator-addr")
  209. .arg(coordinator)
  210. .arg("--local-listen-port")
  211. .arg(local_listen_port.to_string());
  212. if !cmd.status().await?.success() {
  213. bail!("failed to run dataflow");
  214. };
  215. Ok(())
  216. }