Browse Source

Added more missing ConfigureAwaits, ignore OperationCanceledException in MessageQueue

tags/docs-0.9
RogueException 9 years ago
parent
commit
780da1ce1e
4 changed files with 39 additions and 267 deletions
  1. +0
    -242
      src/Discord.Net.Audio/AudioClient.cs.old
  2. +1
    -1
      src/Discord.Net.Audio/AudioService.cs
  3. +8
    -8
      src/Discord.Net.Audio/Net/VoiceSocket.cs
  4. +30
    -16
      src/Discord.Net/MessageQueue.cs

+ 0
- 242
src/Discord.Net.Audio/AudioClient.cs.old View File

@@ -1,242 +0,0 @@
using Discord.API.Client.GatewaySocket;
using Discord.Logging;
using Discord.Net.Rest;
using Discord.Net.WebSockets;
using Newtonsoft.Json;
using Nito.AsyncEx;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Discord.Audio
{
internal class AudioClient : IAudioClient
{
private class OutStream : Stream
{
public override bool CanRead => false;
public override bool CanSeek => false;
public override bool CanWrite => true;

private readonly AudioClient _client;

internal OutStream(AudioClient client)
{
_client = client;
}

public override long Length { get { throw new InvalidOperationException(); } }
public override long Position
{
get { throw new InvalidOperationException(); }
set { throw new InvalidOperationException(); }
}
public override void Flush() { throw new InvalidOperationException(); }
public override long Seek(long offset, SeekOrigin origin) { throw new InvalidOperationException(); }
public override void SetLength(long value) { throw new InvalidOperationException(); }
public override int Read(byte[] buffer, int offset, int count) { throw new InvalidOperationException(); }
public override void Write(byte[] buffer, int offset, int count)
{
_client.Send(buffer, offset, count);
}
}
private readonly JsonSerializer _serializer;
private readonly bool _ownsGateway;
private TaskManager _taskManager;
private CancellationToken _cancelToken;

internal AudioService Service { get; }
internal Logger Logger { get; }
public int Id { get; }
public GatewaySocket GatewaySocket { get; }
public VoiceSocket VoiceSocket { get; }
public Stream OutputStream { get; }

public ConnectionState State => VoiceSocket.State;
public Server Server => VoiceSocket.Server;
public Channel Channel => VoiceSocket.Channel;

public AudioClient(AudioService service, int clientId, Server server, GatewaySocket gatewaySocket, bool ownsGateway, Logger logger)
{
Service = service;
_serializer = service.Client.Serializer;
Id = clientId;
GatewaySocket = gatewaySocket;
_ownsGateway = ownsGateway;
Logger = logger;
OutputStream = new OutStream(this);
_taskManager = new TaskManager(Cleanup, true);

GatewaySocket.ReceivedDispatch += OnReceivedDispatch;

VoiceSocket = new VoiceSocket(service.Client.Config, service.Config, service.Client.Serializer, logger);
VoiceSocket.Server = server;

/*_voiceSocket.Connected += (s, e) => RaiseVoiceConnected();
_voiceSocket.Disconnected += async (s, e) =>
{
_voiceSocket.CurrentServerId;
if (voiceServerId != null)
_gatewaySocket.SendLeaveVoice(voiceServerId.Value);
await _voiceSocket.Disconnect().ConfigureAwait(false);
RaiseVoiceDisconnected(socket.CurrentServerId.Value, e);
if (e.WasUnexpected)
await socket.Reconnect().ConfigureAwait(false);
};*/

/*_voiceSocket.IsSpeaking += (s, e) =>
{
if (_voiceSocket.State == WebSocketState.Connected)
{
var user = _users[e.UserId, socket.CurrentServerId];
bool value = e.IsSpeaking;
if (user.IsSpeaking != value)
{
user.IsSpeaking = value;
var channel = _channels[_voiceSocket.CurrentChannelId];
RaiseUserIsSpeaking(user, channel, value);
if (Config.TrackActivity)
user.UpdateActivity();
}
}
};*/

/*this.Connected += (s, e) =>
{
_voiceSocket.ParentCancelToken = _cancelToken;
};*/
}

public async Task Join(Channel channel)
{
if (channel == null) throw new ArgumentNullException(nameof(channel));
if (channel.Type != ChannelType.Voice)
throw new ArgumentException("Channel must be a voice channel.", nameof(channel));
if (channel.Server != VoiceSocket.Server)
throw new ArgumentException("This is channel is not part of the current server.", nameof(channel));
if (channel == VoiceSocket.Channel) return;
if (VoiceSocket.Server == null)
throw new InvalidOperationException("This client has been closed.");

SendVoiceUpdate(channel.Server.Id, channel.Id);
await Task.Run(() => VoiceSocket.WaitForConnection(_cancelToken));
}
public async Task Connect(RestClient rest = null)
{
var cancelSource = new CancellationTokenSource();
_cancelToken = cancelSource.Token;

Task[] tasks;
if (rest != null)
tasks = new Task[] { GatewaySocket.Connect(rest, _cancelToken) };
else
tasks = new Task[0];

await _taskManager.Start(tasks, cancelSource);
}

public Task Disconnect() => _taskManager.Stop(true);

private async Task Cleanup()
{
var server = VoiceSocket.Server;
VoiceSocket.Server = null;
VoiceSocket.Channel = null;

await Service.RemoveClient(server, this).ConfigureAwait(false);
SendVoiceUpdate(server.Id, null);

await VoiceSocket.Disconnect().ConfigureAwait(false);
if (_ownsGateway)
await GatewaySocket.Disconnect().ConfigureAwait(false);
}

private async void OnReceivedDispatch(object sender, WebSocketEventEventArgs e)
{
try
{
switch (e.Type)
{
case "VOICE_STATE_UPDATE":
{
var data = e.Payload.ToObject<VoiceStateUpdateEvent>(_serializer);
if (data.GuildId == VoiceSocket.Server?.Id && data.UserId == Service.Client.CurrentUser?.Id)
{
if (data.ChannelId == null)
await Disconnect().ConfigureAwait(false);
else
{
var channel = Service.Client.GetChannel(data.ChannelId.Value);
if (channel != null)
VoiceSocket.Channel = channel;
else
{
Logger.Warning("VOICE_STATE_UPDATE referenced an unknown channel, disconnecting.");
await Disconnect().ConfigureAwait(false);
}
}
}
}
break;
case "VOICE_SERVER_UPDATE":
{
var data = e.Payload.ToObject<VoiceServerUpdateEvent>(_serializer);
if (data.GuildId == VoiceSocket.Server?.Id)
{
var client = Service.Client;
var id = client.CurrentUser?.Id;
if (id != null)
{
var host = "wss://" + e.Payload.Value<string>("endpoint").Split(':')[0];
await VoiceSocket.Connect(host, data.Token, id.Value, GatewaySocket.SessionId, _cancelToken).ConfigureAwait(false);
}
}
}
break;
}
}
catch (Exception ex)
{
Logger.Error($"Error handling {e.Type} event", ex);
}
}

/// <summary> Sends a PCM frame to the voice server. Will block until space frees up in the outgoing buffer. </summary>
/// <param name="data">PCM frame to send. This must be a single or collection of uncompressed 48Kz monochannel 20ms PCM frames. </param>
/// <param name="count">Number of bytes in this frame. </param>
public void Send(byte[] data, int offset, int count)
{
if (data == null) throw new ArgumentException(nameof(data));
if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
if (offset < 0) throw new ArgumentOutOfRangeException(nameof(count));
if (VoiceSocket.Server == null) return; //Has been closed
if (count == 0) return;

VoiceSocket.SendPCMFrames(data, offset, count);
}

/// <summary> Clears the PCM buffer. </summary>
public void Clear()
{
if (VoiceSocket.Server == null) return; //Has been closed
VoiceSocket.ClearPCMFrames();
}

/// <summary> Returns a task that completes once the voice output buffer is empty. </summary>
public void Wait()
{
if (VoiceSocket.Server == null) return; //Has been closed
VoiceSocket.WaitForQueue();
}

public void SendVoiceUpdate(ulong? serverId, ulong? channelId)
{
GatewaySocket.SendUpdateVoice(serverId, channelId,
(Service.Config.Mode | AudioMode.Outgoing) == 0,
(Service.Config.Mode | AudioMode.Incoming) == 0);
}
}
}

+ 1
- 1
src/Discord.Net.Audio/AudioService.cs View File

@@ -141,7 +141,7 @@ namespace Discord.Audio
{ {
if (_defaultClient.Server != server) if (_defaultClient.Server != server)
{ {
await _defaultClient.Disconnect();
await _defaultClient.Disconnect().ConfigureAwait(false);
_defaultClient.VoiceSocket.Server = server; _defaultClient.VoiceSocket.Server = server;
await _defaultClient.Connect().ConfigureAwait(false); await _defaultClient.Connect().ConfigureAwait(false);
} }


+ 8
- 8
src/Discord.Net.Audio/Net/VoiceSocket.cs View File

@@ -129,11 +129,11 @@ namespace Discord.Net.WebSockets
protected override async Task Cleanup() protected override async Task Cleanup()
{ {
var sendThread = _sendTask; var sendThread = _sendTask;
if (sendThread != null) await sendThread;
if (sendThread != null) await sendThread.ConfigureAwait(false);
_sendTask = null; _sendTask = null;


var receiveThread = _receiveTask; var receiveThread = _receiveTask;
if (receiveThread != null) await receiveThread;
if (receiveThread != null) await receiveThread.ConfigureAwait(false);
_receiveTask = null; _receiveTask = null;


OpusDecoder decoder; OpusDecoder decoder;
@@ -146,7 +146,7 @@ namespace Discord.Net.WebSockets
ClearPCMFrames(); ClearPCMFrames();
_udp = null; _udp = null;


await base.Cleanup();
await base.Cleanup().ConfigureAwait(false);
} }


private async Task ReceiveVoiceAsync(CancellationToken cancelToken) private async Task ReceiveVoiceAsync(CancellationToken cancelToken)
@@ -166,7 +166,7 @@ namespace Discord.Net.WebSockets


while (!cancelToken.IsCancellationRequested) while (!cancelToken.IsCancellationRequested)
{ {
await Task.Delay(1);
await Task.Delay(1).ConfigureAwait(false);
if (_udp.Available > 0) if (_udp.Available > 0)
{ {
#if !DOTNET5_4 #if !DOTNET5_4
@@ -256,7 +256,7 @@ namespace Discord.Net.WebSockets
try try
{ {
while (!cancelToken.IsCancellationRequested && State != ConnectionState.Connected) while (!cancelToken.IsCancellationRequested && State != ConnectionState.Connected)
await Task.Delay(1);
await Task.Delay(1).ConfigureAwait(false);


if (cancelToken.IsCancellationRequested) if (cancelToken.IsCancellationRequested)
return; return;
@@ -372,10 +372,10 @@ namespace Discord.Net.WebSockets
{ {
int time = (int)Math.Floor(ticksToNextFrame / ticksPerMillisecond); int time = (int)Math.Floor(ticksToNextFrame / ticksPerMillisecond);
if (time > 0) if (time > 0)
await Task.Delay(time);
await Task.Delay(time).ConfigureAwait(false);
} }
else else
await Task.Delay(1); //Give as much time to the encrypter as possible
await Task.Delay(1).ConfigureAwait(false); //Give as much time to the encrypter as possible
} }
} }
} }
@@ -446,7 +446,7 @@ namespace Discord.Net.WebSockets
var payload = (msg.Payload as JToken).ToObject<SessionDescriptionEvent>(_serializer); var payload = (msg.Payload as JToken).ToObject<SessionDescriptionEvent>(_serializer);
_secretKey = payload.SecretKey; _secretKey = payload.SecretKey;
SendSetSpeaking(true); SendSetSpeaking(true);
await EndConnect();
await EndConnect().ConfigureAwait(false);
} }
break; break;
case OpCodes.Speaking: case OpCodes.Speaking:


+ 30
- 16
src/Discord.Net/MessageQueue.cs View File

@@ -104,23 +104,27 @@ namespace Discord.Net
_nextWarning = WarningStart; _nextWarning = WarningStart;
return Task.Run(async () => return Task.Run(async () =>
{ {
while (!cancelToken.IsCancellationRequested)
try
{ {
Count = _pendingActions.Count;
if (Count >= _nextWarning)
while (!cancelToken.IsCancellationRequested)
{ {
_nextWarning *= 2;
_logger.Warning($"Queue is backed up, currently at {Count} actions.");
Count = _pendingActions.Count;
if (Count >= _nextWarning)
{
_nextWarning *= 2;
_logger.Warning($"Queue is backed up, currently at {Count} actions.");
}
else if (Count < WarningStart) //Reset once the problem is solved
_nextWarning = WarningStart;

IQueuedAction queuedAction;
while (_pendingActions.TryDequeue(out queuedAction))
await queuedAction.Do(this).ConfigureAwait(false);

await Task.Delay(interval).ConfigureAwait(false);
} }
else if (Count < WarningStart) //Reset once the problem is solved
_nextWarning = WarningStart;

IQueuedAction queuedAction;
while (_pendingActions.TryDequeue(out queuedAction))
await queuedAction.Do(this).ConfigureAwait(false);

await Task.Delay(interval).ConfigureAwait(false);
} }
catch (OperationCanceledException) { }
}); });
} }


@@ -141,7 +145,11 @@ namespace Discord.Net
msg.Update(response); msg.Update(response);
msg.State = MessageState.Normal; msg.State = MessageState.Normal;
} }
catch (Exception ex) { msg.State = MessageState.Failed; _logger.Error("Failed to send message", ex); }
catch (Exception ex)
{
msg.State = MessageState.Failed;
_logger.Error("Failed to send message", ex);
}
} }
} }
internal async Task Edit(Message msg, string text) internal async Task Edit(Message msg, string text)
@@ -156,7 +164,10 @@ namespace Discord.Net
}; };
await _rest.Send(request).ConfigureAwait(false); await _rest.Send(request).ConfigureAwait(false);
} }
catch (Exception ex) { _logger.Error("Failed to edit message", ex); }
catch (Exception ex)
{
_logger.Error("Failed to edit message", ex);
}
} }
} }
internal async Task Delete(Message msg) internal async Task Delete(Message msg)
@@ -169,7 +180,10 @@ namespace Discord.Net
await _rest.Send(request).ConfigureAwait(false); await _rest.Send(request).ConfigureAwait(false);
} }
catch (HttpException ex) when (ex.StatusCode == HttpStatusCode.NotFound) { } //Ignore catch (HttpException ex) when (ex.StatusCode == HttpStatusCode.NotFound) { } //Ignore
catch (Exception ex) { _logger.Error("Failed to delete message", ex); }
catch (Exception ex)
{
_logger.Error("Failed to delete message", ex);
}
} }
} }




Loading…
Cancel
Save