You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

AudioClient.cs 22 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476
  1. using Discord.API.Voice;
  2. using Discord.Audio.Streams;
  3. using Discord.Logging;
  4. using Discord.Net.Converters;
  5. using Discord.WebSocket;
  6. using Newtonsoft.Json;
  7. using Newtonsoft.Json.Linq;
  8. using System;
  9. using System.Collections.Concurrent;
  10. using System.Linq;
  11. using System.Text;
  12. using System.Threading;
  13. using System.Threading.Tasks;
  14. using System.Collections.Generic;
  15. namespace Discord.Audio
  16. {
  17. //TODO: Add audio reconnecting
  18. internal partial class AudioClient : IAudioClient
  19. {
  /// <summary>
  /// Holds the two ends of one incoming audio pipeline: the stream handed out to
  /// consumers and the stream that received packets are written into.
  /// </summary>
  internal struct StreamPair
  {
      /// <summary> The end that is exposed to consumers and disposed on removal. </summary>
      public AudioInStream Reader;

      /// <summary> The end that incoming packets are written to. </summary>
      public AudioOutStream Writer;

      /// <summary> Builds a pair from an existing reader and writer. </summary>
      public StreamPair(AudioInStream reader, AudioOutStream writer)
      {
          this.Writer = writer;
          this.Reader = reader;
      }
  }
  // Logging, serialization and connection lifecycle.
  private readonly Logger _audioLogger;
  private readonly JsonSerializer _serializer;
  private readonly ConnectionManager _connection;
  private readonly SemaphoreSlim _stateLock;

  // Environment.TickCount values of heartbeats that have been sent.
  private readonly ConcurrentQueue<long> _heartbeatTimes;
  // Keepalive value paired with the Environment.TickCount taken when it was sent.
  private readonly ConcurrentQueue<KeyValuePair<ulong, int>> _keepaliveTimes;

  // SSRC -> user id, filled in from Speaking events.
  private readonly ConcurrentDictionary<uint, ulong> _ssrcMap;
  // User id -> incoming audio pipeline.
  private readonly ConcurrentDictionary<ulong, StreamPair> _streams;

  // Background loops started while connecting and awaited while disconnecting.
  private Task _heartbeatTask;
  private Task _keepaliveTask;

  // Environment.TickCount of the latest gateway message; reset to 0 on disconnect.
  private long _lastMessageTime;

  // Connection parameters captured by StartAsync.
  private string _url;
  private string _sessionId;
  private string _token;
  private ulong _userId;

  // SSRC taken from the Ready event.
  private uint _ssrc;
  // Last speaking state sent to the server.
  private bool _isSpeaking;

  public SocketGuild Guild { get; }
  public DiscordVoiceAPIClient ApiClient { get; private set; }
  public int Latency { get; private set; }
  public int UdpLatency { get; private set; }
  public ulong ChannelId { get; internal set; }
  internal byte[] SecretKey { get; private set; }

  private DiscordSocketClient Discord
  {
      get { return Guild.Discord; }
  }

  public ConnectionState ConnectionState
  {
      get { return _connection.State; }
  }
  /// <summary> Creates a new audio client for a voice channel of the given guild. </summary>
  /// <param name="guild"> Guild that owns the voice connection. </param>
  /// <param name="clientId"> Number used only to label this client's logger. </param>
  /// <param name="channelId"> Initial value of <see cref="ChannelId"/>. </param>
  internal AudioClient(SocketGuild guild, int clientId, ulong channelId)
  {
      Guild = guild;
      ChannelId = channelId;

      // Per-connection bookkeeping.
      _heartbeatTimes = new ConcurrentQueue<long>();
      _keepaliveTimes = new ConcurrentQueue<KeyValuePair<ulong, int>>();
      _ssrcMap = new ConcurrentDictionary<uint, ulong>();
      _streams = new ConcurrentDictionary<ulong, StreamPair>();

      _audioLogger = Discord.LogManager.CreateLogger($"Audio #{clientId}");

      // Voice API client plus debug logging of what it sends.
      ApiClient = new DiscordVoiceAPIClient(guild.Id, Discord.WebSocketProvider, Discord.UdpSocketProvider);
      ApiClient.SentGatewayMessage += async opCode =>
      {
          await _audioLogger.DebugAsync($"Sent {opCode}").ConfigureAwait(false);
      };
      ApiClient.SentDiscovery += async () =>
      {
          await _audioLogger.DebugAsync("Sent Discovery").ConfigureAwait(false);
      };
      //ApiClient.SentData += async bytes => await _audioLogger.DebugAsync($"Sent {bytes} Bytes").ConfigureAwait(false);
      ApiClient.ReceivedEvent += ProcessMessageAsync;
      ApiClient.ReceivedPacket += ProcessPacketAsync;

      // Connection manager drives OnConnectingAsync / OnDisconnectingAsync.
      _stateLock = new SemaphoreSlim(1, 1);
      _connection = new ConnectionManager(
          _stateLock,
          _audioLogger,
          30000,
          OnConnectingAsync,
          OnDisconnectingAsync,
          handler => ApiClient.Disconnected += handler);
      _connection.Connected += () => _connectedEvent.InvokeAsync();
      _connection.Disconnected += (ex, recon) => _disconnectedEvent.InvokeAsync(ex);

      // Deserialization errors are logged as warnings and marked handled.
      _serializer = new JsonSerializer { ContractResolver = new DiscordContractResolver() };
      _serializer.Error += (sender, args) =>
      {
          _audioLogger.WarningAsync(args.ErrorContext.Error).GetAwaiter().GetResult();
          args.ErrorContext.Handled = true;
      };

      LatencyUpdated += async (old, val) =>
      {
          await _audioLogger.DebugAsync($"Latency = {val} ms").ConfigureAwait(false);
      };
      UdpLatencyUpdated += async (old, val) =>
      {
          await _audioLogger.DebugAsync($"UDP Latency = {val} ms").ConfigureAwait(false);
      };
  }
  /// <summary> Stores the connection parameters and starts the connection manager. </summary>
  /// <param name="url"> Voice server host; "wss://" and the version query are added when connecting. </param>
  /// <param name="userId"> User id sent in the identify payload. </param>
  /// <param name="sessionId"> Session id sent in the identify payload. </param>
  /// <param name="token"> Token sent in the identify payload. </param>
  internal async Task StartAsync(string url, ulong userId, string sessionId, string token)
  {
      // These are read later by OnConnectingAsync.
      _token = token;
      _sessionId = sessionId;
      _userId = userId;
      _url = url;

      await _connection.StartAsync().ConfigureAwait(false);
  }
  /// <summary> Stops the underlying connection manager. </summary>
  public async Task StopAsync()
      => await _connection.StopAsync().ConfigureAwait(false);
  /// <summary>
  /// Connect callback for the connection manager: opens the voice socket, sends
  /// the identify payload and then waits on the connection manager.
  /// </summary>
  private async Task OnConnectingAsync()
  {
      await _audioLogger.DebugAsync("Connecting ApiClient").ConfigureAwait(false);
      string endpoint = $"wss://{_url}?v={DiscordConfig.VoiceAPIVersion}";
      await ApiClient.ConnectAsync(endpoint).ConfigureAwait(false);

      await _audioLogger.DebugAsync($"Listening on port {ApiClient.UdpPort}").ConfigureAwait(false);

      await _audioLogger.DebugAsync("Sending Identity").ConfigureAwait(false);
      await ApiClient.SendIdentityAsync(_userId, _sessionId, _token).ConfigureAwait(false);

      //Wait for READY
      await _connection.WaitAsync().ConfigureAwait(false);
  }
  /// <summary>
  /// Disconnect callback for the connection manager: closes the voice socket,
  /// waits for the background loops, resets per-session state and tells the
  /// main gateway that this guild's voice state is cleared.
  /// </summary>
  /// <param name="ex"> Supplied by the connection manager; not used here. </param>
  private async Task OnDisconnectingAsync(Exception ex)
  {
      await _audioLogger.DebugAsync("Disconnecting ApiClient").ConfigureAwait(false);
      await ApiClient.DisconnectAsync().ConfigureAwait(false);

      //Wait for tasks to complete
      await _audioLogger.DebugAsync("Waiting for heartbeater").ConfigureAwait(false);
      var heartbeatTask = _heartbeatTask;
      if (heartbeatTask != null)
          await heartbeatTask.ConfigureAwait(false);
      _heartbeatTask = null;

      var keepaliveTask = _keepaliveTask;
      if (keepaliveTask != null)
          await keepaliveTask.ConfigureAwait(false);
      _keepaliveTask = null;

      // Drop timing entries from the session that just ended.
      while (_heartbeatTimes.TryDequeue(out _)) { }
      // Keepalive entries must go too: left in place they count toward the
      // 12-entry cap in RunKeepaliveAsync and would be compared against replies
      // that arrive in a later session.
      while (_keepaliveTimes.TryDequeue(out _)) { }
      _lastMessageTime = 0;

      await ClearInputStreamsAsync().ConfigureAwait(false);

      await _audioLogger.DebugAsync("Sending Voice State").ConfigureAwait(false);
      await Discord.ApiClient.SendVoiceStateUpdateAsync(Guild.Id, null, false, false).ConfigureAwait(false);
  }
  /// <summary>
  /// Builds a buffered outgoing pipeline for already-encoded Opus data:
  /// buffer -> RTP writer -> encrypter -> socket output.
  /// </summary>
  /// <param name="bufferMillis"> Passed through to the buffered stream. </param>
  public AudioOutStream CreateOpusStream(int bufferMillis)
  {
      AudioOutStream sink = new OutputStream(ApiClient); //Ignores header
      AudioOutStream encrypted = new SodiumEncryptStream(sink, this); //Passes header
      AudioOutStream rtp = new RTPWriteStream(encrypted, _ssrc); //Consumes header, passes

      //Generates header
      return new BufferedWriteStream(rtp, this, bufferMillis, _connection.CancelToken, _audioLogger);
  }
  /// <summary>
  /// Builds an unbuffered outgoing pipeline for already-encoded Opus data:
  /// RTP writer -> encrypter -> socket output.
  /// </summary>
  public AudioOutStream CreateDirectOpusStream()
  {
      //Consumes header (external input), passes -> Passes header -> Ignores header
      return new RTPWriteStream(
          new SodiumEncryptStream(
              new OutputStream(ApiClient),
              this),
          _ssrc);
  }
  /// <summary>
  /// Builds a buffered outgoing pipeline for PCM data:
  /// Opus encoder -> buffer -> RTP writer -> encrypter -> socket output.
  /// </summary>
  /// <param name="application"> Passed through to the Opus encoder. </param>
  /// <param name="bitrate"> Encoder bitrate; 96 * 1024 is used when null. </param>
  /// <param name="bufferMillis"> Passed through to the buffered stream. </param>
  /// <param name="packetLoss"> Passed through to the Opus encoder. </param>
  public AudioOutStream CreatePCMStream(AudioApplication application, int? bitrate, int bufferMillis, int packetLoss)
  {
      AudioOutStream sink = new OutputStream(ApiClient); //Ignores header
      AudioOutStream encrypted = new SodiumEncryptStream(sink, this); //Passes header
      AudioOutStream rtp = new RTPWriteStream(encrypted, _ssrc); //Consumes header, passes

      //Ignores header, generates header
      AudioOutStream buffered = new BufferedWriteStream(rtp, this, bufferMillis, _connection.CancelToken, _audioLogger);

      int effectiveBitrate = bitrate ?? (96 * 1024);
      return new OpusEncodeStream(buffered, effectiveBitrate, application, packetLoss); //Generates header
  }
  /// <summary>
  /// Builds an unbuffered outgoing pipeline for PCM data:
  /// Opus encoder -> RTP writer -> encrypter -> socket output.
  /// </summary>
  /// <param name="application"> Passed through to the Opus encoder. </param>
  /// <param name="bitrate"> Encoder bitrate; 96 * 1024 is used when null. </param>
  /// <param name="packetLoss"> Passed through to the Opus encoder. </param>
  public AudioOutStream CreateDirectPCMStream(AudioApplication application, int? bitrate, int packetLoss)
  {
      //Consumes header, passes -> Passes header -> Ignores header
      AudioOutStream rtp = new RTPWriteStream(
          new SodiumEncryptStream(new OutputStream(ApiClient), this),
          _ssrc);

      int effectiveBitrate = bitrate ?? (96 * 1024);
      return new OpusEncodeStream(rtp, effectiveBitrate, application, packetLoss); //Generates header
  }
  /// <summary>
  /// Creates the incoming audio pipeline for a user, registers it and raises the
  /// stream-created event. Does nothing when the user already has a pipeline.
  /// </summary>
  /// <param name="userId"> User the pipeline is created for. </param>
  internal async Task CreateInputStreamAsync(ulong userId)
  {
      // Fast path: skip building a pipeline for a user that already has one.
      if (_streams.ContainsKey(userId))
          return;

      var readerStream = new InputStream(); //Consumes header
      var opusDecoder = new OpusDecodeStream(readerStream); //Passes header
      //var jitterBuffer = new JitterBuffer(opusDecoder, _audioLogger);
      var rtpReader = new RTPReadStream(opusDecoder); //Generates header
      var decryptStream = new SodiumDecryptStream(rtpReader, this); //No header

      // The ContainsKey check above is not atomic with the insert, so another
      // caller may have registered this user in between. Only the caller whose
      // TryAdd succeeds may announce the stream.
      if (!_streams.TryAdd(userId, new StreamPair(readerStream, decryptStream)))
      {
          // Release the unused reader the same way RemoveInputStreamAsync does.
          readerStream.Dispose();
          return;
      }

      await _streamCreatedEvent.InvokeAsync(userId, readerStream).ConfigureAwait(false);
  }
  /// <summary> Returns the reader registered for the given id, or null when there is none. </summary>
  internal AudioInStream GetInputStream(ulong id)
      => _streams.TryGetValue(id, out StreamPair entry) ? entry.Reader : null;
  /// <summary>
  /// Unregisters a user's incoming pipeline, raises the stream-destroyed event
  /// and disposes the reader. Does nothing when the user has no pipeline.
  /// </summary>
  internal async Task RemoveInputStreamAsync(ulong userId)
  {
      StreamPair removed;
      bool wasRegistered = _streams.TryRemove(userId, out removed);
      if (!wasRegistered)
          return;

      await _streamDestroyedEvent.InvokeAsync(userId).ConfigureAwait(false);
      removed.Reader.Dispose();
  }
  /// <summary>
  /// Tears down every registered incoming pipeline: raises the stream-destroyed
  /// event and disposes the reader for each one, then clears the SSRC map.
  /// </summary>
  internal async Task ClearInputStreamsAsync()
  {
      foreach (var pair in _streams)
      {
          // Claim each entry with TryRemove before touching it. A blanket
          // Clear() after the loop would drop entries added during the loop
          // without disposing them, and an entry already taken by a concurrent
          // RemoveInputStreamAsync would be notified and disposed twice.
          if (!_streams.TryRemove(pair.Key, out var removed))
              continue;

          await _streamDestroyedEvent.InvokeAsync(pair.Key).ConfigureAwait(false);
          removed.Reader.Dispose();
      }

      _ssrcMap.Clear();
  }
  /// <summary>
  /// Handles one event received from the voice gateway. Exceptions raised while
  /// handling are logged as errors and not rethrown.
  /// </summary>
  /// <param name="opCode"> Operation code of the received event. </param>
  /// <param name="payload"> Event payload; cast to a JToken for the handled op codes. </param>
  private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload)
  {
      // Read by the heartbeat loop to decide whether the server has gone quiet.
      _lastMessageTime = Environment.TickCount;

      try
      {
          switch (opCode)
          {
              case VoiceOpCode.Ready:
                  {
                      await _audioLogger.DebugAsync("Received Ready").ConfigureAwait(false);
                      // Direct cast: a payload of the wrong type fails with an
                      // InvalidCastException instead of a NullReferenceException.
                      var data = ((JToken)payload).ToObject<ReadyEvent>(_serializer);

                      _ssrc = data.SSRC;

                      if (!data.Modes.Contains(DiscordVoiceAPIClient.Mode))
                          throw new InvalidOperationException($"Discord does not support {DiscordVoiceAPIClient.Mode}");

                      ApiClient.SetUdpEndpoint(data.Ip, data.Port);
                      await ApiClient.SendDiscoveryAsync(_ssrc).ConfigureAwait(false);

                      _heartbeatTask = RunHeartbeatAsync(41250, _connection.CancelToken);
                  }
                  break;
              case VoiceOpCode.SessionDescription:
                  {
                      await _audioLogger.DebugAsync("Received SessionDescription").ConfigureAwait(false);
                      var data = ((JToken)payload).ToObject<SessionDescriptionEvent>(_serializer);

                      if (data.Mode != DiscordVoiceAPIClient.Mode)
                          throw new InvalidOperationException($"Discord selected an unexpected mode: {data.Mode}");

                      SecretKey = data.SecretKey;
                      _isSpeaking = false;
                      await ApiClient.SendSetSpeaking(false).ConfigureAwait(false);
                      _keepaliveTask = RunKeepaliveAsync(5000, _connection.CancelToken);

                      // Deliberately not awaited.
                      var _ = _connection.CompleteAsync();
                  }
                  break;
              case VoiceOpCode.HeartbeatAck:
                  {
                      await _audioLogger.DebugAsync("Received HeartbeatAck").ConfigureAwait(false);

                      // Pair the ack with the oldest recorded heartbeat.
                      if (_heartbeatTimes.TryDequeue(out long time))
                      {
                          int latency = (int)(Environment.TickCount - time);
                          int before = Latency;
                          Latency = latency;

                          await _latencyUpdatedEvent.InvokeAsync(before, latency).ConfigureAwait(false);
                      }
                  }
                  break;
              case VoiceOpCode.Speaking:
                  {
                      await _audioLogger.DebugAsync("Received Speaking").ConfigureAwait(false);
                      var data = ((JToken)payload).ToObject<SpeakingEvent>(_serializer);

                      _ssrcMap[data.Ssrc] = data.UserId; //TODO: Memory Leak: SSRCs are never cleaned up

                      await _speakingUpdatedEvent.InvokeAsync(data.UserId, data.Speaking).ConfigureAwait(false);
                  }
                  break;
              default:
                  await _audioLogger.WarningAsync($"Unknown OpCode ({opCode})").ConfigureAwait(false);
                  return;
          }
      }
      catch (Exception ex)
      {
          await _audioLogger.ErrorAsync($"Error handling {opCode}", ex).ConfigureAwait(false);
          return;
      }
  }
  /// <summary>
  /// Handles one UDP packet. While connecting, the packet is treated as the
  /// discovery response; once connected, 8-byte packets are treated as keepalive
  /// echoes and anything else as an audio frame routed to the sender's stream.
  /// Exceptions are logged and not rethrown.
  /// </summary>
  /// <param name="packet"> Raw packet bytes. </param>
  private async Task ProcessPacketAsync(byte[] packet)
  {
      try
      {
          if (_connection.State == ConnectionState.Connecting)
          {
              if (packet.Length != 70)
              {
                  await _audioLogger.DebugAsync("Malformed Packet").ConfigureAwait(false);
                  return;
              }

              string ip;
              int port;
              try
              {
                  // Bytes 4..67 hold the null-padded address text; the last
                  // two bytes hold the port, low byte first.
                  ip = Encoding.UTF8.GetString(packet, 4, 70 - 6).TrimEnd('\0');
                  port = (packet[69] << 8) | packet[68];
              }
              catch (Exception ex)
              {
                  await _audioLogger.DebugAsync("Malformed Packet", ex).ConfigureAwait(false);
                  return;
              }

              await _audioLogger.DebugAsync("Received Discovery").ConfigureAwait(false);
              await ApiClient.SendSelectProtocol(ip, port).ConfigureAwait(false);
          }
          else if (_connection.State == ConnectionState.Connected)
          {
              if (packet.Length == 8)
              {
                  await _audioLogger.DebugAsync("Received Keepalive").ConfigureAwait(false);

                  ulong value = ReadKeepaliveValue(packet);

                  // Discard queued keepalives until the echoed one is found;
                  // anything older than it is treated as lost.
                  while (_keepaliveTimes.TryDequeue(out var pair))
                  {
                      if (pair.Key == value)
                      {
                          int latency = Environment.TickCount - pair.Value;
                          int before = UdpLatency;
                          UdpLatency = latency;

                          await _udpLatencyUpdatedEvent.InvokeAsync(before, latency).ConfigureAwait(false);
                          break;
                      }
                  }
              }
              else
              {
                  if (!RTPReadStream.TryReadSsrc(packet, 0, out var ssrc))
                  {
                      await _audioLogger.DebugAsync("Malformed Frame").ConfigureAwait(false);
                      return;
                  }
                  if (!_ssrcMap.TryGetValue(ssrc, out var userId))
                  {
                      await _audioLogger.DebugAsync($"Unknown SSRC {ssrc}").ConfigureAwait(false);
                      return;
                  }
                  if (!_streams.TryGetValue(userId, out var pair))
                  {
                      await _audioLogger.DebugAsync($"Unknown User {userId}").ConfigureAwait(false);
                      return;
                  }
                  try
                  {
                      await pair.Writer.WriteAsync(packet, 0, packet.Length).ConfigureAwait(false);
                  }
                  catch (Exception ex)
                  {
                      await _audioLogger.DebugAsync("Malformed Frame", ex).ConfigureAwait(false);
                      return;
                  }
                  //await _audioLogger.DebugAsync($"Received {packet.Length} bytes from user {userId}").ConfigureAwait(false);
              }
          }
      }
      catch (Exception ex)
      {
          await _audioLogger.WarningAsync("Failed to process UDP packet", ex).ConfigureAwait(false);
          return;
      }
  }

  /// <summary>
  /// Reassembles the 64-bit keepalive value from an 8-byte packet, least
  /// significant byte first.
  /// </summary>
  /// <remarks>
  /// Each byte has to be shifted LEFT into position. Shifting a byte right by 8
  /// or more always yields 0, which reduced the value to packet[0] alone and made
  /// echoes stop matching once the value went above 255.
  /// </remarks>
  private static ulong ReadKeepaliveValue(byte[] packet)
  {
      ulong value = 0;
      for (int i = 7; i >= 0; i--)
          value = (value << 8) | packet[i];
      return value;
  }
  /// <summary>
  /// Heartbeat loop: records a timestamp, sends a heartbeat and sleeps for the
  /// interval until cancelled. Reports a connection error and exits when a
  /// heartbeat is still queued and no gateway message arrived within the interval.
  /// </summary>
  /// <param name="intervalMillis"> Delay between heartbeats, also used as the silence threshold. </param>
  /// <param name="cancelToken"> Stops the loop when cancelled. </param>
  private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cancelToken)
  {
      //TODO: Clean this up when Discord's session patch is live
      try
      {
          await _audioLogger.DebugAsync("Heartbeat Started").ConfigureAwait(false);

          for (;;)
          {
              if (cancelToken.IsCancellationRequested)
                  break;

              int tick = Environment.TickCount;

              //Did server respond to our last heartbeat?
              if (_heartbeatTimes.Count != 0)
              {
                  if ((tick - _lastMessageTime) > intervalMillis)
                  {
                      if (ConnectionState == ConnectionState.Connected)
                      {
                          _connection.Error(new Exception("Server missed last heartbeat"));
                          return;
                      }
                  }
              }

              _heartbeatTimes.Enqueue(tick);
              try
              {
                  await ApiClient.SendHeartbeatAsync().ConfigureAwait(false);
              }
              catch (Exception sendError)
              {
                  // A failed send is only logged; the loop keeps running.
                  await _audioLogger.WarningAsync("Failed to send heartbeat", sendError).ConfigureAwait(false);
              }

              await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false);
          }

          await _audioLogger.DebugAsync("Heartbeat Stopped").ConfigureAwait(false);
      }
      catch (OperationCanceledException)
      {
          // Cancellation during the delay is the normal way out.
          await _audioLogger.DebugAsync("Heartbeat Stopped").ConfigureAwait(false);
      }
      catch (Exception fault)
      {
          await _audioLogger.ErrorAsync("Heartbeat Errored", fault).ConfigureAwait(false);
      }
  }
  /// <summary>
  /// Keepalive loop: sends a keepalive, records its value with the send time
  /// (while fewer than 12 are queued) and sleeps for the interval until cancelled.
  /// </summary>
  /// <param name="intervalMillis"> Delay between keepalives. </param>
  /// <param name="cancelToken"> Stops the loop when cancelled. </param>
  private async Task RunKeepaliveAsync(int intervalMillis, CancellationToken cancelToken)
  {
      try
      {
          await _audioLogger.DebugAsync("Keepalive Started").ConfigureAwait(false);

          for (;;)
          {
              if (cancelToken.IsCancellationRequested)
                  break;

              int sentAt = Environment.TickCount;
              try
              {
                  ulong sentValue = await ApiClient.SendKeepaliveAsync().ConfigureAwait(false);

                  //No reply for 60 Seconds
                  bool hasRoom = _keepaliveTimes.Count < 12;
                  if (hasRoom)
                  {
                      var entry = new KeyValuePair<ulong, int>(sentValue, sentAt);
                      _keepaliveTimes.Enqueue(entry);
                  }
              }
              catch (Exception sendError)
              {
                  // A failed send is only logged; the loop keeps running.
                  await _audioLogger.WarningAsync("Failed to send keepalive", sendError).ConfigureAwait(false);
              }

              await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false);
          }

          await _audioLogger.DebugAsync("Keepalive Stopped").ConfigureAwait(false);
      }
      catch (OperationCanceledException)
      {
          // Cancellation during the delay is the normal way out.
          await _audioLogger.DebugAsync("Keepalive Stopped").ConfigureAwait(false);
      }
      catch (Exception fault)
      {
          await _audioLogger.ErrorAsync("Keepalive Errored", fault).ConfigureAwait(false);
      }
  }
  /// <summary> Sends the speaking state to the server when it differs from the last one sent. </summary>
  /// <param name="value"> The new speaking state. </param>
  public async Task SetSpeakingAsync(bool value)
  {
      // Nothing to send when the state is unchanged.
      if (_isSpeaking == value)
          return;

      _isSpeaking = value;
      await ApiClient.SendSetSpeaking(value).ConfigureAwait(false);
  }
  // Set once the client has been torn down so that repeated Dispose calls are no-ops.
  private bool _isDisposed;

  /// <summary>
  /// Stops the connection and releases the API client and the state lock.
  /// Only the first call with <paramref name="disposing"/> set does any work.
  /// </summary>
  /// <param name="disposing"> True when called from <see cref="Dispose()"/>. </param>
  internal void Dispose(bool disposing)
  {
      if (!disposing || _isDisposed)
          return;

      // Mark first: a second call must not run StopAsync again against the
      // already-disposed semaphore and API client.
      _isDisposed = true;

      StopAsync().GetAwaiter().GetResult();
      ApiClient.Dispose();
      _stateLock?.Dispose();
  }

  /// <inheritdoc />
  public void Dispose() => Dispose(true);
  429. }
  430. }