using System.Threading.Tasks;
using Abp.Threading;
using Abp.UI;
using IwbZero.ToolCommon.LogHelpers;
using IwbZero.ToolCommon.StringModel;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;

namespace IwbZero.MqttClient
{
    /// <summary>
    /// MQTT message client. Wraps a single <see cref="IMqttClient"/>, connects it lazily on first use,
    /// prefixes topics with a client id and tries to reconnect when the connection drops.
    /// </summary>
    public class IwbMsgClient : IIwbMsgClient
    {
        /// <summary>
        /// Automatic reconnecting stops once the consecutive-disconnect counter reaches this value.
        /// </summary>
        private const int ReconnectLimit = 5;

        /// <summary>
        /// Pause before each automatic reconnect attempt, so a failing broker is not hammered in a tight loop.
        /// </summary>
        private const int ReconnectDelayMilliseconds = 2000;

        private readonly object _lock = new object();

        public IwbMsgClient()
        {
            HasReconnectCount = 0;
            ConnectedCount = 0;
        }

        private IMqttClient Client { get; set; }

        /// <summary>Connection counter maintained by the connected/disconnected handlers (logging only).</summary>
        private int ConnectedCount { get; set; }

        /// <summary>Number of disconnect events seen since the last successful connect.</summary>
        private int HasReconnectCount { get; set; }

        public string ServerIp { get; set; }

        public int ServerPort { get; set; }

        public string ClientId { get; set; }

        public string ServerUserId { get; set; }

        public string ServerPassword { get; set; }

        /// <summary>Application handler that receives every incoming message after it has been logged.</summary>
        public IMqttApplicationMessageReceivedHandler ReceivedHandler { get; set; }

        /// <summary>
        /// True when any required connection setting (server address, port, client id, user, password) is missing.
        /// </summary>
        public bool NotValidate => ServerIp.IsEmpty()
                                   || ServerPort == 0
                                   || ClientId.IsEmpty()
                                   || ServerUserId.IsEmpty()
                                   || ServerPassword.IsEmpty();

        /// <summary>
        /// Sets all connection parameters in one call.
        /// </summary>
        /// <param name="serverIp">Broker host name or IP address.</param>
        /// <param name="serverPort">Broker TCP port.</param>
        /// <param name="clientId">MQTT client id; also used as the topic prefix.</param>
        /// <param name="serverUserId">User name for broker authentication.</param>
        /// <param name="serverPassword">Password for broker authentication.</param>
        /// <param name="receivedHandler">Handler invoked for every received message.</param>
        public void SetOptions(string serverIp, int serverPort, string clientId, string serverUserId,
            string serverPassword, IMqttApplicationMessageReceivedHandler receivedHandler)
        {
            ServerIp = serverIp;
            ServerPort = serverPort;
            ClientId = clientId;
            ServerUserId = serverUserId;
            ServerPassword = serverPassword;
            ReceivedHandler = receivedHandler;
        }

        /// <summary>
        /// Subscribes to <c>{ClientId}/{topic}</c>.
        /// </summary>
        /// <param name="topic">Topic name without the client id prefix.</param>
        public async Task SubscribeTopic(string topic)
        {
            var flag = await CheckClient();
            if (flag)
            {
                topic = $"{ClientId}/{topic}";
                await Client.SubscribeAsync(topic);
            }
        }

        /// <summary>
        /// Publishes a message to <c>{ClientId}/{topic}</c>.
        /// </summary>
        /// <param name="topic">Topic name without the client id prefix.</param>
        /// <param name="msg">Message payload.</param>
        public async Task SendMsg(string topic, string msg)
        {
            await SendMsg(topic, msg, null);
        }

        /// <summary>
        /// Publishes a message to <c>{clientId}/{topic}</c>.
        /// </summary>
        /// <param name="topic">Topic name without the client id prefix.</param>
        /// <param name="msg">Message payload.</param>
        /// <param name="clientId">Target client id used as topic prefix; falls back to <see cref="ClientId"/> when null.</param>
        public async Task SendMsg(string topic, string msg, string clientId)
        {
            clientId = clientId ?? ClientId;
            var flag = await CheckClient();
            if (flag)
            {
                var message = new MqttApplicationMessageBuilder()
                    .WithTopic($"{clientId}/{topic}")
                    .WithPayload(msg)
                    .Build();
                await Client.PublishAsync(message);
                this.LogInfo($"【SEND-MSG:[{topic}][{msg}]");
            }
        }

        /// <summary>
        /// Makes sure a connected client exists, creating or reconnecting it when necessary.
        /// </summary>
        /// <returns>Always true; every failure is reported by an exception.</returns>
        /// <exception cref="UserFriendlyException">The settings are incomplete or the client is still not connected.</exception>
        private async Task<bool> CheckClient()
        {
            if (NotValidate)
            {
                throw new UserFriendlyException("客户端配置参数不合法,请检查后再试!");
            }

            if (Client == null)
            {
                Client = CreateClient();
                // Awaited instead of going through the blocking ClientConnect(): this method is already async.
                await ConnectAndSubscribeAsync(CreateClientOptions());
            }
            else if (!Client.IsConnected)
            {
                await ReconnectAndSubscribeAsync();
            }

            if (!Client.IsConnected)
            {
                throw new UserFriendlyException("消息客户端未连接,请检查!");
            }

            return true;
        }

        /// <summary>
        /// Creates a new MQTT client and wires up the connected, disconnected and message-received handlers.
        /// </summary>
        private IMqttClient CreateClient()
        {
            var client = new MqttFactory().CreateMqttClient();

            client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(arg =>
            {
                HasReconnectCount = 0;
                ChangeCount(1);
                this.LogInfo($"Client Connected, Count:{ConnectedCount}");
            });

            client.DisconnectedHandler =
                new MqttClientDisconnectedHandlerDelegate(async arg => await OnDisconnectedAsync(arg));

            client.ApplicationMessageReceivedHandler =
                new MqttApplicationMessageReceivedHandlerDelegate(async arg => await ReceivedHandlerBase(arg));

            return client;
        }

        /// <summary>
        /// Handles a disconnect event: updates the counter, then either schedules a reconnect attempt
        /// or, once <see cref="ReconnectLimit"/> is reached, logs the final disconnect reason.
        /// </summary>
        private async Task OnDisconnectedAsync(MqttClientDisconnectedEventArgs arg)
        {
            // The handler is also raised after a failed connect attempt; only a client that really
            // was connected may reduce the count, otherwise the counter drifts below zero.
            if (arg.ClientWasConnected)
            {
                ChangeCount(-1);
            }

            if (arg.Exception?.GetType() == typeof(MQTTnet.Exceptions.MqttCommunicationTimedOutException))
            {
                this.LogInfo("TTT");
            }

            HasReconnectCount++;
            if (HasReconnectCount < ReconnectLimit)
            {
                await Task.Delay(ReconnectDelayMilliseconds);
                try
                {
                    if (Client == null)
                    {
                        Client = CreateClient();
                        await ConnectAndSubscribeAsync(CreateClientOptions());
                    }
                    else if (!Client.IsConnected)
                    {
                        // Skipped when something else (e.g. CheckClient) reconnected during the delay.
                        await ReconnectAndSubscribeAsync();
                    }
                }
                catch (System.Exception ex)
                {
                    // A failed attempt must not escape as an unobserved handler exception.
                    this.LogInfo($"Client reconnect attempt {HasReconnectCount} failed: {ex.Message}");
                }

                return;
            }

            string strMsg = $"Client Disconnected, Count:{ConnectedCount}\r\n ";
            if (arg.Exception != null)
            {
                strMsg = strMsg + $"exception:{arg.Exception.Message}\r\n";
            }

            if (arg.AuthenticateResult != null)
            {
                strMsg = strMsg + $"authReason:{arg.AuthenticateResult.ReasonString}\r\n";
            }

            this.LogInfo(strMsg);
        }

        /// <summary>
        /// Connects to the broker synchronously and subscribes to the client's own topic.
        /// Creates the underlying client first when none exists yet.
        /// </summary>
        /// <exception cref="UserFriendlyException">The connection settings are incomplete.</exception>
        public void ClientConnect()
        {
            // Validate before touching the network so configuration errors surface directly.
            var options = CreateClientOptions();
            if (Client == null)
            {
                Client = CreateClient();
            }

            AsyncHelper.RunSync(() => ConnectAndSubscribeAsync(options));
        }

        /// <summary>
        /// Connects the current client with the given options, then subscribes to the client's own topic.
        /// </summary>
        private async Task ConnectAndSubscribeAsync(MqttClientOptions options)
        {
            await Client.ConnectAsync(options);
            await SubscribeOwnTopicAsync();
        }

        /// <summary>
        /// Reconnects the current client with its previous options and restores the subscription to the
        /// client's own topic. Topics added through <see cref="SubscribeTopic"/> are not restored here.
        /// </summary>
        private async Task ReconnectAndSubscribeAsync()
        {
            await Client.ReconnectAsync();
            // The options built by this class leave CleanSession at its default, so the broker is
            // expected to drop earlier subscriptions when the session is re-established.
            await SubscribeOwnTopicAsync();
        }

        /// <summary>
        /// Subscribes to the topic named after <see cref="ClientId"/>, when a client id is configured.
        /// </summary>
        private async Task SubscribeOwnTopicAsync()
        {
            if (ClientId.IsNotEmpty())
            {
                await Client.SubscribeAsync(ClientId);
            }
        }

        /// <summary>
        /// Builds the MQTT connection options from the configured settings.
        /// </summary>
        /// <exception cref="UserFriendlyException">The connection settings are incomplete.</exception>
        private MqttClientOptions CreateClientOptions()
        {
            if (NotValidate)
            {
                throw new UserFriendlyException("客户端配置参数不合法,请检查后再试!");
            }

            var options = new MqttClientOptions
            {
                ClientId = ClientId,
                ChannelOptions = new MqttClientTcpOptions()
                {
                    Server = ServerIp,
                    Port = ServerPort
                },
                Credentials = new MqttClientCredentials()
                {
                    Username = ServerUserId,
                    // UTF-8 keeps the password bytes identical on every machine; Encoding.Default
                    // depends on the host's ANSI code page on .NET Framework.
                    Password = System.Text.Encoding.UTF8.GetBytes(ServerPassword)
                }
            };
            return options;
        }

        /// <summary>
        /// Adjusts the connection counter under a lock.
        /// </summary>
        /// <param name="chg">Amount to add (negative to subtract).</param>
        private void ChangeCount(int chg)
        {
            lock (_lock)
            {
                ConnectedCount += chg;
            }
        }

        /// <summary>
        /// Logs an incoming message and forwards it to <see cref="ReceivedHandler"/>.
        /// </summary>
        /// <returns>The application handler's task, so its completion and failures are observed by the caller.</returns>
        private Task ReceivedHandlerBase(MqttApplicationMessageReceivedEventArgs arg)
        {
            // A message may carry no payload at all; GetString(null) would throw.
            var rawPayload = arg.ApplicationMessage.Payload;
            string payload = rawPayload == null
                ? string.Empty
                : System.Text.Encoding.UTF8.GetString(rawPayload);

            this.LogInfo($"Received-Message】, topic:[{arg.ApplicationMessage.Topic}], payload: [{payload}], connectedCount:{ConnectedCount}");

            var handler = ReceivedHandler;
            if (handler == null)
            {
                return Task.FromResult(0);
            }

            return handler.HandleApplicationMessageReceivedAsync(arg);
        }
    }
}