You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

UnstableWebSocketClient.cs 9.4 kB

8 years ago
8 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. using Discord.Net.WebSockets;
  2. using System;
  3. using System.Collections.Generic;
  4. using System.ComponentModel;
  5. using System.IO;
  6. using System.Net.WebSockets;
  7. using System.Text;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. namespace Discord.Net.Providers.UnstableWebSocket
  11. {
  12. internal class UnstableWebSocketClient : IWebSocketClient, IDisposable
  13. {
  14. public const int ReceiveChunkSize = 16 * 1024; //16KB
  15. public const int SendChunkSize = 4 * 1024; //4KB
  16. private const int HR_TIMEOUT = -2147012894;
  17. private const double FailureRate = 0.10; //10%
  18. public event Func<byte[], int, int, Task> BinaryMessage;
  19. public event Func<string, Task> TextMessage;
  20. public event Func<Exception, Task> Closed;
  21. private readonly SemaphoreSlim _lock;
  22. private readonly Dictionary<string, string> _headers;
  23. private readonly Random _rand;
  24. private ClientWebSocket _client;
  25. private Task _task;
  26. private CancellationTokenSource _cancelTokenSource;
  27. private CancellationToken _cancelToken, _parentToken;
  28. private bool _isDisposed, _isDisconnecting;
  29. public UnstableWebSocketClient()
  30. {
  31. _lock = new SemaphoreSlim(1, 1);
  32. _rand = new Random();
  33. _cancelTokenSource = new CancellationTokenSource();
  34. _cancelToken = CancellationToken.None;
  35. _parentToken = CancellationToken.None;
  36. _headers = new Dictionary<string, string>();
  37. }
  38. private void Dispose(bool disposing)
  39. {
  40. if (!_isDisposed)
  41. {
  42. if (disposing)
  43. DisconnectInternalAsync(true).GetAwaiter().GetResult();
  44. _isDisposed = true;
  45. }
  46. }
  47. public void Dispose()
  48. {
  49. Dispose(true);
  50. }
  51. public async Task ConnectAsync(string host)
  52. {
  53. await _lock.WaitAsync().ConfigureAwait(false);
  54. try
  55. {
  56. await ConnectInternalAsync(host).ConfigureAwait(false);
  57. }
  58. finally
  59. {
  60. _lock.Release();
  61. }
  62. }
  63. private async Task ConnectInternalAsync(string host)
  64. {
  65. await DisconnectInternalAsync().ConfigureAwait(false);
  66. _cancelTokenSource = new CancellationTokenSource();
  67. _cancelToken = CancellationTokenSource.CreateLinkedTokenSource(_parentToken, _cancelTokenSource.Token).Token;
  68. _client = new ClientWebSocket();
  69. _client.Options.Proxy = null;
  70. _client.Options.KeepAliveInterval = TimeSpan.Zero;
  71. foreach (var header in _headers)
  72. {
  73. if (header.Value != null)
  74. _client.Options.SetRequestHeader(header.Key, header.Value);
  75. }
  76. await _client.ConnectAsync(new Uri(host), _cancelToken).ConfigureAwait(false);
  77. _task = RunAsync(_cancelToken);
  78. }
  79. public async Task DisconnectAsync()
  80. {
  81. await _lock.WaitAsync().ConfigureAwait(false);
  82. try
  83. {
  84. await DisconnectInternalAsync().ConfigureAwait(false);
  85. }
  86. finally
  87. {
  88. _lock.Release();
  89. }
  90. }
  91. private async Task DisconnectInternalAsync(bool isDisposing = false)
  92. {
  93. try { _cancelTokenSource.Cancel(false); } catch { }
  94. _isDisconnecting = true;
  95. try
  96. {
  97. await (_task ?? Task.Delay(0)).ConfigureAwait(false);
  98. _task = null;
  99. }
  100. finally { _isDisconnecting = false; }
  101. if (_client != null)
  102. {
  103. if (!isDisposing)
  104. {
  105. try { await _client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "", new CancellationToken()); }
  106. catch { }
  107. }
  108. try { _client.Dispose(); }
  109. catch { }
  110. _client = null;
  111. }
  112. }
  113. private async Task OnClosed(Exception ex)
  114. {
  115. if (_isDisconnecting)
  116. return; //Ignore, this disconnect was requested.
  117. System.Diagnostics.Debug.WriteLine("OnClosed - " + ex.Message);
  118. await _lock.WaitAsync().ConfigureAwait(false);
  119. try
  120. {
  121. await DisconnectInternalAsync(false);
  122. }
  123. finally
  124. {
  125. _lock.Release();
  126. }
  127. await Closed(ex);
  128. }
  129. public void SetHeader(string key, string value)
  130. {
  131. _headers[key] = value;
  132. }
  133. public void SetCancelToken(CancellationToken cancelToken)
  134. {
  135. _parentToken = cancelToken;
  136. _cancelToken = CancellationTokenSource.CreateLinkedTokenSource(_parentToken, _cancelTokenSource.Token).Token;
  137. }
  138. public async Task SendAsync(byte[] data, int index, int count, bool isText)
  139. {
  140. await _lock.WaitAsync().ConfigureAwait(false);
  141. try
  142. {
  143. if (!UnstableCheck())
  144. return;
  145. if (_client == null) return;
  146. int frameCount = (int)Math.Ceiling((double)count / SendChunkSize);
  147. for (int i = 0; i < frameCount; i++, index += SendChunkSize)
  148. {
  149. bool isLast = i == (frameCount - 1);
  150. int frameSize;
  151. if (isLast)
  152. frameSize = count - (i * SendChunkSize);
  153. else
  154. frameSize = SendChunkSize;
  155. var type = isText ? WebSocketMessageType.Text : WebSocketMessageType.Binary;
  156. await _client.SendAsync(new ArraySegment<byte>(data, index, count), type, isLast, _cancelToken).ConfigureAwait(false);
  157. }
  158. }
  159. finally
  160. {
  161. _lock.Release();
  162. }
  163. }
  164. private async Task RunAsync(CancellationToken cancelToken)
  165. {
  166. var buffer = new ArraySegment<byte>(new byte[ReceiveChunkSize]);
  167. try
  168. {
  169. while (!cancelToken.IsCancellationRequested)
  170. {
  171. WebSocketReceiveResult socketResult = await _client.ReceiveAsync(buffer, cancelToken).ConfigureAwait(false);
  172. byte[] result;
  173. int resultCount;
  174. if (socketResult.MessageType == WebSocketMessageType.Close)
  175. throw new WebSocketClosedException((int)socketResult.CloseStatus, socketResult.CloseStatusDescription);
  176. if (!socketResult.EndOfMessage)
  177. {
  178. //This is a large message (likely just READY), lets create a temporary expandable stream
  179. using (var stream = new MemoryStream())
  180. {
  181. stream.Write(buffer.Array, 0, socketResult.Count);
  182. do
  183. {
  184. if (cancelToken.IsCancellationRequested) return;
  185. socketResult = await _client.ReceiveAsync(buffer, cancelToken).ConfigureAwait(false);
  186. stream.Write(buffer.Array, 0, socketResult.Count);
  187. }
  188. while (socketResult == null || !socketResult.EndOfMessage);
  189. //Use the internal buffer if we can get it
  190. resultCount = (int)stream.Length;
  191. #if MSTRYBUFFER
  192. if (stream.TryGetBuffer(out var streamBuffer))
  193. result = streamBuffer.Array;
  194. else
  195. result = stream.ToArray();
  196. #else
  197. result = stream.GetBuffer();
  198. #endif
  199. }
  200. }
  201. else
  202. {
  203. //Small message
  204. resultCount = socketResult.Count;
  205. result = buffer.Array;
  206. }
  207. if (socketResult.MessageType == WebSocketMessageType.Text)
  208. {
  209. string text = Encoding.UTF8.GetString(result, 0, resultCount);
  210. await TextMessage(text).ConfigureAwait(false);
  211. }
  212. else
  213. await BinaryMessage(result, 0, resultCount).ConfigureAwait(false);
  214. }
  215. }
  216. catch (Win32Exception ex) when (ex.HResult == HR_TIMEOUT)
  217. {
  218. var _ = OnClosed(new Exception("Connection timed out.", ex));
  219. }
  220. catch (OperationCanceledException) { }
  221. catch (Exception ex)
  222. {
  223. //This cannot be awaited otherwise we'll deadlock when DiscordApiClient waits for this task to complete.
  224. var _ = OnClosed(ex);
  225. }
  226. }
  227. private bool UnstableCheck()
  228. {
  229. return _rand.NextDouble() > FailureRate;
  230. }
  231. }
  232. }