|
|
@@ -1,59 +1,49 @@ |
|
|
|
using System; |
|
|
|
using System.Collections.Concurrent; |
|
|
|
using System.IO; |
|
|
|
using System.Threading; |
|
|
|
using System.Threading.Tasks; |
|
|
|
|
|
|
|
namespace Discord.Audio.Streams |
|
|
|
{ |
|
|
|
///<summary> Reads the payload from an RTP frame </summary> |
|
|
|
public class RTPReadStream : AudioInStream |
|
|
|
public class RTPReadStream : AudioOutStream |
|
|
|
{ |
|
|
|
private readonly BlockingCollection<byte[]> _queuedData; //TODO: Replace with max-length ring buffer |
|
|
|
//private readonly BlockingCollection<RTPFrame> _queuedData; //TODO: Replace with max-length ring buffer |
|
|
|
private readonly AudioClient _audioClient; |
|
|
|
private readonly InputStream _queue; |
|
|
|
private readonly AudioOutStream _next; |
|
|
|
private readonly byte[] _buffer, _nonce, _secretKey; |
|
|
|
|
|
|
|
public override bool CanRead => true; |
|
|
|
public override bool CanSeek => false; |
|
|
|
public override bool CanWrite => true; |
|
|
|
|
|
|
|
internal RTPReadStream(AudioClient audioClient, byte[] secretKey, int bufferSize = 4000) |
|
|
|
public RTPReadStream(InputStream queue, byte[] secretKey, int bufferSize = 4000) |
|
|
|
: this(queue, null, secretKey, bufferSize) { } |
|
|
|
public RTPReadStream(InputStream queue, AudioOutStream next, byte[] secretKey, int bufferSize = 4000) |
|
|
|
{ |
|
|
|
_audioClient = audioClient; |
|
|
|
_queue = queue; |
|
|
|
_next = next; |
|
|
|
_secretKey = secretKey; |
|
|
|
_buffer = new byte[bufferSize]; |
|
|
|
_queuedData = new BlockingCollection<byte[]>(100); |
|
|
|
_nonce = new byte[24]; |
|
|
|
} |
|
|
|
|
|
|
|
/*public RTPFrame ReadFrame() |
|
|
|
{ |
|
|
|
var queuedData = _queuedData.Take(); |
|
|
|
Buffer.BlockCopy(queuedData, 0, buffer, offset, Math.Min(queuedData.Length, count)); |
|
|
|
return queuedData.Length; |
|
|
|
}*/ |
|
|
|
public override int Read(byte[] buffer, int offset, int count) |
|
|
|
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken) |
|
|
|
{ |
|
|
|
var queuedData = _queuedData.Take(); |
|
|
|
Buffer.BlockCopy(queuedData, 0, buffer, offset, Math.Min(queuedData.Length, count)); |
|
|
|
return queuedData.Length; |
|
|
|
} |
|
|
|
public override void Write(byte[] buffer, int offset, int count) |
|
|
|
{ |
|
|
|
var newBuffer = new byte[count]; |
|
|
|
Buffer.BlockCopy(buffer, 0, newBuffer, 0, count); |
|
|
|
_queuedData.Add(newBuffer); |
|
|
|
} |
|
|
|
cancelToken.ThrowIfCancellationRequested(); |
|
|
|
|
|
|
|
public override void Flush() { throw new NotSupportedException(); } |
|
|
|
var payload = new byte[count - 12]; |
|
|
|
Buffer.BlockCopy(buffer, offset + 12, payload, 0, count - 12); |
|
|
|
|
|
|
|
public override long Length { get { throw new NotSupportedException(); } } |
|
|
|
public override long Position |
|
|
|
{ |
|
|
|
get { throw new NotSupportedException(); } |
|
|
|
set { throw new NotSupportedException(); } |
|
|
|
} |
|
|
|
ushort seq = (ushort)((buffer[offset + 3] << 8) | |
|
|
|
(buffer[offset + 2] << 0)); |
|
|
|
|
|
|
|
uint timestamp = (uint)((buffer[offset + 4] << 24) | |
|
|
|
(buffer[offset + 5] << 16) | |
|
|
|
(buffer[offset + 6] << 16) | |
|
|
|
(buffer[offset + 7] << 0)); |
|
|
|
|
|
|
|
public override void SetLength(long value) { throw new NotSupportedException(); } |
|
|
|
public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); } |
|
|
|
_queue.WriteHeader(seq, timestamp); |
|
|
|
await (_next ?? _queue as Stream).WriteAsync(buffer, offset, count, cancelToken).ConfigureAwait(false); |
|
|
|
} |
|
|
|
} |
|
|
|
} |