From 9c4608189065a110f975c0f5ba3e577322292f2c Mon Sep 17 00:00:00 2001 From: Paulo Date: Mon, 25 May 2020 20:21:45 -0300 Subject: [PATCH] Remove specific RequestQueue for WebSocket and other changes The only account limit is for identify that is dealt in a different way (exclusive semaphore), so websocket queues can be shared with REST and don't need to be shared between clients anymore. Also added the ratelimit for presence updates. --- .../Entities/Gateway/GatewayLimits.cs | 21 ++++++++++++++++- .../Net/Queue/GatewayBucket.cs | 12 ++++++++-- .../Net/Queue/RequestQueue.cs | 23 +++++++++++++++++++ .../Net/Queue/RequestQueueBucket.cs | 8 ++++--- src/Discord.Net.WebSocket/BaseSocketClient.cs | 2 +- .../DiscordShardedClient.cs | 10 +------- .../DiscordSocketApiClient.cs | 18 ++++----------- .../DiscordSocketClient.cs | 10 +------- .../DiscordSocketConfig.cs | 2 -- 9 files changed, 66 insertions(+), 40 deletions(-) diff --git a/src/Discord.Net.Rest/Entities/Gateway/GatewayLimits.cs b/src/Discord.Net.Rest/Entities/Gateway/GatewayLimits.cs index 7c0ea68f5..8b5cdf32b 100644 --- a/src/Discord.Net.Rest/Entities/Gateway/GatewayLimits.cs +++ b/src/Discord.Net.Rest/Entities/Gateway/GatewayLimits.cs @@ -1,3 +1,5 @@ +using System; + namespace Discord.Rest { /// @@ -9,13 +11,28 @@ namespace Discord.Rest /// Gets or sets the global limits for the gateway rate limiter. /// /// - /// This property includes all the other limits, like Identify. + /// This property includes all the other limits, like Identify, + /// and it is per websocket. /// public GatewayLimit Global { get; set; } /// /// Gets or sets the limits of Identify requests. /// + /// + /// This limit is included into but it is + /// also per account. + /// public GatewayLimit Identify { get; set; } + /// + /// Gets or sets the limits of Presence Update requests. + /// + /// + /// Presence updates include activity (playing, watching, etc) + /// and status (online, idle, etc) + /// + public GatewayLimit PresenceUpdate { get; set; } + + public string IdentifySemaphoreName { get; set; } /// /// Initializes a new with the default values. @@ -24,6 +41,8 @@ namespace Discord.Rest { Global = new GatewayLimit(120, 60); Identify = new GatewayLimit(1, 5); + PresenceUpdate = new GatewayLimit(5, 60); + IdentifySemaphoreName = Guid.NewGuid().ToString(); } internal static GatewayLimits GetOrCreate(GatewayLimits limits) diff --git a/src/Discord.Net.Rest/Net/Queue/GatewayBucket.cs b/src/Discord.Net.Rest/Net/Queue/GatewayBucket.cs index 2177031df..b1f6aae0e 100644 --- a/src/Discord.Net.Rest/Net/Queue/GatewayBucket.cs +++ b/src/Discord.Net.Rest/Net/Queue/GatewayBucket.cs @@ -6,12 +6,14 @@ namespace Discord.Net.Queue public enum GatewayBucketType { Unbucketed = 0, - Identify = 1 + Identify = 1, + PresenceUpdate = 2, } internal struct GatewayBucket { private static ImmutableDictionary DefsByType; private static ImmutableDictionary DefsById; + private static string IdentifySemaphoreName; static GatewayBucket() { @@ -20,6 +22,7 @@ namespace Discord.Net.Queue public static GatewayBucket Get(GatewayBucketType type) => DefsByType[type]; public static GatewayBucket Get(string id) => DefsById[id]; + public static string GetIdentifySemaphoreName() => IdentifySemaphoreName; public static void SetLimits(GatewayLimits limits) { @@ -28,11 +31,14 @@ namespace Discord.Net.Queue Preconditions.GreaterThan(limits.Global.Seconds, 0, nameof(limits.Global.Seconds), "Global seconds must be greater than zero."); Preconditions.GreaterThan(limits.Identify.Count, 0, nameof(limits.Identify.Count), "Identify count must be greater than zero."); Preconditions.GreaterThan(limits.Identify.Seconds, 0, nameof(limits.Identify.Seconds), "Identify seconds must be greater than zero."); + Preconditions.GreaterThan(limits.PresenceUpdate.Count, 0, nameof(limits.PresenceUpdate.Count), "PresenceUpdate count must be greater than zero."); + Preconditions.GreaterThan(limits.PresenceUpdate.Seconds, 0, nameof(limits.PresenceUpdate.Seconds), "PresenceUpdate seconds must be greater than zero."); var buckets = new[] { new GatewayBucket(GatewayBucketType.Unbucketed, "", limits.Global.Count, limits.Global.Seconds), - new GatewayBucket(GatewayBucketType.Identify, "", limits.Identify.Count, limits.Identify.Seconds) + new GatewayBucket(GatewayBucketType.Identify, "", limits.Identify.Count, limits.Identify.Seconds), + new GatewayBucket(GatewayBucketType.PresenceUpdate, "", limits.Identify.Count, limits.Identify.Seconds), }; var builder = ImmutableDictionary.CreateBuilder(); @@ -44,6 +50,8 @@ namespace Discord.Net.Queue foreach (var bucket in buckets) builder2.Add(bucket.Id, bucket); DefsById = builder2.ToImmutable(); + + IdentifySemaphoreName = limits.IdentifySemaphoreName; } public GatewayBucketType Type { get; } diff --git a/src/Discord.Net.Rest/Net/Queue/RequestQueue.cs b/src/Discord.Net.Rest/Net/Queue/RequestQueue.cs index be7dd8b38..639aef7c5 100644 --- a/src/Discord.Net.Rest/Net/Queue/RequestQueue.cs +++ b/src/Discord.Net.Rest/Net/Queue/RequestQueue.cs @@ -1,3 +1,4 @@ +using Discord.Rest; using System; using System.Collections.Concurrent; #if DEBUG_LIMITS @@ -22,12 +23,15 @@ namespace Discord.Net.Queue private CancellationTokenSource _requestCancelTokenSource; private CancellationToken _requestCancelToken; //Parent token + Clear token private DateTimeOffset _waitUntil; + private Semaphore _identifySemaphore; private Task _cleanupTask; public RequestQueue() { _tokenLock = new SemaphoreSlim(1, 1); + int semaphoreCount = GatewayBucket.Get(GatewayBucketType.Identify).WindowCount; + _identifySemaphore = new Semaphore(semaphoreCount, semaphoreCount, GatewayBucket.GetIdentifySemaphoreName()); _clearToken = new CancellationTokenSource(); _cancelTokenSource = new CancellationTokenSource(); @@ -120,10 +124,22 @@ namespace Discord.Net.Queue } internal async Task EnterGlobalAsync(int id, WebSocketRequest request) { + //If this is a global request (unbucketed), it'll be dealt in EnterAsync var requestBucket = GatewayBucket.Get(request.Options.BucketId); if (requestBucket.Type == GatewayBucketType.Unbucketed) return; + //Identify is per-account so we won't trigger global until we can actually go for it + if (requestBucket.Type == GatewayBucketType.Identify) + { + while (!_identifySemaphore.WaitOne(0)) //To not block the thread + await Task.Delay(100, request.CancelToken); +#if DEBUG_LIMITS + Debug.WriteLine($"[{id}] Acquired identify ticket"); +#endif + } + + //It's not a global request, so need to remove one from global (per-session) var globalBucketType = GatewayBucket.Get(GatewayBucketType.Unbucketed); var options = RequestOptions.CreateOrClone(request.Options); options.BucketId = globalBucketType.Id; @@ -131,6 +147,13 @@ namespace Discord.Net.Queue var globalBucket = GetOrCreateBucket(globalBucketType.Id, globalRequest); await globalBucket.TriggerAsync(id, globalRequest); } + internal void ReleaseIdentifySemaphore(int id) + { + _identifySemaphore.Release(); +#if DEBUG_LIMITS + Debug.WriteLine($"[{id}] Released identify ticket"); +#endif + } private RequestBucket GetOrCreateBucket(string id, IRequest request) { diff --git a/src/Discord.Net.Rest/Net/Queue/RequestQueueBucket.cs b/src/Discord.Net.Rest/Net/Queue/RequestQueueBucket.cs index ef5b247fd..b308e909f 100644 --- a/src/Discord.Net.Rest/Net/Queue/RequestQueueBucket.cs +++ b/src/Discord.Net.Rest/Net/Queue/RequestQueueBucket.cs @@ -348,7 +348,7 @@ namespace Discord.Net.Queue #if DEBUG_LIMITS Debug.WriteLine($"[{id}] Reset in {(int)Math.Ceiling((resetTick - DateTimeOffset.UtcNow).Value.TotalMilliseconds)} ms"); #endif - var _ = QueueReset(id, (int)Math.Ceiling((_resetTick.Value - DateTimeOffset.UtcNow).TotalMilliseconds)); + var _ = QueueReset(id, (int)Math.Ceiling((_resetTick.Value - DateTimeOffset.UtcNow).TotalMilliseconds), request); } return; } @@ -372,12 +372,12 @@ namespace Discord.Net.Queue if (!hasQueuedReset) { - var _ = QueueReset(id, (int)Math.Ceiling((_resetTick.Value - DateTimeOffset.UtcNow).TotalMilliseconds)); + var _ = QueueReset(id, (int)Math.Ceiling((_resetTick.Value - DateTimeOffset.UtcNow).TotalMilliseconds), request); } } } } - private async Task QueueReset(int id, int millis) + private async Task QueueReset(int id, int millis, IRequest request) { while (true) { @@ -391,6 +391,8 @@ namespace Discord.Net.Queue #if DEBUG_LIMITS Debug.WriteLine($"[{id}] * Reset *"); #endif + if (request is WebSocketRequest webSocketRequest && webSocketRequest.Options.BucketId == GatewayBucket.Get(GatewayBucketType.Identify).Id) + _queue.ReleaseIdentifySemaphore(id); _semaphore = WindowCount; _resetTick = null; return; diff --git a/src/Discord.Net.WebSocket/BaseSocketClient.cs b/src/Discord.Net.WebSocket/BaseSocketClient.cs index fec6b23cf..548bb75bf 100644 --- a/src/Discord.Net.WebSocket/BaseSocketClient.cs +++ b/src/Discord.Net.WebSocket/BaseSocketClient.cs @@ -80,7 +80,7 @@ namespace Discord.WebSocket internal BaseSocketClient(DiscordSocketConfig config, DiscordRestApiClient client) : base(config, client) => BaseConfig = config; private static DiscordSocketApiClient CreateApiClient(DiscordSocketConfig config) - => new DiscordSocketApiClient(config.RestClientProvider, config.WebSocketProvider, DiscordRestConfig.UserAgent, config.WebsocketRequestQueue, + => new DiscordSocketApiClient(config.RestClientProvider, config.WebSocketProvider, DiscordRestConfig.UserAgent, rateLimitPrecision: config.RateLimitPrecision, useSystemClock: config.UseSystemClock); diff --git a/src/Discord.Net.WebSocket/DiscordShardedClient.cs b/src/Discord.Net.WebSocket/DiscordShardedClient.cs index 13eacc6eb..e5d31e5c3 100644 --- a/src/Discord.Net.WebSocket/DiscordShardedClient.cs +++ b/src/Discord.Net.WebSocket/DiscordShardedClient.cs @@ -85,17 +85,9 @@ namespace Discord.WebSocket RegisterEvents(_shards[i], i == 0); } } - - ApiClient.WebSocketRequestQueue.RateLimitTriggered += async (id, info) => - { - if (info == null) - await _restLogger.VerboseAsync($"Preemptive Rate limit triggered: {id ?? "null"}").ConfigureAwait(false); - else - await _restLogger.WarningAsync($"Rate limit triggered: {id ?? "null"}").ConfigureAwait(false); - }; } private static API.DiscordSocketApiClient CreateApiClient(DiscordSocketConfig config) - => new API.DiscordSocketApiClient(config.RestClientProvider, config.WebSocketProvider, DiscordRestConfig.UserAgent, config.WebsocketRequestQueue, + => new API.DiscordSocketApiClient(config.RestClientProvider, config.WebSocketProvider, DiscordRestConfig.UserAgent, rateLimitPrecision: config.RateLimitPrecision); internal override async Task OnLoginAsync(TokenType tokenType, string token) diff --git a/src/Discord.Net.WebSocket/DiscordSocketApiClient.cs b/src/Discord.Net.WebSocket/DiscordSocketApiClient.cs index 1bf59994e..86c297070 100644 --- a/src/Discord.Net.WebSocket/DiscordSocketApiClient.cs +++ b/src/Discord.Net.WebSocket/DiscordSocketApiClient.cs @@ -37,17 +37,7 @@ namespace Discord.API public ConnectionState ConnectionState { get; private set; } - internal RequestQueue WebSocketRequestQueue { get; } - public DiscordSocketApiClient(RestClientProvider restClientProvider, WebSocketProvider webSocketProvider, string userAgent, - string url = null, RetryMode defaultRetryMode = RetryMode.AlwaysRetry, JsonSerializer serializer = null, - RateLimitPrecision rateLimitPrecision = RateLimitPrecision.Second, - bool useSystemClock = true) - : this(restClientProvider, webSocketProvider, userAgent, null, url, defaultRetryMode, serializer, rateLimitPrecision, useSystemClock) - { - } - - internal DiscordSocketApiClient(RestClientProvider restClientProvider, WebSocketProvider webSocketProvider, string userAgent, RequestQueue websocketRequestQueue, string url = null, RetryMode defaultRetryMode = RetryMode.AlwaysRetry, JsonSerializer serializer = null, RateLimitPrecision rateLimitPrecision = RateLimitPrecision.Second, bool useSystemClock = true) @@ -58,7 +48,6 @@ namespace Discord.API _isExplicitUrl = true; WebSocketClient = webSocketProvider(); //WebSocketClient.SetHeader("user-agent", DiscordConfig.UserAgent); (Causes issues in .NET Framework 4.6+) - WebSocketRequestQueue = websocketRequestQueue ?? new RequestQueue(); WebSocketClient.BinaryMessage += async (data, index, count) => { @@ -218,8 +207,9 @@ namespace Discord.API bytes = Encoding.UTF8.GetBytes(SerializeJson(payload)); options.IsGatewayBucket = true; - options.BucketId = GatewayBucket.Get(opCode == GatewayOpCode.Identify ? GatewayBucketType.Identify : GatewayBucketType.Unbucketed).Id; - await WebSocketRequestQueue.SendAsync(new WebSocketRequest(WebSocketClient, bytes, true, options)).ConfigureAwait(false); + if (string.IsNullOrEmpty(options.BucketId)) + options.BucketId = GatewayBucket.Get(GatewayBucketType.Unbucketed).Id; + await RequestQueue.SendAsync(new WebSocketRequest(WebSocketClient, bytes, true, options)).ConfigureAwait(false); await _sentGatewayMessageEvent.InvokeAsync(opCode).ConfigureAwait(false); } @@ -240,6 +230,7 @@ namespace Discord.API if (totalShards > 1) msg.ShardingParams = new int[] { shardID, totalShards }; + options.BucketId = GatewayBucket.Get(GatewayBucketType.Identify).Id; await SendGatewayAsync(GatewayOpCode.Identify, msg, options: options).ConfigureAwait(false); } public async Task SendResumeAsync(string sessionId, int lastSeq, RequestOptions options = null) @@ -268,6 +259,7 @@ namespace Discord.API IsAFK = isAFK, Game = game }; + options.BucketId = GatewayBucket.Get(GatewayBucketType.PresenceUpdate).Id; await SendGatewayAsync(GatewayOpCode.StatusUpdate, args, options: options).ConfigureAwait(false); } public async Task SendRequestMembersAsync(IEnumerable guildIds, RequestOptions options = null) diff --git a/src/Discord.Net.WebSocket/DiscordSocketClient.cs b/src/Discord.Net.WebSocket/DiscordSocketClient.cs index 18ebda07d..c3979ebb4 100644 --- a/src/Discord.Net.WebSocket/DiscordSocketClient.cs +++ b/src/Discord.Net.WebSocket/DiscordSocketClient.cs @@ -122,14 +122,6 @@ namespace Discord.WebSocket public DiscordSocketClient(DiscordSocketConfig config) : this(config, CreateApiClient(config), null, null) { GatewayBucket.SetLimits(GatewayLimits.GetOrCreate(config.GatewayLimits)); - - ApiClient.WebSocketRequestQueue.RateLimitTriggered += async (id, info) => - { - if (info == null) - await _restLogger.VerboseAsync($"Preemptive Rate limit triggered: {id ?? "null"}").ConfigureAwait(false); - else - await _restLogger.WarningAsync($"Rate limit triggered: {id ?? "null"}").ConfigureAwait(false); - }; } internal DiscordSocketClient(DiscordSocketConfig config, SemaphoreSlim groupLock, DiscordSocketClient parentClient) : this(config, CreateApiClient(config), groupLock, parentClient) { } #pragma warning restore IDISP004 @@ -190,7 +182,7 @@ namespace Discord.WebSocket _largeGuilds = new ConcurrentQueue(); } private static API.DiscordSocketApiClient CreateApiClient(DiscordSocketConfig config) - => new API.DiscordSocketApiClient(config.RestClientProvider, config.WebSocketProvider, DiscordRestConfig.UserAgent, config.WebsocketRequestQueue, config.GatewayHost, + => new API.DiscordSocketApiClient(config.RestClientProvider, config.WebSocketProvider, DiscordRestConfig.UserAgent, config.GatewayHost, rateLimitPrecision: config.RateLimitPrecision); /// internal override void Dispose(bool disposing) diff --git a/src/Discord.Net.WebSocket/DiscordSocketConfig.cs b/src/Discord.Net.WebSocket/DiscordSocketConfig.cs index d6f86c924..4df080f91 100644 --- a/src/Discord.Net.WebSocket/DiscordSocketConfig.cs +++ b/src/Discord.Net.WebSocket/DiscordSocketConfig.cs @@ -135,8 +135,6 @@ namespace Discord.WebSocket /// public GatewayLimits GatewayLimits { get; set; } = new GatewayLimits(); - internal RequestQueue WebsocketRequestQueue { get; } = new RequestQueue(); - /// /// Initializes a default configuration. ///