diff --git a/src/Discord.Net.Core/Audio/AudioStream.cs b/src/Discord.Net.Core/Audio/AudioStream.cs index 224409f8a..d39bcc48a 100644 --- a/src/Discord.Net.Core/Audio/AudioStream.cs +++ b/src/Discord.Net.Core/Audio/AudioStream.cs @@ -11,6 +11,7 @@ namespace Discord.Audio public override bool CanSeek => false; public override bool CanWrite => false; + public virtual void WriteHeader(ushort seq, uint timestamp, bool missed) { } public override void Write(byte[] buffer, int offset, int count) { WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult(); diff --git a/src/Discord.Net.Core/Audio/RTPFrame.cs b/src/Discord.Net.Core/Audio/RTPFrame.cs index 5005870f4..6254b7173 100644 --- a/src/Discord.Net.Core/Audio/RTPFrame.cs +++ b/src/Discord.Net.Core/Audio/RTPFrame.cs @@ -5,12 +5,14 @@ namespace Discord.Audio public readonly ushort Sequence; public readonly uint Timestamp; public readonly byte[] Payload; + public readonly bool Missed; - public RTPFrame(ushort sequence, uint timestamp, byte[] payload) + public RTPFrame(ushort sequence, uint timestamp, byte[] payload, bool missed) { Sequence = sequence; Timestamp = timestamp; Payload = payload; + Missed = missed; } } } \ No newline at end of file diff --git a/src/Discord.Net.Core/Net/Udp/IUdpSocket.cs b/src/Discord.Net.Core/Net/Udp/IUdpSocket.cs index feb94b683..10ac652b3 100644 --- a/src/Discord.Net.Core/Net/Udp/IUdpSocket.cs +++ b/src/Discord.Net.Core/Net/Udp/IUdpSocket.cs @@ -8,6 +8,8 @@ namespace Discord.Net.Udp { event Func ReceivedDatagram; + ushort Port { get; } + void SetCancelToken(CancellationToken cancelToken); void SetDestination(string ip, int port); diff --git a/src/Discord.Net.Providers.UdpClient/UDPClient.cs b/src/Discord.Net.Providers.UdpClient/UDPClient.cs index 459feb335..dfd05cf38 100644 --- a/src/Discord.Net.Providers.UdpClient/UDPClient.cs +++ b/src/Discord.Net.Providers.UdpClient/UDPClient.cs @@ -18,6 +18,8 @@ namespace Discord.Net.Providers.UDPClient private CancellationToken _cancelToken, _parentToken; private Task _task; private bool _isDisposed; + + public ushort Port => (ushort)((_udp?.Client.LocalEndPoint as IPEndPoint)?.Port ?? 0); public UDPClient() { diff --git a/src/Discord.Net.WebSocket/Audio/AudioClient.cs b/src/Discord.Net.WebSocket/Audio/AudioClient.cs index c497b2632..ceaea01cc 100644 --- a/src/Discord.Net.WebSocket/Audio/AudioClient.cs +++ b/src/Discord.Net.WebSocket/Audio/AudioClient.cs @@ -107,6 +107,7 @@ namespace Discord.Audio { await _audioLogger.DebugAsync("Connecting ApiClient").ConfigureAwait(false); await ApiClient.ConnectAsync("wss://" + _url).ConfigureAwait(false); + await _audioLogger.DebugAsync("Listening on port " + ApiClient.UdpPort).ConfigureAwait(false); await _audioLogger.DebugAsync("Sending Identity").ConfigureAwait(false); await ApiClient.SendIdentityAsync(_userId, _sessionId, _token).ConfigureAwait(false); @@ -175,7 +176,8 @@ namespace Discord.Audio { var readerStream = new InputStream(); var opusDecoder = new OpusDecodeStream(readerStream); - var rtpReader = new RTPReadStream(readerStream, opusDecoder); + //var jitterBuffer = new JitterBuffer(opusDecoder, _audioLogger); + var rtpReader = new RTPReadStream(opusDecoder); var decryptStream = new SodiumDecryptStream(rtpReader, this); _streams.TryAdd(userId, new StreamPair(readerStream, decryptStream)); await _streamCreatedEvent.InvokeAsync(userId, readerStream); diff --git a/src/Discord.Net.WebSocket/Audio/Opus/OpusConverter.cs b/src/Discord.Net.WebSocket/Audio/Opus/OpusConverter.cs index 28581ea4e..4179ce9c9 100644 --- a/src/Discord.Net.WebSocket/Audio/Opus/OpusConverter.cs +++ b/src/Discord.Net.WebSocket/Audio/Opus/OpusConverter.cs @@ -14,7 +14,7 @@ namespace Discord.Audio public const int FrameSamplesPerChannel = SamplingRate / 1000 * FrameMillis; public const int FrameSamples = FrameSamplesPerChannel * Channels; - public const int FrameBytes = FrameSamples * SampleBytes; + public const int FrameBytes = FrameSamplesPerChannel * SampleBytes; protected bool _isDisposed = false; diff --git a/src/Discord.Net.WebSocket/Audio/Opus/OpusDecoder.cs b/src/Discord.Net.WebSocket/Audio/Opus/OpusDecoder.cs index 2c8d8036d..41c48e1ac 100644 --- a/src/Discord.Net.WebSocket/Audio/Opus/OpusDecoder.cs +++ b/src/Discord.Net.WebSocket/Audio/Opus/OpusDecoder.cs @@ -20,12 +20,12 @@ namespace Discord.Audio CheckError(error); } - public unsafe int DecodeFrame(byte[] input, int inputOffset, int inputCount, byte[] output, int outputOffset) + public unsafe int DecodeFrame(byte[] input, int inputOffset, int inputCount, byte[] output, int outputOffset, bool decodeFEC) { int result = 0; fixed (byte* inPtr = input) fixed (byte* outPtr = output) - result = Decode(_ptr, inPtr + inputOffset, inputCount, outPtr + outputOffset, FrameSamplesPerChannel, 1); + result = Decode(_ptr, inPtr + inputOffset, inputCount, outPtr + outputOffset, FrameSamplesPerChannel, decodeFEC ? 1 : 0); CheckError(result); return result * SampleBytes; } diff --git a/src/Discord.Net.WebSocket/Audio/Streams/BufferedWriteStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/BufferedWriteStream.cs index 1764fa66a..e5065345f 100644 --- a/src/Discord.Net.WebSocket/Audio/Streams/BufferedWriteStream.cs +++ b/src/Discord.Net.WebSocket/Audio/Streams/BufferedWriteStream.cs @@ -35,7 +35,7 @@ namespace Discord.Audio.Streams private readonly SemaphoreSlim _queueLock; private readonly Logger _logger; private readonly int _ticksPerFrame, _queueLength; - private bool _isPreloaded; + private bool _isPreloaded, _isSpeaking; private int _silenceFrames; public BufferedWriteStream(AudioStream next, IAudioClient client, int bufferMillis, CancellationToken cancelToken, int maxFrameSize = 1500) @@ -45,7 +45,7 @@ namespace Discord.Audio.Streams //maxFrameSize = 1275 was too limiting at 128kbps,2ch,60ms _next = next; _client = client; - _ticksPerFrame = OpusEncoder.FrameSamples / 48; + _ticksPerFrame = OpusEncoder.FrameMillis; _logger = logger; _queueLength = (bufferMillis + (_ticksPerFrame - 1)) / _ticksPerFrame; //Round up @@ -60,6 +60,12 @@ namespace Discord.Audio.Streams _task = Run(); } + protected override void Dispose(bool disposing) + { + if (disposing) + _cancelTokenSource.Cancel(); + base.Dispose(disposing); + } private Task Run() { @@ -71,6 +77,8 @@ namespace Discord.Audio.Streams await Task.Delay(1).ConfigureAwait(false); long nextTick = Environment.TickCount; + ushort seq = 0; + uint timestamp = 0; while (!_cancelToken.IsCancellationRequested) { long tick = Environment.TickCount; @@ -80,14 +88,20 @@ namespace Discord.Audio.Streams Frame frame; if (_queuedFrames.TryDequeue(out frame)) { - await _client.ApiClient.SendSetSpeaking(true).ConfigureAwait(false); + if (!_isSpeaking) + { + await _client.ApiClient.SendSetSpeaking(true).ConfigureAwait(false); + _isSpeaking = true; + } + _next.WriteHeader(seq++, timestamp, false); await _next.WriteAsync(frame.Buffer, 0, frame.Bytes).ConfigureAwait(false); _bufferPool.Enqueue(frame.Buffer); _queueLock.Release(); nextTick += _ticksPerFrame; + timestamp += OpusEncoder.FrameSamplesPerChannel; _silenceFrames = 0; #if DEBUG - var _ = _logger.DebugAsync($"Sent {frame.Bytes} bytes ({_queuedFrames.Count} frames buffered)"); + var _ = _logger?.DebugAsync($"Sent {frame.Bytes} bytes ({_queuedFrames.Count} frames buffered)"); #endif } else @@ -95,13 +109,20 @@ namespace Discord.Audio.Streams while ((nextTick - tick) <= 0) { if (_silenceFrames++ < MaxSilenceFrames) + { + _next.WriteHeader(seq++, timestamp, false); await _next.WriteAsync(_silenceFrame, 0, _silenceFrame.Length).ConfigureAwait(false); - else + } + else if (_isSpeaking) + { await _client.ApiClient.SendSetSpeaking(false).ConfigureAwait(false); + _isSpeaking = false; + } nextTick += _ticksPerFrame; + timestamp += OpusEncoder.FrameSamplesPerChannel; } #if DEBUG - var _ = _logger.DebugAsync($"Buffer underrun"); + var _ = _logger?.DebugAsync($"Buffer underrun"); #endif } } @@ -125,19 +146,16 @@ namespace Discord.Audio.Streams if (!_bufferPool.TryDequeue(out buffer)) { #if DEBUG - var _ = _logger.DebugAsync($"Buffer overflow"); //Should never happen because of the queueLock + var _ = _logger?.DebugAsync($"Buffer overflow"); //Should never happen because of the queueLock #endif return; } Buffer.BlockCopy(data, offset, buffer, 0, count); _queuedFrames.Enqueue(new Frame(buffer, count)); -#if DEBUG - //var _ await _logger.DebugAsync($"Queued {count} bytes ({_queuedFrames.Count} frames buffered)"); -#endif if (!_isPreloaded && _queuedFrames.Count == _queueLength) { #if DEBUG - var _ = _logger.DebugAsync($"Preloaded"); + var _ = _logger?.DebugAsync($"Preloaded"); #endif _isPreloaded = true; } @@ -161,10 +179,5 @@ namespace Discord.Audio.Streams while (_queuedFrames.TryDequeue(out ignored)); return Task.Delay(0); } - protected override void Dispose(bool disposing) - { - if (disposing) - _cancelTokenSource.Cancel(); - } } } \ No newline at end of file diff --git a/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs index e29302fa0..a46b6d3d2 100644 --- a/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs +++ b/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs @@ -14,6 +14,7 @@ namespace Discord.Audio.Streams private SemaphoreSlim _signal; private ushort _nextSeq; private uint _nextTimestamp; + private bool _nextMissed; private bool _hasHeader; private bool _isDisposed; @@ -60,13 +61,14 @@ namespace Discord.Audio.Streams return frame; } - public void WriteHeader(ushort seq, uint timestamp) + public override void WriteHeader(ushort seq, uint timestamp, bool missed) { if (_hasHeader) throw new InvalidOperationException("Header received with no payload"); _hasHeader = true; _nextSeq = seq; _nextTimestamp = timestamp; + _nextMissed = missed; } public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken) { @@ -79,16 +81,17 @@ namespace Discord.Audio.Streams } if (!_hasHeader) throw new InvalidOperationException("Received payload without an RTP header"); + _hasHeader = false; byte[] payload = new byte[count]; Buffer.BlockCopy(buffer, offset, payload, 0, count); _frames.Enqueue(new RTPFrame( sequence: _nextSeq, timestamp: _nextTimestamp, + missed: _nextMissed, payload: payload )); _signal.Release(); - _hasHeader = false; return Task.Delay(0); } diff --git a/src/Discord.Net.WebSocket/Audio/Streams/OpusDecodeStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/OpusDecodeStream.cs index 96c809cca..43289c60e 100644 --- a/src/Discord.Net.WebSocket/Audio/Streams/OpusDecodeStream.cs +++ b/src/Discord.Net.WebSocket/Audio/Streams/OpusDecodeStream.cs @@ -1,4 +1,5 @@ -using System.Threading; +using System; +using System.Threading; using System.Threading.Tasks; namespace Discord.Audio.Streams @@ -11,6 +12,8 @@ namespace Discord.Audio.Streams private readonly AudioStream _next; private readonly OpusDecoder _decoder; private readonly byte[] _buffer; + private bool _nextMissed; + private bool _hasHeader; public OpusDecodeStream(AudioStream next) { @@ -19,10 +22,35 @@ namespace Discord.Audio.Streams _decoder = new OpusDecoder(); } + public override void WriteHeader(ushort seq, uint timestamp, bool missed) + { + if (_hasHeader) + throw new InvalidOperationException("Header received with no payload"); + _nextMissed = missed; + _hasHeader = true; + _next.WriteHeader(seq, timestamp, missed); + } public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - count = _decoder.DecodeFrame(buffer, offset, count, _buffer, 0); - await _next.WriteAsync(_buffer, 0, count, cancellationToken).ConfigureAwait(false); + if (!_hasHeader) + throw new InvalidOperationException("Received payload without an RTP header"); + _hasHeader = false; + + if (!_nextMissed) + { + count = _decoder.DecodeFrame(buffer, offset, count, _buffer, 0, false); + await _next.WriteAsync(_buffer, 0, count, cancellationToken).ConfigureAwait(false); + } + else if (count > 0) + { + count = _decoder.DecodeFrame(buffer, offset, count, _buffer, 0, true); + await _next.WriteAsync(_buffer, 0, count, cancellationToken).ConfigureAwait(false); + } + else + { + count = _decoder.DecodeFrame(null, 0, 0, _buffer, 0, true); + await _next.WriteAsync(_buffer, 0, count, cancellationToken).ConfigureAwait(false); + } } public override async Task FlushAsync(CancellationToken cancelToken) diff --git a/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs index 292a9303a..2cedea114 100644 --- a/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs +++ b/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs @@ -1,5 +1,4 @@ -using System.IO; -using System.Threading; +using System.Threading; using System.Threading.Tasks; namespace Discord.Audio.Streams @@ -7,7 +6,6 @@ namespace Discord.Audio.Streams /// Reads the payload from an RTP frame public class RTPReadStream : AudioOutStream { - private readonly InputStream _queue; private readonly AudioStream _next; private readonly byte[] _buffer, _nonce; @@ -15,11 +13,8 @@ namespace Discord.Audio.Streams public override bool CanSeek => false; public override bool CanWrite => true; - public RTPReadStream(InputStream queue, int bufferSize = 4000) - : this(queue, null, bufferSize) { } - public RTPReadStream(InputStream queue, AudioStream next, int bufferSize = 4000) + public RTPReadStream(AudioStream next, int bufferSize = 4000) { - _queue = queue; _next = next; _buffer = new byte[bufferSize]; _nonce = new byte[24]; @@ -36,11 +31,11 @@ namespace Discord.Audio.Streams uint timestamp = (uint)((buffer[offset + 4] << 24) | (buffer[offset + 5] << 16) | - (buffer[offset + 6] << 16) | + (buffer[offset + 6] << 8) | (buffer[offset + 7] << 0)); - _queue.WriteHeader(seq, timestamp); - await (_next ?? _queue as Stream).WriteAsync(buffer, offset + headerSize, count - headerSize, cancelToken).ConfigureAwait(false); + _next.WriteHeader(seq, timestamp, false); + await _next.WriteAsync(buffer, offset + headerSize, count - headerSize, cancelToken).ConfigureAwait(false); } public static bool TryReadSsrc(byte[] buffer, int offset, out uint ssrc) @@ -58,7 +53,7 @@ namespace Discord.Audio.Streams ssrc = (uint)((buffer[offset + 8] << 24) | (buffer[offset + 9] << 16) | - (buffer[offset + 10] << 16) | + (buffer[offset + 10] << 8) | (buffer[offset + 11] << 0)); return true; } diff --git a/src/Discord.Net.WebSocket/Audio/Streams/RTPWriteStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/RTPWriteStream.cs index 40d6f21f5..78f895381 100644 --- a/src/Discord.Net.WebSocket/Audio/Streams/RTPWriteStream.cs +++ b/src/Discord.Net.WebSocket/Audio/Streams/RTPWriteStream.cs @@ -10,7 +10,10 @@ namespace Discord.Audio.Streams private readonly AudioStream _next; private readonly byte[] _header; protected readonly byte[] _buffer; - private uint _ssrc, _timestamp = 0; + private uint _ssrc; + private ushort _nextSeq; + private uint _nextTimestamp; + private bool _hasHeader; public RTPWriteStream(AudioStream next, uint ssrc, int bufferSize = 4000) { @@ -26,20 +29,30 @@ namespace Discord.Audio.Streams _header[11] = (byte)(_ssrc >> 0); } + public override void WriteHeader(ushort seq, uint timestamp, bool missed) + { + if (_hasHeader) + throw new InvalidOperationException("Header received with no payload"); + _hasHeader = true; + _nextSeq = seq; + _nextTimestamp = timestamp; + } public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); + if (!_hasHeader) + throw new InvalidOperationException("Received payload without an RTP header"); + _hasHeader = false; + unchecked { - if (_header[3]++ == byte.MaxValue) - _header[2]++; - - _timestamp += (uint)OpusEncoder.FrameSamples; - _header[4] = (byte)(_timestamp >> 24); - _header[5] = (byte)(_timestamp >> 16); - _header[6] = (byte)(_timestamp >> 8); - _header[7] = (byte)(_timestamp >> 0); + _header[2] = (byte)(_nextSeq >> 8); + _header[3] = (byte)(_nextSeq >> 0); + _header[4] = (byte)(_nextTimestamp >> 24); + _header[5] = (byte)(_nextTimestamp >> 16); + _header[6] = (byte)(_nextTimestamp >> 8); + _header[7] = (byte)(_nextTimestamp >> 0); } Buffer.BlockCopy(_header, 0, _buffer, 0, 12); //Copy RTP header from to the buffer Buffer.BlockCopy(buffer, offset, _buffer, 12, count); diff --git a/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs b/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs index 05671d71d..25dc2cf7b 100644 --- a/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs +++ b/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs @@ -48,6 +48,8 @@ namespace Discord.Audio internal IWebSocketClient WebSocketClient { get; } public ConnectionState ConnectionState { get; private set; } + public ushort UdpPort => _udp.Port; + internal DiscordVoiceAPIClient(ulong guildId, WebSocketProvider webSocketProvider, UdpSocketProvider udpSocketProvider, JsonSerializer serializer = null) { GuildId = guildId; diff --git a/src/Discord.Net.WebSocket/Net/DefaultUdpSocket.cs b/src/Discord.Net.WebSocket/Net/DefaultUdpSocket.cs index 3366250cc..013ba62fc 100644 --- a/src/Discord.Net.WebSocket/Net/DefaultUdpSocket.cs +++ b/src/Discord.Net.WebSocket/Net/DefaultUdpSocket.cs @@ -18,6 +18,8 @@ namespace Discord.Net.Udp private CancellationToken _cancelToken, _parentToken; private Task _task; private bool _isDisposed; + + public ushort Port => (ushort)((_udp?.Client.LocalEndPoint as IPEndPoint)?.Port ?? 0); public DefaultUdpSocket() {