@@ -1,13 +1,9 @@
using Discord.API;
using Discord.API.Voice ;
using Discord.API.Voice ;
using Discord.Logging ;
using Discord.Net.Converters;
using Discord.Net.WebSockets;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -15,60 +11,115 @@ namespace Discord.Audio
{
public class AudioClient
{
public const int MaxBitrate = 128;
private const string Mode = "xsalsa20_poly1305";
public event Func<Task> Connected
{
add { _connectedEvent.Add(value); }
remove { _connectedEvent.Remove(value); }
}
private readonly AsyncEvent<Func<Task>> _connectedEvent = new AsyncEvent<Func<Task>>();
public event Func<Task> Disconnected
{
add { _disconnectedEvent.Add(value); }
remove { _disconnectedEvent.Remove(value); }
}
private readonly AsyncEvent<Func<Task>> _disconnectedEvent = new AsyncEvent<Func<Task>>();
public event Func<int, int, Task> LatencyUpdated
{
add { _latencyUpdatedEvent.Add(value); }
remove { _latencyUpdatedEvent.Remove(value); }
}
private readonly AsyncEvent<Func<int, int, Task>> _latencyUpdatedEvent = new AsyncEvent<Func<int, int, Task>>();
private readonly ILogger _webSocketLogger;
#if BENCHMARK
private readonly ILogger _benchmarkLogger;
#endif
private readonly JsonSerializer _serializer;
private readonly IWebSocketClient _gatewayClient;
private readonly SemaphoreSlim _connectionLock;
private CancellationTokenSource _connectCancelToken;
private readonly int _connectionTimeout, _reconnectDelay, _failedReconnectDelay;
internal readonly SemaphoreSlim _connectionLock;
private TaskCompletionSource<bool> _connectTask;
private CancellationTokenSource _cancelToken;
private Task _heartbeatTask, _reconnectTask;
private long _heartbeatTime;
private bool _isReconnecting;
private string _url;
public AudioAPIClient ApiClient { get; private set; }
/// <summary> Gets the current connection state of this client. </summary>
public ConnectionState ConnectionState { get; private set; }
/// <summary> Gets the estimated round-trip latency, in milliseconds, to the gateway server. </summary>
public int Latency { get; private set; }
internal AudioClient(WebSocketProvider provider, JsonSerializer serializer = null)
/// <summary> Creates a new REST/WebSocket discord client. </summary>
internal AudioClient(ulong guildId, ulong userId, string sessionId, string token, AudioConfig config, ILogManager logManager)
{
_connectionLock = new SemaphoreSlim(1, 1);
_connectionTimeout = config.ConnectionTimeout;
_reconnectDelay = config.ReconnectDelay;
_failedReconnectDelay = config.FailedReconnectDelay;
_serializer = serializer ?? new JsonSerializer { ContractResolver = new DiscordContractResolver() };
}
_webSocketLogger = logManager.CreateLogger("AudioWS");
#if BENCHMARK
_benchmarkLogger = logManager.CreateLogger("Benchmark");
#endif
public Task SendAsync(VoiceOpCode opCode, object payload, RequestOptions options = null)
{
byte[] bytes = null;
payload = new WebSocketMessage { Operation = (int)opCode, Payload = payload };
if (payload != null)
bytes = Encoding.UTF8.GetBytes(SerializeJson(payload));
//TODO: Send
return Task.CompletedTask;
}
_connectionLock = new SemaphoreSlim(1, 1);
//Gateway
public async Task SendHeartbeatAsync(int lastSeq, RequestOptions options = null)
{
await SendAsync(VoiceOpCode.Heartbeat, lastSeq, options: options).ConfigureAwait(false);
}
_serializer = new JsonSerializer { ContractResolver = new DiscordContractResolver() };
_serializer.Error += (s, e) =>
{
_webSocketLogger.WarningAsync(e.ErrorContext.Error).GetAwaiter().GetResult();
e.ErrorContext.Handled = true;
};
var webSocketProvider = config.WebSocketProvider; //TODO: Clean this check
ApiClient = new AudioAPIClient(guildId, userId, sessionId, token, config.WebSocketProvider);
ApiClient.SentGatewayMessage += async opCode => await _webSocketLogger.DebugAsync($"Sent {(VoiceOpCode)opCode}").ConfigureAwait(false);
ApiClient.ReceivedEvent += ProcessMessageAsync;
ApiClient.Disconnected += async ex =>
{
if (ex != null)
{
await _webSocketLogger.WarningAsync($"Connection Closed: {ex.Message}").ConfigureAwait(false);
await StartReconnectAsync().ConfigureAwait(false);
}
else
await _webSocketLogger.WarningAsync($"Connection Closed").ConfigureAwait(false);
};
}
/// <inheritdoc />
public async Task ConnectAsync(string url)
{
await _connectionLock.WaitAsync().ConfigureAwait(false);
try
{
_isReconnecting = false;
await ConnectInternalAsync(url).ConfigureAwait(false);
}
finally { _connectionLock.Release(); }
}
private async Task ConnectInternalAsync(string url)
{
var state = ConnectionState;
if (state == ConnectionState.Connecting || state == ConnectionState.Connected)
await DisconnectInternalAsync().ConfigureAwait(false);
ConnectionState = ConnectionState.Connecting;
await _webSocketLogger.InfoAsync("Connecting").ConfigureAwait(false);
try
{
_connectCancelToken = new CancellationTokenSource();
_gatewayClient.SetCancelToken(_connectCancelToken.Token);
await _gatewayClient.ConnectAsync(url).ConfigureAwait(false);
_url = url;
_connectTask = new TaskCompletionSource<bool>();
_cancelToken = new CancellationTokenSource();
await ApiClient.ConnectAsync(url).ConfigureAwait(false);
await _connectedEvent.InvokeAsync().ConfigureAwait(false);
await _connectTask.Task.ConfigureAwait(false);
ConnectionState = ConnectionState.Connected;
await _webSocketLogger.InfoAsync("Connected").ConfigureAwait(false);
}
catch (Exception)
{
@@ -76,12 +127,13 @@ namespace Discord.Audio
throw;
}
}
/// <inheritdoc />
public async Task DisconnectAsync()
{
await _connectionLock.WaitAsync().ConfigureAwait(false);
try
{
_isReconnecting = false;
await DisconnectInternalAsync().ConfigureAwait(false);
}
finally { _connectionLock.Release(); }
@@ -90,30 +142,163 @@ namespace Discord.Audio
{
if (ConnectionState == ConnectionState.Disconnected) return;
ConnectionState = ConnectionState.Disconnecting;
try { _connectCancelToken?.Cancel(false); }
catch { }
await _webSocketLogger.InfoAsync("Disconnecting").ConfigureAwait(false);
await _gatewayClient.DisconnectAsync().ConfigureAwait(false);
//Signal tasks to complete
try { _cancelToken.Cancel(); } catch { }
//Disconnect from server
await ApiClient.DisconnectAsync().ConfigureAwait(false);
//Wait for tasks to complete
var heartbeatTask = _heartbeatTask;
if (heartbeatTask != null)
await heartbeatTask.ConfigureAwait(false);
_heartbeatTask = null;
ConnectionState = ConnectionState.Disconnected;
await _webSocketLogger.InfoAsync("Disconnected").ConfigureAwait(false);
await _disconnectedEvent.InvokeAsync().ConfigureAwait(false);
}
private async Task StartReconnectAsync()
{
//TODO: Is this thread-safe?
if (_reconnectTask != null) return;
await _connectionLock.WaitAsync().ConfigureAwait(false);
try
{
if (_reconnectTask != null) return;
_isReconnecting = true;
_reconnectTask = ReconnectInternalAsync();
}
finally { _connectionLock.Release(); }
}
private async Task ReconnectInternalAsync()
{
try
{
int nextReconnectDelay = 1000;
while (_isReconnecting)
{
try
{
await Task.Delay(nextReconnectDelay).ConfigureAwait(false);
nextReconnectDelay *= 2;
if (nextReconnectDelay > 30000)
nextReconnectDelay = 30000;
await _connectionLock.WaitAsync().ConfigureAwait(false);
try
{
await ConnectInternalAsync(_url).ConfigureAwait(false);
}
finally { _connectionLock.Release(); }
return;
}
catch (Exception ex)
{
await _webSocketLogger.WarningAsync("Reconnect failed", ex).ConfigureAwait(false);
}
}
}
finally
{
await _connectionLock.WaitAsync().ConfigureAwait(false);
try
{
_isReconnecting = false;
_reconnectTask = null;
}
finally { _connectionLock.Release(); }
}
}
//Helpers
private static double ToMilliseconds(Stopwatch stopwatch) => Math.Round((double)stopwatch.ElapsedTicks / (double)Stopwatch.Frequency * 1000.0, 2);
private string SerializeJson(object value)
private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload)
{
var sb = new StringBuilder(256);
using (TextWriter text = new StringWriter(sb, CultureInfo.InvariantCulture))
using (JsonWriter writer = new JsonTextWriter(text))
_serializer.Serialize(writer, value);
return sb.ToString();
#if BENCHMARK
Stopwatch stopwatch = Stopwatch.StartNew();
try
{
#endif
try
{
switch (opCode)
{
/*case VoiceOpCode.Ready:
{
await _webSocketLogger.DebugAsync("Received Ready").ConfigureAwait(false);
var data = (payload as JToken).ToObject<ReadyEvent>(_serializer);
_heartbeatTime = 0;
_heartbeatTask = RunHeartbeatAsync(data.HeartbeatInterval, _cancelToken.Token);
}
break;*/
case VoiceOpCode.HeartbeatAck:
{
await _webSocketLogger.DebugAsync("Received HeartbeatAck").ConfigureAwait(false);
var heartbeatTime = _heartbeatTime;
if (heartbeatTime != 0)
{
int latency = (int)(Environment.TickCount - _heartbeatTime);
_heartbeatTime = 0;
await _webSocketLogger.VerboseAsync($"Latency = {latency} ms").ConfigureAwait(false);
int before = Latency;
Latency = latency;
await _latencyUpdatedEvent.InvokeAsync(before, latency).ConfigureAwait(false);
}
}
break;
default:
await _webSocketLogger.WarningAsync($"Unknown OpCode ({opCode})").ConfigureAwait(false);
return;
}
}
catch (Exception ex)
{
await _webSocketLogger.ErrorAsync($"Error handling {opCode}", ex).ConfigureAwait(false);
return;
}
#if BENCHMARK
}
finally
{
stopwatch.Stop();
double millis = Math.Round(stopwatch.ElapsedTicks / (double)Stopwatch.Frequency * 1000.0, 2);
await _benchmarkLogger.DebugAsync($"{millis} ms").ConfigureAwait(false);
}
#endif
}
private T DeserializeJson<T>(Stream jsonStream)
private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cancelToken)
{
using (TextReader text = new StreamReader(jsonStream))
using (JsonReader reader = new JsonTextReader(text))
return _serializer.Deserialize<T>(reader);
//Clean this up when Discord's session patch is live
try
{
while (!cancelToken.IsCancellationRequested)
{
await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false);
if (_heartbeatTime != 0) //Server never responded to our last heartbeat
{
if (ConnectionState == ConnectionState.Connected)
{
await _webSocketLogger.WarningAsync("Server missed last heartbeat").ConfigureAwait(false);
await StartReconnectAsync().ConfigureAwait(false);
return;
}
}
else
_heartbeatTime = Environment.TickCount;
await ApiClient.SendHeartbeatAsync().ConfigureAwait(false);
}
}
catch (OperationCanceledException) { }
}
}
}