diff --git a/src/Discord.Net/DiscordClient.cs b/src/Discord.Net/DiscordClient.cs
index bdcc8c74a..8ed7022eb 100644
--- a/src/Discord.Net/DiscordClient.cs
+++ b/src/Discord.Net/DiscordClient.cs
@@ -68,8 +68,6 @@ namespace Discord
/// Gets a collection of all servers this client is a member of.
public IEnumerable Servers => _servers.Select(x => x.Value);
- // /// Gets a collection of all channels this client is a member of.
- // public IEnumerable Channels => _channels.Select(x => x.Value);
/// Gets a collection of all private channels this client is a member of.
public IEnumerable PrivateChannels => _privateChannels.Select(x => x.Value);
/// Gets a collection of all voice regions currently offered by Discord.
@@ -198,11 +196,8 @@ namespace Discord
await Login(email, password, token).ConfigureAwait(false);
await GatewaySocket.Connect(ClientAPI, CancelToken).ConfigureAwait(false);
- Task[] tasks = new[]
- {
- CancelToken.Wait(),
- MessageQueue.Run(CancelToken)
- };
+ var tasks = new[] { CancelToken.Wait() }
+ .Concat(MessageQueue.Run(CancelToken));
await _taskManager.Start(tasks, cancelSource).ConfigureAwait(false);
GatewaySocket.WaitForConnection(CancelToken);
diff --git a/src/Discord.Net/MessageQueue.cs b/src/Discord.Net/MessageQueue.cs
index 6860a1b59..7128f504d 100644
--- a/src/Discord.Net/MessageQueue.cs
+++ b/src/Discord.Net/MessageQueue.cs
@@ -3,6 +3,7 @@ using Discord.Logging;
using Discord.Net.Rest;
using System;
using System.Collections.Concurrent;
+using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
@@ -12,42 +13,16 @@ namespace Discord.Net
/// Manages an outgoing message queue for DiscordClient.
public class MessageQueue
{
- private interface IQueuedAction
+ private struct MessageEdit
{
- Task Do(MessageQueue queue);
- }
-
- private struct SendAction : IQueuedAction
- {
- private readonly Message _msg;
-
- public SendAction(Message msg)
- {
- _msg = msg;
- }
- Task IQueuedAction.Do(MessageQueue queue) => queue.Send(_msg);
- }
- private struct EditAction : IQueuedAction
- {
- private readonly Message _msg;
- private readonly string _text;
-
- public EditAction(Message msg, string text)
- {
- _msg = msg;
- _text = text;
- }
- Task IQueuedAction.Do(MessageQueue queue) => queue.Edit(_msg, _text);
- }
- private struct DeleteAction : IQueuedAction
- {
- private readonly Message _msg;
+ public readonly Message Message;
+ public readonly string NewText;
- public DeleteAction(Message msg)
+ public MessageEdit(Message message, string newText)
{
- _msg = msg;
+ Message = message;
+ NewText = newText;
}
- Task IQueuedAction.Do(MessageQueue queue) => queue.Delete(_msg);
}
private const int WarningStart = 30;
@@ -55,12 +30,16 @@ namespace Discord.Net
private readonly Random _nonceRand;
private readonly RestClient _rest;
private readonly Logger _logger;
- private readonly ConcurrentQueue _pendingActions;
- private readonly ConcurrentDictionary _pendingSends;
+ private readonly ConcurrentQueue _pendingSends;
+ private readonly ConcurrentQueue _pendingEdits;
+ private readonly ConcurrentQueue _pendingDeletes;
+ private readonly ConcurrentDictionary _pendingSendsByNonce;
+ private readonly ConcurrentQueue _pendingTasks;
private int _nextWarning;
+ private int _count;
/// Gets the current number of queued actions.
- public int Count { get; private set; }
+ public int Count => _count;
internal MessageQueue(RestClient rest, Logger logger)
{
@@ -68,8 +47,11 @@ namespace Discord.Net
_logger = logger;
_nonceRand = new Random();
- _pendingActions = new ConcurrentQueue();
- _pendingSends = new ConcurrentDictionary();
+ _pendingSends = new ConcurrentQueue();
+ _pendingEdits = new ConcurrentQueue();
+ _pendingDeletes = new ConcurrentQueue();
+ _pendingSendsByNonce = new ConcurrentDictionary();
+ _pendingTasks = new ConcurrentQueue();
}
internal Message QueueSend(Channel channel, string text, bool isTTS)
@@ -78,28 +60,45 @@ namespace Discord.Net
msg.RawText = text;
msg.Text = msg.Resolve(text);
msg.Nonce = GenerateNonce();
- msg.State = MessageState.Queued;
- _pendingSends.TryAdd(msg.Nonce, msg);
- _pendingActions.Enqueue(new SendAction(msg));
+ if (_pendingSendsByNonce.TryAdd(msg.Nonce, msg))
+ {
+ msg.State = MessageState.Queued;
+ IncrementCount();
+ _pendingSends.Enqueue(msg.Nonce);
+ }
+ else
+ msg.State = MessageState.Failed;
return msg;
}
internal void QueueEdit(Message msg, string text)
{
- _pendingActions.Enqueue(new EditAction(msg, text));
+ IncrementCount();
+ _pendingEdits.Enqueue(new MessageEdit(msg, text));
}
internal void QueueDelete(Message msg)
{
Message ignored;
- if (msg.State == MessageState.Queued && _pendingSends.TryRemove(msg.Nonce, out ignored))
+ if (msg.State == MessageState.Queued && _pendingSendsByNonce.TryRemove(msg.Nonce, out ignored))
{
+ //Successfully stopped the message from being sent in the first place
msg.State = MessageState.Aborted;
return;
}
-
- _pendingActions.Enqueue(new DeleteAction(msg));
+ IncrementCount();
+ _pendingDeletes.Enqueue(msg);
}
- internal Task Run(CancellationToken cancelToken)
+ internal Task[] Run(CancellationToken cancelToken)
+ {
+ return new[]
+ {
+ RunSendQueue(cancelToken),
+ RunEditQueue(cancelToken),
+ RunDeleteQueue(cancelToken),
+ RunTaskQueue(cancelToken)
+ };
+ }
+ private Task RunSendQueue(CancellationToken cancelToken)
{
_nextWarning = WarningStart;
return Task.Run((Func)(async () =>
@@ -108,18 +107,13 @@ namespace Discord.Net
{
while (!cancelToken.IsCancellationRequested)
{
- Count = _pendingActions.Count;
- if (Count >= _nextWarning)
+ int nonce;
+ while (_pendingSends.TryDequeue(out nonce))
{
- _nextWarning *= 2;
- _logger.Warning($"Queue is backed up, currently at {Count} actions.");
+ Message msg;
+ if (_pendingSendsByNonce.TryRemove(nonce, out msg)) //If it was delete from queue, this will fail
+ await Send(msg).ConfigureAwait(false);
}
- else if (Count < WarningStart) //Reset once the problem is solved
- _nextWarning = WarningStart;
-
- IQueuedAction queuedAction;
- while (_pendingActions.TryDequeue(out queuedAction))
- await queuedAction.Do(this).ConfigureAwait(false);
await Task.Delay((int)Discord.DiscordConfig.MessageQueueInterval).ConfigureAwait(false);
}
@@ -127,29 +121,96 @@ namespace Discord.Net
catch (OperationCanceledException) { }
}));
}
+ private Task RunEditQueue(CancellationToken cancelToken)
+ {
+ _nextWarning = WarningStart;
+ return Task.Run((Func)(async () =>
+ {
+ try
+ {
+ while (!cancelToken.IsCancellationRequested)
+ {
+ MessageEdit edit;
+ while (_pendingEdits.TryPeek(out edit) && edit.Message.State != MessageState.Queued)
+ {
+ _pendingEdits.TryDequeue(out edit);
+ if (edit.Message.State == MessageState.Normal)
+ await Edit(edit.Message, edit.NewText);
+ }
- internal async Task Send(Message msg)
+ await Task.Delay((int)Discord.DiscordConfig.MessageQueueInterval).ConfigureAwait(false);
+ }
+ }
+ catch (OperationCanceledException) { }
+ }));
+ }
+ private Task RunDeleteQueue(CancellationToken cancelToken)
{
- if (_pendingSends.TryRemove(msg.Nonce, out msg)) //Remove it from pending
+ _nextWarning = WarningStart;
+ return Task.Run((Func)(async () =>
{
try
{
- var request = new SendMessageRequest(msg.Channel.Id)
+ while (!cancelToken.IsCancellationRequested)
{
- Content = msg.RawText,
- Nonce = msg.Nonce.ToString(),
- IsTTS = msg.IsTTS
- };
- var response = await _rest.Send(request).ConfigureAwait(false);
- msg.Id = response.Id;
- msg.Update(response);
- msg.State = MessageState.Normal;
+ Message msg;
+ while (_pendingDeletes.TryPeek(out msg) && msg.State != MessageState.Queued)
+ {
+ _pendingDeletes.TryDequeue(out msg);
+ if (msg.State == MessageState.Normal)
+ _pendingTasks.Enqueue(Delete(msg));
+ }
+
+ await Task.Delay((int)Discord.DiscordConfig.MessageQueueInterval).ConfigureAwait(false);
+ }
}
- catch (Exception ex)
+ catch (OperationCanceledException) { }
+ }));
+ }
+ private Task RunTaskQueue(CancellationToken cancelToken)
+ {
+ return Task.Run(async () =>
+ {
+ Task task;
+ while (!cancelToken.IsCancellationRequested)
+ {
+ while (_pendingTasks.TryPeek(out task) && task.IsCompleted)
+ {
+ _pendingTasks.TryDequeue(out task); //Should never fail
+ if (task.IsFaulted)
+ _logger.Warning("Error during Edit/Delete", task.Exception);
+ }
+ await Task.Delay((int)Discord.DiscordConfig.MessageQueueInterval).ConfigureAwait(false);
+ }
+
+ //Wait for remaining tasks to complete
+ while (_pendingTasks.TryDequeue(out task))
{
- msg.State = MessageState.Failed;
- _logger.Error("Failed to send message", ex);
+ if (!task.IsCompleted)
+ await task.ConfigureAwait(false);
}
+ });
+ }
+
+ internal async Task Send(Message msg)
+ {
+ try
+ {
+ var request = new SendMessageRequest(msg.Channel.Id)
+ {
+ Content = msg.RawText,
+ Nonce = msg.Nonce.ToString(),
+ IsTTS = msg.IsTTS
+ };
+ var response = await _rest.Send(request).ConfigureAwait(false);
+ msg.State = MessageState.Normal;
+ msg.Id = response.Id;
+ msg.Update(response);
+ }
+ catch (Exception ex)
+ {
+ msg.State = MessageState.Failed;
+ _logger.Error("Failed to send message", ex);
}
}
internal async Task Edit(Message msg, string text)
@@ -187,11 +248,29 @@ namespace Discord.Net
}
}
- /// Clears all queued message sends/edits/deletes
+ private void IncrementCount()
+ {
+ int count = Interlocked.Increment(ref _count);
+ if (count >= _nextWarning)
+ {
+ _nextWarning *= 2;
+ _logger.Warning($"Queue is backed up, currently at {count} actions.");
+ }
+ else if (count < WarningStart) //Reset once the problem is solved
+ _nextWarning = WarningStart;
+ }
+
+ /// Clears all queued message sends/edits/deletes.
public void Clear()
{
- IQueuedAction ignored;
- while (_pendingActions.TryDequeue(out ignored)) { }
+ int nonce;
+ while (_pendingSends.TryDequeue(out nonce)) { }
+
+ MessageEdit edit;
+ while (_pendingEdits.TryDequeue(out edit)) { }
+
+ Message msg;
+ while (_pendingDeletes.TryDequeue(out msg)) { }
}
private int GenerateNonce()