@@ -1,16 +1,16 @@
using Discord.API.Models;
using Discord.Helpers;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using WebSocketMessage = Discord.API.Models.VoiceWebSocketCommands.WebSocketMessage;
using System.Text;
using WebSocketMessage = Discord.API.Models.VoiceWebSocketCommands.WebSocketMessage;
#if !DNXCORE50
using Opus.Net;
#endif
@@ -19,50 +19,63 @@ namespace Discord
{
internal sealed partial class DiscordVoiceSocket : DiscordWebSocket
{
private struct Packet
{
public byte[] Data;
public int Count;
public Packet(byte[] data, int count)
{
Data = data;
Count = count;
}
}
private ManualResetEventSlim _connectWaitOnLogin;
private UdpClient _udp;
private ConcurrentQueue<byte[]> _sendQueue;
private string _myIp;
private IPEndPoint _endpoint;
private byte[] _secretKey;
private string _mode;
private bool _isFirst;
private ushort _sequence;
private uint _ssrc;
private long _startTicks;
private readonly Random _rand = new Random();
#if !DNXCORE50
private OpusEncoder _encoder;
private Queue<Packet> _sendQueue;
private UdpClient _udp;
private IPEndPoint _endpoint;
private bool _isReady;
private byte[] _secretKey;
private string _myIp;
private ushort _sequence;
private string _mode;
#endif
public DiscordVoiceSocket(int timeout, int interval)
: base(timeout, interval)
public DiscordVoiceSocket(DiscordClient client, int timeout, int interval)
: base(client, timeout, interval)
{
_connectWaitOnLogin = new ManualResetEventSlim(false);
_sendQueue = new ConcurrentQueue<byte[]>();
#if !DNXCORE50
_encoder = OpusEncoder.Create(24000, 1, Application.Voip);
_sendQueue = new Queue<Packet>();
_encoder = new OpusEncoder(48000, 1, 20, Application.Audio);
#endif
}
#if !DNXCORE50
protected override void OnConnect()
{
_udp = new UdpClient(new IPEndPoint(IPAddress.Any, 0));
_udp.AllowNatTraversal(true);
_isFirst = true;
}
protected override void OnDisconnect()
{
_udp = null;
}
#endif
protected override Task[] CreateTasks(CancellationToken cancelToken)
{
return new Task[]
{
#if !DNXCORE50
Task.Factory.StartNew(ReceiveAsync, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Result,
Task.Factory.StartNew(SendAsync, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Result,
#endif
Task.Factory.StartNew(WatcherAsync, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Result
}.Concat(base.CreateTasks(cancelToken)).ToArray();
}
@@ -73,8 +86,6 @@ namespace Discord
_connectWaitOnLogin.Reset();
_sequence = 0;
VoiceWebSocketCommands.Login msg = new VoiceWebSocketCommands.Login();
msg.Payload.ServerId = serverId;
msg.Payload.SessionId = sessionId;
@@ -95,10 +106,10 @@ namespace Discord
SetConnected();
}
#if !DNXCORE50
private async Task ReceiveAsync()
{
var cancelToken = _disconnectToken.Token;
try
{
while (!cancelToken.IsCancellationRequested)
@@ -115,17 +126,69 @@ namespace Discord
var cancelToken = _disconnectToken.Token;
try
{
byte[] bytes;
while (!cancelToken.IsCancellationRequested)
while (!cancelToken.IsCancellationRequested && !_isReady)
{
while (_sendQueue.TryDequeue(out bytes))
await _udp.SendAsync(bytes, bytes.Length);
lock (_sendQueue)
{
while (_sendQueue.Count > 0)
{
var packet = _sendQueue.Dequeue();
_udp.Send(packet.Data, packet.Count);
}
}
await Task.Delay(_sendInterval);
}
if (cancelToken.IsCancellationRequested)
return;
uint timestamp = 0;
double nextTicks = 0.0;
double ticksPerFrame = Stopwatch.Frequency / 1000.0 * _encoder.FrameLength;
uint samplesPerFrame = (uint)_encoder.SamplesPerFrame;
Stopwatch sw = Stopwatch.StartNew();
while (!cancelToken.IsCancellationRequested)
{
byte[] rtpPacket = new byte[4012];
rtpPacket[0] = 0x80; //Flags;
rtpPacket[1] = 0x78; //Payload Type
rtpPacket[8] = (byte)((_ssrc >> 24) & 0xFF);
rtpPacket[9] = (byte)((_ssrc >> 16) & 0xFF);
rtpPacket[10] = (byte)((_ssrc >> 8) & 0xFF);
rtpPacket[11] = (byte)((_ssrc >> 0) & 0xFF);
if (sw.ElapsedTicks > nextTicks)
{
lock (_sendQueue)
{
while (sw.ElapsedTicks > nextTicks)
{
if (_sendQueue.Count > 0)
{
var packet = _sendQueue.Dequeue();
ushort sequence = unchecked(_sequence++);
rtpPacket[2] = (byte)((sequence >> 8) & 0xFF);
rtpPacket[3] = (byte)((sequence >> 0) & 0xFF);
rtpPacket[4] = (byte)((timestamp >> 24) & 0xFF);
rtpPacket[5] = (byte)((timestamp >> 16) & 0xFF);
rtpPacket[6] = (byte)((timestamp >> 8) & 0xFF);
rtpPacket[7] = (byte)((timestamp >> 0) & 0xFF);
Buffer.BlockCopy(packet.Data, 0, rtpPacket, 12, packet.Count);
_udp.Send(rtpPacket, packet.Count + 12);
}
timestamp = unchecked(timestamp + samplesPerFrame);
nextTicks += ticksPerFrame;
}
}
}
/*else
await Task.Delay(1);*/
}
}
catch { }
finally { _disconnectToken.Cancel(); }
}
#endif
private async Task WatcherAsync()
{
try
@@ -133,14 +196,16 @@ namespace Discord
await Task.Delay(-1, _disconnectToken.Token);
}
catch (TaskCanceledException) { }
#if DNXCORE50
finally { _udp.Dispose(); }
#else
#if !DNXCORE50
finally { _udp.Close(); }
#endif
}
#if DNXCORE50
protected override Task ProcessMessage(string json)
#else
protected override async Task ProcessMessage(string json)
#endif
{
var msg = JsonConvert.DeserializeObject<WebSocketMessage>(json);
switch (msg.Operation)
@@ -149,18 +214,17 @@ namespace Discord
{
var payload = (msg.Payload as JToken).ToObject<VoiceWebSocketEvents.Ready>();
_heartbeatInterval = payload.HeartbeatInterval;
_ssrc = payload.SSRC;
#if !DNXCORE50
_endpoint = new IPEndPoint((await Dns.GetHostAddressesAsync(_host)).FirstOrDefault(), payload.Port);
//_mode = payload.Modes.LastOrDefault();
_mode = "plain";
_udp.Connect(_endpoint);
lock(_rand)
{
_sequence = (ushort)_rand.Next(0, ushort.MaxValue);
_startTicks = DateTime.UtcNow.Ticks - _rand.Next();
}
_ssrc = payload.SSRC;
_sendQueue.Enqueue(new byte[70] {
lock (_rand)
_sequence = (ushort)_rand.Next(0, ushort.MaxValue);
_isReady = false;
_sendQueue.Enqueue(new Packet(new byte[70] {
(byte)((_ssrc >> 24) & 0xFF),
(byte)((_ssrc >> 16) & 0xFF),
(byte)((_ssrc >> 8) & 0xFF),
@@ -170,30 +234,40 @@ namespace Discord
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0
});
}, 70));
#else
_connectWaitOnLogin.Set();
#endif
}
break;
#if !DNXCORE50
case 4: //SESSION_DESCRIPTION
{
var payload = (msg.Payload as JToken).ToObject<VoiceWebSocketEvents.JoinServer>();
_secretKey = payload.SecretKey;
SendIsTalking(true);
_connectWaitOnLogin.Set();
}
break;
#endif
default:
RaiseOnDebugMessage("Unknown WebSocket operation ID: " + msg.Operation);
break;
}
#if DNXCORE50
return Task.CompletedTask;
#endif
}
#if !DNXCORE50
private void ProcessUdpMessage(UdpReceiveResult msg)
{
if (msg.Buffer.Length > 0 && msg.RemoteEndPoint.Equals(_endpoint))
{
byte[] buffer = msg.Buffer;
int length = msg.Buffer.Length;
if (_isFirst )
if (!_isReady )
{
_isFirst = fals e;
_isReady = tru e;
if (length != 70)
throw new Exception($"Unexpected message length. Expected 70, got {length}.");
@@ -256,36 +330,29 @@ namespace Discord
}
}
#if !DNXCORE50
public void SendWAV(byte[] buffer, int count)
public void SendPCMFrame(byte[] data, int count)
{
int encodedLength;
byte[] payload = _encoder.Encode(buffer, count, out encodedLength);
if (count != _encoder.FrameSize)
throw new InvalidOperationException($"Invalid frame size. Got {count}, expected {_encoder.FrameSize}.");
byte[] payload = new byte[4000];
int encodedLength = _encoder.EncodeFrame(data, payload);
if (_mode == "xsalsa20_poly1305")
{
//TODO: Encode
}
lock (_sendQueue)
_sendQueue.Enqueue(new Packet(payload, encodedLength));
}
byte[] packet = new byte[12 + encodedLength];
Buffer.BlockCopy(payload, 0, packet, 12, encodedLength);
ushort sequence = _sequence++;
long timestamp = (DateTime.UtcNow.Ticks - _startTicks) >> 2; //200ns resolution
packet[0] = 0x80; //Flags;
packet[1] = 0x78; //Payload Type
packet[2] = (byte)((sequence >> 8) & 0xFF);
packet[3] = (byte)((sequence >> 0) & 0xFF);
packet[4] = (byte)((timestamp >> 24) & 0xFF);
packet[5] = (byte)((timestamp >> 16) & 0xFF);
packet[6] = (byte)((timestamp >> 8) & 0xFF);
packet[7] = (byte)((timestamp >> 0) & 0xFF);
packet[8] = (byte)((_ssrc >> 24) & 0xFF);
packet[9] = (byte)((_ssrc >> 16) & 0xFF);
packet[10] = (byte)((_ssrc >> 8) & 0xFF);
packet[11] = (byte)((_ssrc >> 0) & 0xFF);
_sendQueue.Enqueue(packet);
private void SendIsTalking(bool value)
{
var isTalking = new VoiceWebSocketCommands.IsTalking();
isTalking.Payload.IsSpeaking = value;
isTalking.Payload.Delay = 0;
QueueMessage(isTalking);
}
#endif