From 547d7d241f92ab8c7dcbdfae7863b5f37d2b68d5 Mon Sep 17 00:00:00 2001 From: RogueException Date: Sat, 28 Jan 2017 19:04:20 -0400 Subject: [PATCH] Added a few debug tools for stability testing --- .../Discord.Net.DebugTools.csproj | 31 +++ .../UnstableUdpClient.cs | 142 +++++++++++ .../UnstableUdpClientProvider.cs | 9 + .../UnstableWebSocketClient.cs | 237 ++++++++++++++++++ .../UnstableWebSocketClientProvider.cs | 9 + 5 files changed, 428 insertions(+) create mode 100644 src/Discord.Net.DebugTools/Discord.Net.DebugTools.csproj create mode 100644 src/Discord.Net.DebugTools/UnstableUdpClient.cs create mode 100644 src/Discord.Net.DebugTools/UnstableUdpClientProvider.cs create mode 100644 src/Discord.Net.DebugTools/UnstableWebSocketClient.cs create mode 100644 src/Discord.Net.DebugTools/UnstableWebSocketClientProvider.cs diff --git a/src/Discord.Net.DebugTools/Discord.Net.DebugTools.csproj b/src/Discord.Net.DebugTools/Discord.Net.DebugTools.csproj new file mode 100644 index 000000000..da0c8b0fd --- /dev/null +++ b/src/Discord.Net.DebugTools/Discord.Net.DebugTools.csproj @@ -0,0 +1,31 @@ + + + 1.0.0 + rc-dev + rc-$(BuildNumber) + netstandard1.6 + Discord.Net.DebugTools + RogueException + A Discord.Net extension adding random helper classes for diagnosing issues. + discord;discordapp + https://github.com/RogueException/Discord.Net + http://opensource.org/licenses/MIT + git + git://github.com/RogueException/Discord.Net + Discord + true + + + + + + + + + + + $(NoWarn);CS1573;CS1591 + true + true + + \ No newline at end of file diff --git a/src/Discord.Net.DebugTools/UnstableUdpClient.cs b/src/Discord.Net.DebugTools/UnstableUdpClient.cs new file mode 100644 index 000000000..297c689cf --- /dev/null +++ b/src/Discord.Net.DebugTools/UnstableUdpClient.cs @@ -0,0 +1,142 @@ +using Discord.Net.Udp; +using System; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; + +namespace Discord.Net.Providers.UnstableUdpSocket +{ + internal class UnstableUdpSocket : IUdpSocket, IDisposable + { + private const double FailureRate = 0.10; //10% + + public event Func ReceivedDatagram; + + private readonly SemaphoreSlim _lock; + private readonly Random _rand; + private UdpClient _udp; + private IPEndPoint _destination; + private CancellationTokenSource _cancelTokenSource; + private CancellationToken _cancelToken, _parentToken; + private Task _task; + private bool _isDisposed; + + public UnstableUdpSocket() + { + _lock = new SemaphoreSlim(1, 1); + _rand = new Random(); + _cancelTokenSource = new CancellationTokenSource(); + } + private void Dispose(bool disposing) + { + if (!_isDisposed) + { + if (disposing) + StopInternalAsync(true).GetAwaiter().GetResult(); + _isDisposed = true; + } + } + public void Dispose() + { + Dispose(true); + } + + + public async Task StartAsync() + { + await _lock.WaitAsync().ConfigureAwait(false); + try + { + await StartInternalAsync(_cancelToken).ConfigureAwait(false); + } + finally + { + _lock.Release(); + } + } + public async Task StartInternalAsync(CancellationToken cancelToken) + { + await StopInternalAsync().ConfigureAwait(false); + + _cancelTokenSource = new CancellationTokenSource(); + _cancelToken = CancellationTokenSource.CreateLinkedTokenSource(_parentToken, _cancelTokenSource.Token).Token; + + _udp = new UdpClient(0); + + _task = RunAsync(_cancelToken); + } + public async Task StopAsync() + { + await _lock.WaitAsync().ConfigureAwait(false); + try + { + await StopInternalAsync().ConfigureAwait(false); + } + finally + { + _lock.Release(); + } + } + public async Task StopInternalAsync(bool isDisposing = false) + { + try { _cancelTokenSource.Cancel(false); } catch { } + + if (!isDisposing) + await (_task ?? Task.Delay(0)).ConfigureAwait(false); + + if (_udp != null) + { + try { _udp.Dispose(); } + catch { } + _udp = null; + } + } + + public void SetDestination(string host, int port) + { + var entry = Dns.GetHostEntryAsync(host).GetAwaiter().GetResult(); + _destination = new IPEndPoint(entry.AddressList[0], port); + } + public void SetCancelToken(CancellationToken cancelToken) + { + _parentToken = cancelToken; + _cancelToken = CancellationTokenSource.CreateLinkedTokenSource(_parentToken, _cancelTokenSource.Token).Token; + } + + public async Task SendAsync(byte[] data, int index, int count) + { + if (!UnstableCheck()) + return; + + if (index != 0) //Should never happen? + { + var newData = new byte[count]; + Buffer.BlockCopy(data, index, newData, 0, count); + data = newData; + } + + await _udp.SendAsync(data, count, _destination).ConfigureAwait(false); + } + + private async Task RunAsync(CancellationToken cancelToken) + { + var closeTask = Task.Delay(-1, cancelToken); + while (!cancelToken.IsCancellationRequested) + { + var receiveTask = _udp.ReceiveAsync(); + var task = await Task.WhenAny(closeTask, receiveTask).ConfigureAwait(false); + if (task == closeTask) + break; + + var result = receiveTask.Result; + await ReceivedDatagram(result.Buffer, 0, result.Buffer.Length).ConfigureAwait(false); + } + } + + private bool UnstableCheck() + { + return _rand.NextDouble() > FailureRate; + } + } +} \ No newline at end of file diff --git a/src/Discord.Net.DebugTools/UnstableUdpClientProvider.cs b/src/Discord.Net.DebugTools/UnstableUdpClientProvider.cs new file mode 100644 index 000000000..e78514602 --- /dev/null +++ b/src/Discord.Net.DebugTools/UnstableUdpClientProvider.cs @@ -0,0 +1,9 @@ +using Discord.Net.Udp; + +namespace Discord.Net.Providers.UnstableUdpSocket +{ + public static class UnstableUdpSocketProvider + { + public static readonly UdpSocketProvider Instance = () => new UnstableUdpSocket(); + } +} diff --git a/src/Discord.Net.DebugTools/UnstableWebSocketClient.cs b/src/Discord.Net.DebugTools/UnstableWebSocketClient.cs new file mode 100644 index 000000000..e12e103c5 --- /dev/null +++ b/src/Discord.Net.DebugTools/UnstableWebSocketClient.cs @@ -0,0 +1,237 @@ +using Discord.Net.WebSockets; +using System; +using System.Collections.Generic; +using System.ComponentModel; +using System.IO; +using System.Net.WebSockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Discord.Net.Providers.UnstableWebSocket +{ + internal class UnstableWebSocketClient : IWebSocketClient, IDisposable + { + public const int ReceiveChunkSize = 16 * 1024; //16KB + public const int SendChunkSize = 4 * 1024; //4KB + private const int HR_TIMEOUT = -2147012894; + private const double FailureRate = 0.10; //10% + + public event Func BinaryMessage; + public event Func TextMessage; + public event Func Closed; + + private readonly SemaphoreSlim _lock; + private readonly Dictionary _headers; + private readonly Random _rand; + private ClientWebSocket _client; + private Task _task; + private CancellationTokenSource _cancelTokenSource; + private CancellationToken _cancelToken, _parentToken; + private bool _isDisposed; + + public UnstableWebSocketClient() + { + _lock = new SemaphoreSlim(1, 1); + _rand = new Random(); + _cancelTokenSource = new CancellationTokenSource(); + _cancelToken = CancellationToken.None; + _parentToken = CancellationToken.None; + _headers = new Dictionary(); + } + private void Dispose(bool disposing) + { + if (!_isDisposed) + { + if (disposing) + DisconnectInternalAsync(true).GetAwaiter().GetResult(); + _isDisposed = true; + } + } + public void Dispose() + { + Dispose(true); + } + + public async Task ConnectAsync(string host) + { + await _lock.WaitAsync().ConfigureAwait(false); + try + { + await ConnectInternalAsync(host).ConfigureAwait(false); + } + finally + { + _lock.Release(); + } + } + private async Task ConnectInternalAsync(string host) + { + await DisconnectInternalAsync().ConfigureAwait(false); + + _cancelTokenSource = new CancellationTokenSource(); + _cancelToken = CancellationTokenSource.CreateLinkedTokenSource(_parentToken, _cancelTokenSource.Token).Token; + + _client = new ClientWebSocket(); + _client.Options.Proxy = null; + _client.Options.KeepAliveInterval = TimeSpan.Zero; + foreach (var header in _headers) + { + if (header.Value != null) + _client.Options.SetRequestHeader(header.Key, header.Value); + } + + await _client.ConnectAsync(new Uri(host), _cancelToken).ConfigureAwait(false); + _task = RunAsync(_cancelToken); + } + + public async Task DisconnectAsync() + { + await _lock.WaitAsync().ConfigureAwait(false); + try + { + await DisconnectInternalAsync().ConfigureAwait(false); + } + finally + { + _lock.Release(); + } + } + private async Task DisconnectInternalAsync(bool isDisposing = false) + { + try { _cancelTokenSource.Cancel(false); } catch { } + + if (!isDisposing) + await (_task ?? Task.Delay(0)).ConfigureAwait(false); + + if (_client != null && _client.State == WebSocketState.Open) + { + var token = new CancellationToken(); + if (!isDisposing) + { + try { await _client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "", token); } + catch { } + } + try { _client.Dispose(); } + catch { } + _client = null; + } + } + + public void SetHeader(string key, string value) + { + _headers[key] = value; + } + public void SetCancelToken(CancellationToken cancelToken) + { + _parentToken = cancelToken; + _cancelToken = CancellationTokenSource.CreateLinkedTokenSource(_parentToken, _cancelTokenSource.Token).Token; + } + + public async Task SendAsync(byte[] data, int index, int count, bool isText) + { + await _lock.WaitAsync().ConfigureAwait(false); + try + { + if (!UnstableCheck()) + return; + + if (_client == null) return; + + int frameCount = (int)Math.Ceiling((double)count / SendChunkSize); + + for (int i = 0; i < frameCount; i++, index += SendChunkSize) + { + bool isLast = i == (frameCount - 1); + + int frameSize; + if (isLast) + frameSize = count - (i * SendChunkSize); + else + frameSize = SendChunkSize; + + var type = isText ? WebSocketMessageType.Text : WebSocketMessageType.Binary; + await _client.SendAsync(new ArraySegment(data, index, count), type, isLast, _cancelToken).ConfigureAwait(false); + } + } + finally + { + _lock.Release(); + } + } + + private async Task RunAsync(CancellationToken cancelToken) + { + var buffer = new ArraySegment(new byte[ReceiveChunkSize]); + + try + { + while (!cancelToken.IsCancellationRequested) + { + WebSocketReceiveResult socketResult = await _client.ReceiveAsync(buffer, cancelToken).ConfigureAwait(false); + byte[] result; + int resultCount; + + if (socketResult.MessageType == WebSocketMessageType.Close) + { + var _ = Closed(new WebSocketClosedException((int)socketResult.CloseStatus, socketResult.CloseStatusDescription)); + return; + } + + if (!socketResult.EndOfMessage) + { + //This is a large message (likely just READY), lets create a temporary expandable stream + using (var stream = new MemoryStream()) + { + stream.Write(buffer.Array, 0, socketResult.Count); + do + { + if (cancelToken.IsCancellationRequested) return; + socketResult = await _client.ReceiveAsync(buffer, cancelToken).ConfigureAwait(false); + stream.Write(buffer.Array, 0, socketResult.Count); + } + while (socketResult == null || !socketResult.EndOfMessage); + + //Use the internal buffer if we can get it + resultCount = (int)stream.Length; + ArraySegment streamBuffer; + if (stream.TryGetBuffer(out streamBuffer)) + result = streamBuffer.Array; + else + result = stream.ToArray(); + } + } + else + { + //Small message + resultCount = socketResult.Count; + result = buffer.Array; + } + + if (socketResult.MessageType == WebSocketMessageType.Text) + { + string text = Encoding.UTF8.GetString(result, 0, resultCount); + await TextMessage(text).ConfigureAwait(false); + } + else + await BinaryMessage(result, 0, resultCount).ConfigureAwait(false); + } + } + catch (Win32Exception ex) when (ex.HResult == HR_TIMEOUT) + { + var _ = Closed(new Exception("Connection timed out.", ex)); + } + catch (OperationCanceledException) { } + catch (Exception ex) + { + //This cannot be awaited otherwise we'll deadlock when DiscordApiClient waits for this task to complete. + var _ = Closed(ex); + } + } + + private bool UnstableCheck() + { + return _rand.NextDouble() > FailureRate; + } + } +} \ No newline at end of file diff --git a/src/Discord.Net.DebugTools/UnstableWebSocketClientProvider.cs b/src/Discord.Net.DebugTools/UnstableWebSocketClientProvider.cs new file mode 100644 index 000000000..9619e8882 --- /dev/null +++ b/src/Discord.Net.DebugTools/UnstableWebSocketClientProvider.cs @@ -0,0 +1,9 @@ +using Discord.Net.WebSockets; + +namespace Discord.Net.Providers.UnstableWebSocket +{ + public static class UnstableWebSocketProvider + { + public static readonly WebSocketProvider Instance = () => new UnstableWebSocketClient(); + } +}