You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

DiscordRpcApiClient.cs 18 KiB

8 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. #pragma warning disable CS1591
  2. using Discord.API.Rpc;
  3. using Discord.Net.Queue;
  4. using Discord.Net.Rest;
  5. using Discord.Net.WebSockets;
  6. using Discord.Rpc;
  7. using Newtonsoft.Json;
  8. using Newtonsoft.Json.Linq;
  9. using System;
  10. using System.Collections.Concurrent;
  11. using System.Collections.Generic;
  12. using System.IO;
  13. using System.IO.Compression;
  14. using System.Text;
  15. using System.Threading;
  16. using System.Threading.Tasks;
  17. namespace Discord.API
  18. {
  /// <summary>
  /// API client that talks to the Discord RPC server over a websocket, layered on top of
  /// <see cref="DiscordRestApiClient"/>. Every outgoing command is tagged with a nonce and
  /// the matching response frame completes the task returned to the caller.
  /// </summary>
  internal class DiscordRpcApiClient : DiscordRestApiClient, IDisposable
  {
      /// <summary>
      /// Type-erased handle to a pending RPC call, so that trackers with different
      /// response types can share one dictionary.
      /// </summary>
      private abstract class RpcRequest
      {
          /// <summary> Completes the pending call with the payload of a success frame. </summary>
          public abstract Task SetResultAsync(JToken data, JsonSerializer serializer);
          /// <summary> Fails the pending call with the payload of an error frame. </summary>
          public abstract Task SetExceptionAsync(JToken data, JsonSerializer serializer);
      }

      /// <summary>
      /// Tracks a single pending RPC call whose response deserializes to <typeparamref name="T"/>.
      /// </summary>
      private class RpcRequest<T> : RpcRequest
      {
          /// <summary> Completion source awaited by the caller that sent the request. </summary>
          public TaskCompletionSource<T> Promise { get; set; }

          /// <summary>
          /// Creates the tracker and starts its timeout: the promise is canceled after
          /// <c>options.Timeout</c> milliseconds, or 15000 when no timeout is supplied.
          /// </summary>
          public RpcRequest(RequestOptions options)
          {
              Promise = new TaskCompletionSource<T>();
              Task.Run(async () =>
              {
                  await Task.Delay(options?.Timeout ?? 15000).ConfigureAwait(false);
                  Promise.TrySetCanceled(); //Doesn't need to be async, we're already in a separate task
              });
          }

          /// <summary>
          /// Deserializes <paramref name="data"/> into <typeparamref name="T"/> and completes the promise.
          /// A missing payload completes the promise with <c>default(T)</c>; a payload that cannot be
          /// converted faults the promise instead of throwing into the socket's receive handler.
          /// </summary>
          public override Task SetResultAsync(JToken data, JsonSerializer serializer)
          {
              T result;
              try
              {
                  result = data != null ? data.ToObject<T>(serializer) : default(T);
              }
              catch (Exception ex)
              {
                  return Promise.TrySetExceptionAsync(ex);
              }
              return Promise.TrySetResultAsync(result);
          }

          /// <summary>
          /// Deserializes <paramref name="data"/> into an <see cref="ErrorEvent"/> and faults the promise
          /// with the corresponding <see cref="RpcException"/>. A missing or unreadable payload still
          /// faults the promise so the caller is not left waiting for the timeout.
          /// </summary>
          public override Task SetExceptionAsync(JToken data, JsonSerializer serializer)
          {
              if (data == null)
                  return Promise.TrySetExceptionAsync(new InvalidOperationException("The RPC error frame did not contain any data."));

              ErrorEvent error;
              try
              {
                  error = data.ToObject<ErrorEvent>(serializer);
              }
              catch (Exception ex)
              {
                  return Promise.TrySetExceptionAsync(ex);
              }
              return Promise.TrySetExceptionAsync(new RpcException(error.Code, error.Message));
          }
      }

      /// <summary> Raised with the command name after a command has been handed to the request queue. </summary>
      public event Func<string, Task> SentRpcMessage { add { _sentRpcMessageEvent.Add(value); } remove { _sentRpcMessageEvent.Remove(value); } }
      private readonly AsyncEvent<Func<string, Task>> _sentRpcMessageEvent = new AsyncEvent<Func<string, Task>>();
      /// <summary> Raised for every received frame with its command, event name and data. </summary>
      public event Func<string, Optional<string>, Optional<object>, Task> ReceivedRpcEvent { add { _receivedRpcEvent.Add(value); } remove { _receivedRpcEvent.Remove(value); } }
      private readonly AsyncEvent<Func<string, Optional<string>, Optional<object>, Task>> _receivedRpcEvent = new AsyncEvent<Func<string, Optional<string>, Optional<object>, Task>>();
      /// <summary> Raised after the websocket reports that it closed and this client has disconnected. </summary>
      public event Func<Exception, Task> Disconnected { add { _disconnectedEvent.Add(value); } remove { _disconnectedEvent.Remove(value); } }
      private readonly AsyncEvent<Func<Exception, Task>> _disconnectedEvent = new AsyncEvent<Func<Exception, Task>>();

      // Pending calls keyed by the nonce that was sent with them.
      private readonly ConcurrentDictionary<Guid, RpcRequest> _requests;
      private readonly IWebSocketClient _webSocketClient;
      // Serializes ConnectAsync / DisconnectAsync.
      private readonly SemaphoreSlim _connectionLock;
      private readonly string _clientId;
      private readonly string _origin;
      private CancellationTokenSource _stateCancelToken;

      /// <summary> Current state of the websocket connection as tracked by this client. </summary>
      public ConnectionState ConnectionState { get; private set; }

      /// <summary>
      /// Creates the client, builds its websocket from <paramref name="webSocketProvider"/>, sets the
      /// <c>origin</c> header and wires up the message and close handlers.
      /// </summary>
      /// <param name="clientId"> Client id sent in the connection URL and in AUTHORIZE requests. </param>
      /// <param name="userAgent"> Passed through to the REST base client. </param>
      /// <param name="origin"> Value of the websocket <c>origin</c> header. </param>
      /// <param name="restClientProvider"> Passed through to the REST base client. </param>
      /// <param name="webSocketProvider"> Factory for the websocket client used for RPC frames. </param>
      /// <param name="defaultRetryMode"> Passed through to the REST base client. </param>
      /// <param name="serializer"> Passed through to the REST base client. </param>
      public DiscordRpcApiClient(string clientId, string userAgent, string origin, RestClientProvider restClientProvider, WebSocketProvider webSocketProvider,
              RetryMode defaultRetryMode = RetryMode.AlwaysRetry, JsonSerializer serializer = null)
          : base(restClientProvider, userAgent, defaultRetryMode, serializer)
      {
          _connectionLock = new SemaphoreSlim(1, 1);
          _clientId = clientId;
          _origin = origin;
          _requests = new ConcurrentDictionary<Guid, RpcRequest>();

          _webSocketClient = webSocketProvider();
          //_webSocketClient.SetHeader("user-agent", DiscordConfig.UserAgent); (Causes issues in .Net 4.6+)
          _webSocketClient.SetHeader("origin", _origin);
          _webSocketClient.BinaryMessage += async (data, index, count) =>
          {
              // The first two bytes are skipped before inflating - presumably the zlib header,
              // which DeflateStream does not consume (TODO confirm against the server's framing).
              using (var compressed = new MemoryStream(data, index + 2, count - 2))
              using (var decompressed = new MemoryStream())
              {
                  using (var zlib = new DeflateStream(compressed, CompressionMode.Decompress))
                      zlib.CopyTo(decompressed);
                  decompressed.Position = 0;
                  using (var reader = new StreamReader(decompressed))
                      await ProcessFrameAsync(reader).ConfigureAwait(false);
              }
          };
          _webSocketClient.TextMessage += async text =>
          {
              using (var reader = new StringReader(text))
                  await ProcessFrameAsync(reader).ConfigureAwait(false);
          };
          _webSocketClient.Closed += async ex =>
          {
              await DisconnectAsync().ConfigureAwait(false);
              await _disconnectedEvent.InvokeAsync(ex).ConfigureAwait(false);
          };
      }

      /// <summary>
      /// Reads one JSON frame from <paramref name="reader"/>, raises <see cref="ReceivedRpcEvent"/>
      /// and, when the frame carries a nonce, completes the matching pending request.
      /// </summary>
      private async Task ProcessFrameAsync(TextReader reader)
      {
          using (var jsonReader = new JsonTextReader(reader))
          {
              var msg = _serializer.Deserialize<API.Rpc.RpcFrame>(jsonReader);
              if (msg == null)
                  return; // Empty frame: nothing to report or complete.

              await _receivedRpcEvent.InvokeAsync(msg.Cmd, msg.Event, msg.Data).ConfigureAwait(false);
              if (msg.Nonce.IsSpecified && msg.Nonce.Value.HasValue)
                  ProcessMessage(msg);
          }
      }

      /// <summary> Releases the state cancellation source and the websocket client. </summary>
      internal override void Dispose(bool disposing)
      {
          // NOTE(review): the base implementation is not invoked here - confirm that the base
          // class holds nothing that still needs to be released.
          if (!_isDisposed)
          {
              if (disposing)
              {
                  _stateCancelToken?.Dispose();
                  (_webSocketClient as IDisposable)?.Dispose();
              }
              _isDisposed = true;
          }
      }

      /// <summary> Connects to the RPC server while holding the connection lock. </summary>
      public async Task ConnectAsync()
      {
          await _connectionLock.WaitAsync().ConfigureAwait(false);
          try
          {
              await ConnectInternalAsync().ConfigureAwait(false);
          }
          finally { _connectionLock.Release(); }
      }

      /// <summary>
      /// Tries each port from <c>DiscordRpcConfig.PortRangeStart</c> to <c>DiscordRpcConfig.PortRangeEnd</c>
      /// until the websocket connects, then points the REST base URL at the same host and port.
      /// </summary>
      /// <exception cref="Exception">
      /// No port accepted the connection. The failure of the last attempted port is attached as
      /// <see cref="Exception.InnerException"/>. The client is disconnected before rethrowing.
      /// </exception>
      internal override async Task ConnectInternalAsync()
      {
          /*if (LoginState != LoginState.LoggedIn)
              throw new InvalidOperationException("Client is not logged in.");*/

          ConnectionState = ConnectionState.Connecting;
          try
          {
              _stateCancelToken = new CancellationTokenSource();
              if (_webSocketClient != null)
                  _webSocketClient.SetCancelToken(_stateCancelToken.Token);

              bool success = false;
              int port;
              string uuid = Guid.NewGuid().ToString();
              Exception lastError = null;

              for (port = DiscordRpcConfig.PortRangeStart; port <= DiscordRpcConfig.PortRangeEnd; port++)
              {
                  try
                  {
                      string url = $"wss://{uuid}.discordapp.io:{port}/?v={DiscordRpcConfig.RpcAPIVersion}&client_id={_clientId}";
                      await _webSocketClient.ConnectAsync(url).ConfigureAwait(false);
                      success = true;
                      break;
                  }
                  catch (Exception ex)
                  {
                      // A failing port is expected while scanning; remember why it failed so the
                      // final error is diagnosable instead of silently discarding it.
                      lastError = ex;
                  }
              }

              if (!success)
                  throw new Exception("Unable to connect to the RPC server.", lastError);

              SetBaseUrl($"https://{uuid}.discordapp.io:{port}/");
              ConnectionState = ConnectionState.Connected;
          }
          catch (Exception)
          {
              await DisconnectInternalAsync().ConfigureAwait(false);
              throw;
          }
      }

      /// <summary> Disconnects from the RPC server while holding the connection lock. </summary>
      public async Task DisconnectAsync()
      {
          await _connectionLock.WaitAsync().ConfigureAwait(false);
          try
          {
              await DisconnectInternalAsync().ConfigureAwait(false);
          }
          finally { _connectionLock.Release(); }
      }

      /// <summary>
      /// Cancels the state token and closes the websocket. Does nothing when already disconnected.
      /// </summary>
      /// <exception cref="NotSupportedException"> The client has no websocket. </exception>
      internal override async Task DisconnectInternalAsync()
      {
          if (_webSocketClient == null)
              throw new NotSupportedException("This client is not configured with websocket support.");

          if (ConnectionState == ConnectionState.Disconnected) return;
          ConnectionState = ConnectionState.Disconnecting;

          // Best effort: a failing cancellation must not prevent the socket from being closed.
          try { _stateCancelToken?.Cancel(false); }
          catch { }

          await _webSocketClient.DisconnectAsync().ConfigureAwait(false);

          ConnectionState = ConnectionState.Disconnected;
      }

      //Core
      /// <summary>
      /// Sends <paramref name="cmd"/> with <paramref name="payload"/> as its arguments and waits for the
      /// response carrying the same nonce.
      /// </summary>
      /// <returns> The response data deserialized to <typeparamref name="TResponse"/>. </returns>
      public async Task<TResponse> SendRpcAsync<TResponse>(string cmd, object payload, Optional<string> evt = default(Optional<string>), RequestOptions options = null)
          where TResponse : class
      {
          return await SendRpcAsyncInternal<TResponse>(cmd, payload, evt, options).ConfigureAwait(false);
      }

      /// <summary>
      /// Wraps the arguments in an RPC frame with a fresh nonce, registers a tracker for that nonce,
      /// sends the frame through the request queue and awaits the tracker. The tracker is always
      /// unregistered afterwards, whether the call succeeded, failed or timed out.
      /// </summary>
      private async Task<TResponse> SendRpcAsyncInternal<TResponse>(string cmd, object payload, Optional<string> evt, RequestOptions options)
          where TResponse : class
      {
          var guid = Guid.NewGuid();
          object frame = new API.Rpc.RpcFrame { Cmd = cmd, Event = evt, Args = payload, Nonce = guid };
          byte[] bytes = Encoding.UTF8.GetBytes(SerializeJson(frame));

          var requestTracker = new RpcRequest<TResponse>(options);
          _requests[guid] = requestTracker;
          try
          {
              await RequestQueue.SendAsync(new WebSocketRequest(_webSocketClient, null, bytes, true, options)).ConfigureAwait(false);
              await _sentRpcMessageEvent.InvokeAsync(cmd).ConfigureAwait(false);
              return await requestTracker.Promise.Task.ConfigureAwait(false);
          }
          finally
          {
              // Without this, requests that time out or fail to send would stay registered forever.
              _requests.TryRemove(guid, out RpcRequest removed);
          }
      }

      //Rpc
      /// <summary> Sends AUTHENTICATE with the current <c>AuthToken</c> as access token. </summary>
      public async Task<AuthenticateResponse> SendAuthenticateAsync(RequestOptions options = null)
      {
          options = RequestOptions.CreateOrClone(options);
          var msg = new AuthenticateParams
          {
              AccessToken = AuthToken
          };
          options.IgnoreState = true;
          return await SendRpcAsync<AuthenticateResponse>("AUTHENTICATE", msg, options: options).ConfigureAwait(false);
      }

      /// <summary>
      /// Sends AUTHORIZE for this client id with the requested <paramref name="scopes"/>.
      /// The timeout defaults to 60000 ms when the caller did not set one.
      /// </summary>
      public async Task<AuthorizeResponse> SendAuthorizeAsync(IReadOnlyCollection<string> scopes, string rpcToken = null, RequestOptions options = null)
      {
          options = RequestOptions.CreateOrClone(options);
          var msg = new AuthorizeParams
          {
              ClientId = _clientId,
              Scopes = scopes,
              RpcToken = rpcToken != null ? rpcToken : Optional.Create<string>()
          };
          if (options.Timeout == null)
              options.Timeout = 60000; //This requires manual input on the user's end, lets give them more time
          options.IgnoreState = true;
          return await SendRpcAsync<AuthorizeResponse>("AUTHORIZE", msg, options: options).ConfigureAwait(false);
      }

      /// <summary> Sends GET_GUILDS without arguments. </summary>
      public async Task<GetGuildsResponse> SendGetGuildsAsync(RequestOptions options = null)
      {
          options = RequestOptions.CreateOrClone(options);
          return await SendRpcAsync<GetGuildsResponse>("GET_GUILDS", null, options: options).ConfigureAwait(false);
      }

      /// <summary> Sends GET_GUILD for <paramref name="guildId"/>. </summary>
      public async Task<Rpc.Guild> SendGetGuildAsync(ulong guildId, RequestOptions options = null)
      {
          options = RequestOptions.CreateOrClone(options);
          var msg = new GetGuildParams
          {
              GuildId = guildId
          };
          return await SendRpcAsync<Rpc.Guild>("GET_GUILD", msg, options: options).ConfigureAwait(false);
      }

      /// <summary> Sends GET_CHANNELS for <paramref name="guildId"/>. </summary>
      public async Task<GetChannelsResponse> SendGetChannelsAsync(ulong guildId, RequestOptions options = null)
      {
          options = RequestOptions.CreateOrClone(options);
          var msg = new GetChannelsParams
          {
              GuildId = guildId
          };
          return await SendRpcAsync<GetChannelsResponse>("GET_CHANNELS", msg, options: options).ConfigureAwait(false);
      }

      /// <summary> Sends GET_CHANNEL for <paramref name="channelId"/>. </summary>
      public async Task<Rpc.Channel> SendGetChannelAsync(ulong channelId, RequestOptions options = null)
      {
          options = RequestOptions.CreateOrClone(options);
          var msg = new GetChannelParams
          {
              ChannelId = channelId
          };
          return await SendRpcAsync<Rpc.Channel>("GET_CHANNEL", msg, options: options).ConfigureAwait(false);
      }

      /// <summary> Sends SELECT_TEXT_CHANNEL for <paramref name="channelId"/>. </summary>
      public async Task<Rpc.Channel> SendSelectTextChannelAsync(ulong channelId, RequestOptions options = null)
      {
          options = RequestOptions.CreateOrClone(options);
          var msg = new SelectChannelParams
          {
              ChannelId = channelId
          };
          return await SendRpcAsync<Rpc.Channel>("SELECT_TEXT_CHANNEL", msg, options: options).ConfigureAwait(false);
      }

      /// <summary> Sends SELECT_VOICE_CHANNEL for <paramref name="channelId"/>, forwarding <paramref name="force"/>. </summary>
      public async Task<Rpc.Channel> SendSelectVoiceChannelAsync(ulong channelId, bool force = false, RequestOptions options = null)
      {
          options = RequestOptions.CreateOrClone(options);
          var msg = new SelectChannelParams
          {
              ChannelId = channelId,
              Force = force
          };
          return await SendRpcAsync<Rpc.Channel>("SELECT_VOICE_CHANNEL", msg, options: options).ConfigureAwait(false);
      }

      /// <summary> Sends SUBSCRIBE for event <paramref name="evt"/> without arguments. </summary>
      public async Task<SubscriptionResponse> SendGlobalSubscribeAsync(string evt, RequestOptions options = null)
      {
          options = RequestOptions.CreateOrClone(options);
          return await SendRpcAsync<SubscriptionResponse>("SUBSCRIBE", null, evt: evt, options: options).ConfigureAwait(false);
      }

      /// <summary> Sends UNSUBSCRIBE for event <paramref name="evt"/> without arguments. </summary>
      public async Task<SubscriptionResponse> SendGlobalUnsubscribeAsync(string evt, RequestOptions options = null)
      {
          options = RequestOptions.CreateOrClone(options);
          return await SendRpcAsync<SubscriptionResponse>("UNSUBSCRIBE", null, evt: evt, options: options).ConfigureAwait(false);
      }

      /// <summary> Sends SUBSCRIBE for event <paramref name="evt"/> scoped to <paramref name="guildId"/>. </summary>
      public async Task<SubscriptionResponse> SendGuildSubscribeAsync(string evt, ulong guildId, RequestOptions options = null)
      {
          options = RequestOptions.CreateOrClone(options);
          var msg = new GuildSubscriptionParams
          {
              GuildId = guildId
          };
          return await SendRpcAsync<SubscriptionResponse>("SUBSCRIBE", msg, evt: evt, options: options).ConfigureAwait(false);
      }

      /// <summary> Sends UNSUBSCRIBE for event <paramref name="evt"/> scoped to <paramref name="guildId"/>. </summary>
      public async Task<SubscriptionResponse> SendGuildUnsubscribeAsync(string evt, ulong guildId, RequestOptions options = null)
      {
          options = RequestOptions.CreateOrClone(options);
          var msg = new GuildSubscriptionParams
          {
              GuildId = guildId
          };
          return await SendRpcAsync<SubscriptionResponse>("UNSUBSCRIBE", msg, evt: evt, options: options).ConfigureAwait(false);
      }

      /// <summary> Sends SUBSCRIBE for event <paramref name="evt"/> scoped to <paramref name="channelId"/>. </summary>
      public async Task<SubscriptionResponse> SendChannelSubscribeAsync(string evt, ulong channelId, RequestOptions options = null)
      {
          options = RequestOptions.CreateOrClone(options);
          var msg = new ChannelSubscriptionParams
          {
              ChannelId = channelId
          };
          return await SendRpcAsync<SubscriptionResponse>("SUBSCRIBE", msg, evt: evt, options: options).ConfigureAwait(false);
      }

      /// <summary> Sends UNSUBSCRIBE for event <paramref name="evt"/> scoped to <paramref name="channelId"/>. </summary>
      public async Task<SubscriptionResponse> SendChannelUnsubscribeAsync(string evt, ulong channelId, RequestOptions options = null)
      {
          options = RequestOptions.CreateOrClone(options);
          var msg = new ChannelSubscriptionParams
          {
              ChannelId = channelId
          };
          return await SendRpcAsync<SubscriptionResponse>("UNSUBSCRIBE", msg, evt: evt, options: options).ConfigureAwait(false);
      }

      /// <summary> Sends GET_VOICE_SETTINGS without arguments. </summary>
      public async Task<API.Rpc.VoiceSettings> GetVoiceSettingsAsync(RequestOptions options = null)
      {
          options = RequestOptions.CreateOrClone(options);
          return await SendRpcAsync<API.Rpc.VoiceSettings>("GET_VOICE_SETTINGS", null, options: options).ConfigureAwait(false);
      }

      /// <summary> Sends SET_VOICE_SETTINGS with <paramref name="settings"/>; the response is discarded. </summary>
      public async Task SetVoiceSettingsAsync(API.Rpc.VoiceSettings settings, RequestOptions options = null)
      {
          options = RequestOptions.CreateOrClone(options);
          await SendRpcAsync<API.Rpc.VoiceSettings>("SET_VOICE_SETTINGS", settings, options: options).ConfigureAwait(false);
      }

      /// <summary>
      /// Sends SET_USER_VOICE_SETTINGS with <paramref name="settings"/>; the response is discarded.
      /// Note that <paramref name="userId"/> is written into the passed <paramref name="settings"/> instance.
      /// </summary>
      /// <exception cref="ArgumentNullException"> <paramref name="settings"/> is null. </exception>
      public async Task SetUserVoiceSettingsAsync(ulong userId, API.Rpc.UserVoiceSettings settings, RequestOptions options = null)
      {
          if (settings == null)
              throw new ArgumentNullException(nameof(settings));

          options = RequestOptions.CreateOrClone(options);
          settings.UserId = userId;
          await SendRpcAsync<API.Rpc.UserVoiceSettings>("SET_USER_VOICE_SETTINGS", settings, options: options).ConfigureAwait(false);
      }

      /// <summary>
      /// Completes the pending request that matches the frame's nonce. The caller must have checked
      /// that the nonce is present. The request is unregistered as part of the lookup, so a tracker
      /// is completed by at most one frame and does not linger in the dictionary.
      /// </summary>
      /// <returns> True when a pending request matched the nonce; otherwise false. </returns>
      private bool ProcessMessage(API.Rpc.RpcFrame msg)
      {
          if (!_requests.TryRemove(msg.Nonce.Value.Value, out RpcRequest requestTracker))
              return false;

          var data = msg.Data.GetValueOrDefault() as JToken;
          // Completion is deliberately not awaited, so the receive handler is not held up by it.
          if (msg.Event.GetValueOrDefault("") == "ERROR")
          {
              var _ = requestTracker.SetExceptionAsync(data, _serializer);
          }
          else
          {
              var _ = requestTracker.SetResultAsync(data, _serializer);
          }
          return true;
      }
  }
  365. }