diff --git a/src/Discord.Net/DiscordVoiceSocket.cs b/src/Discord.Net/DiscordVoiceSocket.cs index 9bc408c29..9aeaed41b 100644 --- a/src/Discord.Net/DiscordVoiceSocket.cs +++ b/src/Discord.Net/DiscordVoiceSocket.cs @@ -2,7 +2,7 @@ using Newtonsoft.Json; using Newtonsoft.Json.Linq; using System; -using System.Collections.Generic; +using System.Collections.Concurrent; using System.Diagnostics; using System.Linq; using System.Net; @@ -36,7 +36,7 @@ namespace Discord #if !DNXCORE50 private OpusEncoder _encoder; - private Queue _sendQueue; + private ConcurrentQueue _sendQueue; private UdpClient _udp; private IPEndPoint _endpoint; private bool _isReady; @@ -51,7 +51,7 @@ namespace Discord { _connectWaitOnLogin = new ManualResetEventSlim(false); #if !DNXCORE50 - _sendQueue = new Queue(); + _sendQueue = new ConcurrentQueue(); _encoder = new OpusEncoder(48000, 1, 20, Application.Audio); #endif } @@ -61,6 +61,7 @@ namespace Discord { _udp = new UdpClient(new IPEndPoint(IPAddress.Any, 0)); _udp.AllowNatTraversal(true); + _isReady = false; } protected override void OnDisconnect() { @@ -68,16 +69,16 @@ namespace Discord } #endif - protected override Task[] CreateTasks(CancellationToken cancelToken) + protected override Task[] CreateTasks() { return new Task[] { #if !DNXCORE50 - Task.Factory.StartNew(ReceiveAsync, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Result, - Task.Factory.StartNew(SendAsync, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Result, + ReceiveAsync(), + SendAsync(), #endif - Task.Factory.StartNew(WatcherAsync, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Result - }.Concat(base.CreateTasks(cancelToken)).ToArray(); + WatcherAsync() + }.Concat(base.CreateTasks()).ToArray(); } public async Task Login(string serverId, string userId, string sessionId, string token) @@ -124,24 +125,9 @@ namespace Discord private async Task SendAsync() { var cancelToken = _disconnectToken.Token; + Packet packet; try { - while (!cancelToken.IsCancellationRequested && !_isReady) - { - lock (_sendQueue) - { - while (_sendQueue.Count > 0) - { - var packet = _sendQueue.Dequeue(); - _udp.Send(packet.Data, packet.Count); - } - } - await Task.Delay(_sendInterval); - } - - if (cancelToken.IsCancellationRequested) - return; - uint timestamp = 0; double nextTicks = 0.0; double ticksPerFrame = Stopwatch.Frequency / 1000.0 * _encoder.FrameLength; @@ -157,38 +143,35 @@ namespace Discord rtpPacket[10] = (byte)((_ssrc >> 8) & 0xFF); rtpPacket[11] = (byte)((_ssrc >> 0) & 0xFF); - if (sw.ElapsedTicks > nextTicks) + if (_isReady && sw.ElapsedTicks > nextTicks) { - lock (_sendQueue) + while (sw.ElapsedTicks > nextTicks) { - while (sw.ElapsedTicks > nextTicks) + if (_sendQueue.TryDequeue(out packet)) { - if (_sendQueue.Count > 0) - { - var packet = _sendQueue.Dequeue(); - ushort sequence = unchecked(_sequence++); - rtpPacket[2] = (byte)((sequence >> 8) & 0xFF); - rtpPacket[3] = (byte)((sequence >> 0) & 0xFF); - rtpPacket[4] = (byte)((timestamp >> 24) & 0xFF); - rtpPacket[5] = (byte)((timestamp >> 16) & 0xFF); - rtpPacket[6] = (byte)((timestamp >> 8) & 0xFF); - rtpPacket[7] = (byte)((timestamp >> 0) & 0xFF); - Buffer.BlockCopy(packet.Data, 0, rtpPacket, 12, packet.Count); - _udp.Send(rtpPacket, packet.Count + 12); - } - timestamp = unchecked(timestamp + samplesPerFrame); - nextTicks += ticksPerFrame; + ushort sequence = unchecked(_sequence++); + rtpPacket[2] = (byte)((sequence >> 8) & 0xFF); + rtpPacket[3] = (byte)((sequence >> 0) & 0xFF); + rtpPacket[4] = (byte)((timestamp >> 24) & 0xFF); + rtpPacket[5] = (byte)((timestamp >> 16) & 0xFF); + rtpPacket[6] = (byte)((timestamp >> 8) & 0xFF); + rtpPacket[7] = (byte)((timestamp >> 0) & 0xFF); + Buffer.BlockCopy(packet.Data, 0, rtpPacket, 12, packet.Count); + await _udp.SendAsync(rtpPacket, packet.Count + 12); } + timestamp = unchecked(timestamp + samplesPerFrame); + nextTicks += ticksPerFrame; } } - /*else - await Task.Delay(1);*/ + else + await Task.Delay(1); } } catch { } finally { _disconnectToken.Cancel(); } } #endif + //Closes the UDP socket when _disconnectToken is triggered, since UDPClient doesn't allow passing a canceltoken private async Task WatcherAsync() { try @@ -212,31 +195,34 @@ namespace Discord { case 2: //READY { - var payload = (msg.Payload as JToken).ToObject(); - _heartbeatInterval = payload.HeartbeatInterval; - _ssrc = payload.SSRC; #if !DNXCORE50 - _endpoint = new IPEndPoint((await Dns.GetHostAddressesAsync(_host)).FirstOrDefault(), payload.Port); - //_mode = payload.Modes.LastOrDefault(); - _mode = "plain"; - _udp.Connect(_endpoint); + if (!_isReady) + { +#endif + var payload = (msg.Payload as JToken).ToObject(); + _heartbeatInterval = payload.HeartbeatInterval; + _ssrc = payload.SSRC; +#if !DNXCORE50 + _endpoint = new IPEndPoint((await Dns.GetHostAddressesAsync(_host)).FirstOrDefault(), payload.Port); + //_mode = payload.Modes.LastOrDefault(); + _mode = "plain"; + _udp.Connect(_endpoint); - lock (_rand) _sequence = (ushort)_rand.Next(0, ushort.MaxValue); - _isReady = false; - _sendQueue.Enqueue(new Packet(new byte[70] { - (byte)((_ssrc >> 24) & 0xFF), - (byte)((_ssrc >> 16) & 0xFF), - (byte)((_ssrc >> 8) & 0xFF), - (byte)((_ssrc >> 0) & 0xFF), - 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x0 - }, 70)); + //No thread issue here because SendAsync doesn't start until _isReady is true + _udp.Send(new byte[70] { + (byte)((_ssrc >> 24) & 0xFF), + (byte)((_ssrc >> 16) & 0xFF), + (byte)((_ssrc >> 8) & 0xFF), + (byte)((_ssrc >> 0) & 0xFF), + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0 }, 70); + } #else - _connectWaitOnLogin.Set(); + _connectWaitOnLogin.Set(); #endif } break; @@ -246,7 +232,7 @@ namespace Discord var payload = (msg.Payload as JToken).ToObject(); _secretKey = payload.SecretKey; SendIsTalking(true); - _connectWaitOnLogin.Set(); + _connectWaitOnLogin.Set(); } break; #endif @@ -275,6 +261,7 @@ namespace Discord _myIp = Encoding.ASCII.GetString(buffer, 4, 70 - 6).TrimEnd('\0'); + _isReady = true; var login2 = new VoiceWebSocketCommands.Login2(); login2.Payload.Protocol = "udp"; login2.Payload.SocketData.Address = _myIp; diff --git a/src/Discord.Net/DiscordWebSocket.cs b/src/Discord.Net/DiscordWebSocket.cs index 02764fbd1..1408ac4f7 100644 --- a/src/Discord.Net/DiscordWebSocket.cs +++ b/src/Discord.Net/DiscordWebSocket.cs @@ -53,8 +53,7 @@ namespace Discord OnConnect(); _lastHeartbeat = DateTime.UtcNow; - _tasks = Task.WhenAll(CreateTasks(cancelToken)) - .ContinueWith(x => + _tasks = Task.Factory.ContinueWhenAll(CreateTasks(), x => { //Do not clean up until both tasks have ended _heartbeatInterval = 0; @@ -96,12 +95,12 @@ namespace Discord RaiseConnected(); } - protected virtual Task[] CreateTasks(CancellationToken cancelToken) + protected virtual Task[] CreateTasks() { return new Task[] { - Task.Factory.StartNew(ReceiveAsync, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Result, - Task.Factory.StartNew(SendAsync, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Result + ReceiveAsync(), + SendAsync() }; } @@ -132,8 +131,7 @@ namespace Discord while (!result.EndOfMessage); //TODO: Remove this - if (this is DiscordVoiceSocket) - System.Diagnostics.Debug.WriteLine(">>> " + builder.ToString()); + System.Diagnostics.Debug.WriteLine(">>> " + builder.ToString()); await ProcessMessage(builder.ToString()); builder.Clear(); @@ -161,7 +159,7 @@ namespace Discord } while (_sendQueue.TryDequeue(out bytes)) await SendMessage(bytes, cancelToken); - await Task.Delay(_sendInterval); + await Task.Delay(_sendInterval, cancelToken); } } catch { } @@ -174,16 +172,14 @@ namespace Discord protected void QueueMessage(object message) { //TODO: Remove this - if (this is DiscordVoiceSocket) - System.Diagnostics.Debug.WriteLine("<<< " + JsonConvert.SerializeObject(message)); + System.Diagnostics.Debug.WriteLine("<<< " + JsonConvert.SerializeObject(message)); var bytes = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)); _sendQueue.Enqueue(bytes); } protected Task SendMessage(object message, CancellationToken cancelToken) { //TODO: Remove this - if (this is DiscordVoiceSocket) - System.Diagnostics.Debug.WriteLine("<<< " + JsonConvert.SerializeObject(message)); + System.Diagnostics.Debug.WriteLine("<<< " + JsonConvert.SerializeObject(message)); return SendMessage(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)), cancelToken); } protected async Task SendMessage(byte[] message, CancellationToken cancelToken)