| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416 |
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Net;
- using System.Text;
- using System.Threading;
- using Assets.Plugins.CommonTool;
- using Assets.Plugins.Log;
- using uPLibrary.Networking.M2Mqtt;
- using uPLibrary.Networking.M2Mqtt.Exceptions;
- using uPLibrary.Networking.M2Mqtt.Messages;
- using Timer = System.Timers.Timer;
- //using uPLibrary.Networking.M2Mqtt;
- //using uPLibrary.Networking.M2Mqtt.Messages;
- namespace Assets.Plugins.WeMessageService
- {
- public class MessageM2MqttClient
- {
- public MessageM2MqttClient()
- {
- TryReconnectCount = 0;
- ToClose = false;
- TopicList = new List<string>();
- HeartbeatTopic = new List<string>();
- HeartbeatSwitch = false;
- HeartbeatInterval = 5;
- }
- public MessageM2MqttClient(bool heartbeatSwitch,double heartbeatInterval=5):this()
- {
- HeartbeatSwitch = heartbeatSwitch;
- HeartbeatInterval = heartbeatInterval;
- }
- private MqttClient Client { get; set; }
- private string ServerIp { get; set; }
- private int ServerPort { get; set; }
- private string ClientId { get; set; }
- private string ServerUserId { get; set; }
- private string ServerPassword { get; set; }
- private List<string> TopicList { get; }
- private byte[] QosLevels { get; set; } = { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE };
- private int TryReconnectCount { get; set; }
- private bool ToClose { get; set; }
- private bool HeartbeatSwitch { get; set; }
- public double HeartbeatInterval { get; set; }
- private Timer Timer { get; set; }
- public MqttClient.MqttMsgPublishEventHandler MsgReceivedHandler { get; set; }
- public bool NotValidate => (ServerIp.IsEmpty() || ServerPort == 0 || ServerIp.IsEmpty() ||
- ClientId.IsEmpty() || ServerUserId.IsEmpty() || ServerPassword.IsEmpty());
- public List<string> HeartbeatTopic { get; set; }
- public bool IsConnected => Client?.IsConnected ?? false;
- /// <summary>
- /// 配置参数
- /// </summary>
- /// <param name="serverIp"></param>
- /// <param name="serverPort"></param>
- /// <param name="clientId"></param>
- /// <param name="serverUserId"></param>
- /// <param name="serverPassword"></param>
- public void SetOptions(string serverIp, int serverPort, string clientId, string serverUserId, string serverPassword)
- {
- ServerIp = serverIp;
- ServerPort = serverPort;
- ClientId = clientId;
- ServerUserId = serverUserId;
- ServerPassword = serverPassword;
- ToClose = false;
- TryReconnectCount = 0;
- }
- /// <summary>
- /// 配置消息接受回调事件
- /// </summary>
- /// <param name="handler"></param>
- public void SetReceivedHandler(MqttClient.MqttMsgPublishEventHandler handler)
- {
- MsgReceivedHandler = handler;
- }
- /// <summary>
- /// 配置消息质量
- /// </summary>
- /// <param name="qosLevels"></param>
- public void SetQosLevels(byte[] qosLevels)
- {
- QosLevels = qosLevels;
- }
- /// <summary>
- /// 订阅主题
- /// </summary>
- /// <param name="topic"></param>
- /// <param name="withClient"></param>
- public void SubscribeTopic(string topic,bool withClient=true)
- {
- if (CheckClient())
- {
- topic = withClient ? $"{ClientId}/{topic}" : $"{topic}";
- if (!TopicList.Contains(topic))
- {
- TopicList.Add(topic);
- }
- Client.Subscribe(new[] { topic }, QosLevels);
- this.LogInfo("SUB-TOPIC:" + topic);
- }
- }
- /// <summary>
- /// 发送消息
- /// </summary>
- /// <param name="topic"></param>
- /// <param name="msg"></param>
- /// <param name="clientId"></param>
- public void SendMsg(string topic, string msg, string clientId = null)
- {
- if (CheckClient())
- {
- var msgContent = Encoding.UTF8.GetBytes(msg);
- clientId ??= ClientId;
- var targetClientIds = clientId.Split('@');
- foreach (var targetClientId in targetClientIds)
- {
- topic = $"{targetClientId}/{topic}";
- Client.Publish(topic, msgContent);
- }
- this.LogInfo($"【SEND-MSG:[{clientId}] [{topic}][{(msg.Length > 100 ? msg.Substring(0, 100) : msg)}]");
- }
-
- }
- /// <summary>
- /// 发送消息
- /// </summary>
- /// <param name="topic"></param>
- /// <param name="msg"></param>
- public void SendMessage(string topic, string msg)
- {
- if (CheckClient())
- {
- Client.Publish(topic, Encoding.UTF8.GetBytes(msg));
- this.LogInfo($"【SEND-MSG:[{topic}][{(msg.Length>100? msg.Substring(0,100):msg)}]");
- }
-
- }
- /// <summary>
- /// 断开连接
- /// </summary>
- public void DisConnect()
- {
- if (IsConnected)
- {
- ToClose = true;
- try
- {
- Client.Disconnect();
- }
- catch (MqttClientException e)
- {
- this.LogError($"Connect Exception:[{TryReconnectCount}]-[{e.Message}]");
- }
- catch (MqttCommunicationException e)
- {
- this.LogError($"Connect Exception:[{TryReconnectCount}]-[{e.Message}]");
- }
- catch (MqttConnectionException e)
- {
- this.LogError($"Connect Exception:[{TryReconnectCount}]-[{e.Message}]");
- }
- catch (MqttTimeoutException e)
- {
- this.LogError($"Connect Exception:[{TryReconnectCount}]-[{e.Message}]");
- }
- catch (Exception e)
- {
- this.LogError($"Connect Exception:[{TryReconnectCount}]-[{e.Message}]");
- }
- }
- }
- /// <summary>
- /// 检查客户端是否连接
- /// </summary>
- /// <returns></returns>
- public bool CheckClient()
- {
- if (IsConnected)
- return true;
- if (NotValidate)
- {
- //throw new UserFriendlyException("客户端配置参数不合法,请检查后再试!");
- this.LogError($"客户端{ClientId}配置参数[{ServerIp}:{ServerPort}/{ServerUserId}/{ServerPassword}]不合法,请检查后再试!");
- return false;
- }
-
- if (Client == null)
- {
- Client = CreateClient();
- Connect();
- }
- else if (!IsConnected)
- {
- Connect();
- }
- if (!IsConnected)
- {
- //throw new UserFriendlyException("消息客户端未连接,请检查!");
- this.LogError("消息客户端未连接,请检查!");
- return false;
- }
- return true;
- }
- /// <summary>
- /// 创建客户端
- /// </summary>
- /// <returns></returns>
- private MqttClient CreateClient()
- {
- if (NotValidate)
- {
- //throw new UserFriendlyException("客户端配置参数不合法,请检查后再试!");
- this.LogError($"客户端{ClientId}配置参数[{ServerIp}:{ServerPort}/{ServerUserId}/{ServerPassword}]不合法,请检查后再试!");
- return null;
- }
- try
- {
- ToClose = false;
- var client = new MqttClient(ServerIp, ServerPort, false, null, null, MqttSslProtocols.None);
- client.ConnectionClosed += ClientClosed;
- //client.MqttMsgSubscribed += MsgSubscribed;
- client.MqttMsgUnsubscribed += MsgUnSubscribed;
- client.MqttMsgPublished += MsgPublished;
- client.MqttMsgPublishReceived += MsgPublishReceived;
- if (MsgReceivedHandler != null)
- {
- client.MqttMsgPublishReceived += MsgReceivedHandler;
- }
- return client;
- }
- catch(MqttClientException e)
- {
- this.LogError("客户端创建失败");
- this.LogError(e);
- //throw new UserFriendlyException("客户端创建失败,请检查后再试!");
- }
- catch(Exception e)
- {
- this.LogError("客户端创建失败");
- this.LogError(e);
- //throw new UserFriendlyException("客户端创建失败,请检查后再试!");
- }
- return null;
- }
- private void Connect()
- {
- if (NotValidate)
- {
- //throw new UserFriendlyException("客户端配置参数不合法,请检查后再试!");
- this.LogError($"客户端{ClientId}配置参数[{ServerIp}:{ServerPort}/{ServerUserId}/{ServerPassword}]不合法,请检查后再试!");
- return;
- }
- if (ToClose)
- return;
- try
- {
- Client?.Connect(ClientId, ServerUserId, ServerPassword);
- }
- catch (MqttClientException e)
- {
- this.LogError($"Connect Exception:[{TryReconnectCount}]-[{e.Message}]");
- this.LogError(e);
- }
- catch (MqttCommunicationException e)
- {
- this.LogError($"Connect Exception:[{TryReconnectCount}]-[{e.Message}]");
- this.LogError(e);
- }
- catch (MqttConnectionException e)
- {
- this.LogError($"Connect Exception:[{TryReconnectCount}]-[{e.Message}]");
- this.LogError(e);
- }
- catch (MqttTimeoutException e)
- {
- this.LogError($"Connect Exception:[{TryReconnectCount}]-[{e.Message}]");
- this.LogError(e);
- }
- catch (Exception e)
- {
- this.LogError($"Connect Exception:[{TryReconnectCount}]-[{e.Message}]");
- this.LogError(e);
- }
- if (IsConnected)
- {
- ReSubscribeTopic();
- this.LogInfo(TryReconnectCount > 0
- ? $"客户端{ClientId}已重新注册连接!重连次数:{TryReconnectCount}"
- : $"客户端{ClientId}已注册连接!");
- TryReconnectCount = 0;
- if (HeartbeatSwitch)
- {
- if (Timer != null)
- {
- Timer.Stop();
- Timer.Close();
- Timer = null;
- }
- Timer = new Timer(1000 * 60 * HeartbeatInterval) { AutoReset = true, Enabled = true };
- Timer.Elapsed += (o, e) =>
- {
- SendMsg("Heartbeat", $"HB-{new Random().Next(11111, 99999)}");
- if (HeartbeatTopic.Any())
- {
- foreach (var topic in HeartbeatTopic)
- {
- SendMessage($"{topic}/Heartbeat", $"[{ClientId}]:HB-{new Random().Next(11111, 99999)}");
- }
- }
- };
- }
- }
- else
- {
- if (TryReconnectCount <= 3)
- {
- Thread.Sleep(5000);
- TryContinueConnect();
- }
- else
- {
- try
- {
- Client?.Disconnect();
- }
- catch (Exception e)
- {
- this.LogError(e);
- }
-
- }
- }
- }
- public void ReSubscribeTopic(List<string> topicList = null)
- {
- topicList ??= TopicList;
- if (topicList.Any() && IsConnected)
- {
- foreach (var topic in TopicList)
- {
- Client?.Subscribe(new[] { topic }, QosLevels);
- }
- }
- }
- private void TryContinueConnect()
- {
- if (IsConnected) return;
- if (ToClose) return;
- if (Client == null)
- {
- CreateClient();
- Thread.Sleep(3000);
- }
- TryReconnectCount++;
- if (TryReconnectCount > 3)
- {
- ToClose = true;
- }
- Connect();
- }
- #region EVENT
- private void ClientClosed(object sender, EventArgs e)
- {
- this.LogInfo($"Connection Lost:{ClientId}");
- TryContinueConnect();
- }
- //private void MsgSubscribed(object sender, MqttMsgSubscribedEventArgs e)
- //{
- // this.LogInfo($"Subscribed:{e.MessageId}");
- //}
- private void MsgUnSubscribed(object sender, MqttMsgUnsubscribedEventArgs e)
- {
- this.LogInfo($"UnSubscribed:{e.MessageId}");
- }
- private void MsgPublished(object sender, MqttMsgPublishedEventArgs e)
- {
- this.LogInfo($"Published:{e.MessageId}-{e.IsPublished}");
- }
- // 处理接收到的消息
- private void MsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
- {
- string msg = Encoding.UTF8.GetString(e.Message);
- this.LogInfo($"RECEIVED-MSG】: [{e.Topic}] [{(msg.Length > 500 ? msg.Substring(0, 500) : msg)}]");
- }
- #endregion
- }
- }
|