From cf9fcc95219e062c67b9ac45f301f0a6ca3766a4 Mon Sep 17 00:00:00 2001 From: RogueException Date: Wed, 10 Feb 2016 17:19:41 -0400 Subject: [PATCH] Added new experimental MessageQueue --- src/Discord.Net/DiscordClient.cs | 9 +- src/Discord.Net/MessageQueue.cs | 223 +++++++++++++++++++++---------- 2 files changed, 153 insertions(+), 79 deletions(-) diff --git a/src/Discord.Net/DiscordClient.cs b/src/Discord.Net/DiscordClient.cs index bdcc8c74a..8ed7022eb 100644 --- a/src/Discord.Net/DiscordClient.cs +++ b/src/Discord.Net/DiscordClient.cs @@ -68,8 +68,6 @@ namespace Discord /// Gets a collection of all servers this client is a member of. public IEnumerable Servers => _servers.Select(x => x.Value); - // /// Gets a collection of all channels this client is a member of. - // public IEnumerable Channels => _channels.Select(x => x.Value); /// Gets a collection of all private channels this client is a member of. public IEnumerable PrivateChannels => _privateChannels.Select(x => x.Value); /// Gets a collection of all voice regions currently offered by Discord. @@ -198,11 +196,8 @@ namespace Discord await Login(email, password, token).ConfigureAwait(false); await GatewaySocket.Connect(ClientAPI, CancelToken).ConfigureAwait(false); - Task[] tasks = new[] - { - CancelToken.Wait(), - MessageQueue.Run(CancelToken) - }; + var tasks = new[] { CancelToken.Wait() } + .Concat(MessageQueue.Run(CancelToken)); await _taskManager.Start(tasks, cancelSource).ConfigureAwait(false); GatewaySocket.WaitForConnection(CancelToken); diff --git a/src/Discord.Net/MessageQueue.cs b/src/Discord.Net/MessageQueue.cs index 6860a1b59..7128f504d 100644 --- a/src/Discord.Net/MessageQueue.cs +++ b/src/Discord.Net/MessageQueue.cs @@ -3,6 +3,7 @@ using Discord.Logging; using Discord.Net.Rest; using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Net; using System.Threading; using System.Threading.Tasks; @@ -12,42 +13,16 @@ namespace Discord.Net /// Manages an outgoing message queue for DiscordClient. public class MessageQueue { - private interface IQueuedAction + private struct MessageEdit { - Task Do(MessageQueue queue); - } - - private struct SendAction : IQueuedAction - { - private readonly Message _msg; - - public SendAction(Message msg) - { - _msg = msg; - } - Task IQueuedAction.Do(MessageQueue queue) => queue.Send(_msg); - } - private struct EditAction : IQueuedAction - { - private readonly Message _msg; - private readonly string _text; - - public EditAction(Message msg, string text) - { - _msg = msg; - _text = text; - } - Task IQueuedAction.Do(MessageQueue queue) => queue.Edit(_msg, _text); - } - private struct DeleteAction : IQueuedAction - { - private readonly Message _msg; + public readonly Message Message; + public readonly string NewText; - public DeleteAction(Message msg) + public MessageEdit(Message message, string newText) { - _msg = msg; + Message = message; + NewText = newText; } - Task IQueuedAction.Do(MessageQueue queue) => queue.Delete(_msg); } private const int WarningStart = 30; @@ -55,12 +30,16 @@ namespace Discord.Net private readonly Random _nonceRand; private readonly RestClient _rest; private readonly Logger _logger; - private readonly ConcurrentQueue _pendingActions; - private readonly ConcurrentDictionary _pendingSends; + private readonly ConcurrentQueue _pendingSends; + private readonly ConcurrentQueue _pendingEdits; + private readonly ConcurrentQueue _pendingDeletes; + private readonly ConcurrentDictionary _pendingSendsByNonce; + private readonly ConcurrentQueue _pendingTasks; private int _nextWarning; + private int _count; /// Gets the current number of queued actions. - public int Count { get; private set; } + public int Count => _count; internal MessageQueue(RestClient rest, Logger logger) { @@ -68,8 +47,11 @@ namespace Discord.Net _logger = logger; _nonceRand = new Random(); - _pendingActions = new ConcurrentQueue(); - _pendingSends = new ConcurrentDictionary(); + _pendingSends = new ConcurrentQueue(); + _pendingEdits = new ConcurrentQueue(); + _pendingDeletes = new ConcurrentQueue(); + _pendingSendsByNonce = new ConcurrentDictionary(); + _pendingTasks = new ConcurrentQueue(); } internal Message QueueSend(Channel channel, string text, bool isTTS) @@ -78,28 +60,45 @@ namespace Discord.Net msg.RawText = text; msg.Text = msg.Resolve(text); msg.Nonce = GenerateNonce(); - msg.State = MessageState.Queued; - _pendingSends.TryAdd(msg.Nonce, msg); - _pendingActions.Enqueue(new SendAction(msg)); + if (_pendingSendsByNonce.TryAdd(msg.Nonce, msg)) + { + msg.State = MessageState.Queued; + IncrementCount(); + _pendingSends.Enqueue(msg.Nonce); + } + else + msg.State = MessageState.Failed; return msg; } internal void QueueEdit(Message msg, string text) { - _pendingActions.Enqueue(new EditAction(msg, text)); + IncrementCount(); + _pendingEdits.Enqueue(new MessageEdit(msg, text)); } internal void QueueDelete(Message msg) { Message ignored; - if (msg.State == MessageState.Queued && _pendingSends.TryRemove(msg.Nonce, out ignored)) + if (msg.State == MessageState.Queued && _pendingSendsByNonce.TryRemove(msg.Nonce, out ignored)) { + //Successfully stopped the message from being sent in the first place msg.State = MessageState.Aborted; return; } - - _pendingActions.Enqueue(new DeleteAction(msg)); + IncrementCount(); + _pendingDeletes.Enqueue(msg); } - internal Task Run(CancellationToken cancelToken) + internal Task[] Run(CancellationToken cancelToken) + { + return new[] + { + RunSendQueue(cancelToken), + RunEditQueue(cancelToken), + RunDeleteQueue(cancelToken), + RunTaskQueue(cancelToken) + }; + } + private Task RunSendQueue(CancellationToken cancelToken) { _nextWarning = WarningStart; return Task.Run((Func)(async () => @@ -108,18 +107,13 @@ namespace Discord.Net { while (!cancelToken.IsCancellationRequested) { - Count = _pendingActions.Count; - if (Count >= _nextWarning) + int nonce; + while (_pendingSends.TryDequeue(out nonce)) { - _nextWarning *= 2; - _logger.Warning($"Queue is backed up, currently at {Count} actions."); + Message msg; + if (_pendingSendsByNonce.TryRemove(nonce, out msg)) //If it was delete from queue, this will fail + await Send(msg).ConfigureAwait(false); } - else if (Count < WarningStart) //Reset once the problem is solved - _nextWarning = WarningStart; - - IQueuedAction queuedAction; - while (_pendingActions.TryDequeue(out queuedAction)) - await queuedAction.Do(this).ConfigureAwait(false); await Task.Delay((int)Discord.DiscordConfig.MessageQueueInterval).ConfigureAwait(false); } @@ -127,29 +121,96 @@ namespace Discord.Net catch (OperationCanceledException) { } })); } + private Task RunEditQueue(CancellationToken cancelToken) + { + _nextWarning = WarningStart; + return Task.Run((Func)(async () => + { + try + { + while (!cancelToken.IsCancellationRequested) + { + MessageEdit edit; + while (_pendingEdits.TryPeek(out edit) && edit.Message.State != MessageState.Queued) + { + _pendingEdits.TryDequeue(out edit); + if (edit.Message.State == MessageState.Normal) + await Edit(edit.Message, edit.NewText); + } - internal async Task Send(Message msg) + await Task.Delay((int)Discord.DiscordConfig.MessageQueueInterval).ConfigureAwait(false); + } + } + catch (OperationCanceledException) { } + })); + } + private Task RunDeleteQueue(CancellationToken cancelToken) { - if (_pendingSends.TryRemove(msg.Nonce, out msg)) //Remove it from pending + _nextWarning = WarningStart; + return Task.Run((Func)(async () => { try { - var request = new SendMessageRequest(msg.Channel.Id) + while (!cancelToken.IsCancellationRequested) { - Content = msg.RawText, - Nonce = msg.Nonce.ToString(), - IsTTS = msg.IsTTS - }; - var response = await _rest.Send(request).ConfigureAwait(false); - msg.Id = response.Id; - msg.Update(response); - msg.State = MessageState.Normal; + Message msg; + while (_pendingDeletes.TryPeek(out msg) && msg.State != MessageState.Queued) + { + _pendingDeletes.TryDequeue(out msg); + if (msg.State == MessageState.Normal) + _pendingTasks.Enqueue(Delete(msg)); + } + + await Task.Delay((int)Discord.DiscordConfig.MessageQueueInterval).ConfigureAwait(false); + } } - catch (Exception ex) + catch (OperationCanceledException) { } + })); + } + private Task RunTaskQueue(CancellationToken cancelToken) + { + return Task.Run(async () => + { + Task task; + while (!cancelToken.IsCancellationRequested) + { + while (_pendingTasks.TryPeek(out task) && task.IsCompleted) + { + _pendingTasks.TryDequeue(out task); //Should never fail + if (task.IsFaulted) + _logger.Warning("Error during Edit/Delete", task.Exception); + } + await Task.Delay((int)Discord.DiscordConfig.MessageQueueInterval).ConfigureAwait(false); + } + + //Wait for remaining tasks to complete + while (_pendingTasks.TryDequeue(out task)) { - msg.State = MessageState.Failed; - _logger.Error("Failed to send message", ex); + if (!task.IsCompleted) + await task.ConfigureAwait(false); } + }); + } + + internal async Task Send(Message msg) + { + try + { + var request = new SendMessageRequest(msg.Channel.Id) + { + Content = msg.RawText, + Nonce = msg.Nonce.ToString(), + IsTTS = msg.IsTTS + }; + var response = await _rest.Send(request).ConfigureAwait(false); + msg.State = MessageState.Normal; + msg.Id = response.Id; + msg.Update(response); + } + catch (Exception ex) + { + msg.State = MessageState.Failed; + _logger.Error("Failed to send message", ex); } } internal async Task Edit(Message msg, string text) @@ -187,11 +248,29 @@ namespace Discord.Net } } - /// Clears all queued message sends/edits/deletes + private void IncrementCount() + { + int count = Interlocked.Increment(ref _count); + if (count >= _nextWarning) + { + _nextWarning *= 2; + _logger.Warning($"Queue is backed up, currently at {count} actions."); + } + else if (count < WarningStart) //Reset once the problem is solved + _nextWarning = WarningStart; + } + + /// Clears all queued message sends/edits/deletes. public void Clear() { - IQueuedAction ignored; - while (_pendingActions.TryDequeue(out ignored)) { } + int nonce; + while (_pendingSends.TryDequeue(out nonce)) { } + + MessageEdit edit; + while (_pendingEdits.TryDequeue(out edit)) { } + + Message msg; + while (_pendingDeletes.TryDequeue(out msg)) { } } private int GenerateNonce()