@@ -14,7 +14,8 @@ namespace Discord.Net.Queue
{
{
public event Func<string, RateLimitInfo?, Task> RateLimitTriggered;
public event Func<string, RateLimitInfo?, Task> RateLimitTriggered;
private readonly ConcurrentDictionary<string, RequestBucket> _buckets;
private readonly ConcurrentDictionary<string, RequestBucket> _bucketsByHash;
private readonly ConcurrentDictionary<string, object> _bucketsById;
private readonly SemaphoreSlim _tokenLock;
private readonly SemaphoreSlim _tokenLock;
private readonly CancellationTokenSource _cancelTokenSource; //Dispose token
private readonly CancellationTokenSource _cancelTokenSource; //Dispose token
private CancellationTokenSource _clearToken;
private CancellationTokenSource _clearToken;
@@ -34,7 +35,8 @@ namespace Discord.Net.Queue
_requestCancelToken = CancellationToken.None;
_requestCancelToken = CancellationToken.None;
_parentToken = CancellationToken.None;
_parentToken = CancellationToken.None;
_buckets = new ConcurrentDictionary<string, RequestBucket>();
_bucketsByHash = new ConcurrentDictionary<string, RequestBucket>();
_bucketsById = new ConcurrentDictionary<string, object>();
_cleanupTask = RunCleanup();
_cleanupTask = RunCleanup();
}
}
@@ -112,12 +114,24 @@ namespace Discord.Net.Queue
private RequestBucket GetOrCreateBucket(string id, RestRequest request)
private RequestBucket GetOrCreateBucket(string id, RestRequest request)
{
{
return _buckets.GetOrAdd(id, x => new RequestBucket(this, request, x));
object obj = _bucketsById.GetOrAdd(id, x => new RequestBucket(this, request, x));
if (obj is string hash)
return _bucketsByHash.GetOrAdd(hash, x => new RequestBucket(this, request, x));
return (RequestBucket)obj;
}
}
internal async Task RaiseRateLimitTriggered(string bucketId, RateLimitInfo? info)
internal async Task RaiseRateLimitTriggered(string bucketId, RateLimitInfo? info)
{
{
await RateLimitTriggered(bucketId, info).ConfigureAwait(false);
await RateLimitTriggered(bucketId, info).ConfigureAwait(false);
}
}
internal void UpdateBucketHash(string id, string discordHash)
{
if (_bucketsById.TryGetValue(id, out object obj) && obj is RequestBucket bucket)
{
string hash = discordHash + id.Split(new char[] { ' ' }, 2)[1]; //remove http method, using hash now
_bucketsByHash.GetOrAdd(hash, bucket);
_bucketsById.TryUpdate(id, hash, bucket);
}
}
private async Task RunCleanup()
private async Task RunCleanup()
{
{
@@ -126,10 +140,21 @@ namespace Discord.Net.Queue
while (!_cancelTokenSource.IsCancellationRequested)
while (!_cancelTokenSource.IsCancellationRequested)
{
{
var now = DateTimeOffset.UtcNow;
var now = DateTimeOffset.UtcNow;
foreach (var bucket in _buckets.Select(x => x.Value))
foreach (var bucket in _bucketsById.Where(x => x.Value is RequestBucket) .Select(x => (RequestBucket) x.Value))
{
{
if ((now - bucket.LastAttemptAt).TotalMinutes > 1.0)
if ((now - bucket.LastAttemptAt).TotalMinutes > 1.0)
_buckets.TryRemove(bucket.Id, out _);
_bucketsById.TryRemove(bucket.Id, out _);
}
foreach (var kvp in _bucketsByHash)
{
var kvpHash = kvp.Key;
var kvpBucket = kvp.Value;
if ((now - kvpBucket.LastAttemptAt).TotalMinutes > 1.0)
{
_bucketsByHash.TryRemove(kvpHash, out _);
foreach (var key in _bucketsById.Where(x => x.Value is string hash && hash == kvpHash).Select(x => x.Key))
_bucketsById.TryRemove(key, out _);
}
}
}
await Task.Delay(60000, _cancelTokenSource.Token).ConfigureAwait(false); //Runs each minute
await Task.Delay(60000, _cancelTokenSource.Token).ConfigureAwait(false); //Runs each minute
}
}