From b95f2095ef2e758e44f8ebd629ed3d2fa6650e3c Mon Sep 17 00:00:00 2001 From: RogueException Date: Thu, 28 Jul 2016 06:11:03 -0300 Subject: [PATCH] Further separated Rest/Rpc/Socket logic, added separate improvements from SocketClient --- ...rdAPIClient.cs => DiscordRestApiClient.cs} | 269 +++--------------- src/Discord.Net/API/DiscordRpcAPIClient.cs | 156 +++------- src/Discord.Net/API/DiscordSocketApiClient.cs | 240 ++++++++++++++++ src/Discord.Net/DiscordRestClient.cs | 59 ++-- src/Discord.Net/DiscordRpcClient.cs | 183 +++++------- src/Discord.Net/DiscordRpcConfig.cs | 2 +- src/Discord.Net/DiscordSocketClient.cs | 8 +- src/Discord.Net/Entities/Users/SelfUser.cs | 28 +- .../WebSocket/Users/SocketSelfUser.cs | 27 +- src/Discord.Net/IDiscordClient.cs | 2 +- 10 files changed, 445 insertions(+), 529 deletions(-) rename src/Discord.Net/API/{DiscordAPIClient.cs => DiscordRestApiClient.cs} (81%) create mode 100644 src/Discord.Net/API/DiscordSocketApiClient.cs diff --git a/src/Discord.Net/API/DiscordAPIClient.cs b/src/Discord.Net/API/DiscordRestApiClient.cs similarity index 81% rename from src/Discord.Net/API/DiscordAPIClient.cs rename to src/Discord.Net/API/DiscordRestApiClient.cs index ab12fbb9f..90658ef9d 100644 --- a/src/Discord.Net/API/DiscordAPIClient.cs +++ b/src/Discord.Net/API/DiscordRestApiClient.cs @@ -21,91 +21,48 @@ using System.Threading.Tasks; namespace Discord.API { - public class DiscordApiClient : IDisposable + public class DiscordRestApiClient : IDisposable { - private object _eventLock = new object(); - public event Func SentRequest { add { _sentRequestEvent.Add(value); } remove { _sentRequestEvent.Remove(value); } } private readonly AsyncEvent> _sentRequestEvent = new AsyncEvent>(); - public event Func SentGatewayMessage { add { _sentGatewayMessageEvent.Add(value); } remove { _sentGatewayMessageEvent.Remove(value); } } - private readonly AsyncEvent> _sentGatewayMessageEvent = new AsyncEvent>(); - - public event Func ReceivedGatewayEvent { add { _receivedGatewayEvent.Add(value); } remove { _receivedGatewayEvent.Remove(value); } } - private readonly AsyncEvent> _receivedGatewayEvent = new AsyncEvent>(); - public event Func Disconnected { add { _disconnectedEvent.Add(value); } remove { _disconnectedEvent.Remove(value); } } - private readonly AsyncEvent> _disconnectedEvent = new AsyncEvent>(); - - private readonly RequestQueue _requestQueue; - private readonly JsonSerializer _serializer; - private readonly IRestClient _restClient; - private readonly IWebSocketClient _gatewayClient; - private readonly SemaphoreSlim _connectionLock; - private CancellationTokenSource _loginCancelToken, _connectCancelToken; - private string _authToken; - private string _gatewayUrl; - private bool _isDisposed; + + protected readonly JsonSerializer _serializer; + protected readonly SemaphoreSlim _stateLock; + private readonly RestClientProvider _restClientProvider; + + protected string _authToken; + protected bool _isDisposed; + private CancellationTokenSource _loginCancelToken; + private IRestClient _restClient; public LoginState LoginState { get; private set; } - public ConnectionState ConnectionState { get; private set; } public TokenType AuthTokenType { get; private set; } + internal RequestQueue RequestQueue { get; private set; } - public DiscordApiClient(RestClientProvider restClientProvider, WebSocketProvider webSocketProvider = null, JsonSerializer serializer = null, RequestQueue requestQueue = null) + public DiscordRestApiClient(RestClientProvider restClientProvider, JsonSerializer serializer = null, RequestQueue requestQueue = null) { - _connectionLock = new SemaphoreSlim(1, 1); + _restClientProvider = restClientProvider; + _serializer = serializer ?? new JsonSerializer { ContractResolver = new DiscordContractResolver() }; + RequestQueue = requestQueue; - _requestQueue = requestQueue ?? new RequestQueue(); + _stateLock = new SemaphoreSlim(1, 1); - _restClient = restClientProvider(DiscordRestConfig.ClientAPIUrl); + SetBaseUrl(DiscordConfig.ClientAPIUrl); + } + internal void SetBaseUrl(string baseUrl) + { + _restClient = _restClientProvider(baseUrl); _restClient.SetHeader("accept", "*/*"); _restClient.SetHeader("user-agent", DiscordRestConfig.UserAgent); - if (webSocketProvider != null) - { - _gatewayClient = webSocketProvider(); - //_gatewayClient.SetHeader("user-agent", DiscordConfig.UserAgent); (Causes issues in .Net 4.6+) - _gatewayClient.BinaryMessage += async (data, index, count) => - { - using (var compressed = new MemoryStream(data, index + 2, count - 2)) - using (var decompressed = new MemoryStream()) - { - using (var zlib = new DeflateStream(compressed, CompressionMode.Decompress)) - zlib.CopyTo(decompressed); - decompressed.Position = 0; - using (var reader = new StreamReader(decompressed)) - using (var jsonReader = new JsonTextReader(reader)) - { - var msg = _serializer.Deserialize(jsonReader); - await _receivedGatewayEvent.InvokeAsync((GatewayOpCode)msg.Operation, msg.Sequence, msg.Type, msg.Payload).ConfigureAwait(false); - } - } - }; - _gatewayClient.TextMessage += async text => - { - using (var reader = new StringReader(text)) - using (var jsonReader = new JsonTextReader(reader)) - { - var msg = _serializer.Deserialize(jsonReader); - await _receivedGatewayEvent.InvokeAsync((GatewayOpCode)msg.Operation, msg.Sequence, msg.Type, msg.Payload).ConfigureAwait(false); - } - }; - _gatewayClient.Closed += async ex => - { - await DisconnectAsync().ConfigureAwait(false); - await _disconnectedEvent.InvokeAsync(ex).ConfigureAwait(false); - }; - } - - _serializer = serializer ?? new JsonSerializer { ContractResolver = new DiscordContractResolver() }; } - private void Dispose(bool disposing) + internal virtual void Dispose(bool disposing) { if (!_isDisposed) { if (disposing) { _loginCancelToken?.Dispose(); - _connectCancelToken?.Dispose(); (_restClient as IDisposable)?.Dispose(); - (_gatewayClient as IDisposable)?.Dispose(); } _isDisposed = true; } @@ -114,12 +71,12 @@ namespace Discord.API public async Task LoginAsync(TokenType tokenType, string token, RequestOptions options = null) { - await _connectionLock.WaitAsync().ConfigureAwait(false); + await _stateLock.WaitAsync().ConfigureAwait(false); try { await LoginInternalAsync(tokenType, token, options).ConfigureAwait(false); } - finally { _connectionLock.Release(); } + finally { _stateLock.Release(); } } private async Task LoginInternalAsync(TokenType tokenType, string token, RequestOptions options = null) { @@ -134,7 +91,7 @@ namespace Discord.API AuthTokenType = TokenType.User; _authToken = null; _restClient.SetHeader("authorization", null); - await _requestQueue.SetCancelTokenAsync(_loginCancelToken.Token).ConfigureAwait(false); + await RequestQueue.SetCancelTokenAsync(_loginCancelToken.Token).ConfigureAwait(false); _restClient.SetCancelToken(_loginCancelToken.Token); AuthTokenType = tokenType; @@ -165,12 +122,12 @@ namespace Discord.API public async Task LogoutAsync() { - await _connectionLock.WaitAsync().ConfigureAwait(false); + await _stateLock.WaitAsync().ConfigureAwait(false); try { await LogoutInternalAsync().ConfigureAwait(false); } - finally { _connectionLock.Release(); } + finally { _stateLock.Release(); } } private async Task LogoutInternalAsync() { @@ -182,87 +139,16 @@ namespace Discord.API catch { } await DisconnectInternalAsync().ConfigureAwait(false); - await _requestQueue.ClearAsync().ConfigureAwait(false); + await RequestQueue.ClearAsync().ConfigureAwait(false); - await _requestQueue.SetCancelTokenAsync(CancellationToken.None).ConfigureAwait(false); + await RequestQueue.SetCancelTokenAsync(CancellationToken.None).ConfigureAwait(false); _restClient.SetCancelToken(CancellationToken.None); LoginState = LoginState.LoggedOut; } - public async Task ConnectAsync() - { - await _connectionLock.WaitAsync().ConfigureAwait(false); - try - { - await ConnectInternalAsync().ConfigureAwait(false); - } - finally { _connectionLock.Release(); } - } - private async Task ConnectInternalAsync() - { - if (LoginState != LoginState.LoggedIn) - throw new InvalidOperationException("You must log in before connecting."); - if (_gatewayClient == null) - throw new NotSupportedException("This client is not configured with websocket support."); - - ConnectionState = ConnectionState.Connecting; - try - { - _connectCancelToken = new CancellationTokenSource(); - if (_gatewayClient != null) - _gatewayClient.SetCancelToken(_connectCancelToken.Token); - - if (_gatewayUrl == null) - { - var gatewayResponse = await GetGatewayAsync().ConfigureAwait(false); - _gatewayUrl = $"{gatewayResponse.Url}?v={DiscordConfig.APIVersion}&encoding={DiscordSocketConfig.GatewayEncoding}"; - } - await _gatewayClient.ConnectAsync(_gatewayUrl).ConfigureAwait(false); - - ConnectionState = ConnectionState.Connected; - } - catch (Exception) - { - _gatewayUrl = null; //Uncache in case the gateway url changed - await DisconnectInternalAsync().ConfigureAwait(false); - throw; - } - } - - public async Task DisconnectAsync() - { - await _connectionLock.WaitAsync().ConfigureAwait(false); - try - { - await DisconnectInternalAsync().ConfigureAwait(false); - } - finally { _connectionLock.Release(); } - } - public async Task DisconnectAsync(Exception ex) - { - await _connectionLock.WaitAsync().ConfigureAwait(false); - try - { - await DisconnectInternalAsync().ConfigureAwait(false); - } - finally { _connectionLock.Release(); } - } - private async Task DisconnectInternalAsync() - { - if (_gatewayClient == null) - throw new NotSupportedException("This client is not configured with websocket support."); - - if (ConnectionState == ConnectionState.Disconnected) return; - ConnectionState = ConnectionState.Disconnecting; - - try { _connectCancelToken?.Cancel(false); } - catch { } - - await _gatewayClient.DisconnectAsync().ConfigureAwait(false); - - ConnectionState = ConnectionState.Disconnected; - } + internal virtual Task ConnectInternalAsync() => Task.CompletedTask; + internal virtual Task DisconnectInternalAsync() => Task.CompletedTask; //REST public Task SendAsync(string method, string endpoint, @@ -306,15 +192,6 @@ namespace Discord.API GuildBucket bucket, ulong guildId, RequestOptions options = null) where TResponse : class => DeserializeJson(await SendMultipartInternalAsync(method, endpoint, multipartArgs, false, BucketGroup.Guild, (int)bucket, guildId, options).ConfigureAwait(false)); - //Gateway - public Task SendGatewayAsync(GatewayOpCode opCode, object payload, - GlobalBucket bucket = GlobalBucket.GeneralGateway, RequestOptions options = null) - => SendGatewayInternalAsync(opCode, payload, BucketGroup.Global, (int)bucket, 0, options); - - public Task SendGatewayAsync(GatewayOpCode opCode, object payload, - GuildBucket bucket, ulong guildId, RequestOptions options = null) - => SendGatewayInternalAsync(opCode, payload, BucketGroup.Guild, (int)bucket, guildId, options); - //Core private async Task SendInternalAsync(string method, string endpoint, object payload, bool headerOnly, BucketGroup group, int bucketId, ulong guildId, RequestOptions options = null) @@ -323,7 +200,7 @@ namespace Discord.API string json = null; if (payload != null) json = SerializeJson(payload); - var responseStream = await _requestQueue.SendAsync(new RestRequest(_restClient, method, endpoint, json, headerOnly, options), group, bucketId, guildId).ConfigureAwait(false); + var responseStream = await RequestQueue.SendAsync(new RestRequest(_restClient, method, endpoint, json, headerOnly, options), group, bucketId, guildId).ConfigureAwait(false); stopwatch.Stop(); double milliseconds = ToMilliseconds(stopwatch); @@ -335,7 +212,7 @@ namespace Discord.API BucketGroup group, int bucketId, ulong guildId, RequestOptions options = null) { var stopwatch = Stopwatch.StartNew(); - var responseStream = await _requestQueue.SendAsync(new RestRequest(_restClient, method, endpoint, multipartArgs, headerOnly, options), group, bucketId, guildId).ConfigureAwait(false); + var responseStream = await RequestQueue.SendAsync(new RestRequest(_restClient, method, endpoint, multipartArgs, headerOnly, options), group, bucketId, guildId).ConfigureAwait(false); int bytes = headerOnly ? 0 : (int)responseStream.Length; stopwatch.Stop(); @@ -344,17 +221,6 @@ namespace Discord.API return responseStream; } - private async Task SendGatewayInternalAsync(GatewayOpCode opCode, object payload, - BucketGroup group, int bucketId, ulong guildId, RequestOptions options) - { - //TODO: Add ETF - byte[] bytes = null; - payload = new WebSocketMessage { Operation = (int)opCode, Payload = payload }; - if (payload != null) - bytes = Encoding.UTF8.GetBytes(SerializeJson(payload)); - await _requestQueue.SendAsync(new WebSocketRequest(_gatewayClient, bytes, true, options), group, bucketId, guildId).ConfigureAwait(false); - await _sentGatewayMessageEvent.InvokeAsync(opCode).ConfigureAwait(false); - } //Auth public async Task ValidateTokenAsync(RequestOptions options = null) @@ -362,69 +228,6 @@ namespace Discord.API await SendAsync("GET", "auth/login", options: options).ConfigureAwait(false); } - //Gateway - public async Task GetGatewayAsync(RequestOptions options = null) - { - return await SendAsync("GET", "gateway", options: options).ConfigureAwait(false); - } - public async Task SendIdentifyAsync(int largeThreshold = 100, bool useCompression = true, RequestOptions options = null) - { - var props = new Dictionary - { - ["$device"] = "Discord.Net" - }; - var msg = new IdentifyParams() - { - Token = _authToken, - Properties = props, - LargeThreshold = largeThreshold, - UseCompression = useCompression - }; - await SendGatewayAsync(GatewayOpCode.Identify, msg, options: options).ConfigureAwait(false); - } - public async Task SendResumeAsync(string sessionId, int lastSeq, RequestOptions options = null) - { - var msg = new ResumeParams() - { - Token = _authToken, - SessionId = sessionId, - Sequence = lastSeq - }; - await SendGatewayAsync(GatewayOpCode.Resume, msg, options: options).ConfigureAwait(false); - } - public async Task SendHeartbeatAsync(int lastSeq, RequestOptions options = null) - { - await SendGatewayAsync(GatewayOpCode.Heartbeat, lastSeq, options: options).ConfigureAwait(false); - } - public async Task SendStatusUpdateAsync(long? idleSince, Game game, RequestOptions options = null) - { - var args = new StatusUpdateParams - { - IdleSince = idleSince, - Game = game - }; - await SendGatewayAsync(GatewayOpCode.StatusUpdate, args, options: options).ConfigureAwait(false); - } - public async Task SendRequestMembersAsync(IEnumerable guildIds, RequestOptions options = null) - { - await SendGatewayAsync(GatewayOpCode.RequestGuildMembers, new RequestMembersParams { GuildIds = guildIds, Query = "", Limit = 0 }, options: options).ConfigureAwait(false); - } - public async Task SendVoiceStateUpdateAsync(ulong guildId, ulong? channelId, bool selfDeaf, bool selfMute, RequestOptions options = null) - { - var payload = new VoiceStateUpdateParams - { - GuildId = guildId, - ChannelId = channelId, - SelfDeaf = selfDeaf, - SelfMute = selfMute - }; - await SendGatewayAsync(GatewayOpCode.VoiceStateUpdate, payload, options: options).ConfigureAwait(false); - } - public async Task SendGuildSyncAsync(IEnumerable guildIds, RequestOptions options = null) - { - await SendGatewayAsync(GatewayOpCode.GuildSync, guildIds, options: options).ConfigureAwait(false); - } - //Channels public async Task GetChannelAsync(ulong channelId, RequestOptions options = null) { @@ -1230,8 +1033,8 @@ namespace Discord.API } //Helpers - private static double ToMilliseconds(Stopwatch stopwatch) => Math.Round((double)stopwatch.ElapsedTicks / (double)Stopwatch.Frequency * 1000.0, 2); - private string SerializeJson(object value) + protected static double ToMilliseconds(Stopwatch stopwatch) => Math.Round((double)stopwatch.ElapsedTicks / (double)Stopwatch.Frequency * 1000.0, 2); + protected string SerializeJson(object value) { var sb = new StringBuilder(256); using (TextWriter text = new StringWriter(sb, CultureInfo.InvariantCulture)) @@ -1239,7 +1042,7 @@ namespace Discord.API _serializer.Serialize(writer, value); return sb.ToString(); } - private T DeserializeJson(Stream jsonStream) + protected T DeserializeJson(Stream jsonStream) { using (TextReader text = new StreamReader(jsonStream)) using (JsonReader reader = new JsonTextReader(text)) diff --git a/src/Discord.Net/API/DiscordRpcAPIClient.cs b/src/Discord.Net/API/DiscordRpcAPIClient.cs index 8a813d8e5..49df7d671 100644 --- a/src/Discord.Net/API/DiscordRpcAPIClient.cs +++ b/src/Discord.Net/API/DiscordRpcAPIClient.cs @@ -2,6 +2,7 @@ using Discord.Logging; using Discord.Net.Converters; using Discord.Net.Queue; +using Discord.Net.Rest; using Discord.Net.WebSockets; using Newtonsoft.Json; using Newtonsoft.Json.Linq; @@ -17,7 +18,7 @@ using System.Threading.Tasks; namespace Discord.API { - public class DiscordRpcApiClient : IDisposable + public class DiscordRpcApiClient : DiscordRestApiClient, IDisposable { private abstract class RpcRequest { @@ -60,19 +61,16 @@ namespace Discord.API private readonly ConcurrentDictionary _requests; private readonly RequestQueue _requestQueue; - private readonly JsonSerializer _serializer; private readonly IWebSocketClient _webSocketClient; private readonly SemaphoreSlim _connectionLock; private readonly string _clientId; private CancellationTokenSource _loginCancelToken, _connectCancelToken; - private string _authToken; private string _origin; - private bool _isDisposed; - public LoginState LoginState { get; private set; } public ConnectionState ConnectionState { get; private set; } - public DiscordRpcApiClient(string clientId, string origin, WebSocketProvider webSocketProvider, JsonSerializer serializer = null, RequestQueue requestQueue = null) + public DiscordRpcApiClient(string clientId, string origin, RestClientProvider restClientProvider, WebSocketProvider webSocketProvider, JsonSerializer serializer = null, RequestQueue requestQueue = null) + : base(restClientProvider, serializer, requestQueue) { _connectionLock = new SemaphoreSlim(1, 1); _clientId = clientId; @@ -80,33 +78,19 @@ namespace Discord.API _requestQueue = requestQueue ?? new RequestQueue(); _requests = new ConcurrentDictionary(); - - if (webSocketProvider != null) + + _webSocketClient = webSocketProvider(); + //_webSocketClient.SetHeader("user-agent", DiscordConfig.UserAgent); (Causes issues in .Net 4.6+) + _webSocketClient.SetHeader("origin", _origin); + _webSocketClient.BinaryMessage += async (data, index, count) => { - _webSocketClient = webSocketProvider(); - //_webSocketClient.SetHeader("user-agent", DiscordConfig.UserAgent); (Causes issues in .Net 4.6+) - _webSocketClient.SetHeader("origin", _origin); - _webSocketClient.BinaryMessage += async (data, index, count) => + using (var compressed = new MemoryStream(data, index + 2, count - 2)) + using (var decompressed = new MemoryStream()) { - using (var compressed = new MemoryStream(data, index + 2, count - 2)) - using (var decompressed = new MemoryStream()) - { - using (var zlib = new DeflateStream(compressed, CompressionMode.Decompress)) - zlib.CopyTo(decompressed); - decompressed.Position = 0; - using (var reader = new StreamReader(decompressed)) - using (var jsonReader = new JsonTextReader(reader)) - { - var msg = _serializer.Deserialize(jsonReader); - await _receivedRpcEvent.InvokeAsync(msg.Cmd, msg.Event, msg.Data).ConfigureAwait(false); - if (msg.Nonce.IsSpecified && msg.Nonce.Value.HasValue) - ProcessMessage(msg); - } - } - }; - _webSocketClient.TextMessage += async text => - { - using (var reader = new StringReader(text)) + using (var zlib = new DeflateStream(compressed, CompressionMode.Decompress)) + zlib.CopyTo(decompressed); + decompressed.Position = 0; + using (var reader = new StreamReader(decompressed)) using (var jsonReader = new JsonTextReader(reader)) { var msg = _serializer.Deserialize(jsonReader); @@ -114,17 +98,26 @@ namespace Discord.API if (msg.Nonce.IsSpecified && msg.Nonce.Value.HasValue) ProcessMessage(msg); } - }; - _webSocketClient.Closed += async ex => + } + }; + _webSocketClient.TextMessage += async text => + { + using (var reader = new StringReader(text)) + using (var jsonReader = new JsonTextReader(reader)) { - await DisconnectAsync().ConfigureAwait(false); - await _disconnectedEvent.InvokeAsync(ex).ConfigureAwait(false); - }; - } - - _serializer = serializer ?? new JsonSerializer { ContractResolver = new DiscordContractResolver() }; + var msg = _serializer.Deserialize(jsonReader); + await _receivedRpcEvent.InvokeAsync(msg.Cmd, msg.Event, msg.Data).ConfigureAwait(false); + if (msg.Nonce.IsSpecified && msg.Nonce.Value.HasValue) + ProcessMessage(msg); + } + }; + _webSocketClient.Closed += async ex => + { + await DisconnectAsync().ConfigureAwait(false); + await _disconnectedEvent.InvokeAsync(ex).ConfigureAwait(false); + }; } - private void Dispose(bool disposing) + internal override void Dispose(bool disposing) { if (!_isDisposed) { @@ -136,67 +129,6 @@ namespace Discord.API _isDisposed = true; } } - public void Dispose() => Dispose(true); - - public async Task LoginAsync(TokenType tokenType, string token, bool upgrade = false, RequestOptions options = null) - { - await _connectionLock.WaitAsync().ConfigureAwait(false); - try - { - await LoginInternalAsync(tokenType, token, upgrade, options).ConfigureAwait(false); - } - finally { _connectionLock.Release(); } - } - private async Task LoginInternalAsync(TokenType tokenType, string token, bool upgrade = false, RequestOptions options = null) - { - if (!upgrade && LoginState != LoginState.LoggedOut) - await LogoutInternalAsync().ConfigureAwait(false); - - if (tokenType != TokenType.Bearer) - throw new InvalidOperationException("RPC only supports bearer tokens"); - - LoginState = LoginState.LoggingIn; - try - { - _loginCancelToken = new CancellationTokenSource(); - await _requestQueue.SetCancelTokenAsync(_loginCancelToken.Token).ConfigureAwait(false); - - _authToken = token; - - LoginState = LoginState.LoggedIn; - } - catch (Exception) - { - await LogoutInternalAsync().ConfigureAwait(false); - throw; - } - } - - public async Task LogoutAsync() - { - await _connectionLock.WaitAsync().ConfigureAwait(false); - try - { - await LogoutInternalAsync().ConfigureAwait(false); - } - finally { _connectionLock.Release(); } - } - private async Task LogoutInternalAsync() - { - //An exception here will lock the client into the unusable LoggingOut state, but that's probably fine since our client is in an undefined state too. - if (LoginState == LoginState.LoggedOut) return; - LoginState = LoginState.LoggingOut; - - try { _loginCancelToken?.Cancel(false); } - catch { } - - await DisconnectInternalAsync().ConfigureAwait(false); - await _requestQueue.ClearAsync().ConfigureAwait(false); - - await _requestQueue.SetCancelTokenAsync(CancellationToken.None).ConfigureAwait(false); - - LoginState = LoginState.LoggedOut; - } public async Task ConnectAsync() { @@ -207,7 +139,7 @@ namespace Discord.API } finally { _connectionLock.Release(); } } - private async Task ConnectInternalAsync() + internal override async Task ConnectInternalAsync() { /*if (LoginState != LoginState.LoggedIn) throw new InvalidOperationException("You must log in before connecting.");*/ @@ -226,6 +158,7 @@ namespace Discord.API { string url = $"wss://discordapp.io:{port}/?v={DiscordRpcConfig.RpcAPIVersion}&client_id={_clientId}"; await _webSocketClient.ConnectAsync(url).ConfigureAwait(false); + SetBaseUrl($"https://discordapp.io:{port}"); success = true; break; } @@ -254,7 +187,7 @@ namespace Discord.API } finally { _connectionLock.Release(); } } - private async Task DisconnectInternalAsync() + internal override async Task DisconnectInternalAsync() { if (_webSocketClient == null) throw new NotSupportedException("This client is not configured with websocket support."); @@ -421,22 +354,5 @@ namespace Discord.API else return false; } - - //Helpers - private static double ToMilliseconds(Stopwatch stopwatch) => Math.Round((double)stopwatch.ElapsedTicks / (double)Stopwatch.Frequency * 1000.0, 2); - private string SerializeJson(object value) - { - var sb = new StringBuilder(256); - using (TextWriter text = new StringWriter(sb, CultureInfo.InvariantCulture)) - using (JsonWriter writer = new JsonTextWriter(text)) - _serializer.Serialize(writer, value); - return sb.ToString(); - } - private T DeserializeJson(Stream jsonStream) - { - using (TextReader text = new StreamReader(jsonStream)) - using (JsonReader reader = new JsonTextReader(text)) - return _serializer.Deserialize(reader); - } } } diff --git a/src/Discord.Net/API/DiscordSocketApiClient.cs b/src/Discord.Net/API/DiscordSocketApiClient.cs new file mode 100644 index 000000000..70d660cd4 --- /dev/null +++ b/src/Discord.Net/API/DiscordSocketApiClient.cs @@ -0,0 +1,240 @@ +using Discord.API.Gateway; +using Discord.API.Rest; +using Discord.Net.Queue; +using Discord.Net.Rest; +using Discord.Net.WebSockets; +using Newtonsoft.Json; +using System; +using System.Collections.Generic; +using System.IO; +using System.IO.Compression; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Discord.API +{ + public class DiscordSocketApiClient : DiscordRestApiClient + { + public event Func SentGatewayMessage { add { _sentGatewayMessageEvent.Add(value); } remove { _sentGatewayMessageEvent.Remove(value); } } + private readonly AsyncEvent> _sentGatewayMessageEvent = new AsyncEvent>(); + public event Func ReceivedGatewayEvent { add { _receivedGatewayEvent.Add(value); } remove { _receivedGatewayEvent.Remove(value); } } + private readonly AsyncEvent> _receivedGatewayEvent = new AsyncEvent>(); + + public event Func Disconnected { add { _disconnectedEvent.Add(value); } remove { _disconnectedEvent.Remove(value); } } + private readonly AsyncEvent> _disconnectedEvent = new AsyncEvent>(); + + private readonly IWebSocketClient _gatewayClient; + private CancellationTokenSource _connectCancelToken; + private string _gatewayUrl; + + public ConnectionState ConnectionState { get; private set; } + + public DiscordSocketApiClient(RestClientProvider restClientProvider, WebSocketProvider webSocketProvider, JsonSerializer serializer = null, RequestQueue requestQueue = null) + : base(restClientProvider, serializer, requestQueue) + { + _gatewayClient = webSocketProvider(); + //_gatewayClient.SetHeader("user-agent", DiscordConfig.UserAgent); (Causes issues in .Net 4.6+) + _gatewayClient.BinaryMessage += async (data, index, count) => + { + using (var compressed = new MemoryStream(data, index + 2, count - 2)) + using (var decompressed = new MemoryStream()) + { + using (var zlib = new DeflateStream(compressed, CompressionMode.Decompress)) + zlib.CopyTo(decompressed); + decompressed.Position = 0; + using (var reader = new StreamReader(decompressed)) + using (var jsonReader = new JsonTextReader(reader)) + { + var msg = _serializer.Deserialize(jsonReader); + await _receivedGatewayEvent.InvokeAsync((GatewayOpCode)msg.Operation, msg.Sequence, msg.Type, msg.Payload).ConfigureAwait(false); + } + } + }; + _gatewayClient.TextMessage += async text => + { + using (var reader = new StringReader(text)) + using (var jsonReader = new JsonTextReader(reader)) + { + var msg = _serializer.Deserialize(jsonReader); + await _receivedGatewayEvent.InvokeAsync((GatewayOpCode)msg.Operation, msg.Sequence, msg.Type, msg.Payload).ConfigureAwait(false); + } + }; + _gatewayClient.Closed += async ex => + { + await DisconnectAsync().ConfigureAwait(false); + await _disconnectedEvent.InvokeAsync(ex).ConfigureAwait(false); + }; + } + internal override void Dispose(bool disposing) + { + if (!_isDisposed) + { + if (disposing) + { + _connectCancelToken?.Dispose(); + (_gatewayClient as IDisposable)?.Dispose(); + } + _isDisposed = true; + } + } + + public async Task ConnectAsync() + { + await _stateLock.WaitAsync().ConfigureAwait(false); + try + { + await ConnectInternalAsync().ConfigureAwait(false); + } + finally { _stateLock.Release(); } + } + internal override async Task ConnectInternalAsync() + { + if (LoginState != LoginState.LoggedIn) + throw new InvalidOperationException("You must log in before connecting."); + if (_gatewayClient == null) + throw new NotSupportedException("This client is not configured with websocket support."); + + ConnectionState = ConnectionState.Connecting; + try + { + _connectCancelToken = new CancellationTokenSource(); + if (_gatewayClient != null) + _gatewayClient.SetCancelToken(_connectCancelToken.Token); + + if (_gatewayUrl == null) + { + var gatewayResponse = await GetGatewayAsync().ConfigureAwait(false); + _gatewayUrl = $"{gatewayResponse.Url}?v={DiscordConfig.APIVersion}&encoding={DiscordSocketConfig.GatewayEncoding}"; + } + await _gatewayClient.ConnectAsync(_gatewayUrl).ConfigureAwait(false); + + ConnectionState = ConnectionState.Connected; + } + catch (Exception) + { + _gatewayUrl = null; //Uncache in case the gateway url changed + await DisconnectInternalAsync().ConfigureAwait(false); + throw; + } + } + + public async Task DisconnectAsync() + { + await _stateLock.WaitAsync().ConfigureAwait(false); + try + { + await DisconnectInternalAsync().ConfigureAwait(false); + } + finally { _stateLock.Release(); } + } + public async Task DisconnectAsync(Exception ex) + { + await _stateLock.WaitAsync().ConfigureAwait(false); + try + { + await DisconnectInternalAsync().ConfigureAwait(false); + } + finally { _stateLock.Release(); } + } + internal override async Task DisconnectInternalAsync() + { + if (_gatewayClient == null) + throw new NotSupportedException("This client is not configured with websocket support."); + + if (ConnectionState == ConnectionState.Disconnected) return; + ConnectionState = ConnectionState.Disconnecting; + + try { _connectCancelToken?.Cancel(false); } + catch { } + + await _gatewayClient.DisconnectAsync().ConfigureAwait(false); + + ConnectionState = ConnectionState.Disconnected; + } + + //Core + private async Task SendGatewayInternalAsync(GatewayOpCode opCode, object payload, + BucketGroup group, int bucketId, ulong guildId, RequestOptions options) + { + //TODO: Add ETF + byte[] bytes = null; + payload = new WebSocketMessage { Operation = (int)opCode, Payload = payload }; + if (payload != null) + bytes = Encoding.UTF8.GetBytes(SerializeJson(payload)); + await RequestQueue.SendAsync(new WebSocketRequest(_gatewayClient, bytes, true, options), group, bucketId, guildId).ConfigureAwait(false); + await _sentGatewayMessageEvent.InvokeAsync(opCode).ConfigureAwait(false); + } + + //Gateway + public Task SendGatewayAsync(GatewayOpCode opCode, object payload, + GlobalBucket bucket = GlobalBucket.GeneralGateway, RequestOptions options = null) + => SendGatewayInternalAsync(opCode, payload, BucketGroup.Global, (int)bucket, 0, options); + + public Task SendGatewayAsync(GatewayOpCode opCode, object payload, + GuildBucket bucket, ulong guildId, RequestOptions options = null) + => SendGatewayInternalAsync(opCode, payload, BucketGroup.Guild, (int)bucket, guildId, options); + + public async Task GetGatewayAsync(RequestOptions options = null) + { + return await SendAsync("GET", "gateway", options: options).ConfigureAwait(false); + } + public async Task SendIdentifyAsync(int largeThreshold = 100, bool useCompression = true, RequestOptions options = null) + { + var props = new Dictionary + { + ["$device"] = "Discord.Net" + }; + var msg = new IdentifyParams() + { + Token = _authToken, + Properties = props, + LargeThreshold = largeThreshold, + UseCompression = useCompression + }; + await SendGatewayAsync(GatewayOpCode.Identify, msg, options: options).ConfigureAwait(false); + } + public async Task SendResumeAsync(string sessionId, int lastSeq, RequestOptions options = null) + { + var msg = new ResumeParams() + { + Token = _authToken, + SessionId = sessionId, + Sequence = lastSeq + }; + await SendGatewayAsync(GatewayOpCode.Resume, msg, options: options).ConfigureAwait(false); + } + public async Task SendHeartbeatAsync(int lastSeq, RequestOptions options = null) + { + await SendGatewayAsync(GatewayOpCode.Heartbeat, lastSeq, options: options).ConfigureAwait(false); + } + public async Task SendStatusUpdateAsync(long? idleSince, Game game, RequestOptions options = null) + { + var args = new StatusUpdateParams + { + IdleSince = idleSince, + Game = game + }; + await SendGatewayAsync(GatewayOpCode.StatusUpdate, args, options: options).ConfigureAwait(false); + } + public async Task SendRequestMembersAsync(IEnumerable guildIds, RequestOptions options = null) + { + await SendGatewayAsync(GatewayOpCode.RequestGuildMembers, new RequestMembersParams { GuildIds = guildIds, Query = "", Limit = 0 }, options: options).ConfigureAwait(false); + } + public async Task SendVoiceStateUpdateAsync(ulong guildId, ulong? channelId, bool selfDeaf, bool selfMute, RequestOptions options = null) + { + var payload = new VoiceStateUpdateParams + { + GuildId = guildId, + ChannelId = channelId, + SelfDeaf = selfDeaf, + SelfMute = selfMute + }; + await SendGatewayAsync(GatewayOpCode.VoiceStateUpdate, payload, options: options).ConfigureAwait(false); + } + public async Task SendGuildSyncAsync(IEnumerable guildIds, RequestOptions options = null) + { + await SendGatewayAsync(GatewayOpCode.GuildSync, guildIds, options: options).ConfigureAwait(false); + } + } +} diff --git a/src/Discord.Net/DiscordRestClient.cs b/src/Discord.Net/DiscordRestClient.cs index 134072253..92c27e3da 100644 --- a/src/Discord.Net/DiscordRestClient.cs +++ b/src/Discord.Net/DiscordRestClient.cs @@ -27,20 +27,21 @@ namespace Discord internal readonly ILogger _clientLogger, _restLogger, _queueLogger; internal readonly SemaphoreSlim _connectionLock; - internal readonly RequestQueue _requestQueue; internal SelfUser _currentUser; private bool _isFirstLogSub; internal bool _isDisposed; - public API.DiscordApiClient ApiClient { get; } + public API.DiscordRestApiClient ApiClient { get; } internal LogManager LogManager { get; } public LoginState LoginState { get; private set; } /// Creates a new REST-only discord client. public DiscordRestClient() : this(new DiscordRestConfig()) { } + public DiscordRestClient(DiscordRestConfig config) : this(config, CreateApiClient(config)) { } /// Creates a new REST-only discord client. - public DiscordRestClient(DiscordRestConfig config) + internal DiscordRestClient(DiscordRestConfig config, API.DiscordRestApiClient client) { + ApiClient = client; LogManager = new LogManager(config.LogLevel); LogManager.Message += async msg => await _logEvent.InvokeAsync(msg).ConfigureAwait(false); _clientLogger = LogManager.CreateLogger("Client"); @@ -50,19 +51,16 @@ namespace Discord _connectionLock = new SemaphoreSlim(1, 1); - _requestQueue = new RequestQueue(); - _requestQueue.RateLimitTriggered += async (id, bucket, millis) => + ApiClient.RequestQueue.RateLimitTriggered += async (id, bucket, millis) => { await _queueLogger.WarningAsync($"Rate limit triggered (id = \"{id ?? "null"}\")").ConfigureAwait(false); if (bucket == null && id != null) await _queueLogger.WarningAsync($"Unknown rate limit bucket \"{id ?? "null"}\"").ConfigureAwait(false); }; - - var restProvider = config.RestClientProvider; - var webSocketProvider = (this is DiscordSocketClient) ? (config as DiscordSocketConfig)?.WebSocketProvider : null; //TODO: Clean this check - ApiClient = new API.DiscordApiClient(restProvider, webSocketProvider, requestQueue: _requestQueue); ApiClient.SentRequest += async (method, endpoint, millis) => await _restLogger.VerboseAsync($"{method} {endpoint}: {millis} ms").ConfigureAwait(false); } + private static API.DiscordRestApiClient CreateApiClient(DiscordRestConfig config) + => new API.DiscordRestApiClient(config.RestClientProvider, requestQueue: new RequestQueue()); /// public async Task LoginAsync(TokenType tokenType, string token, bool validateToken = true) @@ -89,27 +87,9 @@ namespace Discord try { await ApiClient.LoginAsync(tokenType, token).ConfigureAwait(false); - if (validateToken) - { - try - { - var user = await GetCurrentUserAsync().ConfigureAwait(false); - if (user == null) //Is using a cached DiscordClient - user = new SelfUser(this, await ApiClient.GetMyUserAsync().ConfigureAwait(false)); - - if (user.IsBot && tokenType == TokenType.User) - throw new InvalidOperationException($"A bot token used provided with {nameof(TokenType)}.{nameof(TokenType.User)}"); - else if (!user.IsBot && tokenType == TokenType.Bot) //Discord currently sends a 401 in this case - throw new InvalidOperationException($"A user token used provided with {nameof(TokenType)}.{nameof(TokenType.Bot)}"); - } - catch (HttpException ex) - { - throw new ArgumentException("Token validation failed", nameof(token), ex); - } - } - - await OnLoginAsync().ConfigureAwait(false); + await ValidateTokenAsync(tokenType, token).ConfigureAwait(false); + await OnLoginAsync(tokenType, token).ConfigureAwait(false); LoginState = LoginState.LoggedIn; } @@ -121,7 +101,26 @@ namespace Discord await _loggedInEvent.InvokeAsync().ConfigureAwait(false); } - protected virtual Task OnLoginAsync() => Task.CompletedTask; + protected virtual async Task ValidateTokenAsync(TokenType tokenType, string token) + { + try + { + var user = await GetCurrentUserAsync().ConfigureAwait(false); + if (user == null) //Is using a cached DiscordClient + user = new SelfUser(this, await ApiClient.GetMyUserAsync().ConfigureAwait(false)); + + if (user.IsBot && tokenType == TokenType.User) + throw new InvalidOperationException($"A bot token used provided with {nameof(TokenType)}.{nameof(TokenType.User)}"); + else if (!user.IsBot && tokenType == TokenType.Bot) //Discord currently sends a 401 in this case + throw new InvalidOperationException($"A user token used provided with {nameof(TokenType)}.{nameof(TokenType.Bot)}"); + } + catch (HttpException ex) + { + throw new ArgumentException("Token validation failed", nameof(token), ex); + } + } + protected virtual Task OnLoginAsync(TokenType tokenType, string token) => Task.CompletedTask; + /// public async Task LogoutAsync() diff --git a/src/Discord.Net/DiscordRpcClient.cs b/src/Discord.Net/DiscordRpcClient.cs index 97648771b..6d3f2ac64 100644 --- a/src/Discord.Net/DiscordRpcClient.cs +++ b/src/Discord.Net/DiscordRpcClient.cs @@ -1,6 +1,7 @@ using Discord.API.Rpc; using Discord.Logging; using Discord.Net.Converters; +using Discord.Net.Queue; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using System; @@ -10,16 +11,8 @@ using System.Threading.Tasks; namespace Discord { - public class DiscordRpcClient + public class DiscordRpcClient : DiscordRestClient { - public event Func Log { add { _logEvent.Add(value); } remove { _logEvent.Remove(value); } } - private readonly AsyncEvent> _logEvent = new AsyncEvent>(); - - public event Func LoggedIn { add { _loggedInEvent.Add(value); } remove { _loggedInEvent.Remove(value); } } - private readonly AsyncEvent> _loggedInEvent = new AsyncEvent>(); - public event Func LoggedOut { add { _loggedOutEvent.Add(value); } remove { _loggedOutEvent.Remove(value); } } - private readonly AsyncEvent> _loggedOutEvent = new AsyncEvent>(); - public event Func Connected { add { _connectedEvent.Add(value); } remove { _connectedEvent.Remove(value); } } private readonly AsyncEvent> _connectedEvent = new AsyncEvent>(); public event Func Disconnected { add { _disconnectedEvent.Add(value); } remove { _disconnectedEvent.Remove(value); } } @@ -28,44 +21,36 @@ namespace Discord public event Func Ready { add { _readyEvent.Add(value); } remove { _readyEvent.Remove(value); } } private readonly AsyncEvent> _readyEvent = new AsyncEvent>(); - private readonly ILogger _clientLogger, _rpcLogger; - private readonly SemaphoreSlim _connectionLock; + private readonly ILogger _rpcLogger; private readonly JsonSerializer _serializer; private TaskCompletionSource _connectTask; - private CancellationTokenSource _cancelToken; - internal SelfUser _currentUser; + private CancellationTokenSource _cancelToken, _reconnectCancelToken; private Task _reconnectTask; private bool _isFirstLogSub; private bool _isReconnecting; - private bool _isDisposed; + private bool _canReconnect; - public API.DiscordRpcApiClient ApiClient { get; } - internal LogManager LogManager { get; } - public LoginState LoginState { get; private set; } public ConnectionState ConnectionState { get; private set; } + public new API.DiscordRpcApiClient ApiClient => base.ApiClient as API.DiscordRpcApiClient; + /// Creates a new RPC discord client. public DiscordRpcClient(string clientId, string origin) : this(new DiscordRpcConfig(clientId, origin)) { } /// Creates a new RPC discord client. public DiscordRpcClient(DiscordRpcConfig config) + : base(config, CreateApiClient(config)) { - LogManager = new LogManager(config.LogLevel); - LogManager.Message += async msg => await _logEvent.InvokeAsync(msg).ConfigureAwait(false); - _clientLogger = LogManager.CreateLogger("Client"); _rpcLogger = LogManager.CreateLogger("RPC"); _isFirstLogSub = true; - _connectionLock = new SemaphoreSlim(1, 1); - _serializer = new JsonSerializer { ContractResolver = new DiscordContractResolver() }; _serializer.Error += (s, e) => { _rpcLogger.WarningAsync(e.ErrorContext.Error).GetAwaiter().GetResult(); e.ErrorContext.Handled = true; }; - - ApiClient = new API.DiscordRpcApiClient(config.ClientId, config.Origin, config.WebSocketProvider); + ApiClient.SentRpcMessage += async opCode => await _rpcLogger.DebugAsync($"Sent {opCode}").ConfigureAwait(false); ApiClient.ReceivedRpcEvent += ProcessMessageAsync; ApiClient.Disconnected += async ex => @@ -79,91 +64,48 @@ namespace Discord await _rpcLogger.WarningAsync($"Connection Closed").ConfigureAwait(false); }; } - private void Dispose(bool disposing) + private static API.DiscordRpcApiClient CreateApiClient(DiscordRpcConfig config) + => new API.DiscordRpcApiClient(config.ClientId, config.Origin, config.RestClientProvider, config.WebSocketProvider, requestQueue: new RequestQueue()); + + internal override void Dispose(bool disposing) { if (!_isDisposed) - { ApiClient.Dispose(); - _isDisposed = true; - } } - public void Dispose() => Dispose(true); - - /// - public async Task LoginAsync(TokenType tokenType, string token, bool validateToken = true) + + protected override async Task OnLoginAsync(TokenType tokenType, string token) { - await _connectionLock.WaitAsync().ConfigureAwait(false); - try - { - await LoginInternalAsync(tokenType, token, validateToken).ConfigureAwait(false); - } - finally { _connectionLock.Release(); } + await ApiClient.LoginAsync(tokenType, token).ConfigureAwait(false); } - private async Task LoginInternalAsync(TokenType tokenType, string token, bool validateToken) + protected override async Task OnLogoutAsync() { - if (_isFirstLogSub) - { - _isFirstLogSub = false; - await WriteInitialLog().ConfigureAwait(false); - } - - if (LoginState != LoginState.LoggedOut) - await LogoutInternalAsync().ConfigureAwait(false); - LoginState = LoginState.LoggingIn; - - try - { - await ApiClient.LoginAsync(tokenType, token).ConfigureAwait(false); - - LoginState = LoginState.LoggedIn; - } - catch (Exception) - { - await LogoutInternalAsync().ConfigureAwait(false); - throw; - } - - await _loggedInEvent.InvokeAsync().ConfigureAwait(false); + await ApiClient.LogoutAsync().ConfigureAwait(false); } - /// - public async Task LogoutAsync() + protected override Task ValidateTokenAsync(TokenType tokenType, string token) { - await _connectionLock.WaitAsync().ConfigureAwait(false); - try - { - await LogoutInternalAsync().ConfigureAwait(false); - } - finally { _connectionLock.Release(); } + return Task.CompletedTask; //Validation is done in DiscordRpcAPIClient } - private async Task LogoutInternalAsync() - { - if (LoginState == LoginState.LoggedOut) return; - LoginState = LoginState.LoggingOut; - - await ApiClient.LogoutAsync().ConfigureAwait(false); - - _currentUser = null; - - LoginState = LoginState.LoggedOut; - await _loggedOutEvent.InvokeAsync().ConfigureAwait(false); - } - - public async Task ConnectAsync() + /// + public Task ConnectAsync() => ConnectAsync(false); + internal async Task ConnectAsync(bool ignoreLoginCheck) { await _connectionLock.WaitAsync().ConfigureAwait(false); try { _isReconnecting = false; - await ConnectInternalAsync().ConfigureAwait(false); + await ConnectInternalAsync(ignoreLoginCheck, false).ConfigureAwait(false); } finally { _connectionLock.Release(); } } - private async Task ConnectInternalAsync(bool ignoreLoginCheck = false) + private async Task ConnectInternalAsync(bool ignoreLoginCheck, bool isReconnecting) { - if (LoginState != LoginState.LoggedIn) - throw new InvalidOperationException("You must log in before connecting or call ConnectAndAuthorizeAsync."); + if (!ignoreLoginCheck && LoginState != LoginState.LoggedIn) + throw new InvalidOperationException("You must log in before connecting."); + + if (!isReconnecting && _reconnectCancelToken != null && !_reconnectCancelToken.IsCancellationRequested) + _reconnectCancelToken.Cancel(); if (_isFirstLogSub) { @@ -173,7 +115,7 @@ namespace Discord var state = ConnectionState; if (state == ConnectionState.Connecting || state == ConnectionState.Connected) - await DisconnectInternalAsync(null).ConfigureAwait(false); + await DisconnectInternalAsync(null, isReconnecting).ConfigureAwait(false); ConnectionState = ConnectionState.Connecting; await _rpcLogger.InfoAsync("Connecting").ConfigureAwait(false); @@ -185,13 +127,13 @@ namespace Discord await _connectedEvent.InvokeAsync().ConfigureAwait(false); await _connectTask.Task.ConfigureAwait(false); - + _canReconnect = true; ConnectionState = ConnectionState.Connected; await _rpcLogger.InfoAsync("Connected").ConfigureAwait(false); } catch (Exception) { - await DisconnectInternalAsync(null).ConfigureAwait(false); + await DisconnectInternalAsync(null, isReconnecting).ConfigureAwait(false); throw; } } @@ -202,12 +144,20 @@ namespace Discord try { _isReconnecting = false; - await DisconnectInternalAsync(null).ConfigureAwait(false); + await DisconnectInternalAsync(null, false).ConfigureAwait(false); } finally { _connectionLock.Release(); } } - private async Task DisconnectInternalAsync(Exception ex) + private async Task DisconnectInternalAsync(Exception ex, bool isReconnecting) { + if (!isReconnecting) + { + _canReconnect = false; + + if (_reconnectCancelToken != null && !_reconnectCancelToken.IsCancellationRequested) + _reconnectCancelToken.Cancel(); + } + if (ConnectionState == ConnectionState.Disconnected) return; ConnectionState = ConnectionState.Disconnecting; await _rpcLogger.InfoAsync("Disconnecting").ConfigureAwait(false); @@ -228,53 +178,51 @@ namespace Discord private async Task StartReconnectAsync(Exception ex) { - //TODO: Is this thread-safe? - if (_reconnectTask != null) return; - + _connectTask?.TrySetException(ex); await _connectionLock.WaitAsync().ConfigureAwait(false); try { - await DisconnectInternalAsync(ex).ConfigureAwait(false); - if (_reconnectTask != null) return; - _isReconnecting = true; - _reconnectTask = ReconnectInternalAsync(); + if (!_canReconnect || _reconnectTask != null) return; + await DisconnectInternalAsync(null, true).ConfigureAwait(false); + _reconnectCancelToken = new CancellationTokenSource(); + _reconnectTask = ReconnectInternalAsync(_reconnectCancelToken.Token); } finally { _connectionLock.Release(); } } - private async Task ReconnectInternalAsync() + private async Task ReconnectInternalAsync(CancellationToken cancelToken) { try { + Random jitter = new Random(); int nextReconnectDelay = 1000; - while (_isReconnecting) + while (true) { + await Task.Delay(nextReconnectDelay, cancelToken).ConfigureAwait(false); + nextReconnectDelay = nextReconnectDelay * 2 + jitter.Next(-250, 250); + if (nextReconnectDelay > 60000) + nextReconnectDelay = 60000; + + await _connectionLock.WaitAsync().ConfigureAwait(false); try { - await Task.Delay(nextReconnectDelay).ConfigureAwait(false); - nextReconnectDelay *= 2; - if (nextReconnectDelay > 30000) - nextReconnectDelay = 30000; - - await _connectionLock.WaitAsync().ConfigureAwait(false); - try - { - await ConnectInternalAsync().ConfigureAwait(false); - } - finally { _connectionLock.Release(); } + if (cancelToken.IsCancellationRequested) return; + await ConnectInternalAsync(false, true).ConfigureAwait(false); + _reconnectTask = null; return; } catch (Exception ex) { await _rpcLogger.WarningAsync("Reconnect failed", ex).ConfigureAwait(false); } + finally { _connectionLock.Release(); } } } - finally + catch (OperationCanceledException) { await _connectionLock.WaitAsync().ConfigureAwait(false); try { - _isReconnecting = false; + await _rpcLogger.DebugAsync("Reconnect cancelled").ConfigureAwait(false); _reconnectTask = null; } finally { _connectionLock.Release(); } @@ -283,7 +231,7 @@ namespace Discord public async Task AuthorizeAsync(string[] scopes) { - await ConnectAsync().ConfigureAwait(false); + await ConnectAsync(true).ConfigureAwait(false); var result = await ApiClient.SendAuthorizeAsync(scopes).ConfigureAwait(false); await DisconnectAsync().ConfigureAwait(false); return result.Code; @@ -314,7 +262,8 @@ namespace Discord //CancellationToken = cancelToken //TODO: Implement }; - await ApiClient.SendAuthenticateAsync(options).ConfigureAwait(false); //Has bearer + if (LoginState != LoginState.LoggedOut) + await ApiClient.SendAuthenticateAsync(options).ConfigureAwait(false); //Has bearer var __ = _connectTask.TrySetResultAsync(true); //Signal the .Connect() call to complete await _rpcLogger.InfoAsync("Ready").ConfigureAwait(false); diff --git a/src/Discord.Net/DiscordRpcConfig.cs b/src/Discord.Net/DiscordRpcConfig.cs index 7b11d5fc4..c7e38bd44 100644 --- a/src/Discord.Net/DiscordRpcConfig.cs +++ b/src/Discord.Net/DiscordRpcConfig.cs @@ -2,7 +2,7 @@ namespace Discord { - public class DiscordRpcConfig : DiscordConfig + public class DiscordRpcConfig : DiscordRestConfig { public const int RpcAPIVersion = 1; diff --git a/src/Discord.Net/DiscordSocketClient.cs b/src/Discord.Net/DiscordSocketClient.cs index ddad2a613..3d485d7e5 100644 --- a/src/Discord.Net/DiscordSocketClient.cs +++ b/src/Discord.Net/DiscordSocketClient.cs @@ -3,6 +3,7 @@ using Discord.Audio; using Discord.Extensions; using Discord.Logging; using Discord.Net.Converters; +using Discord.Net.Queue; using Discord.Net.WebSockets; using Newtonsoft.Json; using Newtonsoft.Json.Linq; @@ -52,6 +53,7 @@ namespace Discord internal DataStore DataStore { get; private set; } internal WebSocketProvider WebSocketProvider { get; private set; } + public new API.DiscordSocketApiClient ApiClient => base.ApiClient as API.DiscordSocketApiClient; internal SocketSelfUser CurrentUser => _currentUser as SocketSelfUser; internal IReadOnlyCollection Guilds => DataStore.Guilds; internal IReadOnlyCollection VoiceRegions => _voiceRegions.ToReadOnlyCollection(); @@ -60,7 +62,7 @@ namespace Discord public DiscordSocketClient() : this(new DiscordSocketConfig()) { } /// Creates a new REST/WebSocket discord client. public DiscordSocketClient(DiscordSocketConfig config) - : base(config) + : base(config, CreateApiClient(config)) { ShardId = config.ShardId; TotalShards = config.TotalShards; @@ -106,8 +108,10 @@ namespace Discord _voiceRegions = ImmutableDictionary.Create(); _largeGuilds = new ConcurrentQueue(); } + private static API.DiscordSocketApiClient CreateApiClient(DiscordSocketConfig config) + => new API.DiscordSocketApiClient(config.RestClientProvider, config.WebSocketProvider, requestQueue: new RequestQueue()); - protected override async Task OnLoginAsync() + protected override async Task OnLoginAsync(TokenType tokenType, string token) { var voiceRegions = await ApiClient.GetVoiceRegionsAsync().ConfigureAwait(false); _voiceRegions = voiceRegions.Select(x => new VoiceRegion(x)).ToImmutableDictionary(x => x.Id); diff --git a/src/Discord.Net/Entities/Users/SelfUser.cs b/src/Discord.Net/Entities/Users/SelfUser.cs index dca9ae837..32576b3e0 100644 --- a/src/Discord.Net/Entities/Users/SelfUser.cs +++ b/src/Discord.Net/Entities/Users/SelfUser.cs @@ -7,9 +7,9 @@ namespace Discord { internal class SelfUser : User, ISelfUser { - private long _idleSince; - private UserStatus _status; - private Game _game; + protected long _idleSince; + protected UserStatus _status; + protected Game _game; public string Email { get; private set; } public bool IsVerified { get; private set; } @@ -61,27 +61,7 @@ namespace Discord var model = await Discord.ApiClient.ModifySelfAsync(args).ConfigureAwait(false); Update(model, UpdateSource.Rest); } - public async Task ModifyStatusAsync(Action func) - { - if (func == null) throw new NullReferenceException(nameof(func)); - - var args = new ModifyPresenceParams(); - func(args); - var game = args._game.GetValueOrDefault(_game); - var status = args._status.GetValueOrDefault(_status); - - long idleSince = _idleSince; - if (status == UserStatus.Idle && _status != UserStatus.Idle) - idleSince = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); - var apiGame = game != null ? new API.Game { Name = game.Name, StreamType = game.StreamType, StreamUrl = game.StreamUrl } : null; - - await Discord.ApiClient.SendStatusUpdateAsync(status == UserStatus.Idle ? _idleSince : (long?)null, apiGame).ConfigureAwait(false); - - //Save values - _idleSince = idleSince; - _game = game; - _status = status; - } + Task ISelfUser.ModifyStatusAsync(Action func) { throw new NotSupportedException(); } } } diff --git a/src/Discord.Net/Entities/WebSocket/Users/SocketSelfUser.cs b/src/Discord.Net/Entities/WebSocket/Users/SocketSelfUser.cs index a4a6aa733..4aceb62ab 100644 --- a/src/Discord.Net/Entities/WebSocket/Users/SocketSelfUser.cs +++ b/src/Discord.Net/Entities/WebSocket/Users/SocketSelfUser.cs @@ -1,4 +1,6 @@ -using System; +using Discord.API.Rest; +using System; +using System.Threading.Tasks; using Model = Discord.API.User; namespace Discord @@ -15,6 +17,29 @@ namespace Discord { } + public async Task ModifyStatusAsync(Action func) + { + if (func == null) throw new NullReferenceException(nameof(func)); + + var args = new ModifyPresenceParams(); + func(args); + + var game = args._game.GetValueOrDefault(_game); + var status = args._status.GetValueOrDefault(_status); + + long idleSince = _idleSince; + if (status == UserStatus.Idle && _status != UserStatus.Idle) + idleSince = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + var apiGame = game != null ? new API.Game { Name = game.Name, StreamType = game.StreamType, StreamUrl = game.StreamUrl } : null; + + await Discord.ApiClient.SendStatusUpdateAsync(status == UserStatus.Idle ? _idleSince : (long?)null, apiGame).ConfigureAwait(false); + + //Save values + _idleSince = idleSince; + _game = game; + _status = status; + } + public SocketSelfUser Clone() => MemberwiseClone() as SocketSelfUser; ISocketUser ISocketUser.Clone() => Clone(); } diff --git a/src/Discord.Net/IDiscordClient.cs b/src/Discord.Net/IDiscordClient.cs index 87034d2c1..263191099 100644 --- a/src/Discord.Net/IDiscordClient.cs +++ b/src/Discord.Net/IDiscordClient.cs @@ -13,7 +13,7 @@ namespace Discord { ConnectionState ConnectionState { get; } - DiscordApiClient ApiClient { get; } + DiscordRestApiClient ApiClient { get; } ILogManager LogManager { get; } Task ConnectAsync();