|
|
|
@@ -44,6 +44,20 @@ pub async fn start( |
|
|
|
bind_control: SocketAddr, |
|
|
|
external_events: impl Stream<Item = Event> + Unpin, |
|
|
|
) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> { |
|
|
|
let zenoh_session = zenoh::open(zenoh::Config::default()) |
|
|
|
.await |
|
|
|
.map_err(|e| eyre!(e)) |
|
|
|
.context("failed to open zenoh session")?; |
|
|
|
let register_handler = zenoh_session |
|
|
|
.declare_queryable("coordinator/register") |
|
|
|
.await |
|
|
|
.map_err(|e| eyre!(e)) |
|
|
|
.context("failed to declare register handler")?; |
|
|
|
let register_stream = register_handler |
|
|
|
.clone() |
|
|
|
.into_stream() |
|
|
|
.map(|query| Event::RegisterRequest(query)); |
|
|
|
|
|
|
|
let listener = listener::create_listener(bind).await?; |
|
|
|
let port = listener |
|
|
|
.local_addr() |
|
|
|
@@ -68,6 +82,7 @@ pub async fn start( |
|
|
|
new_daemon_connections, |
|
|
|
control_events, |
|
|
|
ctrlc_events, |
|
|
|
register_stream, |
|
|
|
) |
|
|
|
.merge(); |
|
|
|
|
|
|
|
@@ -152,6 +167,29 @@ async fn start_inner( |
|
|
|
tracing::trace!("Handling event {event:?}"); |
|
|
|
} |
|
|
|
match event { |
|
|
|
Event::RegisterRequest(query) => { |
|
|
|
let parsed = serde_json::from_slice( |
|
|
|
&query.payload().map(|p| p.to_bytes()).unwrap_or_default(), |
|
|
|
); |
|
|
|
let reply = match parsed { |
|
|
|
Ok(request) => { |
|
|
|
// TODO get machine ID `cat /etc/machine-id`, see https://stackoverflow.com/questions/10152762/best-way-to-get-machine-id-on-linux |
|
|
|
// Windows: `wmic os get serialnumber` |
|
|
|
// TODO map machine name to machine ID using config file |
|
|
|
RegisterReply::Ok |
|
|
|
} |
|
|
|
Err(err) => RegisterReply::Err(format!("failed to deserialize request: {err}")), |
|
|
|
}; |
|
|
|
let result = query |
|
|
|
.reply( |
|
|
|
"coordinator/register", |
|
|
|
serde_json::to_vec(&reply).context("failed to serialize register reply")?, |
|
|
|
) |
|
|
|
.await; |
|
|
|
if let Err(err) = result { |
|
|
|
tracing::warn!("failed to reply to register request: {err}"); |
|
|
|
} |
|
|
|
} |
|
|
|
Event::NewDaemonConnection(connection) => { |
|
|
|
connection.set_nodelay(true)?; |
|
|
|
let events_tx = daemon_events_tx.clone(); |
|
|
|
@@ -986,6 +1024,7 @@ pub enum Event { |
|
|
|
DaemonHeartbeatInterval, |
|
|
|
CtrlC, |
|
|
|
Log(LogMessage), |
|
|
|
RegisterRequest(zenoh::query::Query), |
|
|
|
} |
|
|
|
|
|
|
|
impl Event { |
|
|
|
|