// M2MqttMsgClient.cs
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading;
  6. using IwbZero.ToolCommon.LogHelpers;
  7. using IwbZero.ToolCommon.StringModel;
  8. using uPLibrary.Networking.M2Mqtt;
  9. using uPLibrary.Networking.M2Mqtt.Exceptions;
  10. using uPLibrary.Networking.M2Mqtt.Messages;
  11. using Timer = System.Timers.Timer;
  12. //using uPLibrary.Networking.M2Mqtt;
  13. //using uPLibrary.Networking.M2Mqtt.Messages;
  14. namespace WeMessageService
  15. {
  16. public class M2MqttMsgClient
  17. {
  /// <summary>
  /// Creates an unconfigured client: empty topic lists, heartbeat switched off,
  /// reconnect counter at zero and no close request pending.
  /// </summary>
  public M2MqttMsgClient()
  {
      // Collections first, then the scalar state flags.
      HeartbeatTopic = new List<string>();
      TopicList = new List<string>();

      HeartbeatSwitch = false;
      ToClose = false;
      TryReconnectCount = 0;
  }
  // Underlying M2Mqtt client; null until CheckClient assigns the result of CreateClient.
  private MqttClient Client { get; set; }

  // Broker connection settings, filled in by SetOptions.
  private string ServerIp { get; set; }
  private int ServerPort { get; set; }
  private string ClientId { get; set; }
  private string ServerUserId { get; set; }
  private string ServerPassword { get; set; }

  // Topics registered through SubscribeTopic; ReSubscribeTopic falls back to this list.
  private List<string> TopicList { get; }

  // QoS array handed to every Client.Subscribe call; defaults to "at least once".
  private byte[] QosLevels { get; set; } = { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE };

  // Number of reconnect attempts made by TryContinueConnect since the last successful connect.
  private int TryReconnectCount { get; set; }

  // When true, Connect and TryContinueConnect return without doing anything.
  private bool ToClose { get; set; }

  // Gates creation of the heartbeat timer in Connect.
  // NOTE(review): nothing in this class ever sets it to true - confirm whether that is intended.
  private bool HeartbeatSwitch { get; set; }

  // Heartbeat timer (System.Timers.Timer); only created when HeartbeatSwitch is true.
  private Timer Timer { get; set; }

  /// <summary>
  /// Optional extra handler for received messages; CreateClient attaches it to
  /// <c>MqttMsgPublishReceived</c> when it is not null.
  /// </summary>
  public MqttClient.MqttMsgPublishEventHandler MsgReceivedHandler { get; set; }

  /// <summary>
  /// True when any connection setting is missing (empty host, client id, user id
  /// or password, or a port of 0).
  /// </summary>
  /// <remarks>
  /// <c>IsEmpty()</c> is a project extension method; presumably it treats null as empty - confirm.
  /// The original expression tested <c>ServerIp.IsEmpty()</c> twice; the duplicate is removed.
  /// </remarks>
  public bool NotValidate => ServerIp.IsEmpty() || ServerPort == 0 || ClientId.IsEmpty() ||
                             ServerUserId.IsEmpty() || ServerPassword.IsEmpty();

  /// <summary>
  /// Topic prefixes that additionally receive a "{topic}/Heartbeat" message on each heartbeat tick.
  /// </summary>
  public List<string> HeartbeatTopic { get; set; }

  /// <summary>
  /// True only when the underlying client exists and reports itself connected.
  /// </summary>
  public bool IsConnected => Client?.IsConnected ?? false;
  /// <summary>
  /// Stores the broker connection settings and resets the reconnect state.
  /// </summary>
  /// <param name="serverIp">Broker host name or address.</param>
  /// <param name="serverPort">Broker port; 0 is treated as "not configured" by <c>NotValidate</c>.</param>
  /// <param name="clientId">Client id used when connecting and as the default topic prefix.</param>
  /// <param name="serverUserId">User name used when connecting.</param>
  /// <param name="serverPassword">Password used when connecting.</param>
  public void SetOptions(string serverIp, int serverPort, string clientId, string serverUserId, string serverPassword)
  {
      // Forget any earlier close request and retry count so the new settings start clean.
      TryReconnectCount = 0;
      ToClose = false;

      // Credentials.
      ServerUserId = serverUserId;
      ServerPassword = serverPassword;

      // Endpoint and identity.
      ClientId = clientId;
      ServerIp = serverIp;
      ServerPort = serverPort;
  }
  /// <summary>
  /// Registers the callback invoked for received messages.
  /// </summary>
  /// <remarks>
  /// The handler is only stored here; it is attached to the MQTT client inside
  /// <c>CreateClient</c>, so it has to be set before the client is created to take effect.
  /// </remarks>
  /// <param name="handler">Callback for received publish messages; may be null.</param>
  public void SetReceivedHandler(MqttClient.MqttMsgPublishEventHandler handler) => MsgReceivedHandler = handler;
  /// <summary>
  /// Replaces the QoS levels used for subsequent subscriptions.
  /// </summary>
  /// <param name="qosLevels">
  /// Array passed unchanged to <c>Client.Subscribe</c> next to a single-topic array.
  /// NOTE(review): presumably it must hold exactly one element to match - confirm against M2Mqtt.
  /// </param>
  public void SetQosLevels(byte[] qosLevels) => QosLevels = qosLevels;
  /// <summary>
  /// Subscribes to a topic and remembers it so it can be re-subscribed after a reconnect.
  /// </summary>
  /// <param name="topic">Topic name (without the client prefix when <paramref name="withClient"/> is true).</param>
  /// <param name="withClient">When true the topic is prefixed with "{ClientId}/".</param>
  public void SubscribeTopic(string topic, bool withClient = true)
  {
      // CheckClient connects on demand; nothing happens when no connection can be made.
      if (!CheckClient())
      {
          return;
      }

      string fullTopic;
      if (withClient)
      {
          fullTopic = $"{ClientId}/{topic}";
      }
      else
      {
          fullTopic = $"{topic}";
      }

      // Keep each topic only once in the re-subscribe list.
      var alreadyKnown = TopicList.Contains(fullTopic);
      if (!alreadyKnown)
      {
          TopicList.Add(fullTopic);
      }

      var topics = new[] { fullTopic };
      Client.Subscribe(topics, QosLevels);
      this.LogInfo("SUB-TOPIC:" + fullTopic);
  }
  /// <summary>
  /// Publishes a UTF-8 message to "{clientId}/{topic}" for one or more target clients.
  /// </summary>
  /// <param name="topic">Topic suffix appended after each target client id.</param>
  /// <param name="msg">Message text; encoded as UTF-8.</param>
  /// <param name="clientId">
  /// Target client id, or several ids separated by '@'. Defaults to this client's own id.
  /// </param>
  public void SendMsg(string topic, string msg, string clientId = null)
  {
      if (!CheckClient())
      {
          return;
      }

      byte[] payload = Encoding.UTF8.GetBytes(msg);
      if (clientId == null)
      {
          clientId = ClientId;
      }

      // One publish per '@'-separated target.
      foreach (var target in clientId.Split('@'))
      {
          Client.Publish($"{target}/{topic}", payload);
      }

      // Only the first 100 characters of the message go to the log.
      var preview = msg.Length > 100 ? msg.Substring(0, 100) : msg;
      this.LogInfo($"【SEND-MSG:[{clientId}] [{topic}][{preview}]");
  }
  /// <summary>
  /// Publishes a UTF-8 message to exactly the given topic (no client-id prefix is added).
  /// </summary>
  /// <param name="topic">Full topic to publish to.</param>
  /// <param name="msg">Message text; encoded as UTF-8.</param>
  public void SendMessage(string topic, string msg)
  {
      if (!CheckClient())
      {
          return;
      }

      byte[] payload = Encoding.UTF8.GetBytes(msg);
      Client.Publish(topic, payload);

      // Only the first 100 characters of the message go to the log.
      string preview = msg;
      if (msg.Length > 100)
      {
          preview = msg.Substring(0, 100);
      }
      this.LogInfo($"【SEND-MSG:[{topic}][{preview}]");
  }
  /// <summary>
  /// Disconnects from the broker and stops automatic reconnecting and heartbeats.
  /// </summary>
  /// <remarks>
  /// The close flag is now set even when the client is not currently connected, so a
  /// reconnect sequence that is in progress is cancelled as well (Connect and
  /// TryContinueConnect both return early while the flag is set). SetOptions or
  /// CreateClient clear the flag again. Failures while disconnecting are logged, not thrown.
  /// </remarks>
  public void DisConnect()
  {
      // Set before Disconnect(): the ConnectionClosed handler must not start reconnecting.
      ToClose = true;

      // A running heartbeat would otherwise keep firing against a closed client.
      var heartbeat = Timer;
      Timer = null;
      if (heartbeat != null)
      {
          heartbeat.Stop();
          heartbeat.Close();
      }

      if (!IsConnected)
      {
          return;
      }

      try
      {
          Client.Disconnect();
      }
      catch (Exception e)
      {
          // Covers the M2Mqtt exception types too; the original handled them all identically.
          this.LogError($"Disconnect Exception:[{e.Message}]");
      }
  }
  /// <summary>
  /// Ensures a connected client is available, creating and/or connecting it on demand.
  /// </summary>
  /// <remarks>
  /// May block while Connect runs its retry loop. Problems are logged rather than thrown.
  /// </remarks>
  /// <returns>
  /// True when the client is connected on return; false when the settings are incomplete
  /// or no connection could be established.
  /// </returns>
  public bool CheckClient()
  {
      if (IsConnected)
      {
          return true;
      }

      if (NotValidate)
      {
          // Never write the password itself to the log; only show whether one was supplied.
          var passwordState = ServerPassword.IsEmpty() ? "<empty>" : "***";
          this.LogError($"客户端{ClientId}配置参数[{ServerIp}:{ServerPort}/{ServerUserId}/{passwordState}]不合法,请检查后再试!");
          return false;
      }

      if (Client == null)
      {
          Client = CreateClient();
          Connect();
      }
      else if (!IsConnected)
      {
          Connect();
      }

      if (!IsConnected)
      {
          this.LogError("消息客户端未连接,请检查!");
          return false;
      }

      return true;
  }
  /// <summary>
  /// Builds a new, not yet connected MQTT client and wires up this class's event handlers.
  /// </summary>
  /// <remarks>
  /// Also clears the close flag and the reconnect counter. The client is created without
  /// TLS (<c>secure: false</c>, <c>MqttSslProtocols.None</c>).
  /// </remarks>
  /// <returns>The new client, or null when the settings are incomplete or construction failed.</returns>
  private MqttClient CreateClient()
  {
      if (NotValidate)
      {
          // Never write the password itself to the log; only show whether one was supplied.
          var passwordState = ServerPassword.IsEmpty() ? "<empty>" : "***";
          this.LogError($"客户端{ClientId}配置参数[{ServerIp}:{ServerPort}/{ServerUserId}/{passwordState}]不合法,请检查后再试!");
          return null;
      }

      try
      {
          ToClose = false;
          TryReconnectCount = 0;

          var client = new MqttClient(ServerIp, ServerPort, false, null, null, MqttSslProtocols.None);
          client.ConnectionClosed += ClientClosed;
          client.MqttMsgUnsubscribed += MsgUnSubscribed;
          client.MqttMsgPublished += MsgPublished;
          client.MqttMsgPublishReceived += MsgPublishReceived;

          // Caller-supplied handler runs in addition to the built-in logging handler.
          if (MsgReceivedHandler != null)
          {
              client.MqttMsgPublishReceived += MsgReceivedHandler;
          }

          return client;
      }
      catch (Exception e)
      {
          // One handler is enough: the former MqttClientException branch did exactly the same.
          this.LogError("客户端创建失败");
          this.LogError(e);
          return null;
      }
  }
  /// <summary>
  /// Connects the current client to the broker and, on failure, drives the retry sequence.
  /// </summary>
  /// <remarks>
  /// Does nothing when the settings are incomplete or a close was requested.
  /// On success the remembered topics are re-subscribed, the retry counter is reset and
  /// (when the heartbeat switch is on) the heartbeat timer is restarted.
  /// On failure, while the retry counter is at most 10, the calling thread sleeps five
  /// seconds and TryContinueConnect is invoked, which calls back into this method.
  /// Connection errors are logged, never thrown.
  /// </remarks>
  private void Connect()
  {
      if (NotValidate)
      {
          // Never write the password itself to the log; only show whether one was supplied.
          var passwordState = ServerPassword.IsEmpty() ? "<empty>" : "***";
          this.LogError($"客户端{ClientId}配置参数[{ServerIp}:{ServerPort}/{ServerUserId}/{passwordState}]不合法,请检查后再试!");
          return;
      }

      if (ToClose)
      {
          return;
      }

      try
      {
          Client?.Connect(ClientId, ServerUserId, ServerPassword);
      }
      catch (Exception e)
      {
          // Covers the M2Mqtt exception types too; the original handled them all identically.
          this.LogError($"Connect Exception:[{TryReconnectCount}]-[{e.Message}]");
      }

      if (IsConnected)
      {
          ReSubscribeTopic();
          this.LogInfo(TryReconnectCount > 0
              ? $"客户端{ClientId}已重新注册连接!重连次数:{TryReconnectCount}"
              : $"客户端{ClientId}已注册连接!");
          TryReconnectCount = 0;

          if (HeartbeatSwitch)
          {
              RestartHeartbeatTimer();
          }

          return;
      }

      if (TryReconnectCount <= 10)
      {
          Thread.Sleep(5000);
          TryContinueConnect();
          return;
      }

      // Retry budget already exhausted: make sure the client is fully closed.
      try
      {
          Client?.Disconnect();
      }
      catch (Exception e)
      {
          this.LogError(e);
      }
  }

  // Replaces any existing heartbeat timer with a fresh one that fires every five minutes.
  private void RestartHeartbeatTimer()
  {
      if (Timer != null)
      {
          Timer.Stop();
          Timer.Close();
          Timer = null;
      }

      const double heartbeatIntervalMs = 1000 * 60 * 5;
      var heartbeat = new Timer(heartbeatIntervalMs) { AutoReset = true };

      // Attach the handler before starting so no tick can be missed.
      heartbeat.Elapsed += (o, e) => SendHeartbeat();
      heartbeat.Start();
      Timer = heartbeat;
  }

  // Sends one heartbeat to this client's own topic and to every configured heartbeat topic.
  private void SendHeartbeat()
  {
      // One generator per tick: separate new Random() instances created back-to-back
      // can share a seed and then produce identical numbers.
      var random = new Random();
      SendMsg("Heartbeat", $"HB-{random.Next(11111, 99999)}");

      // HeartbeatTopic has a public setter, so it may have been set to null.
      var topics = HeartbeatTopic;
      if (topics == null)
      {
          return;
      }

      foreach (var topic in topics)
      {
          SendMessage($"{topic}/Heartbeat", $"[{ClientId}]:HB-{random.Next(11111, 99999)}");
      }
  }
  /// <summary>
  /// Subscribes again to a set of topics, typically after a reconnect.
  /// </summary>
  /// <remarks>
  /// Does nothing when the client is not connected or the list is empty.
  /// The original loop always iterated the internal topic list, so a list passed by
  /// the caller was ignored; the resolved list is used now.
  /// </remarks>
  /// <param name="topicList">
  /// Topics to subscribe to; when null, the topics remembered by SubscribeTopic are used.
  /// </param>
  public void ReSubscribeTopic(List<string> topicList = null)
  {
      topicList = topicList ?? TopicList;
      if (!IsConnected || !topicList.Any())
      {
          return;
      }

      foreach (var topic in topicList)
      {
          Client?.Subscribe(new[] { topic }, QosLevels);
      }
  }
  /// <summary>
  /// Performs one reconnect attempt unless the client is connected or a close was requested.
  /// </summary>
  /// <remarks>
  /// Increments the retry counter; once it exceeds 10 the close flag is set, which makes
  /// the following Connect call (and all later ones) return immediately.
  /// </remarks>
  private void TryContinueConnect()
  {
      if (IsConnected || ToClose)
      {
          return;
      }

      if (Client == null)
      {
          // The original discarded CreateClient()'s result, leaving Client null so that
          // every following Connect() was a no-op. Keep the new instance.
          Client = CreateClient();
          Thread.Sleep(3000);
      }

      TryReconnectCount++;
      if (TryReconnectCount > 10)
      {
          ToClose = true;
      }

      Connect();
  }
  346. #region EVENT
  // ConnectionClosed handler: logs the loss and starts a reconnect attempt.
  private void ClientClosed(object sender, EventArgs e)
  {
      var notice = $"Connection Lost:{ClientId}";
      this.LogInfo(notice);

      // Returns immediately when a close was requested or the client is already connected.
      TryContinueConnect();
  }
  352. //private void MsgSubscribed(object sender, MqttMsgSubscribedEventArgs e)
  353. //{
  354. // this.LogInfo($"Subscribed:{e.MessageId}");
  355. //}
  // MqttMsgUnsubscribed handler: logs the id of the acknowledged unsubscribe message.
  private void MsgUnSubscribed(object sender, MqttMsgUnsubscribedEventArgs e) =>
      this.LogInfo($"UnSubscribed:{e.MessageId}");
  // MqttMsgPublished handler: logs the message id and the IsPublished flag reported by the client.
  private void MsgPublished(object sender, MqttMsgPublishedEventArgs e) =>
      this.LogInfo($"Published:{e.MessageId}-{e.IsPublished}");
  // Handles a received message: decodes the payload as UTF-8 and logs the topic
  // together with at most the first 500 characters of the text.
  private void MsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
  {
      var text = Encoding.UTF8.GetString(e.Message);
      if (text.Length > 500)
      {
          text = text.Substring(0, 500);
      }

      this.LogInfo($"RECEIVED-MSG】: [{e.Topic}] [{text}]");
  }
  370. #endregion
  371. }
  372. }