You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

ConnectionManager.cs 9.6 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. using Discord.Logging;
  2. using System;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using Discord.Net;
  namespace Discord
  {
      /// <summary>
      /// Owns the connection lifecycle: a background loop that connects, waits until the
      /// connection fails or is cancelled, disconnects, and then retries after a growing delay
      /// unless reconnection has been cancelled.
      /// </summary>
      internal class ConnectionManager : IDisposable
      {
          /// <summary>WebSocket close code that the disconnect callback reports as an expired session.</summary>
          private const int SessionExpiredCloseCode = 4006;

          /// <summary>Reconnect delay, in milliseconds, used initially and restored after every successful connect.</summary>
          private const int InitialReconnectDelay = 1000;

          /// <summary>Upper bound, in milliseconds, for the reconnect delay.</summary>
          private const int MaxReconnectDelay = 60000;

          /// <summary>Bound, in milliseconds, of the random offset added each time the reconnect delay doubles.</summary>
          private const int ReconnectJitter = 250;

          /// <summary>Raised by the connection loop after the connecting callback has completed.</summary>
          public event Func<Task> Connected { add { _connectedEvent.Add(value); } remove { _connectedEvent.Remove(value); } }
          private readonly AsyncEvent<Func<Task>> _connectedEvent = new AsyncEvent<Func<Task>>();

          /// <summary>
          /// Raised while disconnecting, after the disconnecting callback has run. Receives the
          /// causing exception and whether the loop intends to reconnect.
          /// </summary>
          public event Func<Exception, bool, Task> Disconnected { add { _disconnectedEvent.Add(value); } remove { _disconnectedEvent.Remove(value); } }
          private readonly AsyncEvent<Func<Exception, bool, Task>> _disconnectedEvent = new AsyncEvent<Func<Exception, bool, Task>>();

          private readonly SemaphoreSlim _stateLock;
          private readonly Logger _logger;
          private readonly int _connectionTimeout;
          private readonly Func<Task> _onConnecting;
          private readonly Func<Exception, Task> _onDisconnecting;

          // _connectionPromise completes (faulted or cancelled) when the current connection ends;
          // _readyPromise completes when CompleteAsync is called or the attempt fails/times out.
          // Both are null until the first ConnectAsync call.
          private TaskCompletionSource<bool> _connectionPromise, _readyPromise;

          // _reconnectCancelToken stops the whole loop, _connectionCancelToken ends only the
          // current connection, and _combinedCancelToken is linked to both.
          private CancellationTokenSource _combinedCancelToken, _reconnectCancelToken, _connectionCancelToken;
          private Task _task;
          private bool _isDisposed;

          /// <summary>Current state, updated by the connect and disconnect steps of the loop.</summary>
          public ConnectionState State { get; private set; }

          /// <summary>
          /// Token of the linked source created for the current connection attempt; it is cancelled
          /// when either that connection or the whole reconnect loop is cancelled.
          /// </summary>
          public CancellationToken CancelToken { get; private set; }

          /// <summary>
          /// Creates the manager and registers a disconnect callback with the client.
          /// </summary>
          /// <param name="stateLock">Semaphore acquired by <see cref="StartAsync"/> and released when the connection loop ends.</param>
          /// <param name="logger">Logger used for connection state messages and errors.</param>
          /// <param name="connectionTimeout">Milliseconds after which a still-pending connection attempt is failed with a <see cref="TimeoutException"/>.</param>
          /// <param name="onConnecting">Callback awaited on every connection attempt.</param>
          /// <param name="onDisconnecting">Callback awaited on every disconnect; receives the causing exception.</param>
          /// <param name="clientDisconnectHandler">
          /// Invoked once, here, with the callback that reports a closed connection through
          /// <see cref="Error"/> or <see cref="CriticalError"/>.
          /// </param>
          internal ConnectionManager(SemaphoreSlim stateLock, Logger logger, int connectionTimeout,
              Func<Task> onConnecting, Func<Exception, Task> onDisconnecting, Action<Func<Exception, Task>> clientDisconnectHandler)
          {
              _stateLock = stateLock;
              _logger = logger;
              _connectionTimeout = connectionTimeout;
              _onConnecting = onConnecting;
              _onDisconnecting = onDisconnecting;

              clientDisconnectHandler(ex =>
              {
                  if (ex != null)
                  {
                      var closedEx = ex as WebSocketClosedException;
                      if (closedEx?.CloseCode == SessionExpiredCloseCode)
                      {
                          // An expired session also cancels the reconnect loop.
                          CriticalError(new Exception("WebSocket session expired", ex));
                      }
                      else
                      {
                          Error(new Exception("WebSocket connection was closed", ex));
                      }
                  }
                  else
                  {
                      Error(new Exception("WebSocket connection was closed"));
                  }

                  return Task.CompletedTask;
              });
          }

          /// <summary>
          /// Cancels any running loop, acquires the state lock and starts a new background
          /// connection loop. Returns once the loop has been started, not once connected.
          /// </summary>
          public virtual async Task StartAsync()
          {
              await AcquireConnectionLock().ConfigureAwait(false);

              var reconnectCancelToken = new CancellationTokenSource();
              _reconnectCancelToken?.Dispose();
              _reconnectCancelToken = reconnectCancelToken;

              _task = Task.Run(async () =>
              {
                  try
                  {
                      var jitter = new Random();
                      int nextReconnectDelay = InitialReconnectDelay;

                      while (!reconnectCancelToken.IsCancellationRequested)
                      {
                          try
                          {
                              await ConnectAsync(reconnectCancelToken).ConfigureAwait(false);
                              nextReconnectDelay = InitialReconnectDelay; //Reset delay

                              // Stays pending while connected; Error/Cancel/Reconnect complete it.
                              await _connectionPromise.Task.ConfigureAwait(false);
                          }
                          catch (OperationCanceledException ex)
                          {
                              // NOTE(review): Cancel() also cancels the reconnect source, so the flag
                              // passed below is false and the loop ends here - apparently even when the
                              // cancellation came from Reconnect(). Confirm this is intended.
                              Cancel(); //In case this exception didn't come from another Error call
                              await DisconnectAsync(ex, !reconnectCancelToken.IsCancellationRequested).ConfigureAwait(false);
                          }
                          catch (Exception ex)
                          {
                              Error(ex); //In case this exception didn't come from another Error call
                              if (!reconnectCancelToken.IsCancellationRequested)
                              {
                                  await _logger.WarningAsync(ex).ConfigureAwait(false);
                                  await DisconnectAsync(ex, true).ConfigureAwait(false);
                              }
                              else
                              {
                                  await _logger.ErrorAsync(ex).ConfigureAwait(false);
                                  await DisconnectAsync(ex, false).ConfigureAwait(false);
                              }
                          }

                          if (!reconnectCancelToken.IsCancellationRequested)
                          {
                              //Wait before reconnecting
                              // Cancellation during this delay leaves the loop through the exception;
                              // the finally block below still releases the lock.
                              await Task.Delay(nextReconnectDelay, reconnectCancelToken.Token).ConfigureAwait(false);

                              nextReconnectDelay = (nextReconnectDelay * 2) + jitter.Next(-ReconnectJitter, ReconnectJitter);
                              if (nextReconnectDelay > MaxReconnectDelay)
                                  nextReconnectDelay = MaxReconnectDelay;
                          }
                      }
                  }
                  finally { _stateLock.Release(); }
              });
          }

          /// <summary>
          /// Requests that the connection loop stop. Only signals cancellation; it does not wait
          /// for the loop to finish disconnecting.
          /// </summary>
          public virtual Task StopAsync()
          {
              Cancel();
              return Task.CompletedTask;
          }

          /// <summary>
          /// Performs one connection attempt: resets the per-connection cancellation sources and
          /// promises, starts the timeout watchdog, awaits the connecting callback and raises
          /// <see cref="Connected"/>. Any failure is reported through <see cref="Error"/> and rethrown.
          /// </summary>
          /// <param name="reconnectCancelToken">Source owned by the loop; linked into <see cref="CancelToken"/>.</param>
          private async Task ConnectAsync(CancellationTokenSource reconnectCancelToken)
          {
              _connectionCancelToken?.Dispose();
              _combinedCancelToken?.Dispose();
              _connectionCancelToken = new CancellationTokenSource();
              _combinedCancelToken = CancellationTokenSource.CreateLinkedTokenSource(_connectionCancelToken.Token, reconnectCancelToken.Token);
              CancelToken = _combinedCancelToken.Token;

              _connectionPromise = new TaskCompletionSource<bool>();
              State = ConnectionState.Connecting;
              await _logger.InfoAsync("Connecting").ConfigureAwait(false);

              try
              {
                  var readyPromise = new TaskCompletionSource<bool>();
                  _readyPromise = readyPromise;

                  //Abort connection on timeout
                  // The watchdog captures this attempt's promise and token, so a later attempt
                  // replacing the fields cannot be affected by it.
                  var cancelToken = CancelToken;
                  var _ = Task.Run(async () =>
                  {
                      try
                      {
                          await Task.Delay(_connectionTimeout, cancelToken).ConfigureAwait(false);
                          readyPromise.TrySetException(new TimeoutException());
                      }
                      catch (OperationCanceledException) { }
                  });

                  try
                  {
                      await _onConnecting().ConfigureAwait(false);
                  }
                  catch (TaskCanceledException ex)
                  {
                      // Surface the underlying cause when there is one, otherwise a generic cancellation.
                      Exception innerEx = ex.InnerException ?? new OperationCanceledException("Failed to connect.");
                      Error(innerEx);
                      throw innerEx;
                  }

                  await _logger.InfoAsync("Connected").ConfigureAwait(false);
                  State = ConnectionState.Connected;
                  await _logger.DebugAsync("Raising Event").ConfigureAwait(false);
                  await _connectedEvent.InvokeAsync().ConfigureAwait(false);
              }
              catch (Exception ex)
              {
                  Error(ex);
                  throw;
              }
          }

          /// <summary>
          /// Runs the disconnecting callback and raises <see cref="Disconnected"/>, unless the
          /// state is already <see cref="ConnectionState.Disconnected"/>.
          /// </summary>
          /// <param name="ex">Exception that ended the connection.</param>
          /// <param name="isReconnecting">Value forwarded to <see cref="Disconnected"/> handlers.</param>
          private async Task DisconnectAsync(Exception ex, bool isReconnecting)
          {
              if (State == ConnectionState.Disconnected) return;

              State = ConnectionState.Disconnecting;
              await _logger.InfoAsync("Disconnecting").ConfigureAwait(false);

              await _onDisconnecting(ex).ConfigureAwait(false);
              await _disconnectedEvent.InvokeAsync(ex, isReconnecting).ConfigureAwait(false);

              State = ConnectionState.Disconnected;
              await _logger.InfoAsync("Disconnected").ConfigureAwait(false);
          }

          /// <summary>Marks the current connection attempt as ready, completing <see cref="WaitAsync"/>.</summary>
          public async Task CompleteAsync()
          {
              await _readyPromise.TrySetResultAsync(true).ConfigureAwait(false);
          }

          /// <summary>
          /// Waits until the current connection attempt is marked ready; throws if the attempt
          /// failed, timed out or was cancelled instead.
          /// </summary>
          public async Task WaitAsync()
          {
              await _readyPromise.Task.ConfigureAwait(false);
          }

          /// <summary>
          /// Cancels both promises, the reconnect loop and the current connection.
          /// Safe to call before any connection attempt has been made.
          /// </summary>
          public void Cancel()
          {
              _readyPromise?.TrySetCanceled();
              _connectionPromise?.TrySetCanceled();
              _reconnectCancelToken?.Cancel();
              _connectionCancelToken?.Cancel();
          }

          /// <summary>
          /// Fails the current connection with <paramref name="ex"/> and cancels its token while
          /// leaving the reconnect loop running.
          /// </summary>
          /// <param name="ex">Exception to fault the pending promises with.</param>
          public void Error(Exception ex)
          {
              // The promises do not exist until the first connection attempt, but the client's
              // disconnect callback may fire earlier; mirror Cancel() and tolerate that.
              _readyPromise?.TrySetException(ex);
              _connectionPromise?.TrySetException(ex);
              _connectionCancelToken?.Cancel();
          }

          /// <summary>
          /// Like <see cref="Error"/>, but first cancels the reconnect loop so that no further
          /// connection attempt is made.
          /// </summary>
          /// <param name="ex">Exception to fault the pending promises with.</param>
          public void CriticalError(Exception ex)
          {
              _reconnectCancelToken?.Cancel();
              Error(ex);
          }

          /// <summary>
          /// Cancels the current connection's promises and token without touching the reconnect source.
          /// </summary>
          public void Reconnect()
          {
              // Null-conditional for the same reason as in Error(): nothing exists before the first attempt.
              _readyPromise?.TrySetCanceled();
              _connectionPromise?.TrySetCanceled();
              _connectionCancelToken?.Cancel();
          }

          /// <summary>
          /// Repeatedly requests a stop and tries to take the state lock without waiting, until
          /// the lock is obtained.
          /// </summary>
          private async Task AcquireConnectionLock()
          {
              while (true)
              {
                  await StopAsync().ConfigureAwait(false);
                  if (await _stateLock.WaitAsync(0).ConfigureAwait(false))
                      break;
              }
          }

          /// <summary>Disposes the cancellation sources owned by this instance; later calls do nothing.</summary>
          /// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
          protected virtual void Dispose(bool disposing)
          {
              if (!_isDisposed)
              {
                  if (disposing)
                  {
                      _combinedCancelToken?.Dispose();
                      _reconnectCancelToken?.Dispose();
                      _connectionCancelToken?.Dispose();
                  }
                  _isDisposed = true;
              }
          }

          /// <summary>Releases the cancellation sources held by this instance.</summary>
          public void Dispose()
          {
              Dispose(true);
              // The class is unsealed; spare any derived type that adds a finalizer a redundant finalization.
              GC.SuppressFinalize(this);
          }
      }
  }