| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241 |
- using System.Threading.Tasks;
- using Abp.Threading;
- using Abp.UI;
- using IwbZero.ToolCommon.LogHelpers;
- using IwbZero.ToolCommon.StringModel;
- using MQTTnet;
- using MQTTnet.Client;
- using MQTTnet.Client.Connecting;
- using MQTTnet.Client.Disconnecting;
- using MQTTnet.Client.Options;
- using MQTTnet.Client.Receiving;
- namespace IwbZero.MqttClient
- {
- public class IwbMsgClient : IIwbMsgClient
- {
- public IwbMsgClient()
- {
- HasReconnectCount = 0;
- ConnectedCount = 0;
- }
- private IMqttClient Client { get; set; }
- private int ConnectedCount { get; set; }
- private readonly object _lock = new object();
- private int HasReconnectCount { get; set; }
- public string ServerIp { get; set; }
- public int ServerPort { get; set; }
- public string ClientId { get; set; }
- public string ServerUserId { get; set; }
- public string ServerPassword { get; set; }
- public IMqttApplicationMessageReceivedHandler ReceivedHandler { get; set; }
- public bool NotValidate => (ServerIp.IsEmpty() || ServerPort == 0 || ServerIp.IsEmpty() ||
- ClientId.IsEmpty() || ServerUserId.IsEmpty() || ServerPassword.IsEmpty());
- /// <summary>
- /// 配置参数
- /// </summary>
- /// <param name="serverIp"></param>
- /// <param name="serverPort"></param>
- /// <param name="clientId"></param>
- /// <param name="serverUserId"></param>
- /// <param name="serverPassword"></param>
- /// <param name="receivedHandler"></param>
- public void SetOptions(string serverIp, int serverPort, string clientId, string serverUserId, string serverPassword, IMqttApplicationMessageReceivedHandler receivedHandler)
- {
- ServerIp = serverIp;
- ServerPort = serverPort;
- ClientId = clientId;
- ServerUserId = serverUserId;
- ServerPassword = serverPassword;
- ReceivedHandler = receivedHandler;
- }
- /// <summary>
- /// 订阅主题
- /// </summary>
- /// <param name="topic"></param>
- public async Task SubscribeTopic(string topic)
- {
- var flag = await CheckClient();
- if (flag)
- {
- topic = $"{ClientId}/{topic}";
- await Client.SubscribeAsync(topic);
- }
-
- }
- /// <summary>
- /// 发送消息
- /// </summary>
- /// <param name="topic"></param>
- /// <param name="msg"></param>
- /// <returns></returns>
- public async Task SendMsg(string topic, string msg)
- {
- await SendMsg(topic, msg, null);
- }
- /// <summary>
- /// 发送消息
- /// </summary>
- /// <param name="topic"></param>
- /// <param name="msg"></param>
- /// <param name="clientId"></param>
- public async Task SendMsg(string topic, string msg, string clientId)
- {
- clientId = clientId ?? ClientId;
- var flag = await CheckClient();
- if (flag)
- {
- var message = new MqttApplicationMessageBuilder()
- .WithTopic($"{clientId}/{topic}")
- .WithPayload(msg)
- .Build();
- await Client.PublishAsync(message);
- this.LogInfo($"【SEND-MSG:[{topic}][{msg}]");
- }
- }
- /// <summary>
- /// 检查客户端是否连接
- /// </summary>
- /// <returns></returns>
- private async Task<bool> CheckClient()
- {
- if (NotValidate)
- {
- throw new UserFriendlyException("客户端配置参数不合法,请检查后再试!");
- }
- if (Client == null)
- {
- Client = CreateClient();
- ClientConnect();
- }
- else if (!Client.IsConnected)
- {
- await Client.ReconnectAsync();
- }
- if(!Client.IsConnected)
- {
- throw new UserFriendlyException("消息客户端未连接,请检查!");
- }
- return Client.IsConnected;
- }
- /// <summary>
- /// 创建客户端
- /// </summary>
- /// <returns></returns>
- private IMqttClient CreateClient()
- {
- var client = new MqttFactory().CreateMqttClient();
- client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(arg =>
- {
- HasReconnectCount = 0;
- ChangeCount(1);
- this.LogInfo($"Client Connected, Count:{ConnectedCount}");
- });
- client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate( async arg =>
- {
- ChangeCount(-1);
- if (arg.Exception?.GetType()== typeof(MQTTnet.Exceptions.MqttCommunicationTimedOutException))
- {
- this.LogInfo("TTT");
- }
- HasReconnectCount++;
- if (HasReconnectCount < 5)
- {
- if (Client == null)
- {
- Client = CreateClient();
- ClientConnect();
- }
- else
- {
- await Client.ReconnectAsync();
- }
- return;
- }
- string strMsg = $"Client Disconnected, Count:{ConnectedCount}\r\n ";
- if (arg.Exception != null)
- strMsg = strMsg + $"exception:{arg.Exception.Message}\r\n";
- if (arg.AuthenticateResult != null)
- strMsg = strMsg + $"authReason:{arg.AuthenticateResult.ReasonString}\r\n";
- this.LogInfo(strMsg);
- });
- client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(ReceivedHandlerBase);
- return client;
- }
- /// <summary>
- /// 连接服务器
- /// </summary>
- public void ClientConnect()
- {
- var options = CreateClientOptions();
- AsyncHelper.RunSync(() => Client.ConnectAsync(options));
- if (ClientId.IsNotEmpty())
- {
- AsyncHelper.RunSync(() => Client.SubscribeAsync(ClientId));
- }
- }
- /// <summary>
- /// 客户端配置
- /// </summary>
- /// <returns></returns>
- private MqttClientOptions CreateClientOptions()
- {
- if (NotValidate)
- {
- throw new UserFriendlyException("客户端配置参数不合法,请检查后再试!");
- }
- var options = new MqttClientOptions
- {
- ClientId = ClientId,
- ChannelOptions = new MqttClientTcpOptions() { Server = ServerIp, Port = ServerPort },
- Credentials = new MqttClientCredentials()
- {
- Username = ServerUserId,
- Password = System.Text.Encoding.Default.GetBytes(ServerPassword)
- }
- };
- return options;
- }
- /// <summary>
- ///连接数变化
- /// </summary>
- /// <param name="chg"></param>
- private void ChangeCount(int chg)
- {
- lock (_lock)
- {
- ConnectedCount += chg;
- }
- }
- /// <summary>
- /// 接受消息后处理
- /// </summary>
- /// <param name="arg"></param>
- private void ReceivedHandlerBase(MqttApplicationMessageReceivedEventArgs arg)
- {
- string payload = System.Text.Encoding.UTF8.GetString(arg.ApplicationMessage.Payload);
- this.LogInfo($"Received-Message】, topic:[{arg.ApplicationMessage.Topic}], payload: [{payload}], connectedCount:{ConnectedCount}");
- ReceivedHandler?.HandleApplicationMessageReceivedAsync(arg);
- }
- }
- }
|