diff --git a/src/Discord.Net.Core/Audio/AudioInStream.cs b/src/Discord.Net.Core/Audio/AudioInStream.cs
index 4023f9c86..a6b5c5e6b 100644
--- a/src/Discord.Net.Core/Audio/AudioInStream.cs
+++ b/src/Discord.Net.Core/Audio/AudioInStream.cs
@@ -1,6 +1,7 @@
 using System;
 using System.IO;
 using System.Threading;
+using System.Threading.Tasks;

 namespace Discord.Audio
 {
@@ -10,6 +11,16 @@ namespace Discord.Audio
         public override bool CanSeek => false;
         public override bool CanWrite => true;

+        public abstract Task<RTPFrame?> ReadFrameAsync(CancellationToken cancelToken); //Result is nullable; InputStream returns null when nothing is queued
+
+        public RTPFrame? ReadFrame() //Synchronous wrapper: blocks on ReadFrameAsync without a cancel token
+        {
+            return ReadFrameAsync(CancellationToken.None).GetAwaiter().GetResult();
+        }
+        public override int Read(byte[] buffer, int offset, int count) //Synchronous wrapper: blocks on ReadAsync without a cancel token
+        {
+            return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
+        }
         public override void Write(byte[] buffer, int offset, int count)
         {
             WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
diff --git a/src/Discord.Net.Core/Audio/RTPFrame.cs b/src/Discord.Net.Core/Audio/RTPFrame.cs
new file mode 100644
index 000000000..5005870f4
--- /dev/null
+++ b/src/Discord.Net.Core/Audio/RTPFrame.cs
@@ -0,0 +1,16 @@
+namespace Discord.Audio
+{
+    public struct RTPFrame //Immutable value holding one frame's sequence, timestamp and payload
+    {
+        public readonly ushort Sequence; //Sequence number taken from the RTP header
+        public readonly uint Timestamp; //Timestamp taken from the RTP header
+        public readonly byte[] Payload; //Frame data; the array is stored by reference, not copied
+
+        public RTPFrame(ushort sequence, uint timestamp, byte[] payload)
+        {
+            Sequence = sequence;
+            Timestamp = timestamp;
+            Payload = payload;
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/Discord.Net.WebSocket/Audio/Opus/OpusDecoder.cs b/src/Discord.Net.WebSocket/Audio/Opus/OpusDecoder.cs
index ea9376f82..b2ecf5987 100644
--- a/src/Discord.Net.WebSocket/Audio/Opus/OpusDecoder.cs
+++ b/src/Discord.Net.WebSocket/Audio/Opus/OpusDecoder.cs
@@ -11,6 +11,8 @@ namespace Discord.Audio
         private static extern void DestroyDecoder(IntPtr decoder);
         [DllImport("opus", EntryPoint = "opus_decode", CallingConvention = CallingConvention.Cdecl)]
         private static extern int Decode(IntPtr st, byte* data, int len, byte* pcm, int max_frame_size, int decode_fec);
+        [DllImport("opus", EntryPoint = "opus_decoder_ctl", CallingConvention = CallingConvention.Cdecl)]
+        private static extern int DecoderCtl(IntPtr st, OpusCtl request, int value); //Binding for opus_decoder_ctl taking a single int argument

         public OpusDecoder(int samplingRate, int channels)
             : base(samplingRate, channels)
diff --git a/src/Discord.Net.WebSocket/Audio/Streams/BufferedWriteStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/BufferedWriteStream.cs
index f12417d4e..0eccd1a4c 100644
--- a/src/Discord.Net.WebSocket/Audio/Streams/BufferedWriteStream.cs
+++ b/src/Discord.Net.WebSocket/Audio/Streams/BufferedWriteStream.cs
@@ -34,6 +34,8 @@ namespace Discord.Audio.Streams
         private readonly int _ticksPerFrame, _queueLength;
         private bool _isPreloaded;

+        public BufferedWriteStream(AudioOutStream next, int samplesPerFrame, int bufferMillis, CancellationToken cancelToken, int maxFrameSize = 1500)
+            : this(next, samplesPerFrame, bufferMillis, cancelToken, null, maxFrameSize) { } //Public overload: delegates to the internal constructor with a null logger
         internal BufferedWriteStream(AudioOutStream next, int samplesPerFrame, int bufferMillis, CancellationToken cancelToken, Logger logger, int maxFrameSize = 1500)
         {
             //maxFrameSize = 1275 was too limiting at 128*1024
@@ -55,7 +57,9 @@ namespace Discord.Audio.Streams

         private Task Run()
         {
+#if DEBUG
             uint num = 0;
+#endif
             return Task.Run(async () =>
             {
                 try
@@ -92,7 +96,7 @@ namespace Discord.Audio.Streams
                         }
                     }
                     else
-                        await Task.Delay((int)(dist - (limit - 1))).ConfigureAwait(false);
+                        await Task.Delay((int)(dist - (limit - 1))/*, _cancelToken*/).ConfigureAwait(false);
                 }
             }
             catch (OperationCanceledException) { }
diff --git a/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs
new file mode 100644
index 000000000..d46db128b
--- /dev/null
+++ b/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs
@@ -0,0 +1,73 @@
+using System;
+using System.Collections.Concurrent;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Discord.Audio.Streams
+{
+    ///<summary> Queues received RTP frames until they are read </summary>
+    public class InputStream : AudioInStream
+    {
+        private ConcurrentQueue<RTPFrame> _frames; //Frames written but not yet read
+        private ushort _nextSeq; //Sequence number supplied by the last WriteHeader call
+        private uint _nextTimestamp; //Timestamp supplied by the last WriteHeader call
+        private bool _hasHeader; //True between WriteHeader and the WriteAsync that consumes it
+
+        public override bool CanRead => true;
+        public override bool CanSeek => false;
+        public override bool CanWrite => true;
+
+        public InputStream(byte[] secretKey) //secretKey is not used by this class
+        {
+            _frames = new ConcurrentQueue<RTPFrame>();
+        }
+
+        public override Task<RTPFrame?> ReadFrameAsync(CancellationToken cancelToken)
+        {
+            if (_frames.TryDequeue(out var frame))
+                return Task.FromResult<RTPFrame?>(frame);
+            return Task.FromResult<RTPFrame?>(null); //Nothing queued: completes immediately with null instead of waiting
+        }
+        public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken)
+        {
+            cancelToken.ThrowIfCancellationRequested();
+
+            if (_frames.TryDequeue(out var frame))
+            {
+                if (count < frame.Payload.Length)
+                    throw new InvalidOperationException("Buffer is too small."); //Note: the frame has already been dequeued here
+                Buffer.BlockCopy(frame.Payload, 0, buffer, offset, frame.Payload.Length);
+                return Task.FromResult(frame.Payload.Length);
+            }
+            return Task.FromResult(0); //Nothing queued
+        }
+
+        public void WriteHeader(ushort seq, uint timestamp)
+        {
+            if (_hasHeader)
+                throw new InvalidOperationException("Header received with no payload");
+            _hasHeader = true;
+            _nextSeq = seq;
+            _nextTimestamp = timestamp;
+        }
+        public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken)
+        {
+            cancelToken.ThrowIfCancellationRequested();
+
+            if (!_hasHeader) //Every payload must be preceded by WriteHeader
+                throw new InvalidOperationException("Received payload without an RTP header");
+            _hasHeader = false; //Consume the header first so a dropped frame does not make the next WriteHeader throw
+            if (_frames.Count > 1000)
+                return Task.Delay(0); //Buffer overloaded
+            byte[] payload = new byte[count];
+            Buffer.BlockCopy(buffer, offset, payload, 0, count); //Copy so the caller may reuse its buffer
+
+            _frames.Enqueue(new RTPFrame(
+                sequence: _nextSeq,
+                timestamp: _nextTimestamp,
+                payload: payload
+            ));
+            return Task.Delay(0);
+        }
+    }
+}
diff --git
a/src/Discord.Net.WebSocket/Audio/Streams/OpusDecodeStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/OpusDecodeStream.cs
index c700a7f15..9df553bfe 100644
--- a/src/Discord.Net.WebSocket/Audio/Streams/OpusDecodeStream.cs
+++ b/src/Discord.Net.WebSocket/Audio/Streams/OpusDecodeStream.cs
@@ -1,34 +1,35 @@
-using System;
-using System.Collections.Concurrent;
+using System.Threading;
+using System.Threading.Tasks;

 namespace Discord.Audio.Streams
 {
     /// Converts Opus to PCM
-    public class OpusDecodeStream : AudioInStream
+    public class OpusDecodeStream : AudioOutStream
     {
-        private readonly BlockingCollection _queuedData; //TODO: Replace with max-length ring buffer
+        private readonly AudioOutStream _next; //Stream that receives the decoder output
         private readonly byte[] _buffer;
         private readonly OpusDecoder _decoder;

-        internal OpusDecodeStream(AudioClient audioClient, int samplingRate, int channels = OpusConverter.MaxChannels, int bufferSize = 4000)
+        public OpusDecodeStream(AudioOutStream next, int samplingRate, int channels = OpusConverter.MaxChannels, int bufferSize = 4000)
         {
+            _next = next;
             _buffer = new byte[bufferSize];
             _decoder = new OpusDecoder(samplingRate, channels);
-            _queuedData = new BlockingCollection(100);
         }

-        public override int Read(byte[] buffer, int offset, int count)
+        public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
         {
-            var queuedData = _queuedData.Take();
-            Buffer.BlockCopy(queuedData, 0, buffer, offset, Math.Min(queuedData.Length, count));
-            return queuedData.Length;
+            count = _decoder.DecodeFrame(buffer, offset, count, _buffer, 0); //Decode into the reusable buffer; count now holds DecodeFrame's return value
+            await _next.WriteAsync(_buffer, 0, count, cancellationToken).ConfigureAwait(false); //Forward that many bytes of the decode buffer
         }
-        public override void Write(byte[] buffer, int offset, int count)
+
+        public override async Task FlushAsync(CancellationToken cancelToken)
         {
-            count = _decoder.DecodeFrame(buffer, offset, count, _buffer, 0);
-            var newBuffer = new byte[count];
-            Buffer.BlockCopy(_buffer, 0, newBuffer, 0, count);
-            _queuedData.Add(newBuffer);
+            await _next.FlushAsync(cancelToken).ConfigureAwait(false); //Only forwards the flush to the next stream
+        }
+        public override async Task ClearAsync(CancellationToken cancelToken)
+        {
+            await _next.ClearAsync(cancelToken).ConfigureAwait(false); //Only forwards the clear to the next stream
         }

         protected override void Dispose(bool disposing)
diff --git a/src/Discord.Net.WebSocket/Audio/Streams/OpusEncodeStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/OpusEncodeStream.cs
index 01747fc05..ada8311fe 100644
--- a/src/Discord.Net.WebSocket/Audio/Streams/OpusEncodeStream.cs
+++ b/src/Discord.Net.WebSocket/Audio/Streams/OpusEncodeStream.cs
@@ -17,7 +17,7 @@ namespace Discord.Audio.Streams
         private byte[] _partialFrameBuffer;
         private int _partialFramePos;

-        internal OpusEncodeStream(AudioOutStream next, int channels, int samplesPerFrame, int bitrate, AudioApplication application, int bufferSize = 4000)
+        public OpusEncodeStream(AudioOutStream next, int channels, int samplesPerFrame, int bitrate, AudioApplication application, int bufferSize = 4000) //Constructor visibility changed from internal to public
         {
             _next = next;
             _encoder = new OpusEncoder(SampleRate, channels, bitrate, application);
@@ -26,10 +26,6 @@ namespace Discord.Audio.Streams
             _partialFrameBuffer = new byte[_frameSize];
         }

-        public override void Write(byte[] buffer, int offset, int count)
-        {
-            WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
-        }
         public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
         {
             //Assume threadsafe
diff --git a/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs
index 0cc7a1529..9a57612bf 100644
--- a/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs
+++ b/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs
@@ -1,59 +1,49 @@
 using System;
-using System.Collections.Concurrent;
 using System.IO;
+using System.Threading;
+using System.Threading.Tasks;

 namespace Discord.Audio.Streams
 {
     /// Reads the payload from an RTP frame
-    public class
RTPReadStream : AudioInStream
+    public class RTPReadStream : AudioOutStream
     {
-        private readonly BlockingCollection _queuedData; //TODO: Replace with max-length ring buffer
-        //private readonly BlockingCollection _queuedData; //TODO: Replace with max-length ring buffer
-        private readonly AudioClient _audioClient;
+        private readonly InputStream _queue; //Receives the parsed header; also the write target when _next is null
+        private readonly AudioOutStream _next; //Optional stream that receives the payload
         private readonly byte[] _buffer, _nonce, _secretKey;

         public override bool CanRead => true;
         public override bool CanSeek => false;
         public override bool CanWrite => true;

-        internal RTPReadStream(AudioClient audioClient, byte[] secretKey, int bufferSize = 4000)
+        public RTPReadStream(InputStream queue, byte[] secretKey, int bufferSize = 4000)
+            : this(queue, null, secretKey, bufferSize) { } //No next stream: payloads are written straight to the queue
+        public RTPReadStream(InputStream queue, AudioOutStream next, byte[] secretKey, int bufferSize = 4000)
         {
-            _audioClient = audioClient;
+            _queue = queue;
+            _next = next;
             _secretKey = secretKey;
             _buffer = new byte[bufferSize];
-            _queuedData = new BlockingCollection(100);
             _nonce = new byte[24];
         }

-        /*public RTPFrame ReadFrame()
-        {
-            var queuedData = _queuedData.Take();
-            Buffer.BlockCopy(queuedData, 0, buffer, offset, Math.Min(queuedData.Length, count));
-            return queuedData.Length;
-        }*/
-        public override int Read(byte[] buffer, int offset, int count)
+        public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken)
         {
-            var queuedData = _queuedData.Take();
-            Buffer.BlockCopy(queuedData, 0, buffer, offset, Math.Min(queuedData.Length, count));
-            return queuedData.Length;
-        }
-        public override void Write(byte[] buffer, int offset, int count)
-        {
-            var newBuffer = new byte[count];
-            Buffer.BlockCopy(buffer, 0, newBuffer, 0, count);
-            _queuedData.Add(newBuffer);
-        }
+            cancelToken.ThrowIfCancellationRequested();

-        public override void Flush() { throw new NotSupportedException(); }
+            var payload = new byte[count - 12]; //Everything after the 12-byte RTP header
+            Buffer.BlockCopy(buffer, offset + 12, payload, 0, count - 12);

-        public override long Length { get { throw new NotSupportedException(); } }
-        public override long Position
-        {
-            get { throw new NotSupportedException(); }
-            set { throw new NotSupportedException(); }
-        }
+            ushort seq = (ushort)((buffer[offset + 2] << 8) | //Big-endian: byte 2 is the high byte, matching RTPWriteStream where byte 3 is the low byte
+                (buffer[offset + 3] << 0));
+
+            uint timestamp = (uint)((buffer[offset + 4] << 24) | //Big-endian 32-bit value in header bytes 4-7
+                (buffer[offset + 5] << 16) |
+                (buffer[offset + 6] << 8) |
+                (buffer[offset + 7] << 0));

-        public override void SetLength(long value) { throw new NotSupportedException(); }
-        public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
+            _queue.WriteHeader(seq, timestamp); //Must come before the payload reaches the queue
+            await (_next ?? _queue as Stream).WriteAsync(payload, 0, payload.Length, cancelToken).ConfigureAwait(false); //Forward only the payload, not the header
+        }
     }
 }
diff --git a/src/Discord.Net.WebSocket/Audio/Streams/RTPWriteStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/RTPWriteStream.cs
index 5b8877f8e..836cb4852 100644
--- a/src/Discord.Net.WebSocket/Audio/Streams/RTPWriteStream.cs
+++ b/src/Discord.Net.WebSocket/Audio/Streams/RTPWriteStream.cs
@@ -14,7 +14,7 @@ namespace Discord.Audio.Streams

         protected readonly byte[] _buffer;

-        internal RTPWriteStream(AudioOutStream next, int samplesPerFrame, uint ssrc, int bufferSize = 4000)
+        public RTPWriteStream(AudioOutStream next, int samplesPerFrame, uint ssrc, int bufferSize = 4000)
         {
             _next = next;
             _samplesPerFrame = samplesPerFrame;
@@ -29,13 +29,10 @@ namespace Discord.Audio.Streams
             _header[11] = (byte)(_ssrc >> 0);
         }

-        public override void Write(byte[] buffer, int offset, int count)
-        {
-            WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
-        }
         public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
         {
             cancellationToken.ThrowIfCancellationRequested();
+
             unchecked
             {
                 if (_header[3]++ == byte.MaxValue)
diff --git a/src/Discord.Net.WebSocket/Audio/Streams/SodiumDecryptStream.cs
b/src/Discord.Net.WebSocket/Audio/Streams/SodiumDecryptStream.cs
index fa2c0ec0a..f1421d28b 100644
--- a/src/Discord.Net.WebSocket/Audio/Streams/SodiumDecryptStream.cs
+++ b/src/Discord.Net.WebSocket/Audio/Streams/SodiumDecryptStream.cs
@@ -1,42 +1,44 @@
 using System;
-using System.Collections.Concurrent;
+using System.Threading;
+using System.Threading.Tasks;

 namespace Discord.Audio.Streams
 {
     /// Decrypts an RTP frame using libsodium
-    public class SodiumDecryptStream : AudioInStream
+    public class SodiumDecryptStream : AudioOutStream
     {
-        private readonly BlockingCollection _queuedData; //TODO: Replace with max-length ring buffer
-        private readonly AudioClient _audioClient;
+        private readonly AudioOutStream _next; //Stream that receives the header plus decrypted payload
         private readonly byte[] _buffer, _nonce, _secretKey;

         public override bool CanRead => true;
         public override bool CanSeek => false;
         public override bool CanWrite => true;

-        internal SodiumDecryptStream(AudioClient audioClient, byte[] secretKey, int bufferSize = 4000)
+        public SodiumDecryptStream(AudioOutStream next, byte[] secretKey, int bufferSize = 4000)
         {
-            _audioClient = audioClient;
+            _next = next;
             _secretKey = secretKey;
             _buffer = new byte[bufferSize];
-            _queuedData = new BlockingCollection(100);
             _nonce = new byte[24];
         }

-        public override int Read(byte[] buffer, int offset, int count)
-        {
-            var queuedData = _queuedData.Take();
-            Buffer.BlockCopy(queuedData, 0, buffer, offset, Math.Min(queuedData.Length, count));
-            return queuedData.Length;
-        }
-        public override void Write(byte[] buffer, int offset, int count)
+        public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken)
         {
-            Buffer.BlockCopy(buffer, 0, _nonce, 0, 12); //Copy RTP header to nonce
-            count = SecretBox.Decrypt(buffer, offset, count, _buffer, 0, _nonce, _secretKey);
-
-            var newBuffer = new byte[count];
-            Buffer.BlockCopy(_buffer, 0, newBuffer, 0, count);
-            _queuedData.Add(newBuffer);
+            cancelToken.ThrowIfCancellationRequested();
+
+            Buffer.BlockCopy(buffer, offset, _nonce, 0, 12); //Copy RTP header to nonce (honours offset, like SodiumEncryptStream)
+            Buffer.BlockCopy(buffer, offset, _buffer, 0, 12); //Keep the unencrypted RTP header in front of the decrypted payload
+            count = SecretBox.Decrypt(buffer, offset + 12, count - 12, _buffer, 12, _nonce, _secretKey); //Only the bytes after the header are encrypted
+            await _next.WriteAsync(_buffer, 0, count + 12, cancelToken).ConfigureAwait(false); //Forward header + plaintext, not the encrypted input
+        }
+
+        public override async Task FlushAsync(CancellationToken cancelToken)
+        {
+            await _next.FlushAsync(cancelToken).ConfigureAwait(false); //Only forwards the flush to the next stream
+        }
+        public override async Task ClearAsync(CancellationToken cancelToken)
+        {
+            await _next.ClearAsync(cancelToken).ConfigureAwait(false); //Only forwards the clear to the next stream
         }
     }
 }
diff --git a/src/Discord.Net.WebSocket/Audio/Streams/SodiumEncryptStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/SodiumEncryptStream.cs
index ef2fc1b27..90bc35e9d 100644
--- a/src/Discord.Net.WebSocket/Audio/Streams/SodiumEncryptStream.cs
+++ b/src/Discord.Net.WebSocket/Audio/Streams/SodiumEncryptStream.cs
@@ -12,7 +12,7 @@ namespace Discord.Audio.Streams

         //protected readonly byte[] _buffer;

-        internal SodiumEncryptStream(AudioOutStream next, byte[] secretKey/*, int bufferSize = 4000*/)
+        public SodiumEncryptStream(AudioOutStream next, byte[] secretKey/*, int bufferSize = 4000*/)
         {
             _next = next;
             _secretKey = secretKey;
@@ -20,17 +20,13 @@ namespace Discord.Audio.Streams
             _nonce = new byte[24];
         }

-        public override void Write(byte[] buffer, int offset, int count)
+        public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken)
         {
-            WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
-        }
-        public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
-        {
-            cancellationToken.ThrowIfCancellationRequested();
+            cancelToken.ThrowIfCancellationRequested();

             Buffer.BlockCopy(buffer, offset, _nonce, 0, 12); //Copy nonce from RTP header
             count = SecretBox.Encrypt(buffer, offset + 12, count - 12, buffer, 12, _nonce, _secretKey);
-            await _next.WriteAsync(buffer, 0, count + 12, cancellationToken).ConfigureAwait(false);
+            await _next.WriteAsync(buffer, 0, count + 12, cancelToken).ConfigureAwait(false);
         }

         public override async Task FlushAsync(CancellationToken cancelToken)