Browse Source

Started adding IAudioClient incoming stream creation/destruction events

tags/1.0-rc
RogueException 8 years ago
parent
commit
158ce0f922
13 changed files with 94 additions and 58 deletions
  1. +3
    -1
      src/Discord.Net.Core/Audio/IAudioClient.cs
  2. +7
    -1
      src/Discord.Net.Core/Entities/Channels/IAudioChannel.cs
  3. +1
    -4
      src/Discord.Net.Core/Entities/Channels/IVoiceChannel.cs
  4. +5
    -1
      src/Discord.Net.Rest/Entities/Channels/RestGroupChannel.cs
  5. +2
    -2
      src/Discord.Net.Rest/Entities/Channels/RestVoiceChannel.cs
  6. +5
    -1
      src/Discord.Net.Rpc/Entities/Channels/RpcGroupChannel.cs
  7. +2
    -2
      src/Discord.Net.Rpc/Entities/Channels/RpcVoiceChannel.cs
  8. +23
    -6
      src/Discord.Net.WebSocket/Audio/AudioClient.cs
  9. +4
    -2
      src/Discord.Net.WebSocket/DiscordSocketClient.cs
  10. +0
    -1
      src/Discord.Net.WebSocket/Entities/Channels/ISocketAudioChannel.cs
  11. +3
    -0
      src/Discord.Net.WebSocket/Entities/Channels/SocketGroupChannel.cs
  12. +2
    -2
      src/Discord.Net.WebSocket/Entities/Channels/SocketVoiceChannel.cs
  13. +37
    -35
      src/Discord.Net.WebSocket/Entities/Guilds/SocketGuild.cs

+ 3
- 1
src/Discord.Net.Core/Audio/IAudioClient.cs View File

@@ -8,7 +8,9 @@ namespace Discord.Audio
event Func<Task> Connected; event Func<Task> Connected;
event Func<Exception, Task> Disconnected; event Func<Exception, Task> Disconnected;
event Func<int, int, Task> LatencyUpdated; event Func<int, int, Task> LatencyUpdated;
event Func<ulong, AudioInStream, Task> StreamCreated;
event Func<ulong, Task> StreamDestroyed;

/// <summary> Gets the current connection state of this client. </summary> /// <summary> Gets the current connection state of this client. </summary>
ConnectionState ConnectionState { get; } ConnectionState ConnectionState { get; }
/// <summary> Gets the estimated round-trip latency, in milliseconds, to the gateway server. </summary> /// <summary> Gets the estimated round-trip latency, in milliseconds, to the gateway server. </summary>


+ 7
- 1
src/Discord.Net.Core/Entities/Channels/IAudioChannel.cs View File

@@ -1,6 +1,12 @@
namespace Discord
using Discord.Audio;
using System;
using System.Threading.Tasks;

namespace Discord
{ {
public interface IAudioChannel : IChannel public interface IAudioChannel : IChannel
{ {
/// <summary> Connects to this audio channel. </summary>
Task<IAudioClient> ConnectAsync(Action<IAudioClient> configAction = null);
} }
} }

+ 1
- 4
src/Discord.Net.Core/Entities/Channels/IVoiceChannel.cs View File

@@ -1,5 +1,4 @@
using Discord.Audio;
using System;
using System;
using System.Threading.Tasks; using System.Threading.Tasks;


namespace Discord namespace Discord
@@ -13,7 +12,5 @@ namespace Discord


/// <summary> Modifies this voice channel. </summary> /// <summary> Modifies this voice channel. </summary>
Task ModifyAsync(Action<VoiceChannelProperties> func, RequestOptions options = null); Task ModifyAsync(Action<VoiceChannelProperties> func, RequestOptions options = null);
/// <summary> Connects to this voice channel. </summary>
Task<IAudioClient> ConnectAsync();
} }
} }

+ 5
- 1
src/Discord.Net.Rest/Entities/Channels/RestGroupChannel.cs View File

@@ -1,4 +1,5 @@
using System;
using Discord.Audio;
using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Collections.Immutable; using System.Collections.Immutable;
using System.Diagnostics; using System.Diagnostics;
@@ -145,6 +146,9 @@ namespace Discord.Rest
IDisposable IMessageChannel.EnterTypingState(RequestOptions options) IDisposable IMessageChannel.EnterTypingState(RequestOptions options)
=> EnterTypingState(options); => EnterTypingState(options);


//IAudioChannel
Task<IAudioClient> IAudioChannel.ConnectAsync(Action<IAudioClient> configAction) { throw new NotSupportedException(); }

//IChannel //IChannel
Task<IUser> IChannel.GetUserAsync(ulong id, CacheMode mode, RequestOptions options) Task<IUser> IChannel.GetUserAsync(ulong id, CacheMode mode, RequestOptions options)
=> Task.FromResult<IUser>(GetUser(id)); => Task.FromResult<IUser>(GetUser(id));


+ 2
- 2
src/Discord.Net.Rest/Entities/Channels/RestVoiceChannel.cs View File

@@ -40,8 +40,8 @@ namespace Discord.Rest


private string DebuggerDisplay => $"{Name} ({Id}, Voice)"; private string DebuggerDisplay => $"{Name} ({Id}, Voice)";


//IVoiceChannel
Task<IAudioClient> IVoiceChannel.ConnectAsync() { throw new NotSupportedException(); }
//IAudioChannel
Task<IAudioClient> IAudioChannel.ConnectAsync(Action<IAudioClient> configAction) { throw new NotSupportedException(); }


//IGuildChannel //IGuildChannel
Task<IGuildUser> IGuildChannel.GetUserAsync(ulong id, CacheMode mode, RequestOptions options) Task<IGuildUser> IGuildChannel.GetUserAsync(ulong id, CacheMode mode, RequestOptions options)


+ 5
- 1
src/Discord.Net.Rpc/Entities/Channels/RpcGroupChannel.cs View File

@@ -1,4 +1,5 @@
using Discord.Rest;
using Discord.Audio;
using Discord.Rest;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Collections.Immutable; using System.Collections.Immutable;
@@ -112,6 +113,9 @@ namespace Discord.Rpc
IDisposable IMessageChannel.EnterTypingState(RequestOptions options) IDisposable IMessageChannel.EnterTypingState(RequestOptions options)
=> EnterTypingState(options); => EnterTypingState(options);


//IAudioChannel
Task<IAudioClient> IAudioChannel.ConnectAsync(Action<IAudioClient> configAction) { throw new NotSupportedException(); }

//IChannel //IChannel
string IChannel.Name { get { throw new NotSupportedException(); } } string IChannel.Name { get { throw new NotSupportedException(); } }




+ 2
- 2
src/Discord.Net.Rpc/Entities/Channels/RpcVoiceChannel.cs View File

@@ -42,7 +42,7 @@ namespace Discord.Rpc


private string DebuggerDisplay => $"{Name} ({Id}, Voice)"; private string DebuggerDisplay => $"{Name} ({Id}, Voice)";


//IVoiceChannel
Task<IAudioClient> IVoiceChannel.ConnectAsync() { throw new NotSupportedException(); }
//IAudioChannel
Task<IAudioClient> IAudioChannel.ConnectAsync(Action<IAudioClient> configAction) { throw new NotSupportedException(); }
} }
} }

+ 23
- 6
src/Discord.Net.WebSocket/Audio/AudioClient.cs View File

@@ -47,6 +47,18 @@ namespace Discord.Audio
remove { _latencyUpdatedEvent.Remove(value); } remove { _latencyUpdatedEvent.Remove(value); }
} }
private readonly AsyncEvent<Func<int, int, Task>> _latencyUpdatedEvent = new AsyncEvent<Func<int, int, Task>>(); private readonly AsyncEvent<Func<int, int, Task>> _latencyUpdatedEvent = new AsyncEvent<Func<int, int, Task>>();
public event Func<ulong, AudioInStream, Task> StreamCreated
{
add { _streamCreated.Add(value); }
remove { _streamCreated.Remove(value); }
}
private readonly AsyncEvent<Func<ulong, AudioInStream, Task>> _streamCreated = new AsyncEvent<Func<ulong, AudioInStream, Task>>();
public event Func<ulong, Task> StreamDestroyed
{
add { _streamDestroyed.Add(value); }
remove { _streamDestroyed.Remove(value); }
}
private readonly AsyncEvent<Func<ulong, Task>> _streamDestroyed = new AsyncEvent<Func<ulong, Task>>();


private readonly Logger _audioLogger; private readonly Logger _audioLogger;
private readonly JsonSerializer _serializer; private readonly JsonSerializer _serializer;
@@ -182,7 +194,7 @@ namespace Discord.Audio
throw new ArgumentException("Value must be 120, 240, 480, 960, 1920 or 2880", nameof(samplesPerFrame)); throw new ArgumentException("Value must be 120, 240, 480, 960, 1920 or 2880", nameof(samplesPerFrame));
} }


internal void CreateInputStream(ulong userId)
internal async Task CreateInputStreamAsync(ulong userId)
{ {
//Assume Thread-safe //Assume Thread-safe
if (!_streams.ContainsKey(userId)) if (!_streams.ContainsKey(userId))
@@ -190,6 +202,7 @@ namespace Discord.Audio
var readerStream = new InputStream(); var readerStream = new InputStream();
var writerStream = new OpusDecodeStream(new RTPReadStream(readerStream, _secretKey)); var writerStream = new OpusDecodeStream(new RTPReadStream(readerStream, _secretKey));
_streams.TryAdd(userId, new StreamPair(readerStream, writerStream)); _streams.TryAdd(userId, new StreamPair(readerStream, writerStream));
await _streamCreated.InvokeAsync(userId, readerStream);
} }
} }
internal AudioInStream GetInputStream(ulong id) internal AudioInStream GetInputStream(ulong id)
@@ -199,14 +212,18 @@ namespace Discord.Audio
return streamPair.Reader; return streamPair.Reader;
return null; return null;
} }
internal void RemoveInputStream(ulong userId)
internal async Task RemoveInputStreamAsync(ulong userId)
{ {
_streams.TryRemove(userId, out var ignored);
if (_streams.TryRemove(userId, out var ignored))
await _streamDestroyed.InvokeAsync(userId).ConfigureAwait(false);
} }
internal void ClearInputStreams()
internal async Task ClearInputStreamsAsync()
{ {
foreach (var pair in _streams.Values)
pair.Reader.Dispose();
foreach (var pair in _streams)
{
pair.Value.Reader.Dispose();
await _streamDestroyed.InvokeAsync(pair.Key).ConfigureAwait(false);
}
_ssrcMap.Clear(); _ssrcMap.Clear();
_streams.Clear(); _streams.Clear();
} }


+ 4
- 2
src/Discord.Net.WebSocket/DiscordSocketClient.cs View File

@@ -1414,7 +1414,7 @@ namespace Discord.WebSocket
if (data.ChannelId != null) if (data.ChannelId != null)
{ {
before = guild.GetVoiceState(data.UserId)?.Clone() ?? SocketVoiceState.Default; before = guild.GetVoiceState(data.UserId)?.Clone() ?? SocketVoiceState.Default;
after = guild.AddOrUpdateVoiceState(State, data);
after = await guild.AddOrUpdateVoiceStateAsync(State, data).ConfigureAwait(false);
/*if (data.UserId == CurrentUser.Id) /*if (data.UserId == CurrentUser.Id)
{ {
var _ = guild.FinishJoinAudioChannel().ConfigureAwait(false); var _ = guild.FinishJoinAudioChannel().ConfigureAwait(false);
@@ -1471,7 +1471,7 @@ namespace Discord.WebSocket
if (guild != null) if (guild != null)
{ {
string endpoint = data.Endpoint.Substring(0, data.Endpoint.LastIndexOf(':')); string endpoint = data.Endpoint.Substring(0, data.Endpoint.LastIndexOf(':'));
var _ = guild.FinishConnectAudio(_nextAudioId++, endpoint, data.Token).ConfigureAwait(false);
var _ = guild.FinishConnectAudio(endpoint, data.Token).ConfigureAwait(false);
} }
else else
{ {
@@ -1725,6 +1725,8 @@ namespace Discord.WebSocket
} }
} }


internal int GetAudioId() => _nextAudioId++;

//IDiscordClient //IDiscordClient
async Task<IApplication> IDiscordClient.GetApplicationInfoAsync(RequestOptions options) async Task<IApplication> IDiscordClient.GetApplicationInfoAsync(RequestOptions options)
=> await GetApplicationInfoAsync().ConfigureAwait(false); => await GetApplicationInfoAsync().ConfigureAwait(false);


+ 0
- 1
src/Discord.Net.WebSocket/Entities/Channels/ISocketAudioChannel.cs View File

@@ -5,6 +5,5 @@ namespace Discord.WebSocket
{ {
public interface ISocketAudioChannel : IAudioChannel public interface ISocketAudioChannel : IAudioChannel
{ {
Task<IAudioClient> ConnectAsync();
} }
} }

+ 3
- 0
src/Discord.Net.WebSocket/Entities/Channels/SocketGroupChannel.cs View File

@@ -212,6 +212,9 @@ namespace Discord.WebSocket
IDisposable IMessageChannel.EnterTypingState(RequestOptions options) IDisposable IMessageChannel.EnterTypingState(RequestOptions options)
=> EnterTypingState(options); => EnterTypingState(options);


//IAudioChannel
Task<IAudioClient> IAudioChannel.ConnectAsync(Action<IAudioClient> configAction) { throw new NotSupportedException(); }

//IChannel //IChannel
Task<IUser> IChannel.GetUserAsync(ulong id, CacheMode mode, RequestOptions options) Task<IUser> IChannel.GetUserAsync(ulong id, CacheMode mode, RequestOptions options)
=> Task.FromResult<IUser>(GetUser(id)); => Task.FromResult<IUser>(GetUser(id));


+ 2
- 2
src/Discord.Net.WebSocket/Entities/Channels/SocketVoiceChannel.cs View File

@@ -40,9 +40,9 @@ namespace Discord.WebSocket
public Task ModifyAsync(Action<VoiceChannelProperties> func, RequestOptions options = null) public Task ModifyAsync(Action<VoiceChannelProperties> func, RequestOptions options = null)
=> ChannelHelper.ModifyAsync(this, Discord, func, options); => ChannelHelper.ModifyAsync(this, Discord, func, options);


public async Task<IAudioClient> ConnectAsync()
public async Task<IAudioClient> ConnectAsync(Action<IAudioClient> configAction = null)
{ {
return await Guild.ConnectAudioAsync(Id, false, false).ConfigureAwait(false);
return await Guild.ConnectAudioAsync(Id, false, false, configAction).ConfigureAwait(false);
} }


public override SocketGuildUser GetUser(ulong id) public override SocketGuildUser GetUser(ulong id)


+ 37
- 35
src/Discord.Net.WebSocket/Entities/Guilds/SocketGuild.cs View File

@@ -423,7 +423,7 @@ namespace Discord.WebSocket
} }


//Voice States //Voice States
internal SocketVoiceState AddOrUpdateVoiceState(ClientState state, VoiceStateModel model)
internal async Task<SocketVoiceState> AddOrUpdateVoiceStateAsync(ClientState state, VoiceStateModel model)
{ {
var voiceChannel = state.GetChannel(model.ChannelId.Value) as SocketVoiceChannel; var voiceChannel = state.GetChannel(model.ChannelId.Value) as SocketVoiceChannel;
var before = GetVoiceState(model.UserId) ?? SocketVoiceState.Default; var before = GetVoiceState(model.UserId) ?? SocketVoiceState.Default;
@@ -433,12 +433,12 @@ namespace Discord.WebSocket
if (_audioClient != null && before.VoiceChannel?.Id != after.VoiceChannel?.Id) if (_audioClient != null && before.VoiceChannel?.Id != after.VoiceChannel?.Id)
{ {
if (model.UserId == CurrentUser.Id) if (model.UserId == CurrentUser.Id)
RepopulateAudioStreams();
await RepopulateAudioStreamsAsync().ConfigureAwait(false);
else else
{ {
_audioClient.RemoveInputStream(model.UserId); //User changed channels, end their stream
await _audioClient.RemoveInputStreamAsync(model.UserId).ConfigureAwait(false); //User changed channels, end their stream
if (CurrentUser.VoiceChannel != null && after.VoiceChannel?.Id == CurrentUser.VoiceChannel?.Id) if (CurrentUser.VoiceChannel != null && after.VoiceChannel?.Id == CurrentUser.VoiceChannel?.Id)
_audioClient.CreateInputStream(model.UserId);
await _audioClient.CreateInputStreamAsync(model.UserId).ConfigureAwait(false);
} }
} }


@@ -464,7 +464,7 @@ namespace Discord.WebSocket
{ {
return _audioClient?.GetInputStream(userId); return _audioClient?.GetInputStream(userId);
} }
internal async Task<IAudioClient> ConnectAudioAsync(ulong channelId, bool selfDeaf, bool selfMute)
internal async Task<IAudioClient> ConnectAudioAsync(ulong channelId, bool selfDeaf, bool selfMute, Action<IAudioClient> configAction)
{ {
selfDeaf = false; selfDeaf = false;
selfMute = false; selfMute = false;
@@ -477,6 +477,32 @@ namespace Discord.WebSocket
await DisconnectAudioInternalAsync().ConfigureAwait(false); await DisconnectAudioInternalAsync().ConfigureAwait(false);
promise = new TaskCompletionSource<AudioClient>(); promise = new TaskCompletionSource<AudioClient>();
_audioConnectPromise = promise; _audioConnectPromise = promise;

if (_audioClient == null)
{
var audioClient = new AudioClient(this, Discord.GetAudioId());
audioClient.Disconnected += async ex =>
{
if (!promise.Task.IsCompleted)
{
try { audioClient.Dispose(); } catch { }
_audioClient = null;
if (ex != null)
await promise.TrySetExceptionAsync(ex);
else
await promise.TrySetCanceledAsync();
return;
}
};
audioClient.Connected += () =>
{
var _ = promise.TrySetResultAsync(_audioClient);
return Task.Delay(0);
};
configAction?.Invoke(audioClient);
_audioClient = audioClient;
}

await Discord.ApiClient.SendVoiceStateUpdateAsync(Id, channelId, selfDeaf, selfMute).ConfigureAwait(false); await Discord.ApiClient.SendVoiceStateUpdateAsync(Id, channelId, selfDeaf, selfMute).ConfigureAwait(false);
} }
catch (Exception) catch (Exception)
@@ -523,7 +549,7 @@ namespace Discord.WebSocket
await _audioClient.StopAsync().ConfigureAwait(false); await _audioClient.StopAsync().ConfigureAwait(false);
_audioClient = null; _audioClient = null;
} }
internal async Task FinishConnectAudio(int id, string url, string token)
internal async Task FinishConnectAudio(string url, string token)
{ {
//TODO: Mem Leak: Disconnected/Connected handlers arent cleaned up //TODO: Mem Leak: Disconnected/Connected handlers arent cleaned up
var voiceState = GetVoiceState(Discord.CurrentUser.Id).Value; var voiceState = GetVoiceState(Discord.CurrentUser.Id).Value;
@@ -531,31 +557,7 @@ namespace Discord.WebSocket
await _audioLock.WaitAsync().ConfigureAwait(false); await _audioLock.WaitAsync().ConfigureAwait(false);
try try
{ {
var promise = _audioConnectPromise;
if (_audioClient == null)
{
var audioClient = new AudioClient(this, id);
audioClient.Disconnected += async ex =>
{
if (!promise.Task.IsCompleted)
{
try { audioClient.Dispose(); } catch { }
_audioClient = null;
if (ex != null)
await promise.TrySetExceptionAsync(ex);
else
await promise.TrySetCanceledAsync();
return;
}
};
_audioClient = audioClient;
RepopulateAudioStreams();
}
_audioClient.Connected += () =>
{
var _ = promise.TrySetResultAsync(_audioClient);
return Task.Delay(0);
};
await RepopulateAudioStreamsAsync().ConfigureAwait(false);
await _audioClient.StartAsync(url, Discord.CurrentUser.Id, voiceState.VoiceSessionId, token).ConfigureAwait(false); await _audioClient.StartAsync(url, Discord.CurrentUser.Id, voiceState.VoiceSessionId, token).ConfigureAwait(false);
} }
catch (OperationCanceledException) catch (OperationCanceledException)
@@ -573,15 +575,15 @@ namespace Discord.WebSocket
} }
} }


internal void RepopulateAudioStreams()
internal async Task RepopulateAudioStreamsAsync()
{ {
_audioClient.ClearInputStreams(); //We changed channels, end all current streams
await _audioClient.ClearInputStreamsAsync().ConfigureAwait(false); //We changed channels, end all current streams
if (CurrentUser.VoiceChannel != null) if (CurrentUser.VoiceChannel != null)
{ {
foreach (var pair in _voiceStates) foreach (var pair in _voiceStates)
{ {
if (pair.Value.VoiceChannel?.Id == CurrentUser.VoiceChannel?.Id)
_audioClient.CreateInputStream(pair.Key);
if (pair.Value.VoiceChannel?.Id == CurrentUser.VoiceChannel?.Id && pair.Key != CurrentUser.Id)
await _audioClient.CreateInputStreamAsync(pair.Key).ConfigureAwait(false);
} }
} }
} }


Loading…
Cancel
Save