Audio streams: make RTP header handling explicit along the stream chain.

What this patch does (text before the first "diff --git" line is commentary):
  * AudioStream.WriteHeader no longer silently ignores headers; the base
    implementation throws InvalidOperationException, so every stream that can
    legitimately receive a header must override it.
  * BufferedWriteStream and OutputStream override WriteHeader as a no-op.
  * SodiumEncryptStream records the header and forwards it with the payload.
  * OpusEncodeStream generates its own seq/timestamp and writes a header
    before each encoded frame.
  * JitterBuffer.cs is commented out in full.

Review fixes folded into this revision of the patch:
  1. SodiumEncryptStream.WriteHeader now sets _hasHeader = true. Without it
     the new "Received payload without an RTP header" guard in WriteAsync
     throws on every write.
  2. OpusEncodeStream advances _timestamp by OpusEncoder.FrameSamplesPerChannel
     (the same step BufferedWriteStream uses) instead of OpusConverter.FrameBytes.
  3. RTPWriteStream.WriteAsync forwards cancelToken to the next stream.

NOTE(review): this file was reconstructed from a whitespace-collapsed copy.
Blank context lines and indentation were inferred from the hunk line counts and
the surrounding code style; whitespace-only changes whose difference was lost
are kept as context. If a hunk is rejected on whitespace, try
"git apply --ignore-whitespace". The post-image hashes on the "index" lines of
the three files touched by the review fixes are stale.

diff --git a/src/Discord.Net.Core/Audio/AudioStream.cs b/src/Discord.Net.Core/Audio/AudioStream.cs
index d39bcc48a..97820ea73 100644
--- a/src/Discord.Net.Core/Audio/AudioStream.cs
+++ b/src/Discord.Net.Core/Audio/AudioStream.cs
@@ -11,7 +11,10 @@ namespace Discord.Audio
         public override bool CanSeek => false;
         public override bool CanWrite => false;
 
-        public virtual void WriteHeader(ushort seq, uint timestamp, bool missed) { }
+        public virtual void WriteHeader(ushort seq, uint timestamp, bool missed)
+        {
+            throw new InvalidOperationException("This stream does not accept headers");
+        }
         public override void Write(byte[] buffer, int offset, int count)
         {
             WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
diff --git a/src/Discord.Net.WebSocket/Audio/AudioClient.cs b/src/Discord.Net.WebSocket/Audio/AudioClient.cs
index 19639a418..0ca45a557 100644
--- a/src/Discord.Net.WebSocket/Audio/AudioClient.cs
+++ b/src/Discord.Net.WebSocket/Audio/AudioClient.cs
@@ -142,31 +142,31 @@ namespace Discord.Audio
 
         public AudioOutStream CreateOpusStream(int bufferMillis)
         {
-            var outputStream = new OutputStream(ApiClient);
-            var sodiumEncrypter = new SodiumEncryptStream( outputStream, this);
-            var rtpWriter = new RTPWriteStream(sodiumEncrypter, _ssrc);
-            return new BufferedWriteStream(rtpWriter, this, bufferMillis, _connection.CancelToken, _audioLogger);
+            var outputStream = new OutputStream(ApiClient); //Ignores header
+            var sodiumEncrypter = new SodiumEncryptStream( outputStream, this); //Passes header
+            var rtpWriter = new RTPWriteStream(sodiumEncrypter, _ssrc); //Consumes header, passes
+            return new BufferedWriteStream(rtpWriter, this, bufferMillis, _connection.CancelToken, _audioLogger); //Generates header
         }
         public AudioOutStream CreateDirectOpusStream()
         {
-            var outputStream = new OutputStream(ApiClient);
-            var sodiumEncrypter = new SodiumEncryptStream(outputStream, this);
-            return new RTPWriteStream(sodiumEncrypter, _ssrc);
+            var outputStream = new OutputStream(ApiClient); //Ignores header
+            var sodiumEncrypter = new SodiumEncryptStream(outputStream, this); //Passes header
+            return new RTPWriteStream(sodiumEncrypter, _ssrc); //Consumes header (external input), passes
         }
         public AudioOutStream CreatePCMStream(AudioApplication application, int? bitrate, int bufferMillis)
         {
-            var outputStream = new OutputStream(ApiClient);
-            var sodiumEncrypter = new SodiumEncryptStream(outputStream, this);
-            var rtpWriter = new RTPWriteStream(sodiumEncrypter, _ssrc);
-            var bufferedStream = new BufferedWriteStream(rtpWriter, this, bufferMillis, _connection.CancelToken, _audioLogger);
-            return new OpusEncodeStream(bufferedStream, bitrate ?? (96 * 1024), application);
+            var outputStream = new OutputStream(ApiClient); //Ignores header
+            var sodiumEncrypter = new SodiumEncryptStream(outputStream, this); //Passes header
+            var rtpWriter = new RTPWriteStream(sodiumEncrypter, _ssrc); //Consumes header, passes
+            var bufferedStream = new BufferedWriteStream(rtpWriter, this, bufferMillis, _connection.CancelToken, _audioLogger); //Ignores header, generates header
+            return new OpusEncodeStream(bufferedStream, bitrate ?? (96 * 1024), application); //Generates header
         }
         public AudioOutStream CreateDirectPCMStream(AudioApplication application, int? bitrate)
         {
-            var outputStream = new OutputStream(ApiClient);
-            var sodiumEncrypter = new SodiumEncryptStream(outputStream, this);
-            var rtpWriter = new RTPWriteStream(sodiumEncrypter, _ssrc);
-            return new OpusEncodeStream(rtpWriter, bitrate ?? (96 * 1024), application);
+            var outputStream = new OutputStream(ApiClient); //Ignores header
+            var sodiumEncrypter = new SodiumEncryptStream(outputStream, this); //Passes header
+            var rtpWriter = new RTPWriteStream(sodiumEncrypter, _ssrc); //Consumes header, passes
+            return new OpusEncodeStream(rtpWriter, bitrate ?? (96 * 1024), application); //Generates header
         }
 
         internal async Task CreateInputStreamAsync(ulong userId)
@@ -174,11 +174,11 @@ namespace Discord.Audio
             //Assume Thread-safe
             if (!_streams.ContainsKey(userId))
             {
-                var readerStream = new InputStream();
-                var opusDecoder = new OpusDecodeStream(readerStream);
+                var readerStream = new InputStream(); //Consumes header
+                var opusDecoder = new OpusDecodeStream(readerStream); //Passes header
                 //var jitterBuffer = new JitterBuffer(opusDecoder, _audioLogger);
-                var rtpReader = new RTPReadStream(opusDecoder);
-                var decryptStream = new SodiumDecryptStream(rtpReader, this);
+                var rtpReader = new RTPReadStream(opusDecoder); //Generates header
+                var decryptStream = new SodiumDecryptStream(rtpReader, this); //No header
                 _streams.TryAdd(userId, new StreamPair(readerStream, decryptStream));
                 await _streamCreatedEvent.InvokeAsync(userId, readerStream);
             }
diff --git a/src/Discord.Net.WebSocket/Audio/Streams/BufferedWriteStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/BufferedWriteStream.cs
index 29586389c..fb302f132 100644
--- a/src/Discord.Net.WebSocket/Audio/Streams/BufferedWriteStream.cs
+++ b/src/Discord.Net.WebSocket/Audio/Streams/BufferedWriteStream.cs
@@ -88,11 +88,12 @@ namespace Discord.Audio.Streams
                             if (_queuedFrames.TryDequeue(out Frame frame))
                             {
                                 await _client.SetSpeakingAsync(true).ConfigureAwait(false);
-                                _next.WriteHeader(seq++, timestamp, false);
+                                _next.WriteHeader(seq, timestamp, false);
                                 await _next.WriteAsync(frame.Buffer, 0, frame.Bytes).ConfigureAwait(false);
                                 _bufferPool.Enqueue(frame.Buffer);
                                 _queueLock.Release();
                                 nextTick += _ticksPerFrame;
+                                seq++;
                                 timestamp += OpusEncoder.FrameSamplesPerChannel;
                                 _silenceFrames = 0;
 #if DEBUG
@@ -105,12 +106,13 @@ namespace Discord.Audio.Streams
                             {
                                 if (_silenceFrames++ < MaxSilenceFrames)
                                 {
-                                    _next.WriteHeader(seq++, timestamp, false);
+                                    _next.WriteHeader(seq, timestamp, false);
                                     await _next.WriteAsync(_silenceFrame, 0, _silenceFrame.Length).ConfigureAwait(false);
                                 }
                                 else
                                     await _client.SetSpeakingAsync(false).ConfigureAwait(false);
                                 nextTick += _ticksPerFrame;
+                                seq++;
                                 timestamp += OpusEncoder.FrameSamplesPerChannel;
                             }
 #if DEBUG
@@ -126,6 +128,7 @@ namespace Discord.Audio.Streams
             });
         }
 
+        public override void WriteHeader(ushort seq, uint timestamp, bool missed) { } //Ignore, we use our own timing
         public override async Task WriteAsync(byte[] data, int offset, int count, CancellationToken cancelToken)
         {
             if (cancelToken.CanBeCanceled)
diff --git a/src/Discord.Net.WebSocket/Audio/Streams/JitterBuffer.cs b/src/Discord.Net.WebSocket/Audio/Streams/JitterBuffer.cs
index a5ecdea6f..10f842a9d 100644
--- a/src/Discord.Net.WebSocket/Audio/Streams/JitterBuffer.cs
+++ b/src/Discord.Net.WebSocket/Audio/Streams/JitterBuffer.cs
@@ -1,4 +1,4 @@
-using Discord.Logging;
+/*using Discord.Logging;
 using System;
 using System.Collections.Concurrent;
 using System.Threading;
@@ -243,4 +243,4 @@ namespace Discord.Audio.Streams
             return Task.Delay(0);
         }
     }
-}
\ No newline at end of file
+}*/
\ No newline at end of file
diff --git a/src/Discord.Net.WebSocket/Audio/Streams/OpusDecodeStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/OpusDecodeStream.cs
index 43289c60e..58c4f4c70 100644
--- a/src/Discord.Net.WebSocket/Audio/Streams/OpusDecodeStream.cs
+++ b/src/Discord.Net.WebSocket/Audio/Streams/OpusDecodeStream.cs
@@ -25,12 +25,13 @@ namespace Discord.Audio.Streams
         public override void WriteHeader(ushort seq, uint timestamp, bool missed)
         {
             if (_hasHeader)
                 throw new InvalidOperationException("Header received with no payload");
-            _nextMissed = missed;
             _hasHeader = true;
+
+            _nextMissed = missed;
             _next.WriteHeader(seq, timestamp, missed);
         }
-        public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+        public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken)
         {
             if (!_hasHeader)
                 throw new InvalidOperationException("Received payload without an RTP header");
@@ -39,17 +40,17 @@ namespace Discord.Audio.Streams
             if (!_nextMissed)
             {
                 count = _decoder.DecodeFrame(buffer, offset, count, _buffer, 0, false);
-                await _next.WriteAsync(_buffer, 0, count, cancellationToken).ConfigureAwait(false);
+                await _next.WriteAsync(_buffer, 0, count, cancelToken).ConfigureAwait(false);
             }
             else if (count > 0)
             {
                 count = _decoder.DecodeFrame(buffer, offset, count, _buffer, 0, true);
-                await _next.WriteAsync(_buffer, 0, count, cancellationToken).ConfigureAwait(false);
+                await _next.WriteAsync(_buffer, 0, count, cancelToken).ConfigureAwait(false);
             }
             else
             {
                 count = _decoder.DecodeFrame(null, 0, 0, _buffer, 0, true);
-                await _next.WriteAsync(_buffer, 0, count, cancellationToken).ConfigureAwait(false);
+                await _next.WriteAsync(_buffer, 0, count, cancelToken).ConfigureAwait(false);
             }
         }
 
diff --git a/src/Discord.Net.WebSocket/Audio/Streams/OpusEncodeStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/OpusEncodeStream.cs
index 2a3c03a47..a7779a84c 100644
--- a/src/Discord.Net.WebSocket/Audio/Streams/OpusEncodeStream.cs
+++ b/src/Discord.Net.WebSocket/Audio/Streams/OpusEncodeStream.cs
@@ -13,6 +13,8 @@ namespace Discord.Audio.Streams
         private readonly OpusEncoder _encoder;
         private readonly byte[] _buffer;
         private int _partialFramePos;
+        private ushort _seq;
+        private uint _timestamp;
 
         public OpusEncodeStream(AudioStream next, int bitrate, AudioApplication application)
         {
@@ -21,7 +23,7 @@ namespace Discord.Audio.Streams
             _buffer = new byte[OpusConverter.FrameBytes];
         }
 
-        public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+        public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken)
         {
             //Assume threadsafe
             while (count > 0)
@@ -30,10 +32,13 @@ namespace Discord.Audio.Streams
                 {
                     //We have enough data and no partial frames. Pass the buffer directly to the encoder
                     int encFrameSize = _encoder.EncodeFrame(buffer, offset, _buffer, 0);
-                    await _next.WriteAsync(_buffer, 0, encFrameSize, cancellationToken).ConfigureAwait(false);
+                    _next.WriteHeader(_seq, _timestamp, false);
+                    await _next.WriteAsync(_buffer, 0, encFrameSize, cancelToken).ConfigureAwait(false);
 
                     offset += OpusConverter.FrameBytes;
                     count -= OpusConverter.FrameBytes;
+                    _seq++;
+                    _timestamp += OpusEncoder.FrameSamplesPerChannel; //RTP timestamps advance in samples, not PCM bytes
                 }
                 else if (_partialFramePos + count >= OpusConverter.FrameBytes)
                 {
@@ -41,11 +46,14 @@ namespace Discord.Audio.Streams
                     int partialSize = OpusConverter.FrameBytes - _partialFramePos;
                     Buffer.BlockCopy(buffer, offset, _buffer, _partialFramePos, partialSize);
                     int encFrameSize = _encoder.EncodeFrame(_buffer, 0, _buffer, 0);
-                    await _next.WriteAsync(_buffer, 0, encFrameSize, cancellationToken).ConfigureAwait(false);
+                    _next.WriteHeader(_seq, _timestamp, false);
+                    await _next.WriteAsync(_buffer, 0, encFrameSize, cancelToken).ConfigureAwait(false);
 
                     offset += partialSize;
                     count -= partialSize;
                     _partialFramePos = 0;
+                    _seq++;
+                    _timestamp += OpusEncoder.FrameSamplesPerChannel; //RTP timestamps advance in samples, not PCM bytes
                 }
                 else
                 {
@@ -57,8 +65,8 @@ namespace Discord.Audio.Streams
             }
         }
 
-        /*
-        public override async Task FlushAsync(CancellationToken cancellationToken)
+        /* //Opus throws memory errors on bad frames
+        public override async Task FlushAsync(CancellationToken cancelToken)
         {
             try
             {
@@ -67,7 +75,7 @@ namespace Discord.Audio.Streams
             }
             catch (Exception) { } //Incomplete frame
             _partialFramePos = 0;
-            await base.FlushAsync(cancellationToken).ConfigureAwait(false);
+            await base.FlushAsync(cancelToken).ConfigureAwait(false);
         }*/
 
         public override async Task FlushAsync(CancellationToken cancelToken)
diff --git a/src/Discord.Net.WebSocket/Audio/Streams/OutputStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/OutputStream.cs
index 6238e93b4..cba4e3cb6 100644
--- a/src/Discord.Net.WebSocket/Audio/Streams/OutputStream.cs
+++ b/src/Discord.Net.WebSocket/Audio/Streams/OutputStream.cs
@@ -13,7 +13,8 @@ namespace Discord.Audio.Streams
         {
             _client = client;
         }
 
+        public override void WriteHeader(ushort seq, uint timestamp, bool missed) { } //Ignore
         public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken)
         {
             cancelToken.ThrowIfCancellationRequested();
diff --git a/src/Discord.Net.WebSocket/Audio/Streams/RTPWriteStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/RTPWriteStream.cs
index 78f895381..ce407eada 100644
--- a/src/Discord.Net.WebSocket/Audio/Streams/RTPWriteStream.cs
+++ b/src/Discord.Net.WebSocket/Audio/Streams/RTPWriteStream.cs
@@ -33,14 +33,14 @@ namespace Discord.Audio.Streams
         {
             if (_hasHeader)
                 throw new InvalidOperationException("Header received with no payload");
+
             _hasHeader = true;
             _nextSeq = seq;
             _nextTimestamp = timestamp;
         }
-        public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+        public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken)
         {
-            cancellationToken.ThrowIfCancellationRequested();
-
+            cancelToken.ThrowIfCancellationRequested();
             if (!_hasHeader)
                 throw new InvalidOperationException("Received payload without an RTP header");
             _hasHeader = false;
@@ -57,6 +57,7 @@ namespace Discord.Audio.Streams
             Buffer.BlockCopy(_header, 0, _buffer, 0, 12); //Copy RTP header from to the buffer
             Buffer.BlockCopy(buffer, offset, _buffer, 12, count);
 
-            await _next.WriteAsync(_buffer, 0, count + 12).ConfigureAwait(false);
+            _next.WriteHeader(_nextSeq, _nextTimestamp, false);
+            await _next.WriteAsync(_buffer, 0, count + 12, cancelToken).ConfigureAwait(false); //Forward the caller's token instead of CancellationToken.None
         }
 
diff --git a/src/Discord.Net.WebSocket/Audio/Streams/SodiumEncryptStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/SodiumEncryptStream.cs
index b00a7f403..2e7a7e276 100644
--- a/src/Discord.Net.WebSocket/Audio/Streams/SodiumEncryptStream.cs
+++ b/src/Discord.Net.WebSocket/Audio/Streams/SodiumEncryptStream.cs
@@ -10,6 +10,9 @@ namespace Discord.Audio.Streams
         private readonly AudioClient _client;
         private readonly AudioStream _next;
         private readonly byte[] _nonce;
+        private bool _hasHeader;
+        private ushort _nextSeq;
+        private uint _nextTimestamp;
 
         public SodiumEncryptStream(AudioStream next, IAudioClient client)
         {
@@ -17,16 +20,29 @@ namespace Discord.Audio.Streams
             _client = (AudioClient)client;
             _nonce = new byte[24];
         }
+
+        public override void WriteHeader(ushort seq, uint timestamp, bool missed)
+        {
+            if (_hasHeader)
+                throw new InvalidOperationException("Header received with no payload");
+            _hasHeader = true; //Arm the header/payload pairing that WriteAsync checks and clears
+            _nextSeq = seq;
+            _nextTimestamp = timestamp;
+        }
 
         public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken)
         {
             cancelToken.ThrowIfCancellationRequested();
+            if (!_hasHeader)
+                throw new InvalidOperationException("Received payload without an RTP header");
+            _hasHeader = false;
 
             if (_client.SecretKey == null)
                 return;
 
             Buffer.BlockCopy(buffer, offset, _nonce, 0, 12); //Copy nonce from RTP header
             count = SecretBox.Encrypt(buffer, offset + 12, count - 12, buffer, 12, _nonce, _client.SecretKey);
+            _next.WriteHeader(_nextSeq, _nextTimestamp, false);
             await _next.WriteAsync(buffer, 0, count + 12, cancelToken).ConfigureAwait(false);
         }
 