
added dora-gst-webrtc-sink to node-hub

pull/1095/head
Watanabe Takafumi, 5 months ago
commit 96ea8e488c
14 changed files with 5589 additions and 0 deletions
  1. node-hub/dora-gst-webrtc-sink/.gitignore (+1, -0)
  2. node-hub/dora-gst-webrtc-sink/Cargo.lock (+4337, -0)
  3. node-hub/dora-gst-webrtc-sink/Cargo.toml (+30, -0)
  4. node-hub/dora-gst-webrtc-sink/README.md (+228, -0)
  5. node-hub/dora-gst-webrtc-sink/example/.gitignore (+1, -0)
  6. node-hub/dora-gst-webrtc-sink/example/README.md (+49, -0)
  7. node-hub/dora-gst-webrtc-sink/example/dataflow.yml (+46, -0)
  8. node-hub/dora-gst-webrtc-sink/example/run_demo.sh (+45, -0)
  9. node-hub/dora-gst-webrtc-sink/src/lib.rs (+4, -0)
  10. node-hub/dora-gst-webrtc-sink/src/main.rs (+102, -0)
  11. node-hub/dora-gst-webrtc-sink/src/peer_connection.rs (+150, -0)
  12. node-hub/dora-gst-webrtc-sink/src/signaling.rs (+21, -0)
  13. node-hub/dora-gst-webrtc-sink/src/video_source_manager.rs (+76, -0)
  14. node-hub/dora-gst-webrtc-sink/src/webrtc_server.rs (+499, -0)

node-hub/dora-gst-webrtc-sink/.gitignore (+1, -0)

@@ -0,0 +1 @@
/target

node-hub/dora-gst-webrtc-sink/Cargo.lock (+4337, -0)
File diff suppressed because it is too large.


node-hub/dora-gst-webrtc-sink/Cargo.toml (+30, -0)

@@ -0,0 +1,30 @@
[package]
name = "dora-gst-webrtc-sink"
version = "0.1.0"
edition = "2021"
authors = ["dieu.detruit@gmail.com"]
license = "MIT"
description = "WebRTC sink node for dora-rs with multi-camera support"
repository = "https://github.com/dieu-detruit/dora-gst-webrtc-sink"
keywords = ["dora", "webrtc", "gstreamer", "video", "streaming"]
categories = ["multimedia::video", "network-programming"]

[dependencies]
dora-node-api = "0.3"
eyre = "0.6"
arrow = "54"
gstreamer = "0.23"
gstreamer-webrtc = "0.23"
gstreamer-sdp = "0.23"
gstreamer-app = "0.23"
glib = "0.20"
tokio = { version = "1.40", features = ["full"] }
warp = "0.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
anyhow = "1.0"
futures = "0.3"
env_logger = "0.11"
log = "0.4"
tokio-stream = "0.1"
uuid = { version = "1.10", features = ["v4"] }

node-hub/dora-gst-webrtc-sink/README.md (+228, -0)

@@ -0,0 +1,228 @@
# dora-gst-webrtc-sink

[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![Rust](https://img.shields.io/badge/rust-%23000000.svg?style=flat&logo=rust&logoColor=white)](https://www.rust-lang.org/)

A WebRTC sink node for [dora-rs](https://github.com/dora-rs/dora) that streams video via WebRTC through a built-in signaling server. It supports multiple video sources and multiple clients per source.

## Features

- 🎥 Stream RGB8 format video from dora-rs inputs
- 📡 Built-in WebRTC signaling server (no external server needed)
- 🔄 Real-time streaming with latest frame priority (buffer size 1)
- 📹 **Multiple video sources**: Handle multiple cameras/video streams simultaneously
- 👥 **Multiple clients per source**: Each video source can serve multiple WebRTC clients
- 🔧 Dynamic video source management
- ⚙️ Configurable signaling server port via environment variable

## Prerequisites

- Rust 1.70+
- GStreamer 1.16+ with WebRTC plugins
- [dora-rs](https://github.com/dora-rs/dora)

### Installing GStreamer

#### Ubuntu/Debian
```bash
sudo apt-get update
sudo apt-get install \
  libgstreamer1.0-dev \
  libgstreamer-plugins-base1.0-dev \
  libgstreamer-plugins-bad1.0-dev \
  gstreamer1.0-plugins-good \
  gstreamer1.0-plugins-bad \
  gstreamer1.0-plugins-ugly \
  gstreamer1.0-libav
```

#### macOS
```bash
brew install gstreamer gst-plugins-base gst-plugins-good gst-plugins-bad gst-plugins-ugly
```

#### Windows
Download and install GStreamer from the [official website](https://gstreamer.freedesktop.org/download/).

## Installation

```bash
git clone https://github.com/dieu-detruit/dora-gst-webrtc-sink.git
cd dora-gst-webrtc-sink
cargo build --release
```

## Usage

### Quick Start

1. Run the example demo:
```bash
cd example
./run_demo.sh
```

2. Open `example/webrtc-viewer.html` in your web browser
3. The viewer will automatically connect to both camera streams

### Environment Variables

- `SIGNALING_PORT`: WebSocket signaling server port (default: 8080)
- `RUST_LOG`: Log level (default: info)

### Input Format

The node supports two input formats:

1. **Legacy format** (backward compatible):
- Input ID: `image`
- Maps to the default video source

2. **Multi-camera format**:
- Input ID: `<video_id>/frame`
- Example: `camera1/frame`, `camera2/frame`, `front_camera/frame`
- Each unique `video_id` creates a separate video stream

All inputs expect:
- **Format**: RGB8
- **Resolution**: Automatically detected from input frame size
- **Framerate**: Automatically detected from input timing
- **Encoding parameter**: "rgb8"
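
For producers, a minimal sketch of a compatible upstream node is shown below. This is illustrative only: it assumes dora-node-api 0.3's `send_output(output_id, parameters, data)` call, its re-exported `arrow` and `DataId` types, and that `MetadataParameters` is a map of `String` to `Parameter`; the solid-black frame and the `frame` output ID are placeholders.

```rust
use dora_node_api::{
    arrow::array::UInt8Array, dora_core::config::DataId, DoraNode, MetadataParameters, Parameter,
};

fn main() -> eyre::Result<()> {
    let (mut node, _events) = DoraNode::init_from_env()?;

    // Hypothetical 640x480 solid-black frame, RGB8 = 3 bytes per pixel.
    let frame = UInt8Array::from(vec![0u8; 640 * 480 * 3]);

    // Tag the output so the sink recognizes the pixel format.
    let mut params = MetadataParameters::default();
    params.insert("encoding".into(), Parameter::String("rgb8".into()));

    // An output named "frame" can then be mapped as `<video_id>/frame`
    // in the dataflow YAML.
    node.send_output(DataId::from("frame".to_owned()), params, frame)?;
    Ok(())
}
```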

### WebRTC Client Connection

Connect to the WebSocket signaling server at:
```
ws://localhost:8080/<video_id>
```

Example URLs:
- `ws://localhost:8080/camera1` - First camera
- `ws://localhost:8080/camera2` - Second camera
- `ws://localhost:8080/default` - Legacy support
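
Signaling payloads are the JSON encodings of `SignalingMessage` in `src/signaling.rs`: the client sends an `offer`, and the server replies with an `answer` followed by `ice` candidates. A small sketch of the wire format (the candidate string is a made-up example):

```rust
use dora_gst_webrtc_sink::signaling::{IceCandidate, SignalingMessage};

fn main() {
    let msg = SignalingMessage::Ice {
        candidate: IceCandidate {
            candidate: "candidate:0 1 UDP 2122252543 192.0.2.1 54321 typ host".into(),
            sdp_mline_index: 0,
            sdp_mid: None, // omitted from the JSON via skip_serializing_if
        },
    };
    // The #[serde(tag = "type")] attribute produces:
    // {"type":"ice","candidate":{"candidate":"candidate:0 ...","sdpMLineIndex":0}}
    println!("{}", serde_json::to_string(&msg).unwrap());
}
```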

## Example Dataflow Configuration

### Single Camera (Legacy)
```yaml
nodes:
  - id: camera
    path: camera-node
    outputs:
      - image

  - id: webrtc-sink
    path: dora-gst-webrtc-sink
    inputs:
      image: camera/image
```

### Multiple Cameras
```yaml
nodes:
  - id: camera1
    path: camera-node
    outputs:
      - frame

  - id: camera2
    path: camera-node
    outputs:
      - frame

  - id: webrtc-sink
    path: dora-gst-webrtc-sink
    inputs:
      camera1/frame: camera1/frame
      camera2/frame: camera2/frame
```

## Web Client

The included `webrtc-viewer.html` provides:
- Automatic connection to multiple video streams
- Dynamic addition/removal of video streams
- Individual connection control per stream
- Real-time status and logging

## Architecture

```
┌─────────────┐      ┌──────────────────┐      ┌─────────────┐
│ Camera Node │─────▶│ dora-gst-webrtc  │─────▶│ Web Browser │
│   (RGB8)    │      │      -sink       │      │  (WebRTC)   │
└─────────────┘      │                  │      └─────────────┘
                     │  ┌────────────┐  │
┌─────────────┐      │  │ Signaling  │  │      ┌─────────────┐
│ Camera Node │─────▶│  │   Server   │  │─────▶│ Web Browser │
│   (RGB8)    │      │  └────────────┘  │      │  (WebRTC)   │
└─────────────┘      └──────────────────┘      └─────────────┘
```

## Development

### Building from Source
```bash
cargo build --release
```

### Running Tests
```bash
cargo test
```

### Code Structure
- `src/main.rs` - Main entry point and dora node integration
- `src/webrtc_server.rs` - WebRTC server implementation
- `src/peer_connection.rs` - Individual peer connection management
- `src/video_source_manager.rs` - Multiple video source management
- `src/signaling.rs` - WebRTC signaling protocol messages

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

1. Fork the repository
2. Create your feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add some amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request

## License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

## Acknowledgments

- [dora-rs](https://github.com/dora-rs/dora) - Dataflow-oriented robotics architecture
- [GStreamer](https://gstreamer.freedesktop.org/) - Multimedia framework
- [webrtcbin](https://gstreamer.freedesktop.org/documentation/webrtc/index.html) - GStreamer WebRTC implementation

## Troubleshooting

### GStreamer WebRTC plugin not found
Ensure GStreamer plugins are properly installed:
```bash
gst-inspect-1.0 webrtcbin
```

### Connection issues
1. Check firewall settings for port 8080 (signaling)
2. Ensure the STUN server is reachable (Google's public STUN server is used by default)
3. Check browser console for WebRTC errors

### Performance issues
- Adjust the VP8 encoding parameters in `create_pipeline()` (see the sketch after this list)
- Consider reducing resolution or framerate
- Check CPU usage during encoding
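
A hedged sketch of a tuned encoder line for `create_pipeline()` in `src/webrtc_server.rs`. The `vp8enc` properties used here exist in GStreamer, but these particular values are illustrative starting points, not tested defaults:

```rust
// Replacement for the pipeline string built in create_pipeline().
// target-bitrate is in bits/s; larger cpu-used values trade quality for speed.
let pipeline_str = "appsrc name=videosrc is-live=true format=time do-timestamp=true ! \
    videoconvert ! \
    vp8enc deadline=1 cpu-used=4 target-bitrate=1500000 keyframe-max-dist=30 ! \
    rtpvp8pay ! \
    application/x-rtp,media=video,encoding-name=VP8,payload=96 ! \
    webrtcbin name=webrtc bundle-policy=max-bundle";
```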

## Roadmap

- [ ] H.264 codec support for better compatibility
- [ ] TURN server support for NAT traversal
- [ ] Connection quality metrics
- [ ] Support for additional video formats (NV12, I420)

## Contact

For questions, issues, or contributions, please use the GitHub issue tracker.

node-hub/dora-gst-webrtc-sink/example/.gitignore (+1, -0)

@@ -0,0 +1 @@
out

node-hub/dora-gst-webrtc-sink/example/README.md (+49, -0)

@@ -0,0 +1,49 @@
# WebRTC Sink Example

This example demonstrates how to use `dora-gst-webrtc-sink` with `dora-gst-test-source` to stream video via WebRTC.

## Prerequisites

1. Install dora-rs
2. Install GStreamer with WebRTC support:
```bash
# Ubuntu/Debian
sudo apt-get install libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev \
  libgstreamer-plugins-bad1.0-dev gstreamer1.0-plugins-bad \
  gstreamer1.0-nice gstreamer1.0-tools
```

## Running the Demo

1. Run the demo script:
```bash
./run_demo.sh
```

2. Open `webrtc-viewer.html` in a web browser

3. Click the "Connect" button to start viewing the stream

## Components

- **dataflow.yml**: Dora dataflow configuration that connects test video source to WebRTC sink
- **webrtc-viewer.html**: Simple web-based WebRTC client for viewing the stream
- **run_demo.sh**: Script to build and run the demo

## Architecture

```
[dora-gst-test-source] ---(RGB images)---> [dora-gst-webrtc-sink]
                                                      |
                                                      v
                                           WebSocket Server (:8080)
                                                      |
                                                      v
                                            Web Browser Client
```

## Troubleshooting

- If the build fails with "gstreamer-webrtc-1.0 not found", install the GStreamer WebRTC plugins
- Make sure port 8080 is not already in use
- Check browser console for WebRTC connection errors

node-hub/dora-gst-webrtc-sink/example/dataflow.yml (+46, -0)

@@ -0,0 +1,46 @@
nodes:
  # Camera 1 - Ball pattern
  - id: camera1
    build: cargo install --git https://github.com/dieu-detruit/dora-gst-test-source dora-gst-test-source
    path: dora-gst-test-source
    inputs:
      tick: dora/timer/millis/30
    outputs:
      - frame
    env:
      MOTION: '0'
      IS_LIVE: 'true'
      PATTERN: '18' # Ball pattern
      FOREGROUND_COLOR: '0xffb28354'
      IMAGE_COLS: '640'
      IMAGE_ROWS: '480'
      SOURCE_FPS: '30.0'

  # Camera 2 - SMPTE pattern
  - id: camera2
    build: cargo install --git https://github.com/dieu-detruit/dora-gst-test-source dora-gst-test-source
    path: dora-gst-test-source
    inputs:
      tick: dora/timer/millis/30
    outputs:
      - frame
    env:
      MOTION: '0'
      IS_LIVE: 'true'
      PATTERN: '0' # SMPTE pattern
      IMAGE_COLS: '640'
      IMAGE_ROWS: '480'
      SOURCE_FPS: '30.0'

  # WebRTC sink that handles multiple sources
  - id: webrtc-sink
    build: cargo install --path ../
    path: dora-gst-webrtc-sink
    inputs:
      camera1/frame: camera1/frame
      camera2/frame: camera2/frame
    env:
      SIGNALING_PORT: "8080"
      RUST_LOG: "info"

node-hub/dora-gst-webrtc-sink/example/run_demo.sh (+45, -0)

@@ -0,0 +1,45 @@
#!/bin/bash

echo "=== dora-gst-webrtc-sink Demo ==="
echo

# Check if dora is installed
if ! command -v dora &> /dev/null; then
    echo "Error: 'dora' command not found. Please install dora-rs first."
    echo "Visit: https://github.com/dora-rs/dora"
    exit 1
fi

# Check if the GStreamer WebRTC plugin (webrtcbin) is installed
if ! gst-inspect-1.0 webrtcbin &> /dev/null; then
    echo "Error: GStreamer webrtcbin not found. Please install GStreamer and its WebRTC plugins."
    echo "Ubuntu/Debian: sudo apt-get install libgstreamer1.0-dev libgstreamer-plugins-bad1.0-dev"
    exit 1
fi

echo
echo "Building dataflow..."
dora build dataflow.yml
if [ $? -ne 0 ]; then
    echo "Error: Build failed. Please check the error messages above."
    exit 1
fi

echo
echo "Starting multi-camera WebRTC streaming demo..."
echo "WebRTC signaling server will be available at: ws://localhost:8080/<video_id>"
echo
echo "Available video streams:"
echo "  - camera1: Moving ball pattern (ws://localhost:8080/camera1)"
echo "  - camera2: SMPTE test pattern (ws://localhost:8080/camera2)"
echo
echo "To view the streams:"
echo "1. Open 'webrtc-viewer.html' in a web browser"
echo "2. Both cameras will connect automatically"
echo "3. You can add more video streams using the interface"
echo
echo "Press Ctrl+C to stop"
echo

# Start the dataflow
dora run dataflow.yml

node-hub/dora-gst-webrtc-sink/src/lib.rs (+4, -0)

@@ -0,0 +1,4 @@
pub mod peer_connection;
pub mod signaling;
pub mod video_source_manager;
pub mod webrtc_server;

node-hub/dora-gst-webrtc-sink/src/main.rs (+102, -0)

@@ -0,0 +1,102 @@
use arrow::array::UInt8Array;
use dora_node_api::{DoraNode, Event};
use futures::stream::StreamExt;
use log::info;
use std::env;
use std::sync::Arc;
use uuid::Uuid;
use warp::Filter;

mod peer_connection;
mod signaling;
mod video_source_manager;
mod webrtc_server;

use video_source_manager::VideoSourceManager;

#[tokio::main]
async fn main() -> eyre::Result<()> {
    env_logger::init();
    gstreamer::init()?;

    let source_manager = Arc::new(VideoSourceManager::new());

    // WebSocket route with video_id parameter
    let source_manager_clone = source_manager.clone();
    let websocket_route = warp::path!(String)
        .and(warp::ws())
        .map(move |video_id: String, ws: warp::ws::Ws| {
            let source_manager = source_manager_clone.clone();
            ws.on_upgrade(move |websocket| async move {
                let (server, _) = source_manager.get_or_create_source(&video_id);
                let client_id = Uuid::new_v4().to_string();
                info!(
                    "New client {} connected to video source: {}",
                    client_id, video_id
                );
                server.handle_websocket(websocket, client_id).await;
            })
        });

    let port = env::var("SIGNALING_PORT")
        .unwrap_or_else(|_| "8080".to_string())
        .parse::<u16>()
        .map_err(|e| eyre::eyre!("Invalid SIGNALING_PORT: {}", e))?;

    let server_task = tokio::spawn(async move {
        info!("WebRTC signaling server listening on port {}", port);
        info!(
            "Connect to ws://localhost:{}/VIDEO_ID for specific video streams",
            port
        );
        warp::serve(websocket_route).run(([0, 0, 0, 0], port)).await;
    });

    let (_node, mut events) = DoraNode::init_from_env()?;

    while let Some(event) = events.next().await {
        match event {
            Event::Input { id, data, metadata } => {
                // Parse the input ID to extract the video_id.
                // Expected format: "video_id/frame", or just "image" for backward compatibility.
                let parts: Vec<&str> = id.as_str().split('/').collect();

                let (video_id, is_frame) = if parts.len() == 2 && parts[1] == "frame" {
                    (parts[0].to_string(), true)
                } else if id.as_str() == "image" {
                    // Backward compatibility: treat "image" as "default/frame"
                    ("default".to_string(), true)
                } else {
                    continue; // Skip non-frame inputs
                };

                if is_frame {
                    let encoding = if let Some(param) = metadata.parameters.get("encoding") {
                        match param {
                            dora_node_api::Parameter::String(s) => s.to_lowercase(),
                            _ => "rgb8".to_string(),
                        }
                    } else {
                        "rgb8".to_string()
                    };

                    if encoding == "rgb8" {
                        if let Some(data_arr) = data.as_any().downcast_ref::<UInt8Array>() {
                            let bytes = data_arr.values().to_vec();
                            let (_, frame_sender) = source_manager.get_or_create_source(&video_id);
                            let _ = frame_sender.send(bytes).await;
                        }
                    }
                }
            }
            Event::Stop(_) => {
                break;
            }
            _ => {}
        }
    }

    server_task.abort();
    Ok(())
}

node-hub/dora-gst-webrtc-sink/src/peer_connection.rs (+150, -0)

@@ -0,0 +1,150 @@
use gstreamer::prelude::*;
use gstreamer_app::AppSrc;

#[derive(Debug, Clone)]
pub struct PeerConnection {
    pub _id: String,
    pub pipeline: gstreamer::Pipeline,
    pub webrtcbin: gstreamer::Element,
    pub appsrc: AppSrc,
    caps_set: std::sync::Arc<std::sync::Mutex<bool>>,
    frame_count: std::sync::Arc<std::sync::Mutex<u32>>,
    first_frame_time: std::sync::Arc<std::sync::Mutex<Option<std::time::Instant>>>,
    detected_framerate: std::sync::Arc<std::sync::Mutex<Option<gstreamer::Fraction>>>,
}

impl PeerConnection {
    pub fn new(
        id: String,
        pipeline: gstreamer::Pipeline,
        webrtcbin: gstreamer::Element,
        appsrc: AppSrc,
    ) -> Self {
        Self {
            _id: id,
            pipeline,
            webrtcbin,
            appsrc,
            caps_set: std::sync::Arc::new(std::sync::Mutex::new(false)),
            frame_count: std::sync::Arc::new(std::sync::Mutex::new(0)),
            first_frame_time: std::sync::Arc::new(std::sync::Mutex::new(None)),
            detected_framerate: std::sync::Arc::new(std::sync::Mutex::new(None)),
        }
    }

    pub fn send_frame(&self, frame_data: &[u8]) -> Result<(), gstreamer::FlowError> {
        // Track frame timing for framerate detection
        let now = std::time::Instant::now();
        let mut frame_count = self.frame_count.lock().unwrap();
        let mut first_frame_time = self.first_frame_time.lock().unwrap();
        let mut detected_framerate = self.detected_framerate.lock().unwrap();

        if first_frame_time.is_none() {
            *first_frame_time = Some(now);
        }
        *frame_count += 1;

        // Detect framerate after receiving enough frames (e.g., 30 frames)
        if *frame_count >= 30 && detected_framerate.is_none() {
            if let Some(start_time) = *first_frame_time {
                let elapsed = now.duration_since(start_time).as_secs_f64();
                if elapsed > 0.0 {
                    let fps = (*frame_count as f64 - 1.0) / elapsed;
                    let fps_rounded = fps.round() as i32;

                    // Snap to the closest common framerate: 15, 24, 25, 30, 50, 60
                    let common_fps = [15, 24, 25, 30, 50, 60];
                    let mut best_fps = 30;
                    let mut min_diff = i32::MAX;

                    for &candidate in &common_fps {
                        let diff = (fps_rounded - candidate).abs();
                        if diff < min_diff {
                            min_diff = diff;
                            best_fps = candidate;
                        }
                    }

                    *detected_framerate = Some(gstreamer::Fraction::new(best_fps, 1));
                    log::info!(
                        "Detected framerate: {} fps (measured: {:.2} fps)",
                        best_fps,
                        fps
                    );
                }
            }
        }

        // Set caps on the first frame, and again once the framerate is detected
        let mut caps_set = self.caps_set.lock().unwrap();
        if !*caps_set || (*caps_set && detected_framerate.is_some() && *frame_count == 30) {
            // Detect resolution from frame size (RGB = 3 bytes per pixel)
            let pixels = frame_data.len() / 3;

            // Common resolutions
            let resolutions = [
                (640, 480),
                (800, 600),
                (1024, 768),
                (1280, 720),
                (1280, 960),
                (1280, 1024),
                (1920, 1080),
                (2560, 1440),
                (3840, 2160),
            ];

            let mut width = 640;
            let mut height = 480;

            for (w, h) in resolutions.iter() {
                if (*w as usize) * (*h as usize) == pixels {
                    width = *w;
                    height = *h;
                    break;
                }
            }

            // Build caps with the detected or default framerate
            let mut caps_builder = gstreamer::Caps::builder("video/x-raw")
                .field("format", "RGB")
                .field("width", width as i32)
                .field("height", height as i32);

            if let Some(framerate) = &*detected_framerate {
                caps_builder = caps_builder.field("framerate", framerate);
            } else {
                // Use a default framerate initially
                caps_builder = caps_builder.field("framerate", gstreamer::Fraction::new(30, 1));
            }

            let caps = caps_builder.build();
            self.appsrc.set_caps(Some(&caps));
            *caps_set = true;

            if *frame_count == 30 {
                log::info!("Updated caps with detected framerate");
            }
        }

        let mut buffer = gstreamer::Buffer::from_slice(frame_data.to_vec());

        // Set the buffer timestamp relative to the pipeline's base time
        let buffer_ref = buffer.get_mut().unwrap();
        let clock = gstreamer::SystemClock::obtain();
        let base_time = self.pipeline.base_time();
        let now = clock.time();
        if let (Some(now), Some(base_time)) = (now, base_time) {
            buffer_ref.set_pts(now - base_time);
        }

        self.appsrc.push_buffer(buffer).map(|_| ())
    }

    pub fn shutdown(&self) {
        let _ = self.pipeline.set_state(gstreamer::State::Null);
    }
}

node-hub/dora-gst-webrtc-sink/src/signaling.rs (+21, -0)

@@ -0,0 +1,21 @@
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum SignalingMessage {
    #[serde(rename = "offer")]
    Offer { sdp: String },
    #[serde(rename = "answer")]
    Answer { sdp: String },
    #[serde(rename = "ice")]
    Ice { candidate: IceCandidate },
}

#[derive(Debug, Serialize, Deserialize)]
pub struct IceCandidate {
    pub candidate: String,
    #[serde(rename = "sdpMLineIndex")]
    pub sdp_mline_index: u32,
    #[serde(rename = "sdpMid", skip_serializing_if = "Option::is_none")]
    pub sdp_mid: Option<String>,
}

node-hub/dora-gst-webrtc-sink/src/video_source_manager.rs (+76, -0)

@@ -0,0 +1,76 @@
use log::info;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;

use crate::webrtc_server::WebRTCServer;

#[derive(Clone)]
pub struct VideoSourceManager {
    sources: Arc<Mutex<HashMap<String, VideoSource>>>,
}

#[derive(Clone)]
struct VideoSource {
    _video_id: String,
    server: Arc<WebRTCServer>,
    frame_sender: mpsc::Sender<Vec<u8>>,
}

impl VideoSourceManager {
    pub fn new() -> Self {
        Self {
            sources: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    pub fn get_or_create_source(
        &self,
        video_id: &str,
    ) -> (Arc<WebRTCServer>, mpsc::Sender<Vec<u8>>) {
        let mut sources = self.sources.lock().unwrap();

        if let Some(source) = sources.get(video_id) {
            (source.server.clone(), source.frame_sender.clone())
        } else {
            info!("Creating new video source: {}", video_id);

            let (frame_sender, mut frame_receiver) = mpsc::channel::<Vec<u8>>(1);
            let frame_sender_clone = frame_sender.clone();
            let server = Arc::new(WebRTCServer::new(frame_sender_clone));

            let server_clone = server.clone();
            let video_id_clone = video_id.to_string();
            tokio::spawn(async move {
                while let Some(frame) = frame_receiver.recv().await {
                    server_clone.send_frame_to_peers(&frame);
                }
                info!("Frame receiver for {} stopped", video_id_clone);
            });

            let source = VideoSource {
                _video_id: video_id.to_string(),
                server: server.clone(),
                frame_sender: frame_sender.clone(),
            };

            sources.insert(video_id.to_string(), source);

            (server, frame_sender)
        }
    }

    #[allow(dead_code)]
    pub fn get_source(&self, video_id: &str) -> Option<Arc<WebRTCServer>> {
        let sources = self.sources.lock().unwrap();
        sources.get(video_id).map(|s| s.server.clone())
    }

    #[allow(dead_code)]
    pub fn remove_source(&self, video_id: &str) {
        let mut sources = self.sources.lock().unwrap();
        if sources.remove(video_id).is_some() {
            info!("Removed video source: {}", video_id);
        }
    }
}

node-hub/dora-gst-webrtc-sink/src/webrtc_server.rs (+499, -0)

@@ -0,0 +1,499 @@
use anyhow::Context;
use futures::stream::StreamExt;
use futures::SinkExt;
use gstreamer::prelude::*;
use gstreamer_app::AppSrc;
use gstreamer_webrtc::{WebRTCICEConnectionState, WebRTCSessionDescription};
use log::{error, info, warn};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
use warp::ws::{Message, WebSocket};

use crate::peer_connection::PeerConnection;
use crate::signaling::SignalingMessage;

#[derive(Debug, Clone)]
pub struct WebRTCServer {
    peers: Arc<Mutex<HashMap<String, PeerConnection>>>,
    _frame_sender: mpsc::Sender<Vec<u8>>,
}

impl WebRTCServer {
    pub fn new(frame_sender: mpsc::Sender<Vec<u8>>) -> Self {
        Self {
            peers: Arc::new(Mutex::new(HashMap::new())),
            _frame_sender: frame_sender,
        }
    }

    pub async fn handle_websocket(&self, ws: WebSocket, id: String) {
        let (ws_sender, mut ws_receiver) = ws.split();
        let ws_sender = Arc::new(Mutex::new(ws_sender));

        while let Some(result) = ws_receiver.next().await {
            match result {
                Ok(msg) => {
                    if let Ok(text) = msg.to_str() {
                        if let Err(e) = self
                            .handle_signaling_message(text, &id, ws_sender.clone())
                            .await
                        {
                            error!("Error handling signaling message: {}", e);
                        }
                    }
                }
                Err(e) => {
                    error!("WebSocket error: {}", e);
                    break;
                }
            }
        }

        self.remove_peer(&id);
        info!("Client {} disconnected", id);
    }

    async fn handle_signaling_message(
        &self,
        msg: &str,
        peer_id: &str,
        ws_sender: Arc<Mutex<futures::stream::SplitSink<WebSocket, Message>>>,
    ) -> anyhow::Result<()> {
        // Parse JSON with detailed error handling
        let message: SignalingMessage = match serde_json::from_str(msg) {
            Ok(m) => m,
            Err(e) => {
                // Log the raw message for debugging
                error!("Failed to deserialize signaling message. Error: {}", e);
                error!("Raw message: {}", msg);

                // Try to parse as generic JSON to provide more context
                if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(msg) {
                    error!("Parsed JSON structure: {:#?}", json_value);

                    // Check the type field
                    if let Some(msg_type) = json_value.get("type") {
                        error!("Message type field: {:?}", msg_type);
                    } else {
                        error!("Message missing 'type' field");
                    }

                    // For offer/answer messages, check the sdp field
                    if let Some(sdp) = json_value.get("sdp") {
                        error!(
                            "SDP field type: {}",
                            if sdp.is_string() {
                                "string"
                            } else {
                                "non-string"
                            }
                        );
                    }

                    // For ICE messages, check the candidate field
                    if let Some(candidate) = json_value.get("candidate") {
                        error!(
                            "Candidate field type: {}",
                            if candidate.is_string() {
                                "string"
                            } else {
                                "non-string"
                            }
                        );
                    }
                }

                return Err(anyhow::anyhow!(
                    "Failed to deserialize signaling message: {}",
                    e
                ));
            }
        };

        match message {
            SignalingMessage::Offer { sdp, .. } => {
                info!("Received offer from {}", peer_id);
                let _answer_sdp = self.handle_offer(&sdp, peer_id, ws_sender).await?;
            }
            SignalingMessage::Ice { candidate } => {
                self.add_ice_candidate(peer_id, &candidate.candidate, candidate.sdp_mline_index)?;
            }
            _ => {
                warn!("Unexpected message type");
            }
        }

        Ok(())
    }

    async fn handle_offer(
        &self,
        sdp: &str,
        peer_id: &str,
        ws_sender: Arc<Mutex<futures::stream::SplitSink<WebSocket, Message>>>,
    ) -> anyhow::Result<String> {
        let pipeline = self.create_pipeline(peer_id, ws_sender.clone())?;

        let webrtcbin = pipeline
            .by_name("webrtc")
            .context("Failed to get webrtcbin")?;

        let appsrc = pipeline
            .by_name("videosrc")
            .and_then(|e| e.dynamic_cast::<AppSrc>().ok())
            .context("Failed to get appsrc")?;

        // Configure appsrc
        appsrc.set_property("is-live", true);
        appsrc.set_property("format", gstreamer::Format::Time);

        // Set the pipeline to READY state before setting the remote description
        pipeline.set_state(gstreamer::State::Ready)?;

        // Wait for the state change to complete
        let (state_change_return, _, _) = pipeline.state(gstreamer::ClockTime::from_seconds(5));
        if state_change_return.is_err() {
            return Err(anyhow::anyhow!("Failed to set pipeline to READY state"));
        }

        let ret = gstreamer_sdp::SDPMessage::parse_buffer(sdp.as_bytes())
            .map_err(|_| anyhow::anyhow!("Failed to parse SDP"))?;

        let offer = WebRTCSessionDescription::new(gstreamer_webrtc::WebRTCSDPType::Offer, ret);

        let promise = gstreamer::Promise::new();
        webrtcbin.emit_by_name::<()>("set-remote-description", &[&offer, &promise]);

        // Wait for set-remote-description to complete
        let result = promise.wait();
        match result {
            gstreamer::PromiseResult::Replied => {
                // Check if there was an error in the reply
                if let Some(reply) = promise.get_reply() {
                    if let Ok(error_value) = reply.value("error") {
                        if let Ok(error) = error_value.get::<glib::Error>() {
                            error!("set-remote-description error: {}", error);
                            return Err(anyhow::anyhow!(
                                "Failed to set remote description: {}",
                                error
                            ));
                        }
                    }
                }
            }
            _ => {
                error!("Failed to set remote description: {:?}", result);
                return Err(anyhow::anyhow!("Failed to set remote description"));
            }
        }

        // Additional delay to ensure the state is propagated
        std::thread::sleep(std::time::Duration::from_millis(100));

        // Store the peer first to ensure it can receive frames
        let peer = PeerConnection::new(
            peer_id.to_string(),
            pipeline.clone(),
            webrtcbin.clone(),
            appsrc.clone(),
        );
        self.peers.lock().unwrap().insert(peer_id.to_string(), peer);

        // Now set the pipeline to PLAYING state before creating the answer
        pipeline.set_state(gstreamer::State::Playing)?;

        // Wait for the state change to complete
        let (state_change_return, _, _) = pipeline.state(gstreamer::ClockTime::from_seconds(5));
        if state_change_return.is_err() {
            warn!(
                "Pipeline state change to PLAYING not fully successful: {:?}",
                state_change_return
            );
        }

        // Give the pipeline time to initialize
        std::thread::sleep(std::time::Duration::from_millis(200));

        // Connect to the pad-added signal to ensure the pipeline is ready
        webrtcbin.connect("pad-added", false, move |values| {
            let _webrtc = match values[0].get::<gstreamer::Element>() {
                Ok(elem) => elem,
                Err(_) => {
                    error!("Failed to get webrtc element from values");
                    return None;
                }
            };

            let _pad = match values[1].get::<gstreamer::Pad>() {
                Ok(p) => p,
                Err(_) => {
                    error!("Failed to get pad from values");
                    return None;
                }
            };

            None
        });

        // Use a promise without a change func first
        let promise = gstreamer::Promise::new();

        webrtcbin.emit_by_name::<()>("create-answer", &[&None::<gstreamer::Structure>, &promise]);

        // Wait for the promise to complete
        let reply = promise.wait();

        // Small delay to ensure the answer is fully processed
        std::thread::sleep(std::time::Duration::from_millis(100));

        // Get the answer from the promise reply
        match reply {
            gstreamer::PromiseResult::Replied => {
                if let Some(reply_struct) = promise.get_reply() {
                    // Check for an error in the reply
                    if let Ok(error_value) = reply_struct.value("error") {
                        if let Ok(error) = error_value.get::<glib::Error>() {
                            error!("create-answer error: {}", error);
                        }
                    }

                    // The answer should be in the reply
                    if let Ok(answer_value) =
                        reply_struct.get::<gstreamer_webrtc::WebRTCSessionDescription>("answer")
                    {
                        // Set the local description
                        let set_promise = gstreamer::Promise::new();
                        webrtcbin.emit_by_name::<()>(
                            "set-local-description",
                            &[&answer_value, &set_promise],
                        );
                        let _ = set_promise.wait();

                        let answer_sdp = answer_value.sdp().to_string();

                        let msg = SignalingMessage::Answer { sdp: answer_sdp };

                        if let Ok(json) = serde_json::to_string(&msg) {
                            let ws_sender_clone = ws_sender.clone();
                            tokio::task::spawn_blocking(move || {
                                let rt = tokio::runtime::Handle::current();
                                rt.block_on(async move {
                                    if let Ok(mut sender) = ws_sender_clone.lock() {
                                        let _ = sender.send(Message::text(json)).await;
                                    }
                                });
                            });
                        }
                    } else {
                        error!("No answer in reply structure");

                        // Try to get local-description as a fallback
                        if let Some(answer) = webrtcbin
                            .property::<Option<gstreamer_webrtc::WebRTCSessionDescription>>(
                                "local-description",
                            )
                        {
                            let answer_sdp = answer.sdp().to_string();

                            let msg = SignalingMessage::Answer { sdp: answer_sdp };

                            if let Ok(json) = serde_json::to_string(&msg) {
                                let ws_sender_clone = ws_sender.clone();
                                tokio::task::spawn_blocking(move || {
                                    let rt = tokio::runtime::Handle::current();
                                    rt.block_on(async move {
                                        if let Ok(mut sender) = ws_sender_clone.lock() {
                                            let _ = sender.send(Message::text(json)).await;
                                        }
                                    });
                                });
                            }
                        }
                    }
                }
            }
            _ => {
                error!("Promise failed or was interrupted");
            }
        }

        Ok(String::new())
    }

    fn create_pipeline(
        &self,
        peer_id: &str,
        ws_sender: Arc<Mutex<futures::stream::SplitSink<WebSocket, Message>>>,
    ) -> anyhow::Result<gstreamer::Pipeline> {
        // Accept any resolution and framerate from the input;
        // initial caps are set dynamically when the first frame arrives.
        let pipeline_str = format!(
            "appsrc name=videosrc \
             is-live=true format=time do-timestamp=true ! \
             videoconvert ! \
             vp8enc deadline=1 ! \
             rtpvp8pay ! \
             application/x-rtp,media=video,encoding-name=VP8,payload=96 ! \
             webrtcbin name=webrtc bundle-policy=max-bundle"
        );

        let pipeline = gstreamer::parse::launch(&pipeline_str)?
            .dynamic_cast::<gstreamer::Pipeline>()
            .map_err(|_| anyhow::anyhow!("Failed to create pipeline"))?;

        // Monitor pipeline messages
        let bus = pipeline.bus().unwrap();
        let peer_id_for_bus = peer_id.to_string();
        std::thread::spawn(move || {
            for msg in bus.iter_timed(gstreamer::ClockTime::NONE) {
                use gstreamer::MessageView;
                match msg.view() {
                    MessageView::StateChanged(_) => {}
                    MessageView::Error(err) => {
                        error!(
                            "Pipeline {} error: {} ({})",
                            peer_id_for_bus,
                            err.error(),
                            err.debug().unwrap_or_default()
                        );
                    }
                    MessageView::Warning(warn) => {
                        warn!(
                            "Pipeline {} warning: {} ({})",
                            peer_id_for_bus,
                            warn.error(),
                            warn.debug().unwrap_or_default()
                        );
                    }
                    MessageView::Eos(_) => {
                        break;
                    }
                    _ => {}
                }
            }
        });

        let webrtcbin = pipeline
            .by_name("webrtc")
            .context("Failed to get webrtcbin")?;

        webrtcbin.set_property_from_str("stun-server", "stun://stun.l.google.com:19302");

        let ws_sender_clone = ws_sender.clone();
        webrtcbin.connect("on-ice-candidate", false, move |values| {
            // Handle potential errors in the callback to avoid a panic
            let _webrtc = match values[0].get::<gstreamer::Element>() {
                Ok(elem) => elem,
                Err(_) => {
                    error!("Failed to get webrtc element from values");
                    return None;
                }
            };

            let mline_index = match values[1].get::<u32>() {
                Ok(idx) => idx,
                Err(_) => {
                    error!("Failed to get mline_index from values");
                    return None;
                }
            };

            let candidate = match values[2].get::<String>() {
                Ok(cand) => cand,
                Err(_) => {
                    error!("Failed to get candidate from values");
                    return None;
                }
            };

            let msg = SignalingMessage::Ice {
                candidate: crate::signaling::IceCandidate {
                    candidate,
                    sdp_mline_index: mline_index,
                    sdp_mid: None,
                },
            };

            if let Ok(json) = serde_json::to_string(&msg) {
                let ws_sender = ws_sender_clone.clone();

                // Use a dedicated runtime for sending messages from GStreamer threads
                std::thread::spawn(move || {
                    // Create a small runtime just for this send operation
                    let rt = tokio::runtime::Builder::new_current_thread()
                        .enable_all()
                        .build();

                    if let Ok(rt) = rt {
                        rt.block_on(async move {
                            if let Ok(mut sender) = ws_sender.lock() {
                                let _ = sender.send(Message::text(json)).await;
                            }
                        });
                    }
                });
            }

            None
        });

        let peer_id_clone = peer_id.to_string();
        let _ws_sender_clone = ws_sender.clone();
        webrtcbin.connect("notify::ice-connection-state", false, move |values| {
            let _webrtc = match values[0].get::<gstreamer::Element>() {
                Ok(elem) => elem,
                Err(_) => {
                    error!("Failed to get webrtc element from values");
                    return None;
                }
            };

            let state = _webrtc.property::<WebRTCICEConnectionState>("ice-connection-state");
            info!("ICE connection state for {}: {:?}", peer_id_clone, state);

            // Also check the connection state
            let conn_state = _webrtc
                .property::<gstreamer_webrtc::WebRTCPeerConnectionState>("connection-state");
            info!(
                "Peer connection state for {}: {:?}",
                peer_id_clone, conn_state
            );
            None
        });

        Ok(pipeline)
    }

    fn add_ice_candidate(
        &self,
        peer_id: &str,
        candidate: &str,
        mline_index: u32,
    ) -> anyhow::Result<()> {
        let peers = self.peers.lock().unwrap();
        if let Some(peer) = peers.get(peer_id) {
            peer.webrtcbin
                .emit_by_name::<()>("add-ice-candidate", &[&mline_index, &candidate]);
        } else {
            warn!("Peer {} not found when adding ICE candidate", peer_id);
        }
        Ok(())
    }

    fn remove_peer(&self, peer_id: &str) {
        let mut peers = self.peers.lock().unwrap();
        if let Some(peer) = peers.remove(peer_id) {
            peer.shutdown();
        }
    }

    pub fn send_frame_to_peers(&self, frame_data: &[u8]) {
        let peers = self.peers.lock().unwrap();
        for (peer_id, peer) in peers.iter() {
            if let Err(e) = peer.send_frame(frame_data) {
                warn!("Failed to push buffer to peer {}: {}", peer_id, e);
            }
        }
    }
}
