@@ -1,4 +1,5 @@
using Discord.Audio;
#define USE_THREAD
using Discord.Audio;
using Discord.Helpers;
using Discord.Helpers;
using Newtonsoft.Json;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json.Linq;
@@ -16,46 +17,51 @@ namespace Discord.WebSockets.Voice
{
{
internal partial class VoiceWebSocket : WebSocket
internal partial class VoiceWebSocket : WebSocket
{
{
private const string EncryptedMode = "xsalsa20_poly1305";
private const int MaxOpusSize = 4000;
private const string EncryptedMode = "xsalsa20_poly1305";
private const string UnencryptedMode = "plain";
private const string UnencryptedMode = "plain";
private readonly int _targetAudioBufferLength;
private readonly Random _rand;
private readonly int _targetAudioBufferLength;
private OpusEncoder _encoder;
private readonly ConcurrentDictionary<uint, OpusDecoder> _decoders;
private ManualResetEventSlim _connectWaitOnLogin;
private ManualResetEventSlim _connectWaitOnLogin;
private uint _ssrc;
private uint _ssrc;
private readonly Random _rand = new Random();
private OpusEncoder _encoder;
private ConcurrentQueue<byte[]> _sendQueue;
private ConcurrentQueue<byte[]> _sendQueue;
private ManualResetEventSlim _sendQueueWait, _sendQueueEmptyWait;
private ManualResetEventSlim _sendQueueWait, _sendQueueEmptyWait;
private UdpClient _udp;
private UdpClient _udp;
private IPEndPoint _endpoint;
private IPEndPoint _endpoint;
private bool _isClearing, _isEncrypted;
private bool _isClearing, _isEncrypted;
private byte[] _secretKey;
private byte[] _secretKey, _encodingBuffer ;
private ushort _sequence;
private ushort _sequence;
private byte[] _encodingBuffer;
private string _serverId, _userId, _sessionId, _token, _encryptionMode;
private string _userId, _sessionId, _token, _encryptionMode;
private Server _server;
private Channel _channel;
#if USE_THREAD
#if USE_THREAD
private Thread _sendThread;
private Thread _sendThread;
#endif
#endif
public string CurrentVoiceServerId => _serverId ;
public Server CurrentVoiceServer => _server ;
public VoiceWebSocket(DiscordClient client)
public VoiceWebSocket(DiscordClient client)
: base(client)
: base(client)
{
{
_connectWaitOnLogin = new ManualResetEventSlim(false);
_rand = new Random();
_connectWaitOnLogin = new ManualResetEventSlim(false);
_decoders = new ConcurrentDictionary<uint, OpusDecoder>();
_sendQueue = new ConcurrentQueue<byte[]>();
_sendQueue = new ConcurrentQueue<byte[]>();
_sendQueueWait = new ManualResetEventSlim(true);
_sendQueueWait = new ManualResetEventSlim(true);
_sendQueueEmptyWait = new ManualResetEventSlim(true);
_sendQueueEmptyWait = new ManualResetEventSlim(true);
_encoder = new OpusEncoder(48000, 1, 20, Opus.Application.Audio);
_encodingBuffer = new byte[4000];
_targetAudioBufferLength = client.Config.VoiceBufferLength / 20; //20 ms frames
_targetAudioBufferLength = client.Config.VoiceBufferLength / 20; //20 ms frames
}
_encodingBuffer = new byte[MaxOpusSize];
}
public void SetServer(string serverId )
public void SetChannel(Server server, Channel channel )
{
{
_serverId = serverId;
_server = server;
_channel = channel;
}
}
public async Task Login(string userId, string sessionId, string token, CancellationToken cancelToken)
public async Task Login(string userId, string sessionId, string token, CancellationToken cancelToken)
{
{
@@ -69,7 +75,7 @@ namespace Discord.WebSockets.Voice
_userId = userId;
_userId = userId;
_sessionId = sessionId;
_sessionId = sessionId;
_token = token;
_token = token;
await Connect().ConfigureAwait(false);
await Connect().ConfigureAwait(false);
}
}
public async Task Reconnect()
public async Task Reconnect()
@@ -107,26 +113,29 @@ namespace Discord.WebSockets.Voice
#endif
#endif
LoginCommand msg = new LoginCommand();
LoginCommand msg = new LoginCommand();
msg.Payload.ServerId = _serverId;
msg.Payload.ServerId = _server. Id;
msg.Payload.SessionId = _sessionId;
msg.Payload.SessionId = _sessionId;
msg.Payload.Token = _token;
msg.Payload.Token = _token;
msg.Payload.UserId = _userId;
msg.Payload.UserId = _userId;
QueueMessage(msg);
QueueMessage(msg);
#if USE_THREAD
#if USE_THREAD
_sendThread = new Thread(new ThreadStart(() => SendVoiceAsync(_disconnect Token)));
_sendThread = new Thread(new ThreadStart(() => SendVoiceAsync(_cancel Token)));
_sendThread.Start();
_sendThread.Start();
#if !DNXCORE50
return new Task[] { WatcherAsync() }.Concat(base.Run()).ToArray();
#else
return base.Run();
#endif
#endif
return new Task[]
{
#else //!USE_THREAD
return new Task[] { Task.WhenAll(
ReceiveVoiceAsync(),
ReceiveVoiceAsync(),
#if !USE_THREAD
SendVoiceAsync(),
SendVoiceAsync(),
#endif
#if !DNXCORE50
#if !DNXCORE50
WatcherAsync()
WatcherAsync()
#endif
#endif
}.Concat(base.Run()).ToArray();
)}.Concat(base.Run()).ToArray();
#endif
}
}
protected override Task Cleanup()
protected override Task Cleanup()
{
{
@@ -134,6 +143,13 @@ namespace Discord.WebSockets.Voice
_sendThread.Join();
_sendThread.Join();
_sendThread = null;
_sendThread = null;
#endif
#endif
OpusDecoder decoder;
foreach (var pair in _decoders)
{
if (_decoders.TryRemove(pair.Key, out decoder))
decoder.Dispose();
}
ClearPCMFrames();
ClearPCMFrames();
if (!_wasDisconnectUnexpected)
if (!_wasDisconnectUnexpected)
@@ -147,39 +163,137 @@ namespace Discord.WebSockets.Voice
return base.Cleanup();
return base.Cleanup();
}
}
private async Task ReceiveVoiceAsync()
#if USE_THREAD
private void ReceiveVoiceAsync(CancellationToken cancelToken)
{
#else
private Task ReceiveVoiceAsync()
{
{
var cancelToken = _cancelToken;
var cancelToken = _cancelToken;
await Task.Run(async () =>
return Task.Run(async () =>
{
#endif
try
{
{
try
byte[] packet, decodingBuffer = null, nonce = null, result;
int packetLength, resultOffset, resultLength;
IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
if ((_client.Config.VoiceMode & DiscordVoiceMode.Incoming) != 0)
{
{
while (!cancelToken.IsCancellationRequested)
decodingBuffer = new byte[MaxOpusSize];
nonce = new byte[24];
}
while (!cancelToken.IsCancellationRequested)
{
#if USE_THREAD
Thread.Sleep(1);
#elif DNXCORE50
await Task.Delay(1).ConfigureAwait(false);
#endif
#if USE_THREAD || DNXCORE50
if (_udp.Available > 0)
{
{
#if DNXCORE50
if (_udp.Available > 0)
{
packet = _udp.Receive(ref endpoint);
#else
var msg = await _udp.ReceiveAsync().ConfigureAwait(false);
endpoint = msg.Endpoint;
receievedPacket = msg.Buffer;
#endif
#endif
var result = await _udp.ReceiveAsync().ConfigureAwait(false);
ProcessUdpMessage(result);
#if DNXCORE50
packetLength = packet.Length;
if (packetLength > 0 && endpoint.Equals(_endpoint))
{
if (_state != (int)WebSocketState.Connected)
{
if (packetLength != 70)
return;
int port = packet[68] | packet[69] << 8;
string ip = Encoding.ASCII.GetString(packet, 4, 70 - 6).TrimEnd('\0');
CompleteConnect();
var login2 = new Login2Command();
login2.Payload.Protocol = "udp";
login2.Payload.SocketData.Address = ip;
login2.Payload.SocketData.Mode = _encryptionMode;
login2.Payload.SocketData.Port = port;
QueueMessage(login2);
if ((_client.Config.VoiceMode & DiscordVoiceMode.Incoming) == 0)
return;
}
else
{
//Parse RTP Data
if (packetLength < 12)
return;
byte flags = packet[0];
if (flags != 0x80)
return;
byte payloadType = packet[1];
if (payloadType != 0x78)
return;
ushort sequenceNumber = (ushort)((packet[2] << 8) |
packet[3] << 0);
uint timestamp = (uint)((packet[4] << 24) |
(packet[5] << 16) |
(packet[6] << 8) |
(packet[7] << 0));
uint ssrc = (uint)((packet[8] << 24) |
(packet[9] << 16) |
(packet[10] << 8) |
(packet[11] << 0));
//Decrypt
if (_isEncrypted)
{
if (packetLength < 28) //12 + 16 (RTP + Poly1305 MAC)
return;
Buffer.BlockCopy(packet, 0, nonce, 0, 12);
int ret = Sodium.Decrypt(packet, 12, packetLength - 12, decodingBuffer, nonce, _secretKey);
if (ret != 0)
continue;
result = decodingBuffer;
resultOffset = 0;
resultLength = packetLength - 28;
}
else //Plain
{
result = packet;
resultOffset = 12;
resultLength = packetLength - 12;
}
/*if (_logLevel >= LogMessageSeverity.Debug)
RaiseOnLog(LogMessageSeverity.Debug, $"Received {buffer.Length - 12} bytes.");*/
string userId = _channel.GetUserId(ssrc);
if (userId != null)
RaiseOnPacket(userId, _channel.Id, result, resultOffset, resultLength);
}
}
}
else
await Task.Delay(1).ConfigureAwait(false);
#endif
#if USE_THREAD || DNXCORE50
}
}
#endif
}
}
catch (OperationCanceledException) { }
catch (InvalidOperationException) { } //Includes ObjectDisposedException
catch (Exception ex) { await DisconnectInternal(ex); }
}
catch (OperationCanceledException) { }
catch (InvalidOperationException) { } //Includes ObjectDisposedException
#if !USE_THREAD
}).ConfigureAwait(false);
}).ConfigureAwait(false);
#endif
}
}
#if USE_THREAD
#if USE_THREAD
private void SendVoiceAsync(CancellationTokenSource cancelSource)
private void SendVoiceAsync(CancellationToken cancelToken )
{
{
var cancelToken = cancelSource.Token;
#else
#else
private Task SendVoiceAsync()
private Task SendVoiceAsync()
{
{
@@ -189,103 +303,114 @@ namespace Discord.WebSockets.Voice
{
{
#endif
#endif
byte[] packet;
try
try
{
while (!cancelToken.IsCancellationRequested && _state != (int)WebSocketState.Connected)
{
{
while (!cancelToken.IsCancellationRequested && _state != (int)WebSocketState.Connected)
{
#if USE_THREAD
#if USE_THREAD
Thread.Sleep(1);
Thread.Sleep(1);
#else
#else
await Task.Delay(1);
await Task.Delay(1);
#endif
#endif
}
}
if (cancelToken.IsCancellationRequested)
return;
uint timestamp = 0;
double nextTicks = 0.0;
double ticksPerMillisecond = Stopwatch.Frequency / 1000.0;
double ticksPerFrame = ticksPerMillisecond * _encoder.FrameLength;
double spinLockThreshold = 1.5 * ticksPerMillisecond;
uint samplesPerFrame = (uint)_encoder.SamplesPerFrame;
Stopwatch sw = Stopwatch.StartNew();
byte[] rtpPacket = new byte[_encodingBuffer.Length + 12];
byte[] nonce = null;
rtpPacket[0] = 0x80; //Flags;
rtpPacket[1] = 0x78; //Payload Type
rtpPacket[8] = (byte)((_ssrc >> 24) & 0xFF);
rtpPacket[9] = (byte)((_ssrc >> 16) & 0xFF);
rtpPacket[10] = (byte)((_ssrc >> 8) & 0xFF);
rtpPacket[11] = (byte)((_ssrc >> 0) & 0xFF);
if (_isEncrypted)
{
nonce = new byte[24];
Buffer.BlockCopy(rtpPacket, 0, nonce, 0, 12);
}
if (cancelToken.IsCancellationRequested)
return;
while (!cancelToken.IsCancellationRequested)
byte[] queuedPacket, result, nonce = null;
uint timestamp = 0;
double nextTicks = 0.0;
double ticksPerMillisecond = Stopwatch.Frequency / 1000.0;
double ticksPerFrame = ticksPerMillisecond * _encoder.FrameLength;
double spinLockThreshold = 1.5 * ticksPerMillisecond;
uint samplesPerFrame = (uint)_encoder.SamplesPerFrame;
Stopwatch sw = Stopwatch.StartNew();
if (_isEncrypted)
{
nonce = new byte[24];
result = new byte[MaxOpusSize + 12 + 16];
}
else
result = new byte[MaxOpusSize + 12];
int rtpPacketLength = 0;
result[0] = 0x80; //Flags;
result[1] = 0x78; //Payload Type
result[8] = (byte)((_ssrc >> 24) & 0xFF);
result[9] = (byte)((_ssrc >> 16) & 0xFF);
result[10] = (byte)((_ssrc >> 8) & 0xFF);
result[11] = (byte)((_ssrc >> 0) & 0xFF);
if (_isEncrypted)
Buffer.BlockCopy(result, 0, nonce, 0, 12);
while (!cancelToken.IsCancellationRequested)
{
double ticksToNextFrame = nextTicks - sw.ElapsedTicks;
if (ticksToNextFrame <= 0.0)
{
{
double ticksToNextFrame = nextTicks - sw.ElapsedTicks;
if (ticksToNextFrame <= 0.0)
while (sw.ElapsedTicks > nextTicks)
{
{
while (sw.ElapsedTicks > nextTicks)
if (!_isClearing )
{
{
if (!_isClearing)
if (_sendQueue.TryDequeue(out queuedPacket) )
{
{
if (_sendQueue.TryDequeue(out packet))
ushort sequence = unchecked(_sequence++);
result[2] = (byte)((sequence >> 8) & 0xFF);
result[3] = (byte)((sequence >> 0) & 0xFF);
result[4] = (byte)((timestamp >> 24) & 0xFF);
result[5] = (byte)((timestamp >> 16) & 0xFF);
result[6] = (byte)((timestamp >> 8) & 0xFF);
result[7] = (byte)((timestamp >> 0) & 0xFF);
if (_isEncrypted)
{
Buffer.BlockCopy(result, 2, nonce, 2, 6); //Update nonce
int ret = Sodium.Encrypt(queuedPacket, queuedPacket.Length, result, 12, nonce, _secretKey);
if (ret != 0)
continue;
rtpPacketLength = queuedPacket.Length + 12 + 16;
}
else
{
{
ushort sequence = unchecked(_sequence++);
rtpPacket[2] = (byte)((sequence >> 8) & 0xFF);
rtpPacket[3] = (byte)((sequence >> 0) & 0xFF);
rtpPacket[4] = (byte)((timestamp >> 24) & 0xFF);
rtpPacket[5] = (byte)((timestamp >> 16) & 0xFF);
rtpPacket[6] = (byte)((timestamp >> 8) & 0xFF);
rtpPacket[7] = (byte)((timestamp >> 0) & 0xFF);
if (_isEncrypted)
{
Buffer.BlockCopy(rtpPacket, 2, nonce, 2, 6); //Update nonce
int ret = Sodium.Encrypt(packet, packet.Length, packet, nonce, _secretKey);
if (ret != 0)
continue;
}
Buffer.BlockCopy(packet, 0, rtpPacket, 12, packet.Length);
Buffer.BlockCopy(queuedPacket, 0, result, 12, queuedPacket.Length);
rtpPacketLength = queuedPacket.Length + 12;
}
#if USE_THREAD
#if USE_THREAD
_udp.Send(rtpPacket, packet.Length + 12);
_udp.Send(result, rtpPacketLength);
#else
#else
await _udp.SendAsync(rtpPacket, packet.Length + 12).ConfigureAwait(false);
await _udp.SendAsync(rtpPacket, rtpPacketLength).ConfigureAwait(false);
#endif
#endif
}
timestamp = unchecked(timestamp + samplesPerFrame);
nextTicks += ticksPerFrame;
}
timestamp = unchecked(timestamp + samplesPerFrame);
nextTicks += ticksPerFrame;
//If we have less than our target data buffered, request more
int count = _sendQueue.Count;
if (count == 0)
{
_sendQueueWait.Set();
_sendQueueEmptyWait.Set();
}
else if (count < _targetAudioBufferLength)
_sendQueueWait.Set();
//If we have less than our target data buffered, request more
int count = _sendQueue.Count;
if (count == 0)
{
_sendQueueWait.Set();
_sendQueueEmptyWait.Set();
}
}
else if (count < _targetAudioBufferLength)
_sendQueueWait.Set();
}
}
}
}
//Dont sleep for 1 millisecond if we need to output audio in the next 1.5
else if (_sendQueue.Count == 0 || ticksToNextFrame >= spinLockThreshold)
}
//Dont sleep for 1 millisecond if we need to output audio in the next 1.5
else if (_sendQueue.Count == 0 || ticksToNextFrame >= spinLockThreshold)
#if USE_THREAD
#if USE_THREAD
Thread.Sleep(1);
Thread.Sleep(1);
#else
#else
await Task.Delay(1).ConfigureAwait(false);
await Task.Delay(1).ConfigureAwait(false);
#endif
#endif
}
}
}
catch (OperationCanceledException) { }
catch (InvalidOperationException) { } //Includes ObjectDisposedException
}
catch (OperationCanceledException) { }
catch (InvalidOperationException) { } //Includes ObjectDisposedException
#if !USE_THREAD
#if !USE_THREAD
});
}).ConfigureAwait(false) ;
#endif
#endif
}
}
#if !DNXCORE50
#if !DNXCORE50
@@ -331,16 +456,12 @@ namespace Discord.WebSockets.Voice
_sequence = (ushort)_rand.Next(0, ushort.MaxValue);
_sequence = (ushort)_rand.Next(0, ushort.MaxValue);
//No thread issue here because SendAsync doesn't start until _isReady is true
//No thread issue here because SendAsync doesn't start until _isReady is true
await _udp.SendAsync(new byte[70] {
(byte)((_ssrc >> 24) & 0xFF),
(byte)((_ssrc >> 16) & 0xFF),
(byte)((_ssrc >> 8) & 0xFF),
(byte)((_ssrc >> 0) & 0xFF),
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0 }, 70).ConfigureAwait(false);
byte[] packet = new byte[70];
packet[0] = (byte)((_ssrc >> 24) & 0xFF);
packet[1] = (byte)((_ssrc >> 16) & 0xFF);
packet[2] = (byte)((_ssrc >> 8) & 0xFF);
packet[3] = (byte)((_ssrc >> 0) & 0xFF);
await _udp.SendAsync(packet, 70).ConfigureAwait(false);
}
}
}
}
break;
break;
@@ -365,98 +486,6 @@ namespace Discord.WebSockets.Voice
}
}
}
}
private void ProcessUdpMessage(UdpReceiveResult msg)
{
if (msg.Buffer.Length > 0 && msg.RemoteEndPoint.Equals(_endpoint))
{
byte[] buffer = msg.Buffer;
int length = msg.Buffer.Length;
if (_state != (int)WebSocketState.Connected)
{
if (length != 70)
{
if (_logLevel >= LogMessageSeverity.Warning)
RaiseOnLog(LogMessageSeverity.Warning, $"Unexpected message length. Expected 70, got {length}.");
return;
}
int port = buffer[68] | buffer[69] << 8;
string ip = Encoding.ASCII.GetString(buffer, 4, 70 - 6).TrimEnd('\0');
CompleteConnect();
var login2 = new Login2Command();
login2.Payload.Protocol = "udp";
login2.Payload.SocketData.Address = ip;
login2.Payload.SocketData.Mode = _encryptionMode;
login2.Payload.SocketData.Port = port;
QueueMessage(login2);
}
else
{
//Parse RTP Data
if (length < 12)
{
if (_logLevel >= LogMessageSeverity.Warning)
RaiseOnLog(LogMessageSeverity.Warning, $"Unexpected message length. Expected >= 12, got {length}.");
return;
}
byte flags = buffer[0];
if (flags != 0x80)
{
if (_logLevel >= LogMessageSeverity.Warning)
RaiseOnLog(LogMessageSeverity.Warning, $"Unexpected Flags: {flags}");
return;
}
byte payloadType = buffer[1];
if (payloadType != 0x78)
{
if (_logLevel >= LogMessageSeverity.Warning)
RaiseOnLog(LogMessageSeverity.Warning, $"Unexpected Payload Type: {payloadType}");
return;
}
ushort sequenceNumber = (ushort)((buffer[2] << 8) |
buffer[3] << 0);
uint timestamp = (uint)((buffer[4] << 24) |
(buffer[5] << 16) |
(buffer[6] << 8) |
(buffer[7] << 0));
uint ssrc = (uint)((buffer[8] << 24) |
(buffer[9] << 16) |
(buffer[10] << 8) |
(buffer[11] << 0));
//Decrypt
/*if (_mode == "xsalsa20_poly1305")
{
if (length < 36) //12 + 24
throw new Exception($"Unexpected message length. Expected >= 36, got {length}.");
byte[] nonce = new byte[24]; //16 bytes static, 8 bytes incrementing?
Buffer.BlockCopy(buffer, 12, nonce, 0, 24);
byte[] cipherText = new byte[buffer.Length - 36];
Buffer.BlockCopy(buffer, 36, cipherText, 0, cipherText.Length);
Sodium.SecretBox.Open(cipherText, nonce, _secretKey);
}
else //Plain
{
byte[] newBuffer = new byte[buffer.Length - 12];
Buffer.BlockCopy(buffer, 12, newBuffer, 0, newBuffer.Length);
buffer = newBuffer;
}*/
if (_logLevel >= LogMessageSeverity.Debug)
RaiseOnLog(LogMessageSeverity.Debug, $"Received {buffer.Length - 12} bytes.");
//TODO: Use Voice Data
}
}
}
public void SendPCMFrames(byte[] data, int bytes)
public void SendPCMFrames(byte[] data, int bytes)
{
{
int frameSize = _encoder.FrameSize;
int frameSize = _encoder.FrameSize;
@@ -491,17 +520,11 @@ namespace Discord.WebSockets.Voice
//Wipe the end of the buffer
//Wipe the end of the buffer
for (int j = lastFrameSize; j < frameSize; j++)
for (int j = lastFrameSize; j < frameSize; j++)
data[j] = 0;
data[j] = 0;
}
}
//Encode the frame
//Encode the frame
int encodedLength = _encoder.EncodeFrame(data, pos, _encodingBuffer);
int encodedLength = _encoder.EncodeFrame(data, pos, _encodingBuffer);
//TODO: Handle Encryption
if (_isEncrypted)
{
}
//Copy result to the queue
//Copy result to the queue
payload = new byte[encodedLength];
payload = new byte[encodedLength];
Buffer.BlockCopy(_encodingBuffer, 0, payload, 0, encodedLength);
Buffer.BlockCopy(_encodingBuffer, 0, payload, 0, encodedLength);
@@ -515,8 +538,8 @@ namespace Discord.WebSockets.Voice
}
}
}
}
if (_logLevel >= LogMessageSeverity.Debug)
RaiseOnLog(LogMessageSeverity.Debug, $"Queued {bytes} bytes for voice output.");
/* if (_logLevel >= LogMessageSeverity.Debug)
RaiseOnLog(LogMessageSeverity.Debug, $"Queued {bytes} bytes for voice output.");*/
}
}
public void ClearPCMFrames()
public void ClearPCMFrames()
{
{