Browse Source

Merge bb3caab788 into 77c277910b

pull/1095/merge
Takafumi Watanabe GitHub 5 months ago
parent
commit
4e09b4f4a2
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
16 changed files with 5983 additions and 7 deletions
  1. +392
    -7
      Cargo.lock
  2. +1
    -0
      Cargo.toml
  3. +1
    -0
      node-hub/dora-gst-webrtc-sink/.gitignore
  4. +4337
    -0
      node-hub/dora-gst-webrtc-sink/Cargo.lock
  5. +31
    -0
      node-hub/dora-gst-webrtc-sink/Cargo.toml
  6. +228
    -0
      node-hub/dora-gst-webrtc-sink/README.md
  7. +1
    -0
      node-hub/dora-gst-webrtc-sink/example/.gitignore
  8. +49
    -0
      node-hub/dora-gst-webrtc-sink/example/README.md
  9. +46
    -0
      node-hub/dora-gst-webrtc-sink/example/dataflow.yml
  10. +45
    -0
      node-hub/dora-gst-webrtc-sink/example/run_demo.sh
  11. +4
    -0
      node-hub/dora-gst-webrtc-sink/src/lib.rs
  12. +102
    -0
      node-hub/dora-gst-webrtc-sink/src/main.rs
  13. +150
    -0
      node-hub/dora-gst-webrtc-sink/src/peer_connection.rs
  14. +21
    -0
      node-hub/dora-gst-webrtc-sink/src/signaling.rs
  15. +76
    -0
      node-hub/dora-gst-webrtc-sink/src/video_source_manager.rs
  16. +499
    -0
      node-hub/dora-gst-webrtc-sink/src/webrtc_server.rs

+ 392
- 7
Cargo.lock View File

@@ -1032,6 +1032,12 @@ version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"

[[package]]
name = "atomic_refcell"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41e67cd8309bbd06cd603a9e693a784ac2e5d1e955f11286e355089fcab3047c"

[[package]]
name = "atomig"
version = "0.4.2"
@@ -1802,7 +1808,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d067ad48b8650848b989a59a86c6c36a995d02d2bf778d45c3c5d57bc2718f02"
dependencies = [
"smallvec",
"target-lexicon",
"target-lexicon 0.12.16",
]

[[package]]
name = "cfg-expr"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d0390889d58f934f01cd49736275b4c2da15bcfc328c78ff2349907e6cabf22"
dependencies = [
"smallvec",
"target-lexicon 0.13.2",
]

[[package]]
@@ -2670,7 +2686,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ecb1c5e8f4dc438eedc1b534a54672fb0e0a56035dae6b50162787bd2c50e95"
dependencies = [
"libc",
"system-deps",
"system-deps 6.2.2",
]

[[package]]
@@ -3105,6 +3121,30 @@ dependencies = [
"uuid 1.16.0",
]

[[package]]
name = "dora-gst-webrtc-sink"
version = "0.3.12"
dependencies = [
"anyhow",
"arrow",
"dora-node-api",
"env_logger 0.11.6",
"eyre",
"futures",
"glib",
"gstreamer",
"gstreamer-app",
"gstreamer-sdp",
"gstreamer-webrtc",
"log",
"serde",
"serde_json",
"tokio",
"tokio-stream",
"uuid 1.16.0",
"warp",
]

[[package]]
name = "dora-kit-car"
version = "0.3.12"
@@ -4747,6 +4787,19 @@ version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"

[[package]]
name = "gio-sys"
version = "0.20.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "521e93a7e56fc89e84aea9a52cfc9436816a4b363b030260b699950ff1336c83"
dependencies = [
"glib-sys",
"gobject-sys",
"libc",
"system-deps 7.0.5",
"windows-sys 0.59.0",
]

[[package]]
name = "git-version"
version = "0.3.9"
@@ -4803,6 +4856,50 @@ dependencies = [
"serde",
]

[[package]]
name = "glib"
version = "0.20.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffc4b6e352d4716d84d7dde562dd9aee2a7d48beb872dd9ece7f2d1515b2d683"
dependencies = [
"bitflags 2.9.0",
"futures-channel",
"futures-core",
"futures-executor",
"futures-task",
"futures-util",
"gio-sys",
"glib-macros",
"glib-sys",
"gobject-sys",
"libc",
"memchr",
"smallvec",
]

[[package]]
name = "glib-macros"
version = "0.20.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8084af62f09475a3f529b1629c10c429d7600ee1398ae12dd3bf175d74e7145"
dependencies = [
"heck 0.5.0",
"proc-macro-crate",
"proc-macro2",
"quote",
"syn 2.0.101",
]

[[package]]
name = "glib-sys"
version = "0.20.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ab79e1ed126803a8fb827e3de0e2ff95191912b8db65cee467edb56fc4cc215"
dependencies = [
"libc",
"system-deps 7.0.5",
]

[[package]]
name = "glob"
version = "0.3.2"
@@ -4951,6 +5048,17 @@ dependencies = [
"gl_generator",
]

[[package]]
name = "gobject-sys"
version = "0.20.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec9aca94bb73989e3cfdbf8f2e0f1f6da04db4d291c431f444838925c4c63eda"
dependencies = [
"glib-sys",
"libc",
"system-deps 7.0.5",
]

[[package]]
name = "gpu-alloc"
version = "0.6.0"
@@ -5004,6 +5112,147 @@ dependencies = [
"winapi-util",
]

[[package]]
name = "gstreamer"
version = "0.23.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8757a87f3706560037a01a9f06a59fcc7bdb0864744dcf73546606e60c4316e1"
dependencies = [
"cfg-if 1.0.0",
"futures-channel",
"futures-core",
"futures-util",
"glib",
"gstreamer-sys",
"itertools 0.14.0",
"libc",
"muldiv",
"num-integer",
"num-rational",
"once_cell",
"option-operations",
"paste",
"pin-project-lite",
"smallvec",
"thiserror 2.0.12",
]

[[package]]
name = "gstreamer-app"
version = "0.23.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e9a883eb21aebcf1289158225c05f7aea5da6ecf71fa7f0ff1ce4d25baf004e"
dependencies = [
"futures-core",
"futures-sink",
"glib",
"gstreamer",
"gstreamer-app-sys",
"gstreamer-base",
"libc",
]

[[package]]
name = "gstreamer-app-sys"
version = "0.23.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94f7ef838306fe51852d503a14dc79ac42de005a59008a05098de3ecdaf05455"
dependencies = [
"glib-sys",
"gstreamer-base-sys",
"gstreamer-sys",
"libc",
"system-deps 7.0.5",
]

[[package]]
name = "gstreamer-base"
version = "0.23.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f19a74fd04ffdcb847dd322640f2cf520897129d00a7bcb92fd62a63f3e27404"
dependencies = [
"atomic_refcell",
"cfg-if 1.0.0",
"glib",
"gstreamer",
"gstreamer-base-sys",
"libc",
]

[[package]]
name = "gstreamer-base-sys"
version = "0.23.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87f2fb0037b6d3c5b51f60dea11e667910f33be222308ca5a101450018a09840"
dependencies = [
"glib-sys",
"gobject-sys",
"gstreamer-sys",
"libc",
"system-deps 7.0.5",
]

[[package]]
name = "gstreamer-sdp"
version = "0.23.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57f94ab92cb1dbd6d00e41208ab463b5fbce3eca65a4c9710585fede015a9d65"
dependencies = [
"glib",
"gstreamer",
"gstreamer-sdp-sys",
]

[[package]]
name = "gstreamer-sdp-sys"
version = "0.23.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de44d5e90138ac1786a6418a38c73d9a78ee0d15680129f09f91df5309d658e0"
dependencies = [
"glib-sys",
"gstreamer-sys",
"libc",
"system-deps 7.0.5",
]

[[package]]
name = "gstreamer-sys"
version = "0.23.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "feea73b4d92dbf9c24a203c9cd0bcc740d584f6b5960d5faf359febf288919b2"
dependencies = [
"glib-sys",
"gobject-sys",
"libc",
"system-deps 7.0.5",
]

[[package]]
name = "gstreamer-webrtc"
version = "0.23.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c475e2fa45c6c14b971e2ac40e7bae035f19592cac68c391d12eb659fd1722b"
dependencies = [
"glib",
"gstreamer",
"gstreamer-sdp",
"gstreamer-webrtc-sys",
"libc",
]

[[package]]
name = "gstreamer-webrtc-sys"
version = "0.23.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c0ce6dd5e17757933233bf3fce2226eb2e8c06ec2325c2459a1022ae1d7d279"
dependencies = [
"glib-sys",
"gstreamer-sdp-sys",
"gstreamer-sys",
"libc",
"system-deps 7.0.5",
]

[[package]]
name = "h2"
version = "0.3.26"
@@ -5083,6 +5332,30 @@ dependencies = [
"foldhash",
]

[[package]]
name = "headers"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270"
dependencies = [
"base64 0.21.7",
"bytes",
"headers-core",
"http 0.2.12",
"httpdate",
"mime",
"sha1",
]

[[package]]
name = "headers-core"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429"
dependencies = [
"http 0.2.12",
]

[[package]]
name = "heck"
version = "0.3.3"
@@ -7012,6 +7285,30 @@ dependencies = [
"syn 2.0.101",
]

[[package]]
name = "muldiv"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "956787520e75e9bd233246045d19f42fb73242759cc57fba9611d940ae96d4b0"

[[package]]
name = "multer"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2"
dependencies = [
"bytes",
"encoding_rs",
"futures-util",
"http 0.2.12",
"httparse",
"log",
"memchr",
"mime",
"spin 0.9.8",
"version_check",
]

[[package]]
name = "multiple-daemons-example-node"
version = "0.3.12"
@@ -8095,6 +8392,15 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d"

[[package]]
name = "option-operations"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c26d27bb1aeab65138e4bf7666045169d1717febcc9ff870166be8348b223d0"
dependencies = [
"paste",
]

[[package]]
name = "orbclient"
version = "0.3.48"
@@ -8963,7 +9269,7 @@ checksum = "94f6cbe86ef3bf18998d9df6e0f3fc1050a8c5efa409bf712e661a4366e010fb"
dependencies = [
"once_cell",
"python3-dll-a",
"target-lexicon",
"target-lexicon 0.12.16",
]

[[package]]
@@ -9368,7 +9674,7 @@ dependencies = [
"serde-big-array",
"signal-hook",
"simd_helpers",
"system-deps",
"system-deps 6.2.2",
"thiserror 1.0.69",
"toml",
"v_frame",
@@ -13218,7 +13524,20 @@ version = "6.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e535eb8dded36d55ec13eddacd30dec501792ff23a0b1682c38601b8cf2349"
dependencies = [
"cfg-expr",
"cfg-expr 0.15.8",
"heck 0.5.0",
"pkg-config",
"toml",
"version-compare",
]

[[package]]
name = "system-deps"
version = "7.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4be53aa0cba896d2dc615bd42bbc130acdcffa239e0a2d965ea5b3b2a86ffdb"
dependencies = [
"cfg-expr 0.20.1",
"heck 0.5.0",
"pkg-config",
"toml",
@@ -13257,6 +13576,12 @@ version = "0.12.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1"

[[package]]
name = "target-lexicon"
version = "0.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e502f78cdbb8ba4718f566c418c52bc729126ffd16baee5baa718cf25dd5a69a"

[[package]]
name = "tempfile"
version = "3.19.1"
@@ -13660,6 +13985,18 @@ dependencies = [
"tokio-util",
]

[[package]]
name = "tokio-tungstenite"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite 0.21.0",
]

[[package]]
name = "tokio-tungstenite"
version = "0.24.0"
@@ -13669,7 +14006,7 @@ dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite",
"tungstenite 0.24.0",
]

[[package]]
@@ -14011,6 +14348,25 @@ version = "0.25.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2df906b07856748fa3f6e0ad0cbaa047052d4a7dd609e231c4f72cee8c36f31"

[[package]]
name = "tungstenite"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1"
dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http 1.3.1",
"httparse",
"log",
"rand 0.8.5",
"sha1",
"thiserror 1.0.69",
"url",
"utf-8",
]

[[package]]
name = "tungstenite"
version = "0.24.0"
@@ -14520,6 +14876,35 @@ dependencies = [
"try-lock",
]

[[package]]
name = "warp"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4378d202ff965b011c64817db11d5829506d3404edeadb61f190d111da3f231c"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"headers",
"http 0.2.12",
"hyper 0.14.32",
"log",
"mime",
"mime_guess",
"multer",
"percent-encoding",
"pin-project",
"scoped-tls",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-tungstenite 0.21.0",
"tokio-util",
"tower-service",
"tracing",
]

[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
@@ -16646,7 +17031,7 @@ dependencies = [
"async-trait",
"futures-util",
"tokio",
"tokio-tungstenite",
"tokio-tungstenite 0.24.0",
"tokio-util",
"tracing",
"url",


+ 1
- 0
Cargo.toml View File

@@ -40,6 +40,7 @@ members = [
"node-hub/dora-rav1e",
"node-hub/dora-dav1d",
"node-hub/dora-rustypot",
"node-hub/dora-gst-webrtc-sink",
"libraries/extensions/ros2-bridge",
"libraries/extensions/ros2-bridge/msg-gen",
"libraries/extensions/ros2-bridge/python",


+ 1
- 0
node-hub/dora-gst-webrtc-sink/.gitignore View File

@@ -0,0 +1 @@
/target

+ 4337
- 0
node-hub/dora-gst-webrtc-sink/Cargo.lock
File diff suppressed because it is too large
View File


+ 31
- 0
node-hub/dora-gst-webrtc-sink/Cargo.toml View File

@@ -0,0 +1,31 @@
[package]
name = "dora-gst-webrtc-sink"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
authors = ["dieu.detruit@gmail.com"]
license.workspace = true
description = "WebRTC sink node for dora-rs with multi-camera support"
repository = "https://github.com/dieu-detruit/dora-gst-webrtc-sink"
keywords = ["dora", "webrtc", "gstreamer", "video", "streaming"]
categories = ["multimedia::video", "network-programming"]

[dependencies]
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6"
arrow = { workspace = true }
gstreamer = "0.23"
gstreamer-webrtc = "0.23"
gstreamer-sdp = "0.23"
gstreamer-app = "0.23"
glib = "0.20"
tokio = { version = "1.40", features = ["full"] }
warp = "0.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
anyhow = "1.0"
futures = "0.3"
env_logger = "0.11"
log = "0.4"
tokio-stream = "0.1"
uuid = { version = "1.10", features = ["v4"] }

+ 228
- 0
node-hub/dora-gst-webrtc-sink/README.md View File

@@ -0,0 +1,228 @@
# dora-gst-webrtc-sink

[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![Rust](https://img.shields.io/badge/rust-%23000000.svg?style=flat&logo=rust&logoColor=white)](https://www.rust-lang.org/)

A WebRTC sink node for [dora-rs](https://github.com/dora-rs/dora) that streams video via WebRTC with built-in signaling server. Supports multiple video sources and multiple clients per source.

## Features

- 🎥 Stream RGB8 format video from dora-rs inputs
- 📡 Built-in WebRTC signaling server (no external server needed)
- 🔄 Real-time streaming with latest frame priority (buffer size 1)
- 📹 **Multiple video sources**: Handle multiple cameras/video streams simultaneously
- 👥 **Multiple clients per source**: Each video source can serve multiple WebRTC clients
- 🔧 Dynamic video source management
- ⚙️ Configurable signaling server port via environment variable

## Prerequisites

- Rust 1.70+
- GStreamer 1.16+ with WebRTC plugins
- [dora-rs](https://github.com/dora-rs/dora)

### Installing GStreamer

#### Ubuntu/Debian
```bash
sudo apt-get update
sudo apt-get install \
libgstreamer1.0-dev \
libgstreamer-plugins-base1.0-dev \
libgstreamer-plugins-bad1.0-dev \
gstreamer1.0-plugins-good \
gstreamer1.0-plugins-bad \
gstreamer1.0-plugins-ugly \
gstreamer1.0-libav
```

#### macOS
```bash
brew install gstreamer gst-plugins-base gst-plugins-good gst-plugins-bad gst-plugins-ugly
```

#### Windows
Download and install from [GStreamer official website](https://gstreamer.freedesktop.org/download/)

## Installation

```bash
git clone https://github.com/yourusername/dora-gst-webrtc-sink.git
cd dora-gst-webrtc-sink
cargo build --release
```

## Usage

### Quick Start

1. Run the example demo:
```bash
cd example
./run_demo.sh
```

2. Open `example/webrtc-viewer.html` in your web browser
3. The viewer will automatically connect to both camera streams

### Environment Variables

- `SIGNALING_PORT`: WebSocket signaling server port (default: 8080)
- `RUST_LOG`: Log level (default: info)

### Input Format

The node supports two input formats:

1. **Legacy format** (backward compatible):
- Input ID: `image`
- Maps to the default video source

2. **Multi-camera format**:
- Input ID: `<video_id>/frame`
- Example: `camera1/frame`, `camera2/frame`, `front_camera/frame`
- Each unique `video_id` creates a separate video stream

All inputs expect:
- **Format**: RGB8
- **Resolution**: Automatically detected from input frame size
- **Framerate**: Automatically detected from input timing
- **Encoding parameter**: "rgb8"

### WebRTC Client Connection

Connect to the WebSocket signaling server at:
```
ws://localhost:8080/<video_id>
```

Example URLs:
- `ws://localhost:8080/camera1` - First camera
- `ws://localhost:8080/camera2` - Second camera
- `ws://localhost:8080/default` - Legacy support

## Example Dataflow Configuration

### Single Camera (Legacy)
```yaml
nodes:
- id: camera
path: camera-node
outputs:
- image
- id: webrtc-sink
path: dora-gst-webrtc-sink
inputs:
image: camera/image
```

### Multiple Cameras
```yaml
nodes:
- id: camera1
path: camera-node
outputs:
- frame
- id: camera2
path: camera-node
outputs:
- frame
- id: webrtc-sink
path: dora-gst-webrtc-sink
inputs:
camera1/frame: camera1/frame
camera2/frame: camera2/frame
```

## Web Client

The included `webrtc-viewer.html` provides:
- Automatic connection to multiple video streams
- Dynamic addition/removal of video streams
- Individual connection control per stream
- Real-time status and logging

## Architecture

```
┌─────────────┐ ┌──────────────────┐ ┌─────────────┐
│ Camera Node │────▶│ dora-gst-webrtc │────▶│ Web Browser │
│ (RGB8) │ │ -sink │ │ (WebRTC) │
└─────────────┘ │ │ └─────────────┘
│ ┌─────────────┐ │
┌─────────────┐ │ │ Signaling │ │ ┌─────────────┐
│ Camera Node │────▶│ │ Server │ │────▶│ Web Browser │
│ (RGB8) │ │ └─────────────┘ │ │ (WebRTC) │
└─────────────┘ └──────────────────┘ └─────────────┘
```

## Development

### Building from Source
```bash
cargo build --release
```

### Running Tests
```bash
cargo test
```

### Code Structure
- `src/main.rs` - Main entry point and dora node integration
- `src/webrtc_server.rs` - WebRTC server implementation
- `src/peer_connection.rs` - Individual peer connection management
- `src/video_source_manager.rs` - Multiple video source management
- `src/signaling.rs` - WebRTC signaling protocol messages

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

1. Fork the repository
2. Create your feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add some amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request

## License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

## Acknowledgments

- [dora-rs](https://github.com/dora-rs/dora) - Dataflow-oriented robotics architecture
- [GStreamer](https://gstreamer.freedesktop.org/) - Multimedia framework
- [webrtcbin](https://gstreamer.freedesktop.org/documentation/webrtc/index.html) - GStreamer WebRTC implementation

## Troubleshooting

### GStreamer WebRTC plugin not found
Ensure GStreamer plugins are properly installed:
```bash
gst-inspect-1.0 webrtcbin
```

### Connection issues
1. Check firewall settings for port 8080 (signaling)
2. Ensure STUN server is accessible (uses Google's public STUN by default)
3. Check browser console for WebRTC errors

### Performance issues
- Adjust VP8 encoding parameters in `create_pipeline()`
- Consider reducing resolution or framerate
- Check CPU usage during encoding

## Roadmap

- [ ] H.264 codec support for better compatibility
- [ ] TURN server support for NAT traversal
- [ ] Connection quality metrics
- [ ] Support for additional video formats (NV12, I420)

## Contact

For questions, issues, or contributions, please use the GitHub issue tracker.

+ 1
- 0
node-hub/dora-gst-webrtc-sink/example/.gitignore View File

@@ -0,0 +1 @@
out

+ 49
- 0
node-hub/dora-gst-webrtc-sink/example/README.md View File

@@ -0,0 +1,49 @@
# WebRTC Sink Example

This example demonstrates how to use `dora-gst-webrtc-sink` with `dora-gst-test-source` to stream video via WebRTC.

## Prerequisites

1. Install dora-rs
2. Install GStreamer with WebRTC support:
```bash
# Ubuntu/Debian
sudo apt-get install libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev \
libgstreamer-plugins-bad1.0-dev gstreamer1.0-plugins-bad \
gstreamer1.0-nice gstreamer1.0-tools
```

## Running the Demo

1. Run the demo script:
```bash
./run_demo.sh
```

2. Open `webrtc-viewer.html` in a web browser

3. Click the "Connect" button to start viewing the stream

## Components

- **dataflow.yml**: Dora dataflow configuration that connects test video source to WebRTC sink
- **webrtc-viewer.html**: Simple web-based WebRTC client for viewing the stream
- **run_demo.sh**: Script to build and run the demo

## Architecture

```
[dora-gst-test-source] ---(RGB images)---> [dora-gst-webrtc-sink]
|
v
WebSocket Server (:8080)
|
v
Web Browser Client
```

## Troubleshooting

- If the build fails with "gstreamer-webrtc-1.0 not found", install the GStreamer WebRTC plugins
- Make sure port 8080 is not already in use
- Check browser console for WebRTC connection errors

+ 46
- 0
node-hub/dora-gst-webrtc-sink/example/dataflow.yml View File

@@ -0,0 +1,46 @@
nodes:
# Camera 1 - Ball pattern
- id: camera1
build: cargo install --git
https://github.com/dieu-detruit/dora-gst-test-source dora-gst-test-source
path: dora-gst-test-source
inputs:
tick: dora/timer/millis/30
outputs:
- frame
env:
MOTION: '0'
IS_LIVE: 'true'
PATTERN: '18' # Ball pattern
FOREGROUND_COLOR: '0xffb28354'
IMAGE_COLS: '640'
IMAGE_ROWS: '480'
SOURCE_FPS: '30.0'

# Camera 2 - SMPTE pattern
- id: camera2
build: cargo install --git
https://github.com/dieu-detruit/dora-gst-test-source dora-gst-test-source
path: dora-gst-test-source
inputs:
tick: dora/timer/millis/30
outputs:
- frame
env:
MOTION: '0'
IS_LIVE: 'true'
PATTERN: '0' # SMPTE pattern
IMAGE_COLS: '640'
IMAGE_ROWS: '480'
SOURCE_FPS: '30.0'

# WebRTC sink that handles multiple sources
- id: webrtc-sink
build: cargo install --path ../
path: dora-gst-webrtc-sink
inputs:
camera1/frame: camera1/frame
camera2/frame: camera2/frame
env:
SIGNALING_PORT: "8080"
RUST_LOG: "info"

+ 45
- 0
node-hub/dora-gst-webrtc-sink/example/run_demo.sh View File

@@ -0,0 +1,45 @@
#!/bin/bash

echo "=== dora-gst-webrtc-sink Demo ==="
echo

# Check if dora is installed
if ! command -v dora &> /dev/null; then
echo "Error: 'dora' command not found. Please install dora-rs first."
echo "Visit: https://github.com/dora-rs/dora"
exit 1
fi

# Check if GStreamer WebRTC plugin is installed
if ! command -v gst-inspect-1.0 webrtc &> /dev/null; then
echo "Error: GStreamer not found. Please install GStreamer and its WebRTC plugins."
echo "Ubuntu/Debian: sudo apt-get install libgstreamer1.0-dev libgstreamer-plugins-bad1.0-dev"
exit 1
fi

echo
echo "Building dataflow..."
dora build dataflow.yml
if [ $? -ne 0 ]; then
echo "Error: Build failed. Please check the error messages above."
exit 1
fi

echo
echo "Starting multi-camera WebRTC streaming demo..."
echo "WebRTC signaling server will be available at: ws://localhost:8080/<video_id>"
echo
echo "Available video streams:"
echo " - camera1: Moving ball pattern (ws://localhost:8080/camera1)"
echo " - camera2: SMPTE test pattern (ws://localhost:8080/camera2)"
echo
echo "To view the streams:"
echo "1. Open 'webrtc-viewer.html' in a web browser"
echo "2. Both cameras will connect automatically"
echo "3. You can add more video streams using the interface"
echo
echo "Press Ctrl+C to stop"
echo

# Start the dataflow
dora run dataflow.yml

+ 4
- 0
node-hub/dora-gst-webrtc-sink/src/lib.rs View File

@@ -0,0 +1,4 @@
pub mod peer_connection;
pub mod signaling;
pub mod video_source_manager;
pub mod webrtc_server;

+ 102
- 0
node-hub/dora-gst-webrtc-sink/src/main.rs View File

@@ -0,0 +1,102 @@
use arrow::array::UInt8Array;
use dora_node_api::{DoraNode, Event};
use futures::stream::StreamExt;
use log::info;
use std::env;
use std::sync::Arc;
use uuid::Uuid;
use warp::Filter;

mod peer_connection;
mod signaling;
mod video_source_manager;
mod webrtc_server;

use video_source_manager::VideoSourceManager;

#[tokio::main]
async fn main() -> eyre::Result<()> {
env_logger::init();
gstreamer::init()?;

let source_manager = Arc::new(VideoSourceManager::new());

// WebSocket route with video_id parameter
let source_manager_clone = source_manager.clone();
let websocket_route =
warp::path!(String)
.and(warp::ws())
.map(move |video_id: String, ws: warp::ws::Ws| {
let source_manager = source_manager_clone.clone();
ws.on_upgrade(move |websocket| async move {
let (server, _) = source_manager.get_or_create_source(&video_id);
let client_id = Uuid::new_v4().to_string();
info!(
"New client {} connected to video source: {}",
client_id, video_id
);
server.handle_websocket(websocket, client_id).await;
})
});

let port = env::var("SIGNALING_PORT")
.unwrap_or_else(|_| "8080".to_string())
.parse::<u16>()
.map_err(|e| eyre::eyre!("Invalid SIGNALING_PORT: {}", e))?;

let server_task = tokio::spawn(async move {
info!("WebRTC signaling server listening on port {}", port);
info!(
"Connect to ws://localhost:{}/VIDEO_ID for specific video streams",
port
);
warp::serve(websocket_route).run(([0, 0, 0, 0], port)).await;
});

let (_node, mut events) = DoraNode::init_from_env()?;

while let Some(event) = events.next().await {
match event {
Event::Input { id, data, metadata } => {
// Parse input ID to extract video_id
// Expected format: "video_id/frame" or just "image" for backward compatibility
let parts: Vec<&str> = id.as_str().split('/').collect();

let (video_id, is_frame) = if parts.len() == 2 && parts[1] == "frame" {
(parts[0].to_string(), true)
} else if id.as_str() == "image" {
// Backward compatibility: treat "image" as "default/frame"
("default".to_string(), true)
} else {
continue; // Skip non-frame inputs
};

if is_frame {
let encoding = if let Some(param) = metadata.parameters.get("encoding") {
match param {
dora_node_api::Parameter::String(s) => s.to_lowercase(),
_ => "rgb8".to_string(),
}
} else {
"rgb8".to_string()
};

if encoding == "rgb8" {
if let Some(data_arr) = data.as_any().downcast_ref::<UInt8Array>() {
let bytes = data_arr.values().to_vec();
let (_, frame_sender) = source_manager.get_or_create_source(&video_id);
let _ = frame_sender.send(bytes).await;
}
}
}
}
Event::Stop(_) => {
break;
}
_ => {}
}
}

server_task.abort();
Ok(())
}

+ 150
- 0
node-hub/dora-gst-webrtc-sink/src/peer_connection.rs View File

@@ -0,0 +1,150 @@
use gstreamer::prelude::*;
use gstreamer_app::AppSrc;

#[derive(Debug, Clone)]
pub struct PeerConnection {
pub _id: String,
pub pipeline: gstreamer::Pipeline,
pub webrtcbin: gstreamer::Element,
pub appsrc: AppSrc,
caps_set: std::sync::Arc<std::sync::Mutex<bool>>,
frame_count: std::sync::Arc<std::sync::Mutex<u32>>,
first_frame_time: std::sync::Arc<std::sync::Mutex<Option<std::time::Instant>>>,
detected_framerate: std::sync::Arc<std::sync::Mutex<Option<gstreamer::Fraction>>>,
}

impl PeerConnection {
pub fn new(
id: String,
pipeline: gstreamer::Pipeline,
webrtcbin: gstreamer::Element,
appsrc: AppSrc,
) -> Self {
Self {
_id: id,
pipeline,
webrtcbin,
appsrc,
caps_set: std::sync::Arc::new(std::sync::Mutex::new(false)),
frame_count: std::sync::Arc::new(std::sync::Mutex::new(0)),
first_frame_time: std::sync::Arc::new(std::sync::Mutex::new(None)),
detected_framerate: std::sync::Arc::new(std::sync::Mutex::new(None)),
}
}

pub fn send_frame(&self, frame_data: &[u8]) -> Result<(), gstreamer::FlowError> {
// Track frame timing for framerate detection
let now = std::time::Instant::now();
let mut frame_count = self.frame_count.lock().unwrap();
let mut first_frame_time = self.first_frame_time.lock().unwrap();
let mut detected_framerate = self.detected_framerate.lock().unwrap();

if first_frame_time.is_none() {
*first_frame_time = Some(now);
}
*frame_count += 1;

// Detect framerate after receiving enough frames (e.g., 30 frames)
if *frame_count >= 30 && detected_framerate.is_none() {
if let Some(start_time) = *first_frame_time {
let elapsed = now.duration_since(start_time).as_secs_f64();
if elapsed > 0.0 {
let fps = (*frame_count as f64 - 1.0) / elapsed;
let fps_rounded = fps.round() as i32;

// Common framerates: 15, 24, 25, 30, 50, 60
let common_fps = [15, 24, 25, 30, 50, 60];
let mut best_fps = 30;
let mut min_diff = i32::MAX;

for &candidate in &common_fps {
let diff = (fps_rounded - candidate).abs();
if diff < min_diff {
min_diff = diff;
best_fps = candidate;
}
}

*detected_framerate = Some(gstreamer::Fraction::new(best_fps, 1));
log::info!(
"Detected framerate: {} fps (measured: {:.2} fps)",
best_fps,
fps
);
}
}
}

// Set caps on first frame or when framerate is detected
let mut caps_set = self.caps_set.lock().unwrap();
if !*caps_set || (*caps_set && detected_framerate.is_some() && *frame_count == 30) {
// Detect resolution from frame size (RGB = 3 bytes per pixel)
let pixels = frame_data.len() / 3;

// Common resolutions
let resolutions = [
(640, 480),
(800, 600),
(1024, 768),
(1280, 720),
(1280, 960),
(1280, 1024),
(1920, 1080),
(2560, 1440),
(3840, 2160),
];

let mut width = 640;
let mut height = 480;

for (w, h) in resolutions.iter() {
if (*w as usize) * (*h as usize) == pixels {
width = *w;
height = *h;
break;
}
}

// Build caps with detected or default framerate
let mut caps_builder = gstreamer::Caps::builder("video/x-raw")
.field("format", "RGB")
.field("width", width as i32)
.field("height", height as i32);

if let Some(framerate) = &*detected_framerate {
caps_builder = caps_builder.field("framerate", framerate);
} else {
// Use a default framerate initially
caps_builder = caps_builder.field("framerate", gstreamer::Fraction::new(30, 1));
}

let caps = caps_builder.build();
self.appsrc.set_caps(Some(&caps));
*caps_set = true;

if *frame_count == 30 {
log::info!("Updated caps with detected framerate");
}
}

let mut buffer = gstreamer::Buffer::from_slice(frame_data.to_vec());

// Set the buffer timestamp
let buffer_ref = buffer.get_mut().unwrap();
let clock = gstreamer::SystemClock::obtain();
let base_time = self.pipeline.base_time();
let now = clock.time();
if let (Some(now), Some(base_time)) = (now, base_time) {
buffer_ref.set_pts(now - base_time);
}

match self.appsrc.push_buffer(buffer) {
Ok(_) => Ok(()),
Err(e) => Err(e),
}
}

pub fn shutdown(&self) {
let _ = self.pipeline.set_state(gstreamer::State::Null);
}
}

+ 21
- 0
node-hub/dora-gst-webrtc-sink/src/signaling.rs View File

@@ -0,0 +1,21 @@
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum SignalingMessage {
#[serde(rename = "offer")]
Offer { sdp: String },
#[serde(rename = "answer")]
Answer { sdp: String },
#[serde(rename = "ice")]
Ice { candidate: IceCandidate },
}

#[derive(Debug, Serialize, Deserialize)]
pub struct IceCandidate {
pub candidate: String,
#[serde(rename = "sdpMLineIndex")]
pub sdp_mline_index: u32,
#[serde(rename = "sdpMid", skip_serializing_if = "Option::is_none")]
pub sdp_mid: Option<String>,
}

+ 76
- 0
node-hub/dora-gst-webrtc-sink/src/video_source_manager.rs View File

@@ -0,0 +1,76 @@
use log::info;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;

use crate::webrtc_server::WebRTCServer;

#[derive(Clone)]
pub struct VideoSourceManager {
sources: Arc<Mutex<HashMap<String, VideoSource>>>,
}

#[derive(Clone)]
struct VideoSource {
_video_id: String,
server: Arc<WebRTCServer>,
frame_sender: mpsc::Sender<Vec<u8>>,
}

impl VideoSourceManager {
pub fn new() -> Self {
Self {
sources: Arc::new(Mutex::new(HashMap::new())),
}
}

pub fn get_or_create_source(
&self,
video_id: &str,
) -> (Arc<WebRTCServer>, mpsc::Sender<Vec<u8>>) {
let mut sources = self.sources.lock().unwrap();

if let Some(source) = sources.get(video_id) {
(source.server.clone(), source.frame_sender.clone())
} else {
info!("Creating new video source: {}", video_id);

let (frame_sender, mut frame_receiver) = mpsc::channel::<Vec<u8>>(1);
let frame_sender_clone = frame_sender.clone();
let server = Arc::new(WebRTCServer::new(frame_sender_clone));

let server_clone = server.clone();
let video_id_clone = video_id.to_string();
tokio::spawn(async move {
while let Some(frame) = frame_receiver.recv().await {
server_clone.send_frame_to_peers(&frame);
}
info!("Frame receiver for {} stopped", video_id_clone);
});

let source = VideoSource {
_video_id: video_id.to_string(),
server: server.clone(),
frame_sender: frame_sender.clone(),
};

sources.insert(video_id.to_string(), source);

(server, frame_sender)
}
}

#[allow(dead_code)]
pub fn get_source(&self, video_id: &str) -> Option<Arc<WebRTCServer>> {
let sources = self.sources.lock().unwrap();
sources.get(video_id).map(|s| s.server.clone())
}

#[allow(dead_code)]
pub fn remove_source(&self, video_id: &str) {
let mut sources = self.sources.lock().unwrap();
if sources.remove(video_id).is_some() {
info!("Removed video source: {}", video_id);
}
}
}

+ 499
- 0
node-hub/dora-gst-webrtc-sink/src/webrtc_server.rs View File

@@ -0,0 +1,499 @@
use anyhow::Context;
use futures::SinkExt;
use futures::stream::StreamExt;
use gstreamer::prelude::*;
use gstreamer_app::AppSrc;
use gstreamer_webrtc::{WebRTCICEConnectionState, WebRTCSessionDescription};
use log::{error, info, warn};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
use warp::ws::{Message, WebSocket};

use crate::peer_connection::PeerConnection;
use crate::signaling::SignalingMessage;

#[derive(Debug, Clone)]
pub struct WebRTCServer {
peers: Arc<Mutex<HashMap<String, PeerConnection>>>,
_frame_sender: mpsc::Sender<Vec<u8>>,
}

impl WebRTCServer {
pub fn new(frame_sender: mpsc::Sender<Vec<u8>>) -> Self {
Self {
peers: Arc::new(Mutex::new(HashMap::new())),
_frame_sender: frame_sender,
}
}

pub async fn handle_websocket(&self, ws: WebSocket, id: String) {
let (ws_sender, mut ws_receiver) = ws.split();
let ws_sender = Arc::new(Mutex::new(ws_sender));

while let Some(result) = ws_receiver.next().await {
match result {
Ok(msg) => {
if let Ok(text) = msg.to_str() {
if let Err(e) = self
.handle_signaling_message(text, &id, ws_sender.clone())
.await
{
error!("Error handling signaling message: {}", e);
}
}
}
Err(e) => {
error!("WebSocket error: {}", e);
break;
}
}
}

self.remove_peer(&id);
info!("Client {} disconnected", id);
}

async fn handle_signaling_message(
&self,
msg: &str,
peer_id: &str,
ws_sender: Arc<Mutex<futures::stream::SplitSink<WebSocket, Message>>>,
) -> anyhow::Result<()> {
// Parse JSON with detailed error handling
let message: SignalingMessage = match serde_json::from_str(msg) {
Ok(m) => m,
Err(e) => {
// Log the raw message for debugging
error!("Failed to deserialize signaling message. Error: {}", e);
error!("Raw message: {}", msg);

// Try to parse as generic JSON to provide more context
if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(msg) {
error!("Parsed JSON structure: {:#?}", json_value);

// Check the type field
if let Some(msg_type) = json_value.get("type") {
error!("Message type field: {:?}", msg_type);
} else {
error!("Message missing 'type' field");
}

// For offer/answer messages, check sdp field
if let Some(sdp) = json_value.get("sdp") {
error!(
"SDP field type: {}",
if sdp.is_string() {
"string"
} else {
"non-string"
}
);
}

// For ICE messages, check candidate field
if let Some(candidate) = json_value.get("candidate") {
error!(
"Candidate field type: {}",
if candidate.is_string() {
"string"
} else {
"non-string"
}
);
}
}

return Err(anyhow::anyhow!(
"Failed to deserialize signaling message: {}",
e
));
}
};

match message {
SignalingMessage::Offer { sdp, .. } => {
info!("Received offer from {}", peer_id);
let _answer_sdp = self.handle_offer(&sdp, peer_id, ws_sender).await?;
}
SignalingMessage::Ice { candidate } => {
self.add_ice_candidate(peer_id, &candidate.candidate, candidate.sdp_mline_index)?;
}
_ => {
warn!("Unexpected message type");
}
}

Ok(())
}

async fn handle_offer(
&self,
sdp: &str,
peer_id: &str,
ws_sender: Arc<Mutex<futures::stream::SplitSink<WebSocket, Message>>>,
) -> anyhow::Result<String> {
let pipeline = self.create_pipeline(peer_id, ws_sender.clone())?;

let webrtcbin = pipeline
.by_name("webrtc")
.context("Failed to get webrtcbin")?;

let appsrc = pipeline
.by_name("videosrc")
.and_then(|e| e.dynamic_cast::<AppSrc>().ok())
.context("Failed to get appsrc")?;

// Configure appsrc
appsrc.set_property("is-live", true);
appsrc.set_property("format", gstreamer::Format::Time);

// Set pipeline to READY state before setting remote description
pipeline.set_state(gstreamer::State::Ready)?;

// Wait for state change to complete
let (state_change_return, _, _) = pipeline.state(gstreamer::ClockTime::from_seconds(5));
if state_change_return.is_err() {
return Err(anyhow::anyhow!("Failed to set pipeline to READY state"));
}

let ret = gstreamer_sdp::SDPMessage::parse_buffer(sdp.as_bytes())
.map_err(|_| anyhow::anyhow!("Failed to parse SDP"))?;

let offer = WebRTCSessionDescription::new(gstreamer_webrtc::WebRTCSDPType::Offer, ret);

let promise = gstreamer::Promise::new();
webrtcbin.emit_by_name::<()>("set-remote-description", &[&offer, &promise]);

// Wait for set-remote-description to complete
let result = promise.wait();
match result {
gstreamer::PromiseResult::Replied => {
// Check if there was an error in the reply
if let Some(reply) = promise.get_reply() {
if let Ok(error_value) = reply.value("error") {
if let Ok(error) = error_value.get::<glib::Error>() {
error!("set-remote-description error: {}", error);
return Err(anyhow::anyhow!(
"Failed to set remote description: {}",
error
));
}
}
}
}
_ => {
error!("Failed to set remote description: {:?}", result);
return Err(anyhow::anyhow!("Failed to set remote description"));
}
}

// Additional delay to ensure state is propagated
std::thread::sleep(std::time::Duration::from_millis(100));

// Store peer first to ensure it can receive frames
let peer = PeerConnection::new(
peer_id.to_string(),
pipeline.clone(),
webrtcbin.clone(),
appsrc.clone(),
);
self.peers.lock().unwrap().insert(peer_id.to_string(), peer);

// Now set pipeline to PLAYING state before creating answer
pipeline.set_state(gstreamer::State::Playing)?;

// Wait for state change to complete
let (state_change_return, _, _) = pipeline.state(gstreamer::ClockTime::from_seconds(5));
if state_change_return.is_err() {
warn!(
"Pipeline state change to PLAYING not fully successful: {:?}",
state_change_return
);
}

// Give pipeline time to initialize
std::thread::sleep(std::time::Duration::from_millis(200));

// Connect to pad-added signal to ensure pipeline is ready
webrtcbin.connect("pad-added", false, move |values| {
let _webrtc = match values[0].get::<gstreamer::Element>() {
Ok(elem) => elem,
Err(_) => {
error!("Failed to get webrtc element from values");
return None;
}
};

let _pad = match values[1].get::<gstreamer::Pad>() {
Ok(p) => p,
Err(_) => {
error!("Failed to get pad from values");
return None;
}
};

None
});

// Use promise without change func first
let promise = gstreamer::Promise::new();

webrtcbin.emit_by_name::<()>("create-answer", &[&None::<gstreamer::Structure>, &promise]);

// Wait for promise to complete
let reply = promise.wait();

// Small delay to ensure the answer is fully processed
std::thread::sleep(std::time::Duration::from_millis(100));

// Get the answer from promise reply
match reply {
gstreamer::PromiseResult::Replied => {
if let Some(reply_struct) = promise.get_reply() {
// Check for error in reply
if let Ok(error_value) = reply_struct.value("error") {
if let Ok(error) = error_value.get::<glib::Error>() {
error!("create-answer error: {}", error);
}
}

// The answer should be in the reply
if let Ok(answer_value) =
reply_struct.get::<gstreamer_webrtc::WebRTCSessionDescription>("answer")
{
// Set local description
let set_promise = gstreamer::Promise::new();
webrtcbin.emit_by_name::<()>(
"set-local-description",
&[&answer_value, &set_promise],
);
let _ = set_promise.wait();

let answer_sdp = answer_value.sdp().to_string();

let msg = SignalingMessage::Answer { sdp: answer_sdp };

if let Ok(json) = serde_json::to_string(&msg) {
let ws_sender_clone = ws_sender.clone();
tokio::task::spawn_blocking(move || {
let rt = tokio::runtime::Handle::current();
rt.block_on(async move {
if let Ok(mut sender) = ws_sender_clone.lock() {
let _ = sender.send(Message::text(json)).await;
}
});
});
}
} else {
error!("No answer in reply structure");

// Try to get local-description as fallback
if let Some(answer) = webrtcbin.property::<Option<
gstreamer_webrtc::WebRTCSessionDescription,
>>(
"local-description"
) {
let answer_sdp = answer.sdp().to_string();

let msg = SignalingMessage::Answer { sdp: answer_sdp };

if let Ok(json) = serde_json::to_string(&msg) {
let ws_sender_clone = ws_sender.clone();
tokio::task::spawn_blocking(move || {
let rt = tokio::runtime::Handle::current();
rt.block_on(async move {
if let Ok(mut sender) = ws_sender_clone.lock() {
let _ = sender.send(Message::text(json)).await;
}
});
});
}
}
}
}
}
_ => {
error!("Promise failed or was interrupted");
}
}

Ok(String::new())
}

fn create_pipeline(
&self,
peer_id: &str,
ws_sender: Arc<Mutex<futures::stream::SplitSink<WebSocket, Message>>>,
) -> anyhow::Result<gstreamer::Pipeline> {
// Accept any resolution and framerate from input
// Initial caps will be set dynamically when first frame arrives
let pipeline_str = format!(
"appsrc name=videosrc \
is-live=true format=time do-timestamp=true ! \
videoconvert ! \
vp8enc deadline=1 ! \
rtpvp8pay ! \
application/x-rtp,media=video,encoding-name=VP8,payload=96 ! \
webrtcbin name=webrtc bundle-policy=max-bundle"
);

let pipeline = gstreamer::parse::launch(&pipeline_str)?
.dynamic_cast::<gstreamer::Pipeline>()
.map_err(|_| anyhow::anyhow!("Failed to create pipeline"))?;

// Monitor pipeline messages
let bus = pipeline.bus().unwrap();
let peer_id_for_bus = peer_id.to_string();
std::thread::spawn(move || {
for msg in bus.iter_timed(gstreamer::ClockTime::NONE) {
use gstreamer::MessageView;
match msg.view() {
MessageView::StateChanged(_) => {}
MessageView::Error(err) => {
error!(
"Pipeline {} error: {} ({})",
peer_id_for_bus,
err.error(),
err.debug().unwrap_or_default()
);
}
MessageView::Warning(warn) => {
warn!(
"Pipeline {} warning: {} ({})",
peer_id_for_bus,
warn.error(),
warn.debug().unwrap_or_default()
);
}
MessageView::Eos(_) => {
break;
}
_ => {}
}
}
});

let webrtcbin = pipeline
.by_name("webrtc")
.context("Failed to get webrtcbin")?;

webrtcbin.set_property_from_str("stun-server", "stun://stun.l.google.com:19302");

let ws_sender_clone = ws_sender.clone();
webrtcbin.connect("on-ice-candidate", false, move |values| {
// Handle potential errors in callback to avoid panic
let _webrtc = match values[0].get::<gstreamer::Element>() {
Ok(elem) => elem,
Err(_) => {
error!("Failed to get webrtc element from values");
return None;
}
};

let mline_index = match values[1].get::<u32>() {
Ok(idx) => idx,
Err(_) => {
error!("Failed to get mline_index from values");
return None;
}
};

let candidate = match values[2].get::<String>() {
Ok(cand) => cand,
Err(_) => {
error!("Failed to get candidate from values");
return None;
}
};

let msg = SignalingMessage::Ice {
candidate: crate::signaling::IceCandidate {
candidate,
sdp_mline_index: mline_index,
sdp_mid: None,
},
};

if let Ok(json) = serde_json::to_string(&msg) {
let ws_sender = ws_sender_clone.clone();

// Use a dedicated runtime for sending messages from GStreamer threads
std::thread::spawn(move || {
// Create a small runtime just for this send operation
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build();

if let Ok(rt) = rt {
rt.block_on(async move {
if let Ok(mut sender) = ws_sender.lock() {
let _ = sender.send(Message::text(json)).await;
}
});
}
});
}

None
});

let peer_id_clone = peer_id.to_string();
let _ws_sender_clone = ws_sender.clone();
webrtcbin.connect("notify::ice-connection-state", false, move |values| {
let _webrtc = match values[0].get::<gstreamer::Element>() {
Ok(elem) => elem,
Err(_) => {
error!("Failed to get webrtc element from values");
return None;
}
};

let state = _webrtc.property::<WebRTCICEConnectionState>("ice-connection-state");
info!("ICE connection state for {}: {:?}", peer_id_clone, state);

// Also check connection state
let conn_state =
_webrtc.property::<gstreamer_webrtc::WebRTCPeerConnectionState>("connection-state");
info!(
"Peer connection state for {}: {:?}",
peer_id_clone, conn_state
);
None
});

Ok(pipeline)
}

fn add_ice_candidate(
&self,
peer_id: &str,
candidate: &str,
mline_index: u32,
) -> anyhow::Result<()> {
let peers = self.peers.lock().unwrap();
if let Some(peer) = peers.get(peer_id) {
peer.webrtcbin
.emit_by_name::<()>("add-ice-candidate", &[&mline_index, &candidate]);
} else {
warn!("Peer {} not found when adding ICE candidate", peer_id);
}
Ok(())
}

fn remove_peer(&self, peer_id: &str) {
let mut peers = self.peers.lock().unwrap();
if let Some(peer) = peers.remove(peer_id) {
peer.shutdown();
}
}

pub fn send_frame_to_peers(&self, frame_data: &[u8]) {
let peers = self.peers.lock().unwrap();
for (peer_id, peer) in peers.iter() {
if let Err(e) = peer.send_frame(frame_data) {
warn!("Failed to push buffer to peer {}: {}", peer_id, e);
}
}
}
}

Loading…
Cancel
Save