From c9171619d91a1abda7bf0f36e65b3894b0df827d Mon Sep 17 00:00:00 2001 From: RogueException Date: Wed, 22 Jun 2016 19:35:49 -0300 Subject: [PATCH] Added Ids to rate limit, improved bucket logic, added channel bucks and buckettargets, and exposed bucket data. --- src/Discord.Net/DiscordClient.cs | 12 ++- src/Discord.Net/DiscordSocketClient.cs | 8 +- src/Discord.Net/Net/Queue/BucketDefinition.cs | 16 ---- .../Net/Queue/Definitions/BucketDefinition.cs | 30 ++++++ .../Queue/{ => Definitions}/BucketGroup.cs | 3 +- .../Net/Queue/Definitions/BucketTarget.cs | 9 ++ .../Net/Queue/Definitions/ChannelBucket.cs | 7 ++ .../Queue/{ => Definitions}/GlobalBucket.cs | 0 .../Queue/{ => Definitions}/GuildBucket.cs | 0 src/Discord.Net/Net/Queue/RequestQueue.cs | 96 ++++++++++++++----- .../Net/Queue/RequestQueueBucket.cs | 62 +++++++++--- src/Discord.Net/Net/RateLimitException.cs | 1 + 12 files changed, 183 insertions(+), 61 deletions(-) delete mode 100644 src/Discord.Net/Net/Queue/BucketDefinition.cs create mode 100644 src/Discord.Net/Net/Queue/Definitions/BucketDefinition.cs rename src/Discord.Net/Net/Queue/{ => Definitions}/BucketGroup.cs (74%) create mode 100644 src/Discord.Net/Net/Queue/Definitions/BucketTarget.cs create mode 100644 src/Discord.Net/Net/Queue/Definitions/ChannelBucket.cs rename src/Discord.Net/Net/Queue/{ => Definitions}/GlobalBucket.cs (100%) rename src/Discord.Net/Net/Queue/{ => Definitions}/GuildBucket.cs (100%) diff --git a/src/Discord.Net/DiscordClient.cs b/src/Discord.Net/DiscordClient.cs index 001c117e4..eafc06810 100644 --- a/src/Discord.Net/DiscordClient.cs +++ b/src/Discord.Net/DiscordClient.cs @@ -18,7 +18,7 @@ namespace Discord public event Func Log; public event Func LoggedIn, LoggedOut; - internal readonly Logger _discordLogger, _restLogger; + internal readonly Logger _discordLogger, _restLogger, _queueLogger; internal readonly SemaphoreSlim _connectionLock; internal readonly LogManager _log; internal readonly RequestQueue _requestQueue; @@ -38,12 +38,20 @@ namespace Discord _log.Message += async msg => await Log.RaiseAsync(msg).ConfigureAwait(false); _discordLogger = _log.CreateLogger("Discord"); _restLogger = _log.CreateLogger("Rest"); + _queueLogger = _log.CreateLogger("Queue"); _connectionLock = new SemaphoreSlim(1, 1); + _requestQueue = new RequestQueue(); + _requestQueue.RateLimitTriggered += async (id, bucket, millis) => + { + await _queueLogger.WarningAsync($"Rate limit triggered (id = \"{id ?? "null"}\")").ConfigureAwait(false); + if (bucket == null && id != null) + await _queueLogger.WarningAsync($"Unknown rate limit bucket \"{id ?? "null"}\"").ConfigureAwait(false); + }; ApiClient = new API.DiscordApiClient(config.RestClientProvider, (config as DiscordSocketConfig)?.WebSocketProvider, requestQueue: _requestQueue); - ApiClient.SentRequest += async (method, endpoint, millis) => await _log.VerboseAsync("Rest", $"{method} {endpoint}: {millis} ms").ConfigureAwait(false); + ApiClient.SentRequest += async (method, endpoint, millis) => await _restLogger.VerboseAsync($"{method} {endpoint}: {millis} ms").ConfigureAwait(false); } /// diff --git a/src/Discord.Net/DiscordSocketClient.cs b/src/Discord.Net/DiscordSocketClient.cs index 95f943d09..33aa1a105 100644 --- a/src/Discord.Net/DiscordSocketClient.cs +++ b/src/Discord.Net/DiscordSocketClient.cs @@ -21,8 +21,7 @@ namespace Discord //TODO: Add resume logic public class DiscordSocketClient : DiscordClient, IDiscordClient { - public event Func Connected, Disconnected; - public event Func Ready; + public event Func Connected, Disconnected, Ready; //public event Func VoiceConnected, VoiceDisconnected; public event Func ChannelCreated, ChannelDestroyed; public event Func ChannelUpdated; @@ -174,6 +173,7 @@ namespace Discord _connectTask = new TaskCompletionSource(); _cancelToken = new CancellationTokenSource(); await ApiClient.ConnectAsync().ConfigureAwait(false); + await Connected.RaiseAsync().ConfigureAwait(false); await _connectTask.Task.ConfigureAwait(false); @@ -185,8 +185,6 @@ namespace Discord await DisconnectInternalAsync().ConfigureAwait(false); throw; } - - await Connected.RaiseAsync().ConfigureAwait(false); } /// public async Task DisconnectAsync() @@ -1139,6 +1137,7 @@ namespace Discord private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cancelToken) { + //Clean this up when Discord's session patch is live try { while (!cancelToken.IsCancellationRequested) @@ -1161,7 +1160,6 @@ namespace Discord } catch (OperationCanceledException) { } } - private async Task WaitForGuildsAsync(CancellationToken cancelToken) { while ((_unavailableGuilds != 0) && (Environment.TickCount - _lastGuildAvailableTime < 2000)) diff --git a/src/Discord.Net/Net/Queue/BucketDefinition.cs b/src/Discord.Net/Net/Queue/BucketDefinition.cs deleted file mode 100644 index 64292213d..000000000 --- a/src/Discord.Net/Net/Queue/BucketDefinition.cs +++ /dev/null @@ -1,16 +0,0 @@ -namespace Discord.Net.Queue -{ - internal struct BucketDefinition - { - public int WindowCount { get; } - public int WindowSeconds { get; } - public GlobalBucket? Parent { get; } - - public BucketDefinition(int windowCount, int windowSeconds, GlobalBucket? parent = null) - { - WindowCount = windowCount; - WindowSeconds = windowSeconds; - Parent = parent; - } - } -} diff --git a/src/Discord.Net/Net/Queue/Definitions/BucketDefinition.cs b/src/Discord.Net/Net/Queue/Definitions/BucketDefinition.cs new file mode 100644 index 000000000..cfc53b0c8 --- /dev/null +++ b/src/Discord.Net/Net/Queue/Definitions/BucketDefinition.cs @@ -0,0 +1,30 @@ +namespace Discord.Net.Queue +{ + public sealed class Bucket + { + /// Gets the unique identifier for this bucket. + public string Id { get; } + /// Gets the name of this bucket. + public string Name { get; } + /// Gets the amount of requests that may be sent per window. + public int WindowCount { get; } + /// Gets the length of this bucket's window, in seconds. + public int WindowSeconds { get; } + /// Gets the type of account this bucket affects. + public BucketTarget Target { get; } + /// Gets this bucket's parent. + public GlobalBucket? Parent { get; } + + internal Bucket(string id, int windowCount, int windowSeconds, BucketTarget target, GlobalBucket? parent = null) + : this(id, id, windowCount, windowSeconds, target, parent) { } + internal Bucket(string id, string name, int windowCount, int windowSeconds, BucketTarget target, GlobalBucket? parent = null) + { + Id = id; + Name = name; + WindowCount = windowCount; + WindowSeconds = windowSeconds; + Target = target; + Parent = parent; + } + } +} diff --git a/src/Discord.Net/Net/Queue/BucketGroup.cs b/src/Discord.Net/Net/Queue/Definitions/BucketGroup.cs similarity index 74% rename from src/Discord.Net/Net/Queue/BucketGroup.cs rename to src/Discord.Net/Net/Queue/Definitions/BucketGroup.cs index 0b0367065..e7b0a4181 100644 --- a/src/Discord.Net/Net/Queue/BucketGroup.cs +++ b/src/Discord.Net/Net/Queue/Definitions/BucketGroup.cs @@ -3,6 +3,7 @@ public enum BucketGroup { Global, - Guild + Guild, + Channel } } diff --git a/src/Discord.Net/Net/Queue/Definitions/BucketTarget.cs b/src/Discord.Net/Net/Queue/Definitions/BucketTarget.cs new file mode 100644 index 000000000..0e5a5d552 --- /dev/null +++ b/src/Discord.Net/Net/Queue/Definitions/BucketTarget.cs @@ -0,0 +1,9 @@ +namespace Discord.Net.Queue +{ + public enum BucketTarget + { + Client, + Bot, + Both + } +} diff --git a/src/Discord.Net/Net/Queue/Definitions/ChannelBucket.cs b/src/Discord.Net/Net/Queue/Definitions/ChannelBucket.cs new file mode 100644 index 000000000..235e6dfdf --- /dev/null +++ b/src/Discord.Net/Net/Queue/Definitions/ChannelBucket.cs @@ -0,0 +1,7 @@ +namespace Discord.Net.Queue +{ + public enum ChannelBucket + { + SendEditMessage, + } +} diff --git a/src/Discord.Net/Net/Queue/GlobalBucket.cs b/src/Discord.Net/Net/Queue/Definitions/GlobalBucket.cs similarity index 100% rename from src/Discord.Net/Net/Queue/GlobalBucket.cs rename to src/Discord.Net/Net/Queue/Definitions/GlobalBucket.cs diff --git a/src/Discord.Net/Net/Queue/GuildBucket.cs b/src/Discord.Net/Net/Queue/Definitions/GuildBucket.cs similarity index 100% rename from src/Discord.Net/Net/Queue/GuildBucket.cs rename to src/Discord.Net/Net/Queue/Definitions/GuildBucket.cs diff --git a/src/Discord.Net/Net/Queue/RequestQueue.cs b/src/Discord.Net/Net/Queue/RequestQueue.cs index 27b11a38d..f23176411 100644 --- a/src/Discord.Net/Net/Queue/RequestQueue.cs +++ b/src/Discord.Net/Net/Queue/RequestQueue.cs @@ -10,52 +10,83 @@ namespace Discord.Net.Queue { public class RequestQueue { - private readonly static ImmutableDictionary _globalLimits; - private readonly static ImmutableDictionary _guildLimits; + public event Func RateLimitTriggered; + + private readonly static ImmutableDictionary _globalLimits; + private readonly static ImmutableDictionary _guildLimits; + private readonly static ImmutableDictionary _channelLimits; private readonly SemaphoreSlim _lock; private readonly RequestQueueBucket[] _globalBuckets; private readonly ConcurrentDictionary[] _guildBuckets; + private readonly ConcurrentDictionary[] _channelBuckets; private CancellationTokenSource _clearToken; private CancellationToken _parentToken; private CancellationToken _cancelToken; static RequestQueue() { - _globalLimits = new Dictionary + _globalLimits = new Dictionary { //REST - [GlobalBucket.GeneralRest] = new BucketDefinition(0, 0), //No Limit + [GlobalBucket.GeneralRest] = new Bucket(null, "rest", 0, 0, BucketTarget.Both), //No Limit //[GlobalBucket.Login] = new BucketDefinition(1, 1), - [GlobalBucket.DirectMessage] = new BucketDefinition(5, 5), - [GlobalBucket.SendEditMessage] = new BucketDefinition(50, 10), + [GlobalBucket.DirectMessage] = new Bucket("bot:msg:dm", 5, 5, BucketTarget.Bot), + [GlobalBucket.SendEditMessage] = new Bucket("bot:msg:global", 50, 10, BucketTarget.Bot), //Gateway - [GlobalBucket.GeneralGateway] = new BucketDefinition(120, 60), - [GlobalBucket.UpdateStatus] = new BucketDefinition(5, 1, GlobalBucket.GeneralGateway) + [GlobalBucket.GeneralGateway] = new Bucket(null, "gateway", 120, 60, BucketTarget.Both), + [GlobalBucket.UpdateStatus] = new Bucket(null, "status", 5, 1, BucketTarget.Both, GlobalBucket.GeneralGateway) + }.ToImmutableDictionary(); + + _guildLimits = new Dictionary + { + //REST + [GuildBucket.SendEditMessage] = new Bucket("bot:msg:server", 5, 5, BucketTarget.Bot, GlobalBucket.SendEditMessage), + [GuildBucket.DeleteMessage] = new Bucket("dmsg", 5, 1, BucketTarget.Bot), + [GuildBucket.DeleteMessages] = new Bucket("bdmsg", 1, 1, BucketTarget.Bot), + [GuildBucket.ModifyMember] = new Bucket("guild_member", 10, 10, BucketTarget.Bot), + [GuildBucket.Nickname] = new Bucket("guild_member_nick", 1, 1, BucketTarget.Bot) }.ToImmutableDictionary(); - _guildLimits = new Dictionary + //Client-Only + _channelLimits = new Dictionary { //REST - [GuildBucket.SendEditMessage] = new BucketDefinition(5, 5, GlobalBucket.SendEditMessage), - [GuildBucket.DeleteMessage] = new BucketDefinition(5, 1), - [GuildBucket.DeleteMessages] = new BucketDefinition(1, 1), - [GuildBucket.ModifyMember] = new BucketDefinition(10, 10), - [GuildBucket.Nickname] = new BucketDefinition(1, 1) + [ChannelBucket.SendEditMessage] = new Bucket("msg", 10, 10, BucketTarget.Client, GlobalBucket.SendEditMessage), }.ToImmutableDictionary(); } + public static Bucket GetBucketInfo(GlobalBucket bucket) => _globalLimits[bucket]; + public static Bucket GetBucketInfo(GuildBucket bucket) => _guildLimits[bucket]; + public static Bucket GetBucketInfo(ChannelBucket bucket) => _channelLimits[bucket]; + public RequestQueue() { _lock = new SemaphoreSlim(1, 1); _globalBuckets = new RequestQueueBucket[_globalLimits.Count]; foreach (var pair in _globalLimits) - _globalBuckets[(int)pair.Key] = CreateBucket(pair.Value); + { + //var target = _globalLimits[pair.Key].Target; + //if (target == BucketTarget.Both || (target == BucketTarget.Bot && isBot) || (target == BucketTarget.Client && !isBot)) + _globalBuckets[(int)pair.Key] = CreateBucket(pair.Value); + } _guildBuckets = new ConcurrentDictionary[_guildLimits.Count]; for (int i = 0; i < _guildLimits.Count; i++) - _guildBuckets[i] = new ConcurrentDictionary(); + { + //var target = _guildLimits[(GuildBucket)i].Target; + //if (target == BucketTarget.Both || (target == BucketTarget.Bot && isBot) || (target == BucketTarget.Client && !isBot)) + _guildBuckets[i] = new ConcurrentDictionary(); + } + + _channelBuckets = new ConcurrentDictionary[_channelLimits.Count]; + for (int i = 0; i < _channelLimits.Count; i++) + { + //var target = _channelLimits[(GuildBucket)i].Target; + //if (target == BucketTarget.Both || (target == BucketTarget.Bot && isBot) || (target == BucketTarget.Client && !isBot)) + _channelBuckets[i] = new ConcurrentDictionary(); + } _clearToken = new CancellationTokenSource(); _cancelToken = CancellationToken.None; @@ -72,23 +103,23 @@ namespace Discord.Net.Queue finally { _lock.Release(); } } - internal async Task SendAsync(RestRequest request, BucketGroup group, int bucketId, ulong guildId) + internal async Task SendAsync(RestRequest request, BucketGroup group, int bucketId, ulong objId) { request.CancelToken = _cancelToken; - var bucket = GetBucket(group, bucketId, guildId); + var bucket = GetBucket(group, bucketId, objId); return await bucket.SendAsync(request).ConfigureAwait(false); } - internal async Task SendAsync(WebSocketRequest request, BucketGroup group, int bucketId, ulong guildId) + internal async Task SendAsync(WebSocketRequest request, BucketGroup group, int bucketId, ulong objId) { request.CancelToken = _cancelToken; - var bucket = GetBucket(group, bucketId, guildId); + var bucket = GetBucket(group, bucketId, objId); return await bucket.SendAsync(request).ConfigureAwait(false); } - private RequestQueueBucket CreateBucket(BucketDefinition def) + private RequestQueueBucket CreateBucket(Bucket def) { var parent = def.Parent != null ? GetGlobalBucket(def.Parent.Value) : null; - return new RequestQueueBucket(def.WindowCount, def.WindowSeconds * 1000, parent); + return new RequestQueueBucket(this, def, parent); } public void DestroyGuildBucket(GuildBucket type, ulong guildId) @@ -97,15 +128,23 @@ namespace Discord.Net.Queue RequestQueueBucket bucket; _guildBuckets[(int)type].TryRemove(guildId, out bucket); } + public void DestroyChannelBucket(ChannelBucket type, ulong channelId) + { + //Assume this object is locked + RequestQueueBucket bucket; + _channelBuckets[(int)type].TryRemove(channelId, out bucket); + } - private RequestQueueBucket GetBucket(BucketGroup group, int bucketId, ulong guildId) + private RequestQueueBucket GetBucket(BucketGroup group, int bucketId, ulong objId) { switch (group) { case BucketGroup.Global: return GetGlobalBucket((GlobalBucket)bucketId); case BucketGroup.Guild: - return GetGuildBucket((GuildBucket)bucketId, guildId); + return GetGuildBucket((GuildBucket)bucketId, objId); + case BucketGroup.Channel: + return GetChannelBucket((ChannelBucket)bucketId, objId); default: throw new ArgumentException($"Unknown bucket group: {group}", nameof(group)); } @@ -118,6 +157,10 @@ namespace Discord.Net.Queue { return _guildBuckets[(int)type].GetOrAdd(guildId, _ => CreateBucket(_guildLimits[type])); } + private RequestQueueBucket GetChannelBucket(ChannelBucket type, ulong channelId) + { + return _channelBuckets[(int)type].GetOrAdd(channelId, _ => CreateBucket(_channelLimits[type])); + } public async Task ClearAsync() { @@ -133,5 +176,10 @@ namespace Discord.Net.Queue } finally { _lock.Release(); } } + + internal async Task RaiseRateLimitTriggered(string id, Bucket bucket, int millis) + { + await RateLimitTriggered.Invoke(id, bucket, millis).ConfigureAwait(false); + } } } diff --git a/src/Discord.Net/Net/Queue/RequestQueueBucket.cs b/src/Discord.Net/Net/Queue/RequestQueueBucket.cs index 08bd1a388..a61d71476 100644 --- a/src/Discord.Net/Net/Queue/RequestQueueBucket.cs +++ b/src/Discord.Net/Net/Queue/RequestQueueBucket.cs @@ -9,27 +9,51 @@ namespace Discord.Net.Queue { internal class RequestQueueBucket { - private readonly int _windowMilliseconds; + private readonly RequestQueue _queue; private readonly SemaphoreSlim _semaphore; private readonly object _pauseLock; private int _pauseEndTick; private TaskCompletionSource _resumeNotifier; + public Bucket Definition { get; } public RequestQueueBucket Parent { get; } public Task _resetTask { get; } - public RequestQueueBucket(int windowCount, int windowMilliseconds, RequestQueueBucket parent = null) + public RequestQueueBucket(RequestQueue queue, Bucket definition, RequestQueueBucket parent = null) { - if (windowCount != 0) - _semaphore = new SemaphoreSlim(windowCount, windowCount); + _queue = queue; + Definition = definition; + if (definition.WindowCount != 0) + _semaphore = new SemaphoreSlim(definition.WindowCount, definition.WindowCount); + Parent = parent; + _pauseLock = new object(); _resumeNotifier = new TaskCompletionSource(); _resumeNotifier.SetResult(0); - _windowMilliseconds = windowMilliseconds; - Parent = parent; } public async Task SendAsync(IQueuedRequest request) + { + while (true) + { + try + { + return await SendAsyncInternal(request).ConfigureAwait(false); + } + catch (HttpRateLimitException ex) + { + //When a 429 occurs, we drop all our locks, including the ones we wanted. + //This is generally safe though since 429s actually occuring should be very rare. + RequestQueueBucket bucket; + bool success = FindBucket(ex.BucketId, out bucket); + + await _queue.RaiseRateLimitTriggered(ex.BucketId, success ? bucket.Definition : (Bucket)null, ex.RetryAfterMilliseconds).ConfigureAwait(false); + + bucket.Pause(ex.RetryAfterMilliseconds); + } + } + } + private async Task SendAsyncInternal(IQueuedRequest request) { var endTick = request.TimeoutTick; @@ -64,7 +88,7 @@ namespace Discord.Net.Queue { //If there's a parent bucket, pass this request to them if (Parent != null) - return await Parent.SendAsync(request).ConfigureAwait(false); + return await Parent.SendAsyncInternal(request).ConfigureAwait(false); //We have all our semaphores, send the request return await request.SendAsync().ConfigureAwait(false); @@ -73,11 +97,6 @@ namespace Discord.Net.Queue { continue; } - catch (HttpRateLimitException ex) - { - Pause(ex.RetryAfterMilliseconds); - continue; - } } } finally @@ -88,6 +107,23 @@ namespace Discord.Net.Queue } } + private bool FindBucket(string id, out RequestQueueBucket bucket) + { + //Keep going up until we find a bucket with matching id or we're at the topmost bucket + if (Definition.Id == id) + { + bucket = this; + return true; + } + else if (Parent == null) + { + bucket = this; + return false; + } + else + return Parent.FindBucket(id, out bucket); + } + private void Pause(int milliseconds) { lock (_pauseLock) @@ -120,7 +156,7 @@ namespace Discord.Net.Queue } private async Task QueueExitAsync() { - await Task.Delay(_windowMilliseconds).ConfigureAwait(false); + await Task.Delay(Definition.WindowSeconds * 1000).ConfigureAwait(false); _semaphore.Release(); } } diff --git a/src/Discord.Net/Net/RateLimitException.cs b/src/Discord.Net/Net/RateLimitException.cs index a07e90760..4fa4900e9 100644 --- a/src/Discord.Net/Net/RateLimitException.cs +++ b/src/Discord.Net/Net/RateLimitException.cs @@ -4,6 +4,7 @@ namespace Discord.Net { public class HttpRateLimitException : HttpException { + public string BucketId { get; } public int RetryAfterMilliseconds { get; } public HttpRateLimitException(int retryAfterMilliseconds)