using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using Assets.Plugins.CommonTool;
using Assets.Plugins.Log;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Exceptions;
using uPLibrary.Networking.M2Mqtt.Messages;
using Timer = System.Timers.Timer;

namespace Assets.Plugins.WeMessageService
{
    /// <summary>
    /// Thin wrapper around an M2Mqtt <see cref="MqttClient"/> that remembers the
    /// connection options and subscribed topics, retries the connection a bounded
    /// number of times, and can optionally publish periodic heartbeat messages.
    /// </summary>
    public class MessageM2MqttClient
    {
        /// <summary>Maximum number of reconnect attempts before the client gives up.</summary>
        private const int MaxReconnectAttempts = 3;

        /// <summary>Delay, in milliseconds, before a failed connect is retried.</summary>
        private const int ReconnectDelayMs = 5000;

        /// <summary>Delay, in milliseconds, after re-creating the underlying client during a reconnect.</summary>
        private const int ClientRecreateDelayMs = 3000;

        /// <summary>Maximum number of characters of an outgoing message written to the log.</summary>
        private const int SentLogPreviewLength = 100;

        /// <summary>Maximum number of characters of a received message written to the log.</summary>
        private const int ReceivedLogPreviewLength = 500;

        // A single shared generator instead of "new Random()" per heartbeat tick.
        // System.Random is not thread-safe and timer callbacks run on pool threads,
        // so access is serialized through RandomGate.
        private static readonly Random HeartbeatRandom = new Random();
        private static readonly object RandomGate = new object();

        /// <summary>
        /// Creates a client with the heartbeat disabled and a 5 minute heartbeat interval.
        /// </summary>
        public MessageM2MqttClient()
        {
            TryReconnectCount = 0;
            ToClose = false;
            TopicList = new List<string>();
            HeartbeatTopic = new List<string>();
            HeartbeatSwitch = false;
            HeartbeatInterval = 5;
        }

        /// <summary>
        /// Creates a client with an explicit heartbeat configuration.
        /// </summary>
        /// <param name="heartbeatSwitch">Whether a heartbeat timer is started once connected.</param>
        /// <param name="heartbeatInterval">Heartbeat period in minutes.</param>
        public MessageM2MqttClient(bool heartbeatSwitch, double heartbeatInterval = 5) : this()
        {
            HeartbeatSwitch = heartbeatSwitch;
            HeartbeatInterval = heartbeatInterval;
        }

        private MqttClient Client { get; set; }
        private string ServerIp { get; set; }
        private int ServerPort { get; set; }
        private string ClientId { get; set; }
        private string ServerUserId { get; set; }
        private string ServerPassword { get; set; }

        /// <summary>Topics subscribed so far; replayed after every successful (re)connect.</summary>
        private List<string> TopicList { get; }

        private byte[] QosLevels { get; set; } = { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE };
        private int TryReconnectCount { get; set; }

        /// <summary>When true, <see cref="Connect"/> and <see cref="TryContinueConnect"/> do nothing.</summary>
        private bool ToClose { get; set; }

        private bool HeartbeatSwitch { get; set; }

        /// <summary>Heartbeat period in minutes.</summary>
        public double HeartbeatInterval { get; set; }

        private Timer Timer { get; set; }

        /// <summary>Optional extra handler attached to the client's publish-received event when the client is created.</summary>
        public MqttClient.MqttMsgPublishEventHandler MsgReceivedHandler { get; set; }

        /// <summary>True when any of the required connection options is missing.</summary>
        public bool NotValidate =>
            ServerIp.IsEmpty()
            || ServerPort == 0
            || ClientId.IsEmpty()
            || ServerUserId.IsEmpty()
            || ServerPassword.IsEmpty();

        /// <summary>Additional topic prefixes that receive a "&lt;prefix&gt;/Heartbeat" message on every heartbeat tick.</summary>
        public List<string> HeartbeatTopic { get; set; }

        /// <summary>True when the underlying client exists and reports itself connected.</summary>
        public bool IsConnected => Client?.IsConnected ?? false;

        /// <summary>
        /// Configures the connection options and resets the close flag and reconnect counter.
        /// </summary>
        /// <param name="serverIp">Broker host name or IP address.</param>
        /// <param name="serverPort">Broker port.</param>
        /// <param name="clientId">MQTT client id; also used as the topic prefix.</param>
        /// <param name="serverUserId">User name used to connect.</param>
        /// <param name="serverPassword">Password used to connect.</param>
        public void SetOptions(string serverIp, int serverPort, string clientId, string serverUserId, string serverPassword)
        {
            ServerIp = serverIp;
            ServerPort = serverPort;
            ClientId = clientId;
            ServerUserId = serverUserId;
            ServerPassword = serverPassword;
            ToClose = false;
            TryReconnectCount = 0;
        }

        /// <summary>
        /// Configures the callback invoked when a message is received.
        /// The handler is attached when the underlying client is created.
        /// </summary>
        /// <param name="handler">Publish-received handler.</param>
        public void SetReceivedHandler(MqttClient.MqttMsgPublishEventHandler handler)
        {
            MsgReceivedHandler = handler;
        }

        /// <summary>
        /// Configures the QoS levels passed to every subscribe call.
        /// </summary>
        /// <param name="qosLevels">QoS levels handed to the client's Subscribe call.</param>
        public void SetQosLevels(byte[] qosLevels)
        {
            QosLevels = qosLevels;
        }

        /// <summary>
        /// Subscribes to a topic and remembers it for re-subscription after a reconnect.
        /// Does nothing when the client cannot be connected.
        /// </summary>
        /// <param name="topic">Topic to subscribe to.</param>
        /// <param name="withClient">When true the topic is prefixed with "&lt;ClientId&gt;/".</param>
        public void SubscribeTopic(string topic, bool withClient = true)
        {
            if (!CheckClient())
            {
                return;
            }

            topic = withClient ? $"{ClientId}/{topic}" : $"{topic}";
            if (!TopicList.Contains(topic))
            {
                TopicList.Add(topic);
            }

            Client.Subscribe(new[] { topic }, QosLevels);
            this.LogInfo("SUB-TOPIC:" + topic);
        }

        /// <summary>
        /// Publishes a message to "&lt;target&gt;/&lt;topic&gt;" for every target client id.
        /// </summary>
        /// <param name="topic">Topic suffix, without the client id prefix.</param>
        /// <param name="msg">Message text; sent UTF-8 encoded.</param>
        /// <param name="clientId">
        /// One or more target client ids separated by '@'. Defaults to this client's own id.
        /// </param>
        public void SendMsg(string topic, string msg, string clientId = null)
        {
            if (!CheckClient())
            {
                return;
            }

            var msgContent = Encoding.UTF8.GetBytes(msg);
            clientId ??= ClientId;

            var publishedTopics = new List<string>();
            foreach (var targetClientId in clientId.Split('@'))
            {
                // Build each target topic from the unmodified base topic. Re-assigning
                // "topic" here would make the prefixes pile up ("B/A/topic") from the
                // second target onwards.
                var targetTopic = $"{targetClientId}/{topic}";
                Client.Publish(targetTopic, msgContent);
                publishedTopics.Add(targetTopic);
            }

            this.LogInfo($"【SEND-MSG:[{clientId}] [{string.Join(",", publishedTopics)}][{Truncate(msg, SentLogPreviewLength)}]");
        }

        /// <summary>
        /// Publishes a message to the exact topic given, without any client id prefix.
        /// </summary>
        /// <param name="topic">Full topic to publish to.</param>
        /// <param name="msg">Message text; sent UTF-8 encoded.</param>
        public void SendMessage(string topic, string msg)
        {
            if (!CheckClient())
            {
                return;
            }

            Client.Publish(topic, Encoding.UTF8.GetBytes(msg));
            this.LogInfo($"【SEND-MSG:[{topic}][{Truncate(msg, SentLogPreviewLength)}]");
        }

        /// <summary>
        /// Stops the heartbeat timer and, if connected, disconnects and marks the
        /// client as intentionally closed so that no automatic reconnect is attempted.
        /// </summary>
        public void DisConnect()
        {
            // Without this the heartbeat keeps firing (and logging errors) after the disconnect.
            StopHeartbeat();

            if (!IsConnected)
            {
                return;
            }

            ToClose = true;
            try
            {
                Client.Disconnect();
            }
            catch (Exception e)
            {
                this.LogError($"Disconnect Exception:[{TryReconnectCount}]-[{e.Message}]");
            }
        }

        /// <summary>
        /// Ensures the client is connected, creating and connecting it on demand.
        /// </summary>
        /// <returns>True when the client is connected after the call; otherwise false.</returns>
        public bool CheckClient()
        {
            if (IsConnected)
            {
                return true;
            }

            if (NotValidate)
            {
                LogInvalidOptions();
                return false;
            }

            if (Client == null)
            {
                Client = CreateClient();
            }

            Connect();

            if (!IsConnected)
            {
                this.LogError("消息客户端未连接,请检查!");
                return false;
            }

            return true;
        }

        /// <summary>
        /// Creates the underlying MQTT client and wires up its event handlers.
        /// </summary>
        /// <returns>The new client, or null when the options are invalid or creation failed.</returns>
        private MqttClient CreateClient()
        {
            if (NotValidate)
            {
                LogInvalidOptions();
                return null;
            }

            try
            {
                ToClose = false;
                var client = new MqttClient(ServerIp, ServerPort, false, null, null, MqttSslProtocols.None);
                client.ConnectionClosed += ClientClosed;
                client.MqttMsgUnsubscribed += MsgUnSubscribed;
                client.MqttMsgPublished += MsgPublished;
                client.MqttMsgPublishReceived += MsgPublishReceived;
                if (MsgReceivedHandler != null)
                {
                    client.MqttMsgPublishReceived += MsgReceivedHandler;
                }

                return client;
            }
            catch (Exception e)
            {
                this.LogError("客户端创建失败");
                this.LogError(e);
            }

            return null;
        }

        /// <summary>
        /// Connects the underlying client. On success the remembered topics are
        /// re-subscribed and the heartbeat is (re)started when enabled; on failure
        /// the connection is retried until the reconnect limit is exceeded.
        /// </summary>
        private void Connect()
        {
            if (NotValidate)
            {
                LogInvalidOptions();
                return;
            }

            if (ToClose)
            {
                return;
            }

            try
            {
                Client?.Connect(ClientId, ServerUserId, ServerPassword);
            }
            catch (Exception e)
            {
                this.LogError($"Connect Exception:[{TryReconnectCount}]-[{e.Message}]");
                this.LogError(e);
            }

            if (IsConnected)
            {
                ReSubscribeTopic();
                this.LogInfo(TryReconnectCount > 0
                    ? $"客户端{ClientId}已重新注册连接!重连次数:{TryReconnectCount}"
                    : $"客户端{ClientId}已注册连接!");
                TryReconnectCount = 0;
                if (HeartbeatSwitch)
                {
                    StartHeartbeat();
                }
            }
            else if (TryReconnectCount <= MaxReconnectAttempts)
            {
                // Blocks the calling thread between attempts.
                Thread.Sleep(ReconnectDelayMs);
                TryContinueConnect();
            }
            else
            {
                try
                {
                    Client?.Disconnect();
                }
                catch (Exception e)
                {
                    this.LogError(e);
                }
            }
        }

        /// <summary>
        /// Re-subscribes to the given topics, or to every remembered topic when none are given.
        /// Does nothing when the list is empty or the client is not connected.
        /// </summary>
        /// <param name="topicList">Topics to subscribe to; null means the remembered topics.</param>
        public void ReSubscribeTopic(List<string> topicList = null)
        {
            topicList ??= TopicList;
            if (!topicList.Any() || !IsConnected)
            {
                return;
            }

            // Iterate the list that was actually requested, not always the remembered one.
            foreach (var topic in topicList)
            {
                Client?.Subscribe(new[] { topic }, QosLevels);
            }
        }

        /// <summary>
        /// Performs one reconnect attempt unless the client is connected or intentionally closed.
        /// Once the attempt counter exceeds the limit the client is marked closed, which turns
        /// the following <see cref="Connect"/> call into a no-op.
        /// </summary>
        private void TryContinueConnect()
        {
            if (IsConnected || ToClose)
            {
                return;
            }

            if (Client == null)
            {
                // Keep the created client; discarding it would leave Client null and
                // make the Connect() call below silently do nothing.
                Client = CreateClient();
                Thread.Sleep(ClientRecreateDelayMs);
            }

            TryReconnectCount++;
            if (TryReconnectCount > MaxReconnectAttempts)
            {
                ToClose = true;
            }

            Connect();
        }

        /// <summary>
        /// Replaces any running heartbeat timer with a new one that fires every
        /// <see cref="HeartbeatInterval"/> minutes.
        /// </summary>
        private void StartHeartbeat()
        {
            StopHeartbeat();

            if (HeartbeatInterval <= 0)
            {
                // System.Timers.Timer rejects a non-positive interval with an exception.
                this.LogError($"Heartbeat not started: invalid interval [{HeartbeatInterval}]");
                return;
            }

            var timer = new Timer(1000 * 60 * HeartbeatInterval) { AutoReset = true };
            // Attach the handler before starting so that no tick can be missed.
            timer.Elapsed += (o, e) => SendHeartbeat();
            timer.Start();
            Timer = timer;
        }

        /// <summary>Stops and releases the heartbeat timer, if one is running.</summary>
        private void StopHeartbeat()
        {
            var timer = Timer;
            if (timer == null)
            {
                return;
            }

            Timer = null;
            timer.Stop();
            timer.Close();
        }

        /// <summary>
        /// Publishes one heartbeat to this client's own "Heartbeat" topic and to
        /// "&lt;prefix&gt;/Heartbeat" for every entry of <see cref="HeartbeatTopic"/>.
        /// </summary>
        private void SendHeartbeat()
        {
            SendMsg("Heartbeat", $"HB-{NextHeartbeatToken()}");

            var topics = HeartbeatTopic;
            if (topics == null)
            {
                return;
            }

            // Enumerate a copy: the property is publicly settable and this runs on a timer
            // thread. This narrows, but does not remove, the window for a concurrent change.
            foreach (var topic in topics.ToList())
            {
                SendMessage($"{topic}/Heartbeat", $"[{ClientId}]:HB-{NextHeartbeatToken()}");
            }
        }

        /// <summary>Returns a pseudo-random number in [11111, 99999) used as heartbeat payload.</summary>
        private static int NextHeartbeatToken()
        {
            lock (RandomGate)
            {
                return HeartbeatRandom.Next(11111, 99999);
            }
        }

        /// <summary>Returns at most <paramref name="maxLength"/> leading characters of <paramref name="text"/>.</summary>
        private static string Truncate(string text, int maxLength)
        {
            return text.Length > maxLength ? text.Substring(0, maxLength) : text;
        }

        /// <summary>
        /// Logs the invalid-options error. The password is masked so that the secret
        /// never reaches the log; an empty password is still shown as empty because
        /// that is what makes the options invalid.
        /// </summary>
        private void LogInvalidOptions()
        {
            var maskedPassword = string.IsNullOrEmpty(ServerPassword) ? string.Empty : "***";
            this.LogError($"客户端{ClientId}配置参数[{ServerIp}:{ServerPort}/{ServerUserId}/{maskedPassword}]不合法,请检查后再试!");
        }

        #region EVENT

        /// <summary>Logs the lost connection and starts a reconnect attempt.</summary>
        private void ClientClosed(object sender, EventArgs e)
        {
            this.LogInfo($"Connection Lost:{ClientId}");
            TryContinueConnect();
        }

        /// <summary>Logs the id of an acknowledged unsubscribe.</summary>
        private void MsgUnSubscribed(object sender, MqttMsgUnsubscribedEventArgs e)
        {
            this.LogInfo($"UnSubscribed:{e.MessageId}");
        }

        /// <summary>Logs the id and publish result of an outgoing message.</summary>
        private void MsgPublished(object sender, MqttMsgPublishedEventArgs e)
        {
            this.LogInfo($"Published:{e.MessageId}-{e.IsPublished}");
        }

        /// <summary>Logs the topic and a truncated preview of a received message.</summary>
        private void MsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
        {
            string msg = Encoding.UTF8.GetString(e.Message);
            this.LogInfo($"RECEIVED-MSG】: [{e.Topic}] [{Truncate(msg, ReceivedLogPreviewLength)}]");
        }

        #endregion
    }
}