Browse Source

Added SendMessage queue

tags/docs-0.9
Brandon Smith 9 years ago
parent
commit
467ce68343
9 changed files with 280 additions and 118 deletions
  1. +3
    -0
      src/Discord.Net.Net45/Discord.Net.csproj
  2. +4
    -2
      src/Discord.Net/API/Models/Common.cs
  3. +6
    -0
      src/Discord.Net/DiscordClient.Events.cs
  4. +218
    -105
      src/Discord.Net/DiscordClient.cs
  5. +18
    -0
      src/Discord.Net/DiscordClientConfig.cs
  6. +4
    -3
      src/Discord.Net/DiscordWebSocket.cs
  7. +14
    -2
      src/Discord.Net/Helpers/AsyncCache.cs
  8. +12
    -4
      src/Discord.Net/Message.cs
  9. +1
    -2
      src/Discord.Net/PackedPermissions.cs

+ 3
- 0
src/Discord.Net.Net45/Discord.Net.csproj View File

@@ -76,6 +76,9 @@
<Compile Include="..\Discord.Net\DiscordClient.Events.cs">
<Link>DiscordClient.Events.cs</Link>
</Compile>
<Compile Include="..\Discord.Net\DiscordClientConfig.cs">
<Link>DiscordClientConfig.cs</Link>
</Compile>
<Compile Include="..\Discord.Net\DiscordWebSocket.cs">
<Link>DiscordWebSocket.cs</Link>
</Compile>


+ 4
- 2
src/Discord.Net/API/Models/Common.cs View File

@@ -211,7 +211,7 @@ namespace Discord.API.Models
}
public sealed class Embed
{
public sealed class ProviderInfo
public sealed class Reference
{
[JsonProperty(PropertyName = "url")]
public string Url;
@@ -238,8 +238,10 @@ namespace Discord.API.Models
public string Title;
[JsonProperty(PropertyName = "description")]
public string Description;
[JsonProperty(PropertyName = "author")]
public Reference Author;
[JsonProperty(PropertyName = "provider")]
public ProviderInfo Provider;
public Reference Provider;
[JsonProperty(PropertyName = "thumbnail")]
public ThumbnailInfo Thumbnail;
}


+ 6
- 0
src/Discord.Net/DiscordClient.Events.cs View File

@@ -127,6 +127,12 @@ namespace Discord
if (MessageRead != null)
MessageRead(this, new MessageEventArgs(msg));
}
public event EventHandler<MessageEventArgs> MessageSent;
private void RaiseMessageSent(Message msg)
{
if (MessageSent != null)
MessageSent(this, new MessageEventArgs(msg));
}

//Role
public sealed class RoleEventArgs : EventArgs


+ 218
- 105
src/Discord.Net/DiscordClient.cs View File

@@ -3,6 +3,7 @@ using Discord.API.Models;
using Discord.Helpers;
using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
@@ -16,8 +17,11 @@ namespace Discord
/// <summary> Provides a connection to the DiscordApp service. </summary>
public partial class DiscordClient
{
private DiscordClientConfig _config;
private DiscordWebSocket _webSocket;
private ManualResetEventSlim _isStopping;
private ManualResetEventSlim _blockEvent;
private volatile CancellationTokenSource _disconnectToken;
private volatile Task _tasks;
private readonly Regex _userRegex, _channelRegex;
private readonly MatchEvaluator _userRegexEvaluator, _channelRegexEvaluator;
private readonly JsonSerializer _serializer;
@@ -32,43 +36,40 @@ namespace Discord
/// <summary> Returns a collection of all users the client can see across all servers. </summary>
/// <remarks> This collection does not guarantee any ordering. </remarks>
public IEnumerable<User> Users => _users;
private AsyncCache<User, API.Models.UserReference> _users;
private readonly AsyncCache<User, API.Models.UserReference> _users;

/// <summary> Returns a collection of all servers the client is a member of. </summary>
/// <remarks> This collection does not guarantee any ordering. </remarks>
public IEnumerable<Server> Servers => _servers;
private AsyncCache<Server, API.Models.ServerReference> _servers;
private readonly AsyncCache<Server, API.Models.ServerReference> _servers;

/// <summary> Returns a collection of all channels the client can see across all servers. </summary>
/// <remarks> This collection does not guarantee any ordering. </remarks>
public IEnumerable<Channel> Channels => _channels;
private AsyncCache<Channel, API.Models.ChannelReference> _channels;
private readonly AsyncCache<Channel, API.Models.ChannelReference> _channels;

/// <summary> Returns a collection of all messages the client has in cache. </summary>
/// <remarks> This collection does not guarantee any ordering. </remarks>
public IEnumerable<Message> Messages => _messages;
private AsyncCache<Message, API.Models.MessageReference> _messages;
private readonly AsyncCache<Message, API.Models.MessageReference> _messages;
private readonly ConcurrentQueue<Message> _pendingMessages;

/// <summary> Returns a collection of all roles the client can see across all servers. </summary>
/// <remarks> This collection does not guarantee any ordering. </remarks>
public IEnumerable<Role> Roles => _roles;
private AsyncCache<Role, API.Models.Role> _roles;
private readonly AsyncCache<Role, API.Models.Role> _roles;

/// <summary> Returns true if the user has successfully logged in and the websocket connection has been established. </summary>
public bool IsConnected => _isReady;
private bool _isReady;

/// <summary> Gets or sets the time (in milliseconds) to wait after an unexpected disconnect before reconnecting. </summary>
public int ReconnectDelay { get; set; } = 1000;
/// <summary> Gets or sets the time (in milliseconds) to wait after an reconnect fails before retrying. </summary>
public int FailedReconnectDelay { get; set; } = 10000;

public bool IsConnected => _isConnected;
private bool _isConnected;
/// <summary> Initializes a new instance of the DiscordClient class. </summary>
public DiscordClient()
public DiscordClient(DiscordClientConfig config = null)
{
_rand = new Random();
_isStopping = new ManualResetEventSlim(false);
_blockEvent = new ManualResetEventSlim(false);

_config = config ?? new DiscordClientConfig();
_rand = new Random();

_serializer = new JsonSerializer();
#if TEST_RESPONSES
@@ -79,7 +80,7 @@ namespace Discord
_userRegex = new Regex(@"<@\d+?>", RegexOptions.Compiled);
_channelRegex = new Regex(@"<#\d+?>", RegexOptions.Compiled);
_userRegexEvaluator = new MatchEvaluator(e =>
{
{
string id = e.Value.Substring(2, e.Value.Length - 3);
var user = _users[id];
if (user != null)
@@ -125,7 +126,7 @@ namespace Discord
server.UpdateMember(membership);
foreach (var membership in extendedModel.Presences)
server.UpdateMember(membership);
}
}
},
server => { }
);
@@ -162,7 +163,7 @@ namespace Discord
channel.PermissionOverwrites = null;
}
},
channel =>
channel =>
{
if (channel.IsPrivate)
{
@@ -206,12 +207,20 @@ namespace Discord
};
if (x.Provider != null)
{
embed.Provider = new Message.EmbedProvider
embed.Provider = new Message.EmbedReference
{
Url = x.Provider.Url,
Name = x.Provider.Name
};
}
if (x.Author != null)
{
embed.Author = new Message.EmbedReference
{
Url = x.Author.Url,
Name = x.Author.Name
};
}
if (x.Thumbnail != null)
{
embed.Thumbnail = new Message.File
@@ -240,13 +249,14 @@ namespace Discord
},
message => { }
);
_pendingMessages = new ConcurrentQueue<Message>();
_roles = new AsyncCache<Role, API.Models.Role>(
(key, parentKey) => new Role(key, parentKey, this),
(role, model) =>
{
role.Name = model.Name;
role.Permissions.RawValue = (uint)model.Permissions;
},
},
role => { }
);
_users = new AsyncCache<User, API.Models.UserReference>(
@@ -261,22 +271,22 @@ namespace Discord
var extendedModel = model as SelfUserInfo;
user.Email = extendedModel.Email;
user.IsVerified = extendedModel.IsVerified;
}
}
},
user => { }
);

_webSocket = new DiscordWebSocket();
_webSocket = new DiscordWebSocket(_config.WebSocketInterval);
_webSocket.Connected += (s, e) => RaiseConnected();
_webSocket.Disconnected += async (s, e) =>
{
//Reconnect if we didn't cause the disconnect
RaiseDisconnected();
while (!_isStopping.IsSet)
while (!_disconnectToken.IsCancellationRequested)
{
try
{
await Task.Delay(ReconnectDelay);
await Task.Delay(_config.ReconnectDelay);
await _webSocket.ConnectAsync(Endpoints.WebSocket_Hub, true);
break;
}
@@ -284,7 +294,7 @@ namespace Discord
{
RaiseOnDebugMessage($"Reconnect Failed: {ex.Message}");
//Net is down? We can keep trying to reconnect until the user runs Disconnect()
await Task.Delay(FailedReconnectDelay);
await Task.Delay(_config.FailedReconnectDelay);
}
}
};
@@ -308,7 +318,7 @@ namespace Discord
_servers.Update(server.Id, server);
foreach (var channel in data.PrivateChannels)
_channels.Update(channel.Id, null, channel);
}
}
break;

//Servers
@@ -446,8 +456,23 @@ namespace Discord
case "MESSAGE_CREATE":
{
var data = e.Event.ToObject<WebSocketEvents.MessageCreate>(_serializer);
var msg = _messages.Update(data.Id, data.ChannelId, data);
Message msg = null;
bool wasLocal = _config.UseMessageQueue && data.Author.Id == UserId && data.Nonce != null;
if (wasLocal)
{
msg = _messages.Remap("nonce" + data.Nonce, data.Id);
if (msg != null)
{
msg.IsQueued = false;
msg.Id = data.Id;
}
}
msg = _messages.Update(data.Id, data.ChannelId, data);
msg.User.UpdateActivity(data.Timestamp);
if (wasLocal)
{
try { RaiseMessageSent(msg); } catch { }
}
try { RaiseMessageCreated(msg); } catch { }
}
break;
@@ -549,6 +574,50 @@ namespace Discord
_webSocket.OnDebugMessage += (s, e) => RaiseOnDebugMessage(e.Message);
}
private async Task SendAsync()
{
var cancelToken = _disconnectToken.Token;
try
{
Message msg;
while (!cancelToken.IsCancellationRequested)
{
while (_pendingMessages.TryDequeue(out msg))
{
bool hasFailed = false;
APIResponses.SendMessage apiMsg = null;
try
{
apiMsg = await DiscordAPI.SendMessage(msg.ChannelId, msg.RawText, msg.MentionIds, msg.Nonce);
}
catch (WebException) { break; }
catch (HttpException) { hasFailed = true; }

if (!hasFailed)
{
_messages.Remap("nonce_", apiMsg.Id);
_messages.Update(msg.Id, msg.ChannelId, apiMsg);
}
msg.IsQueued = false;
msg.HasFailed = hasFailed;
try { RaiseMessageSent(msg); } catch { }
}
await Task.Delay(_config.MessageQueueInterval);
}
}
catch { }
finally { _disconnectToken.Cancel(); }
}
private async Task EmptyAsync()
{
var cancelToken = _disconnectToken.Token;
try
{
await Task.Delay(-1, cancelToken);
}
catch { }
}


/// <summary> Returns the user with the specified id, or null if none was found. </summary>
public User GetUser(string id) => _users[id];
@@ -727,77 +796,109 @@ namespace Discord

//Auth
/// <summary> Connects to the Discord server with the provided token. </summary>
public async Task Connect(string token)
{
_isStopping.Reset();
Http.Token = token;
await _webSocket.ConnectAsync(Endpoints.WebSocket_Hub, true);

_isReady = true;
}
public Task<string> Connect(string token)
=> ConnectInternal(null, null, token);
/// <summary> Connects to the Discord server with the provided email and password. </summary>
/// <returns> Returns a token for future connections. </returns>
public async Task<string> Connect(string email, string password)
{
_isStopping.Reset();

//Open websocket while we wait for login response
Task socketTask = _webSocket.ConnectAsync(Endpoints.WebSocket_Hub, false);
var response = await DiscordAPI.Login(email, password);
Http.Token = response.Token;

//Wait for websocket to finish connecting, then send token
await socketTask;
_webSocket.Login();

_isReady = true;
return response.Token;
}
public Task<string> Connect(string email, string password)
=> ConnectInternal(email, password, null);
/// <summary> Connects to the Discord server with the provided token, and will fall back to username and password. </summary>
/// <returns> Returns a token for future connections. </returns>
public async Task<string> Connect(string email, string password, string token)
public Task<string> Connect(string email, string password, string token)
=> ConnectInternal(email, password, token);
/// <summary> Connects to the Discord server as an anonymous user with the provided username. </summary>
/// <returns> Returns a token for future connections. </returns>
public Task<string> ConnectAnonymous(string username)
=> ConnectInternal(username, null, null);
public async Task<string> ConnectInternal(string emailOrUsername, string password, string token)
{
try
bool success = false;
await Disconnect();
_blockEvent.Reset();
_disconnectToken = new CancellationTokenSource();

//Connect by Token
if (token != null)
{
await Connect(token);
return token;
try
{
Http.Token = token;
await _webSocket.ConnectAsync(Endpoints.WebSocket_Hub, true);
success = true;
}
catch (InvalidOperationException) //Bad Token
{
if (password == null) //If we don't have an alternate login, throw this error
throw;
}
}
catch (InvalidOperationException) //Bad Token
if (!success && password != null) //Email/Password login
{
return await Connect(email, password);
//Open websocket while we wait for login response
Task socketTask = _webSocket.ConnectAsync(Endpoints.WebSocket_Hub, false);
var response = await DiscordAPI.Login(emailOrUsername, password);
await socketTask;

//Wait for websocket to finish connecting, then send token
token = response.Token;
Http.Token = token;
_webSocket.Login();
success = true;
}
}
/// <summary> Connects to the Discord server as an anonymous user with the provided username. </summary>
/// <returns> Returns a token for future connections. </returns>
public async Task<string> ConnectAnonymous(string username)
{
_isStopping.Reset();

//Open websocket while we wait for login response
Task socketTask = _webSocket.ConnectAsync(Endpoints.WebSocket_Hub, false);
var response = await DiscordAPI.LoginAnonymous(username);
Http.Token = response.Token;

//Wait for websocket to finish connecting, then send token
await socketTask;
_webSocket.Login();

_isReady = true;
return response.Token;
}
if (!success && password == null) //Anonymous login
{
//Open websocket while we wait for login response
Task socketTask = _webSocket.ConnectAsync(Endpoints.WebSocket_Hub, false);
var response = await DiscordAPI.LoginAnonymous(emailOrUsername);
await socketTask;

//Wait for websocket to finish connecting, then send token
token = response.Token;
Http.Token = token;
_webSocket.Login();
success = true;
}
if (success)
{
var cancelToken = _disconnectToken.Token;
if (_config.UseMessageQueue)
_tasks = Task.WhenAll(await Task.Factory.StartNew(SendAsync, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Default));
else
_tasks = Task.WhenAll(await Task.Factory.StartNew(EmptyAsync, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Default));
_tasks = _tasks.ContinueWith(async x =>
{
await _webSocket.DisconnectAsync();

//Do not clean up until all tasks have ended
_webSocket.Dispose();
_webSocket = null;
_blockEvent.Dispose();
_blockEvent = null;
_tasks = null;

//Clear send queue
Message ignored;
while (_pendingMessages.TryDequeue(out ignored)) { }

_channels.Clear();
_messages.Clear();
_roles.Clear();
_servers.Clear();
_users.Clear();
});
_isConnected = true;
}
else
token = null;
return token;
}
/// <summary> Disconnects from the Discord server, canceling any pending requests. </summary>
public async Task Disconnect()
{
_isReady = false;
_isStopping.Set();
await _webSocket.DisconnectAsync();

_channels.Clear();
_messages.Clear();
_roles.Clear();
_servers.Clear();
_users.Clear();
_blockEvent.Set();

if (_tasks != null)
await _tasks;
}

//Servers
@@ -1004,26 +1105,36 @@ namespace Discord
{
CheckReady();
if (text.Length <= 2000)
int blockCount = (int)Math.Ceiling(text.Length / (double)DiscordAPI.MaxMessageSize);
Message[] result = new Message[blockCount];
for (int i = 0; i < blockCount; i++)
{
var nonce = GenerateNonce();
var msg = await DiscordAPI.SendMessage(channelId, text, mentions, nonce);
return new Message[] { _messages.Update(msg.Id, channelId, msg) };
}
else
{
int blockCount = (int)Math.Ceiling(text.Length / (double)DiscordAPI.MaxMessageSize);
Message[] result = new Message[blockCount];
for (int i = 0; i < blockCount; i++)
int index = i * DiscordAPI.MaxMessageSize;
string blockText = text.Substring(index, Math.Min(2000, text.Length - index));
var nonce = GenerateNonce();
if (_config.UseMessageQueue)
{
var msg = _messages.Update("nonce_" + nonce, channelId, new API.Models.Message
{
Content = blockText,
Timestamp = DateTime.UtcNow,
Author = new UserReference { Avatar = User.AvatarId, Discriminator = User.Discriminator, Id = User.Id, Username = User.Name },
ChannelId = channelId
});
msg.IsQueued = true;
msg.Nonce = nonce;
_pendingMessages.Enqueue(msg);
}
else
{
int index = i * DiscordAPI.MaxMessageSize;
var nonce = GenerateNonce();
var msg = await DiscordAPI.SendMessage(channelId, text.Substring(index, Math.Min(2000, text.Length - index)), mentions, nonce);
var msg = await DiscordAPI.SendMessage(channelId, blockText, mentions, nonce);
result[i] = _messages.Update(msg.Id, channelId, msg);
await Task.Delay(1000);
result[i].Nonce = nonce;
try { RaiseMessageSent(result[i]); } catch { }
}
return result;
await Task.Delay(1000);
}
return result;
}

/// <summary> Edits a message the provided message. </summary>
@@ -1180,7 +1291,9 @@ namespace Discord
//Helpers
private void CheckReady()
{
if (!_isReady)
if (_blockEvent.IsSet)
throw new InvalidOperationException("The client is currently disconnecting.");
else if (!_isConnected)
throw new InvalidOperationException("The client is not currently connected to Discord");
}
internal string CleanMessageText(string text)
@@ -1198,7 +1311,7 @@ namespace Discord
/// <summary> Blocking call that will not return until client has been stopped. This is mainly intended for use in console applications. </summary>
public void Block()
{
_isStopping.Wait();
_blockEvent.Wait();
}
}
}

+ 18
- 0
src/Discord.Net/DiscordClientConfig.cs View File

@@ -0,0 +1,18 @@
namespace Discord
{
public class DiscordClientConfig
{
/// <summary> Gets or sets the time (in milliseconds) to wait after an unexpected disconnect before reconnecting. </summary>
public int ReconnectDelay { get; set; } = 1000;
/// <summary> Gets or sets the time (in milliseconds) to wait after an reconnect fails before retrying. </summary>
public int FailedReconnectDelay { get; set; } = 10000;
/// <summary> Gets or sets the time (in milliseconds) to wait when the websocket's message queue is empty before checking again. </summary>
public int WebSocketInterval { get; set; } = 100;
/// <summary> Enables or disables the internal message queue. This will allow SendMessage to return immediately and handle messages internally. Messages will set the IsQueued and HasFailed properties to show their progress. </summary>
public bool UseMessageQueue { get; set; } = false;
/// <summary> Gets or sets the time (in milliseconds) to wait when the message queue is empty before checking again. </summary>
public int MessageQueueInterval { get; set; } = 100;

public DiscordClientConfig() { }
}
}

+ 4
- 3
src/Discord.Net/DiscordWebSocket.cs View File

@@ -21,13 +21,14 @@ namespace Discord
private volatile CancellationTokenSource _disconnectToken;
private volatile Task _tasks;
private ConcurrentQueue<byte[]> _sendQueue;
private int _heartbeatInterval;
private int _heartbeatInterval, _sendInterval;
private DateTime _lastHeartbeat;
private ManualResetEventSlim _connectWaitOnLogin, _connectWaitOnLogin2;
private bool _isConnected;

public DiscordWebSocket()
public DiscordWebSocket(int interval)
{
_sendInterval = interval;
_connectWaitOnLogin = new ManualResetEventSlim(false);
_connectWaitOnLogin2 = new ManualResetEventSlim(false);
@@ -186,7 +187,7 @@ namespace Discord
}
while (_sendQueue.TryDequeue(out bytes))
await SendMessage(bytes, cancelToken);
await Task.Delay(100);
await Task.Delay(_sendInterval);
}
}
catch { }


+ 14
- 2
src/Discord.Net/Helpers/AsyncCache.cs View File

@@ -33,8 +33,20 @@ namespace Discord.Helpers
return value;
}
}
public TValue Update(string key, TModel model)

public TValue Add(string key, TValue obj)
{
_dictionary[key] = obj;
return obj;
}
public TValue Remap(string oldKey, string newKey)
{
var obj = Remove(oldKey);
if (obj != null)
Add(newKey, obj);
return obj;
}
public TValue Update(string key, TModel model)
{
return Update(key, null, model);
}


+ 12
- 4
src/Discord.Net/Message.cs View File

@@ -26,12 +26,14 @@ namespace Discord
public string Title { get; internal set; }
/// <summary> Summary of this embed. </summary>
public string Description { get; internal set; }
/// <summary> Returns information about the author of this embed. </summary>
public EmbedReference Author { get; internal set; }
/// <summary> Returns information about the providing website of this embed. </summary>
public EmbedProvider Provider { get; internal set; }
public EmbedReference Provider { get; internal set; }
/// <summary> Returns the thumbnail of this embed. </summary>
public File Thumbnail { get; internal set; }
}
public sealed class EmbedProvider
public sealed class EmbedReference
{
/// <summary> URL of this embed provider. </summary>
public string Url { get; internal set; }
@@ -53,8 +55,10 @@ namespace Discord
private readonly DiscordClient _client;
private string _cleanText;
/// <summary> Returns the unique identifier for this message. </summary>
public string Id { get; }
/// <summary> Returns the global unique identifier for this message. </summary>
public string Id { get; internal set; }
/// <summary> Returns the local unique identifier for this message. </summary>
public string Nonce { get; internal set; }

/// <summary> Returns true if the logged-in user was mentioned. </summary>
/// <remarks> This is not set to true if the user was mentioned with @everyone (see IsMentioningEverone). </remarks>
@@ -63,6 +67,10 @@ namespace Discord
public bool IsMentioningEveryone { get; internal set; }
/// <summary> Returns true if the message was sent as text-to-speech by someone with permissions to do so. </summary>
public bool IsTTS { get; internal set; }
/// <summary> Returns true if the message is still in the outgoing message queue. </summary>
public bool IsQueued { get; internal set; }
/// <summary> Returns true if the message was rejected by the server. </summary>
public bool HasFailed { get; internal set; }
/// <summary> Returns the raw content of this message as it was received from the server.. </summary>
public string RawText { get; internal set; }
/// <summary> Returns the content of this message with any special references such as mentions converted. </summary>


+ 1
- 2
src/Discord.Net/PackedPermissions.cs View File

@@ -14,8 +14,7 @@
public bool General_BanMembers => ((_rawValue >> 1) & 0x1) == 1;
/// <summary> If True, a user may kick users from the server. </summary>
public bool General_KickMembers => ((_rawValue >> 2) & 0x1) == 1;
/// <summary> If True, a user adjust roles. </summary>
/// <remarks> Having this permission effectively gives all the others as a user may add them to themselves. </remarks>
/// <summary> If True, a user may adjust roles. This also bypasses all other permissions, granting all the others. </summary>
public bool General_ManageRoles => ((_rawValue >> 3) & 0x1) == 1;
/// <summary> If True, a user may create, delete and modify channels. </summary>
public bool General_ManageChannels => ((_rawValue >> 4) & 0x1) == 1;


Loading…
Cancel
Save