From bcb8b538491f9ec19c477affaa1cd23e0a02eca5 Mon Sep 17 00:00:00 2001 From: Paulo Date: Sun, 17 May 2020 07:16:42 -0300 Subject: [PATCH] Implement gateway ratelimit --- src/Discord.Net.Core/RequestOptions.cs | 1 + .../Net/Queue/GatewayBucket.cs | 50 +++++++++++++ .../Net/Queue/RequestQueue.cs | 21 ++++-- .../Net/Queue/RequestQueueBucket.cs | 70 +++++++++++++++++-- .../Net/Queue/Requests/WebSocketRequest.cs | 4 +- .../DiscordSocketApiClient.cs | 5 +- 6 files changed, 137 insertions(+), 14 deletions(-) create mode 100644 src/Discord.Net.Rest/Net/Queue/GatewayBucket.cs diff --git a/src/Discord.Net.Core/RequestOptions.cs b/src/Discord.Net.Core/RequestOptions.cs index 1b05df2a3..02161e64d 100644 --- a/src/Discord.Net.Core/RequestOptions.cs +++ b/src/Discord.Net.Core/RequestOptions.cs @@ -60,6 +60,7 @@ namespace Discord internal string BucketId { get; set; } internal bool IsClientBucket { get; set; } internal bool IsReactionBucket { get; set; } + internal bool IsGatewayBucket { get; set; } internal static RequestOptions CreateOrClone(RequestOptions options) { diff --git a/src/Discord.Net.Rest/Net/Queue/GatewayBucket.cs b/src/Discord.Net.Rest/Net/Queue/GatewayBucket.cs new file mode 100644 index 000000000..1c2133fd4 --- /dev/null +++ b/src/Discord.Net.Rest/Net/Queue/GatewayBucket.cs @@ -0,0 +1,50 @@ +using System.Collections.Immutable; + +namespace Discord.Net.Queue +{ + public enum GatewayBucketType + { + Unbucketed = 0, + Identify = 1 + } + internal struct GatewayBucket + { + private static readonly ImmutableDictionary DefsByType; + private static readonly ImmutableDictionary DefsById; + + static GatewayBucket() + { + var buckets = new[] + { + new GatewayBucket(GatewayBucketType.Unbucketed, "", 120, 60), + new GatewayBucket(GatewayBucketType.Identify, "", 1, 5) + }; + + var builder = ImmutableDictionary.CreateBuilder(); + foreach (var bucket in buckets) + builder.Add(bucket.Type, bucket); + DefsByType = builder.ToImmutable(); + + var builder2 = ImmutableDictionary.CreateBuilder(); + foreach (var bucket in buckets) + builder2.Add(bucket.Id, bucket); + DefsById = builder2.ToImmutable(); + } + + public static GatewayBucket Get(GatewayBucketType type) => DefsByType[type]; + public static GatewayBucket Get(string id) => DefsById[id]; + + public GatewayBucketType Type { get; } + public string Id { get; } + public int WindowCount { get; } + public int WindowSeconds { get; } + + public GatewayBucket(GatewayBucketType type, string id, int count, int seconds) + { + Type = type; + Id = id; + WindowCount = count; + WindowSeconds = seconds; + } + } +} diff --git a/src/Discord.Net.Rest/Net/Queue/RequestQueue.cs b/src/Discord.Net.Rest/Net/Queue/RequestQueue.cs index 4baf76433..ca7bf6868 100644 --- a/src/Discord.Net.Rest/Net/Queue/RequestQueue.cs +++ b/src/Discord.Net.Rest/Net/Queue/RequestQueue.cs @@ -89,12 +89,23 @@ namespace Discord.Net.Queue } public async Task SendAsync(WebSocketRequest request) { - //TODO: Re-impl websocket buckets - request.CancelToken = _requestCancelToken; - await request.SendAsync().ConfigureAwait(false); + CancellationTokenSource createdTokenSource = null; + if (request.Options.CancelToken.CanBeCanceled) + { + createdTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_requestCancelToken, request.Options.CancelToken); + request.Options.CancelToken = createdTokenSource.Token; + } + else + request.Options.CancelToken = _requestCancelToken; + + var bucket = GetOrCreateBucket(request.Options.BucketId, request); + await bucket.SendAsync(request).ConfigureAwait(false); + createdTokenSource?.Dispose(); + //request.CancelToken = _requestCancelToken; + //await request.SendAsync().ConfigureAwait(false); } - internal async Task EnterGlobalAsync(int id, RestRequest request) + internal async Task EnterGlobalAsync(int id, IRequest request) { int millis = (int)Math.Ceiling((_waitUntil - DateTimeOffset.UtcNow).TotalMilliseconds); if (millis > 0) @@ -110,7 +121,7 @@ namespace Discord.Net.Queue _waitUntil = DateTimeOffset.UtcNow.AddMilliseconds(info.RetryAfter.Value + (info.Lag?.TotalMilliseconds ?? 0.0)); } - private RequestBucket GetOrCreateBucket(string id, RestRequest request) + private RequestBucket GetOrCreateBucket(string id, IRequest request) { return _buckets.GetOrAdd(id, x => new RequestBucket(this, request, x)); } diff --git a/src/Discord.Net.Rest/Net/Queue/RequestQueueBucket.cs b/src/Discord.Net.Rest/Net/Queue/RequestQueueBucket.cs index 72dd1642d..eaa22f024 100644 --- a/src/Discord.Net.Rest/Net/Queue/RequestQueueBucket.cs +++ b/src/Discord.Net.Rest/Net/Queue/RequestQueueBucket.cs @@ -22,7 +22,7 @@ namespace Discord.Net.Queue public int WindowCount { get; private set; } public DateTimeOffset LastAttemptAt { get; private set; } - public RequestBucket(RequestQueue queue, RestRequest request, string id) + public RequestBucket(RequestQueue queue, IRequest request, string id) { _queue = queue; Id = id; @@ -31,13 +31,15 @@ namespace Discord.Net.Queue if (request.Options.IsClientBucket) WindowCount = ClientBucket.Get(request.Options.BucketId).WindowCount; + else if (request.Options.IsGatewayBucket) + WindowCount = GatewayBucket.Get(request.Options.BucketId).WindowCount; else WindowCount = 1; //Only allow one request until we get a header back _semaphore = WindowCount; _resetTick = null; LastAttemptAt = DateTimeOffset.UtcNow; } - + static int nextId = 0; public async Task SendAsync(RestRequest request) { @@ -149,8 +151,59 @@ namespace Discord.Net.Queue } } } + public async Task SendAsync(WebSocketRequest request) + { + int id = Interlocked.Increment(ref nextId); +#if DEBUG_LIMITS + Debug.WriteLine($"[{id}] Start"); +#endif + LastAttemptAt = DateTimeOffset.UtcNow; + while (true) + { + await _queue.EnterGlobalAsync(id, request).ConfigureAwait(false); + await EnterAsync(id, request).ConfigureAwait(false); - private async Task EnterAsync(int id, RestRequest request) +#if DEBUG_LIMITS + Debug.WriteLine($"[{id}] Sending..."); +#endif + try + { + await request.SendAsync().ConfigureAwait(false); + return; + } + catch (TimeoutException) + { +#if DEBUG_LIMITS + Debug.WriteLine($"[{id}] Timeout"); +#endif + if ((request.Options.RetryMode & RetryMode.RetryTimeouts) == 0) + throw; + + await Task.Delay(500).ConfigureAwait(false); + continue; //Retry + } + /*catch (Exception) + { +#if DEBUG_LIMITS + Debug.WriteLine($"[{id}] Error"); +#endif + if ((request.Options.RetryMode & RetryMode.RetryErrors) == 0) + throw; + + await Task.Delay(500); + continue; //Retry + }*/ + finally + { + UpdateRateLimit(id, request, default(RateLimitInfo), false); +#if DEBUG_LIMITS + Debug.WriteLine($"[{id}] Stop"); +#endif + } + } + } + + private async Task EnterAsync(int id, IRequest request) { int windowCount; DateTimeOffset? resetAt; @@ -213,7 +266,7 @@ namespace Discord.Net.Queue } } - private void UpdateRateLimit(int id, RestRequest request, RateLimitInfo info, bool is429) + private void UpdateRateLimit(int id, IRequest request, RateLimitInfo info, bool is429) { if (WindowCount == 0) return; @@ -273,6 +326,13 @@ namespace Discord.Net.Queue Debug.WriteLine($"[{id}] Client Bucket ({ClientBucket.Get(request.Options.BucketId).WindowSeconds * 1000} ms)"); #endif } + else if (request.Options.IsGatewayBucket && request.Options.BucketId != null) + { + resetTick = DateTimeOffset.UtcNow.AddSeconds(GatewayBucket.Get(request.Options.BucketId).WindowSeconds); +#if DEBUG_LIMITS + Debug.WriteLine($"[{id}] Gateway Bucket ({GatewayBucket.Get(request.Options.BucketId).WindowSeconds * 1000} ms)"); +#endif + } if (resetTick == null) { @@ -320,7 +380,7 @@ namespace Discord.Net.Queue } } - private void ThrowRetryLimit(RestRequest request) + private void ThrowRetryLimit(IRequest request) { if ((request.Options.RetryMode & RetryMode.RetryRatelimit) == 0) throw new RateLimitedException(request); diff --git a/src/Discord.Net.Rest/Net/Queue/Requests/WebSocketRequest.cs b/src/Discord.Net.Rest/Net/Queue/Requests/WebSocketRequest.cs index 81eb40b31..a153f7c20 100644 --- a/src/Discord.Net.Rest/Net/Queue/Requests/WebSocketRequest.cs +++ b/src/Discord.Net.Rest/Net/Queue/Requests/WebSocketRequest.cs @@ -9,7 +9,6 @@ namespace Discord.Net.Queue public class WebSocketRequest : IRequest { public IWebSocketClient Client { get; } - public string BucketId { get; } public byte[] Data { get; } public bool IsText { get; } public DateTimeOffset? TimeoutAt { get; } @@ -17,12 +16,11 @@ namespace Discord.Net.Queue public RequestOptions Options { get; } public CancellationToken CancelToken { get; internal set; } - public WebSocketRequest(IWebSocketClient client, string bucketId, byte[] data, bool isText, RequestOptions options) + public WebSocketRequest(IWebSocketClient client, byte[] data, bool isText, RequestOptions options) { Preconditions.NotNull(options, nameof(options)); Client = client; - BucketId = bucketId; Data = data; IsText = isText; Options = options; diff --git a/src/Discord.Net.WebSocket/DiscordSocketApiClient.cs b/src/Discord.Net.WebSocket/DiscordSocketApiClient.cs index ef97615e2..0e076b664 100644 --- a/src/Discord.Net.WebSocket/DiscordSocketApiClient.cs +++ b/src/Discord.Net.WebSocket/DiscordSocketApiClient.cs @@ -205,7 +205,10 @@ namespace Discord.API payload = new SocketFrame { Operation = (int)opCode, Payload = payload }; if (payload != null) bytes = Encoding.UTF8.GetBytes(SerializeJson(payload)); - await RequestQueue.SendAsync(new WebSocketRequest(WebSocketClient, null, bytes, true, options)).ConfigureAwait(false); + + options.IsGatewayBucket = true; + options.BucketId = GatewayBucket.Get(opCode == GatewayOpCode.Identify ? GatewayBucketType.Identify : GatewayBucketType.Unbucketed).Id; + await RequestQueue.SendAsync(new WebSocketRequest(WebSocketClient, bytes, true, options)).ConfigureAwait(false); await _sentGatewayMessageEvent.InvokeAsync(opCode).ConfigureAwait(false); }