@@ -34,7 +34,6 @@ namespace Discord.Net
private readonly ConcurrentQueue<MessageEdit> _pendingEdits;
private readonly ConcurrentQueue<MessageEdit> _pendingEdits;
private readonly ConcurrentQueue<Message> _pendingDeletes;
private readonly ConcurrentQueue<Message> _pendingDeletes;
private readonly ConcurrentDictionary<int, string> _pendingSendsByNonce;
private readonly ConcurrentDictionary<int, string> _pendingSendsByNonce;
//private readonly ConcurrentQueue<Task> _pendingTasks;
private int _nextWarning;
private int _nextWarning;
private int _count;
private int _count;
@@ -52,7 +51,6 @@ namespace Discord.Net
_pendingEdits = new ConcurrentQueue<MessageEdit>();
_pendingEdits = new ConcurrentQueue<MessageEdit>();
_pendingDeletes = new ConcurrentQueue<Message>();
_pendingDeletes = new ConcurrentQueue<Message>();
_pendingSendsByNonce = new ConcurrentDictionary<int, string>();
_pendingSendsByNonce = new ConcurrentDictionary<int, string>();
//_pendingTasks = new ConcurrentQueue<Task>();
}
}
internal Message QueueSend(Channel channel, string text, bool isTTS)
internal Message QueueSend(Channel channel, string text, bool isTTS)
@@ -101,8 +99,7 @@ namespace Discord.Net
{
{
RunSendQueue(cancelToken),
RunSendQueue(cancelToken),
RunEditQueue(cancelToken),
RunEditQueue(cancelToken),
RunDeleteQueue(cancelToken),
//RunTaskQueue(cancelToken)
RunDeleteQueue(cancelToken)
};
};
}
}
private Task RunSendQueue(CancellationToken cancelToken)
private Task RunSendQueue(CancellationToken cancelToken)
@@ -116,15 +113,32 @@ namespace Discord.Net
Message msg;
Message msg;
while (_pendingSends.TryDequeue(out msg))
while (_pendingSends.TryDequeue(out msg))
{
{
DecrementCount();
string text;
string text;
if (_pendingSendsByNonce.TryRemove(msg.Nonce, out text)) //If it was delete from queue, this will fail
if (_pendingSendsByNonce.TryRemove(msg.Nonce, out text)) //If it was deleted from queue, this will fail
{
{
msg.RawText = text;
msg.Text = msg.Resolve(text);
await Send(msg).ConfigureAwait(false);
try
{
msg.RawText = text;
msg.Text = msg.Resolve(text);
var request = new SendMessageRequest(msg.Channel.Id)
{
Content = msg.RawText,
Nonce = msg.Nonce.ToString(),
IsTTS = msg.IsTTS
};
var response = await _rest.Send(request).ConfigureAwait(false);
msg.State = MessageState.Normal;
msg.Id = response.Id;
msg.Update(response);
}
catch (Exception ex)
{
msg.State = MessageState.Failed;
_logger.Error("Failed to send message", ex);
}
}
}
}
}
await Task.Delay((int)Discord.DiscordConfig.MessageQueueInterval).ConfigureAwait(false);
await Task.Delay((int)Discord.DiscordConfig.MessageQueueInterval).ConfigureAwait(false);
}
}
}
}
@@ -142,11 +156,23 @@ namespace Discord.Net
MessageEdit edit;
MessageEdit edit;
while (_pendingEdits.TryPeek(out edit) && edit.Message.State != MessageState.Queued)
while (_pendingEdits.TryPeek(out edit) && edit.Message.State != MessageState.Queued)
{
{
_pendingEdits.TryDequeue(out edit);
if (edit.Message.State == MessageState.Normal)
await Edit(edit.Message, edit.NewText).ConfigureAwait(false);
if (_pendingEdits.TryDequeue(out edit))
{
DecrementCount();
if (edit.Message.State == MessageState.Normal)
{
try
{
var request = new UpdateMessageRequest(edit.Message.Channel.Id, edit.Message.Id)
{
Content = edit.NewText
};
await _rest.Send(request).ConfigureAwait(false);
}
catch (Exception ex) { _logger.Error("Failed to edit message", ex); }
}
}
}
}
await Task.Delay((int)Discord.DiscordConfig.MessageQueueInterval).ConfigureAwait(false);
await Task.Delay((int)Discord.DiscordConfig.MessageQueueInterval).ConfigureAwait(false);
}
}
}
}
@@ -164,9 +190,20 @@ namespace Discord.Net
Message msg;
Message msg;
while (_pendingDeletes.TryPeek(out msg) && msg.State != MessageState.Queued)
while (_pendingDeletes.TryPeek(out msg) && msg.State != MessageState.Queued)
{
{
_pendingDeletes.TryDequeue(out msg);
if (msg.State == MessageState.Normal)
await Delete(msg).ConfigureAwait(false);
if (_pendingDeletes.TryDequeue(out msg))
{
DecrementCount();
if (msg.State == MessageState.Normal)
{
try
{
var request = new DeleteMessageRequest(msg.Channel.Id, msg.Id);
await _rest.Send(request).ConfigureAwait(false);
}
catch (HttpException ex) when (ex.StatusCode == HttpStatusCode.NotFound) { } //Ignore
catch (Exception ex) { _logger.Error("Failed to delete message", ex); }
}
}
}
}
await Task.Delay((int)Discord.DiscordConfig.MessageQueueInterval).ConfigureAwait(false);
await Task.Delay((int)Discord.DiscordConfig.MessageQueueInterval).ConfigureAwait(false);
@@ -175,86 +212,6 @@ namespace Discord.Net
catch (OperationCanceledException) { }
catch (OperationCanceledException) { }
}));
}));
}
}
/*private Task RunTaskQueue(CancellationToken cancelToken)
{
return Task.Run(async () =>
{
Task task;
while (!cancelToken.IsCancellationRequested)
{
while (_pendingTasks.TryPeek(out task) && task.IsCompleted)
{
_pendingTasks.TryDequeue(out task); //Should never fail
if (task.IsFaulted)
_logger.Warning("Error during Edit/Delete", task.Exception);
}
await Task.Delay((int)Discord.DiscordConfig.MessageQueueInterval).ConfigureAwait(false);
}
//Wait for remaining tasks to complete
while (_pendingTasks.TryDequeue(out task))
{
if (!task.IsCompleted)
await task.ConfigureAwait(false);
}
});
}*/
internal async Task Send(Message msg)
{
try
{
var request = new SendMessageRequest(msg.Channel.Id)
{
Content = msg.RawText,
Nonce = msg.Nonce.ToString(),
IsTTS = msg.IsTTS
};
var response = await _rest.Send(request).ConfigureAwait(false);
msg.State = MessageState.Normal;
msg.Id = response.Id;
msg.Update(response);
}
catch (Exception ex)
{
msg.State = MessageState.Failed;
_logger.Error("Failed to send message", ex);
}
}
internal async Task Edit(Message msg, string text)
{
if (msg.State == MessageState.Normal)
{
try
{
var request = new UpdateMessageRequest(msg.Channel.Id, msg.Id)
{
Content = text
};
await _rest.Send(request).ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.Error("Failed to edit message", ex);
}
}
}
internal async Task Delete(Message msg)
{
if (msg.State == MessageState.Normal)
{
try
{
var request = new DeleteMessageRequest(msg.Channel.Id, msg.Id);
await _rest.Send(request).ConfigureAwait(false);
}
catch (HttpException ex) when (ex.StatusCode == HttpStatusCode.NotFound) { } //Ignore
catch (Exception ex)
{
_logger.Error("Failed to delete message", ex);
}
}
}
private void IncrementCount()
private void IncrementCount()
{
{
@@ -267,6 +224,12 @@ namespace Discord.Net
else if (count < WarningStart) //Reset once the problem is solved
else if (count < WarningStart) //Reset once the problem is solved
_nextWarning = WarningStart;
_nextWarning = WarningStart;
}
}
private void DecrementCount()
{
int count = Interlocked.Decrement(ref _count);
if (count < WarningStart) //Reset once the problem is solved
_nextWarning = WarningStart;
}
/// <summary> Clears all queued message sends/edits/deletes. </summary>
/// <summary> Clears all queued message sends/edits/deletes. </summary>
public void Clear()
public void Clear()
@@ -274,9 +237,12 @@ namespace Discord.Net
Message msg;
Message msg;
MessageEdit edit;
MessageEdit edit;
while (_pendingSends.TryDequeue(out msg)) { }
while (_pendingEdits.TryDequeue(out edit)) { }
while (_pendingDeletes.TryDequeue(out msg)) { }
while (_pendingSends.TryDequeue(out msg))
DecrementCount();
while (_pendingEdits.TryDequeue(out edit))
DecrementCount();
while (_pendingDeletes.TryDequeue(out msg))
DecrementCount();
_pendingSendsByNonce.Clear();
_pendingSendsByNonce.Clear();
}
}