From 8e0c65498b3b2225e98c6ff613f44a93da66aa45 Mon Sep 17 00:00:00 2001 From: RogueException Date: Sun, 26 Feb 2017 10:57:28 -0400 Subject: [PATCH] Cleaned up audio code --- .../Audio/AudioApplication.cs | 9 + src/Discord.Net.Core/Audio/AudioInStream.cs | 24 ++- src/Discord.Net.Core/Audio/AudioOutStream.cs | 31 +++- src/Discord.Net.Core/Audio/IAudioClient.cs | 4 +- .../Audio/AudioClient.cs | 31 ++-- .../Audio/Opus/OpusApplication.cs | 2 +- .../Audio/Opus/OpusCtl.cs | 10 +- .../Audio/Opus/OpusEncoder.cs | 70 +++++--- .../Audio/Opus/OpusSignal.cs | 9 + .../Audio/Streams/BufferedWriteStream.cs | 156 ++++++++++++++++++ .../Audio/Streams/OpusDecodeStream.cs | 24 ++- .../Audio/Streams/OpusEncodeStream.cs | 39 +++-- .../Audio/Streams/OutputStream.cs | 23 +++ .../Audio/Streams/RTPReadStream.cs | 16 +- .../Audio/Streams/RTPWriteStream.cs | 75 +++------ .../Audio/Streams/SodiumDecryptStream.cs | 42 +++++ .../Audio/Streams/SodiumEncryptStream.cs | 45 +++++ .../Audio/Targets/BufferedAudioTarget.cs | 119 ------------- .../Audio/Targets/DirectAudioTarget.cs | 22 --- .../Audio/Targets/IAudioTarget.cs | 12 -- .../DiscordVoiceApiClient.cs | 8 +- 21 files changed, 495 insertions(+), 276 deletions(-) create mode 100644 src/Discord.Net.Core/Audio/AudioApplication.cs create mode 100644 src/Discord.Net.WebSocket/Audio/Opus/OpusSignal.cs create mode 100644 src/Discord.Net.WebSocket/Audio/Streams/BufferedWriteStream.cs create mode 100644 src/Discord.Net.WebSocket/Audio/Streams/OutputStream.cs create mode 100644 src/Discord.Net.WebSocket/Audio/Streams/SodiumDecryptStream.cs create mode 100644 src/Discord.Net.WebSocket/Audio/Streams/SodiumEncryptStream.cs delete mode 100644 src/Discord.Net.WebSocket/Audio/Targets/BufferedAudioTarget.cs delete mode 100644 src/Discord.Net.WebSocket/Audio/Targets/DirectAudioTarget.cs delete mode 100644 src/Discord.Net.WebSocket/Audio/Targets/IAudioTarget.cs diff --git a/src/Discord.Net.Core/Audio/AudioApplication.cs b/src/Discord.Net.Core/Audio/AudioApplication.cs new file mode 100644 index 000000000..276d934b2 --- /dev/null +++ b/src/Discord.Net.Core/Audio/AudioApplication.cs @@ -0,0 +1,9 @@ +namespace Discord.Audio +{ + public enum AudioApplication : int + { + Voice, + Music, + Mixed + } +} \ No newline at end of file diff --git a/src/Discord.Net.Core/Audio/AudioInStream.cs b/src/Discord.Net.Core/Audio/AudioInStream.cs index e6c9c4b04..4023f9c86 100644 --- a/src/Discord.Net.Core/Audio/AudioInStream.cs +++ b/src/Discord.Net.Core/Audio/AudioInStream.cs @@ -1,8 +1,30 @@ -using System.IO; +using System; +using System.IO; +using System.Threading; namespace Discord.Audio { public abstract class AudioInStream : Stream { + public override bool CanRead => true; + public override bool CanSeek => false; + public override bool CanWrite => true; + + public override void Write(byte[] buffer, int offset, int count) + { + WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult(); + } + + public override void Flush() { throw new NotSupportedException(); } + + public override long Length { get { throw new NotSupportedException(); } } + public override long Position + { + get { throw new NotSupportedException(); } + set { throw new NotSupportedException(); } + } + + public override void SetLength(long value) { throw new NotSupportedException(); } + public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); } } } diff --git a/src/Discord.Net.Core/Audio/AudioOutStream.cs b/src/Discord.Net.Core/Audio/AudioOutStream.cs index dd91b71ee..2b4b012ee 100644 --- a/src/Discord.Net.Core/Audio/AudioOutStream.cs +++ b/src/Discord.Net.Core/Audio/AudioOutStream.cs @@ -1,4 +1,5 @@ -using System.IO; +using System; +using System.IO; using System.Threading; using System.Threading.Tasks; @@ -10,7 +11,31 @@ namespace Discord.Audio public override bool CanSeek => false; public override bool CanWrite => true; - public virtual void Clear() { } - public virtual Task ClearAsync(CancellationToken cancelToken) { return Task.Delay(0); } + public override void Write(byte[] buffer, int offset, int count) + { + WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult(); + } + public override void Flush() + { + FlushAsync(CancellationToken.None).GetAwaiter().GetResult(); + } + public void Clear() + { + ClearAsync(CancellationToken.None).GetAwaiter().GetResult(); + } + + public virtual Task ClearAsync(CancellationToken cancellationToken) { return Task.Delay(0); } + //public virtual Task WriteSilenceAsync(CancellationToken cancellationToken) { return Task.Delay(0); } + + public override long Length { get { throw new NotSupportedException(); } } + public override long Position + { + get { throw new NotSupportedException(); } + set { throw new NotSupportedException(); } + } + + public override int Read(byte[] buffer, int offset, int count) { throw new NotSupportedException(); } + public override void SetLength(long value) { throw new NotSupportedException(); } + public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); } } } diff --git a/src/Discord.Net.Core/Audio/IAudioClient.cs b/src/Discord.Net.Core/Audio/IAudioClient.cs index 4a6ae2e27..bea44fcf4 100644 --- a/src/Discord.Net.Core/Audio/IAudioClient.cs +++ b/src/Discord.Net.Core/Audio/IAudioClient.cs @@ -34,13 +34,13 @@ namespace Discord.Audio /// Samples per frame. Must be 120, 240, 480, 960, 1920 or 2880, representing 2.5, 5, 10, 20, 40 or 60 milliseconds respectively. /// /// - AudioOutStream CreatePCMStream(int samplesPerFrame, int channels = 2, int? bitrate = null, int bufferMillis = 1000); + AudioOutStream CreatePCMStream(AudioApplication application, int samplesPerFrame, int channels = 2, int? bitrate = null, int bufferMillis = 1000); /// /// Creates a new direct outgoing stream accepting PCM (raw) data. This is a direct stream with no internal timer. /// /// Samples per frame. Must be 120, 240, 480, 960, 1920 or 2880, representing 2.5, 5, 10, 20, 40 or 60 milliseconds respectively. /// /// - AudioOutStream CreateDirectPCMStream(int samplesPerFrame, int channels = 2, int? bitrate = null); + AudioOutStream CreateDirectPCMStream(AudioApplication application, int samplesPerFrame, int channels = 2, int? bitrate = null); } } diff --git a/src/Discord.Net.WebSocket/Audio/AudioClient.cs b/src/Discord.Net.WebSocket/Audio/AudioClient.cs index 30073baeb..7645604df 100644 --- a/src/Discord.Net.WebSocket/Audio/AudioClient.cs +++ b/src/Discord.Net.WebSocket/Audio/AudioClient.cs @@ -1,4 +1,5 @@ using Discord.API.Voice; +using Discord.Audio.Streams; using Discord.Logging; using Discord.Net.Converters; using Discord.WebSocket; @@ -80,7 +81,7 @@ namespace Discord.Audio { _audioLogger.WarningAsync(e.ErrorContext.Error).GetAwaiter().GetResult(); e.ErrorContext.Handled = true; - }; + }; LatencyUpdated += async (old, val) => await _audioLogger.VerboseAsync($"Latency = {val} ms").ConfigureAwait(false); } @@ -129,26 +130,34 @@ namespace Discord.Audio public AudioOutStream CreateOpusStream(int samplesPerFrame, int bufferMillis) { CheckSamplesPerFrame(samplesPerFrame); - var target = new BufferedAudioTarget(ApiClient, samplesPerFrame, bufferMillis, _connection.CancelToken); - return new RTPWriteStream(target, _secretKey, samplesPerFrame, _ssrc); + var outputStream = new OutputStream(ApiClient); + var sodiumEncrypter = new SodiumEncryptStream(outputStream, _secretKey); + var rtpWriter = new RTPWriteStream(sodiumEncrypter, samplesPerFrame, _ssrc); + return new BufferedWriteStream(rtpWriter, samplesPerFrame, bufferMillis, _connection.CancelToken, _audioLogger); } public AudioOutStream CreateDirectOpusStream(int samplesPerFrame) { CheckSamplesPerFrame(samplesPerFrame); - var target = new DirectAudioTarget(ApiClient); - return new RTPWriteStream(target, _secretKey, samplesPerFrame, _ssrc); + var outputStream = new OutputStream(ApiClient); + var sodiumEncrypter = new SodiumEncryptStream(outputStream, _secretKey); + return new RTPWriteStream(sodiumEncrypter, samplesPerFrame, _ssrc); } - public AudioOutStream CreatePCMStream(int samplesPerFrame, int channels, int? bitrate, int bufferMillis) + public AudioOutStream CreatePCMStream(AudioApplication application, int samplesPerFrame, int channels, int? bitrate, int bufferMillis) { CheckSamplesPerFrame(samplesPerFrame); - var target = new BufferedAudioTarget(ApiClient, samplesPerFrame, bufferMillis, _connection.CancelToken); - return new OpusEncodeStream(target, _secretKey, channels, samplesPerFrame, _ssrc, bitrate); + var outputStream = new OutputStream(ApiClient); + var sodiumEncrypter = new SodiumEncryptStream(outputStream, _secretKey); + var rtpWriter = new RTPWriteStream(sodiumEncrypter, samplesPerFrame, _ssrc); + var bufferedStream = new BufferedWriteStream(rtpWriter, samplesPerFrame, bufferMillis, _connection.CancelToken, _audioLogger); + return new OpusEncodeStream(bufferedStream, channels, samplesPerFrame, bitrate ?? (96 * 1024), application); } - public AudioOutStream CreateDirectPCMStream(int samplesPerFrame, int channels, int? bitrate) + public AudioOutStream CreateDirectPCMStream(AudioApplication application, int samplesPerFrame, int channels, int? bitrate) { CheckSamplesPerFrame(samplesPerFrame); - var target = new DirectAudioTarget(ApiClient); - return new OpusEncodeStream(target, _secretKey, channels, samplesPerFrame, _ssrc, bitrate); + var outputStream = new OutputStream(ApiClient); + var sodiumEncrypter = new SodiumEncryptStream(outputStream, _secretKey); + var rtpWriter = new RTPWriteStream(sodiumEncrypter, samplesPerFrame, _ssrc); + return new OpusEncodeStream(rtpWriter, channels, samplesPerFrame, bitrate ?? (96 * 1024), application); } private void CheckSamplesPerFrame(int samplesPerFrame) { diff --git a/src/Discord.Net.WebSocket/Audio/Opus/OpusApplication.cs b/src/Discord.Net.WebSocket/Audio/Opus/OpusApplication.cs index d6a3ce0cf..e288bb626 100644 --- a/src/Discord.Net.WebSocket/Audio/Opus/OpusApplication.cs +++ b/src/Discord.Net.WebSocket/Audio/Opus/OpusApplication.cs @@ -1,6 +1,6 @@ namespace Discord.Audio { - public enum OpusApplication : int + internal enum OpusApplication : int { Voice = 2048, MusicOrMixed = 2049, diff --git a/src/Discord.Net.WebSocket/Audio/Opus/OpusCtl.cs b/src/Discord.Net.WebSocket/Audio/Opus/OpusCtl.cs index e71213ae6..0b6a4e37f 100644 --- a/src/Discord.Net.WebSocket/Audio/Opus/OpusCtl.cs +++ b/src/Discord.Net.WebSocket/Audio/Opus/OpusCtl.cs @@ -1,10 +1,12 @@ namespace Discord.Audio { + //https://github.com/gcp/opus/blob/master/include/opus_defines.h internal enum OpusCtl : int { - SetBitrateRequest = 4002, - GetBitrateRequest = 4003, - SetInbandFECRequest = 4012, - GetInbandFECRequest = 4013 + SetBitrate = 4002, + SetBandwidth = 4008, + SetInbandFEC = 4012, + SetPacketLossPercent = 4014, + SetSignal = 4024 } } diff --git a/src/Discord.Net.WebSocket/Audio/Opus/OpusEncoder.cs b/src/Discord.Net.WebSocket/Audio/Opus/OpusEncoder.cs index 2cb3949a9..ca87c5fc6 100644 --- a/src/Discord.Net.WebSocket/Audio/Opus/OpusEncoder.cs +++ b/src/Discord.Net.WebSocket/Audio/Opus/OpusEncoder.cs @@ -15,17 +15,62 @@ namespace Discord.Audio private static extern int EncoderCtl(IntPtr st, OpusCtl request, int value); /// Gets the coding mode of the encoder. - public OpusApplication Application { get; } + public AudioApplication Application { get; } + public int BitRate { get;} - public OpusEncoder(int samplingRate, int channels, OpusApplication application = OpusApplication.MusicOrMixed) + public OpusEncoder(int samplingRate, int channels, int bitrate, AudioApplication application) : base(samplingRate, channels) { + if (bitrate < 1 || bitrate > DiscordVoiceAPIClient.MaxBitrate) + throw new ArgumentOutOfRangeException(nameof(bitrate)); + Application = application; + BitRate = bitrate; + + OpusApplication opusApplication; + OpusSignal opusSignal; + switch (application) + { + case AudioApplication.Mixed: + opusApplication = OpusApplication.MusicOrMixed; + opusSignal = OpusSignal.Auto; + break; + case AudioApplication.Music: + opusApplication = OpusApplication.MusicOrMixed; + opusSignal = OpusSignal.Music; + break; + case AudioApplication.Voice: + opusApplication = OpusApplication.Voice; + opusSignal = OpusSignal.Voice; + break; + default: + throw new ArgumentOutOfRangeException(nameof(application)); + } OpusError error; - _ptr = CreateEncoder(samplingRate, channels, (int)application, out error); + _ptr = CreateEncoder(samplingRate, channels, (int)opusApplication, out error); if (error != OpusError.OK) throw new Exception($"Opus Error: {error}"); + + var result = EncoderCtl(_ptr, OpusCtl.SetSignal, (int)opusSignal); + if (result < 0) + throw new Exception($"Opus Error: {(OpusError)result}"); + + result = EncoderCtl(_ptr, OpusCtl.SetPacketLossPercent, 5); //%% + if (result < 0) + throw new Exception($"Opus Error: {(OpusError)result}"); + + result = EncoderCtl(_ptr, OpusCtl.SetInbandFEC, 1); //True + if (result < 0) + throw new Exception($"Opus Error: {(OpusError)result}"); + + result = EncoderCtl(_ptr, OpusCtl.SetBitrate, bitrate); + if (result < 0) + throw new Exception($"Opus Error: {(OpusError)result}"); + + /*result = EncoderCtl(_ptr, OpusCtl.SetBandwidth, 1105); + if (result < 0) + throw new Exception($"Opus Error: {(OpusError)result}");*/ } /// Produces Opus encoded audio from PCM samples. @@ -44,25 +89,6 @@ namespace Discord.Audio return result; } - /// Gets or sets whether Forward Error Correction is enabled. - public void SetForwardErrorCorrection(bool value) - { - var result = EncoderCtl(_ptr, OpusCtl.SetInbandFECRequest, value ? 1 : 0); - if (result < 0) - throw new Exception($"Opus Error: {(OpusError)result}"); - } - - /// Gets or sets the encoder's bitrate. - public void SetBitrate(int value) - { - if (value < 1 || value > DiscordVoiceAPIClient.MaxBitrate) - throw new ArgumentOutOfRangeException(nameof(value)); - - var result = EncoderCtl(_ptr, OpusCtl.SetBitrateRequest, value); - if (result < 0) - throw new Exception($"Opus Error: {(OpusError)result}"); - } - protected override void Dispose(bool disposing) { if (_ptr != IntPtr.Zero) diff --git a/src/Discord.Net.WebSocket/Audio/Opus/OpusSignal.cs b/src/Discord.Net.WebSocket/Audio/Opus/OpusSignal.cs new file mode 100644 index 000000000..3f95183f4 --- /dev/null +++ b/src/Discord.Net.WebSocket/Audio/Opus/OpusSignal.cs @@ -0,0 +1,9 @@ +namespace Discord.Audio +{ + internal enum OpusSignal : int + { + Auto = -1000, + Voice = 3001, + Music = 3002, + } +} diff --git a/src/Discord.Net.WebSocket/Audio/Streams/BufferedWriteStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/BufferedWriteStream.cs new file mode 100644 index 000000000..f12417d4e --- /dev/null +++ b/src/Discord.Net.WebSocket/Audio/Streams/BufferedWriteStream.cs @@ -0,0 +1,156 @@ +using Discord.Logging; +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; + +namespace Discord.Audio.Streams +{ + /// Wraps another stream with a timed buffer. + public class BufferedWriteStream : AudioOutStream + { + private struct Frame + { + public Frame(byte[] buffer, int bytes) + { + Buffer = buffer; + Bytes = bytes; + } + + public readonly byte[] Buffer; + public readonly int Bytes; + } + + private static readonly byte[] _silenceFrame = new byte[0]; + + private readonly AudioOutStream _next; + private readonly CancellationTokenSource _cancelTokenSource; + private readonly CancellationToken _cancelToken; + private readonly Task _task; + private readonly ConcurrentQueue _queuedFrames; + private readonly ConcurrentQueue _bufferPool; + private readonly SemaphoreSlim _queueLock; + private readonly Logger _logger; + private readonly int _ticksPerFrame, _queueLength; + private bool _isPreloaded; + + internal BufferedWriteStream(AudioOutStream next, int samplesPerFrame, int bufferMillis, CancellationToken cancelToken, Logger logger, int maxFrameSize = 1500) + { + //maxFrameSize = 1275 was too limiting at 128*1024 + _next = next; + _ticksPerFrame = samplesPerFrame / 48; + _logger = logger; + _queueLength = (bufferMillis + (_ticksPerFrame - 1)) / _ticksPerFrame; //Round up + + _cancelTokenSource = new CancellationTokenSource(); + _cancelToken = CancellationTokenSource.CreateLinkedTokenSource(_cancelTokenSource.Token, cancelToken).Token; + _queuedFrames = new ConcurrentQueue(); + _bufferPool = new ConcurrentQueue(); + for (int i = 0; i < _queueLength; i++) + _bufferPool.Enqueue(new byte[maxFrameSize]); + _queueLock = new SemaphoreSlim(_queueLength, _queueLength); + + _task = Run(); + } + + private Task Run() + { + uint num = 0; + return Task.Run(async () => + { + try + { + while (!_isPreloaded && !_cancelToken.IsCancellationRequested) + await Task.Delay(1).ConfigureAwait(false); + + long nextTick = Environment.TickCount; + while (!_cancelToken.IsCancellationRequested) + { + const int limit = 1; + long tick = Environment.TickCount; + long dist = nextTick - tick; + if (dist <= limit) + { + Frame frame; + if (_queuedFrames.TryDequeue(out frame)) + { + await _next.WriteAsync(frame.Buffer, 0, frame.Bytes).ConfigureAwait(false); + _bufferPool.Enqueue(frame.Buffer); + _queueLock.Release(); + nextTick += _ticksPerFrame; +#if DEBUG + var _ = _logger.DebugAsync($"{num++}: Sent {frame.Bytes} bytes ({_queuedFrames.Count} frames buffered)"); +#endif + } + else if (dist == 0) + { + await _next.WriteAsync(_silenceFrame, 0, _silenceFrame.Length).ConfigureAwait(false); + nextTick += _ticksPerFrame; +#if DEBUG + var _ = _logger.DebugAsync($"{num++}: Buffer underrun"); +#endif + } + } + else + await Task.Delay((int)(dist - (limit - 1))).ConfigureAwait(false); + } + } + catch (OperationCanceledException) { } + }); + } + + public override async Task WriteAsync(byte[] data, int offset, int count, CancellationToken cancelToken) + { + if (cancelToken.CanBeCanceled) + cancelToken = CancellationTokenSource.CreateLinkedTokenSource(cancelToken, _cancelToken).Token; + else + cancelToken = _cancelToken; + + await _queueLock.WaitAsync(-1, cancelToken).ConfigureAwait(false); + byte[] buffer; + if (!_bufferPool.TryDequeue(out buffer)) + { +#if DEBUG + var _ = _logger.DebugAsync($"Buffer overflow"); //Should never happen because of the queueLock +#endif + return; + } + Buffer.BlockCopy(data, offset, buffer, 0, count); + _queuedFrames.Enqueue(new Frame(buffer, count)); +#if DEBUG + //var _ await _logger.DebugAsync($"Queued {count} bytes ({_queuedFrames.Count} frames buffered)"); +#endif + if (!_isPreloaded && _queuedFrames.Count == _queueLength) + { +#if DEBUG + var _ = _logger.DebugAsync($"Preloaded"); +#endif + _isPreloaded = true; + } + } + + public override async Task FlushAsync(CancellationToken cancelToken) + { + while (true) + { + cancelToken.ThrowIfCancellationRequested(); + if (_queuedFrames.Count == 0) + return; + await Task.Delay(250, cancelToken).ConfigureAwait(false); + } + } + public override Task ClearAsync(CancellationToken cancelToken) + { + Frame ignored; + do + cancelToken.ThrowIfCancellationRequested(); + while (_queuedFrames.TryDequeue(out ignored)); + return Task.Delay(0); + } + protected override void Dispose(bool disposing) + { + if (disposing) + _cancelTokenSource.Cancel(); + } + } +} \ No newline at end of file diff --git a/src/Discord.Net.WebSocket/Audio/Streams/OpusDecodeStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/OpusDecodeStream.cs index 3a650eeaf..c700a7f15 100644 --- a/src/Discord.Net.WebSocket/Audio/Streams/OpusDecodeStream.cs +++ b/src/Discord.Net.WebSocket/Audio/Streams/OpusDecodeStream.cs @@ -1,22 +1,34 @@ -namespace Discord.Audio +using System; +using System.Collections.Concurrent; + +namespace Discord.Audio.Streams { - internal class OpusDecodeStream : RTPReadStream + /// Converts Opus to PCM + public class OpusDecodeStream : AudioInStream { + private readonly BlockingCollection _queuedData; //TODO: Replace with max-length ring buffer private readonly byte[] _buffer; private readonly OpusDecoder _decoder; - internal OpusDecodeStream(AudioClient audioClient, byte[] secretKey, int samplingRate, - int channels = OpusConverter.MaxChannels, int bufferSize = 4000) - : base(audioClient, secretKey) + internal OpusDecodeStream(AudioClient audioClient, int samplingRate, int channels = OpusConverter.MaxChannels, int bufferSize = 4000) { _buffer = new byte[bufferSize]; _decoder = new OpusDecoder(samplingRate, channels); + _queuedData = new BlockingCollection(100); } public override int Read(byte[] buffer, int offset, int count) + { + var queuedData = _queuedData.Take(); + Buffer.BlockCopy(queuedData, 0, buffer, offset, Math.Min(queuedData.Length, count)); + return queuedData.Length; + } + public override void Write(byte[] buffer, int offset, int count) { count = _decoder.DecodeFrame(buffer, offset, count, _buffer, 0); - return base.Read(_buffer, 0, count); + var newBuffer = new byte[count]; + Buffer.BlockCopy(_buffer, 0, newBuffer, 0, count); + _queuedData.Add(newBuffer); } protected override void Dispose(bool disposing) diff --git a/src/Discord.Net.WebSocket/Audio/Streams/OpusEncodeStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/OpusEncodeStream.cs index 570c4e73c..01747fc05 100644 --- a/src/Discord.Net.WebSocket/Audio/Streams/OpusEncodeStream.cs +++ b/src/Discord.Net.WebSocket/Audio/Streams/OpusEncodeStream.cs @@ -2,27 +2,28 @@ using System.Threading; using System.Threading.Tasks; -namespace Discord.Audio +namespace Discord.Audio.Streams { - internal class OpusEncodeStream : RTPWriteStream + /// Converts PCM to Opus + public class OpusEncodeStream : AudioOutStream { public const int SampleRate = 48000; + + private readonly AudioOutStream _next; + private readonly OpusEncoder _encoder; + private readonly byte[] _buffer; + private int _frameSize; private byte[] _partialFrameBuffer; private int _partialFramePos; - private readonly OpusEncoder _encoder; - - internal OpusEncodeStream(IAudioTarget target, byte[] secretKey, int channels, int samplesPerFrame, uint ssrc, int? bitrate = null) - : base(target, secretKey, samplesPerFrame, ssrc) + internal OpusEncodeStream(AudioOutStream next, int channels, int samplesPerFrame, int bitrate, AudioApplication application, int bufferSize = 4000) { - _encoder = new OpusEncoder(SampleRate, channels); + _next = next; + _encoder = new OpusEncoder(SampleRate, channels, bitrate, application); _frameSize = samplesPerFrame * channels * 2; + _buffer = new byte[bufferSize]; _partialFrameBuffer = new byte[_frameSize]; - - _encoder.SetForwardErrorCorrection(true); - if (bitrate != null) - _encoder.SetBitrate(bitrate.Value); } public override void Write(byte[] buffer, int offset, int count) @@ -43,7 +44,7 @@ namespace Discord.Audio _partialFramePos = 0; int encFrameSize = _encoder.EncodeFrame(_partialFrameBuffer, 0, _frameSize, _buffer, 0); - await base.WriteAsync(_buffer, 0, encFrameSize, cancellationToken).ConfigureAwait(false); + await _next.WriteAsync(_buffer, 0, encFrameSize, cancellationToken).ConfigureAwait(false); } else { @@ -54,10 +55,7 @@ namespace Discord.Audio } } - /*public override void Flush() - { - FlushAsync(CancellationToken.None).GetAwaiter().GetResult(); - } + /* public override async Task FlushAsync(CancellationToken cancellationToken) { try @@ -70,6 +68,15 @@ namespace Discord.Audio await base.FlushAsync(cancellationToken).ConfigureAwait(false); }*/ + public override async Task FlushAsync(CancellationToken cancelToken) + { + await _next.FlushAsync(cancelToken).ConfigureAwait(false); + } + public override async Task ClearAsync(CancellationToken cancelToken) + { + await _next.ClearAsync(cancelToken).ConfigureAwait(false); + } + protected override void Dispose(bool disposing) { base.Dispose(disposing); diff --git a/src/Discord.Net.WebSocket/Audio/Streams/OutputStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/OutputStream.cs new file mode 100644 index 000000000..6238e93b4 --- /dev/null +++ b/src/Discord.Net.WebSocket/Audio/Streams/OutputStream.cs @@ -0,0 +1,23 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace Discord.Audio.Streams +{ + /// Wraps an IAudioClient, sending voice data on write. + public class OutputStream : AudioOutStream + { + private readonly DiscordVoiceAPIClient _client; + public OutputStream(IAudioClient client) + : this((client as AudioClient).ApiClient) { } + internal OutputStream(DiscordVoiceAPIClient client) + { + _client = client; + } + + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken) + { + cancelToken.ThrowIfCancellationRequested(); + await _client.SendAsync(buffer, offset, count).ConfigureAwait(false); + } + } +} \ No newline at end of file diff --git a/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs index cfc804abe..0cc7a1529 100644 --- a/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs +++ b/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs @@ -2,11 +2,13 @@ using System.Collections.Concurrent; using System.IO; -namespace Discord.Audio +namespace Discord.Audio.Streams { - internal class RTPReadStream : Stream + /// Reads the payload from an RTP frame + public class RTPReadStream : AudioInStream { private readonly BlockingCollection _queuedData; //TODO: Replace with max-length ring buffer + //private readonly BlockingCollection _queuedData; //TODO: Replace with max-length ring buffer private readonly AudioClient _audioClient; private readonly byte[] _buffer, _nonce, _secretKey; @@ -23,6 +25,12 @@ namespace Discord.Audio _nonce = new byte[24]; } + /*public RTPFrame ReadFrame() + { + var queuedData = _queuedData.Take(); + Buffer.BlockCopy(queuedData, 0, buffer, offset, Math.Min(queuedData.Length, count)); + return queuedData.Length; + }*/ public override int Read(byte[] buffer, int offset, int count) { var queuedData = _queuedData.Take(); @@ -31,10 +39,8 @@ namespace Discord.Audio } public override void Write(byte[] buffer, int offset, int count) { - Buffer.BlockCopy(buffer, 0, _nonce, 0, 12); - count = SecretBox.Decrypt(buffer, offset, count, _buffer, 0, _nonce, _secretKey); var newBuffer = new byte[count]; - Buffer.BlockCopy(_buffer, 0, newBuffer, 0, count); + Buffer.BlockCopy(buffer, 0, newBuffer, 0, count); _queuedData.Add(newBuffer); } diff --git a/src/Discord.Net.WebSocket/Audio/Streams/RTPWriteStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/RTPWriteStream.cs index 7ba95c591..5b8877f8e 100644 --- a/src/Discord.Net.WebSocket/Audio/Streams/RTPWriteStream.cs +++ b/src/Discord.Net.WebSocket/Audio/Streams/RTPWriteStream.cs @@ -1,33 +1,32 @@ using System; -using System.IO; using System.Threading; using System.Threading.Tasks; -namespace Discord.Audio +namespace Discord.Audio.Streams { - internal class RTPWriteStream : AudioOutStream + /// Wraps data in an RTP frame + public class RTPWriteStream : AudioOutStream { - private readonly IAudioTarget _target; - private readonly byte[] _nonce, _secretKey; + private readonly AudioOutStream _next; + private readonly byte[] _header; private int _samplesPerFrame; private uint _ssrc, _timestamp = 0; protected readonly byte[] _buffer; - internal RTPWriteStream(IAudioTarget target, byte[] secretKey, int samplesPerFrame, uint ssrc) + internal RTPWriteStream(AudioOutStream next, int samplesPerFrame, uint ssrc, int bufferSize = 4000) { - _target = target; - _secretKey = secretKey; + _next = next; _samplesPerFrame = samplesPerFrame; _ssrc = ssrc; - _buffer = new byte[4000]; - _nonce = new byte[24]; - _nonce[0] = 0x80; - _nonce[1] = 0x78; - _nonce[8] = (byte)(_ssrc >> 24); - _nonce[9] = (byte)(_ssrc >> 16); - _nonce[10] = (byte)(_ssrc >> 8); - _nonce[11] = (byte)(_ssrc >> 0); + _buffer = new byte[bufferSize]; + _header = new byte[24]; + _header[0] = 0x80; + _header[1] = 0x78; + _header[8] = (byte)(_ssrc >> 24); + _header[9] = (byte)(_ssrc >> 16); + _header[10] = (byte)(_ssrc >> 8); + _header[11] = (byte)(_ssrc >> 0); } public override void Write(byte[] buffer, int offset, int count) @@ -39,48 +38,28 @@ namespace Discord.Audio cancellationToken.ThrowIfCancellationRequested(); unchecked { - if (_nonce[3]++ == byte.MaxValue) - _nonce[2]++; + if (_header[3]++ == byte.MaxValue) + _header[2]++; _timestamp += (uint)_samplesPerFrame; - _nonce[4] = (byte)(_timestamp >> 24); - _nonce[5] = (byte)(_timestamp >> 16); - _nonce[6] = (byte)(_timestamp >> 8); - _nonce[7] = (byte)(_timestamp >> 0); + _header[4] = (byte)(_timestamp >> 24); + _header[5] = (byte)(_timestamp >> 16); + _header[6] = (byte)(_timestamp >> 8); + _header[7] = (byte)(_timestamp >> 0); } + Buffer.BlockCopy(_header, 0, _buffer, 0, 12); //Copy RTP header from to the buffer + Buffer.BlockCopy(buffer, offset, _buffer, 12, count); - count = SecretBox.Encrypt(buffer, offset, count, _buffer, 12, _nonce, _secretKey); - Buffer.BlockCopy(_nonce, 0, _buffer, 0, 12); //Copy the RTP header from nonce to buffer - await _target.SendAsync(_buffer, count + 12).ConfigureAwait(false); + await _next.WriteAsync(_buffer, 0, count + 12).ConfigureAwait(false); } - public override void Flush() + public override async Task FlushAsync(CancellationToken cancelToken) { - FlushAsync(CancellationToken.None).GetAwaiter().GetResult(); - } - public override async Task FlushAsync(CancellationToken cancellationToken) - { - await _target.FlushAsync(cancellationToken).ConfigureAwait(false); - } - - public override void Clear() - { - ClearAsync(CancellationToken.None).GetAwaiter().GetResult(); + await _next.FlushAsync(cancelToken).ConfigureAwait(false); } public override async Task ClearAsync(CancellationToken cancelToken) { - await _target.ClearAsync(cancelToken).ConfigureAwait(false); + await _next.ClearAsync(cancelToken).ConfigureAwait(false); } - - public override long Length { get { throw new NotSupportedException(); } } - public override long Position - { - get { throw new NotSupportedException(); } - set { throw new NotSupportedException(); } - } - - public override int Read(byte[] buffer, int offset, int count) { throw new NotSupportedException(); } - public override void SetLength(long value) { throw new NotSupportedException(); } - public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); } } } diff --git a/src/Discord.Net.WebSocket/Audio/Streams/SodiumDecryptStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/SodiumDecryptStream.cs new file mode 100644 index 000000000..fa2c0ec0a --- /dev/null +++ b/src/Discord.Net.WebSocket/Audio/Streams/SodiumDecryptStream.cs @@ -0,0 +1,42 @@ +using System; +using System.Collections.Concurrent; + +namespace Discord.Audio.Streams +{ + /// Decrypts an RTP frame using libsodium + public class SodiumDecryptStream : AudioInStream + { + private readonly BlockingCollection _queuedData; //TODO: Replace with max-length ring buffer + private readonly AudioClient _audioClient; + private readonly byte[] _buffer, _nonce, _secretKey; + + public override bool CanRead => true; + public override bool CanSeek => false; + public override bool CanWrite => true; + + internal SodiumDecryptStream(AudioClient audioClient, byte[] secretKey, int bufferSize = 4000) + { + _audioClient = audioClient; + _secretKey = secretKey; + _buffer = new byte[bufferSize]; + _queuedData = new BlockingCollection(100); + _nonce = new byte[24]; + } + + public override int Read(byte[] buffer, int offset, int count) + { + var queuedData = _queuedData.Take(); + Buffer.BlockCopy(queuedData, 0, buffer, offset, Math.Min(queuedData.Length, count)); + return queuedData.Length; + } + public override void Write(byte[] buffer, int offset, int count) + { + Buffer.BlockCopy(buffer, 0, _nonce, 0, 12); //Copy RTP header to nonce + count = SecretBox.Decrypt(buffer, offset, count, _buffer, 0, _nonce, _secretKey); + + var newBuffer = new byte[count]; + Buffer.BlockCopy(_buffer, 0, newBuffer, 0, count); + _queuedData.Add(newBuffer); + } + } +} diff --git a/src/Discord.Net.WebSocket/Audio/Streams/SodiumEncryptStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/SodiumEncryptStream.cs new file mode 100644 index 000000000..ef2fc1b27 --- /dev/null +++ b/src/Discord.Net.WebSocket/Audio/Streams/SodiumEncryptStream.cs @@ -0,0 +1,45 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Discord.Audio.Streams +{ + /// Encrypts an RTP frame using libsodium + public class SodiumEncryptStream : AudioOutStream + { + private readonly AudioOutStream _next; + private readonly byte[] _nonce, _secretKey; + + //protected readonly byte[] _buffer; + + internal SodiumEncryptStream(AudioOutStream next, byte[] secretKey/*, int bufferSize = 4000*/) + { + _next = next; + _secretKey = secretKey; + //_buffer = new byte[bufferSize]; //TODO: Can Sodium do an in-place encrypt? + _nonce = new byte[24]; + } + + public override void Write(byte[] buffer, int offset, int count) + { + WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult(); + } + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + Buffer.BlockCopy(buffer, offset, _nonce, 0, 12); //Copy nonce from RTP header + count = SecretBox.Encrypt(buffer, offset + 12, count - 12, buffer, 12, _nonce, _secretKey); + await _next.WriteAsync(buffer, 0, count + 12, cancellationToken).ConfigureAwait(false); + } + + public override async Task FlushAsync(CancellationToken cancelToken) + { + await _next.FlushAsync(cancelToken).ConfigureAwait(false); + } + public override async Task ClearAsync(CancellationToken cancelToken) + { + await _next.ClearAsync(cancelToken).ConfigureAwait(false); + } + } +} diff --git a/src/Discord.Net.WebSocket/Audio/Targets/BufferedAudioTarget.cs b/src/Discord.Net.WebSocket/Audio/Targets/BufferedAudioTarget.cs deleted file mode 100644 index b27c5c8b3..000000000 --- a/src/Discord.Net.WebSocket/Audio/Targets/BufferedAudioTarget.cs +++ /dev/null @@ -1,119 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Threading; -using System.Threading.Tasks; - -namespace Discord.Audio -{ - internal class BufferedAudioTarget : IAudioTarget, IDisposable - { - private struct Frame - { - public Frame(byte[] buffer, int bytes) - { - Buffer = buffer; - Bytes = bytes; - } - - public readonly byte[] Buffer; - public readonly int Bytes; - } - - private static readonly byte[] _silenceFrame = new byte[] { 0xF8, 0xFF, 0xFE }; - - private Task _task; - private DiscordVoiceAPIClient _client; - private CancellationTokenSource _cancelTokenSource; - private CancellationToken _cancelToken; - private ConcurrentQueue _queuedFrames; - private ConcurrentQueue _bufferPool; - private SemaphoreSlim _queueLock; - private int _ticksPerFrame; - - internal BufferedAudioTarget(DiscordVoiceAPIClient client, int samplesPerFrame, int bufferMillis, CancellationToken cancelToken) - { - _client = client; - _ticksPerFrame = samplesPerFrame / 48; - int queueLength = (bufferMillis + (_ticksPerFrame - 1)) / _ticksPerFrame; //Round up - - _cancelTokenSource = new CancellationTokenSource(); - _cancelToken = CancellationTokenSource.CreateLinkedTokenSource(_cancelTokenSource.Token, cancelToken).Token; - _queuedFrames = new ConcurrentQueue(); - _bufferPool = new ConcurrentQueue(); - for (int i = 0; i < queueLength; i++) - _bufferPool.Enqueue(new byte[1275]); - _queueLock = new SemaphoreSlim(queueLength, queueLength); - - _task = Run(); - } - - private Task Run() - { - return Task.Run(async () => - { - try - { - long nextTick = Environment.TickCount; - while (!_cancelToken.IsCancellationRequested) - { - long tick = Environment.TickCount; - long dist = nextTick - tick; - if (dist <= 0) - { - Frame frame; - if (_queuedFrames.TryDequeue(out frame)) - { - await _client.SendAsync(frame.Buffer, frame.Bytes).ConfigureAwait(false); - _bufferPool.Enqueue(frame.Buffer); - _queueLock.Release(); - } - else - await _client.SendAsync(_silenceFrame, _silenceFrame.Length).ConfigureAwait(false); - nextTick += _ticksPerFrame; - } - else if (dist > 1) - await Task.Delay((int)dist).ConfigureAwait(false); - } - } - catch (OperationCanceledException) { } - }); - } - - public async Task SendAsync(byte[] data, int count) - { - await _queueLock.WaitAsync(-1, _cancelToken).ConfigureAwait(false); - byte[] buffer; - _bufferPool.TryDequeue(out buffer); - Buffer.BlockCopy(data, 0, buffer, 0, count); - _queuedFrames.Enqueue(new Frame(buffer, count)); - } - - public async Task FlushAsync(CancellationToken cancelToken) - { - while (true) - { - cancelToken.ThrowIfCancellationRequested(); - if (_queuedFrames.Count == 0) - return; - await Task.Delay(250, cancelToken).ConfigureAwait(false); - } - } - public Task ClearAsync(CancellationToken cancelToken) - { - Frame ignored; - do - cancelToken.ThrowIfCancellationRequested(); - while (_queuedFrames.TryDequeue(out ignored)); - return Task.Delay(0); - } - protected void Dispose(bool disposing) - { - if (disposing) - _cancelTokenSource.Cancel(); - } - public void Dispose() - { - Dispose(true); - } - } -} \ No newline at end of file diff --git a/src/Discord.Net.WebSocket/Audio/Targets/DirectAudioTarget.cs b/src/Discord.Net.WebSocket/Audio/Targets/DirectAudioTarget.cs deleted file mode 100644 index 2440fc0a8..000000000 --- a/src/Discord.Net.WebSocket/Audio/Targets/DirectAudioTarget.cs +++ /dev/null @@ -1,22 +0,0 @@ -using System.Threading; -using System.Threading.Tasks; - -namespace Discord.Audio -{ - internal class DirectAudioTarget : IAudioTarget - { - private readonly DiscordVoiceAPIClient _client; - public DirectAudioTarget(DiscordVoiceAPIClient client) - { - _client = client; - } - - public Task SendAsync(byte[] buffer, int count) - => _client.SendAsync(buffer, count); - - public Task FlushAsync(CancellationToken cancelToken) - => Task.Delay(0); - public Task ClearAsync(CancellationToken cancelToken) - => Task.Delay(0); - } -} diff --git a/src/Discord.Net.WebSocket/Audio/Targets/IAudioTarget.cs b/src/Discord.Net.WebSocket/Audio/Targets/IAudioTarget.cs deleted file mode 100644 index 1aa0d4ade..000000000 --- a/src/Discord.Net.WebSocket/Audio/Targets/IAudioTarget.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System.Threading; -using System.Threading.Tasks; - -namespace Discord.Audio -{ - internal interface IAudioTarget - { - Task SendAsync(byte[] buffer, int count); - Task FlushAsync(CancellationToken cancelToken); - Task ClearAsync(CancellationToken cancelToken); - } -} diff --git a/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs b/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs index 27d2a8003..fa619b58c 100644 --- a/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs +++ b/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs @@ -64,7 +64,7 @@ namespace Discord.Audio }; WebSocketClient = webSocketProvider(); - //_gatewayClient.SetHeader("user-agent", DiscordConfig.UserAgent); (Causes issues in .Net 4.6+) + //_gatewayClient.SetHeader("user-agent", DiscordConfig.UserAgent); //(Causes issues in .Net 4.6+) WebSocketClient.BinaryMessage += async (data, index, count) => { using (var compressed = new MemoryStream(data, index + 2, count - 2)) @@ -117,9 +117,9 @@ namespace Discord.Audio await WebSocketClient.SendAsync(bytes, 0, bytes.Length, true).ConfigureAwait(false); await _sentGatewayMessageEvent.InvokeAsync(opCode).ConfigureAwait(false); } - public async Task SendAsync(byte[] data, int bytes) + public async Task SendAsync(byte[] data, int offset, int bytes) { - await _udp.SendAsync(data, 0, bytes).ConfigureAwait(false); + await _udp.SendAsync(data, offset, bytes).ConfigureAwait(false); await _sentDataEvent.InvokeAsync(bytes).ConfigureAwait(false); } @@ -224,7 +224,7 @@ namespace Discord.Audio packet[1] = (byte)(ssrc >> 16); packet[2] = (byte)(ssrc >> 8); packet[3] = (byte)(ssrc >> 0); - await SendAsync(packet, 70).ConfigureAwait(false); + await SendAsync(packet, 0, 70).ConfigureAwait(false); await _sentDiscoveryEvent.InvokeAsync().ConfigureAwait(false); }