diff --git a/src/Discord.Net.WebSocket/API/Voice/SpeakingEvent.cs b/src/Discord.Net.WebSocket/API/Voice/SpeakingEvent.cs new file mode 100644 index 000000000..0272a8f53 --- /dev/null +++ b/src/Discord.Net.WebSocket/API/Voice/SpeakingEvent.cs @@ -0,0 +1,15 @@ +#pragma warning disable CS1591 +using Newtonsoft.Json; + +namespace Discord.API.Voice +{ + internal class SpeakingEvent + { + [JsonProperty("user_id")] + public ulong UserId { get; set; } + [JsonProperty("ssrc")] + public uint Ssrc { get; set; } + [JsonProperty("speaking")] + public bool Speaking { get; set; } + } +} diff --git a/src/Discord.Net.WebSocket/Audio/AudioClient.cs b/src/Discord.Net.WebSocket/Audio/AudioClient.cs index e2586d0f3..9fbfc348e 100644 --- a/src/Discord.Net.WebSocket/Audio/AudioClient.cs +++ b/src/Discord.Net.WebSocket/Audio/AudioClient.cs @@ -17,6 +17,18 @@ namespace Discord.Audio //TODO: Add audio reconnecting internal class AudioClient : IAudioClient, IDisposable { + internal struct StreamPair + { + public AudioInStream Reader; + public AudioOutStream Writer; + + public StreamPair(AudioInStream reader, AudioOutStream writer) + { + Reader = reader; + Writer = writer; + } + } + public event Func Connected { add { _connectedEvent.Add(value); } @@ -41,6 +53,8 @@ namespace Discord.Audio private readonly ConnectionManager _connection; private readonly SemaphoreSlim _stateLock; private readonly ConcurrentQueue _heartbeatTimes; + private readonly ConcurrentDictionary _ssrcMap; + private readonly ConcurrentDictionary _streams; private Task _heartbeatTask; private long _lastMessageTime; @@ -75,6 +89,8 @@ namespace Discord.Audio _connection.Connected += () => _connectedEvent.InvokeAsync(); _connection.Disconnected += (ex, recon) => _disconnectedEvent.InvokeAsync(ex); _heartbeatTimes = new ConcurrentQueue(); + _ssrcMap = new ConcurrentDictionary(); + _streams = new ConcurrentDictionary(); _serializer = new JsonSerializer { ContractResolver = new DiscordContractResolver() }; _serializer.Error += 
(s, e) => @@ -166,6 +182,35 @@ namespace Discord.Audio throw new ArgumentException("Value must be 120, 240, 480, 960, 1920 or 2880", nameof(samplesPerFrame)); } + /* Registers a reader/writer pair for userId unless one is already present; the writer is an OpusDecodeStream over an RTPReadStream that targets the reader. */ internal void CreateInputStream(ulong userId) + { + //Assume Thread-safe + if (!_streams.ContainsKey(userId)) + { + var readerStream = new InputStream(); + var writerStream = new OpusDecodeStream(new RTPReadStream(readerStream, _secretKey)); + _streams.TryAdd(userId, new StreamPair(readerStream, writerStream)); + } + } + /* Returns the reader registered for id, or null when none is registered. */ internal AudioInStream GetInputStream(ulong id) + { + StreamPair streamPair; + if (_streams.TryGetValue(id, out streamPair)) + return streamPair.Reader; + return null; + } + /* Unregisters the pair for userId and disposes its reader, mirroring what ClearInputStreams does for every pair, so holders of the stream can observe that it ended. */ internal void RemoveInputStream(ulong userId) + { + if (_streams.TryRemove(userId, out var removed)) removed.Reader.Dispose(); + } + /* Disposes every registered reader, then empties both the SSRC map and the stream map. */ internal void ClearInputStreams() + { + foreach (var pair in _streams.Values) + pair.Reader.Dispose(); + _ssrcMap.Clear(); + _streams.Clear(); + } + private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload) { _lastMessageTime = Environment.TickCount; @@ -219,6 +264,14 @@ namespace Discord.Audio } } break; + case VoiceOpCode.Speaking: + { + await _audioLogger.DebugAsync("Received Speaking").ConfigureAwait(false); + + var data = (payload as JToken).ToObject(_serializer); + _ssrcMap[data.Ssrc] = data.UserId; //TODO: Memory Leak: SSRCs are never cleaned up + } + break; default: await _audioLogger.WarningAsync($"Unknown OpCode ({opCode})").ConfigureAwait(false); return; @@ -234,19 +287,56 @@ namespace Discord.Audio { if (!_connection.IsCompleted) { - if (packet.Length == 70) + if (packet.Length != 70) { - string ip; - int port; - try - { - ip = Encoding.UTF8.GetString(packet, 4, 70 - 6).TrimEnd('\0'); - port = packet[69] | (packet[68] << 8); - } - catch { return; } - - await _audioLogger.DebugAsync("Received Discovery").ConfigureAwait(false); - await ApiClient.SendSelectProtocol(ip, port).ConfigureAwait(false); + await _audioLogger.DebugAsync($"Malformed Packet").ConfigureAwait(false); + return; + } + 
string ip; + int port; + try + { + ip = Encoding.UTF8.GetString(packet, 4, 70 - 6).TrimEnd('\0'); + port = packet[69] | (packet[68] << 8); + } + catch (Exception ex) + { + await _audioLogger.DebugAsync($"Malformed Packet", ex).ConfigureAwait(false); + return; + } + + await _audioLogger.DebugAsync("Received Discovery").ConfigureAwait(false); + await ApiClient.SendSelectProtocol(ip, port).ConfigureAwait(false); + } + else + { + uint ssrc; + ulong userId; + StreamPair pair; + + if (!RTPReadStream.TryReadSsrc(packet, 0, out ssrc)) + { + await _audioLogger.DebugAsync($"Malformed Frame").ConfigureAwait(false); + return; + } + if (!_ssrcMap.TryGetValue(ssrc, out userId)) + { + await _audioLogger.DebugAsync($"Unknown SSRC {ssrc}").ConfigureAwait(false); + return; + } + if (!_streams.TryGetValue(userId, out pair)) + { + await _audioLogger.DebugAsync($"Unknown User {userId}").ConfigureAwait(false); + return; + } + try + { + await pair.Writer.WriteAsync(packet, 0, packet.Length).ConfigureAwait(false); + await _audioLogger.DebugAsync($"Received {packet.Length} bytes from user {userId}").ConfigureAwait(false); + } + catch (Exception ex) + { + await _audioLogger.DebugAsync($"Malformed Frame", ex).ConfigureAwait(false); } } } diff --git a/src/Discord.Net.WebSocket/Audio/AudioMode.cs b/src/Discord.Net.WebSocket/Audio/AudioMode.cs deleted file mode 100644 index 7cc5a08c1..000000000 --- a/src/Discord.Net.WebSocket/Audio/AudioMode.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System; - -namespace Discord.Audio -{ - [Flags] - public enum AudioMode : byte - { - Disabled = 0, - Outgoing = 1, - Incoming = 2, - Both = Outgoing | Incoming - } -} diff --git a/src/Discord.Net.WebSocket/Audio/Streams/BufferedWriteStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/BufferedWriteStream.cs index dcd053cc1..3040da855 100644 --- a/src/Discord.Net.WebSocket/Audio/Streams/BufferedWriteStream.cs +++ b/src/Discord.Net.WebSocket/Audio/Streams/BufferedWriteStream.cs @@ -59,9 +59,6 @@ namespace 
Discord.Audio.Streams { return Task.Run(async () => { -#if DEBUG - uint num = 0; -#endif try { while (!_isPreloaded && !_cancelToken.IsCancellationRequested) @@ -82,7 +79,7 @@ namespace Discord.Audio.Streams _queueLock.Release(); nextTick += _ticksPerFrame; #if DEBUG - var _ = _logger.DebugAsync($"{num++}: Sent {frame.Bytes} bytes ({_queuedFrames.Count} frames buffered)"); + var _ = _logger.DebugAsync($"Sent {frame.Bytes} bytes ({_queuedFrames.Count} frames buffered)"); #endif } else @@ -93,7 +90,7 @@ namespace Discord.Audio.Streams nextTick += _ticksPerFrame; } #if DEBUG - var _ = _logger.DebugAsync($"{num++}: Buffer underrun"); + var _ = _logger.DebugAsync($"Buffer underrun"); #endif } } diff --git a/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs index d46db128b..f10638bba 100644 --- a/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs +++ b/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs @@ -12,12 +12,13 @@ namespace Discord.Audio.Streams private ushort _nextSeq; private uint _nextTimestamp; private bool _hasHeader; + private bool _isDisposed; - public override bool CanRead => true; + public override bool CanRead => !_isDisposed; public override bool CanSeek => false; - public override bool CanWrite => true; + public override bool CanWrite => false; - public InputStream(byte[] secretKey) + public InputStream() { _frames = new ConcurrentQueue(); } @@ -54,10 +55,13 @@ namespace Discord.Audio.Streams { cancelToken.ThrowIfCancellationRequested(); - if (_frames.Count > 1000) + if (_frames.Count > 100) //1-2 seconds + { + _hasHeader = false; return Task.Delay(0); //Buffer overloaded - if (_hasHeader) - throw new InvalidOperationException("Received payload with an RTP header"); + } + if (!_hasHeader) + throw new InvalidOperationException("Received payload without an RTP header"); byte[] payload = new byte[count]; Buffer.BlockCopy(buffer, offset, payload, 0, count); @@ -69,5 +73,10 @@ namespace 
Discord.Audio.Streams _hasHeader = false; return Task.Delay(0); } + + protected override void Dispose(bool isDisposing) + { + _isDisposed = true; + } } } diff --git a/src/Discord.Net.WebSocket/Audio/Streams/OpusDecodeStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/OpusDecodeStream.cs index 9df553bfe..2dc5a8781 100644 --- a/src/Discord.Net.WebSocket/Audio/Streams/OpusDecodeStream.cs +++ b/src/Discord.Net.WebSocket/Audio/Streams/OpusDecodeStream.cs @@ -6,15 +6,17 @@ namespace Discord.Audio.Streams /// Converts Opus to PCM public class OpusDecodeStream : AudioOutStream { + public const int SampleRate = OpusEncodeStream.SampleRate; + private readonly AudioOutStream _next; private readonly byte[] _buffer; private readonly OpusDecoder _decoder; - public OpusDecodeStream(AudioOutStream next, int samplingRate, int channels = OpusConverter.MaxChannels, int bufferSize = 4000) + public OpusDecodeStream(AudioOutStream next, int channels = OpusConverter.MaxChannels, int bufferSize = 4000) { _next = next; _buffer = new byte[bufferSize]; - _decoder = new OpusDecoder(samplingRate, channels); + _decoder = new OpusDecoder(SampleRate, channels); } public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) diff --git a/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs index 9a57612bf..b4aad9430 100644 --- a/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs +++ b/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs @@ -31,11 +31,14 @@ namespace Discord.Audio.Streams { cancelToken.ThrowIfCancellationRequested(); + if (buffer[offset + 0] != 0x80 || buffer[offset + 1] != 0x78) + return; + var payload = new byte[count - 12]; Buffer.BlockCopy(buffer, offset + 12, payload, 0, count - 12); - ushort seq = (ushort)((buffer[offset + 3] << 8) | - (buffer[offset + 2] << 0)); + ushort seq = (ushort)((buffer[offset + 2] << 8) | + (buffer[offset + 3] << 0)); uint 
timestamp = (uint)((buffer[offset + 4] << 24) | (buffer[offset + 5] << 16) | @@ -45,5 +48,20 @@ namespace Discord.Audio.Streams _queue.WriteHeader(seq, timestamp); await (_next ?? _queue as Stream).WriteAsync(buffer, offset, count, cancelToken).ConfigureAwait(false); } + + /* Reads the 32-bit big-endian SSRC stored at bytes 8-11 (relative to offset). Returns false, with ssrc set to 0, when buffer is null, offset is negative, or fewer than 12 bytes are available from offset. */ public static bool TryReadSsrc(byte[] buffer, int offset, out uint ssrc) + { + if (buffer == null || offset < 0 || buffer.Length - offset < 12) + { + ssrc = 0; + return false; + } + + ssrc = (uint)((buffer[offset + 8] << 24) | + (buffer[offset + 9] << 16) | + (buffer[offset + 10] << 8) | /* third byte occupies bits 8-15 */ + (buffer[offset + 11] << 0)); + return true; + } } } diff --git a/src/Discord.Net.WebSocket/DiscordSocketClient.cs b/src/Discord.Net.WebSocket/DiscordSocketClient.cs index 4c713c956..4f2f70321 100644 --- a/src/Discord.Net.WebSocket/DiscordSocketClient.cs +++ b/src/Discord.Net.WebSocket/DiscordSocketClient.cs @@ -1,6 +1,5 @@ using Discord.API; using Discord.API.Gateway; -using Discord.Audio; using Discord.Logging; using Discord.Net.Converters; using Discord.Net.Udp; @@ -54,7 +53,6 @@ namespace Discord.WebSocket internal int TotalShards { get; private set; } internal int MessageCacheSize { get; private set; } internal int LargeThreshold { get; private set; } - internal AudioMode AudioMode { get; private set; } internal ClientState State { get; private set; } internal UdpSocketProvider UdpSocketProvider { get; private set; } internal WebSocketProvider WebSocketProvider { get; private set; } @@ -82,7 +80,6 @@ namespace Discord.WebSocket TotalShards = config.TotalShards ?? 
1; MessageCacheSize = config.MessageCacheSize; LargeThreshold = config.LargeThreshold; - AudioMode = config.AudioMode; UdpSocketProvider = config.UdpSocketProvider; WebSocketProvider = config.WebSocketProvider; AlwaysDownloadUsers = config.AlwaysDownloadUsers; @@ -520,7 +517,7 @@ namespace Discord.WebSocket await _gatewayLogger.InfoAsync("Resumed previous session").ConfigureAwait(false); } - return; + break; //Guilds case "GUILD_CREATE": @@ -605,7 +602,7 @@ namespace Discord.WebSocket return; } } - return; + break; case "GUILD_SYNC": { await _gatewayLogger.DebugAsync("Received Dispatch (GUILD_SYNC)").ConfigureAwait(false); @@ -627,7 +624,7 @@ namespace Discord.WebSocket return; } } - return; + break; case "GUILD_DELETE": { var data = (payload as JToken).ToObject(_serializer); @@ -1217,8 +1214,8 @@ namespace Discord.WebSocket await _gatewayLogger.WarningAsync("MESSAGE_REACTION_ADD referenced an unknown channel.").ConfigureAwait(false); return; } - break; } + break; case "MESSAGE_REACTION_REMOVE": { await _gatewayLogger.DebugAsync("Received Dispatch (MESSAGE_REACTION_REMOVE)").ConfigureAwait(false); @@ -1242,8 +1239,8 @@ namespace Discord.WebSocket await _gatewayLogger.WarningAsync("MESSAGE_REACTION_REMOVE referenced an unknown channel.").ConfigureAwait(false); return; } - break; } + break; case "MESSAGE_REACTION_REMOVE_ALL": { await _gatewayLogger.DebugAsync("Received Dispatch (MESSAGE_REACTION_REMOVE_ALL)").ConfigureAwait(false); @@ -1265,8 +1262,8 @@ namespace Discord.WebSocket await _gatewayLogger.WarningAsync("MESSAGE_REACTION_REMOVE_ALL referenced an unknown channel.").ConfigureAwait(false); return; } - break; } + break; case "MESSAGE_DELETE_BULK": { await _gatewayLogger.DebugAsync("Received Dispatch (MESSAGE_DELETE_BULK)").ConfigureAwait(false); @@ -1447,10 +1444,9 @@ namespace Discord.WebSocket } break; case "VOICE_SERVER_UPDATE": - await _gatewayLogger.DebugAsync("Received Dispatch (VOICE_SERVER_UPDATE)").ConfigureAwait(false); - - if (AudioMode != 
AudioMode.Disabled) { + await _gatewayLogger.DebugAsync("Received Dispatch (VOICE_SERVER_UPDATE)").ConfigureAwait(false); + var data = (payload as JToken).ToObject(_serializer); var guild = State.GetGuild(data.GuildId); if (guild != null) @@ -1464,7 +1460,7 @@ namespace Discord.WebSocket return; } } - return; + break; //Ignored (User only) case "CHANNEL_PINS_ACK": @@ -1475,32 +1471,31 @@ namespace Discord.WebSocket break; case "GUILD_INTEGRATIONS_UPDATE": await _gatewayLogger.DebugAsync("Ignored Dispatch (GUILD_INTEGRATIONS_UPDATE)").ConfigureAwait(false); - return; + break; case "MESSAGE_ACK": await _gatewayLogger.DebugAsync("Ignored Dispatch (MESSAGE_ACK)").ConfigureAwait(false); - return; + break; case "USER_SETTINGS_UPDATE": await _gatewayLogger.DebugAsync("Ignored Dispatch (USER_SETTINGS_UPDATE)").ConfigureAwait(false); - return; + break; case "WEBHOOKS_UPDATE": await _gatewayLogger.DebugAsync("Ignored Dispatch (WEBHOOKS_UPDATE)").ConfigureAwait(false); - return; + break; //Others default: await _gatewayLogger.WarningAsync($"Unknown Dispatch ({type})").ConfigureAwait(false); - return; + break; } break; default: await _gatewayLogger.WarningAsync($"Unknown OpCode ({opCode})").ConfigureAwait(false); - return; + break; } } catch (Exception ex) { await _gatewayLogger.ErrorAsync($"Error handling {opCode}{(type != null ? $" ({type})" : "")}", ex).ConfigureAwait(false); - return; } } diff --git a/src/Discord.Net.WebSocket/DiscordSocketConfig.cs b/src/Discord.Net.WebSocket/DiscordSocketConfig.cs index f42744c79..9ef030d72 100644 --- a/src/Discord.Net.WebSocket/DiscordSocketConfig.cs +++ b/src/Discord.Net.WebSocket/DiscordSocketConfig.cs @@ -1,5 +1,4 @@ -using Discord.Audio; -using Discord.Net.Udp; +using Discord.Net.Udp; using Discord.Net.WebSockets; using Discord.Rest; @@ -27,9 +26,6 @@ namespace Discord.WebSocket /// public int LargeThreshold { get; set; } = 250; - /// Gets or sets the type of audio this DiscordClient supports. 
- public AudioMode AudioMode { get; set; } = AudioMode.Disabled; - /// Gets or sets the provider used to generate new websocket connections. public WebSocketProvider WebSocketProvider { get; set; } /// Gets or sets the provider used to generate new udp sockets. diff --git a/src/Discord.Net.WebSocket/Entities/Channels/SocketVoiceChannel.cs b/src/Discord.Net.WebSocket/Entities/Channels/SocketVoiceChannel.cs index 71017a7c8..9ec0da72a 100644 --- a/src/Discord.Net.WebSocket/Entities/Channels/SocketVoiceChannel.cs +++ b/src/Discord.Net.WebSocket/Entities/Channels/SocketVoiceChannel.cs @@ -42,13 +42,7 @@ namespace Discord.WebSocket public async Task ConnectAsync() { - var audioMode = Discord.AudioMode; - if (audioMode == AudioMode.Disabled) - throw new InvalidOperationException($"Audio is not enabled on this client, {nameof(DiscordSocketConfig.AudioMode)} in {nameof(DiscordSocketConfig)} must be set."); - - return await Guild.ConnectAudioAsync(Id, - (audioMode & AudioMode.Incoming) == 0, - (audioMode & AudioMode.Outgoing) == 0).ConfigureAwait(false); + return await Guild.ConnectAudioAsync(Id, false, false).ConfigureAwait(false); } public override SocketGuildUser GetUser(ulong id) diff --git a/src/Discord.Net.WebSocket/Entities/Guilds/SocketGuild.cs b/src/Discord.Net.WebSocket/Entities/Guilds/SocketGuild.cs index 141d777b7..be63f4da2 100644 --- a/src/Discord.Net.WebSocket/Entities/Guilds/SocketGuild.cs +++ b/src/Discord.Net.WebSocket/Entities/Guilds/SocketGuild.cs @@ -426,9 +426,23 @@ namespace Discord.WebSocket internal SocketVoiceState AddOrUpdateVoiceState(ClientState state, VoiceStateModel model) { var voiceChannel = state.GetChannel(model.ChannelId.Value) as SocketVoiceChannel; - var voiceState = SocketVoiceState.Create(voiceChannel, model); - _voiceStates[model.UserId] = voiceState; - return voiceState; + var before = GetVoiceState(model.UserId) ?? 
SocketVoiceState.Default; + var after = SocketVoiceState.Create(voiceChannel, model); + _voiceStates[model.UserId] = after; + + if (before.VoiceChannel?.Id != after.VoiceChannel?.Id) + { + if (model.UserId == CurrentUser.Id) + RepopulateAudioStreams(); + else + { + _audioClient?.RemoveInputStream(model.UserId); //User changed channels, end their stream + if (CurrentUser.VoiceChannel != null && after.VoiceChannel?.Id == CurrentUser.VoiceChannel?.Id) + _audioClient?.CreateInputStream(model.UserId); /* null-conditional: there may be no audio client even though the current user has a voice channel */ + } + } + + return after; } internal SocketVoiceState? GetVoiceState(ulong id) { @@ -446,6 +460,10 @@ namespace Discord.WebSocket } //Audio + /* Returns the audio client's input stream for userId, or null when there is no audio client. */ internal AudioInStream GetAudioStream(ulong userId) + { + return _audioClient?.GetInputStream(userId); + } internal async Task ConnectAudioAsync(ulong channelId, bool selfDeaf, bool selfMute) { selfDeaf = false; @@ -531,6 +549,7 @@ namespace Discord.WebSocket } }; _audioClient = audioClient; + RepopulateAudioStreams(); } _audioClient.Connected += () => { @@ -554,6 +573,22 @@ namespace Discord.WebSocket } } + /* When an audio client exists: clears all of its input streams, then creates one for every voice state whose channel matches the current user's voice channel. Does nothing without an audio client. */ internal void RepopulateAudioStreams() + { + if (_audioClient != null) + { + _audioClient.ClearInputStreams(); //We changed channels, end all current streams + if (CurrentUser.VoiceChannel != null) + { + foreach (var pair in _voiceStates) + { + if (pair.Value.VoiceChannel?.Id == CurrentUser.VoiceChannel?.Id) + _audioClient.CreateInputStream(pair.Key); + } + } + } + } + public override string ToString() => Name; private string DebuggerDisplay => $"{Name} ({Id})"; internal SocketGuild Clone() => MemberwiseClone() as SocketGuild; diff --git a/src/Discord.Net.WebSocket/Entities/Users/SocketGuildUser.cs b/src/Discord.Net.WebSocket/Entities/Users/SocketGuildUser.cs index 5162839d7..1b2e10332 100644 --- a/src/Discord.Net.WebSocket/Entities/Users/SocketGuildUser.cs +++ b/src/Discord.Net.WebSocket/Entities/Users/SocketGuildUser.cs @@ -1,4 +1,5 @@ -using Discord.Rest; +using Discord.Audio; +using Discord.Rest; using System; using 
System.Collections.Generic; using System.Collections.Immutable; @@ -37,6 +38,7 @@ namespace Discord.WebSocket public SocketVoiceChannel VoiceChannel => VoiceState?.VoiceChannel; public string VoiceSessionId => VoiceState?.VoiceSessionId ?? ""; public SocketVoiceState? VoiceState => Guild.GetVoiceState(Id); + public AudioInStream AudioStream => Guild.GetAudioStream(Id); /// The position of the user within the role hirearchy. /// The returned value equal to the position of the highest role the user has,