Browse Source

Fixed MessageQueue race condition

tags/1.0-rc
RogueException 9 years ago
parent
commit
7cccf6b30c
1 changed files with 36 additions and 34 deletions
  1. +36
    -34
      src/Discord.Net/Net/Rest/RequestQueue/RequestQueueBucket.cs

+ 36
- 34
src/Discord.Net/Net/Rest/RequestQueue/RequestQueueBucket.cs View File

@@ -16,8 +16,8 @@ namespace Discord.Net.Rest
private readonly ConcurrentQueue<RestRequest> _queue; private readonly ConcurrentQueue<RestRequest> _queue;
private readonly SemaphoreSlim _lock; private readonly SemaphoreSlim _lock;
private Task _resetTask; private Task _resetTask;
private DateTime? _retryAfter;
private bool _waitingToProcess;
private bool _waitingToProcess, _destroyed; //TODO: Remove _destroyed
private int _id;


public int WindowMaxCount { get; } public int WindowMaxCount { get; }
public int WindowSeconds { get; } public int WindowSeconds { get; }
@@ -44,10 +44,12 @@ namespace Discord.Net.Rest
WindowSeconds = windowSeconds; WindowSeconds = windowSeconds;
_queue = new ConcurrentQueue<RestRequest>(); _queue = new ConcurrentQueue<RestRequest>();
_lock = new SemaphoreSlim(1, 1); _lock = new SemaphoreSlim(1, 1);
_id = new System.Random().Next(0, int.MaxValue);
} }


public void Queue(RestRequest request) public void Queue(RestRequest request)
{ {
if (_destroyed) throw new Exception();
//Assume this obj's parent is under lock //Assume this obj's parent is under lock


_queue.Enqueue(request); _queue.Enqueue(request);
@@ -75,7 +77,7 @@ namespace Discord.Net.Rest
//If we're waiting to reset (due to a rate limit exception, or preemptive check), abort //If we're waiting to reset (due to a rate limit exception, or preemptive check), abort
if (WindowCount == WindowMaxCount) return; if (WindowCount == WindowMaxCount) return;
//Get next request, return if queue is empty //Get next request, return if queue is empty
if (!_queue.TryPeek(out request)) return;
if (!_queue.TryPeek(out request)) break;


try try
{ {
@@ -88,17 +90,20 @@ namespace Discord.Net.Rest
} }
catch (HttpRateLimitException ex) //Preemptive check failed, use Discord's time instead of our own catch (HttpRateLimitException ex) //Preemptive check failed, use Discord's time instead of our own
{ {
if (_resetTask == null)
WindowCount = WindowMaxCount;
var task = _resetTask;
if (task != null)
{ {
//No reset has been queued yet, lets create one as if this *was* preemptive
_resetTask = ResetAfter(ex.RetryAfterMilliseconds);
Debug($"External rate limit: Reset in {ex.RetryAfterMilliseconds} ms");
Debug($"External rate limit: Extended to {ex.RetryAfterMilliseconds} ms");
var retryAfter = DateTime.UtcNow.AddMilliseconds(ex.RetryAfterMilliseconds);
await task.ConfigureAwait(false);
int millis = (int)Math.Ceiling((DateTime.UtcNow - retryAfter).TotalMilliseconds);
_resetTask = ResetAfter(millis);
} }
else else
{ {
//A preemptive reset is already queued, set RetryAfter to extend it
_retryAfter = DateTime.UtcNow.AddMilliseconds(ex.RetryAfterMilliseconds);
Debug($"External rate limit: Extended to {ex.RetryAfterMilliseconds} ms");
Debug($"External rate limit: Reset in {ex.RetryAfterMilliseconds} ms");
_resetTask = ResetAfter(ex.RetryAfterMilliseconds);
} }
return; return;
} }
@@ -132,6 +137,25 @@ namespace Discord.Net.Rest
Debug($"Internal rate limit: Reset in {WindowSeconds * 1000} ms"); Debug($"Internal rate limit: Reset in {WindowSeconds * 1000} ms");
} }
} }

//If queue is empty, non-global, and there is no active rate limit, remove this bucket
if (_resetTask == null && _bucketGroup == BucketGroup.Guild)
{
try
{
await _parent.Lock().ConfigureAwait(false);
if (_queue.IsEmpty) //Double check, in case a request was queued before we got both locks
{
Debug($"Destroy");
_parent.DestroyGuildBucket((GuildBucket)_bucketId, _guildId);
_destroyed = true;
}
}
finally
{
_parent.Unlock();
}
}
} }
finally finally
{ {
@@ -155,36 +179,14 @@ namespace Discord.Net.Rest
{ {
await Lock().ConfigureAwait(false); await Lock().ConfigureAwait(false);


//If an extension has been planned, start a new wait task
if (_retryAfter != null)
{
_resetTask = ResetAfter((int)(_retryAfter.Value - DateTime.UtcNow).TotalMilliseconds);
_retryAfter = null;
return;
}

Debug($"Reset"); Debug($"Reset");

//Reset the current window count and set our state back to normal //Reset the current window count and set our state back to normal
WindowCount = 0; WindowCount = 0;
_resetTask = null; _resetTask = null;


//Wait is over, work through the current queue //Wait is over, work through the current queue
await ProcessQueue().ConfigureAwait(false); await ProcessQueue().ConfigureAwait(false);
//If queue is empty and non-global, remove this bucket
if (_bucketGroup == BucketGroup.Guild && _queue.IsEmpty)
{
try
{
await _parent.Lock().ConfigureAwait(false);
if (_queue.IsEmpty) //Double check, in case a request was queued before we got both locks
_parent.DestroyGuildBucket((GuildBucket)_bucketId, _guildId);
}
finally
{
_parent.Unlock();
}
}
} }
finally finally
{ {
@@ -217,7 +219,7 @@ namespace Discord.Net.Rest
name = "Unknown"; name = "Unknown";
break; break;
} }
System.Diagnostics.Debug.WriteLine($"[{name}] {text}");
System.Diagnostics.Debug.WriteLine($"[{name} {_id}] {text}");
} }
} }
} }

Loading…
Cancel
Save