@@ -1,16 +1,19 @@
using Discord.API.Rest;
using Discord.API.Gateway;
using Discord.API.Rest;
using Discord.Net;
using Discord.Net.Converters;
using Discord.Net.Queue;
using Discord.Net.Rest;
using Discord.Net.WebSockets;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Text;
@@ -21,14 +24,17 @@ namespace Discord.API
{
public class DiscordApiClient : IDisposable
{
internal event Func<SentRequestEventArgs, Task> SentRequest;
public event Func<string, string, double, Task> SentRequest;
public event Func<int, Task> SentGatewayMessage;
public event Func<GatewayOpCodes, string, JToken, Task> ReceivedGatewayEvent;
private readonly RequestQueue _requestQueue;
private readonly JsonSerializer _serializer;
private readonly IRestClient _restClient;
private readonly IWebSocketClient _gatewayClient;
private readonly SemaphoreSlim _connectionLock;
private CancellationTokenSource _loginCancelToken, _connectCancelToken;
private string _authToken;
private bool _isDisposed;
public LoginState LoginState { get; private set; }
@@ -48,6 +54,26 @@ namespace Discord.API
{
_gatewayClient = webSocketProvider();
_gatewayClient.SetHeader("user-agent", DiscordConfig.UserAgent);
_gatewayClient.BinaryMessage += async (data, index, count) =>
{
using (var compressed = new MemoryStream(data, index + 2, count - 2))
using (var decompressed = new MemoryStream())
{
using (var zlib = new DeflateStream(compressed, CompressionMode.Decompress))
zlib.CopyTo(decompressed);
decompressed.Position = 0;
using (var reader = new StreamReader(decompressed))
{
var msg = JsonConvert.DeserializeObject<WebSocketMessage>(reader.ReadToEnd());
await ReceivedGatewayEvent.Raise((GatewayOpCodes)msg.Operation, msg.Type, msg.Payload as JToken).ConfigureAwait(false);
}
}
};
_gatewayClient.TextMessage += async text =>
{
var msg = JsonConvert.DeserializeObject<WebSocketMessage>(text);
await ReceivedGatewayEvent.Raise((GatewayOpCodes)msg.Operation, msg.Type, msg.Payload as JToken).ConfigureAwait(false);
};
}
_serializer = serializer ?? new JsonSerializer { ContractResolver = new DiscordContractResolver() };
@@ -95,6 +121,7 @@ namespace Discord.API
_loginCancelToken = new CancellationTokenSource();
AuthTokenType = TokenType.User;
_authToken = null;
_restClient.SetHeader("authorization", null);
await _requestQueue.SetCancelToken(_loginCancelToken.Token).ConfigureAwait(false);
_restClient.SetCancelToken(_loginCancelToken.Token);
@@ -106,6 +133,7 @@ namespace Discord.API
}
AuthTokenType = tokenType;
_authToken = token;
switch (tokenType)
{
case TokenType.Bot:
@@ -181,7 +209,10 @@ namespace Discord.API
_gatewayClient.SetCancelToken(_connectCancelToken.Token);
var gatewayResponse = await GetGateway().ConfigureAwait(false);
await _gatewayClient.Connect(gatewayResponse.Url).ConfigureAwait(false);
var url = $"{gatewayResponse.Url}?v={DiscordConfig.GatewayAPIVersion}&encoding={DiscordConfig.GatewayEncoding}";
await _gatewayClient.Connect(url).ConfigureAwait(false);
await SendIdentify().ConfigureAwait(false);
ConnectionState = ConnectionState.Connected;
}
@@ -226,13 +257,13 @@ namespace Discord.API
=> SendInternal(method, endpoint, multipartArgs, true, bucket);
public async Task<TResponse> Send<TResponse>(string method, string endpoint, GlobalBucket bucket = GlobalBucket.General)
where TResponse : class
=> Deserialize<TResponse>(await SendInternal(method, endpoint, null, false, bucket).ConfigureAwait(false));
=> DeserializeJson <TResponse>(await SendInternal(method, endpoint, null, false, bucket).ConfigureAwait(false));
public async Task<TResponse> Send<TResponse>(string method, string endpoint, object payload, GlobalBucket bucket = GlobalBucket.General)
where TResponse : class
=> Deserialize<TResponse>(await SendInternal(method, endpoint, payload, false, bucket).ConfigureAwait(false));
=> DeserializeJson <TResponse>(await SendInternal(method, endpoint, payload, false, bucket).ConfigureAwait(false));
public async Task<TResponse> Send<TResponse>(string method, string endpoint, Stream file, IReadOnlyDictionary<string, string> multipartArgs, GlobalBucket bucket = GlobalBucket.General)
where TResponse : class
=> Deserialize<TResponse>(await SendInternal(method, endpoint, multipartArgs, false, bucket).ConfigureAwait(false));
=> DeserializeJson <TResponse>(await SendInternal(method, endpoint, multipartArgs, false, bucket).ConfigureAwait(false));
public Task Send(string method, string endpoint, GuildBucket bucket, ulong guildId)
=> SendInternal(method, endpoint, null, true, bucket, guildId);
@@ -242,14 +273,14 @@ namespace Discord.API
=> SendInternal(method, endpoint, multipartArgs, true, bucket, guildId);
public async Task<TResponse> Send<TResponse>(string method, string endpoint, GuildBucket bucket, ulong guildId)
where TResponse : class
=> Deserialize<TResponse>(await SendInternal(method, endpoint, null, false, bucket, guildId).ConfigureAwait(false));
=> DeserializeJson <TResponse>(await SendInternal(method, endpoint, null, false, bucket, guildId).ConfigureAwait(false));
public async Task<TResponse> Send<TResponse>(string method, string endpoint, object payload, GuildBucket bucket, ulong guildId)
where TResponse : class
=> Deserialize<TResponse>(await SendInternal(method, endpoint, payload, false, bucket, guildId).ConfigureAwait(false));
=> DeserializeJson <TResponse>(await SendInternal(method, endpoint, payload, false, bucket, guildId).ConfigureAwait(false));
public async Task<TResponse> Send<TResponse>(string method, string endpoint, Stream file, IReadOnlyDictionary<string, string> multipartArgs, GuildBucket bucket, ulong guildId)
where TResponse : class
=> Deserialize<TResponse>(await SendInternal(method, endpoint, multipartArgs, false, bucket, guildId).ConfigureAwait(false));
=> DeserializeJson <TResponse>(await SendInternal(method, endpoint, multipartArgs, false, bucket, guildId).ConfigureAwait(false));
private Task<Stream> SendInternal(string method, string endpoint, object payload, bool headerOnly, GlobalBucket bucket)
=> SendInternal(method, endpoint, payload, headerOnly, BucketGroup.Global, (int)bucket, 0);
private Task<Stream> SendInternal(string method, string endpoint, object payload, bool headerOnly, GuildBucket bucket, ulong guildId)
@@ -264,13 +295,12 @@ namespace Discord.API
var stopwatch = Stopwatch.StartNew();
string json = null;
if (payload != null)
json = Serialize(payload);
json = SerializeJson (payload);
var responseStream = await _requestQueue.Send(new RestRequest(_restClient, method, endpoint, json, headerOnly), group, bucketId, guildId).ConfigureAwait(false);
int bytes = headerOnly ? 0 : (int)responseStream.Length;
stopwatch.Stop();
double milliseconds = ToMilliseconds(stopwatch);
await SentRequest.Raise(new SentRequestEventArgs( method, endpoint, bytes, milliseconds) ).ConfigureAwait(false);
await SentRequest.Raise(method, endpoint, milliseconds).ConfigureAwait(false);
return responseStream;
}
@@ -282,11 +312,28 @@ namespace Discord.API
stopwatch.Stop();
double milliseconds = ToMilliseconds(stopwatch);
await SentRequest.Raise(new SentRequestEventArgs( method, endpoint, bytes, milliseconds) ).ConfigureAwait(false);
await SentRequest.Raise(method, endpoint, milliseconds).ConfigureAwait(false);
return responseStream;
}
public Task SendGateway(GatewayOpCodes opCode, object payload, GlobalBucket bucket = GlobalBucket.Gateway)
=> SendGateway((int)opCode, payload, BucketGroup.Global, (int)bucket, 0);
public Task SendGateway(VoiceOpCodes opCode, object payload, GlobalBucket bucket = GlobalBucket.Gateway)
=> SendGateway((int)opCode, payload, BucketGroup.Global, (int)bucket, 0);
public Task SendGateway(GatewayOpCodes opCode, object payload, GuildBucket bucket, ulong guildId)
=> SendGateway((int)opCode, payload, BucketGroup.Guild, (int)bucket, guildId);
public Task SendGateway(VoiceOpCodes opCode, object payload, GuildBucket bucket, ulong guildId)
=> SendGateway((int)opCode, payload, BucketGroup.Guild, (int)bucket, guildId);
private async Task SendGateway(int opCode, object payload, BucketGroup group, int bucketId, ulong guildId)
{
//TODO: Add ETF
byte[] bytes = null;
payload = new WebSocketMessage { Operation = opCode, Payload = payload };
if (payload != null)
bytes = Encoding.UTF8.GetBytes(SerializeJson(payload));
await _requestQueue.Send(new WebSocketRequest(_gatewayClient, bytes, true), group, bucketId, guildId).ConfigureAwait(false);
}
//Auth
public async Task ValidateToken()
@@ -299,6 +346,21 @@ namespace Discord.API
{
return await Send<GetGatewayResponse>("GET", "gateway").ConfigureAwait(false);
}
public async Task SendIdentify(int largeThreshold = 100, bool useCompression = true)
{
var props = new Dictionary<string, string>
{
["$device"] = "Discord.Net"
};
var msg = new IdentifyParams()
{
Token = _authToken,
Properties = props,
LargeThreshold = largeThreshold,
UseCompression = useCompression
};
await SendGateway(GatewayOpCodes.Identify, msg).ConfigureAwait(false);
}
//Channels
public async Task<Channel> GetChannel(ulong channelId)
@@ -986,7 +1048,7 @@ namespace Discord.API
//Helpers
private static double ToMilliseconds(Stopwatch stopwatch) => Math.Round((double)stopwatch.ElapsedTicks / (double)Stopwatch.Frequency * 1000.0, 2);
private string Serialize(object value)
private string SerializeJson (object value)
{
var sb = new StringBuilder(256);
using (TextWriter text = new StringWriter(sb, CultureInfo.InvariantCulture))
@@ -994,7 +1056,7 @@ namespace Discord.API
_serializer.Serialize(writer, value);
return sb.ToString();
}
private T Deserialize<T>(Stream jsonStream)
private T DeserializeJson <T>(Stream jsonStream)
{
using (TextReader text = new StreamReader(jsonStream))
using (JsonReader reader = new JsonTextReader(text))