You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

run.rs 8.8 kB

5 months ago
5 months ago
1 year ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. use dora_cli::session::DataflowSession;
  2. use dora_coordinator::{ControlEvent, Event};
  3. use dora_core::{
  4. descriptor::{DescriptorExt, read_as_descriptor},
  5. topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT},
  6. };
  7. use dora_message::{
  8. cli_to_coordinator::ControlRequest,
  9. common::DaemonId,
  10. coordinator_to_cli::{ControlRequestReply, DataflowIdAndName},
  11. };
  12. use dora_tracing::TracingBuilder;
  13. use eyre::{Context, bail};
  14. use std::{
  15. collections::BTreeSet,
  16. net::{IpAddr, Ipv4Addr, SocketAddr},
  17. path::Path,
  18. time::Duration,
  19. };
  20. use tokio::{
  21. sync::{
  22. mpsc::{self, Sender},
  23. oneshot,
  24. },
  25. task::JoinSet,
  26. };
  27. use tokio_stream::wrappers::ReceiverStream;
  28. use uuid::Uuid;
  29. #[tokio::main]
  30. async fn main() -> eyre::Result<()> {
  31. TracingBuilder::new("multiple-daemon-runner")
  32. .with_stdout("debug")
  33. .build()?;
  34. let root = Path::new(env!("CARGO_MANIFEST_DIR"));
  35. std::env::set_current_dir(root.join(file!()).parent().unwrap())
  36. .wrap_err("failed to set working dir")?;
  37. let dataflow = Path::new("dataflow.yml");
  38. build_dataflow(dataflow).await?;
  39. let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1);
  40. let coordinator_bind = SocketAddr::new(
  41. IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
  42. DORA_COORDINATOR_PORT_DEFAULT,
  43. );
  44. let coordinator_control_bind = SocketAddr::new(
  45. IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
  46. DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
  47. );
  48. let (coordinator_port, coordinator) = dora_coordinator::start(
  49. coordinator_bind,
  50. coordinator_control_bind,
  51. ReceiverStream::new(coordinator_events_rx),
  52. )
  53. .await?;
  54. tracing::info!("coordinator running on {coordinator_port}");
  55. let coordinator_addr = Ipv4Addr::LOCALHOST;
  56. let daemon_a = run_daemon(coordinator_addr.to_string(), "A");
  57. let daemon_b = run_daemon(coordinator_addr.to_string(), "B");
  58. tracing::info!("Spawning coordinator and daemons");
  59. let mut tasks = JoinSet::new();
  60. tasks.spawn(coordinator);
  61. tasks.spawn(daemon_b);
  62. tasks.spawn(daemon_a);
  63. tracing::info!("waiting until daemons are connected to coordinator");
  64. let mut retries = 0;
  65. loop {
  66. let connected_machines = connected_machines(&coordinator_events_tx).await?;
  67. if connected_machines
  68. .iter()
  69. .any(|id| id.matches_machine_id("A"))
  70. && connected_machines
  71. .iter()
  72. .any(|id| id.matches_machine_id("B"))
  73. {
  74. break;
  75. } else if retries > 20 {
  76. bail!("daemon not connected after {retries} retries");
  77. } else {
  78. std::thread::sleep(Duration::from_millis(500));
  79. retries += 1
  80. }
  81. }
  82. tracing::info!("starting dataflow");
  83. let uuid = start_dataflow(dataflow, &coordinator_events_tx).await?;
  84. tracing::info!("started dataflow under ID `{uuid}`");
  85. let running = running_dataflows(&coordinator_events_tx).await?;
  86. if !running.iter().map(|d| d.uuid).any(|id| id == uuid) {
  87. bail!("dataflow `{uuid}` is not running");
  88. }
  89. tracing::info!("waiting for dataflow `{uuid}` to finish");
  90. let mut retries = 0;
  91. loop {
  92. let running = running_dataflows(&coordinator_events_tx).await?;
  93. if running.is_empty() {
  94. break;
  95. } else if retries > 100 {
  96. bail!("dataflow not finished after {retries} retries");
  97. } else {
  98. tracing::debug!("not done yet");
  99. std::thread::sleep(Duration::from_millis(500));
  100. retries += 1
  101. }
  102. }
  103. tracing::info!("dataflow `{uuid}` finished, destroying coordinator");
  104. destroy(&coordinator_events_tx).await?;
  105. tracing::info!("joining tasks");
  106. while let Some(res) = tasks.join_next().await {
  107. res.unwrap()?;
  108. }
  109. tracing::info!("done");
  110. Ok(())
  111. }
  112. async fn start_dataflow(
  113. dataflow: &Path,
  114. coordinator_events_tx: &Sender<Event>,
  115. ) -> eyre::Result<Uuid> {
  116. let dataflow_descriptor = read_as_descriptor(dataflow)
  117. .await
  118. .wrap_err("failed to read yaml dataflow")?;
  119. let working_dir = dataflow
  120. .canonicalize()
  121. .context("failed to canonicalize dataflow path")?
  122. .parent()
  123. .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
  124. .to_owned();
  125. dataflow_descriptor
  126. .check(&working_dir)
  127. .wrap_err("could not validate yaml")?;
  128. let dataflow_session =
  129. DataflowSession::read_session(dataflow).context("failed to read DataflowSession")?;
  130. let (reply_sender, reply) = oneshot::channel();
  131. coordinator_events_tx
  132. .send(Event::Control(ControlEvent::IncomingRequest {
  133. request: ControlRequest::Start {
  134. build_id: dataflow_session.build_id,
  135. session_id: dataflow_session.session_id,
  136. dataflow: dataflow_descriptor,
  137. local_working_dir: Some(working_dir),
  138. name: None,
  139. uv: false,
  140. },
  141. reply_sender,
  142. }))
  143. .await?;
  144. let result = reply.await??;
  145. let uuid = match result {
  146. ControlRequestReply::DataflowStartTriggered { uuid } => uuid,
  147. ControlRequestReply::Error(err) => bail!("{err}"),
  148. other => bail!("unexpected start dataflow reply: {other:?}"),
  149. };
  150. let (reply_sender, reply) = oneshot::channel();
  151. coordinator_events_tx
  152. .send(Event::Control(ControlEvent::IncomingRequest {
  153. request: ControlRequest::WaitForSpawn { dataflow_id: uuid },
  154. reply_sender,
  155. }))
  156. .await?;
  157. let result = reply.await??;
  158. let uuid = match result {
  159. ControlRequestReply::DataflowSpawned { uuid } => uuid,
  160. ControlRequestReply::Error(err) => bail!("{err}"),
  161. other => bail!("unexpected start dataflow reply: {other:?}"),
  162. };
  163. Ok(uuid)
  164. }
  165. async fn connected_machines(
  166. coordinator_events_tx: &Sender<Event>,
  167. ) -> eyre::Result<BTreeSet<DaemonId>> {
  168. let (reply_sender, reply) = oneshot::channel();
  169. coordinator_events_tx
  170. .send(Event::Control(ControlEvent::IncomingRequest {
  171. request: ControlRequest::ConnectedMachines,
  172. reply_sender,
  173. }))
  174. .await?;
  175. let result = reply.await??;
  176. let machines = match result {
  177. ControlRequestReply::ConnectedDaemons(machines) => machines,
  178. ControlRequestReply::Error(err) => bail!("{err}"),
  179. other => bail!("unexpected start dataflow reply: {other:?}"),
  180. };
  181. Ok(machines)
  182. }
  183. async fn running_dataflows(
  184. coordinator_events_tx: &Sender<Event>,
  185. ) -> eyre::Result<Vec<DataflowIdAndName>> {
  186. let (reply_sender, reply) = oneshot::channel();
  187. coordinator_events_tx
  188. .send(Event::Control(ControlEvent::IncomingRequest {
  189. request: ControlRequest::List,
  190. reply_sender,
  191. }))
  192. .await?;
  193. let result = reply.await??;
  194. let dataflows = match result {
  195. ControlRequestReply::DataflowList(list) => list.get_active(),
  196. ControlRequestReply::Error(err) => bail!("{err}"),
  197. other => bail!("unexpected start dataflow reply: {other:?}"),
  198. };
  199. Ok(dataflows)
  200. }
  201. async fn destroy(coordinator_events_tx: &Sender<Event>) -> eyre::Result<()> {
  202. let (reply_sender, reply) = oneshot::channel();
  203. coordinator_events_tx
  204. .send(Event::Control(ControlEvent::IncomingRequest {
  205. request: ControlRequest::Destroy,
  206. reply_sender,
  207. }))
  208. .await?;
  209. let result = reply.await??;
  210. match result {
  211. ControlRequestReply::DestroyOk => Ok(()),
  212. ControlRequestReply::Error(err) => bail!("{err}"),
  213. other => bail!("unexpected start dataflow reply: {other:?}"),
  214. }
  215. }
  216. async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
  217. let cargo = std::env::var("CARGO").unwrap();
  218. let mut cmd = tokio::process::Command::new(&cargo);
  219. cmd.arg("run");
  220. cmd.arg("--package").arg("dora-cli");
  221. cmd.arg("--release");
  222. cmd.arg("--").arg("build").arg(dataflow);
  223. if !cmd.status().await?.success() {
  224. bail!("failed to build dataflow");
  225. };
  226. Ok(())
  227. }
  228. async fn run_daemon(coordinator: String, machine_id: &str) -> eyre::Result<()> {
  229. let cargo = std::env::var("CARGO").unwrap();
  230. let mut cmd = tokio::process::Command::new(&cargo);
  231. cmd.arg("run");
  232. cmd.arg("--package").arg("dora-cli");
  233. cmd.arg("--release");
  234. cmd.arg("--")
  235. .arg("daemon")
  236. .arg("--machine-id")
  237. .arg(machine_id)
  238. .arg("--coordinator-addr")
  239. .arg(coordinator)
  240. .arg("--local-listen-port")
  241. .arg("9843"); // random port
  242. if !cmd.status().await?.success() {
  243. bail!("failed to run dataflow");
  244. };
  245. Ok(())
  246. }