using System;
using System.Threading;

namespace Discord.WebSockets.Voice
{
    /// <summary>
    /// Fixed-capacity ring buffer of equally sized audio frames, shared between
    /// producers calling <see cref="Push"/> and a consumer calling <see cref="Pop"/>.
    /// </summary>
    /// <remarks>
    /// One slot is always kept free so that "read cursor == write cursor" unambiguously
    /// means "empty"; the usable capacity is therefore <c>frameCount - 1</c> frames.
    /// Lock order is always <c>_writeLock</c> then <c>_stateLock</c>; <see cref="Pop"/>
    /// only ever takes <c>_stateLock</c>.
    /// </remarks>
    public class VoiceBuffer
    {
        private readonly int _frameSize, _frameCount, _bufferSize;
        private readonly byte[] _buffer;

        //Serializes producers (Push/Clear) so frames from one Push call stay contiguous.
        private readonly object _writeLock = new object();
        //Guards both cursors and keeps the two events consistent with them.
        private readonly object _stateLock = new object();

        //Signalled once the consumer has found the buffer empty; reset whenever a frame is queued.
        private readonly ManualResetEventSlim _underflowEvent;
        //Signalled whenever the consumer may have freed a slot.
        private readonly ManualResetEventSlim _notOverflowEvent;

        private int _readCursor, _writeCursor;

        /// <summary> Creates an empty buffer. </summary>
        /// <param name="frameCount"> Number of frame slots to allocate; must be at least 2. </param>
        /// <param name="frameSize"> Size of a single frame in bytes; must be at least 1. </param>
        /// <exception cref="ArgumentOutOfRangeException"> An argument is below its minimum. </exception>
        public VoiceBuffer(int frameCount, int frameSize)
        {
            //With fewer than two slots the ring can never hold a frame (one slot always stays free).
            if (frameCount < 2)
                throw new ArgumentOutOfRangeException(nameof(frameCount), "At least two frame slots are required.");
            if (frameSize < 1)
                throw new ArgumentOutOfRangeException(nameof(frameSize), "Frame size must be positive.");

            _frameSize = frameSize;
            _frameCount = frameCount;
            _bufferSize = checked(_frameSize * _frameCount);
            _readCursor = 0;
            _writeCursor = 0;
            _buffer = new byte[_bufferSize];
            _underflowEvent = new ManualResetEventSlim(true); //The buffer starts out empty
            _notOverflowEvent = new ManualResetEventSlim(true); //...and therefore has free slots
        }

        /// <summary>
        /// Queues <paramref name="bytes"/> bytes from <paramref name="buffer"/> as consecutive frames,
        /// blocking while the buffer is full. A trailing partial frame is zero-padded to a full frame.
        /// </summary>
        /// <param name="buffer"> Source data. </param>
        /// <param name="bytes"> Number of bytes to read from the start of <paramref name="buffer"/>. </param>
        /// <param name="cancelToken"> Cancels a wait for free space. </param>
        /// <exception cref="ArgumentNullException"> <paramref name="buffer"/> is null. </exception>
        /// <exception cref="ArgumentOutOfRangeException"> <paramref name="bytes"/> is negative or exceeds the source length. </exception>
        /// <exception cref="OperationCanceledException"> The token was cancelled while waiting for free space. </exception>
        public void Push(byte[] buffer, int bytes, CancellationToken cancelToken)
        {
            if (buffer == null)
                throw new ArgumentNullException(nameof(buffer));
            if (bytes < 0 || bytes > buffer.Length)
                throw new ArgumentOutOfRangeException(nameof(bytes));

            int wholeFrames = bytes / _frameSize;
            int lastFrameSize = bytes - wholeFrames * _frameSize;

            lock (_writeLock)
            {
                int pos = 0;
                for (int i = 0; i < wholeFrames; i++, pos += _frameSize)
                    WriteFrame(buffer, pos, _frameSize, cancelToken);

                //Only wait for (and consume) another slot if there really is a partial frame
                if (lastFrameSize > 0)
                    WriteFrame(buffer, pos, lastFrameSize, cancelToken);
            }
        }

        /// <summary> Dequeues the oldest frame into <paramref name="buffer"/>. Never blocks on producers. </summary>
        /// <param name="buffer"> Destination; must be able to hold one full frame. </param>
        /// <returns> True if a frame was copied; false if the buffer was empty. </returns>
        public bool Pop(byte[] buffer)
        {
            lock (_stateLock)
            {
                if (_writeCursor == _readCursor)
                {
                    _underflowEvent.Set();
                    _notOverflowEvent.Set();
                    return false;
                }

                Buffer.BlockCopy(_buffer, _readCursor * _frameSize, buffer, 0, _frameSize);

                //Advance the read cursor to the next position
                _readCursor = NextCursorPos(_readCursor);
                _notOverflowEvent.Set();
                return true;
            }
        }

        /// <summary> Discards every queued frame and releases anyone blocked in <see cref="Wait"/>. </summary>
        /// <param name="cancelToken"> If already cancelled, the call throws instead of clearing. </param>
        /// <exception cref="OperationCanceledException"> The token is cancelled. </exception>
        public void Clear(CancellationToken cancelToken)
        {
            cancelToken.ThrowIfCancellationRequested();

            //Taking the write lock means this waits for any Push that is currently in progress.
            lock (_writeLock)
            {
                lock (_stateLock)
                {
                    Array.Clear(_buffer, 0, _bufferSize);
                    _writeCursor = 0;
                    _readCursor = 0;
                    _underflowEvent.Set();
                    _notOverflowEvent.Set();
                }
            }
        }

        /// <summary> Blocks until the consumer has found the buffer empty (or it has been cleared). </summary>
        /// <param name="cancelToken"> Cancels the wait. </param>
        /// <exception cref="OperationCanceledException"> The token was cancelled. </exception>
        public void Wait(CancellationToken cancelToken)
        {
            _underflowEvent.Wait(cancelToken);
        }

        //Copies one (possibly partial) frame into the current write slot and publishes it. Caller holds _writeLock.
        private void WriteFrame(byte[] source, int sourceOffset, int count, CancellationToken cancelToken)
        {
            WaitForFreeSlot(cancelToken);

            //The write slot is never visible to Pop until the cursor advances, so it is safe to fill it unlocked.
            int slotOffset = _writeCursor * _frameSize;
            Buffer.BlockCopy(source, sourceOffset, _buffer, slotOffset, count);
            if (count < _frameSize)
                Array.Clear(_buffer, slotOffset + count, _frameSize - count); //Wipe the end of the frame

            lock (_stateLock)
            {
                //Advance the write cursor to the next position
                _writeCursor = NextCursorPos(_writeCursor);
                _underflowEvent.Reset();
            }
        }

        //Blocks until advancing the write cursor would not collide with the read cursor. Caller holds _writeLock.
        private void WaitForFreeSlot(CancellationToken cancelToken)
        {
            while (true)
            {
                lock (_stateLock)
                {
                    if (NextCursorPos(_writeCursor) != _readCursor)
                        return;
                    //Reset under the same lock Pop uses to Set, so a wakeup can't be lost in between
                    _notOverflowEvent.Reset();
                }
                _notOverflowEvent.Wait(cancelToken);
            }
        }

        //Returns the slot index following pos, wrapping around at the end of the ring.
        private int NextCursorPos(int pos)
        {
            int next = pos + 1;
            return next == _frameCount ? 0 : next;
        }
    }
}
b/src/Discord.Net/WebSockets/Voice/VoiceWebSocket.cs @@ -18,9 +18,8 @@ namespace Discord.WebSockets.Voice { internal partial class VoiceWebSocket : WebSocket { - private const int MaxOpusSize = 4000; //Max size of a single 20ms Opus frame - private const double SpinLockMilliseconds = 3.0; //If we're going to send audio in the next X milliseconds, dont use Task.Delay or Thread.Sleep - private const string EncryptedMode = "xsalsa20_poly1305"; + private const int MaxOpusSize = 4000; + private const string EncryptedMode = "xsalsa20_poly1305"; private const string UnencryptedMode = "plain"; private readonly Random _rand; @@ -30,11 +29,10 @@ namespace Discord.WebSockets.Voice private uint _ssrc; private ConcurrentDictionary _ssrcMapping; - private ConcurrentQueue _sendQueue; - private ManualResetEventSlim _sendQueueWait, _sendQueueEmptyWait; + private VoiceBuffer _sendQueue; private UdpClient _udp; private IPEndPoint _endpoint; - private bool _isClearing, _isEncrypted; + private bool _isEncrypted; private byte[] _secretKey, _encodingBuffer; private ushort _sequence; private string _serverId, _channelId, _userId, _sessionId, _token, _encryptionMode; @@ -51,13 +49,11 @@ namespace Discord.WebSockets.Voice { _rand = new Random(); _decoders = new ConcurrentDictionary(); - _sendQueue = new ConcurrentQueue(); - _sendQueueWait = new ManualResetEventSlim(true); - _sendQueueEmptyWait = new ManualResetEventSlim(true); _targetAudioBufferLength = client.Config.VoiceBufferLength / 20; //20 ms frames _encodingBuffer = new byte[MaxOpusSize]; _ssrcMapping = new ConcurrentDictionary(); _encoder = new OpusEncoder(48000, 1, 20, Opus.Application.Audio); + _sendQueue = new VoiceBuffer((int)Math.Ceiling(client.Config.VoiceBufferLength / (double)_encoder.FrameLength), _encoder.FrameSize); } public Task SetChannel(string serverId, string channelId) @@ -108,9 +104,7 @@ namespace Discord.WebSockets.Voice } protected override IEnumerable GetTasks() - { - _isClearing = false; - + { _udp = new 
UdpClient(new IPEndPoint(IPAddress.Any, 0)); #if !DNX451 && !__MonoCS__ _udp.AllowNatTraversal(true); @@ -347,33 +341,35 @@ namespace Discord.WebSockets.Voice if (cancelToken.IsCancellationRequested) return; - byte[] queuedPacket, result, nonce = null; + byte[] frame = new byte[_encoder.FrameSize]; + byte[] encodedFrame = new byte[MaxOpusSize]; + byte[] udpPacket, nonce = null; uint timestamp = 0; double nextTicks = 0.0; double ticksPerMillisecond = Stopwatch.Frequency / 1000.0; - double spinLockThreshold = SpinLockMilliseconds * ticksPerMillisecond; double ticksPerFrame = ticksPerMillisecond * _encoder.FrameLength; + double spinLockThreshold = 3 * ticksPerMillisecond; uint samplesPerFrame = (uint)_encoder.SamplesPerFrame; Stopwatch sw = Stopwatch.StartNew(); if (_isEncrypted) { nonce = new byte[24]; - result = new byte[MaxOpusSize + 12 + 16]; + udpPacket = new byte[MaxOpusSize + 12 + 16]; } else - result = new byte[MaxOpusSize + 12]; + udpPacket = new byte[MaxOpusSize + 12]; int rtpPacketLength = 0; - result[0] = 0x80; //Flags; - result[1] = 0x78; //Payload Type - result[8] = (byte)((_ssrc >> 24) & 0xFF); - result[9] = (byte)((_ssrc >> 16) & 0xFF); - result[10] = (byte)((_ssrc >> 8) & 0xFF); - result[11] = (byte)((_ssrc >> 0) & 0xFF); + udpPacket[0] = 0x80; //Flags; + udpPacket[1] = 0x78; //Payload Type + udpPacket[8] = (byte)((_ssrc >> 24) & 0xFF); + udpPacket[9] = (byte)((_ssrc >> 16) & 0xFF); + udpPacket[10] = (byte)((_ssrc >> 8) & 0xFF); + udpPacket[11] = (byte)((_ssrc >> 0) & 0xFF); - if (_isEncrypted) - Buffer.BlockCopy(result, 0, nonce, 0, 12); + if (_isEncrypted) + Buffer.BlockCopy(udpPacket, 0, nonce, 0, 12); while (!cancelToken.IsCancellationRequested) { @@ -382,50 +378,41 @@ namespace Discord.WebSockets.Voice { while (sw.ElapsedTicks > nextTicks) { - if (!_isClearing) + if (_sendQueue.Pop(frame)) { - if (_sendQueue.TryDequeue(out queuedPacket)) + ushort sequence = unchecked(_sequence++); + udpPacket[2] = (byte)((sequence >> 8) & 0xFF); + udpPacket[3] = 
(byte)((sequence >> 0) & 0xFF); + udpPacket[4] = (byte)((timestamp >> 24) & 0xFF); + udpPacket[5] = (byte)((timestamp >> 16) & 0xFF); + udpPacket[6] = (byte)((timestamp >> 8) & 0xFF); + udpPacket[7] = (byte)((timestamp >> 0) & 0xFF); + + //Encode + int encodedLength = _encoder.EncodeFrame(frame, 0, encodedFrame); + + //Encrypt + if (_isEncrypted) { - ushort sequence = unchecked(_sequence++); - result[2] = (byte)((sequence >> 8) & 0xFF); - result[3] = (byte)((sequence >> 0) & 0xFF); - result[4] = (byte)((timestamp >> 24) & 0xFF); - result[5] = (byte)((timestamp >> 16) & 0xFF); - result[6] = (byte)((timestamp >> 8) & 0xFF); - result[7] = (byte)((timestamp >> 0) & 0xFF); - - if (_isEncrypted) - { - Buffer.BlockCopy(result, 2, nonce, 2, 6); //Update nonce - int ret = Sodium.Encrypt(queuedPacket, queuedPacket.Length, result, 12, nonce, _secretKey); - if (ret != 0) - continue; - rtpPacketLength = queuedPacket.Length + 12 + 16; - } - else - { - Buffer.BlockCopy(queuedPacket, 0, result, 12, queuedPacket.Length); - rtpPacketLength = queuedPacket.Length + 12; - } -#if USE_THREAD - _udp.Send(result, rtpPacketLength); -#else - await _udp.SendAsync(rtpPacket, rtpPacketLength).ConfigureAwait(false); -#endif + Buffer.BlockCopy(udpPacket, 2, nonce, 2, 6); //Update nonce + int ret = Sodium.Encrypt(encodedFrame, encodedLength, udpPacket, 12, nonce, _secretKey); + if (ret != 0) + continue; + rtpPacketLength = encodedLength + 12 + 16; } - timestamp = unchecked(timestamp + samplesPerFrame); - nextTicks += ticksPerFrame; - - //If we have less than our target data buffered, request more - int count = _sendQueue.Count; - if (count == 0) + else { - _sendQueueWait.Set(); - _sendQueueEmptyWait.Set(); + Buffer.BlockCopy(encodedFrame, 0, udpPacket, 12, encodedLength); + rtpPacketLength = encodedLength + 12; } - else if (count < _targetAudioBufferLength) - _sendQueueWait.Set(); +#if USE_THREAD + _udp.Send(udpPacket, rtpPacketLength); +#else + await _udp.SendAsync(rtpPacket, 
rtpPacketLength).ConfigureAwait(false); +#endif } + timestamp = unchecked(timestamp + samplesPerFrame); + nextTicks += ticksPerFrame; } } //Dont sleep if we need to output audio in the next spinLockThreshold @@ -433,21 +420,21 @@ namespace Discord.WebSockets.Voice { int time = (int)Math.Ceiling((ticksToNextFrame - spinLockThreshold) / ticksPerMillisecond); #if USE_THREAD - Thread.Sleep(time); + Thread.Sleep(1); #else - await Task.Delay(time).ConfigureAwait(false); + await Task.Delay(1).ConfigureAwait(false); #endif } //Don't spinlock if we're not actually sending audio (or buffer underrunning) - else if (_sendQueue.Count == 0) + /*else if (_sendQueue.Count == 0) { int time = (int)Math.Ceiling(ticksToNextFrame / ticksPerMillisecond); #if USE_THREAD - Thread.Sleep(time); + Thread.Sleep(1); #else - await Task.Delay(time).ConfigureAwait(false); + await Task.Delay(1).ConfigureAwait(false); #endif - } + }*/ } } catch (OperationCanceledException) { } @@ -531,67 +518,11 @@ namespace Discord.WebSockets.Voice public void SendPCMFrames(byte[] data, int bytes) { - int frameSize = _encoder.FrameSize; - int frames = bytes / frameSize; - int expectedBytes = frames * frameSize; - int lastFrameSize = bytes - expectedBytes; - - //If this only consists of a partial frame and the buffer is too small to pad the end, make a new one - if (data.Length < frameSize) - { - byte[] newData = new byte[frameSize]; - Buffer.BlockCopy(data, 0, newData, 0, bytes); - data = newData; - } - - byte[] payload; - //Opus encoder requires packets be queued in the same order they were generated, so all of this must still be locked. 
- lock (_encoder) - { - for (int i = 0, pos = 0; i <= frames; i++, pos += frameSize) - { - if (i == frames) - { - //If there are no partial frames, skip this step - if (lastFrameSize == 0) - break; - - //Take the partial frame from the end of the buffer and put it at the start - Buffer.BlockCopy(data, pos, data, 0, lastFrameSize); - pos = 0; - - //Wipe the end of the buffer - for (int j = lastFrameSize; j < frameSize; j++) - data[j] = 0; - } - - //Encode the frame - int encodedLength = _encoder.EncodeFrame(data, pos, _encodingBuffer); - - //Copy result to the queue - payload = new byte[encodedLength]; - Buffer.BlockCopy(_encodingBuffer, 0, payload, 0, encodedLength); - - //Wait until the queue has a spot open - _sendQueueWait.Wait(_cancelToken); - _sendQueue.Enqueue(payload); - if (_sendQueue.Count >= _targetAudioBufferLength) - _sendQueueWait.Reset(); - _sendQueueEmptyWait.Reset(); - } - } - - /*if (_logLevel >= LogMessageSeverity.Debug) - RaiseOnLog(LogMessageSeverity.Debug, $"Queued {bytes} bytes for voice output.");*/ + _sendQueue.Push(data, bytes, _cancelToken); } public void ClearPCMFrames() { - _isClearing = true; - byte[] ignored; - while (_sendQueue.TryDequeue(out ignored)) { } - if (_logLevel >= LogMessageSeverity.Debug) - RaiseOnLog(LogMessageSeverity.Debug, "Cleared the voice buffer."); - _isClearing = false; + _sendQueue.Clear(_cancelToken); } private void SendIsTalking(bool value) @@ -609,7 +540,7 @@ namespace Discord.WebSockets.Voice public void WaitForQueue() { - _sendQueueEmptyWait.Wait(_cancelToken); + _sendQueue.Wait(_cancelToken); } public Task WaitForConnection(int timeout) {