@@ -1,7 +1,6 @@
using Discord.API.Voice;
using Discord.Logging;
using Discord.Net.Converters;
using Discord.Net.WebSockets;
using Newtonsoft.Json;
using System;
using System.Threading;
@@ -9,7 +8,7 @@ using System.Threading.Tasks;
namespace Discord.Audio
{
internal class AudioClient : IAudioClient
internal class AudioClient : IAudioClient, IDisposable
{
public event Func<Task> Connected
{
@@ -17,12 +16,12 @@ namespace Discord.Audio
remove { _connectedEvent.Remove(value); }
}
private readonly AsyncEvent<Func<Task>> _connectedEvent = new AsyncEvent<Func<Task>>();
public event Func<Task> Disconnected
public event Func<Exception, Task> Disconnected
{
add { _disconnectedEvent.Add(value); }
remove { _disconnectedEvent.Remove(value); }
}
private readonly AsyncEvent<Func<Task>> _disconnectedEvent = new AsyncEvent<Func<Task>>();
private readonly AsyncEvent<Func<Exception, Task>> _disconnectedEvent = new AsyncEvent<Func<Exception, Task>>();
public event Func<int, int, Task> LatencyUpdated
{
add { _latencyUpdatedEvent.Add(value); }
@@ -34,28 +33,30 @@ namespace Discord.Audio
#if BENCHMARK
private readonly ILogger _benchmarkLogger;
#endif
private readonly JsonSerializer _serializer;
internal readonly SemaphoreSlim _connectionLock;
private readonly JsonSerializer _serializer;
private TaskCompletionSource<bool> _connectTask;
private CancellationTokenSource _cancelToken;
private Task _heartbeatTask, _reconnectTask ;
private Task _heartbeatTask;
private long _heartbeatTime;
private bool _isReconnecting;
private string _url;
private bool _isDisposed;
private DiscordSocketClient Discor d { get; }
public CachedGuild Guil d { get; }
public DiscordVoiceAPIClient ApiClient { get; private set; }
public ConnectionState ConnectionState { get; private set; }
public int Latency { get; private set; }
private DiscordSocketClient Discord => Guild.Discord;
/// <summary> Creates a new REST/WebSocket discord client. </summary>
internal AudioClient(DiscordSocketClient discord, ulong guildId, ulong userId, string sessionId, string token, WebSocketProvider webSocketProvider, ILogManager logManager)
internal AudioClient(CachedGuild guild )
{
Discord = discor d;
Guild = guil d;
_webSocketLogger = l ogManager.CreateLogger("Audio");
_udpLogger = l ogManager.CreateLogger("AudioUDP");
_webSocketLogger = Discord.L ogManager.CreateLogger("Audio");
_udpLogger = Discord.L ogManager.CreateLogger("AudioUDP");
#if BENCHMARK
_benchmarkLogger = logManager.CreateLogger("Benchmark");
#endif
@@ -69,38 +70,34 @@ namespace Discord.Audio
e.ErrorContext.Handled = true;
};
ApiClient = new DiscordVoiceAPIClient(guildId, userId, sessionId, token, w ebSocketProvider);
ApiClient = new DiscordVoiceAPIClient(guild.Id, Discord.W ebSocketProvider);
ApiClient.SentGatewayMessage += async opCode => await _webSocketLogger.DebugAsync($"Sent {(VoiceOpCode)opCode}").ConfigureAwait(false);
ApiClient.ReceivedEvent += ProcessMessageAsync;
ApiClient.Disconnected += async ex =>
{
if (ex != null)
{
await _webSocketLogger.WarningAsync($"Connection Closed: {ex.Message}").ConfigureAwait(false);
await StartReconnectAsync().ConfigureAwait(false);
}
else
await _webSocketLogger.WarningAsync($"Connection Closed").ConfigureAwait(false);
};
}
/// <inheritdoc />
public async Task ConnectAsync(string url)
public async Task ConnectAsync(string url, ulong userId, string sessionId, string token )
{
await _connectionLock.WaitAsync().ConfigureAwait(false);
try
{
_isReconnecting = false;
await ConnectInternalAsync(url).ConfigureAwait(false);
await ConnectInternalAsync(url, userId, sessionId, token).ConfigureAwait(false);
}
finally { _connectionLock.Release(); }
}
private async Task ConnectInternalAsync(string url)
private async Task ConnectInternalAsync(string url, ulong userId, string sessionId, string token )
{
var state = ConnectionState;
if (state == ConnectionState.Connecting || state == ConnectionState.Connected)
await DisconnectInternalAsync().ConfigureAwait(false);
await DisconnectInternalAsync(null ).ConfigureAwait(false);
ConnectionState = ConnectionState.Connecting;
await _webSocketLogger.InfoAsync("Connecting").ConfigureAwait(false);
@@ -109,7 +106,7 @@ namespace Discord.Audio
_url = url;
_connectTask = new TaskCompletionSource<bool>();
_cancelToken = new CancellationTokenSource();
await ApiClient.ConnectAsync(url).ConfigureAwait(false);
await ApiClient.ConnectAsync(url, userId, sessionId, token ).ConfigureAwait(false);
await _connectedEvent.InvokeAsync().ConfigureAwait(false);
await _connectTask.Task.ConfigureAwait(false);
@@ -119,7 +116,7 @@ namespace Discord.Audio
}
catch (Exception)
{
await DisconnectInternalAsync().ConfigureAwait(false);
await DisconnectInternalAsync(null ).ConfigureAwait(false);
throw;
}
}
@@ -129,12 +126,20 @@ namespace Discord.Audio
await _connectionLock.WaitAsync().ConfigureAwait(false);
try
{
_isReconnecting = false;
await DisconnectInternalAsync().ConfigureAwait(false);
await DisconnectInternalAsync(null).ConfigureAwait(false);
}
finally { _connectionLock.Release(); }
}
private async Task DisconnectAsync(Exception ex)
{
await _connectionLock.WaitAsync().ConfigureAwait(false);
try
{
await DisconnectInternalAsync(ex).ConfigureAwait(false);
}
finally { _connectionLock.Release(); }
}
private async Task DisconnectInternalAsync()
private async Task DisconnectInternalAsync(Exception ex )
{
if (ConnectionState == ConnectionState.Disconnected) return;
ConnectionState = ConnectionState.Disconnecting;
@@ -155,61 +160,7 @@ namespace Discord.Audio
ConnectionState = ConnectionState.Disconnected;
await _webSocketLogger.InfoAsync("Disconnected").ConfigureAwait(false);
await _disconnectedEvent.InvokeAsync().ConfigureAwait(false);
}
private async Task StartReconnectAsync()
{
//TODO: Is this thread-safe?
if (_reconnectTask != null) return;
await _connectionLock.WaitAsync().ConfigureAwait(false);
try
{
if (_reconnectTask != null) return;
_isReconnecting = true;
_reconnectTask = ReconnectInternalAsync();
}
finally { _connectionLock.Release(); }
}
private async Task ReconnectInternalAsync()
{
try
{
int nextReconnectDelay = 1000;
while (_isReconnecting)
{
try
{
await Task.Delay(nextReconnectDelay).ConfigureAwait(false);
nextReconnectDelay *= 2;
if (nextReconnectDelay > 30000)
nextReconnectDelay = 30000;
await _connectionLock.WaitAsync().ConfigureAwait(false);
try
{
await ConnectInternalAsync(_url).ConfigureAwait(false);
}
finally { _connectionLock.Release(); }
return;
}
catch (Exception ex)
{
await _webSocketLogger.WarningAsync("Reconnect failed", ex).ConfigureAwait(false);
}
}
}
finally
{
await _connectionLock.WaitAsync().ConfigureAwait(false);
try
{
_isReconnecting = false;
_reconnectTask = null;
}
finally { _connectionLock.Release(); }
}
await _disconnectedEvent.InvokeAsync(ex).ConfigureAwait(false);
}
private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload)
@@ -285,7 +236,7 @@ namespace Discord.Audio
if (ConnectionState == ConnectionState.Connected)
{
await _webSocketLogger.WarningAsync("Server missed last heartbeat").ConfigureAwait(false);
await StartReconnectAsync( ).ConfigureAwait(false);
await DisconnectInternalAsync(new Exception("Server missed last heartbeat") ).ConfigureAwait(false);
return;
}
}
@@ -296,5 +247,14 @@ namespace Discord.Audio
}
catch (OperationCanceledException) { }
}
internal virtual void Dispose(bool disposing)
{
if (!_isDisposed)
_isDisposed = true;
ApiClient.Dispose();
}
/// <inheritdoc />
public void Dispose() => Dispose(true);
}
}