diff --git a/src/Discord.Net/DiscordClient.cs b/src/Discord.Net/DiscordClient.cs index c7da4d21a..bf7e0622d 100644 --- a/src/Discord.Net/DiscordClient.cs +++ b/src/Discord.Net/DiscordClient.cs @@ -1328,6 +1328,12 @@ namespace Discord { _voiceWebSocket.SendPCMFrame(data, count); } + + /// Clears the PCM buffer. + public void ClearVoicePCM() + { + _voiceWebSocket.ClearPCMFrames(); + } #endif //Profile diff --git a/src/Discord.Net/DiscordVoiceSocket.cs b/src/Discord.Net/DiscordVoiceSocket.cs index a77dbdc72..14d84d971 100644 --- a/src/Discord.Net/DiscordVoiceSocket.cs +++ b/src/Discord.Net/DiscordVoiceSocket.cs @@ -1,4 +1,6 @@ -using Discord.API.Models; +//#define USE_THREAD + +using Discord.API.Models; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using System; @@ -39,11 +41,14 @@ namespace Discord private ConcurrentQueue _sendQueue; private UdpClient _udp; private IPEndPoint _endpoint; - private bool _isReady; + private bool _isReady, _isClearing; private byte[] _secretKey; private string _myIp; private ushort _sequence; private string _mode; +#if USE_THREAD + private Thread _sendThread; +#endif #endif public DiscordVoiceSocket(DiscordClient client, int timeout, int interval) @@ -62,20 +67,31 @@ namespace Discord _udp = new UdpClient(new IPEndPoint(IPAddress.Any, 0)); _udp.AllowNatTraversal(true); _isReady = false; + _isClearing = false; } protected override void OnDisconnect() { _udp = null; - } +#if USE_THREAD + _sendThread.Join(); + _sendThread = null; +#endif + } #endif protected override Task[] CreateTasks() { - return new Task[] +#if !DNXCORE50 && USE_THREAD + _sendThread = new Thread(new ThreadStart(() => SendAsync(_disconnectToken))); + _sendThread.Start(); +#endif + return new Task[] { #if !DNXCORE50 ReceiveAsync(), +#if !USE_THREAD SendAsync(), +#endif #endif WatcherAsync() }.Concat(base.CreateTasks()).ToArray(); @@ -110,73 +126,109 @@ namespace Discord #if !DNXCORE50 private async Task ReceiveAsync() { - var cancelToken = _disconnectToken.Token; + var cancelSource = _disconnectToken; + var cancelToken = cancelSource.Token; + await Task.Yield(); + try { while (!cancelToken.IsCancellationRequested) { - var result = await _udp.ReceiveAsync(); + var result = await _udp.ReceiveAsync(); ProcessUdpMessage(result); } } catch { } - finally { _disconnectToken.Cancel(); } + finally { cancelSource.Cancel(); } } + +#if USE_THREAD + private void SendAsync(CancellationTokenSource cancelSource) + { + var cancelToken = cancelSource.Token; +#else private async Task SendAsync() { - var cancelToken = _disconnectToken.Token; + var cancelSource = _disconnectToken; + var cancelToken = cancelSource.Token; + await Task.Yield(); +#endif + Packet packet; try { + while (!cancelToken.IsCancellationRequested && !_isReady) + Thread.Sleep(1); + + if (cancelToken.IsCancellationRequested) + return; + uint timestamp = 0; double nextTicks = 0.0; - double ticksPerFrame = Stopwatch.Frequency / 1000.0 * _encoder.FrameLength; + double ticksPerMillisecond = Stopwatch.Frequency / 1000.0; + double ticksPerFrame = ticksPerMillisecond * _encoder.FrameLength; + double spinLockThreshold = 1.5 * ticksPerMillisecond; uint samplesPerFrame = (uint)_encoder.SamplesPerFrame; Stopwatch sw = Stopwatch.StartNew(); + + byte[] rtpPacket = new byte[4012]; + rtpPacket[0] = 0x80; //Flags; + rtpPacket[1] = 0x78; //Payload Type + rtpPacket[8] = (byte)((_ssrc >> 24) & 0xFF); + rtpPacket[9] = (byte)((_ssrc >> 16) & 0xFF); + rtpPacket[10] = (byte)((_ssrc >> 8) & 0xFF); + rtpPacket[11] = (byte)((_ssrc >> 0) & 0xFF); + while (!cancelToken.IsCancellationRequested) { - byte[] rtpPacket = new byte[4012]; - rtpPacket[0] = 0x80; //Flags; - rtpPacket[1] = 0x78; //Payload Type - rtpPacket[8] = (byte)((_ssrc >> 24) & 0xFF); - rtpPacket[9] = (byte)((_ssrc >> 16) & 0xFF); - rtpPacket[10] = (byte)((_ssrc >> 8) & 0xFF); - rtpPacket[11] = (byte)((_ssrc >> 0) & 0xFF); - - if (_isReady && sw.ElapsedTicks > nextTicks) + double ticksToNextFrame = nextTicks - sw.ElapsedTicks; + if (ticksToNextFrame <= 0.0) { while (sw.ElapsedTicks > nextTicks) { - if (_sendQueue.TryDequeue(out packet)) + if (!_isClearing) { - ushort sequence = unchecked(_sequence++); - rtpPacket[2] = (byte)((sequence >> 8) & 0xFF); - rtpPacket[3] = (byte)((sequence >> 0) & 0xFF); - rtpPacket[4] = (byte)((timestamp >> 24) & 0xFF); - rtpPacket[5] = (byte)((timestamp >> 16) & 0xFF); - rtpPacket[6] = (byte)((timestamp >> 8) & 0xFF); - rtpPacket[7] = (byte)((timestamp >> 0) & 0xFF); - Buffer.BlockCopy(packet.Data, 0, rtpPacket, 12, packet.Count); - await _udp.SendAsync(rtpPacket, packet.Count + 12); + if (_sendQueue.TryDequeue(out packet)) + { + ushort sequence = unchecked(_sequence++); + rtpPacket[2] = (byte)((sequence >> 8) & 0xFF); + rtpPacket[3] = (byte)((sequence >> 0) & 0xFF); + rtpPacket[4] = (byte)((timestamp >> 24) & 0xFF); + rtpPacket[5] = (byte)((timestamp >> 16) & 0xFF); + rtpPacket[6] = (byte)((timestamp >> 8) & 0xFF); + rtpPacket[7] = (byte)((timestamp >> 0) & 0xFF); + Buffer.BlockCopy(packet.Data, 0, rtpPacket, 12, packet.Count); +#if USE_THREAD + _udp.Send(rtpPacket, packet.Count + 12); +#else + await _udp.SendAsync(rtpPacket, packet.Count + 12); +#endif + } + timestamp = unchecked(timestamp + samplesPerFrame); + nextTicks += ticksPerFrame; } - timestamp = unchecked(timestamp + samplesPerFrame); - nextTicks += ticksPerFrame; } } - else + //Dont sleep for 1 millisecond if we need to output audio in the next 1.5 + else if (_sendQueue.Count == 0 || ticksToNextFrame >= spinLockThreshold) +#if USE_THREAD + Thread.Sleep(1); +#else await Task.Delay(1); +#endif } } catch { } - finally { _disconnectToken.Cancel(); } + finally { cancelSource.Cancel(); } } #endif - //Closes the UDP socket when _disconnectToken is triggered, since UDPClient doesn't allow passing a canceltoken + //Closes the UDP socket when _disconnectToken is triggered, since UDPClient doesn't allow passing a canceltoken private async Task WatcherAsync() { + var cancelToken = _disconnectToken.Token; try { - await Task.Delay(-1, _disconnectToken.Token); + await Task.Delay(-1, cancelToken); } catch (TaskCanceledException) { } #if !DNXCORE50 @@ -210,7 +262,7 @@ namespace Discord _sequence = (ushort)_rand.Next(0, ushort.MaxValue); //No thread issue here because SendAsync doesn't start until _isReady is true - _udp.Send(new byte[70] { + await _udp.SendAsync(new byte[70] { (byte)((_ssrc >> 24) & 0xFF), (byte)((_ssrc >> 16) & 0xFF), (byte)((_ssrc >> 8) & 0xFF), @@ -322,16 +374,26 @@ namespace Discord if (count != _encoder.FrameSize) throw new InvalidOperationException($"Invalid frame size. Got {count}, expected {_encoder.FrameSize}."); - byte[] payload = new byte[4000]; - int encodedLength = _encoder.EncodeFrame(data, payload); - - if (_mode == "xsalsa20_poly1305") + lock (_encoder) { - //TODO: Encode + byte[] payload = new byte[4000]; + int encodedLength = _encoder.EncodeFrame(data, payload); + + if (_mode == "xsalsa20_poly1305") + { + //TODO: Encode + } + + lock (_sendQueue) + _sendQueue.Enqueue(new Packet(payload, encodedLength)); } - - lock (_sendQueue) - _sendQueue.Enqueue(new Packet(payload, encodedLength)); + } + public void ClearPCMFrames() + { + _isClearing = true; + Packet ignored; + while (_sendQueue.TryDequeue(out ignored)) { } + _isClearing = false; } private void SendIsTalking(bool value) diff --git a/src/Discord.Net/DiscordWebSocket.cs b/src/Discord.Net/DiscordWebSocket.cs index a79af48f3..1104ab007 100644 --- a/src/Discord.Net/DiscordWebSocket.cs +++ b/src/Discord.Net/DiscordWebSocket.cs @@ -55,20 +55,20 @@ namespace Discord _lastHeartbeat = DateTime.UtcNow; _tasks = Task.Factory.ContinueWhenAll(CreateTasks(), x => { - //Do not clean up until both tasks have ended - _heartbeatInterval = 0; - _lastHeartbeat = DateTime.MinValue; - _webSocket.Dispose(); - _webSocket = null; + //Do not clean up until all tasks have ended + OnDisconnect(); + _disconnectToken.Dispose(); _disconnectToken = null; //Clear send queue + _heartbeatInterval = 0; + _lastHeartbeat = DateTime.MinValue; + _webSocket.Dispose(); + _webSocket = null; byte[] ignored; while (_sendQueue.TryDequeue(out ignored)) { } - OnDisconnect(); - if (_isConnected) { _isConnected = false; @@ -106,7 +106,10 @@ namespace Discord private async Task ReceiveAsync() { - var cancelToken = _disconnectToken.Token; + var cancelSource = _disconnectToken; + var cancelToken = cancelSource.Token; + await Task.Yield(); + var buffer = new byte[ReceiveChunkSize]; var builder = new StringBuilder(); @@ -117,7 +120,7 @@ namespace Discord WebSocketReceiveResult result; do { - result = await _webSocket.ReceiveAsync(new ArraySegment(buffer), _disconnectToken.Token); + result = await _webSocket.ReceiveAsync(new ArraySegment(buffer), cancelToken); if (result.MessageType == WebSocketMessageType.Close) { @@ -139,11 +142,14 @@ namespace Discord } } catch { } - finally { _disconnectToken.Cancel(); } + finally { cancelSource.Cancel(); } } private async Task SendAsync() { - var cancelToken = _disconnectToken.Token; + var cancelSource = _disconnectToken; + var cancelToken = cancelSource.Token; + await Task.Yield(); + try { byte[] bytes; @@ -164,7 +170,7 @@ namespace Discord } } catch { } - finally { _disconnectToken.Cancel(); } + finally { cancelSource.Cancel(); } } protected abstract Task ProcessMessage(string json);