diff --git a/src/Discord.Net.Core/Audio/IAudioClient.cs b/src/Discord.Net.Core/Audio/IAudioClient.cs index 2fbb4a450..746515fa7 100644 --- a/src/Discord.Net.Core/Audio/IAudioClient.cs +++ b/src/Discord.Net.Core/Audio/IAudioClient.cs @@ -17,7 +17,35 @@ namespace Discord.Audio Task DisconnectAsync(); + /// + /// Creates a new outgoing stream accepting Opus-encoded data. + /// + /// Samples per frame. Must be 120, 240, 480, 960, 1920 or 2880, representing 2.5, 5, 10, 20, 40 or 60 milliseconds respectively. + /// The size of the internal buffer used for encryption. + /// Stream CreateOpusStream(int samplesPerFrame, int bufferSize = 4000); + /// + /// Creates a new outgoing stream accepting Opus-encoded data. This is a direct stream with no internal timer. + /// + /// Samples per frame. Must be 120, 240, 480, 960, 1920 or 2880, representing 2.5, 5, 10, 20, 40 or 60 milliseconds respectively. + /// The size of the internal buffer used for encryption. + /// + Stream CreateDirectOpusStream(int samplesPerFrame, int bufferSize = 4000); + /// + /// Creates a new outgoing stream accepting PCM (raw) data. + /// + /// Samples per frame. Must be 120, 240, 480, 960, 1920 or 2880, representing 2.5, 5, 10, 20, 40 or 60 milliseconds respectively. + /// + /// The size of the internal buffer used for encoding and encryption. + /// Stream CreatePCMStream(int samplesPerFrame, int? bitrate = null, int bufferSize = 4000); + /// + /// Creates a new direct outgoing stream accepting PCM (raw) data. This is a direct stream with no internal timer. + /// + /// Samples per frame. Must be 120, 240, 480, 960, 1920 or 2880, representing 2.5, 5, 10, 20, 40 or 60 milliseconds respectively. + /// + /// The size of the internal buffer used for encoding and encryption. + /// + Stream CreateDirectPCMStream(int samplesPerFrame, int? bitrate = null, int bufferSize = 4000); } } diff --git a/src/Discord.Net.WebSocket/Audio/AudioClient.cs b/src/Discord.Net.WebSocket/Audio/AudioClient.cs index 7443c569a..2d11baaec 100644 --- a/src/Discord.Net.WebSocket/Audio/AudioClient.cs +++ b/src/Discord.Net.WebSocket/Audio/AudioClient.cs @@ -39,7 +39,7 @@ namespace Discord.Audio private readonly JsonSerializer _serializer; private TaskCompletionSource _connectTask; - private CancellationTokenSource _cancelToken; + private CancellationTokenSource _cancelTokenSource; private Task _heartbeatTask; private long _heartbeatTime; private string _url; @@ -110,7 +110,7 @@ namespace Discord.Audio { _url = url; _connectTask = new TaskCompletionSource(); - _cancelToken = new CancellationTokenSource(); + _cancelTokenSource = new CancellationTokenSource(); await ApiClient.ConnectAsync("wss://" + url).ConfigureAwait(false); await ApiClient.SendIdentityAsync(userId, sessionId, token).ConfigureAwait(false); @@ -152,7 +152,7 @@ namespace Discord.Audio await _audioLogger.InfoAsync("Disconnecting").ConfigureAwait(false); //Signal tasks to complete - try { _cancelToken.Cancel(); } catch { } + try { _cancelTokenSource.Cancel(); } catch { } //Disconnect from server await ApiClient.DisconnectAsync().ConfigureAwait(false); @@ -169,19 +169,35 @@ namespace Discord.Audio await _disconnectedEvent.InvokeAsync(ex).ConfigureAwait(false); } - public void Send(byte[] data, int count) + public Stream CreateOpusStream(int samplesPerFrame, int bufferSize = 4000) { - //TODO: Queue these? - ApiClient.SendAsync(data, count).ConfigureAwait(false); + CheckSamplesPerFrame(samplesPerFrame); + var target = new BufferedAudioTarget(ApiClient, samplesPerFrame, _cancelTokenSource.Token); + return new RTPWriteStream(target, _secretKey, samplesPerFrame, _ssrc, bufferSize = 4000); } - - public Stream CreateOpusStream(int samplesPerFrame, int bufferSize = 4000) + public Stream CreateDirectOpusStream(int samplesPerFrame, int bufferSize = 4000) { - return new RTPWriteStream(this, _secretKey, samplesPerFrame, _ssrc, bufferSize = 4000); + CheckSamplesPerFrame(samplesPerFrame); + var target = new DirectAudioTarget(ApiClient); + return new RTPWriteStream(target, _secretKey, samplesPerFrame, _ssrc, bufferSize = 4000); } public Stream CreatePCMStream(int samplesPerFrame, int? bitrate = null, int bufferSize = 4000) { - return new OpusEncodeStream(this, _secretKey, samplesPerFrame, _ssrc, bitrate, bufferSize); + CheckSamplesPerFrame(samplesPerFrame); + var target = new BufferedAudioTarget(ApiClient, samplesPerFrame, _cancelTokenSource.Token); + return new OpusEncodeStream(target, _secretKey, samplesPerFrame, _ssrc, bitrate, bufferSize); + } + public Stream CreateDirectPCMStream(int samplesPerFrame, int? bitrate = null, int bufferSize = 4000) + { + CheckSamplesPerFrame(samplesPerFrame); + var target = new DirectAudioTarget(ApiClient); + return new OpusEncodeStream(target, _secretKey, samplesPerFrame, _ssrc, bitrate, bufferSize); + } + private void CheckSamplesPerFrame(int samplesPerFrame) + { + if (samplesPerFrame != 120 && samplesPerFrame != 240 && samplesPerFrame != 480 && + samplesPerFrame != 960 && samplesPerFrame != 1920 && samplesPerFrame != 2880) + throw new ArgumentException("Value must be 120, 240, 480, 960, 1920 or 2880", nameof(samplesPerFrame)); } private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload) @@ -201,7 +217,7 @@ namespace Discord.Audio throw new InvalidOperationException($"Discord does not support {DiscordVoiceAPIClient.Mode}"); _heartbeatTime = 0; - _heartbeatTask = RunHeartbeatAsync(data.HeartbeatInterval, _cancelToken.Token); + _heartbeatTask = RunHeartbeatAsync(data.HeartbeatInterval, _cancelTokenSource.Token); ApiClient.SetUdpEndpoint(_url, data.Port); await ApiClient.SendDiscoveryAsync(_ssrc).ConfigureAwait(false); diff --git a/src/Discord.Net.WebSocket/Audio/Opus/OpusEncoder.cs b/src/Discord.Net.WebSocket/Audio/Opus/OpusEncoder.cs index c1eb3843d..ca044bd69 100644 --- a/src/Discord.Net.WebSocket/Audio/Opus/OpusEncoder.cs +++ b/src/Discord.Net.WebSocket/Audio/Opus/OpusEncoder.cs @@ -52,7 +52,7 @@ namespace Discord.Audio throw new Exception($"Opus Error: {(OpusError)result}"); } - /// Gets or sets whether Forward Error Correction is enabled. + /// Gets or sets the encoder's bitrate. public void SetBitrate(int value) { if (value < 1 || value > DiscordVoiceAPIClient.MaxBitrate) diff --git a/src/Discord.Net.WebSocket/Audio/Streams/OpusEncodeStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/OpusEncodeStream.cs index 3806cc8bb..f86901ef1 100644 --- a/src/Discord.Net.WebSocket/Audio/Streams/OpusEncodeStream.cs +++ b/src/Discord.Net.WebSocket/Audio/Streams/OpusEncodeStream.cs @@ -7,8 +7,8 @@ private readonly OpusEncoder _encoder; - internal OpusEncodeStream(AudioClient audioClient, byte[] secretKey, int samplesPerFrame, uint ssrc, int? bitrate = null, int bufferSize = 4000) - : base(audioClient, secretKey, samplesPerFrame, ssrc, bufferSize) + internal OpusEncodeStream(IAudioTarget target, byte[] secretKey, int samplesPerFrame, uint ssrc, int? bitrate = null, int bufferSize = 4000) + : base(target, secretKey, samplesPerFrame, ssrc, bufferSize) { _encoder = new OpusEncoder(SampleRate, Channels); diff --git a/src/Discord.Net.WebSocket/Audio/Streams/RTPWriteStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/RTPWriteStream.cs index db755c877..5ea0d2473 100644 --- a/src/Discord.Net.WebSocket/Audio/Streams/RTPWriteStream.cs +++ b/src/Discord.Net.WebSocket/Audio/Streams/RTPWriteStream.cs @@ -5,7 +5,7 @@ namespace Discord.Audio { internal class RTPWriteStream : Stream { - private readonly AudioClient _audioClient; + private readonly IAudioTarget _target; private readonly byte[] _nonce, _secretKey; private int _samplesPerFrame; private uint _ssrc, _timestamp = 0; @@ -16,9 +16,9 @@ namespace Discord.Audio public override bool CanSeek => false; public override bool CanWrite => true; - internal RTPWriteStream(AudioClient audioClient, byte[] secretKey, int samplesPerFrame, uint ssrc, int bufferSize = 4000) + internal RTPWriteStream(IAudioTarget target, byte[] secretKey, int samplesPerFrame, uint ssrc, int bufferSize = 4000) { - _audioClient = audioClient; + _target = target; _secretKey = secretKey; _samplesPerFrame = samplesPerFrame; _ssrc = ssrc; @@ -48,7 +48,7 @@ namespace Discord.Audio count = SecretBox.Encrypt(buffer, offset, count, _buffer, 12, _nonce, _secretKey); Buffer.BlockCopy(_nonce, 0, _buffer, 0, 12); //Copy the RTP header from nonce to buffer - _audioClient.Send(_buffer, count + 12); + _target.SendAsync(_buffer, count + 12).GetAwaiter().GetResult(); } public override void Flush() { } diff --git a/src/Discord.Net.WebSocket/Audio/Targets/BufferedAudioTarget.cs b/src/Discord.Net.WebSocket/Audio/Targets/BufferedAudioTarget.cs new file mode 100644 index 000000000..e965d4aca --- /dev/null +++ b/src/Discord.Net.WebSocket/Audio/Targets/BufferedAudioTarget.cs @@ -0,0 +1,79 @@ +using System; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; + +namespace Discord.Audio +{ + internal class BufferedAudioTarget : IAudioTarget, IDisposable + { + private static readonly byte[] _silencePacket = new byte[] { 0xF8, 0xFF, 0xFE }; + + private double _ticksPerFrame; + private Task _task; + private DiscordVoiceAPIClient _client; + private CancellationTokenSource _cancelTokenSource; + private ConcurrentQueue _queue; + + internal BufferedAudioTarget(DiscordVoiceAPIClient client, int samplesPerFrame, CancellationToken cancelToken) + { + _client = client; + double milliseconds = samplesPerFrame / 48.0; + double ticksPerFrame = Stopwatch.Frequency / 1000.0 * milliseconds; + + _cancelTokenSource = new CancellationTokenSource(); + cancelToken = CancellationTokenSource.CreateLinkedTokenSource(_cancelTokenSource.Token, cancelToken).Token; + _queue = new ConcurrentQueue(); //TODO: We need a better queue + + _task = Run(ticksPerFrame, cancelToken); + } + + private Task Run(double ticksPerFrame, CancellationToken cancelToken) + { + return Task.Run(async () => + { + var stopwatch = Stopwatch.StartNew(); + long lastTick = stopwatch.ElapsedTicks; + double ticksPerMilli = Stopwatch.Frequency / 1000.0; + while (!cancelToken.IsCancellationRequested) + { + long thisTick = stopwatch.ElapsedTicks; + double remaining = ticksPerFrame - (thisTick - lastTick); + if (remaining <= 0) + { + byte[] buffer; + if (_queue.TryDequeue(out buffer)) + await _client.SendAsync(buffer, buffer.Length).ConfigureAwait(false); + else + await _client.SendAsync(_silencePacket, _silencePacket.Length).ConfigureAwait(false); + lastTick = thisTick; + } + else if (remaining > 1) + { + int millis = (int)Math.Floor(remaining / ticksPerMilli); + await Task.Delay(millis).ConfigureAwait(false); + } + } + }); + } + + public Task SendAsync(byte[] buffer, int count) + { + byte[] newBuffer = new byte[count]; + Buffer.BlockCopy(buffer, 0, newBuffer, 0, count); + _queue.Enqueue(newBuffer); + return Task.Delay(0); + } + + protected void Dispose(bool disposing) + { + if (disposing) + _cancelTokenSource.Cancel(); + } + public void Dispose() + { + Dispose(true); + } + } +} diff --git a/src/Discord.Net.WebSocket/Audio/Targets/DirectAudioTarget.cs b/src/Discord.Net.WebSocket/Audio/Targets/DirectAudioTarget.cs new file mode 100644 index 000000000..08cdfcfe6 --- /dev/null +++ b/src/Discord.Net.WebSocket/Audio/Targets/DirectAudioTarget.cs @@ -0,0 +1,16 @@ +using System.Threading.Tasks; + +namespace Discord.Audio +{ + internal class DirectAudioTarget : IAudioTarget + { + private readonly DiscordVoiceAPIClient _client; + public DirectAudioTarget(DiscordVoiceAPIClient client) + { + _client = client; + } + + public Task SendAsync(byte[] buffer, int count) + => _client.SendAsync(buffer, count); + } +} diff --git a/src/Discord.Net.WebSocket/Audio/Targets/IAudioTarget.cs b/src/Discord.Net.WebSocket/Audio/Targets/IAudioTarget.cs new file mode 100644 index 000000000..51b19a862 --- /dev/null +++ b/src/Discord.Net.WebSocket/Audio/Targets/IAudioTarget.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace Discord.Audio +{ + internal interface IAudioTarget + { + Task SendAsync(byte[] buffer, int count); + } +} diff --git a/src/Discord.Net.WebSocket/Discord.Net.WebSocket.csproj b/src/Discord.Net.WebSocket/Discord.Net.WebSocket.csproj index df16ad855..593d9e75e 100644 --- a/src/Discord.Net.WebSocket/Discord.Net.WebSocket.csproj +++ b/src/Discord.Net.WebSocket/Discord.Net.WebSocket.csproj @@ -1,4 +1,4 @@ - + A core Discord.Net library containing the WebSocket client and models. 1.0.0-beta2 diff --git a/src/Discord.Net.WebSocket/DiscordSocketClient.cs b/src/Discord.Net.WebSocket/DiscordSocketClient.cs index 647847498..c0bbb750b 100644 --- a/src/Discord.Net.WebSocket/DiscordSocketClient.cs +++ b/src/Discord.Net.WebSocket/DiscordSocketClient.cs @@ -1563,10 +1563,10 @@ namespace Discord.WebSocket { before = guild.GetVoiceState(data.UserId)?.Clone() ?? SocketVoiceState.Default; after = guild.AddOrUpdateVoiceState(State, data); - if (data.UserId == CurrentUser.Id) + /*if (data.UserId == CurrentUser.Id) { var _ = guild.FinishJoinAudioChannel().ConfigureAwait(false); - } + }*/ } else { diff --git a/src/Discord.Net.WebSocket/Entities/Channels/ISocketAudioChannel.cs b/src/Discord.Net.WebSocket/Entities/Channels/ISocketAudioChannel.cs index 7056a4df5..7b9bf07f0 100644 --- a/src/Discord.Net.WebSocket/Entities/Channels/ISocketAudioChannel.cs +++ b/src/Discord.Net.WebSocket/Entities/Channels/ISocketAudioChannel.cs @@ -1,6 +1,10 @@ -namespace Discord.WebSocket +using Discord.Audio; +using System.Threading.Tasks; + +namespace Discord.WebSocket { public interface ISocketAudioChannel : IAudioChannel { + Task ConnectAsync(); } } diff --git a/src/Discord.Net.WebSocket/Entities/Channels/SocketGroupChannel.cs b/src/Discord.Net.WebSocket/Entities/Channels/SocketGroupChannel.cs index 980b6dc2a..706d972df 100644 --- a/src/Discord.Net.WebSocket/Entities/Channels/SocketGroupChannel.cs +++ b/src/Discord.Net.WebSocket/Entities/Channels/SocketGroupChannel.cs @@ -1,4 +1,5 @@ -using Discord.Rest; +using Discord.Audio; +using Discord.Rest; using System; using System.Collections.Concurrent; using System.Collections.Generic; @@ -64,6 +65,11 @@ namespace Discord.WebSocket public Task LeaveAsync(RequestOptions options = null) => ChannelHelper.DeleteAsync(this, Discord, options); + public Task ConnectAsync() + { + throw new NotSupportedException("Voice is not yet supported for group channels."); + } + //Messages public SocketMessage GetCachedMessage(ulong id) => _messages?.Get(id); diff --git a/src/Discord.Net.WebSocket/Entities/Channels/SocketVoiceChannel.cs b/src/Discord.Net.WebSocket/Entities/Channels/SocketVoiceChannel.cs index a2c1e217b..072ccc787 100644 --- a/src/Discord.Net.WebSocket/Entities/Channels/SocketVoiceChannel.cs +++ b/src/Discord.Net.WebSocket/Entities/Channels/SocketVoiceChannel.cs @@ -41,6 +41,17 @@ namespace Discord.WebSocket public Task ModifyAsync(Action func, RequestOptions options = null) => ChannelHelper.ModifyAsync(this, Discord, func, options); + public async Task ConnectAsync() + { + var audioMode = Discord.AudioMode; + if (audioMode == AudioMode.Disabled) + throw new InvalidOperationException($"Audio is not enabled on this client, {nameof(DiscordSocketConfig.AudioMode)} in {nameof(DiscordSocketConfig)} must be set."); + + return await Guild.ConnectAudioAsync(Id, + (audioMode & AudioMode.Incoming) == 0, + (audioMode & AudioMode.Outgoing) == 0).ConfigureAwait(false); + } + public override SocketGuildUser GetUser(ulong id) { var user = Guild.GetUser(id); @@ -52,9 +63,6 @@ namespace Discord.WebSocket private string DebuggerDisplay => $"{Name} ({Id}, Voice)"; internal new SocketVoiceChannel Clone() => MemberwiseClone() as SocketVoiceChannel; - //IVoiceChannel - Task IVoiceChannel.ConnectAsync() { throw new NotSupportedException(); } - //IGuildChannel Task IGuildChannel.GetUserAsync(ulong id, CacheMode mode, RequestOptions options) => Task.FromResult(GetUser(id)); diff --git a/src/Discord.Net.WebSocket/Entities/Guilds/SocketGuild.cs b/src/Discord.Net.WebSocket/Entities/Guilds/SocketGuild.cs index 7ed5bf8f9..c9da2226a 100644 --- a/src/Discord.Net.WebSocket/Entities/Guilds/SocketGuild.cs +++ b/src/Discord.Net.WebSocket/Entities/Guilds/SocketGuild.cs @@ -421,34 +421,64 @@ namespace Discord.WebSocket } //Audio - public async Task DisconnectAudioAsync(AudioClient client = null) + internal async Task ConnectAudioAsync(ulong channelId, bool selfDeaf, bool selfMute) { + selfDeaf = false; + selfMute = false; + + TaskCompletionSource promise; + await _audioLock.WaitAsync().ConfigureAwait(false); try { - await DisconnectAudioInternalAsync(client).ConfigureAwait(false); + await DisconnectAudioInternalAsync().ConfigureAwait(false); + promise = new TaskCompletionSource(); + _audioConnectPromise = promise; + await Discord.ApiClient.SendVoiceStateUpdateAsync(Id, channelId, selfDeaf, selfMute).ConfigureAwait(false); + } + catch (Exception) + { + await DisconnectAudioInternalAsync().ConfigureAwait(false); + throw; } finally { _audioLock.Release(); } + + try + { + var timeoutTask = Task.Delay(15000); + if (await Task.WhenAny(promise.Task, timeoutTask) == timeoutTask) + throw new TimeoutException(); + return await promise.Task.ConfigureAwait(false); + } + catch (Exception) + { + await DisconnectAudioAsync().ConfigureAwait(false); + throw; + } } - private async Task DisconnectAudioInternalAsync(AudioClient client = null) + + internal async Task DisconnectAudioAsync() { - var oldClient = AudioClient; - if (oldClient != null) + await _audioLock.WaitAsync().ConfigureAwait(false); + try { - if (client == null || oldClient == client) - { - _audioConnectPromise?.TrySetCanceledAsync(); //Cancel any previous audio connection - _audioConnectPromise = null; - } - if (oldClient == client) - { - AudioClient = null; - await oldClient.DisconnectAsync().ConfigureAwait(false); - } + await DisconnectAudioInternalAsync().ConfigureAwait(false); } + finally + { + _audioLock.Release(); + } + } + private async Task DisconnectAudioInternalAsync() + { + _audioConnectPromise?.TrySetCanceledAsync(); //Cancel any previous audio connection + _audioConnectPromise = null; + if (AudioClient != null) + await AudioClient.DisconnectAsync().ConfigureAwait(false); + AudioClient = null; } internal async Task FinishConnectAudio(int id, string url, string token) { @@ -462,6 +492,14 @@ namespace Discord.WebSocket var audioClient = new AudioClient(this, id); audioClient.Disconnected += async ex => { + //If the initial connection hasn't been made yet, reconnecting will lead to deadlocks + if (!_audioConnectPromise.Task.IsCompleted) + { + try { audioClient.Dispose(); } catch { } + AudioClient = null; + return; + } + await _audioLock.WaitAsync().ConfigureAwait(false); try { @@ -476,14 +514,14 @@ namespace Discord.WebSocket { var voiceChannelId = voiceState2.Value.VoiceChannel?.Id; if (voiceChannelId != null) + { await Discord.ApiClient.SendVoiceStateUpdateAsync(Id, voiceChannelId, voiceState2.Value.IsSelfDeafened, voiceState2.Value.IsSelfMuted); + return; + } } } - else - { - try { AudioClient.Dispose(); } catch { } - AudioClient = null; - } + try { audioClient.Dispose(); } catch { } + AudioClient = null; } } finally @@ -498,25 +536,12 @@ namespace Discord.WebSocket } catch (OperationCanceledException) { - await DisconnectAudioAsync().ConfigureAwait(false); + await DisconnectAudioInternalAsync().ConfigureAwait(false); } catch (Exception e) { await _audioConnectPromise.SetExceptionAsync(e).ConfigureAwait(false); - await DisconnectAudioAsync().ConfigureAwait(false); - } - finally - { - _audioLock.Release(); - } - } - internal async Task FinishJoinAudioChannel() - { - await _audioLock.WaitAsync().ConfigureAwait(false); - try - { - if (AudioClient != null) - await _audioConnectPromise.TrySetResultAsync(AudioClient).ConfigureAwait(false); + await DisconnectAudioInternalAsync().ConfigureAwait(false); } finally { diff --git a/src/Discord.Net.WebSocket/Net/DefaultUdpSocket.cs b/src/Discord.Net.WebSocket/Net/DefaultUdpSocket.cs index 20620a3be..eb184e345 100644 --- a/src/Discord.Net.WebSocket/Net/DefaultUdpSocket.cs +++ b/src/Discord.Net.WebSocket/Net/DefaultUdpSocket.cs @@ -22,6 +22,7 @@ namespace Discord.Net.Udp public DefaultUdpSocket() { _lock = new SemaphoreSlim(1, 1); + _cancelTokenSource = new CancellationTokenSource(); } private void Dispose(bool disposing) { @@ -57,7 +58,7 @@ namespace Discord.Net.Udp _cancelTokenSource = new CancellationTokenSource(); _cancelToken = CancellationTokenSource.CreateLinkedTokenSource(_parentToken, _cancelTokenSource.Token).Token; - _udp = new UdpClient(); + _udp = new UdpClient(0); _task = RunAsync(_cancelToken); }