using Abp.Threading;
using Abp.UI;
using IwbZero.ToolCommon.LogHelpers;
using IwbZero.ToolCommon.StringModel;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
namespace WeMessageService
{
/// <summary>
/// Synchronous convenience wrapper around an MQTTnet <see cref="IMqttClient"/>.
/// Connection settings are supplied through <see cref="SetOptions"/>; the underlying
/// client is created and connected lazily the first time a message is sent or a
/// topic is subscribed.
/// </summary>
public class MqttNetMsgClient
{
    /// <summary>
    /// The disconnect handler attempts an automatic reconnect only while the number of
    /// consecutive disconnect notifications is below this value.
    /// </summary>
    private const int ReconnectAttemptLimit = 5;

    /// <summary>
    /// Creates an unconfigured client. Call <see cref="SetOptions"/> before use.
    /// </summary>
    public MqttNetMsgClient()
    {
        HasReconnectCount = 0;
        ConnectedCount = 0;
    }

    private IMqttClient Client { get; set; }

    /// <summary>Running balance of connected/disconnected notifications (guarded by <see cref="_lock"/> on write).</summary>
    private int ConnectedCount { get; set; }

    private readonly object _lock = new object();

    /// <summary>Disconnect notifications seen since the last successful connection.</summary>
    private int HasReconnectCount { get; set; }

    private string ServerIp { get; set; }
    private int ServerPort { get; set; }
    private string ClientId { get; set; }
    private string ServerUserId { get; set; }
    private string ServerPassword { get; set; }

    /// <summary>True when the underlying client exists and reports that it is connected.</summary>
    public bool IsConnected => Client?.IsConnected ?? false;

    /// <summary>Optional handler that receives every incoming message after it has been logged.</summary>
    public IMqttApplicationMessageReceivedHandler ReceivedHandler { get; set; }

    /// <summary>
    /// True when the configuration is incomplete: any of server address, port, client id,
    /// user id or password is missing.
    /// </summary>
    public bool NotValidate => ServerIp.IsEmpty() || ServerPort == 0 || ClientId.IsEmpty() ||
                               ServerUserId.IsEmpty() || ServerPassword.IsEmpty();

    /// <summary>
    /// Stores the connection settings. Nothing is connected here; the settings are used
    /// the next time a connection is established.
    /// </summary>
    /// <param name="serverIp">Broker host name or IP address.</param>
    /// <param name="serverPort">Broker TCP port.</param>
    /// <param name="clientId">MQTT client id; also used as the topic prefix and as the default subscription.</param>
    /// <param name="serverUserId">User name sent to the broker.</param>
    /// <param name="serverPassword">Password sent to the broker.</param>
    /// <param name="receivedHandler">Handler invoked for incoming messages; may be null.</param>
    public void SetOptions(string serverIp, int serverPort, string clientId, string serverUserId, string serverPassword, IMqttApplicationMessageReceivedHandler receivedHandler)
    {
        ServerIp = serverIp;
        ServerPort = serverPort;
        ClientId = clientId;
        ServerUserId = serverUserId;
        ServerPassword = serverPassword;
        ReceivedHandler = receivedHandler;
    }

    /// <summary>
    /// Subscribes to <c>{ClientId}/{topic}</c>, connecting first if necessary.
    /// </summary>
    /// <param name="topic">Topic suffix; the configured client id is prepended.</param>
    /// <exception cref="UserFriendlyException">The configuration is incomplete or the client is not connected.</exception>
    public void SubscribeTopic(string topic)
    {
        if (!CheckClient())
        {
            return;
        }

        var fullTopic = $"{ClientId}/{topic}";
        AsyncHelper.RunSync(() => Client.SubscribeAsync(fullTopic));
    }

    /// <summary>
    /// Publishes <paramref name="msg"/> to <c>{clientId}/{topic}</c>.
    /// </summary>
    /// <param name="topic">Topic suffix.</param>
    /// <param name="msg">Message payload.</param>
    /// <param name="clientId">Topic prefix; defaults to the configured client id when null.</param>
    /// <exception cref="UserFriendlyException">The configuration is incomplete or the client is not connected.</exception>
    public void SendMsg(string topic, string msg, string clientId = null)
    {
        clientId = clientId ?? ClientId;
        // The log line intentionally keeps the unprefixed topic, as before.
        Publish($"{clientId}/{topic}", topic, msg);
    }

    /// <summary>
    /// Publishes <paramref name="msg"/> to <paramref name="topic"/> exactly as given (no client id prefix).
    /// </summary>
    /// <param name="topic">Full topic name.</param>
    /// <param name="msg">Message payload.</param>
    /// <exception cref="UserFriendlyException">The configuration is incomplete or the client is not connected.</exception>
    public void SendMessage(string topic, string msg)
    {
        Publish($"{topic}", topic, msg);
    }

    /// <summary>
    /// Shared publish path for <see cref="SendMsg"/> and <see cref="SendMessage"/>:
    /// ensures a connection, publishes, then logs the send.
    /// </summary>
    /// <param name="fullTopic">Topic the message is published to.</param>
    /// <param name="logTopic">Topic text written to the log.</param>
    /// <param name="msg">Message payload.</param>
    private void Publish(string fullTopic, string logTopic, string msg)
    {
        if (!CheckClient())
        {
            return;
        }

        var message = new MqttApplicationMessageBuilder()
            .WithTopic(fullTopic)
            .WithPayload(msg)
            .Build();
        AsyncHelper.RunSync(() => Client.PublishAsync(message));
        this.LogInfo($"【SEND-MSG:[{logTopic}][{msg}]");
    }

    /// <summary>
    /// Ensures the client exists and is connected, connecting (or reconnecting) when needed.
    /// </summary>
    /// <returns>Always true; every failure is reported by throwing.</returns>
    /// <exception cref="UserFriendlyException">The configuration is incomplete or the client is still not connected.</exception>
    private bool CheckClient()
    {
        if (NotValidate)
        {
            throw new UserFriendlyException("客户端配置参数不合法,请检查后再试!");
        }

        if (Client == null || !Client.IsConnected)
        {
            // Go through ClientConnect for reconnects as well, so the ClientId topic
            // subscription is issued again on the new connection.
            ClientConnect();
        }

        if (!Client.IsConnected)
        {
            throw new UserFriendlyException("消息客户端未连接,请检查!");
        }

        return true;
    }

    /// <summary>
    /// Creates a new MQTT client and wires up the connected, disconnected and
    /// message-received handlers. The client is not connected here.
    /// </summary>
    /// <returns>The newly created client.</returns>
    private IMqttClient CreateClient()
    {
        var client = new MqttFactory().CreateMqttClient();

        client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(arg =>
        {
            HasReconnectCount = 0;
            ChangeCount(1);
            this.LogInfo($"Client Connected, Count:{ConnectedCount}");
        });

        client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(async arg =>
        {
            // Only balance the counter for a connection that was actually established;
            // otherwise repeated failed attempts would drive the count negative.
            // NOTE(review): relies on MQTTnet also raising this handler for failed connects - confirm for the version in use.
            if (arg.ClientWasConnected)
            {
                ChangeCount(-1);
            }

            if (arg.Exception?.GetType() == typeof(MQTTnet.Exceptions.MqttCommunicationTimedOutException))
            {
                this.LogInfo("TTT");
            }

            HasReconnectCount++;
            if (HasReconnectCount < ReconnectAttemptLimit)
            {
                // Reconnect without blocking the handler and renew the ClientId subscription.
                await ConnectAndSubscribeAsync();
                return;
            }

            // Retry budget exhausted: report the disconnect instead of retrying again.
            string strMsg = $"Client Disconnected, Count:{ConnectedCount}\r\n ";
            if (arg.Exception != null)
            {
                strMsg = strMsg + $"exception:{arg.Exception.Message}\r\n";
            }

            if (arg.AuthenticateResult != null)
            {
                strMsg = strMsg + $"authReason:{arg.AuthenticateResult.ReasonString}\r\n";
            }

            this.LogInfo(strMsg);
        });

        client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(ReceivedHandlerBase);
        return client;
    }

    /// <summary>
    /// Connects to the broker with the configured settings and, when a client id is
    /// configured, subscribes to the topic named after it. Creates the client first
    /// if it does not exist yet.
    /// </summary>
    /// <exception cref="UserFriendlyException">The configuration is incomplete.</exception>
    public void ClientConnect()
    {
        AsyncHelper.RunSync(() => ConnectAndSubscribeAsync());
    }

    /// <summary>
    /// Asynchronous connect path shared by <see cref="ClientConnect"/> and the
    /// disconnect handler: builds the options, creates the client if needed,
    /// connects, and subscribes to the ClientId topic.
    /// </summary>
    private async System.Threading.Tasks.Task ConnectAndSubscribeAsync()
    {
        // Build the options first so an incomplete configuration fails before a client is created.
        var options = CreateClientOptions();
        if (Client == null)
        {
            Client = CreateClient();
        }

        await Client.ConnectAsync(options);
        if (ClientId.IsNotEmpty())
        {
            await Client.SubscribeAsync(ClientId);
        }
    }

    /// <summary>
    /// Builds the MQTT connection options from the configured settings.
    /// </summary>
    /// <returns>Options carrying client id, TCP endpoint and credentials.</returns>
    /// <exception cref="UserFriendlyException">The configuration is incomplete.</exception>
    private MqttClientOptions CreateClientOptions()
    {
        if (NotValidate)
        {
            throw new UserFriendlyException("客户端配置参数不合法,请检查后再试!");
        }

        var options = new MqttClientOptions
        {
            ClientId = ClientId,
            ChannelOptions = new MqttClientTcpOptions() { Server = ServerIp, Port = ServerPort },
            Credentials = new MqttClientCredentials()
            {
                Username = ServerUserId,
                // UTF-8 instead of Encoding.Default, which depends on the machine's code page
                // on .NET Framework; ASCII-only passwords produce the same bytes as before.
                Password = System.Text.Encoding.UTF8.GetBytes(ServerPassword)
            }
        };
        return options;
    }

    /// <summary>
    /// Adjusts the connection counter under the lock.
    /// </summary>
    /// <param name="chg">Amount to add (negative to subtract).</param>
    private void ChangeCount(int chg)
    {
        lock (_lock)
        {
            ConnectedCount += chg;
        }
    }

    /// <summary>
    /// Logs every incoming message and forwards it to <see cref="ReceivedHandler"/> when one is set.
    /// </summary>
    /// <param name="arg">Event data for the received message.</param>
    private void ReceivedHandlerBase(MqttApplicationMessageReceivedEventArgs arg)
    {
        var message = arg.ApplicationMessage;

        // Treat a missing payload as empty text; decoding a null buffer would throw.
        var rawPayload = message.Payload;
        string payload = rawPayload == null
            ? string.Empty
            : System.Text.Encoding.UTF8.GetString(rawPayload);
        this.LogInfo($"RECEIVED-MSG】, topic:[{message.Topic}], payload: [{payload}], connectedCount:{ConnectedCount}");

        // Still fire-and-forget, but a faulted handler task is now logged instead of being lost.
        var pending = ReceivedHandler?.HandleApplicationMessageReceivedAsync(arg);
        if (pending == null)
        {
            return;
        }

        pending.ContinueWith(
            t => this.LogInfo($"RECEIVED-MSG handler failed, topic:[{message.Topic}], exception:{t.Exception?.GetBaseException().Message}"),
            System.Threading.Tasks.TaskContinuationOptions.OnlyOnFaulted);
    }
}
}