You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

HttpMixin.cs 5.9 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. //From https://github.com/akavache/Akavache
  2. //Copyright (c) 2012 GitHub
  3. //TODO: Remove once netstandard support is added
  4. #pragma warning disable CS0618
  5. using Akavache;
  6. using System;
  7. using System.Collections.Generic;
  8. using System.Diagnostics;
  9. using System.IO;
  10. using System.Net;
  11. using System.Reactive.Linq;
  12. using System.Reactive.Subjects;
  13. using System.Text;
  14. using System.Reactive;
  15. using System.Reactive.Threading.Tasks;
  16. namespace Discord.Net
  17. {
  /// <summary>
  /// HTTP download support for an <see cref="IBlobCache"/>: fetches the body
  /// of a URL and stores the resulting bytes in the cache.
  /// </summary>
  public class HttpMixin : IAkavacheHttpMixin
  {
      /// <summary>
      /// Download data from an HTTP URL and insert the result into the
      /// cache. If the data is already in the cache, this returns
      /// a cached value. The URL itself is used as the key.
      /// </summary>
      /// <param name="This">The cache to read from and insert into.</param>
      /// <param name="url">The URL to download.</param>
      /// <param name="headers">An optional Dictionary containing the HTTP
      /// request headers.</param>
      /// <param name="fetchAlways">Force a web request to always be issued, skipping the cache.</param>
      /// <param name="absoluteExpiration">An optional expiration date.</param>
      /// <returns>The data downloaded from the URL.</returns>
      public IObservable<byte[]> DownloadUrl(IBlobCache This, string url, IDictionary<string, string> headers = null, bool fetchAlways = false, DateTimeOffset? absoluteExpiration = null)
      {
          // The URL is passed twice: once as the cache key, once as the address.
          return This.DownloadUrl(url, url, headers, fetchAlways, absoluteExpiration);
      }

      /// <summary>
      /// Download data from an HTTP URL and insert the result into the
      /// cache. If the data is already in the cache, this returns
      /// a cached value. An explicit key is provided rather than the URL itself.
      /// </summary>
      /// <remarks>
      /// The returned sequence is connected before this method returns, so the
      /// cache lookup / download starts immediately rather than on first
      /// subscription.
      /// </remarks>
      /// <param name="This">The cache to read from and insert into.</param>
      /// <param name="key">The key to store with.</param>
      /// <param name="url">The URL to download.</param>
      /// <param name="headers">An optional Dictionary containing the HTTP
      /// request headers.</param>
      /// <param name="fetchAlways">Force a web request to always be issued, skipping the cache.</param>
      /// <param name="absoluteExpiration">An optional expiration date.</param>
      /// <returns>The data downloaded from the URL.</returns>
      public IObservable<byte[]> DownloadUrl(IBlobCache This, string key, string url, IDictionary<string, string> headers = null, bool fetchAlways = false, DateTimeOffset? absoluteExpiration = null)
      {
          var doFetch = MakeWebRequest(new Uri(url), headers)
              .SelectMany(x => ProcessWebResponse(x, url, absoluteExpiration));

          // Store the downloaded bytes under the key, then hand the same bytes to the caller.
          var fetchAndCache = doFetch.SelectMany(x => This.Insert(key, x, absoluteExpiration).Select(_ => x));

          // Unless a fetch is forced, try the cache first and fall back to the
          // network when the cache lookup fails.
          var ret = fetchAlways
              ? fetchAndCache
              : This.Get(key).Catch(fetchAndCache);

          var conn = ret.PublishLast();
          conn.Connect();
          return conn;
      }

      /// <summary>
      /// Reads the full body of a web response into a byte array.
      /// </summary>
      /// <param name="wr">The response to read; it is cast to <see cref="HttpWebResponse"/>
      /// and disposed before this method returns.</param>
      /// <param name="url">The URL that was requested (not used by this method).</param>
      /// <param name="absoluteExpiration">The cache expiration (not used by this method).</param>
      /// <returns>A sequence producing the response body, or a sequence failing with a
      /// <see cref="WebException"/> when the status code is 400 or greater.</returns>
      private static IObservable<byte[]> ProcessWebResponse(WebResponse wr, string url, DateTimeOffset? absoluteExpiration)
      {
          var hwr = (HttpWebResponse)wr;
          Debug.Assert(hwr != null, "The Web Response is somehow null but shouldn't be.");

          // Dispose the response on every path, including the error path below
          // where the response stream is never opened.
          using (hwr)
          {
              if ((int)hwr.StatusCode >= 400)
              {
                  return Observable.Throw<byte[]>(new WebException(hwr.StatusDescription));
              }

              using (var ms = new MemoryStream())
              using (var responseStream = hwr.GetResponseStream())
              {
                  Debug.Assert(responseStream != null, "The response stream is somehow null");
                  responseStream.CopyTo(ms);

                  // ToArray copies the buffer, so the result outlives the disposed stream.
                  return Observable.Return(ms.ToArray());
              }
          }
      }

      /// <summary>
      /// Builds an observable that issues a web request when subscribed to.
      /// </summary>
      /// <param name="uri">The address to request.</param>
      /// <param name="headers">Optional request headers.</param>
      /// <param name="content">Optional request body; when non-null it is written
      /// to the request stream as UTF-8 before the response is awaited.</param>
      /// <param name="retries">The count handed to Rx <c>Retry</c>.</param>
      /// <param name="timeout">Timeout for a single attempt; 15 seconds when null.</param>
      /// <returns>A sequence producing the web response.</returns>
      private static IObservable<WebResponse> MakeWebRequest(
          Uri uri,
          IDictionary<string, string> headers = null,
          string content = null,
          int retries = 3,
          TimeSpan? timeout = null)
      {
          // Deferred so that each subscription (and therefore each retry) creates
          // a fresh request object.
          IObservable<WebResponse> request = Observable.Defer(() =>
          {
              var hwr = CreateWebRequest(uri, headers);
              if (content == null)
              {
                  return Observable.FromAsyncPattern<WebResponse>(hwr.BeginGetResponse, hwr.EndGetResponse)();
              }

              var buf = Encoding.UTF8.GetBytes(content);

              // NB: You'd think that BeginGetResponse would never block,
              // seeing as how it's asynchronous. You'd be wrong :-/
              var ret = new AsyncSubject<WebResponse>();
              Observable.Start(() =>
              {
                  // Open the request stream, write the body, then wait for the
                  // response and publish it through the subject.
                  Observable.FromAsyncPattern<Stream>(hwr.BeginGetRequestStream, hwr.EndGetRequestStream)()
                      .SelectMany(x => WriteAsyncRx(x, buf, 0, buf.Length))
                      .SelectMany(_ => Observable.FromAsyncPattern<WebResponse>(hwr.BeginGetResponse, hwr.EndGetResponse)())
                      .Multicast(ret).Connect();
              }, BlobCache.TaskpoolScheduler);

              return ret;
          });

          return request.Timeout(timeout ?? TimeSpan.FromSeconds(15), BlobCache.TaskpoolScheduler).Retry(retries);
      }

      /// <summary>
      /// Creates a web request for the given address and copies the supplied headers onto it.
      /// </summary>
      /// <param name="uri">The address to request.</param>
      /// <param name="headers">Headers to set on the request; may be null.</param>
      /// <returns>The configured request.</returns>
      private static WebRequest CreateWebRequest(Uri uri, IDictionary<string, string> headers)
      {
          var hwr = WebRequest.Create(uri);
          if (headers != null)
          {
              foreach (var x in headers)
              {
                  hwr.Headers[x.Key] = x.Value;
              }
          }

          return hwr;
      }

      /// <summary>
      /// Writes a range of bytes to a stream asynchronously, exposed as an observable.
      /// </summary>
      /// <param name="stream">The stream to write to.</param>
      /// <param name="data">The buffer holding the bytes to write.</param>
      /// <param name="start">Offset in <paramref name="data"/> of the first byte to write.</param>
      /// <param name="length">The number of bytes to write.</param>
      /// <returns>A sequence that signals when the write has finished.</returns>
      private static IObservable<Unit> WriteAsyncRx(Stream stream, byte[] data, int start, int length)
      {
          return stream.WriteAsync(data, start, length).ToObservable();
      }
  }
  126. }