You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

RequestQueueBucket.cs 12 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. using Discord.Serialization;
  2. using System;
  3. #if DEBUG_LIMITS
  4. using System.Diagnostics;
  5. #endif
  6. using System.IO;
  7. using System.Net;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. namespace Discord.Net.Queue
  11. {
  12. internal class RequestBucket
  13. {
  /// <summary>
  /// Shape of the JSON error body that <see cref="SendAsync"/> tries to read from a
  /// failed response; its values are forwarded to the thrown <see cref="HttpException"/>.
  /// </summary>
  private class Error
  {
      /// <summary>Mapped to the "code" property of the payload.</summary>
      [ModelProperty("code")] public int Code { get; set; }

      /// <summary>Mapped to the "message" property of the payload.</summary>
      [ModelProperty("message")] public string Message { get; set; }
  }
  // Counter shared by every bucket; SendAsync increments it to tag each request in the debug output.
  private static int _nextId;

  // Guards WindowCount and _resetTick when they are read or updated together.
  private readonly object _lock = new object();
  private readonly RequestQueue _queue;

  // Requests still allowed in the current window; decremented by EnterAsync, refilled by QueueReset.
  private int _semaphore;

  // Time at which the pending reset refills the semaphore; null while no reset is queued.
  private DateTimeOffset? _resetTick;

  /// <summary>Identifier of this bucket, as supplied to the constructor.</summary>
  public string Id { get; private set; }

  /// <summary>Number of requests allowed per window; 0 means limiting is disabled for this bucket.</summary>
  public int WindowCount { get; private set; }

  /// <summary>Time of the latest send attempt, or the latest known reset time if one was recorded.</summary>
  public DateTimeOffset LastAttemptAt { get; private set; }

  /// <summary>
  /// Creates a bucket owned by <paramref name="queue"/> and sizes its initial window
  /// from <paramref name="request"/>.
  /// </summary>
  /// <param name="queue">Queue that owns this bucket.</param>
  /// <param name="request">First request routed to the bucket; only its options are read.</param>
  /// <param name="id">Identifier exposed through <see cref="Id"/>.</param>
  public RequestBucket(RequestQueue queue, RestRequest request, string id)
  {
      _queue = queue;
      Id = id;

      // Client buckets have a predefined window size; any other bucket only
      // allows one request until we get a header back.
      WindowCount = request.Options.IsClientBucket
          ? ClientBucket.Get(request.Options.BucketId).WindowCount
          : 1;

      _semaphore = WindowCount;
      _resetTick = null;
      LastAttemptAt = DateTimeOffset.UtcNow;
  }
  /// <summary>
  /// Sends <paramref name="request"/> through this bucket, waiting for the global and
  /// per-bucket limits first and retrying according to the request's retry mode.
  /// </summary>
  /// <param name="request">The request to send; its options supply the retry mode.</param>
  /// <returns>The response body of the first attempt that returns a 2xx status code.</returns>
  /// <exception cref="HttpException">
  /// The response had a non-2xx status other than 429, or it was a 502 and
  /// <c>RetryMode.Retry502</c> is not set.
  /// </exception>
  /// <exception cref="TimeoutException">
  /// The send timed out and <c>RetryMode.RetryTimeouts</c> is not set, or
  /// <see cref="EnterAsync"/> gave up before the request could be sent.
  /// </exception>
  public async Task<ReadOnlyBuffer<byte>> SendAsync(RestRequest request)
  {
      int id = Interlocked.Increment(ref _nextId);
#if DEBUG_LIMITS
      Debug.WriteLine($"[{id}] Start");
#endif
      LastAttemptAt = DateTimeOffset.UtcNow;
      while (true)
      {
          await _queue.EnterGlobalAsync(id, request).ConfigureAwait(false);
          await EnterAsync(id, request).ConfigureAwait(false);

#if DEBUG_LIMITS
          Debug.WriteLine($"[{id}] Sending...");
#endif
          // Stays at its default value if the send itself throws; the finally block
          // below still hands it to UpdateRateLimit in that case.
          var info = default(RateLimitInfo);
          try
          {
              var response = await request.SendAsync().ConfigureAwait(false);
              info = new RateLimitInfo(response.Headers);

              if (response.StatusCode < (HttpStatusCode)200 || response.StatusCode >= (HttpStatusCode)300)
              {
                  switch (response.StatusCode)
                  {
                      case (HttpStatusCode)429:
                          if (info.IsGlobal)
                          {
#if DEBUG_LIMITS
                              Debug.WriteLine($"[{id}] (!) 429 [Global]");
#endif
                              _queue.PauseGlobal(info);
                          }
                          else
                          {
#if DEBUG_LIMITS
                              Debug.WriteLine($"[{id}] (!) 429");
#endif
                              UpdateRateLimit(id, request, info, true);
                          }
                          await _queue.RaiseRateLimitTriggered(Id, info).ConfigureAwait(false);
                          continue; //Retry
                      case HttpStatusCode.BadGateway: //502
#if DEBUG_LIMITS
                          Debug.WriteLine($"[{id}] (!) 502");
#endif
                          if ((request.Options.RetryMode & RetryMode.Retry502) == 0)
                              throw new HttpException(HttpStatusCode.BadGateway, null);

                          continue; //Retry
                      default:
                          int? code = null;
                          string reason = null;
                          if (response.Data.Length > 0)
                          {
                              try
                              {
                                  var error = Serializer.Json.Read<Error>(response.Data);
                                  code = error.Code;
                                  reason = error.Message;
                              }
                              catch { } //Best effort: an unreadable body just leaves code/reason null
                          }
                          throw new HttpException(response.StatusCode, code, reason);
                  }
              }
              else
              {
#if DEBUG_LIMITS
                  Debug.WriteLine($"[{id}] Success");
#endif
                  return response.Data;
              }
          }
          catch (TimeoutException)
          {
#if DEBUG_LIMITS
              Debug.WriteLine($"[{id}] Timeout");
#endif
              if ((request.Options.RetryMode & RetryMode.RetryTimeouts) == 0)
                  throw;

              // Do not resume on a captured context, matching every other await in this class.
              await Task.Delay(500).ConfigureAwait(false);
              continue; //Retry
          }
          // Any other exception (including HttpException) is not caught here and
          // propagates to the caller; RetryMode.RetryErrors is not consulted.
          finally
          {
              // Runs on every exit from the try/catch, including each `continue`.
              UpdateRateLimit(id, request, info, false);
#if DEBUG_LIMITS
              Debug.WriteLine($"[{id}] Stop");
#endif
          }
      }
  }
  /// <summary>
  /// Waits until this bucket has a free slot for <paramref name="request"/>, sleeping
  /// until the known reset time (or in 500 ms steps when none is known) while the
  /// bucket is exhausted.
  /// </summary>
  /// <param name="id">Request id, only used in the debug output.</param>
  /// <param name="request">The request waiting to be sent; supplies the timeout, cancel token and retry mode.</param>
  /// <exception cref="TimeoutException">
  /// The request's timeout passed or cancellation was requested before any rate limit was hit.
  /// </exception>
  /// <exception cref="RateLimitedException">
  /// The bucket is exhausted and either <c>RetryMode.RetryRatelimit</c> is not set, the
  /// wait would run past the request's timeout, or the timeout/cancellation was reached
  /// after a rate limit had already been hit.
  /// </exception>
  private async Task EnterAsync(int id, RestRequest request)
  {
      int windowCount;
      DateTimeOffset? resetAt;
      bool isRateLimited = false;

      while (true)
      {
          // The comparison is lifted: it is false when the request has no TimeoutAt.
          if (DateTimeOffset.UtcNow > request.TimeoutAt || request.Options.CancelToken.IsCancellationRequested)
          {
              if (!isRateLimited)
                  throw new TimeoutException();
              else
                  throw new RateLimitedException();
          }

          // Take a consistent snapshot of the window size and the pending reset time.
          lock (_lock)
          {
              windowCount = WindowCount;
              resetAt = _resetTick;
          }

          DateTimeOffset? timeoutAt = request.TimeoutAt;
          // windowCount == 0 means limiting is disabled, so the semaphore is left untouched.
          if (windowCount > 0 && Interlocked.Decrement(ref _semaphore) < 0)
          {
              if (!isRateLimited)
              {
                  // Raise the event only once per request, however many times we loop.
                  isRateLimited = true;
                  await _queue.RaiseRateLimitTriggered(Id, null).ConfigureAwait(false);
              }

              if ((request.Options.RetryMode & RetryMode.RetryRatelimit) == 0)
                  throw new RateLimitedException();

              if (resetAt.HasValue)
              {
                  // Lifted comparison: false when there is no timeout, so we simply wait.
                  if (resetAt > timeoutAt)
                      throw new RateLimitedException();

                  int millis = (int)Math.Ceiling((resetAt.Value - DateTimeOffset.UtcNow).TotalMilliseconds);
#if DEBUG_LIMITS
                  Debug.WriteLine($"[{id}] Sleeping {millis} ms (Pre-emptive)");
#endif
                  if (millis > 0)
                      await Task.Delay(millis, request.Options.CancelToken).ConfigureAwait(false);
              }
              else
              {
                  // Give up early only when a deadline exists and is less than one sleep away.
                  // Without the HasValue guard a request with no timeout would throw
                  // InvalidOperationException here instead of waiting.
                  if (timeoutAt.HasValue && (timeoutAt.Value - DateTimeOffset.UtcNow).TotalMilliseconds < 500.0)
                      throw new RateLimitedException();
#if DEBUG_LIMITS
                  Debug.WriteLine($"[{id}] Sleeping 500* ms (Pre-emptive)");
#endif
                  await Task.Delay(500, request.Options.CancelToken).ConfigureAwait(false);
              }
              continue;
          }
#if DEBUG_LIMITS
          else
              Debug.WriteLine($"[{id}] Entered Semaphore ({_semaphore}/{WindowCount} remaining)");
#endif
          break;
      }
  }
  /// <summary>
  /// Applies the rate limit information of a response to this bucket: resizes the
  /// window when the reported limit changed, works out when the window resets and
  /// queues a reset if none is pending.
  /// </summary>
  /// <param name="id">Request id, only used in the debug output and passed on to <see cref="QueueReset"/>.</param>
  /// <param name="request">The request the information belongs to; only its bucket options are read.</param>
  /// <param name="info">Rate limit values of the response (default-initialised if the send threw).</param>
  /// <param name="is429">Not read by this method.</param>
  private void UpdateRateLimit(int id, RestRequest request, RateLimitInfo info, bool is429)
  {
      // A window of 0 means limiting was disabled for this bucket earlier.
      if (WindowCount == 0)
          return;

      lock (_lock)
      {
          bool hasQueuedReset = _resetTick != null;
          if (info.Limit.HasValue && WindowCount != info.Limit.Value)
          {
              WindowCount = info.Limit.Value;
              _semaphore = info.Remaining.Value;
#if DEBUG_LIMITS
              Debug.WriteLine($"[{id}] Upgraded Semaphore to {info.Remaining.Value}/{WindowCount}");
#endif
          }

          DateTimeOffset? resetTick = null;

          //Using X-RateLimit-Remaining causes a race condition
          /*if (info.Remaining.HasValue)
          {
              Debug.WriteLine($"[{id}] X-RateLimit-Remaining: " + info.Remaining.Value);
              _semaphore = info.Remaining.Value;
          }*/
          if (info.RetryAfter.HasValue)
          {
              //RetryAfter is more accurate than Reset, where available
              resetTick = DateTimeOffset.UtcNow.AddMilliseconds(info.RetryAfter.Value);
#if DEBUG_LIMITS
              Debug.WriteLine($"[{id}] Retry-After: {info.RetryAfter.Value} ({info.RetryAfter.Value} ms)");
#endif
          }
          else if (info.Reset.HasValue)
          {
              // Pad the reported reset time by the measured lag, or by one second when no lag is known.
              resetTick = info.Reset.Value.AddSeconds(info.Lag?.TotalSeconds ?? 1.0);
#if DEBUG_LIMITS
              int diff = (int)(resetTick.Value - DateTimeOffset.UtcNow).TotalMilliseconds;
              Debug.WriteLine($"[{id}] X-RateLimit-Reset: {info.Reset.Value.ToUnixTimeSeconds()} ({diff} ms, {info.Lag?.TotalMilliseconds} ms lag)");
#endif
          }
          else if (request.Options.IsClientBucket && request.Options.BucketId != null)
          {
              resetTick = DateTimeOffset.UtcNow.AddSeconds(ClientBucket.Get(request.Options.BucketId).WindowSeconds);
#if DEBUG_LIMITS
              Debug.WriteLine($"[{id}] Client Bucket ({ClientBucket.Get(request.Options.BucketId).WindowSeconds * 1000} ms)");
#endif
          }

          if (resetTick == null)
          {
              WindowCount = 0; //No rate limit info, disable limits on this bucket (should only ever happen with a user token)
#if DEBUG_LIMITS
              Debug.WriteLine($"[{id}] Disabled Semaphore");
#endif
              return;
          }

          // Only ever move the reset time later; an earlier value is ignored while a reset is pending.
          if (!hasQueuedReset || resetTick > _resetTick)
          {
              _resetTick = resetTick;
              LastAttemptAt = resetTick.Value; //Make sure we dont destroy this until after its been reset
#if DEBUG_LIMITS
              Debug.WriteLine($"[{id}] Reset in {(int)Math.Ceiling((resetTick - DateTimeOffset.UtcNow).Value.TotalMilliseconds)} ms");
#endif

              if (!hasQueuedReset)
              {
                  // Fire and forget: the already-running reset task re-reads _resetTick, so
                  // a new task is only started when none is pending.
                  var _ = QueueReset(id, (int)Math.Ceiling((_resetTick.Value - DateTimeOffset.UtcNow).TotalMilliseconds));
              }
          }
      }
  }
  /// <summary>
  /// Waits for the bucket's reset time and then refills the semaphore to
  /// <see cref="WindowCount"/> and clears the pending reset.
  /// </summary>
  /// <param name="id">Request id, only used in the debug output.</param>
  /// <param name="millis">Initial delay in milliseconds; values of 0 or less skip the first wait.</param>
  private async Task QueueReset(int id, int millis)
  {
      int remainingMs = millis;
      for (; ; )
      {
          if (remainingMs > 0)
              await Task.Delay(remainingMs).ConfigureAwait(false);

          lock (_lock)
          {
              // Re-read the reset time: it may have been pushed later while we slept.
              TimeSpan untilReset = _resetTick.Value - DateTimeOffset.UtcNow;
              remainingMs = (int)Math.Ceiling(untilReset.TotalMilliseconds);
              if (remainingMs > 0)
                  continue; //Not due yet, leave the lock and wait for the remainder

#if DEBUG_LIMITS
              Debug.WriteLine($"[{id}] * Reset *");
#endif
              _semaphore = WindowCount;
              _resetTick = null;
              return;
          }
      }
  }
  290. }
  291. }