Browse Source

Fixing merge, ignore limit for heartbeat, and dispose

pull/1537/head
Paulo 4 years ago
parent
commit
5b65b99eb1
7 changed files with 60 additions and 40 deletions
  1. +2
    -2
      src/Discord.Net.Rest/Net/Queue/GatewayBucket.cs
  2. +8
    -0
      src/Discord.Net.Rest/Net/Queue/RequestQueue.cs
  3. +24
    -1
      src/Discord.Net.Rest/Net/Queue/RequestQueueBucket.cs
  4. +3
    -1
      src/Discord.Net.Rest/Net/Queue/Requests/WebSocketRequest.cs
  5. +2
    -5
      src/Discord.Net.WebSocket/DiscordShardedClient.cs
  6. +3
    -1
      src/Discord.Net.WebSocket/DiscordSocketApiClient.cs
  7. +18
    -30
      src/Discord.Net.WebSocket/DiscordSocketClient.cs

+ 2
- 2
src/Discord.Net.Rest/Net/Queue/GatewayBucket.cs View File

@@ -1,4 +1,3 @@
using Discord.Rest;
using System.Collections.Immutable; using System.Collections.Immutable;


namespace Discord.Net.Queue namespace Discord.Net.Queue
@@ -18,7 +17,8 @@ namespace Discord.Net.Queue
{ {
var buckets = new[] var buckets = new[]
{ {
new GatewayBucket(GatewayBucketType.Unbucketed, BucketId.Create(null, "<gateway-unbucketed>", null), 120, 60),
// Limit is 120/60s, but 3 will be reserved for heartbeats (2 for possible heartbeats in the same timeframe and a possible failure)
new GatewayBucket(GatewayBucketType.Unbucketed, BucketId.Create(null, "<gateway-unbucketed>", null), 117, 60),
new GatewayBucket(GatewayBucketType.Identify, BucketId.Create(null, "<gateway-identify>", null), 1, 5), new GatewayBucket(GatewayBucketType.Identify, BucketId.Create(null, "<gateway-identify>", null), 1, 5),
new GatewayBucket(GatewayBucketType.PresenceUpdate, BucketId.Create(null, "<gateway-presenceupdate>", null), 5, 60), new GatewayBucket(GatewayBucketType.PresenceUpdate, BucketId.Create(null, "<gateway-presenceupdate>", null), 5, 60),
}; };


+ 8
- 0
src/Discord.Net.Rest/Net/Queue/RequestQueue.cs View File

@@ -206,6 +206,12 @@ namespace Discord.Net.Queue
return (null, null); return (null, null);
} }


public void ClearGatewayBuckets()
{
foreach (var gwBucket in (GatewayBucketType[])Enum.GetValues(typeof(GatewayBucketType)))
_buckets.TryRemove(GatewayBucket.Get(gwBucket).Id, out _);
}

private async Task RunCleanup() private async Task RunCleanup()
{ {
try try
@@ -236,6 +242,8 @@ namespace Discord.Net.Queue
_tokenLock?.Dispose(); _tokenLock?.Dispose();
_clearToken?.Dispose(); _clearToken?.Dispose();
_requestCancelTokenSource?.Dispose(); _requestCancelTokenSource?.Dispose();
_masterIdentifySemaphore?.Dispose();
_identifySemaphore?.Dispose();
} }
} }
} }

+ 24
- 1
src/Discord.Net.Rest/Net/Queue/RequestQueueBucket.cs View File

@@ -248,8 +248,31 @@ namespace Discord.Net.Queue
{ {
if (!isRateLimited) if (!isRateLimited)
{ {
bool ignoreRatelimit = false;
isRateLimited = true; isRateLimited = true;
await _queue.RaiseRateLimitTriggered(Id, null, $"{request.Method} {request.Endpoint}").ConfigureAwait(false);
switch (request)
{
case RestRequest restRequest:
await _queue.RaiseRateLimitTriggered(Id, null, $"{restRequest.Method} {restRequest.Endpoint}").ConfigureAwait(false);
break;
case WebSocketRequest webSocketRequest:
if (webSocketRequest.IgnoreLimit)
{
ignoreRatelimit = true;
break;
}
await _queue.RaiseRateLimitTriggered(Id, null, Id.Endpoint).ConfigureAwait(false);
break;
default:
throw new InvalidOperationException("Unknown request type");
}
if (ignoreRatelimit)
{
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Ignoring ratelimit");
#endif
break;
}
} }


ThrowRetryLimit(request); ThrowRetryLimit(request);


+ 3
- 1
src/Discord.Net.Rest/Net/Queue/Requests/WebSocketRequest.cs View File

@@ -11,18 +11,20 @@ namespace Discord.Net.Queue
public IWebSocketClient Client { get; } public IWebSocketClient Client { get; }
public byte[] Data { get; } public byte[] Data { get; }
public bool IsText { get; } public bool IsText { get; }
public bool IgnoreLimit { get; }
public DateTimeOffset? TimeoutAt { get; } public DateTimeOffset? TimeoutAt { get; }
public TaskCompletionSource<Stream> Promise { get; } public TaskCompletionSource<Stream> Promise { get; }
public RequestOptions Options { get; } public RequestOptions Options { get; }
public CancellationToken CancelToken { get; internal set; } public CancellationToken CancelToken { get; internal set; }


public WebSocketRequest(IWebSocketClient client, byte[] data, bool isText, RequestOptions options)
public WebSocketRequest(IWebSocketClient client, byte[] data, bool isText, bool ignoreLimit, RequestOptions options)
{ {
Preconditions.NotNull(options, nameof(options)); Preconditions.NotNull(options, nameof(options));


Client = client; Client = client;
Data = data; Data = data;
IsText = isText; IsText = isText;
IgnoreLimit = ignoreLimit;
Options = options; Options = options;
TimeoutAt = options.Timeout.HasValue ? DateTimeOffset.UtcNow.AddMilliseconds(options.Timeout.Value) : (DateTimeOffset?)null; TimeoutAt = options.Timeout.HasValue ? DateTimeOffset.UtcNow.AddMilliseconds(options.Timeout.Value) : (DateTimeOffset?)null;
Promise = new TaskCompletionSource<Stream>(); Promise = new TaskCompletionSource<Stream>();


+ 2
- 5
src/Discord.Net.WebSocket/DiscordShardedClient.cs View File

@@ -12,7 +12,6 @@ namespace Discord.WebSocket
public partial class DiscordShardedClient : BaseSocketClient, IDiscordClient public partial class DiscordShardedClient : BaseSocketClient, IDiscordClient
{ {
private readonly DiscordSocketConfig _baseConfig; private readonly DiscordSocketConfig _baseConfig;
private readonly SemaphoreSlim _connectionGroupLock;
private readonly Dictionary<int, int> _shardIdsToIndex; private readonly Dictionary<int, int> _shardIdsToIndex;
private readonly bool _automaticShards; private readonly bool _automaticShards;
private int[] _shardIds; private int[] _shardIds;
@@ -65,7 +64,6 @@ namespace Discord.WebSocket
_shardIdsToIndex = new Dictionary<int, int>(); _shardIdsToIndex = new Dictionary<int, int>();
config.DisplayInitialLog = false; config.DisplayInitialLog = false;
_baseConfig = config; _baseConfig = config;
_connectionGroupLock = new SemaphoreSlim(1, 1);


if (config.TotalShards == null) if (config.TotalShards == null)
_automaticShards = true; _automaticShards = true;
@@ -88,7 +86,7 @@ namespace Discord.WebSocket
_shardIdsToIndex.Add(_shardIds[i], i); _shardIdsToIndex.Add(_shardIds[i], i);
var newConfig = config.Clone(); var newConfig = config.Clone();
newConfig.ShardId = _shardIds[i]; newConfig.ShardId = _shardIds[i];
_shards[i] = new DiscordSocketClient(newConfig, _connectionGroupLock, i != 0 ? _shards[0] : null, masterIdentifySemaphore, config.IdentifyMaxConcurrency > 1 ? null : identifySemaphores[i / config.IdentifyMaxConcurrency], config.IdentifyMaxConcurrency);
_shards[i] = new DiscordSocketClient(newConfig, i != 0 ? _shards[0] : null, masterIdentifySemaphore, config.IdentifyMaxConcurrency > 1 ? null : identifySemaphores[i / config.IdentifyMaxConcurrency], config.IdentifyMaxConcurrency);
RegisterEvents(_shards[i], i == 0); RegisterEvents(_shards[i], i == 0);
} }
} }
@@ -122,7 +120,7 @@ namespace Discord.WebSocket
var newConfig = _baseConfig.Clone(); var newConfig = _baseConfig.Clone();
newConfig.ShardId = _shardIds[i]; newConfig.ShardId = _shardIds[i];
newConfig.TotalShards = _totalShards; newConfig.TotalShards = _totalShards;
_shards[i] = new DiscordSocketClient(newConfig, _connectionGroupLock, i != 0 ? _shards[0] : null, masterIdentifySemaphore, maxConcurrency > 1 ? null : identifySemaphores[i / maxConcurrency], maxConcurrency);
_shards[i] = new DiscordSocketClient(newConfig, i != 0 ? _shards[0] : null, masterIdentifySemaphore, maxConcurrency > 1 ? null : identifySemaphores[i / maxConcurrency], maxConcurrency);
RegisterEvents(_shards[i], i == 0); RegisterEvents(_shards[i], i == 0);
} }
} }
@@ -418,7 +416,6 @@ namespace Discord.WebSocket
foreach (var client in _shards) foreach (var client in _shards)
client?.Dispose(); client?.Dispose();
} }
_connectionGroupLock?.Dispose();
} }


_isDisposed = true; _isDisposed = true;


+ 3
- 1
src/Discord.Net.WebSocket/DiscordSocketApiClient.cs View File

@@ -133,6 +133,8 @@ namespace Discord.API
if (WebSocketClient == null) if (WebSocketClient == null)
throw new NotSupportedException("This client is not configured with WebSocket support."); throw new NotSupportedException("This client is not configured with WebSocket support.");


RequestQueue.ClearGatewayBuckets();

//Re-create streams to reset the zlib state //Re-create streams to reset the zlib state
_compressed?.Dispose(); _compressed?.Dispose();
_decompressor?.Dispose(); _decompressor?.Dispose();
@@ -210,7 +212,7 @@ namespace Discord.API
options.IsGatewayBucket = true; options.IsGatewayBucket = true;
if (options.BucketId == null) if (options.BucketId == null)
options.BucketId = GatewayBucket.Get(GatewayBucketType.Unbucketed).Id; options.BucketId = GatewayBucket.Get(GatewayBucketType.Unbucketed).Id;
await RequestQueue.SendAsync(new WebSocketRequest(WebSocketClient, bytes, true, options)).ConfigureAwait(false);
await RequestQueue.SendAsync(new WebSocketRequest(WebSocketClient, bytes, true, opCode == GatewayOpCode.Heartbeat, options)).ConfigureAwait(false);
await _sentGatewayMessageEvent.InvokeAsync(opCode).ConfigureAwait(false); await _sentGatewayMessageEvent.InvokeAsync(opCode).ConfigureAwait(false);
} }




+ 18
- 30
src/Discord.Net.WebSocket/DiscordSocketClient.cs View File

@@ -26,7 +26,6 @@ namespace Discord.WebSocket
{ {
private readonly ConcurrentQueue<ulong> _largeGuilds; private readonly ConcurrentQueue<ulong> _largeGuilds;
private readonly JsonSerializer _serializer; private readonly JsonSerializer _serializer;
private readonly SemaphoreSlim _connectionGroupLock;
private readonly DiscordSocketClient _parentClient; private readonly DiscordSocketClient _parentClient;
private readonly ConcurrentQueue<long> _heartbeatTimes; private readonly ConcurrentQueue<long> _heartbeatTimes;
private readonly ConnectionManager _connection; private readonly ConnectionManager _connection;
@@ -119,10 +118,10 @@ namespace Discord.WebSocket
/// </summary> /// </summary>
/// <param name="config">The configuration to be used with the client.</param> /// <param name="config">The configuration to be used with the client.</param>
#pragma warning disable IDISP004 #pragma warning disable IDISP004
public DiscordSocketClient(DiscordSocketConfig config) : this(config, CreateApiClient(config, new SemaphoreSlim(1, 1), null, 1), null, null) { }
internal DiscordSocketClient(DiscordSocketConfig config, SemaphoreSlim groupLock, DiscordSocketClient parentClient, SemaphoreSlim identifyMasterSemaphore, SemaphoreSlim identifySemaphore, int identifyMaxConcurrency) : this(config, CreateApiClient(config, identifyMasterSemaphore, identifySemaphore, identifyMaxConcurrency), groupLock, parentClient) { }
public DiscordSocketClient(DiscordSocketConfig config) : this(config, CreateApiClient(config, new SemaphoreSlim(1, 1), null, 1), null) { }
internal DiscordSocketClient(DiscordSocketConfig config, DiscordSocketClient parentClient, SemaphoreSlim identifyMasterSemaphore, SemaphoreSlim identifySemaphore, int identifyMaxConcurrency) : this(config, CreateApiClient(config, identifyMasterSemaphore, identifySemaphore, identifyMaxConcurrency), parentClient) { }
#pragma warning restore IDISP004 #pragma warning restore IDISP004
private DiscordSocketClient(DiscordSocketConfig config, API.DiscordSocketApiClient client, SemaphoreSlim groupLock, DiscordSocketClient parentClient)
private DiscordSocketClient(DiscordSocketConfig config, API.DiscordSocketApiClient client, DiscordSocketClient parentClient)
: base(config, client) : base(config, client)
{ {
ShardId = config.ShardId ?? 0; ShardId = config.ShardId ?? 0;
@@ -148,7 +147,6 @@ namespace Discord.WebSocket
_connection.Disconnected += (ex, recon) => TimedInvokeAsync(_disconnectedEvent, nameof(Disconnected), ex); _connection.Disconnected += (ex, recon) => TimedInvokeAsync(_disconnectedEvent, nameof(Disconnected), ex);


_nextAudioId = 1; _nextAudioId = 1;
_connectionGroupLock = groupLock;
_parentClient = parentClient; _parentClient = parentClient;


_serializer = new JsonSerializer { ContractResolver = new DiscordContractResolver() }; _serializer = new JsonSerializer { ContractResolver = new DiscordContractResolver() };
@@ -230,35 +228,25 @@ namespace Discord.WebSocket


private async Task OnConnectingAsync() private async Task OnConnectingAsync()
{ {
if (_connectionGroupLock != null)
await _connectionGroupLock.WaitAsync(_connection.CancelToken).ConfigureAwait(false);
try
{
await _gatewayLogger.DebugAsync("Connecting ApiClient").ConfigureAwait(false);
await ApiClient.ConnectAsync().ConfigureAwait(false);

if (_sessionId != null)
{
await _gatewayLogger.DebugAsync("Resuming").ConfigureAwait(false);
await ApiClient.SendResumeAsync(_sessionId, _lastSeq).ConfigureAwait(false);
}
else
{
await _gatewayLogger.DebugAsync("Identifying").ConfigureAwait(false);
await ApiClient.SendIdentifyAsync(shardID: ShardId, totalShards: TotalShards, guildSubscriptions: _guildSubscriptions, gatewayIntents: _gatewayIntents).ConfigureAwait(false);
}

//Wait for READY
await _connection.WaitAsync().ConfigureAwait(false);
await _gatewayLogger.DebugAsync("Connecting ApiClient").ConfigureAwait(false);
await ApiClient.ConnectAsync().ConfigureAwait(false);


await _gatewayLogger.DebugAsync("Sending Status").ConfigureAwait(false);
await SendStatusAsync().ConfigureAwait(false);
if (_sessionId != null)
{
await _gatewayLogger.DebugAsync("Resuming").ConfigureAwait(false);
await ApiClient.SendResumeAsync(_sessionId, _lastSeq).ConfigureAwait(false);
} }
finally
else
{ {
if (_connectionGroupLock != null)
_connectionGroupLock.Release();
await _gatewayLogger.DebugAsync("Identifying").ConfigureAwait(false);
await ApiClient.SendIdentifyAsync(shardID: ShardId, totalShards: TotalShards, guildSubscriptions: _guildSubscriptions, gatewayIntents: _gatewayIntents).ConfigureAwait(false);
} }

//Wait for READY
await _connection.WaitAsync().ConfigureAwait(false);

await _gatewayLogger.DebugAsync("Sending Status").ConfigureAwait(false);
await SendStatusAsync().ConfigureAwait(false);
} }
private async Task OnDisconnectingAsync(Exception ex) private async Task OnDisconnectingAsync(Exception ex)
{ {


Loading…
Cancel
Save