MessageM2MqttClient.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Net;
  5. using System.Text;
  6. using System.Threading;
  7. using Assets.Plugins.CommonTool;
  8. using Assets.Plugins.Log;
  9. using uPLibrary.Networking.M2Mqtt;
  10. using uPLibrary.Networking.M2Mqtt.Exceptions;
  11. using uPLibrary.Networking.M2Mqtt.Messages;
  12. using Timer = System.Timers.Timer;
  13. //using uPLibrary.Networking.M2Mqtt;
  14. //using uPLibrary.Networking.M2Mqtt.Messages;
  15. namespace Assets.Plugins.WeMessageService
  16. {
  17. public class MessageM2MqttClient
  18. {
  /// <summary>
  /// Creates a client with default settings: empty topic lists, reconnect
  /// counter at zero, close flag cleared, heartbeat disabled and a heartbeat
  /// interval of 5 (the interval is multiplied by 60 000 ms when the heartbeat
  /// timer is created, i.e. it is expressed in minutes).
  /// </summary>
  public MessageM2MqttClient()
  {
      // Collections first, so they are never observed as null.
      HeartbeatTopic = new List<string>();
      TopicList = new List<string>();

      // Connection bookkeeping.
      ToClose = false;
      TryReconnectCount = 0;

      // Heartbeat defaults.
      HeartbeatInterval = 5;
      HeartbeatSwitch = false;
  }
  /// <summary>
  /// Creates a client with the default settings of the parameterless
  /// constructor and then overrides the heartbeat configuration.
  /// </summary>
  /// <param name="heartbeatSwitch">Whether a heartbeat timer is started after a successful connect.</param>
  /// <param name="heartbeatInterval">Heartbeat period; defaults to 5.</param>
  public MessageM2MqttClient(bool heartbeatSwitch,double heartbeatInterval=5)
      : this()
  {
      HeartbeatInterval = heartbeatInterval;
      HeartbeatSwitch = heartbeatSwitch;
  }
  /// <summary>Underlying M2Mqtt client; null until <c>CheckClient</c> creates it.</summary>
  private MqttClient Client { get; set; }

  /// <summary>Broker host passed to the <see cref="MqttClient"/> constructor.</summary>
  private string ServerIp { get; set; }

  /// <summary>Broker port; 0 is treated as "not configured".</summary>
  private int ServerPort { get; set; }

  /// <summary>Client id used for the connection and as topic prefix.</summary>
  private string ClientId { get; set; }

  /// <summary>User name sent to the broker on connect.</summary>
  private string ServerUserId { get; set; }

  /// <summary>Password sent to the broker on connect.</summary>
  private string ServerPassword { get; set; }

  /// <summary>Topics subscribed through this wrapper; replayed after a reconnect.</summary>
  private List<string> TopicList { get; }

  /// <summary>
  /// QoS levels handed to every <c>Subscribe</c> call. Each call subscribes a
  /// single topic, so this array is expected to hold exactly one entry.
  /// </summary>
  private byte[] QosLevels { get; set; } = { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE };

  /// <summary>Number of reconnect attempts since the last successful connect.</summary>
  private int TryReconnectCount { get; set; }

  /// <summary>When true, connect and reconnect attempts return without doing anything.</summary>
  private bool ToClose { get; set; }

  /// <summary>Whether a heartbeat timer is started after a successful connect.</summary>
  private bool HeartbeatSwitch { get; set; }

  /// <summary>Heartbeat period; multiplied by 60 000 ms when the timer is created.</summary>
  public double HeartbeatInterval { get; set; }

  /// <summary>Heartbeat timer; null while no heartbeat is running.</summary>
  private Timer Timer { get; set; }

  /// <summary>Optional extra handler attached to the client's publish-received event when the client is created.</summary>
  public MqttClient.MqttMsgPublishEventHandler MsgReceivedHandler { get; set; }

  /// <summary>
  /// True when any connection option is missing. Each option is checked once
  /// (the original tested <c>ServerIp</c> twice).
  /// NOTE(review): relies on the project's <c>IsEmpty()</c> string extension,
  /// presumably null-safe - confirm.
  /// </summary>
  public bool NotValidate =>
      ServerIp.IsEmpty()
      || ServerPort == 0
      || ClientId.IsEmpty()
      || ServerUserId.IsEmpty()
      || ServerPassword.IsEmpty();

  /// <summary>Additional topic prefixes that receive a "&lt;prefix&gt;/Heartbeat" message on every heartbeat tick.</summary>
  public List<string> HeartbeatTopic { get; set; }

  /// <summary>True when a client exists and reports itself as connected.</summary>
  public bool IsConnected => Client?.IsConnected ?? false;
  /// <summary>
  /// Stores the broker connection options and re-arms the client: the close
  /// flag is cleared and the reconnect counter is reset to zero.
  /// </summary>
  /// <param name="serverIp">Broker host.</param>
  /// <param name="serverPort">Broker port.</param>
  /// <param name="clientId">Client id used for the connection and as topic prefix.</param>
  /// <param name="serverUserId">User name sent to the broker.</param>
  /// <param name="serverPassword">Password sent to the broker.</param>
  public void SetOptions(string serverIp, int serverPort, string clientId, string serverUserId, string serverPassword)
  {
      // Broker endpoint.
      ServerIp = serverIp;
      ServerPort = serverPort;

      // Identity and credentials.
      ClientId = clientId;
      ServerUserId = serverUserId;
      ServerPassword = serverPassword;

      // New options mean a fresh start for the reconnect logic.
      TryReconnectCount = 0;
      ToClose = false;
  }
  /// <summary>
  /// Registers the callback for received messages. The handler is stored only;
  /// it is attached to the client when the client is created.
  /// </summary>
  /// <param name="handler">Callback for the client's publish-received event.</param>
  public void SetReceivedHandler(MqttClient.MqttMsgPublishEventHandler handler)
      => MsgReceivedHandler = handler;
  /// <summary>
  /// Sets the QoS levels used for all subsequent subscriptions.
  /// </summary>
  /// <param name="qosLevels">QoS level array passed to <c>Subscribe</c>.</param>
  public void SetQosLevels(byte[] qosLevels)
      => QosLevels = qosLevels;
  /// <summary>
  /// Subscribes to a topic and remembers it so it can be re-subscribed after a
  /// reconnect. Does nothing when the client cannot be connected.
  /// </summary>
  /// <param name="topic">Topic name.</param>
  /// <param name="withClient">When true the topic is prefixed with "&lt;ClientId&gt;/".</param>
  public void SubscribeTopic(string topic,bool withClient=true)
  {
      if (!CheckClient())
      {
          return;
      }

      string fullTopic;
      if (withClient)
      {
          fullTopic = $"{ClientId}/{topic}";
      }
      else
      {
          fullTopic = $"{topic}";
      }

      // Track each topic once; the subscribe call itself is always issued.
      var alreadyTracked = TopicList.Contains(fullTopic);
      if (!alreadyTracked)
      {
          TopicList.Add(fullTopic);
      }

      var topics = new[] { fullTopic };
      Client.Subscribe(topics, QosLevels);
      this.LogInfo("SUB-TOPIC:" + fullTopic);
  }
  /// <summary>
  /// Publishes a UTF-8 encoded message to "&lt;target&gt;/&lt;topic&gt;" for
  /// every target client id. Does nothing when the client cannot be connected.
  /// </summary>
  /// <param name="topic">Topic name without the client-id prefix.</param>
  /// <param name="msg">Message text.</param>
  /// <param name="clientId">
  /// One or more target client ids separated by '@'; defaults to this client's
  /// own id when null.
  /// </param>
  public void SendMsg(string topic, string msg, string clientId = null)
  {
      if (!CheckClient())
      {
          return;
      }

      var msgContent = Encoding.UTF8.GetBytes(msg);
      clientId ??= ClientId;

      var publishedTopics = new List<string>();
      foreach (var targetClientId in clientId.Split('@'))
      {
          // Build each target topic from the caller's topic. The original
          // reassigned `topic` here, so the 2nd target was published to
          // "id2/id1/topic", the 3rd to "id3/id2/id1/topic", and so on.
          var targetTopic = $"{targetClientId}/{topic}";
          Client.Publish(targetTopic, msgContent);
          publishedTopics.Add(targetTopic);
      }

      // For a single target this logs exactly what the original logged.
      var preview = msg.Length > 100 ? msg.Substring(0, 100) : msg;
      this.LogInfo($"【SEND-MSG:[{clientId}] [{string.Join(",", publishedTopics)}][{preview}]");
  }
  /// <summary>
  /// Publishes a UTF-8 encoded message to the given topic exactly as supplied
  /// (no client-id prefix). Does nothing when the client cannot be connected.
  /// </summary>
  /// <param name="topic">Full topic name.</param>
  /// <param name="msg">Message text.</param>
  public void SendMessage(string topic, string msg)
  {
      if (!CheckClient())
      {
          return;
      }

      var payload = Encoding.UTF8.GetBytes(msg);
      Client.Publish(topic, payload);

      // Only the first 100 characters of the message are logged.
      string preview;
      if (msg.Length>100)
      {
          preview = msg.Substring(0,100);
      }
      else
      {
          preview = msg;
      }
      this.LogInfo($"【SEND-MSG:[{topic}][{preview}]");
  }
  /// <summary>
  /// Stops the heartbeat and disconnects from the broker. While connected it
  /// also sets the close flag so the connection-closed event does not trigger
  /// a reconnect. Errors raised by the disconnect are logged, not thrown.
  /// </summary>
  public void DisConnect()
  {
      // Stop the heartbeat first. The original left the timer running, so it
      // kept firing after a disconnect and logged a "not connected" error on
      // every tick.
      var heartbeat = Timer;
      Timer = null;
      if (heartbeat != null)
      {
          heartbeat.Stop();
          heartbeat.Close();
      }

      if (!IsConnected)
      {
          return;
      }

      ToClose = true;
      try
      {
          Client.Disconnect();
      }
      catch (Exception e)
      {
          // One handler replaces five identical ones (every M2Mqtt exception
          // derives from Exception). The message now names the failing
          // operation instead of "Connect".
          this.LogError($"Disconnect Exception:[{TryReconnectCount}]-[{e.Message}]");
      }
  }
  /// <summary>
  /// Ensures there is a connected client, creating and connecting one on
  /// demand.
  /// </summary>
  /// <returns>
  /// True when the client is connected on return; false when the options are
  /// incomplete or the connection could not be established (both are logged).
  /// </returns>
  public bool CheckClient()
  {
      if (IsConnected)
      {
          return true;
      }

      if (NotValidate)
      {
          // Never write the real password to the log; only show whether one was supplied.
          var maskedPassword = string.IsNullOrEmpty(ServerPassword) ? "<empty>" : "******";
          this.LogError($"客户端{ClientId}配置参数[{ServerIp}:{ServerPort}/{ServerUserId}/{maskedPassword}]不合法,请检查后再试!");
          return false;
      }

      // Create the client lazily, then connect if it is not connected yet.
      Client ??= CreateClient();
      if (!IsConnected)
      {
          Connect();
      }

      if (!IsConnected)
      {
          this.LogError("消息客户端未连接,请检查!");
          return false;
      }

      return true;
  }
  /// <summary>
  /// Creates an unencrypted <see cref="MqttClient"/> for the configured broker
  /// and wires up the event handlers. Clears the close flag before creating it.
  /// </summary>
  /// <returns>
  /// The new, not yet connected client; null when the options are incomplete
  /// or construction failed (both are logged).
  /// </returns>
  private MqttClient CreateClient()
  {
      if (NotValidate)
      {
          // Never write the real password to the log; only show whether one was supplied.
          var maskedPassword = string.IsNullOrEmpty(ServerPassword) ? "<empty>" : "******";
          this.LogError($"客户端{ClientId}配置参数[{ServerIp}:{ServerPort}/{ServerUserId}/{maskedPassword}]不合法,请检查后再试!");
          return null;
      }

      try
      {
          ToClose = false;

          var client = new MqttClient(ServerIp, ServerPort, false, null, null, MqttSslProtocols.None);
          client.ConnectionClosed += ClientClosed;
          client.MqttMsgUnsubscribed += MsgUnSubscribed;
          client.MqttMsgPublished += MsgPublished;
          client.MqttMsgPublishReceived += MsgPublishReceived;

          // The optional user callback runs in addition to the built-in logging handler.
          if (MsgReceivedHandler != null)
          {
              client.MqttMsgPublishReceived += MsgReceivedHandler;
          }

          return client;
      }
      catch (Exception e)
      {
          // One handler replaces the two identical ones of the original.
          this.LogError("客户端创建失败");
          this.LogError(e);
          return null;
      }
  }
  /// <summary>
  /// Connects the current client to the broker. On success the known topics
  /// are re-subscribed, the reconnect counter is reset and, if enabled, the
  /// heartbeat timer is restarted. On failure, while the counter is at most 3,
  /// it waits 5 seconds (blocking the calling thread) and retries through
  /// <c>TryContinueConnect</c>; otherwise it disconnects the client.
  /// Returns immediately when the options are incomplete or the close flag is set.
  /// </summary>
  private void Connect()
  {
      if (NotValidate)
      {
          // Never write the real password to the log; only show whether one was supplied.
          var maskedPassword = string.IsNullOrEmpty(ServerPassword) ? "<empty>" : "******";
          this.LogError($"客户端{ClientId}配置参数[{ServerIp}:{ServerPort}/{ServerUserId}/{maskedPassword}]不合法,请检查后再试!");
          return;
      }

      if (ToClose)
      {
          return;
      }

      try
      {
          Client?.Connect(ClientId, ServerUserId, ServerPassword);
      }
      catch (Exception e)
      {
          // One handler replaces five identical ones (every M2Mqtt exception
          // derives from Exception).
          this.LogError($"Connect Exception:[{TryReconnectCount}]-[{e.Message}]");
          this.LogError(e);
      }

      if (IsConnected)
      {
          ReSubscribeTopic();
          this.LogInfo(TryReconnectCount > 0
              ? $"客户端{ClientId}已重新注册连接!重连次数:{TryReconnectCount}"
              : $"客户端{ClientId}已注册连接!");
          TryReconnectCount = 0;

          if (HeartbeatSwitch)
          {
              RestartHeartbeatTimer();
          }

          return;
      }

      if (TryReconnectCount <= 3)
      {
          Thread.Sleep(5000);
          TryContinueConnect();
          return;
      }

      try
      {
          Client?.Disconnect();
      }
      catch (Exception e)
      {
          this.LogError(e);
      }
  }

  /// <summary>
  /// Disposes any running heartbeat timer and starts a new one that fires
  /// every <c>HeartbeatInterval</c> minutes.
  /// </summary>
  private void RestartHeartbeatTimer()
  {
      if (Timer != null)
      {
          Timer.Stop();
          Timer.Close();
          Timer = null;
      }

      // System.Timers.Timer throws ArgumentException for an interval that is
      // <= 0 or greater than Int32.MaxValue. HeartbeatInterval has a public
      // setter, so validate it here instead of letting Connect() throw. The
      // negated comparison also rejects NaN.
      var intervalMs = 1000 * 60 * HeartbeatInterval;
      if (!(intervalMs > 0) || intervalMs > int.MaxValue)
      {
          this.LogError($"Heartbeat not started: invalid HeartbeatInterval [{HeartbeatInterval}]");
          return;
      }

      var heartbeat = new Timer(intervalMs) { AutoReset = true };
      heartbeat.Elapsed += (o, e) => SendHeartbeat();
      heartbeat.Enabled = true;
      Timer = heartbeat;
  }

  /// <summary>
  /// Publishes one heartbeat on this client's own "Heartbeat" topic and one on
  /// "&lt;prefix&gt;/Heartbeat" for every entry of <c>HeartbeatTopic</c>.
  /// </summary>
  private void SendHeartbeat()
  {
      // One generator per tick instead of one per message.
      var random = new Random();
      SendMsg("Heartbeat", $"HB-{random.Next(11111, 99999)}");

      // HeartbeatTopic has a public setter and may have been set to null.
      var extraTopics = HeartbeatTopic;
      if (extraTopics == null)
      {
          return;
      }

      foreach (var topic in extraTopics)
      {
          SendMessage($"{topic}/Heartbeat", $"[{ClientId}]:HB-{random.Next(11111, 99999)}");
      }
  }
  /// <summary>
  /// Subscribes to every topic of the given list, one subscribe call per
  /// topic. Does nothing when the list is empty or the client is not connected.
  /// </summary>
  /// <param name="topicList">
  /// Topics to subscribe; when null, the topics previously registered through
  /// <c>SubscribeTopic</c> are used.
  /// </param>
  public void ReSubscribeTopic(List<string> topicList = null)
  {
      topicList ??= TopicList;
      if (!topicList.Any() || !IsConnected)
      {
          return;
      }

      // Iterate the effective list. The original looped over TopicList here,
      // so a caller-supplied list was silently ignored.
      foreach (var topic in topicList)
      {
          Client?.Subscribe(new[] { topic }, QosLevels);
      }
  }
  /// <summary>
  /// Performs one reconnect attempt unless the client is already connected or
  /// the close flag is set. Recreates the client when there is none, increments
  /// the reconnect counter and sets the close flag once the counter exceeds 3,
  /// which makes the following <c>Connect</c> call return without connecting.
  /// </summary>
  private void TryContinueConnect()
  {
      if (IsConnected || ToClose)
      {
          return;
      }

      if (Client == null)
      {
          // Keep the new client. The original discarded the return value, so
          // Client stayed null and the Connect() below could never succeed.
          Client = CreateClient();
          Thread.Sleep(3000);
      }

      TryReconnectCount++;
      if (TryReconnectCount > 3)
      {
          ToClose = true;
      }

      Connect();
  }
  358. #region EVENT
  /// <summary>
  /// Handler for the client's connection-closed event: logs the loss and
  /// starts a reconnect attempt.
  /// </summary>
  private void ClientClosed(object sender, EventArgs e)
  {
      var lostMessage = $"Connection Lost:{ClientId}";
      this.LogInfo(lostMessage);

      TryContinueConnect();
  }
  364. //private void MsgSubscribed(object sender, MqttMsgSubscribedEventArgs e)
  365. //{
  366. // this.LogInfo($"Subscribed:{e.MessageId}");
  367. //}
  /// <summary>Handler for the client's unsubscribed event: logs the message id.</summary>
  private void MsgUnSubscribed(object sender, MqttMsgUnsubscribedEventArgs e)
      => this.LogInfo($"UnSubscribed:{e.MessageId}");
  /// <summary>Handler for the client's published event: logs the message id and the publish result.</summary>
  private void MsgPublished(object sender, MqttMsgPublishedEventArgs e)
      => this.LogInfo($"Published:{e.MessageId}-{e.IsPublished}");
  /// <summary>
  /// Handles a received message: decodes the payload as UTF-8 and logs the
  /// topic together with at most the first 500 characters of the text.
  /// </summary>
  private void MsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
  {
      var text = Encoding.UTF8.GetString(e.Message);

      string preview;
      if (text.Length > 500)
      {
          preview = text.Substring(0, 500);
      }
      else
      {
          preview = text;
      }

      this.LogInfo($"RECEIVED-MSG】: [{e.Topic}] [{preview}]");
  }
  382. #endregion
  383. }
  384. }