You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

AudioClient.cs 21 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499
  1. using Discord.API.Voice;
  2. using Discord.Audio.Streams;
  3. using Discord.Logging;
  4. using Discord.Net.Converters;
  5. using Discord.WebSocket;
  6. using Microsoft.Extensions.Logging;
  7. using Newtonsoft.Json;
  8. using Newtonsoft.Json.Linq;
  9. using System;
  10. using System.Collections.Concurrent;
  11. using System.Linq;
  12. using System.Text;
  13. using System.Threading;
  14. using System.Threading.Tasks;
  15. using System.Collections.Generic;
  16. namespace Discord.Audio
  17. {
  18. //TODO: Add audio reconnecting
  19. internal partial class AudioClient : IAudioClient
  20. {
  21. internal struct StreamPair
  22. {
  23. public AudioInStream Reader;
  24. public AudioOutStream Writer;
  25. public StreamPair(AudioInStream reader, AudioOutStream writer)
  26. {
  27. Reader = reader;
  28. Writer = writer;
  29. }
  30. }
  31. private readonly ILogger _audioLogger;
  32. private readonly JsonSerializer _serializer;
  33. private readonly ConnectionManager _connection;
  34. private readonly SemaphoreSlim _stateLock;
  35. private readonly ConcurrentQueue<long> _heartbeatTimes;
  36. private readonly ConcurrentQueue<KeyValuePair<ulong, int>> _keepaliveTimes;
  37. private readonly ConcurrentDictionary<uint, ulong> _ssrcMap;
  38. private readonly ConcurrentDictionary<ulong, StreamPair> _streams;
  39. private Task _heartbeatTask, _keepaliveTask;
  40. private long _lastMessageTime;
  41. private string _url, _sessionId, _token;
  42. private ulong _userId;
  43. private uint _ssrc;
  44. private bool _isSpeaking;
  45. public SocketGuild Guild { get; }
  46. public DiscordVoiceAPIClient ApiClient { get; private set; }
  47. public int Latency { get; private set; }
  48. public int UdpLatency { get; private set; }
  49. public ulong ChannelId { get; internal set; }
  50. internal byte[] SecretKey { get; private set; }
  51. private DiscordSocketClient Discord => Guild.Discord;
  52. public ConnectionState ConnectionState => _connection.State;
  53. /// <summary> Creates a new REST/WebSocket discord client. </summary>
  54. internal AudioClient(SocketGuild guild, int clientId, ulong channelId)
  55. {
  56. Guild = guild;
  57. ChannelId = channelId;
  58. _audioLogger = Discord.LogManager.CreateLogger($"Audio #{clientId}");
  59. ApiClient = new DiscordVoiceAPIClient(guild.Id, Discord.WebSocketProvider, Discord.UdpSocketProvider);
  60. ApiClient.SentGatewayMessage += opCode =>
  61. {
  62. _audioLogger.LogDebug($"Sent {opCode}");
  63. return Task.CompletedTask;
  64. };
  65. ApiClient.SentDiscovery += () =>
  66. {
  67. _audioLogger.LogDebug("Sent Discovery");
  68. return Task.CompletedTask;
  69. };
  70. //ApiClient.SentData += async bytes => await _audioLogger.DebugAsync($"Sent {bytes} Bytes").ConfigureAwait(false);
  71. ApiClient.ReceivedEvent += ProcessMessageAsync;
  72. ApiClient.ReceivedPacket += ProcessPacketAsync;
  73. _stateLock = new SemaphoreSlim(1, 1);
  74. _connection = new ConnectionManager(_stateLock, _audioLogger, 30000,
  75. OnConnectingAsync, OnDisconnectingAsync, x => ApiClient.Disconnected += x);
  76. _connection.Connected += () => _connectedEvent.InvokeAsync();
  77. _connection.Disconnected += (ex, recon) => _disconnectedEvent.InvokeAsync(ex);
  78. _heartbeatTimes = new ConcurrentQueue<long>();
  79. _keepaliveTimes = new ConcurrentQueue<KeyValuePair<ulong, int>>();
  80. _ssrcMap = new ConcurrentDictionary<uint, ulong>();
  81. _streams = new ConcurrentDictionary<ulong, StreamPair>();
  82. _serializer = new JsonSerializer { ContractResolver = new DiscordContractResolver() };
  83. _serializer.Error += (s, e) =>
  84. {
  85. _audioLogger.LogWarning(e.ErrorContext.Error, e.ErrorContext.Error.Message);
  86. e.ErrorContext.Handled = true;
  87. };
  88. LatencyUpdated += (old, val) =>
  89. {
  90. _audioLogger.LogDebug($"Latency = {val} ms");
  91. return Task.CompletedTask;
  92. };
  93. UdpLatencyUpdated += (old, val) =>
  94. {
  95. _audioLogger.LogDebug($"UDP Latency = {val} ms");
  96. return Task.CompletedTask;
  97. };
  98. }
  99. internal async Task StartAsync(string url, ulong userId, string sessionId, string token)
  100. {
  101. _url = url;
  102. _userId = userId;
  103. _sessionId = sessionId;
  104. _token = token;
  105. await _connection.StartAsync().ConfigureAwait(false);
  106. }
  107. public IReadOnlyDictionary<ulong, AudioInStream> GetStreams()
  108. {
  109. return _streams.ToDictionary(pair => pair.Key, pair => pair.Value.Reader);
  110. }
  111. public async Task StopAsync()
  112. {
  113. await _connection.StopAsync().ConfigureAwait(false);
  114. }
  115. private async Task OnConnectingAsync()
  116. {
  117. _audioLogger.LogDebug("Connecting ApiClient");
  118. await ApiClient.ConnectAsync("wss://" + _url + "?v=" + DiscordConfig.VoiceAPIVersion).ConfigureAwait(false);
  119. _audioLogger.LogDebug("Listening on port " + ApiClient.UdpPort);
  120. _audioLogger.LogDebug("Sending Identity");
  121. await ApiClient.SendIdentityAsync(_userId, _sessionId, _token).ConfigureAwait(false);
  122. //Wait for READY
  123. await _connection.WaitAsync().ConfigureAwait(false);
  124. }
  125. private async Task OnDisconnectingAsync(Exception ex)
  126. {
  127. _audioLogger.LogDebug("Disconnecting ApiClient");
  128. await ApiClient.DisconnectAsync().ConfigureAwait(false);
  129. //Wait for tasks to complete
  130. _audioLogger.LogDebug("Waiting for heartbeater");
  131. var heartbeatTask = _heartbeatTask;
  132. if (heartbeatTask != null)
  133. await heartbeatTask.ConfigureAwait(false);
  134. _heartbeatTask = null;
  135. var keepaliveTask = _keepaliveTask;
  136. if (keepaliveTask != null)
  137. await keepaliveTask.ConfigureAwait(false);
  138. _keepaliveTask = null;
  139. while (_heartbeatTimes.TryDequeue(out _)) { }
  140. _lastMessageTime = 0;
  141. await ClearInputStreamsAsync().ConfigureAwait(false);
  142. _audioLogger.LogDebug("Sending Voice State");
  143. await Discord.ApiClient.SendVoiceStateUpdateAsync(Guild.Id, null, false, false).ConfigureAwait(false);
  144. }
  145. public AudioOutStream CreateOpusStream(int bufferMillis)
  146. {
  147. var outputStream = new OutputStream(ApiClient); //Ignores header
  148. var sodiumEncrypter = new SodiumEncryptStream( outputStream, this); //Passes header
  149. var rtpWriter = new RTPWriteStream(sodiumEncrypter, _ssrc); //Consumes header, passes
  150. return new BufferedWriteStream(rtpWriter, this, bufferMillis, _connection.CancelToken, _audioLogger); //Generates header
  151. }
  152. public AudioOutStream CreateDirectOpusStream()
  153. {
  154. var outputStream = new OutputStream(ApiClient); //Ignores header
  155. var sodiumEncrypter = new SodiumEncryptStream(outputStream, this); //Passes header
  156. return new RTPWriteStream(sodiumEncrypter, _ssrc); //Consumes header (external input), passes
  157. }
  158. public AudioOutStream CreatePCMStream(AudioApplication application, int? bitrate, int bufferMillis, int packetLoss)
  159. {
  160. var outputStream = new OutputStream(ApiClient); //Ignores header
  161. var sodiumEncrypter = new SodiumEncryptStream(outputStream, this); //Passes header
  162. var rtpWriter = new RTPWriteStream(sodiumEncrypter, _ssrc); //Consumes header, passes
  163. var bufferedStream = new BufferedWriteStream(rtpWriter, this, bufferMillis, _connection.CancelToken, _audioLogger); //Ignores header, generates header
  164. return new OpusEncodeStream(bufferedStream, bitrate ?? (96 * 1024), application, packetLoss); //Generates header
  165. }
  166. public AudioOutStream CreateDirectPCMStream(AudioApplication application, int? bitrate, int packetLoss)
  167. {
  168. var outputStream = new OutputStream(ApiClient); //Ignores header
  169. var sodiumEncrypter = new SodiumEncryptStream(outputStream, this); //Passes header
  170. var rtpWriter = new RTPWriteStream(sodiumEncrypter, _ssrc); //Consumes header, passes
  171. return new OpusEncodeStream(rtpWriter, bitrate ?? (96 * 1024), application, packetLoss); //Generates header
  172. }
  173. internal async Task CreateInputStreamAsync(ulong userId)
  174. {
  175. //Assume Thread-safe
  176. if (!_streams.ContainsKey(userId))
  177. {
  178. var readerStream = new InputStream(); //Consumes header
  179. var opusDecoder = new OpusDecodeStream(readerStream); //Passes header
  180. //var jitterBuffer = new JitterBuffer(opusDecoder, _audioLogger);
  181. var rtpReader = new RTPReadStream(opusDecoder); //Generates header
  182. var decryptStream = new SodiumDecryptStream(rtpReader, this); //No header
  183. _streams.TryAdd(userId, new StreamPair(readerStream, decryptStream));
  184. await _streamCreatedEvent.InvokeAsync(userId, readerStream);
  185. }
  186. }
  187. internal AudioInStream GetInputStream(ulong id)
  188. {
  189. if (_streams.TryGetValue(id, out StreamPair streamPair))
  190. return streamPair.Reader;
  191. return null;
  192. }
  193. internal async Task RemoveInputStreamAsync(ulong userId)
  194. {
  195. if (_streams.TryRemove(userId, out var pair))
  196. {
  197. await _streamDestroyedEvent.InvokeAsync(userId).ConfigureAwait(false);
  198. pair.Reader.Dispose();
  199. }
  200. }
  201. internal async Task ClearInputStreamsAsync()
  202. {
  203. foreach (var pair in _streams)
  204. {
  205. await _streamDestroyedEvent.InvokeAsync(pair.Key).ConfigureAwait(false);
  206. pair.Value.Reader.Dispose();
  207. }
  208. _ssrcMap.Clear();
  209. _streams.Clear();
  210. }
  211. private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload)
  212. {
  213. _lastMessageTime = Environment.TickCount;
  214. try
  215. {
  216. switch (opCode)
  217. {
  218. case VoiceOpCode.Ready:
  219. {
  220. _audioLogger.LogDebug("Received Ready");
  221. var data = (payload as JToken).ToObject<ReadyEvent>(_serializer);
  222. _ssrc = data.SSRC;
  223. if (!data.Modes.Contains(DiscordVoiceAPIClient.Mode))
  224. throw new InvalidOperationException($"Discord does not support {DiscordVoiceAPIClient.Mode}");
  225. ApiClient.SetUdpEndpoint(data.Ip, data.Port);
  226. await ApiClient.SendDiscoveryAsync(_ssrc).ConfigureAwait(false);
  227. _heartbeatTask = RunHeartbeatAsync(41250, _connection.CancelToken);
  228. }
  229. break;
  230. case VoiceOpCode.SessionDescription:
  231. {
  232. _audioLogger.LogDebug("Received SessionDescription");
  233. var data = (payload as JToken).ToObject<SessionDescriptionEvent>(_serializer);
  234. if (data.Mode != DiscordVoiceAPIClient.Mode)
  235. throw new InvalidOperationException($"Discord selected an unexpected mode: {data.Mode}");
  236. SecretKey = data.SecretKey;
  237. _isSpeaking = false;
  238. await ApiClient.SendSetSpeaking(false).ConfigureAwait(false);
  239. _keepaliveTask = RunKeepaliveAsync(5000, _connection.CancelToken);
  240. var _ = _connection.CompleteAsync();
  241. }
  242. break;
  243. case VoiceOpCode.HeartbeatAck:
  244. {
  245. _audioLogger.LogDebug("Received HeartbeatAck");
  246. if (_heartbeatTimes.TryDequeue(out long time))
  247. {
  248. int latency = (int)(Environment.TickCount - time);
  249. int before = Latency;
  250. Latency = latency;
  251. await _latencyUpdatedEvent.InvokeAsync(before, latency).ConfigureAwait(false);
  252. }
  253. }
  254. break;
  255. case VoiceOpCode.Speaking:
  256. {
  257. _audioLogger.LogDebug("Received Speaking");
  258. var data = (payload as JToken).ToObject<SpeakingEvent>(_serializer);
  259. _ssrcMap[data.Ssrc] = data.UserId; //TODO: Memory Leak: SSRCs are never cleaned up
  260. await _speakingUpdatedEvent.InvokeAsync(data.UserId, data.Speaking);
  261. }
  262. break;
  263. default:
  264. _audioLogger.LogWarning($"Unknown OpCode ({opCode})");
  265. return;
  266. }
  267. }
  268. catch (Exception ex)
  269. {
  270. _audioLogger.LogError($"Error handling {opCode}", ex);
  271. return;
  272. }
  273. }
  274. private async Task ProcessPacketAsync(byte[] packet)
  275. {
  276. try
  277. {
  278. if (_connection.State == ConnectionState.Connecting)
  279. {
  280. if (packet.Length != 70)
  281. {
  282. _audioLogger.LogDebug("Malformed Packet");
  283. return;
  284. }
  285. string ip;
  286. int port;
  287. try
  288. {
  289. ip = Encoding.UTF8.GetString(packet, 4, 70 - 6).TrimEnd('\0');
  290. port = (packet[69] << 8) | packet[68];
  291. }
  292. catch (Exception ex)
  293. {
  294. _audioLogger.LogDebug("Malformed Packet", ex);
  295. return;
  296. }
  297. _audioLogger.LogDebug("Received Discovery");
  298. await ApiClient.SendSelectProtocol(ip, port).ConfigureAwait(false);
  299. }
  300. else if (_connection.State == ConnectionState.Connected)
  301. {
  302. if (packet.Length == 8)
  303. {
  304. _audioLogger.LogDebug("Received Keepalive");
  305. ulong value =
  306. ((ulong)packet[0] >> 0) |
  307. ((ulong)packet[1] >> 8) |
  308. ((ulong)packet[2] >> 16) |
  309. ((ulong)packet[3] >> 24) |
  310. ((ulong)packet[4] >> 32) |
  311. ((ulong)packet[5] >> 40) |
  312. ((ulong)packet[6] >> 48) |
  313. ((ulong)packet[7] >> 56);
  314. while (_keepaliveTimes.TryDequeue(out var pair))
  315. {
  316. if (pair.Key == value)
  317. {
  318. int latency = Environment.TickCount - pair.Value;
  319. int before = UdpLatency;
  320. UdpLatency = latency;
  321. await _udpLatencyUpdatedEvent.InvokeAsync(before, latency).ConfigureAwait(false);
  322. break;
  323. }
  324. }
  325. }
  326. else
  327. {
  328. if (!RTPReadStream.TryReadSsrc(packet, 0, out var ssrc))
  329. {
  330. _audioLogger.LogDebug("Malformed Frame");
  331. return;
  332. }
  333. if (!_ssrcMap.TryGetValue(ssrc, out var userId))
  334. {
  335. _audioLogger.LogDebug($"Unknown SSRC {ssrc}");
  336. return;
  337. }
  338. if (!_streams.TryGetValue(userId, out var pair))
  339. {
  340. _audioLogger.LogDebug($"Unknown User {userId}");
  341. return;
  342. }
  343. try
  344. {
  345. await pair.Writer.WriteAsync(packet, 0, packet.Length).ConfigureAwait(false);
  346. }
  347. catch (Exception ex)
  348. {
  349. _audioLogger.LogDebug(ex, "Malformed Frame");
  350. return;
  351. }
  352. //await _audioLogger.DebugAsync($"Received {packet.Length} bytes from user {userId}").ConfigureAwait(false);
  353. }
  354. }
  355. }
  356. catch (Exception ex)
  357. {
  358. _audioLogger.LogWarning("Failed to process UDP packet", ex);
  359. return;
  360. }
  361. }
  362. private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cancelToken)
  363. {
  364. // TODO: Clean this up when Discord's session patch is live
  365. try
  366. {
  367. _audioLogger.LogDebug("Heartbeat Started");
  368. while (!cancelToken.IsCancellationRequested)
  369. {
  370. var now = Environment.TickCount;
  371. //Did server respond to our last heartbeat?
  372. if (_heartbeatTimes.Count != 0 && (now - _lastMessageTime) > intervalMillis &&
  373. ConnectionState == ConnectionState.Connected)
  374. {
  375. _connection.Error(new Exception("Server missed last heartbeat"));
  376. return;
  377. }
  378. _heartbeatTimes.Enqueue(now);
  379. try
  380. {
  381. await ApiClient.SendHeartbeatAsync().ConfigureAwait(false);
  382. }
  383. catch (Exception ex)
  384. {
  385. _audioLogger.LogWarning("Failed to send heartbeat", ex);
  386. }
  387. await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false);
  388. }
  389. _audioLogger.LogDebug("Heartbeat Stopped");
  390. }
  391. catch (OperationCanceledException)
  392. {
  393. _audioLogger.LogDebug("Heartbeat Stopped");
  394. }
  395. catch (Exception ex)
  396. {
  397. _audioLogger.LogError("Heartbeat Errored", ex);
  398. }
  399. }
  400. private async Task RunKeepaliveAsync(int intervalMillis, CancellationToken cancelToken)
  401. {
  402. try
  403. {
  404. _audioLogger.LogDebug("Keepalive Started");
  405. while (!cancelToken.IsCancellationRequested)
  406. {
  407. var now = Environment.TickCount;
  408. try
  409. {
  410. ulong value = await ApiClient.SendKeepaliveAsync().ConfigureAwait(false);
  411. if (_keepaliveTimes.Count < 12) //No reply for 60 Seconds
  412. _keepaliveTimes.Enqueue(new KeyValuePair<ulong, int>(value, now));
  413. }
  414. catch (Exception ex)
  415. {
  416. _audioLogger.LogWarning("Failed to send keepalive", ex);
  417. }
  418. await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false);
  419. }
  420. _audioLogger.LogDebug("Keepalive Stopped");
  421. }
  422. catch (OperationCanceledException)
  423. {
  424. _audioLogger.LogDebug("Keepalive Stopped");
  425. }
  426. catch (Exception ex)
  427. {
  428. _audioLogger.LogError("Keepalive Errored", ex);
  429. }
  430. }
  431. public async Task SetSpeakingAsync(bool value)
  432. {
  433. if (_isSpeaking != value)
  434. {
  435. _isSpeaking = value;
  436. await ApiClient.SendSetSpeaking(value).ConfigureAwait(false);
  437. }
  438. }
  439. internal void Dispose(bool disposing)
  440. {
  441. if (disposing)
  442. {
  443. StopAsync().GetAwaiter().GetResult();
  444. ApiClient.Dispose();
  445. _stateLock?.Dispose();
  446. }
  447. }
  448. /// <inheritdoc />
  449. public void Dispose() => Dispose(true);
  450. }
  451. }