You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

ConnectionManager.cs 9.3 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. using Discord.Logging;
  2. using System;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using Discord.Net;
  6. namespace Discord
  7. {
  /// <summary>
  /// Runs a connect / wait / disconnect loop on a background task, retrying with an
  /// exponentially growing delay until the reconnect token is cancelled.
  /// </summary>
  internal class ConnectionManager : IDisposable
  {
      /// <summary>Close code that the disconnect handler escalates to <see cref="CriticalError"/>.</summary>
      private const int SessionExpiredCloseCode = 4006;
      /// <summary>Delay (ms) before the first reconnect attempt; also the value the delay resets to after a successful connect.</summary>
      private const int InitialReconnectDelayMs = 1000;
      /// <summary>Upper bound (ms) for the reconnect delay.</summary>
      private const int MaxReconnectDelayMs = 60000;
      /// <summary>Bound (ms) of the random offset added to the doubled delay: [-value, value).</summary>
      private const int ReconnectJitterMs = 250;

      /// <summary>Raised by <see cref="ConnectAsync"/> after the connecting callback completes and <see cref="State"/> is set to Connected.</summary>
      public event Func<Task> Connected { add { _connectedEvent.Add(value); } remove { _connectedEvent.Remove(value); } }
      private readonly AsyncEvent<Func<Task>> _connectedEvent = new AsyncEvent<Func<Task>>();

      /// <summary>
      /// Raised by <see cref="DisconnectAsync"/> after <see cref="State"/> is set to Disconnected.
      /// Receives the exception that ended the connection and the "is reconnecting" flag.
      /// </summary>
      public event Func<Exception, bool, Task> Disconnected { add { _disconnectedEvent.Add(value); } remove { _disconnectedEvent.Remove(value); } }
      private readonly AsyncEvent<Func<Exception, bool, Task>> _disconnectedEvent = new AsyncEvent<Func<Exception, bool, Task>>();

      private readonly SemaphoreSlim _stateLock;
      private readonly Logger _logger;
      private readonly int _connectionTimeout;
      private readonly Func<Task> _onConnecting;
      private readonly Func<Exception, Task> _onDisconnecting;

      // _connectionPromise completes (faulted/cancelled) when the current connection ends;
      // _readyPromise completes when the connection is ready, timed out, failed or was cancelled.
      private TaskCompletionSource<bool> _connectionPromise, _readyPromise;
      private CancellationTokenSource _combinedCancelToken, _reconnectCancelToken, _connectionCancelToken;
      private Task _task;
      private bool _isDisposed;

      /// <summary>Current state, updated by <see cref="ConnectAsync"/> and <see cref="DisconnectAsync"/>.</summary>
      public ConnectionState State { get; private set; }

      /// <summary>
      /// Token of the current connection attempt; it is linked to both the per-connection
      /// and the reconnect cancellation sources and is replaced on every connect.
      /// </summary>
      public CancellationToken CancelToken { get; private set; }

      /// <summary>Creates the manager and registers its disconnect handler with the client.</summary>
      /// <param name="stateLock">Semaphore acquired by <see cref="StartAsync"/> and released when the background loop exits.</param>
      /// <param name="logger">Logger used for connection life-cycle messages.</param>
      /// <param name="connectionTimeout">Time in milliseconds (passed to <c>Task.Delay</c>) after which the ready promise is failed with a <see cref="TimeoutException"/>.</param>
      /// <param name="onConnecting">Callback awaited while connecting.</param>
      /// <param name="onDisconnecting">Callback awaited while disconnecting; receives the causing exception.</param>
      /// <param name="clientDisconnectHandler">Invoked once with the handler the client should call when its connection closes.</param>
      internal ConnectionManager(SemaphoreSlim stateLock, Logger logger, int connectionTimeout,
          Func<Task> onConnecting, Func<Exception, Task> onDisconnecting, Action<Func<Exception, Task>> clientDisconnectHandler)
      {
          _stateLock = stateLock;
          _logger = logger;
          _connectionTimeout = connectionTimeout;
          _onConnecting = onConnecting;
          _onDisconnecting = onDisconnecting;

          clientDisconnectHandler(ex =>
          {
              if (ex != null)
              {
                  var closedEx = ex as WebSocketClosedException;
                  if (closedEx?.CloseCode == SessionExpiredCloseCode)
                      CriticalError(new Exception("WebSocket session expired", ex)); // also stops reconnecting
                  else
                      Error(new Exception("WebSocket connection was closed", ex));
              }
              else
                  Error(new Exception("WebSocket connection was closed"));

              return Task.Delay(0);
          });
      }

      /// <summary>
      /// Stops any running loop, acquires the state lock and starts a new background
      /// connect/reconnect loop. Returns once the loop has been started, not once connected.
      /// </summary>
      public virtual async Task StartAsync()
      {
          await AcquireConnectionLock().ConfigureAwait(false);

          var reconnectCancelToken = new CancellationTokenSource();
          _reconnectCancelToken?.Dispose();
          _reconnectCancelToken = reconnectCancelToken;

          _task = Task.Run(async () =>
          {
              try
              {
                  Random jitter = new Random();
                  int nextReconnectDelay = InitialReconnectDelayMs;
                  while (!reconnectCancelToken.IsCancellationRequested)
                  {
                      try
                      {
                          await ConnectAsync(reconnectCancelToken).ConfigureAwait(false);
                          nextReconnectDelay = InitialReconnectDelayMs; //Reset delay
                          await _connectionPromise.Task.ConfigureAwait(false);
                      }
                      catch (OperationCanceledException ex)
                      {
                          // NOTE(review): Cancel() also cancels the reconnect token, so every
                          // cancellation observed here ends the loop - confirm this is intended
                          // for cancellations triggered through Reconnect().
                          Cancel(); //In case this exception didn't come from another Error call
                          await DisconnectAsync(ex, !reconnectCancelToken.IsCancellationRequested).ConfigureAwait(false);
                      }
                      catch (Exception ex)
                      {
                          Error(ex); //In case this exception didn't come from another Error call
                          if (!reconnectCancelToken.IsCancellationRequested)
                          {
                              await _logger.WarningAsync(ex).ConfigureAwait(false);
                              await DisconnectAsync(ex, true).ConfigureAwait(false);
                          }
                          else
                          {
                              await _logger.ErrorAsync(ex).ConfigureAwait(false);
                              await DisconnectAsync(ex, false).ConfigureAwait(false);
                          }
                      }

                      if (!reconnectCancelToken.IsCancellationRequested)
                      {
                          //Wait before reconnecting
                          try
                          {
                              await Task.Delay(nextReconnectDelay, reconnectCancelToken.Token).ConfigureAwait(false);
                          }
                          catch (OperationCanceledException)
                          {
                              // Stopped while backing off: leave the loop quietly so the task
                              // completes normally and StopAsync() does not rethrow the cancellation.
                              break;
                          }

                          nextReconnectDelay = (nextReconnectDelay * 2) + jitter.Next(-ReconnectJitterMs, ReconnectJitterMs);
                          if (nextReconnectDelay > MaxReconnectDelayMs)
                              nextReconnectDelay = MaxReconnectDelayMs;
                      }
                  }
              }
              finally { _stateLock.Release(); }
          });
      }

      /// <summary>
      /// Cancels the current connection and the reconnect loop, then awaits the background
      /// task if one was started. An exception that faulted the loop is rethrown by that await.
      /// </summary>
      public virtual async Task StopAsync()
      {
          Cancel();

          var task = _task;
          if (task != null)
              await task.ConfigureAwait(false);
      }

      /// <summary>
      /// Performs one connection attempt: replaces the cancellation sources and promises,
      /// arms the timeout, awaits the connecting callback and raises <see cref="Connected"/>.
      /// </summary>
      /// <param name="reconnectCancelToken">Source owned by the running loop; linked into <see cref="CancelToken"/>.</param>
      private async Task ConnectAsync(CancellationTokenSource reconnectCancelToken)
      {
          _connectionCancelToken?.Dispose();
          _combinedCancelToken?.Dispose();
          _connectionCancelToken = new CancellationTokenSource();
          _combinedCancelToken = CancellationTokenSource.CreateLinkedTokenSource(_connectionCancelToken.Token, reconnectCancelToken.Token);
          CancelToken = _combinedCancelToken.Token;

          _connectionPromise = new TaskCompletionSource<bool>();
          State = ConnectionState.Connecting;
          await _logger.InfoAsync("Connecting").ConfigureAwait(false);

          try
          {
              var readyPromise = new TaskCompletionSource<bool>();
              _readyPromise = readyPromise;

              //Abort connection on timeout
              // Locals are captured so the timer acts on this attempt's promise and token
              // even after the fields have been replaced by a later attempt.
              var cancelToken = CancelToken;
              var _ = Task.Run(async () =>
              {
                  try
                  {
                      await Task.Delay(_connectionTimeout, cancelToken).ConfigureAwait(false);
                      readyPromise.TrySetException(new TimeoutException());
                  }
                  catch (OperationCanceledException) { }
              });

              await _onConnecting().ConfigureAwait(false);

              await _logger.InfoAsync("Connected").ConfigureAwait(false);
              State = ConnectionState.Connected;
              await _logger.DebugAsync("Raising Event").ConfigureAwait(false);
              await _connectedEvent.InvokeAsync().ConfigureAwait(false);
          }
          catch (Exception ex)
          {
              Error(ex);
              throw;
          }
      }

      /// <summary>
      /// Runs the disconnecting callback and raises <see cref="Disconnected"/>.
      /// Does nothing when <see cref="State"/> is already Disconnected.
      /// </summary>
      /// <param name="ex">Exception that ended the connection; forwarded to the callback and the event.</param>
      /// <param name="isReconnecting">Flag forwarded to <see cref="Disconnected"/> subscribers.</param>
      private async Task DisconnectAsync(Exception ex, bool isReconnecting)
      {
          if (State == ConnectionState.Disconnected) return;
          State = ConnectionState.Disconnecting;
          await _logger.InfoAsync("Disconnecting").ConfigureAwait(false);

          await _onDisconnecting(ex).ConfigureAwait(false);

          await _logger.InfoAsync("Disconnected").ConfigureAwait(false);
          State = ConnectionState.Disconnected;
          await _disconnectedEvent.InvokeAsync(ex, isReconnecting).ConfigureAwait(false);
      }

      /// <summary>Tries to complete the current ready promise successfully.</summary>
      public async Task CompleteAsync()
      {
          await _readyPromise.TrySetResultAsync(true).ConfigureAwait(false);
      }

      /// <summary>Awaits the current ready promise; rethrows if it was failed (e.g. timeout) or cancelled.</summary>
      public async Task WaitAsync()
      {
          await _readyPromise.Task.ConfigureAwait(false);
      }

      /// <summary>Cancels both promises, the reconnect loop and the current connection.</summary>
      public void Cancel()
      {
          _readyPromise?.TrySetCanceled();
          _connectionPromise?.TrySetCanceled();
          _reconnectCancelToken?.Cancel();
          _connectionCancelToken?.Cancel();
      }

      /// <summary>
      /// Fails both promises with <paramref name="ex"/> and cancels the current connection,
      /// leaving the reconnect token untouched.
      /// </summary>
      /// <param name="ex">Exception to surface to waiters.</param>
      public void Error(Exception ex)
      {
          // Null-conditional like Cancel(): this can run before the first connect attempt
          // has created the promises (e.g. from the client disconnect handler).
          _readyPromise?.TrySetException(ex);
          _connectionPromise?.TrySetException(ex);
          _connectionCancelToken?.Cancel();
      }

      /// <summary>Like <see cref="Error"/>, but cancels the reconnect token first.</summary>
      /// <param name="ex">Exception to surface to waiters.</param>
      public void CriticalError(Exception ex)
      {
          _reconnectCancelToken?.Cancel();
          Error(ex);
      }

      /// <summary>Cancels both promises and the current connection without touching the reconnect token.</summary>
      public void Reconnect()
      {
          // Null-conditional for the same reason as in Error().
          _readyPromise?.TrySetCanceled();
          _connectionPromise?.TrySetCanceled();
          _connectionCancelToken?.Cancel();
      }

      /// <summary>
      /// Repeatedly stops the running loop and makes a zero-timeout attempt to take the
      /// state lock, returning once the lock has been acquired.
      /// </summary>
      private async Task AcquireConnectionLock()
      {
          while (true)
          {
              await StopAsync().ConfigureAwait(false);
              if (await _stateLock.WaitAsync(0).ConfigureAwait(false))
                  break;
          }
      }

      /// <summary>Disposes the cancellation token sources owned by this instance.</summary>
      /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
      protected virtual void Dispose(bool disposing)
      {
          if (!_isDisposed)
          {
              if (disposing)
              {
                  _combinedCancelToken?.Dispose();
                  _reconnectCancelToken?.Dispose();
                  _connectionCancelToken?.Dispose();
              }

              _isDisposed = true;
          }
      }

      /// <inheritdoc />
      public void Dispose()
      {
          Dispose(true);
          // The class is unsealed: keep the standard pattern so a derived type that adds
          // a finalizer does not have it run after an explicit Dispose().
          GC.SuppressFinalize(this);
      }
  }
  213. }