Browse Source

Added ClearVoiceBuffer, thread support (disabled currently), and some minor voice performance improvements

tags/docs-0.9
Brandon Smith 9 years ago
parent
commit
4438f3d35c
3 changed files with 129 additions and 55 deletions
  1. +6
    -0
      src/Discord.Net/DiscordClient.cs
  2. +105
    -43
      src/Discord.Net/DiscordVoiceSocket.cs
  3. +18
    -12
      src/Discord.Net/DiscordWebSocket.cs

+ 6
- 0
src/Discord.Net/DiscordClient.cs View File

@@ -1328,6 +1328,12 @@ namespace Discord
{ {
_voiceWebSocket.SendPCMFrame(data, count); _voiceWebSocket.SendPCMFrame(data, count);
} }

/// <summary> Clears the PCM buffer. </summary>
public void ClearVoicePCM()
{
_voiceWebSocket.ClearPCMFrames();
}
#endif #endif


//Profile //Profile


+ 105
- 43
src/Discord.Net/DiscordVoiceSocket.cs View File

@@ -1,4 +1,6 @@
using Discord.API.Models;
//#define USE_THREAD

using Discord.API.Models;
using Newtonsoft.Json; using Newtonsoft.Json;
using Newtonsoft.Json.Linq; using Newtonsoft.Json.Linq;
using System; using System;
@@ -39,11 +41,14 @@ namespace Discord
private ConcurrentQueue<Packet> _sendQueue; private ConcurrentQueue<Packet> _sendQueue;
private UdpClient _udp; private UdpClient _udp;
private IPEndPoint _endpoint; private IPEndPoint _endpoint;
private bool _isReady;
private bool _isReady, _isClearing;
private byte[] _secretKey; private byte[] _secretKey;
private string _myIp; private string _myIp;
private ushort _sequence; private ushort _sequence;
private string _mode; private string _mode;
#if USE_THREAD
private Thread _sendThread;
#endif
#endif #endif


public DiscordVoiceSocket(DiscordClient client, int timeout, int interval) public DiscordVoiceSocket(DiscordClient client, int timeout, int interval)
@@ -62,20 +67,31 @@ namespace Discord
_udp = new UdpClient(new IPEndPoint(IPAddress.Any, 0)); _udp = new UdpClient(new IPEndPoint(IPAddress.Any, 0));
_udp.AllowNatTraversal(true); _udp.AllowNatTraversal(true);
_isReady = false; _isReady = false;
_isClearing = false;
} }
protected override void OnDisconnect() protected override void OnDisconnect()
{ {
_udp = null; _udp = null;
}
#if USE_THREAD
_sendThread.Join();
_sendThread = null;
#endif
}
#endif #endif


protected override Task[] CreateTasks() protected override Task[] CreateTasks()
{ {
return new Task[]
#if !DNXCORE50 && USE_THREAD
_sendThread = new Thread(new ThreadStart(() => SendAsync(_disconnectToken)));
_sendThread.Start();
#endif
return new Task[]
{ {
#if !DNXCORE50 #if !DNXCORE50
ReceiveAsync(), ReceiveAsync(),
#if !USE_THREAD
SendAsync(), SendAsync(),
#endif
#endif #endif
WatcherAsync() WatcherAsync()
}.Concat(base.CreateTasks()).ToArray(); }.Concat(base.CreateTasks()).ToArray();
@@ -110,73 +126,109 @@ namespace Discord
#if !DNXCORE50 #if !DNXCORE50
private async Task ReceiveAsync() private async Task ReceiveAsync()
{ {
var cancelToken = _disconnectToken.Token;
var cancelSource = _disconnectToken;
var cancelToken = cancelSource.Token;
await Task.Yield();

try try
{ {
while (!cancelToken.IsCancellationRequested) while (!cancelToken.IsCancellationRequested)
{ {
var result = await _udp.ReceiveAsync();
var result = await _udp.ReceiveAsync();
ProcessUdpMessage(result); ProcessUdpMessage(result);
} }
} }
catch { } catch { }
finally { _disconnectToken.Cancel(); }
finally { cancelSource.Cancel(); }
} }

#if USE_THREAD
private void SendAsync(CancellationTokenSource cancelSource)
{
var cancelToken = cancelSource.Token;
#else
private async Task SendAsync() private async Task SendAsync()
{ {
var cancelToken = _disconnectToken.Token;
var cancelSource = _disconnectToken;
var cancelToken = cancelSource.Token;
await Task.Yield();
#endif

Packet packet; Packet packet;
try try
{ {
while (!cancelToken.IsCancellationRequested && !_isReady)
Thread.Sleep(1);

if (cancelToken.IsCancellationRequested)
return;

uint timestamp = 0; uint timestamp = 0;
double nextTicks = 0.0; double nextTicks = 0.0;
double ticksPerFrame = Stopwatch.Frequency / 1000.0 * _encoder.FrameLength;
double ticksPerMillisecond = Stopwatch.Frequency / 1000.0;
double ticksPerFrame = ticksPerMillisecond * _encoder.FrameLength;
double spinLockThreshold = 1.5 * ticksPerMillisecond;
uint samplesPerFrame = (uint)_encoder.SamplesPerFrame; uint samplesPerFrame = (uint)_encoder.SamplesPerFrame;
Stopwatch sw = Stopwatch.StartNew(); Stopwatch sw = Stopwatch.StartNew();

byte[] rtpPacket = new byte[4012];
rtpPacket[0] = 0x80; //Flags;
rtpPacket[1] = 0x78; //Payload Type
rtpPacket[8] = (byte)((_ssrc >> 24) & 0xFF);
rtpPacket[9] = (byte)((_ssrc >> 16) & 0xFF);
rtpPacket[10] = (byte)((_ssrc >> 8) & 0xFF);
rtpPacket[11] = (byte)((_ssrc >> 0) & 0xFF);

while (!cancelToken.IsCancellationRequested) while (!cancelToken.IsCancellationRequested)
{ {
byte[] rtpPacket = new byte[4012];
rtpPacket[0] = 0x80; //Flags;
rtpPacket[1] = 0x78; //Payload Type
rtpPacket[8] = (byte)((_ssrc >> 24) & 0xFF);
rtpPacket[9] = (byte)((_ssrc >> 16) & 0xFF);
rtpPacket[10] = (byte)((_ssrc >> 8) & 0xFF);
rtpPacket[11] = (byte)((_ssrc >> 0) & 0xFF);

if (_isReady && sw.ElapsedTicks > nextTicks)
double ticksToNextFrame = nextTicks - sw.ElapsedTicks;
if (ticksToNextFrame <= 0.0)
{ {
while (sw.ElapsedTicks > nextTicks) while (sw.ElapsedTicks > nextTicks)
{ {
if (_sendQueue.TryDequeue(out packet))
if (!_isClearing)
{ {
ushort sequence = unchecked(_sequence++);
rtpPacket[2] = (byte)((sequence >> 8) & 0xFF);
rtpPacket[3] = (byte)((sequence >> 0) & 0xFF);
rtpPacket[4] = (byte)((timestamp >> 24) & 0xFF);
rtpPacket[5] = (byte)((timestamp >> 16) & 0xFF);
rtpPacket[6] = (byte)((timestamp >> 8) & 0xFF);
rtpPacket[7] = (byte)((timestamp >> 0) & 0xFF);
Buffer.BlockCopy(packet.Data, 0, rtpPacket, 12, packet.Count);
await _udp.SendAsync(rtpPacket, packet.Count + 12);
if (_sendQueue.TryDequeue(out packet))
{
ushort sequence = unchecked(_sequence++);
rtpPacket[2] = (byte)((sequence >> 8) & 0xFF);
rtpPacket[3] = (byte)((sequence >> 0) & 0xFF);
rtpPacket[4] = (byte)((timestamp >> 24) & 0xFF);
rtpPacket[5] = (byte)((timestamp >> 16) & 0xFF);
rtpPacket[6] = (byte)((timestamp >> 8) & 0xFF);
rtpPacket[7] = (byte)((timestamp >> 0) & 0xFF);
Buffer.BlockCopy(packet.Data, 0, rtpPacket, 12, packet.Count);
#if USE_THREAD
_udp.Send(rtpPacket, packet.Count + 12);
#else
await _udp.SendAsync(rtpPacket, packet.Count + 12);
#endif
}
timestamp = unchecked(timestamp + samplesPerFrame);
nextTicks += ticksPerFrame;
} }
timestamp = unchecked(timestamp + samplesPerFrame);
nextTicks += ticksPerFrame;
} }
} }
else
//Dont sleep for 1 millisecond if we need to output audio in the next 1.5
else if (_sendQueue.Count == 0 || ticksToNextFrame >= spinLockThreshold)
#if USE_THREAD
Thread.Sleep(1);
#else
await Task.Delay(1); await Task.Delay(1);
#endif
} }
} }
catch { } catch { }
finally { _disconnectToken.Cancel(); }
finally { cancelSource.Cancel(); }
} }
#endif #endif
//Closes the UDP socket when _disconnectToken is triggered, since UDPClient doesn't allow passing a canceltoken
//Closes the UDP socket when _disconnectToken is triggered, since UDPClient doesn't allow passing a canceltoken
private async Task WatcherAsync() private async Task WatcherAsync()
{ {
var cancelToken = _disconnectToken.Token;
try try
{ {
await Task.Delay(-1, _disconnectToken.Token);
await Task.Delay(-1, cancelToken);
} }
catch (TaskCanceledException) { } catch (TaskCanceledException) { }
#if !DNXCORE50 #if !DNXCORE50
@@ -210,7 +262,7 @@ namespace Discord


_sequence = (ushort)_rand.Next(0, ushort.MaxValue); _sequence = (ushort)_rand.Next(0, ushort.MaxValue);
//No thread issue here because SendAsync doesn't start until _isReady is true //No thread issue here because SendAsync doesn't start until _isReady is true
_udp.Send(new byte[70] {
await _udp.SendAsync(new byte[70] {
(byte)((_ssrc >> 24) & 0xFF), (byte)((_ssrc >> 24) & 0xFF),
(byte)((_ssrc >> 16) & 0xFF), (byte)((_ssrc >> 16) & 0xFF),
(byte)((_ssrc >> 8) & 0xFF), (byte)((_ssrc >> 8) & 0xFF),
@@ -322,16 +374,26 @@ namespace Discord
if (count != _encoder.FrameSize) if (count != _encoder.FrameSize)
throw new InvalidOperationException($"Invalid frame size. Got {count}, expected {_encoder.FrameSize}."); throw new InvalidOperationException($"Invalid frame size. Got {count}, expected {_encoder.FrameSize}.");


byte[] payload = new byte[4000];
int encodedLength = _encoder.EncodeFrame(data, payload);

if (_mode == "xsalsa20_poly1305")
lock (_encoder)
{ {
//TODO: Encode
byte[] payload = new byte[4000];
int encodedLength = _encoder.EncodeFrame(data, payload);

if (_mode == "xsalsa20_poly1305")
{
//TODO: Encode
}

lock (_sendQueue)
_sendQueue.Enqueue(new Packet(payload, encodedLength));
} }
lock (_sendQueue)
_sendQueue.Enqueue(new Packet(payload, encodedLength));
}
public void ClearPCMFrames()
{
_isClearing = true;
Packet ignored;
while (_sendQueue.TryDequeue(out ignored)) { }
_isClearing = false;
} }


private void SendIsTalking(bool value) private void SendIsTalking(bool value)


+ 18
- 12
src/Discord.Net/DiscordWebSocket.cs View File

@@ -55,20 +55,20 @@ namespace Discord
_lastHeartbeat = DateTime.UtcNow; _lastHeartbeat = DateTime.UtcNow;
_tasks = Task.Factory.ContinueWhenAll(CreateTasks(), x => _tasks = Task.Factory.ContinueWhenAll(CreateTasks(), x =>
{ {
//Do not clean up until both tasks have ended
_heartbeatInterval = 0;
_lastHeartbeat = DateTime.MinValue;
_webSocket.Dispose();
_webSocket = null;
//Do not clean up until all tasks have ended
OnDisconnect();

_disconnectToken.Dispose(); _disconnectToken.Dispose();
_disconnectToken = null; _disconnectToken = null;


//Clear send queue //Clear send queue
_heartbeatInterval = 0;
_lastHeartbeat = DateTime.MinValue;
_webSocket.Dispose();
_webSocket = null;
byte[] ignored; byte[] ignored;
while (_sendQueue.TryDequeue(out ignored)) { } while (_sendQueue.TryDequeue(out ignored)) { }


OnDisconnect();

if (_isConnected) if (_isConnected)
{ {
_isConnected = false; _isConnected = false;
@@ -106,7 +106,10 @@ namespace Discord


private async Task ReceiveAsync() private async Task ReceiveAsync()
{ {
var cancelToken = _disconnectToken.Token;
var cancelSource = _disconnectToken;
var cancelToken = cancelSource.Token;
await Task.Yield();

var buffer = new byte[ReceiveChunkSize]; var buffer = new byte[ReceiveChunkSize];
var builder = new StringBuilder(); var builder = new StringBuilder();


@@ -117,7 +120,7 @@ namespace Discord
WebSocketReceiveResult result; WebSocketReceiveResult result;
do do
{ {
result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), _disconnectToken.Token);
result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), cancelToken);


if (result.MessageType == WebSocketMessageType.Close) if (result.MessageType == WebSocketMessageType.Close)
{ {
@@ -139,11 +142,14 @@ namespace Discord
} }
} }
catch { } catch { }
finally { _disconnectToken.Cancel(); }
finally { cancelSource.Cancel(); }
} }
private async Task SendAsync() private async Task SendAsync()
{ {
var cancelToken = _disconnectToken.Token;
var cancelSource = _disconnectToken;
var cancelToken = cancelSource.Token;
await Task.Yield();

try try
{ {
byte[] bytes; byte[] bytes;
@@ -164,7 +170,7 @@ namespace Discord
} }
} }
catch { } catch { }
finally { _disconnectToken.Cancel(); }
finally { cancelSource.Cancel(); }
} }


protected abstract Task ProcessMessage(string json); protected abstract Task ProcessMessage(string json);


Loading…
Cancel
Save