@@ -11,6 +11,7 @@ using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
namespace Discord.Audio
{
@@ -34,10 +35,11 @@ namespace Discord.Audio
private readonly ConnectionManager _connection;
private readonly SemaphoreSlim _stateLock;
private readonly ConcurrentQueue<long> _heartbeatTimes;
private readonly ConcurrentQueue<KeyValuePair<ulong, int>> _keepaliveTimes;
private readonly ConcurrentDictionary<uint, ulong> _ssrcMap;
private readonly ConcurrentDictionary<ulong, StreamPair> _streams;
private Task _heartbeatTask;
private Task _heartbeatTask, _keepaliveTask ;
private long _lastMessageTime;
private string _url, _sessionId, _token;
private ulong _userId;
@@ -46,6 +48,7 @@ namespace Discord.Audio
public SocketGuild Guild { get; }
public DiscordVoiceAPIClient ApiClient { get; private set; }
public int Latency { get; private set; }
public int UdpLatency { get; private set; }
public ulong ChannelId { get; internal set; }
internal byte[] SecretKey { get; private set; }
@@ -72,6 +75,7 @@ namespace Discord.Audio
_connection.Connected += () => _connectedEvent.InvokeAsync();
_connection.Disconnected += (ex, recon) => _disconnectedEvent.InvokeAsync(ex);
_heartbeatTimes = new ConcurrentQueue<long>();
_keepaliveTimes = new ConcurrentQueue<KeyValuePair<ulong, int>>();
_ssrcMap = new ConcurrentDictionary<uint, ulong>();
_streams = new ConcurrentDictionary<ulong, StreamPair>();
@@ -83,6 +87,7 @@ namespace Discord.Audio
};
LatencyUpdated += async (old, val) => await _audioLogger.DebugAsync($"Latency = {val} ms").ConfigureAwait(false);
UdpLatencyUpdated += async (old, val) => await _audioLogger.DebugAsync($"UDP Latency = {val} ms").ConfigureAwait(false);
}
internal async Task StartAsync(string url, ulong userId, string sessionId, string token)
@@ -119,6 +124,10 @@ namespace Discord.Audio
if (heartbeatTask != null)
await heartbeatTask.ConfigureAwait(false);
_heartbeatTask = null;
var keepaliveTask = _keepaliveTask;
if (keepaliveTask != null)
await keepaliveTask.ConfigureAwait(false);
_keepaliveTask = null;
long time;
while (_heartbeatTimes.TryDequeue(out time)) { }
@@ -242,6 +251,7 @@ namespace Discord.Audio
SecretKey = data.SecretKey;
await ApiClient.SendSetSpeaking(false).ConfigureAwait(false);
_keepaliveTask = RunKeepaliveAsync(5000, _connection.CancelToken);
var _ = _connection.CompleteAsync();
}
@@ -284,60 +294,94 @@ namespace Discord.Audio
}
private async Task ProcessPacketAsync(byte[] packet)
{
if (_connection.State == ConnectionState.Connecting)
try
{
if (packet.Length != 70)
{
await _audioLogger.DebugAsync($"Malformed Packet").ConfigureAwait(false);
return;
}
string ip;
int port;
try
if (_connection.State == ConnectionState.Connecting)
{
ip = Encoding.UTF8.GetString(packet, 4, 70 - 6).TrimEnd('\0');
port = (packet[69] << 8) | packet[68];
if (packet.Length != 70)
{
await _audioLogger.DebugAsync($"Malformed Packet").ConfigureAwait(false);
return;
}
string ip;
int port;
try
{
ip = Encoding.UTF8.GetString(packet, 4, 70 - 6).TrimEnd('\0');
port = (packet[69] << 8) | packet[68];
}
catch (Exception ex)
{
await _audioLogger.DebugAsync($"Malformed Packet", ex).ConfigureAwait(false);
return;
}
await _audioLogger.DebugAsync("Received Discovery").ConfigureAwait(false);
await ApiClient.SendSelectProtocol(ip, port).ConfigureAwait(false);
}
catch (Exception ex)
else if (_connection.State == ConnectionState.Connected )
{
await _audioLogger.DebugAsync($"Malformed Packet", ex).ConfigureAwait(false);
return;
if (packet.Length == 8)
{
await _audioLogger.DebugAsync("Received Keepalive").ConfigureAwait(false);
ulong value =
((ulong)packet[0] >> 0) |
((ulong)packet[1] >> 8) |
((ulong)packet[2] >> 16) |
((ulong)packet[3] >> 24) |
((ulong)packet[4] >> 32) |
((ulong)packet[5] >> 40) |
((ulong)packet[6] >> 48) |
((ulong)packet[7] >> 56);
while (_keepaliveTimes.TryDequeue(out var pair))
{
if (pair.Key == value)
{
int latency = (int)(Environment.TickCount - pair.Value);
int before = UdpLatency;
UdpLatency = latency;
await _udpLatencyUpdatedEvent.InvokeAsync(before, latency).ConfigureAwait(false);
break;
}
}
}
else
{
if (!RTPReadStream.TryReadSsrc(packet, 0, out var ssrc))
{
await _audioLogger.DebugAsync($"Malformed Frame").ConfigureAwait(false);
return;
}
if (!_ssrcMap.TryGetValue(ssrc, out var userId))
{
await _audioLogger.DebugAsync($"Unknown SSRC {ssrc}").ConfigureAwait(false);
return;
}
if (!_streams.TryGetValue(userId, out var pair))
{
await _audioLogger.DebugAsync($"Unknown User {userId}").ConfigureAwait(false);
return;
}
try
{
await pair.Writer.WriteAsync(packet, 0, packet.Length).ConfigureAwait(false);
}
catch (Exception ex)
{
await _audioLogger.DebugAsync($"Malformed Frame", ex).ConfigureAwait(false);
return;
}
//await _audioLogger.DebugAsync($"Received {packet.Length} bytes from user {userId}").ConfigureAwait(false);
}
}
await _audioLogger.DebugAsync("Received Discovery").ConfigureAwait(false);
await ApiClient.SendSelectProtocol(ip, port).ConfigureAwait(false);
}
else if (_connection.State == ConnectionState.Connected)
catch (Exception ex)
{
uint ssrc;
ulong userId;
StreamPair pair;
if (!RTPReadStream.TryReadSsrc(packet, 0, out ssrc))
{
await _audioLogger.DebugAsync($"Malformed Frame").ConfigureAwait(false);
return;
}
if (!_ssrcMap.TryGetValue(ssrc, out userId))
{
await _audioLogger.DebugAsync($"Unknown SSRC {ssrc}").ConfigureAwait(false);
return;
}
if (!_streams.TryGetValue(userId, out pair))
{
await _audioLogger.DebugAsync($"Unknown User {userId}").ConfigureAwait(false);
return;
}
try
{
await pair.Writer.WriteAsync(packet, 0, packet.Length).ConfigureAwait(false);
}
catch (Exception ex)
{
await _audioLogger.DebugAsync($"Malformed Frame", ex).ConfigureAwait(false);
return;
}
//await _audioLogger.DebugAsync($"Received {packet.Length} bytes from user {userId}").ConfigureAwait(false);
await _audioLogger.WarningAsync($"Failed to process UDP packet", ex).ConfigureAwait(false);
return;
}
}
@@ -366,7 +410,7 @@ namespace Discord.Audio
}
catch (Exception ex)
{
await _audioLogger.WarningAsync("Heartbeat Errored ", ex).ConfigureAwait(false);
await _audioLogger.WarningAsync("Failed to send heartbeat ", ex).ConfigureAwait(false);
}
await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false);
@@ -382,6 +426,40 @@ namespace Discord.Audio
await _audioLogger.ErrorAsync("Heartbeat Errored", ex).ConfigureAwait(false);
}
}
private async Task RunKeepaliveAsync(int intervalMillis, CancellationToken cancelToken)
{
var packet = new byte[8];
try
{
await _audioLogger.DebugAsync("Keepalive Started").ConfigureAwait(false);
while (!cancelToken.IsCancellationRequested)
{
var now = Environment.TickCount;
try
{
ulong value = await ApiClient.SendKeepaliveAsync().ConfigureAwait(false);
if (_keepaliveTimes.Count < 12) //No reply for 60 Seconds
_keepaliveTimes.Enqueue(new KeyValuePair<ulong, int>(value, now));
}
catch (Exception ex)
{
await _audioLogger.WarningAsync("Failed to send keepalive", ex).ConfigureAwait(false);
}
await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false);
}
await _audioLogger.DebugAsync("Keepalive Stopped").ConfigureAwait(false);
}
catch (OperationCanceledException)
{
await _audioLogger.DebugAsync("Keepalive Stopped").ConfigureAwait(false);
}
catch (Exception ex)
{
await _audioLogger.ErrorAsync("Keepalive Errored", ex).ConfigureAwait(false);
}
}
internal void Dispose(bool disposing)
{