diff --git a/src/Discord.Net.Core/Audio/IAudioClient.cs b/src/Discord.Net.Core/Audio/IAudioClient.cs index 5e7aa2f6e..38e25d7b5 100644 --- a/src/Discord.Net.Core/Audio/IAudioClient.cs +++ b/src/Discord.Net.Core/Audio/IAudioClient.cs @@ -22,7 +22,7 @@ namespace Discord.Audio /// /// Samples per frame. Must be 120, 240, 480, 960, 1920 or 2880, representing 2.5, 5, 10, 20, 40 or 60 milliseconds respectively. /// - Stream CreateOpusStream(int samplesPerFrame); + Stream CreateOpusStream(int samplesPerFrame, int bufferMillis = 1000); /// /// Creates a new outgoing stream accepting Opus-encoded data. This is a direct stream with no internal timer. /// @@ -35,7 +35,7 @@ namespace Discord.Audio /// Samples per frame. Must be 120, 240, 480, 960, 1920 or 2880, representing 2.5, 5, 10, 20, 40 or 60 milliseconds respectively. /// /// - Stream CreatePCMStream(int samplesPerFrame, int channels = 2, int? bitrate = null); + Stream CreatePCMStream(int samplesPerFrame, int channels = 2, int? bitrate = null, int bufferMillis = 1000); /// /// Creates a new direct outgoing stream accepting PCM (raw) data. This is a direct stream with no internal timer. /// diff --git a/src/Discord.Net.WebSocket/Audio/AudioClient.cs b/src/Discord.Net.WebSocket/Audio/AudioClient.cs index e18995e86..dd01aa2f9 100644 --- a/src/Discord.Net.WebSocket/Audio/AudioClient.cs +++ b/src/Discord.Net.WebSocket/Audio/AudioClient.cs @@ -13,7 +13,7 @@ using System.Threading.Tasks; namespace Discord.Audio { - public class AudioClient : IAudioClient, IDisposable + internal class AudioClient : IAudioClient, IDisposable { public event Func Connected { @@ -74,7 +74,7 @@ namespace Discord.Audio ApiClient.SentGatewayMessage += async opCode => await _audioLogger.DebugAsync($"Sent {opCode}").ConfigureAwait(false); ApiClient.SentDiscovery += async () => await _audioLogger.DebugAsync($"Sent Discovery").ConfigureAwait(false); - ApiClient.SentData += async bytes => await _audioLogger.DebugAsync($"Sent {bytes} Bytes").ConfigureAwait(false); + //ApiClient.SentData += async bytes => await _audioLogger.DebugAsync($"Sent {bytes} Bytes").ConfigureAwait(false); ApiClient.ReceivedEvent += ProcessMessageAsync; ApiClient.ReceivedPacket += ProcessPacketAsync; ApiClient.Disconnected += async ex => @@ -170,10 +170,10 @@ namespace Discord.Audio await Discord.ApiClient.SendVoiceStateUpdateAsync(Guild.Id, null, false, false).ConfigureAwait(false); } - public Stream CreateOpusStream(int samplesPerFrame) + public Stream CreateOpusStream(int samplesPerFrame, int bufferMillis) { CheckSamplesPerFrame(samplesPerFrame); - var target = new BufferedAudioTarget(ApiClient, samplesPerFrame, _cancelTokenSource.Token); + var target = new BufferedAudioTarget(ApiClient, samplesPerFrame, bufferMillis, _cancelTokenSource.Token); return new RTPWriteStream(target, _secretKey, samplesPerFrame, _ssrc); } public Stream CreateDirectOpusStream(int samplesPerFrame) @@ -182,13 +182,13 @@ namespace Discord.Audio var target = new DirectAudioTarget(ApiClient); return new RTPWriteStream(target, _secretKey, samplesPerFrame, _ssrc); } - public Stream CreatePCMStream(int samplesPerFrame, int channels = 2, int? bitrate = null) + public Stream CreatePCMStream(int samplesPerFrame, int channels, int? bitrate, int bufferMillis) { CheckSamplesPerFrame(samplesPerFrame); - var target = new BufferedAudioTarget(ApiClient, samplesPerFrame, _cancelTokenSource.Token); + var target = new BufferedAudioTarget(ApiClient, samplesPerFrame, bufferMillis, _cancelTokenSource.Token); return new OpusEncodeStream(target, _secretKey, channels, samplesPerFrame, _ssrc, bitrate); } - public Stream CreateDirectPCMStream(int samplesPerFrame, int channels = 2, int? bitrate = null) + public Stream CreateDirectPCMStream(int samplesPerFrame, int channels, int? bitrate) { CheckSamplesPerFrame(samplesPerFrame); var target = new DirectAudioTarget(ApiClient); diff --git a/src/Discord.Net.WebSocket/Audio/Targets/BufferedAudioTarget.cs b/src/Discord.Net.WebSocket/Audio/Targets/BufferedAudioTarget.cs index 32b7c2c4d..d302dca87 100644 --- a/src/Discord.Net.WebSocket/Audio/Targets/BufferedAudioTarget.cs +++ b/src/Discord.Net.WebSocket/Audio/Targets/BufferedAudioTarget.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Concurrent; -using System.Diagnostics; using System.Threading; using System.Threading.Tasks; @@ -8,62 +7,98 @@ namespace Discord.Audio { internal class BufferedAudioTarget : IAudioTarget, IDisposable { - private static readonly byte[] _silencePacket = new byte[] { 0xF8, 0xFF, 0xFE }; + private struct Frame + { + public Frame(byte[] buffer, int bytes) + { + Buffer = buffer; + Bytes = bytes; + } + + public readonly byte[] Buffer; + public readonly int Bytes; + } + + private static readonly byte[] _silenceFrame = new byte[] { 0xF8, 0xFF, 0xFE }; private Task _task; private DiscordVoiceAPIClient _client; private CancellationTokenSource _cancelTokenSource; - private ConcurrentQueue _queue; + private CancellationToken _cancelToken; + private ConcurrentQueue _queuedFrames; + private ConcurrentQueue _bufferPool; + private SemaphoreSlim _queueLock; + private int _ticksPerFrame; - internal BufferedAudioTarget(DiscordVoiceAPIClient client, int samplesPerFrame, CancellationToken cancelToken) + internal BufferedAudioTarget(DiscordVoiceAPIClient client, int samplesPerFrame, int bufferMillis, CancellationToken cancelToken) { _client = client; - long ticksPerFrame = samplesPerFrame / 48; + _ticksPerFrame = samplesPerFrame / 48; + int queueLength = (bufferMillis + (_ticksPerFrame - 1)) / _ticksPerFrame; //Round up _cancelTokenSource = new CancellationTokenSource(); - cancelToken = CancellationTokenSource.CreateLinkedTokenSource(_cancelTokenSource.Token, cancelToken).Token; - _queue = new ConcurrentQueue(); //TODO: We need a better queue + _cancelToken = CancellationTokenSource.CreateLinkedTokenSource(_cancelTokenSource.Token, cancelToken).Token; + _queuedFrames = new ConcurrentQueue(); + _bufferPool = new ConcurrentQueue(); + for (int i = 0; i < queueLength; i++) + _bufferPool.Enqueue(new byte[1275]); + _queueLock = new SemaphoreSlim(queueLength, queueLength); - _task = Run(ticksPerFrame, cancelToken); + _task = Run(); } - private Task Run(long ticksPerFrame, CancellationToken cancelToken) + private Task Run() { return Task.Run(async () => { - long nextTick = Environment.TickCount; - while (!cancelToken.IsCancellationRequested) + try { - long tick = Environment.TickCount; - long dist = nextTick - tick; - if (dist <= 0) + long nextTick = Environment.TickCount; + while (!_cancelToken.IsCancellationRequested) { - byte[] buffer; - if (_queue.TryDequeue(out buffer)) - await _client.SendAsync(buffer, buffer.Length).ConfigureAwait(false); - else - await _client.SendAsync(_silencePacket, _silencePacket.Length).ConfigureAwait(false); - nextTick += ticksPerFrame; + long tick = Environment.TickCount; + long dist = nextTick - tick; + if (dist <= 0) + { + Frame frame; + if (_queuedFrames.TryDequeue(out frame)) + { +#if NETSTANDARD1_3 + Console.WriteLine("Pop"); +#endif + await _client.SendAsync(frame.Buffer, frame.Bytes).ConfigureAwait(false); + _bufferPool.Enqueue(frame.Buffer); + _queueLock.Release(); + } + else + await _client.SendAsync(_silenceFrame, _silenceFrame.Length).ConfigureAwait(false); + nextTick += _ticksPerFrame; + } + else if (dist > 1) + await Task.Delay((int)dist).ConfigureAwait(false); } - else if (dist > 1) - await Task.Delay((int)dist).ConfigureAwait(false); } + catch (OperationCanceledException) { } }); } - public Task SendAsync(byte[] buffer, int count) + public async Task SendAsync(byte[] data, int count) { - byte[] newBuffer = new byte[count]; - Buffer.BlockCopy(buffer, 0, newBuffer, 0, count); - _queue.Enqueue(newBuffer); - return Task.Delay(0); + await _queueLock.WaitAsync(-1, _cancelToken).ConfigureAwait(false); +#if NETSTANDARD1_3 + Console.WriteLine("Push"); +#endif + byte[] buffer; + _bufferPool.TryDequeue(out buffer); + Buffer.BlockCopy(data, 0, buffer, 0, count); + _queuedFrames.Enqueue(new Frame(buffer, count)); } public async Task FlushAsync() { while (true) { - if (_queue.Count == 0) + if (_queuedFrames.Count == 0) return; await Task.Delay(250).ConfigureAwait(false); } diff --git a/src/Discord.Net.WebSocket/Entities/Guilds/SocketGuild.cs b/src/Discord.Net.WebSocket/Entities/Guilds/SocketGuild.cs index 048b86a4a..eddee15c7 100644 --- a/src/Discord.Net.WebSocket/Entities/Guilds/SocketGuild.cs +++ b/src/Discord.Net.WebSocket/Entities/Guilds/SocketGuild.cs @@ -32,6 +32,7 @@ namespace Discord.WebSocket private ConcurrentDictionary _voiceStates; private ImmutableArray _emojis; private ImmutableArray _features; + private AudioClient _audioClient; internal bool _available; public string Name { get; private set; } @@ -42,7 +43,6 @@ namespace Discord.WebSocket public DefaultMessageNotifications DefaultMessageNotifications { get; private set; } public int MemberCount { get; set; } public int DownloadedMemberCount { get; private set; } - public AudioClient AudioClient { get; private set; } public ulong? AFKChannelId { get; private set; } public ulong? EmbedChannelId { get; private set; } @@ -59,6 +59,7 @@ namespace Discord.WebSocket public bool IsSynced => _syncPromise.Task.IsCompleted; public Task SyncPromise => _syncPromise.Task; public Task DownloaderPromise => _downloaderPromise.Task; + public IAudioClient AudioClient => _audioClient; public SocketGuildUser CurrentUser { get @@ -69,7 +70,6 @@ namespace Discord.WebSocket return null; } } - public SocketRole EveryoneRole => GetRole(Id); public IReadOnlyCollection Channels { @@ -476,9 +476,9 @@ namespace Discord.WebSocket { _audioConnectPromise?.TrySetCanceledAsync(); //Cancel any previous audio connection _audioConnectPromise = null; - if (AudioClient != null) - await AudioClient.DisconnectAsync().ConfigureAwait(false); - AudioClient = null; + if (_audioClient != null) + await _audioClient.DisconnectAsync().ConfigureAwait(false); + _audioClient = null; } internal async Task FinishConnectAudio(int id, string url, string token) { @@ -487,7 +487,7 @@ namespace Discord.WebSocket await _audioLock.WaitAsync().ConfigureAwait(false); try { - if (AudioClient == null) + if (_audioClient == null) { var audioClient = new AudioClient(this, id); var promise = _audioConnectPromise; @@ -497,7 +497,7 @@ namespace Discord.WebSocket if (!promise.Task.IsCompleted) { try { audioClient.Dispose(); } catch { } - AudioClient = null; + _audioClient = null; if (ex != null) await promise.TrySetExceptionAsync(ex); else @@ -535,10 +535,10 @@ namespace Discord.WebSocket _audioLock.Release(); }*/ }; - AudioClient = audioClient; + _audioClient = audioClient; } - await AudioClient.ConnectAsync(url, Discord.CurrentUser.Id, voiceState.VoiceSessionId, token).ConfigureAwait(false); - await _audioConnectPromise.TrySetResultAsync(AudioClient).ConfigureAwait(false); + await _audioClient.ConnectAsync(url, Discord.CurrentUser.Id, voiceState.VoiceSessionId, token).ConfigureAwait(false); + await _audioConnectPromise.TrySetResultAsync(_audioClient).ConfigureAwait(false); } catch (OperationCanceledException) {