Compare commits

...

1 Commits
main ... zenoh

Author SHA1 Message Date
  Philipp Oppermann 2a9cc1d7d9
Start implementing zenoh for daemon registration 1 year ago
3 changed files with 1116 additions and 157 deletions
Split View
  1. +1076
    -157
      Cargo.lock
  2. +1
    -0
      binaries/coordinator/Cargo.toml
  3. +39
    -0
      binaries/coordinator/src/lib.rs

+ 1076
- 157
Cargo.lock
File diff suppressed because it is too large
View File


+ 1
- 0
binaries/coordinator/Cargo.toml View File

@@ -28,3 +28,4 @@ names = "0.14.0"
ctrlc = "3.2.5"
log = { version = "0.4.21", features = ["serde"] }
dora-message = { workspace = true }
zenoh = "1.0.0"

+ 39
- 0
binaries/coordinator/src/lib.rs View File

@@ -44,6 +44,20 @@ pub async fn start(
bind_control: SocketAddr,
external_events: impl Stream<Item = Event> + Unpin,
) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> {
let zenoh_session = zenoh::open(zenoh::Config::default())
.await
.map_err(|e| eyre!(e))
.context("failed to open zenoh session")?;
let register_handler = zenoh_session
.declare_queryable("coordinator/register")
.await
.map_err(|e| eyre!(e))
.context("failed to declare register handler")?;
let register_stream = register_handler
.clone()
.into_stream()
.map(|query| Event::RegisterRequest(query));

let listener = listener::create_listener(bind).await?;
let port = listener
.local_addr()
@@ -68,6 +82,7 @@ pub async fn start(
new_daemon_connections,
control_events,
ctrlc_events,
register_stream,
)
.merge();

@@ -152,6 +167,29 @@ async fn start_inner(
tracing::trace!("Handling event {event:?}");
}
match event {
Event::RegisterRequest(query) => {
let parsed = serde_json::from_slice(
&query.payload().map(|p| p.to_bytes()).unwrap_or_default(),
);
let reply = match parsed {
Ok(request) => {
// TODO get machine ID `cat /etc/machine-id`, see https://stackoverflow.com/questions/10152762/best-way-to-get-machine-id-on-linux
// Windows: `wmic os get serialnumber`
// TODO map machine name to machine ID using config file
RegisterReply::Ok
}
Err(err) => RegisterReply::Err(format!("failed to deserialize request: {err}")),
};
let result = query
.reply(
"coordinator/register",
serde_json::to_vec(&reply).context("failed to serialize register reply")?,
)
.await;
if let Err(err) = result {
tracing::warn!("failed to reply to register request: {err}");
}
}
Event::NewDaemonConnection(connection) => {
connection.set_nodelay(true)?;
let events_tx = daemon_events_tx.clone();
@@ -986,6 +1024,7 @@ pub enum Event {
DaemonHeartbeatInterval,
CtrlC,
Log(LogMessage),
RegisterRequest(zenoh::query::Query),
}

impl Event {


Loading…
Cancel
Save