You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

HttpMixin.cs 5.8 KiB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. //From https://github.com/akavache/Akavache
  2. //Copyright (c) 2012 GitHub
  3. //TODO: Remove once netstandard support is added
  4. #pragma warning disable CS0618
  5. using Akavache;
  6. using System;
  7. using System.Collections.Generic;
  8. using System.Diagnostics;
  9. using System.IO;
  10. using System.Linq;
  11. using System.Net;
  12. using System.Reactive.Linq;
  13. using System.Reactive.Subjects;
  14. using System.Text;
  15. using System.Reactive;
  16. using System.Reactive.Threading.Tasks;
  17. namespace Discord.Net
  18. {
  /// <summary>
  /// HTTP download helper for an Akavache blob cache: fetches a URL with
  /// <see cref="WebRequest"/>, stores the downloaded bytes in the cache and
  /// hands them back as an observable.
  /// </summary>
  public class HttpMixin : IAkavacheHttpMixin
  {
      /// <summary>
      /// Download data from an HTTP URL and insert the result into the
      /// cache. If the data is already in the cache, this returns
      /// a cached value. The URL itself is used as the key.
      /// </summary>
      /// <param name="This">The blob cache the call is forwarded to.</param>
      /// <param name="url">The URL to download; it is passed as both the key and the URL.</param>
      /// <param name="headers">An optional Dictionary containing the HTTP
      /// request headers.</param>
      /// <param name="fetchAlways">Force a web request to always be issued, skipping the cache.</param>
      /// <param name="absoluteExpiration">An optional expiration date.</param>
      /// <returns>The data downloaded from the URL.</returns>
      public IObservable<byte[]> DownloadUrl(IBlobCache This, string url, IDictionary<string, string> headers = null, bool fetchAlways = false, DateTimeOffset? absoluteExpiration = null)
      {
          return This.DownloadUrl(url, url, headers, fetchAlways, absoluteExpiration);
      }

      /// <summary>
      /// Download data from an HTTP URL and insert the result into the
      /// cache. If the data is already in the cache, this returns
      /// a cached value. An explicit key is provided rather than the URL itself.
      /// </summary>
      /// <param name="This">The blob cache that is read from and inserted into.</param>
      /// <param name="key">The key to store with.</param>
      /// <param name="url">The URL to download.</param>
      /// <param name="headers">An optional Dictionary containing the HTTP
      /// request headers.</param>
      /// <param name="fetchAlways">Force a web request to always be issued, skipping the cache.</param>
      /// <param name="absoluteExpiration">An optional expiration date.</param>
      /// <returns>The data downloaded from the URL.</returns>
      public IObservable<byte[]> DownloadUrl(IBlobCache This, string key, string url, IDictionary<string, string> headers = null, bool fetchAlways = false, DateTimeOffset? absoluteExpiration = null)
      {
          var doFetch = MakeWebRequest(new Uri(url), headers)
              .SelectMany(x => ProcessWebResponse(x, url, absoluteExpiration));

          // Store the downloaded bytes, then surface those same bytes to the caller.
          var fetchAndCache = doFetch
              .SelectMany(x => This.Insert(key, x, absoluteExpiration).Select(_ => x));

          // Unless a fetch is forced, try the cache first and only hit the
          // network when the cache lookup fails.
          var ret = fetchAlways
              ? fetchAndCache
              : This.Get(key).Catch(fetchAndCache);

          // Connect immediately so the work starts without waiting for a
          // subscriber; the final value is kept for whoever subscribes later.
          var conn = ret.PublishLast();
          conn.Connect();
          return conn;
      }

      /// <summary>
      /// Reads the whole body of a web response into a byte array.
      /// </summary>
      /// <param name="wr">The response to read; it is cast to <see cref="HttpWebResponse"/>.</param>
      /// <param name="url">The URL that was requested (not used by this method).</param>
      /// <param name="absoluteExpiration">The cache expiration (not used by this method).</param>
      /// <returns>
      /// An observable producing the response body, or an observable failing with a
      /// <see cref="WebException"/> when the status code is 400 or above.
      /// </returns>
      private IObservable<byte[]> ProcessWebResponse(WebResponse wr, string url, DateTimeOffset? absoluteExpiration)
      {
          var hwr = (HttpWebResponse)wr;
          Debug.Assert(hwr != null, "The Web Response is somehow null but shouldn't be.");

          // Dispose the response on every path. Previously the error branch
          // returned without closing it, which kept the connection allocated.
          using (hwr)
          {
              if ((int)hwr.StatusCode >= 400)
              {
                  return Observable.Throw<byte[]>(new WebException(hwr.StatusDescription));
              }

              using (var ms = new MemoryStream())
              using (var responseStream = hwr.GetResponseStream())
              {
                  Debug.Assert(responseStream != null, "The response stream is somehow null");
                  responseStream.CopyTo(ms);

                  // The body is fully buffered here, so the returned observable
                  // does not depend on the (now disposed) response or streams.
                  return Observable.Return(ms.ToArray());
              }
          }
      }

      /// <summary>
      /// Builds a deferred web request that is created afresh on every subscription.
      /// </summary>
      /// <param name="uri">The address to request.</param>
      /// <param name="headers">Optional request headers.</param>
      /// <param name="content">Optional request body; it is UTF-8 encoded and written to the request stream.</param>
      /// <param name="retries">The count handed to the Rx <c>Retry</c> operator.</param>
      /// <param name="timeout">Per-attempt timeout; 15 seconds when omitted.</param>
      /// <returns>An observable producing the web response.</returns>
      private static IObservable<WebResponse> MakeWebRequest(
          Uri uri,
          IDictionary<string, string> headers = null,
          string content = null,
          int retries = 3,
          TimeSpan? timeout = null)
      {
          IObservable<WebResponse> request;

          // Defer so that each retry creates a brand new WebRequest.
          request = Observable.Defer(() =>
          {
              var hwr = CreateWebRequest(uri, headers);

              if (content == null)
              {
                  return Observable.FromAsyncPattern<WebResponse>(hwr.BeginGetResponse, hwr.EndGetResponse)();
              }

              // NOTE(review): the request method is never set on this path; for HTTP
              // the default verb does not accept a body - confirm no caller relies
              // on passing content before depending on this branch.
              var buf = Encoding.UTF8.GetBytes(content);

              // NB: You'd think that BeginGetResponse would never block,
              // seeing as how it's asynchronous. You'd be wrong :-/
              var ret = new AsyncSubject<WebResponse>();

              // Kick the upload off on the task pool scheduler; the outcome is
              // delivered to the caller through the subject returned below.
              Observable.Start(() =>
              {
                  Observable.FromAsyncPattern<Stream>(hwr.BeginGetRequestStream, hwr.EndGetRequestStream)()
                      .SelectMany(x => WriteAsyncRx(x, buf, 0, buf.Length))
                      .SelectMany(_ => Observable.FromAsyncPattern<WebResponse>(hwr.BeginGetResponse, hwr.EndGetResponse)())
                      .Multicast(ret).Connect();
              }, BlobCache.TaskpoolScheduler);

              return ret;
          });

          // Timeout sits before Retry, so the time limit applies to each attempt.
          return request.Timeout(timeout ?? TimeSpan.FromSeconds(15), BlobCache.TaskpoolScheduler).Retry(retries);
      }

      /// <summary>
      /// Creates a <see cref="WebRequest"/> for the address and copies the supplied headers onto it.
      /// </summary>
      /// <param name="uri">The address to request.</param>
      /// <param name="headers">Headers to set on the request; may be null.</param>
      /// <returns>The configured request.</returns>
      private static WebRequest CreateWebRequest(Uri uri, IDictionary<string, string> headers)
      {
          var hwr = WebRequest.Create(uri);

          if (headers != null)
          {
              foreach (var x in headers)
              {
                  hwr.Headers[x.Key] = x.Value;
              }
          }

          return hwr;
      }

      /// <summary>
      /// Writes a slice of a buffer to a stream asynchronously and exposes the write as an observable.
      /// </summary>
      /// <param name="stream">The stream to write to.</param>
      /// <param name="data">The buffer holding the bytes.</param>
      /// <param name="start">Offset of the first byte to write.</param>
      /// <param name="length">Number of bytes to write.</param>
      /// <returns>An observable that signals once the write has finished.</returns>
      private static IObservable<Unit> WriteAsyncRx(Stream stream, byte[] data, int start, int length)
      {
          return stream.WriteAsync(data, start, length).ToObservable();
      }
  }
  127. }