From 909127d330f9142448da502b124c417dcafed2aa Mon Sep 17 00:00:00 2001 From: RogueException Date: Sat, 1 Apr 2017 13:13:20 -0300 Subject: [PATCH] InputStream reads should wait until data is available. --- src/Discord.Net.Core/Audio/AudioInStream.cs | 4 +-- src/Discord.Net.Core/Audio/IAudioClient.cs | 1 + .../Audio/Streams/InputStream.cs | 33 ++++++++++--------- 3 files changed, 21 insertions(+), 17 deletions(-) diff --git a/src/Discord.Net.Core/Audio/AudioInStream.cs b/src/Discord.Net.Core/Audio/AudioInStream.cs index a6b5c5e6b..6503474e5 100644 --- a/src/Discord.Net.Core/Audio/AudioInStream.cs +++ b/src/Discord.Net.Core/Audio/AudioInStream.cs @@ -11,9 +11,9 @@ namespace Discord.Audio public override bool CanSeek => false; public override bool CanWrite => true; - public abstract Task ReadFrameAsync(CancellationToken cancelToken); + public abstract Task ReadFrameAsync(CancellationToken cancelToken); - public RTPFrame? ReadFrame() + public RTPFrame ReadFrame() { return ReadFrameAsync(CancellationToken.None).GetAwaiter().GetResult(); } diff --git a/src/Discord.Net.Core/Audio/IAudioClient.cs b/src/Discord.Net.Core/Audio/IAudioClient.cs index 149905654..3ee008320 100644 --- a/src/Discord.Net.Core/Audio/IAudioClient.cs +++ b/src/Discord.Net.Core/Audio/IAudioClient.cs @@ -10,6 +10,7 @@ namespace Discord.Audio event Func LatencyUpdated; event Func StreamCreated; event Func StreamDestroyed; + event Func SpeakingUpdated; /// Gets the current connection state of this client. ConnectionState ConnectionState { get; } diff --git a/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs index 4014c7e1e..14bb18851 100644 --- a/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs +++ b/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs @@ -8,7 +8,10 @@ namespace Discord.Audio.Streams /// Reads the payload from an RTP frame public class InputStream : AudioInStream { + private const int MaxFrames = 100; + private ConcurrentQueue _frames; + private SemaphoreSlim _signal; private ushort _nextSeq; private uint _nextTimestamp; private bool _hasHeader; @@ -21,28 +24,27 @@ namespace Discord.Audio.Streams public InputStream() { _frames = new ConcurrentQueue(); + _signal = new SemaphoreSlim(0, MaxFrames); } - public override Task ReadFrameAsync(CancellationToken cancelToken) + public override async Task ReadFrameAsync(CancellationToken cancelToken) { cancelToken.ThrowIfCancellationRequested(); - if (_frames.TryDequeue(out var frame)) - return Task.FromResult(frame); - return Task.FromResult(null); + RTPFrame frame; + await _signal.WaitAsync(cancelToken).ConfigureAwait(false); + _frames.TryDequeue(out frame); + return frame; } - public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken) + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken) { cancelToken.ThrowIfCancellationRequested(); - - if (_frames.TryDequeue(out var frame)) - { - if (count < frame.Payload.Length) - throw new InvalidOperationException("Buffer is too small."); - Buffer.BlockCopy(frame.Payload, 0, buffer, offset, frame.Payload.Length); - return Task.FromResult(frame.Payload.Length); - } - return Task.FromResult(0); + + var frame = await ReadFrameAsync(cancelToken).ConfigureAwait(false); + if (count < frame.Payload.Length) + throw new InvalidOperationException("Buffer is too small."); + Buffer.BlockCopy(frame.Payload, 0, buffer, offset, frame.Payload.Length); + return frame.Payload.Length; } public void WriteHeader(ushort seq, uint timestamp) @@ -57,7 +59,7 @@ namespace Discord.Audio.Streams { cancelToken.ThrowIfCancellationRequested(); - if (_frames.Count > 100) //1-2 seconds + if (_frames.Count > MaxFrames) //1-2 seconds { _hasHeader = false; return Task.Delay(0); //Buffer overloaded @@ -72,6 +74,7 @@ namespace Discord.Audio.Streams timestamp: _nextTimestamp, payload: payload )); + _signal.Release(); _hasHeader = false; return Task.Delay(0); }