@@ -18,9 +18,8 @@ namespace Discord.WebSockets.Voice
{
internal partial class VoiceWebSocket : WebSocket
{
private const int MaxOpusSize = 4000; //Max size of a single 20ms Opus frame
private const double SpinLockMilliseconds = 3.0; //If we're going to send audio in the next X milliseconds, dont use Task.Delay or Thread.Sleep
private const string EncryptedMode = "xsalsa20_poly1305";
private const int MaxOpusSize = 4000;
private const string EncryptedMode = "xsalsa20_poly1305";
private const string UnencryptedMode = "plain";
private readonly Random _rand;
@@ -30,11 +29,10 @@ namespace Discord.WebSockets.Voice
private uint _ssrc;
private ConcurrentDictionary<uint, string> _ssrcMapping;
private ConcurrentQueue<byte[]> _sendQueue;
private ManualResetEventSlim _sendQueueWait, _sendQueueEmptyWait;
private VoiceBuffer _sendQueue;
private UdpClient _udp;
private IPEndPoint _endpoint;
private bool _isClearing, _is Encrypted;
private bool _isEncrypted;
private byte[] _secretKey, _encodingBuffer;
private ushort _sequence;
private string _serverId, _channelId, _userId, _sessionId, _token, _encryptionMode;
@@ -51,13 +49,11 @@ namespace Discord.WebSockets.Voice
{
_rand = new Random();
_decoders = new ConcurrentDictionary<uint, OpusDecoder>();
_sendQueue = new ConcurrentQueue<byte[]>();
_sendQueueWait = new ManualResetEventSlim(true);
_sendQueueEmptyWait = new ManualResetEventSlim(true);
_targetAudioBufferLength = client.Config.VoiceBufferLength / 20; //20 ms frames
_encodingBuffer = new byte[MaxOpusSize];
_ssrcMapping = new ConcurrentDictionary<uint, string>();
_encoder = new OpusEncoder(48000, 1, 20, Opus.Application.Audio);
_sendQueue = new VoiceBuffer((int)Math.Ceiling(client.Config.VoiceBufferLength / (double)_encoder.FrameLength), _encoder.FrameSize);
}
public Task SetChannel(string serverId, string channelId)
@@ -108,9 +104,7 @@ namespace Discord.WebSockets.Voice
}
protected override IEnumerable<Task> GetTasks()
{
_isClearing = false;
{
_udp = new UdpClient(new IPEndPoint(IPAddress.Any, 0));
#if !DNX451 && !__MonoCS__
_udp.AllowNatTraversal(true);
@@ -347,33 +341,35 @@ namespace Discord.WebSockets.Voice
if (cancelToken.IsCancellationRequested)
return;
byte[] queuedPacket, result, nonce = null;
byte[] frame = new byte[_encoder.FrameSize];
byte[] encodedFrame = new byte[MaxOpusSize];
byte[] udpPacket, nonce = null;
uint timestamp = 0;
double nextTicks = 0.0;
double ticksPerMillisecond = Stopwatch.Frequency / 1000.0;
double spinLockThreshold = SpinLockMilliseconds * ticksPerMillisecond;
double ticksPerFrame = ticksPerMillisecond * _encoder.FrameLength;
double spinLockThreshold = 3 * ticksPerMillisecond;
uint samplesPerFrame = (uint)_encoder.SamplesPerFrame;
Stopwatch sw = Stopwatch.StartNew();
if (_isEncrypted)
{
nonce = new byte[24];
resul t = new byte[MaxOpusSize + 12 + 16];
udpPacke t = new byte[MaxOpusSize + 12 + 16];
}
else
resul t = new byte[MaxOpusSize + 12];
udpPacke t = new byte[MaxOpusSize + 12];
int rtpPacketLength = 0;
resul t[0] = 0x80; //Flags;
resul t[1] = 0x78; //Payload Type
resul t[8] = (byte)((_ssrc >> 24) & 0xFF);
resul t[9] = (byte)((_ssrc >> 16) & 0xFF);
resul t[10] = (byte)((_ssrc >> 8) & 0xFF);
resul t[11] = (byte)((_ssrc >> 0) & 0xFF);
udpPacke t[0] = 0x80; //Flags;
udpPacke t[1] = 0x78; //Payload Type
udpPacke t[8] = (byte)((_ssrc >> 24) & 0xFF);
udpPacke t[9] = (byte)((_ssrc >> 16) & 0xFF);
udpPacke t[10] = (byte)((_ssrc >> 8) & 0xFF);
udpPacke t[11] = (byte)((_ssrc >> 0) & 0xFF);
if (_isEncrypted)
Buffer.BlockCopy(resul t, 0, nonce, 0, 12);
if (_isEncrypted)
Buffer.BlockCopy(udpPacke t, 0, nonce, 0, 12);
while (!cancelToken.IsCancellationRequested)
{
@@ -382,50 +378,41 @@ namespace Discord.WebSockets.Voice
{
while (sw.ElapsedTicks > nextTicks)
{
if (!_isClearing )
if (_sendQueue.Pop(frame) )
{
if (_sendQueue.TryDequeue(out queuedPacket))
ushort sequence = unchecked(_sequence++);
udpPacket[2] = (byte)((sequence >> 8) & 0xFF);
udpPacket[3] = (byte)((sequence >> 0) & 0xFF);
udpPacket[4] = (byte)((timestamp >> 24) & 0xFF);
udpPacket[5] = (byte)((timestamp >> 16) & 0xFF);
udpPacket[6] = (byte)((timestamp >> 8) & 0xFF);
udpPacket[7] = (byte)((timestamp >> 0) & 0xFF);
//Encode
int encodedLength = _encoder.EncodeFrame(frame, 0, encodedFrame);
//Encrypt
if (_isEncrypted)
{
ushort sequence = unchecked(_sequence++);
result[2] = (byte)((sequence >> 8) & 0xFF);
result[3] = (byte)((sequence >> 0) & 0xFF);
result[4] = (byte)((timestamp >> 24) & 0xFF);
result[5] = (byte)((timestamp >> 16) & 0xFF);
result[6] = (byte)((timestamp >> 8) & 0xFF);
result[7] = (byte)((timestamp >> 0) & 0xFF);
if (_isEncrypted)
{
Buffer.BlockCopy(result, 2, nonce, 2, 6); //Update nonce
int ret = Sodium.Encrypt(queuedPacket, queuedPacket.Length, result, 12, nonce, _secretKey);
if (ret != 0)
continue;
rtpPacketLength = queuedPacket.Length + 12 + 16;
}
else
{
Buffer.BlockCopy(queuedPacket, 0, result, 12, queuedPacket.Length);
rtpPacketLength = queuedPacket.Length + 12;
}
#if USE_THREAD
_udp.Send(result, rtpPacketLength);
#else
await _udp.SendAsync(rtpPacket, rtpPacketLength).ConfigureAwait(false);
#endif
Buffer.BlockCopy(udpPacket, 2, nonce, 2, 6); //Update nonce
int ret = Sodium.Encrypt(encodedFrame, encodedLength, udpPacket, 12, nonce, _secretKey);
if (ret != 0)
continue;
rtpPacketLength = encodedLength + 12 + 16;
}
timestamp = unchecked(timestamp + samplesPerFrame);
nextTicks += ticksPerFrame;
//If we have less than our target data buffered, request more
int count = _sendQueue.Count;
if (count == 0)
else
{
_sendQueueWait.Set( );
_sendQueueEmptyWait.Set() ;
Buffer.BlockCopy(encodedFrame, 0, udpPacket, 12, encodedLength);
rtpPacketLength = encodedLength + 12;
}
else if (count < _targetAudioBufferLength)
_sendQueueWait.Set();
#if USE_THREAD
_udp.Send(udpPacket, rtpPacketLength);
#else
await _udp.SendAsync(rtpPacket, rtpPacketLength).ConfigureAwait(false);
#endif
}
timestamp = unchecked(timestamp + samplesPerFrame);
nextTicks += ticksPerFrame;
}
}
//Dont sleep if we need to output audio in the next spinLockThreshold
@@ -433,21 +420,21 @@ namespace Discord.WebSockets.Voice
{
int time = (int)Math.Ceiling((ticksToNextFrame - spinLockThreshold) / ticksPerMillisecond);
#if USE_THREAD
Thread.Sleep(time );
Thread.Sleep(1 );
#else
await Task.Delay(time ).ConfigureAwait(false);
await Task.Delay(1 ).ConfigureAwait(false);
#endif
}
//Don't spinlock if we're not actually sending audio (or buffer underrunning)
else if (_sendQueue.Count == 0)
/* else if (_sendQueue.Count == 0)
{
int time = (int)Math.Ceiling(ticksToNextFrame / ticksPerMillisecond);
#if USE_THREAD
Thread.Sleep(time );
Thread.Sleep(1 );
#else
await Task.Delay(time ).ConfigureAwait(false);
await Task.Delay(1 ).ConfigureAwait(false);
#endif
}
}*/
}
}
catch (OperationCanceledException) { }
@@ -531,67 +518,11 @@ namespace Discord.WebSockets.Voice
public void SendPCMFrames(byte[] data, int bytes)
{
int frameSize = _encoder.FrameSize;
int frames = bytes / frameSize;
int expectedBytes = frames * frameSize;
int lastFrameSize = bytes - expectedBytes;
//If this only consists of a partial frame and the buffer is too small to pad the end, make a new one
if (data.Length < frameSize)
{
byte[] newData = new byte[frameSize];
Buffer.BlockCopy(data, 0, newData, 0, bytes);
data = newData;
}
byte[] payload;
//Opus encoder requires packets be queued in the same order they were generated, so all of this must still be locked.
lock (_encoder)
{
for (int i = 0, pos = 0; i <= frames; i++, pos += frameSize)
{
if (i == frames)
{
//If there are no partial frames, skip this step
if (lastFrameSize == 0)
break;
//Take the partial frame from the end of the buffer and put it at the start
Buffer.BlockCopy(data, pos, data, 0, lastFrameSize);
pos = 0;
//Wipe the end of the buffer
for (int j = lastFrameSize; j < frameSize; j++)
data[j] = 0;
}
//Encode the frame
int encodedLength = _encoder.EncodeFrame(data, pos, _encodingBuffer);
//Copy result to the queue
payload = new byte[encodedLength];
Buffer.BlockCopy(_encodingBuffer, 0, payload, 0, encodedLength);
//Wait until the queue has a spot open
_sendQueueWait.Wait(_cancelToken);
_sendQueue.Enqueue(payload);
if (_sendQueue.Count >= _targetAudioBufferLength)
_sendQueueWait.Reset();
_sendQueueEmptyWait.Reset();
}
}
/*if (_logLevel >= LogMessageSeverity.Debug)
RaiseOnLog(LogMessageSeverity.Debug, $"Queued {bytes} bytes for voice output.");*/
_sendQueue.Push(data, bytes, _cancelToken);
}
public void ClearPCMFrames()
{
_isClearing = true;
byte[] ignored;
while (_sendQueue.TryDequeue(out ignored)) { }
if (_logLevel >= LogMessageSeverity.Debug)
RaiseOnLog(LogMessageSeverity.Debug, "Cleared the voice buffer.");
_isClearing = false;
_sendQueue.Clear(_cancelToken);
}
private void SendIsTalking(bool value)
@@ -609,7 +540,7 @@ namespace Discord.WebSockets.Voice
public void WaitForQueue()
{
_sendQueueEmptyWait .Wait(_cancelToken);
_sendQueue.Wait(_cancelToken);
}
public Task WaitForConnection(int timeout)
{