// MqttNetMsgClient.cs
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Abp.Threading;
using Abp.UI;
using IwbZero.ToolCommon.LogHelpers;
using IwbZero.ToolCommon.StringModel;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
  11. namespace WeMessageService
  12. {
  13. public class MqttNetMsgClient
  14. {
  15. public MqttNetMsgClient()
  16. {
  17. HasReconnectCount = 0;
  18. ConnectedCount = 0;
  19. }
  20. private IMqttClient Client { get; set; }
  21. private int ConnectedCount { get; set; }
  22. private readonly object _lock = new object();
  23. private int HasReconnectCount { get; set; }
  24. private string ServerIp { get; set; }
  25. private int ServerPort { get; set; }
  26. private string ClientId { get; set; }
  27. private string ServerUserId { get; set; }
  28. private string ServerPassword { get; set; }
  29. public bool IsConnected => Client?.IsConnected ?? false;
  30. public IMqttApplicationMessageReceivedHandler ReceivedHandler { get; set; }
  31. public bool NotValidate => (ServerIp.IsEmpty() || ServerPort == 0 || ServerIp.IsEmpty() ||
  32. ClientId.IsEmpty() || ServerUserId.IsEmpty() || ServerPassword.IsEmpty());
  33. /// <summary>
  34. /// 配置参数
  35. /// </summary>
  36. /// <param name="serverIp"></param>
  37. /// <param name="serverPort"></param>
  38. /// <param name="clientId"></param>
  39. /// <param name="serverUserId"></param>
  40. /// <param name="serverPassword"></param>
  41. /// <param name="receivedHandler"></param>
  42. public void SetOptions(string serverIp, int serverPort, string clientId, string serverUserId, string serverPassword, IMqttApplicationMessageReceivedHandler receivedHandler)
  43. {
  44. ServerIp = serverIp;
  45. ServerPort = serverPort;
  46. ClientId = clientId;
  47. ServerUserId = serverUserId;
  48. ServerPassword = serverPassword;
  49. ReceivedHandler = receivedHandler;
  50. }
  51. /// <summary>
  52. /// 订阅主题
  53. /// </summary>
  54. /// <param name="topic"></param>
  55. public void SubscribeTopic(string topic)
  56. {
  57. var flag = CheckClient();
  58. if (flag)
  59. {
  60. topic = $"{ClientId}/{topic}";
  61. AsyncHelper.RunSync(() => Client.SubscribeAsync(topic));
  62. }
  63. }
  64. /// <summary>
  65. /// 发送消息
  66. /// </summary>
  67. /// <param name="topic"></param>
  68. /// <param name="msg"></param>
  69. /// <param name="clientId"></param>
  70. public void SendMsg(string topic, string msg, string clientId = null)
  71. {
  72. clientId = clientId ?? ClientId;
  73. var flag = CheckClient();
  74. if (flag)
  75. {
  76. var message = new MqttApplicationMessageBuilder()
  77. .WithTopic($"{clientId}/{topic}")
  78. .WithPayload(msg)
  79. .Build();
  80. AsyncHelper.RunSync(() => Client.PublishAsync(message));
  81. this.LogInfo($"【SEND-MSG:[{topic}][{msg}]");
  82. }
  83. }
  84. /// <summary>
  85. /// 发送消息
  86. /// </summary>
  87. /// <param name="topic"></param>
  88. /// <param name="msg"></param>
  89. public void SendMessage(string topic, string msg)
  90. {
  91. var flag = CheckClient();
  92. if (flag)
  93. {
  94. var message = new MqttApplicationMessageBuilder()
  95. .WithTopic($"{topic}")
  96. .WithPayload(msg)
  97. .Build();
  98. AsyncHelper.RunSync(() => Client.PublishAsync(message));
  99. this.LogInfo($"【SEND-MSG:[{topic}][{msg}]");
  100. }
  101. }
  102. /// <summary>
  103. /// 检查客户端是否连接
  104. /// </summary>
  105. /// <returns></returns>
  106. private bool CheckClient()
  107. {
  108. if (NotValidate)
  109. {
  110. throw new UserFriendlyException("客户端配置参数不合法,请检查后再试!");
  111. }
  112. if (Client == null)
  113. {
  114. Client = CreateClient();
  115. ClientConnect();
  116. }
  117. else if (!Client.IsConnected)
  118. {
  119. AsyncHelper.RunSync(Client.ReconnectAsync);
  120. }
  121. if (!Client.IsConnected)
  122. {
  123. throw new UserFriendlyException("消息客户端未连接,请检查!");
  124. }
  125. return Client.IsConnected;
  126. }
  127. /// <summary>
  128. /// 创建客户端
  129. /// </summary>
  130. /// <returns></returns>
  131. private IMqttClient CreateClient()
  132. {
  133. var client = new MqttFactory().CreateMqttClient();
  134. client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(arg =>
  135. {
  136. HasReconnectCount = 0;
  137. ChangeCount(1);
  138. this.LogInfo($"Client Connected, Count:{ConnectedCount}");
  139. });
  140. client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(async arg =>
  141. {
  142. ChangeCount(-1);
  143. if (arg.Exception?.GetType() == typeof(MQTTnet.Exceptions.MqttCommunicationTimedOutException))
  144. {
  145. this.LogInfo("TTT");
  146. }
  147. HasReconnectCount++;
  148. if (HasReconnectCount < 5)
  149. {
  150. if (Client == null)
  151. {
  152. Client = CreateClient();
  153. ClientConnect();
  154. }
  155. else
  156. {
  157. await Client.ReconnectAsync();
  158. }
  159. return;
  160. }
  161. string strMsg = $"Client Disconnected, Count:{ConnectedCount}\r\n ";
  162. if (arg.Exception != null)
  163. strMsg = strMsg + $"exception:{arg.Exception.Message}\r\n";
  164. if (arg.AuthenticateResult != null)
  165. strMsg = strMsg + $"authReason:{arg.AuthenticateResult.ReasonString}\r\n";
  166. this.LogInfo(strMsg);
  167. });
  168. client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(ReceivedHandlerBase);
  169. return client;
  170. }
  171. /// <summary>
  172. /// 连接服务器
  173. /// </summary>
  174. public void ClientConnect()
  175. {
  176. var options = CreateClientOptions();
  177. AsyncHelper.RunSync(() => Client.ConnectAsync(options));
  178. if (ClientId.IsNotEmpty())
  179. {
  180. AsyncHelper.RunSync(() => Client.SubscribeAsync(ClientId));
  181. }
  182. }
  183. /// <summary>
  184. /// 客户端配置
  185. /// </summary>
  186. /// <returns></returns>
  187. private MqttClientOptions CreateClientOptions()
  188. {
  189. if (NotValidate)
  190. {
  191. throw new UserFriendlyException("客户端配置参数不合法,请检查后再试!");
  192. }
  193. var options = new MqttClientOptions
  194. {
  195. ClientId = ClientId,
  196. ChannelOptions = new MqttClientTcpOptions() { Server = ServerIp, Port = ServerPort },
  197. Credentials = new MqttClientCredentials()
  198. {
  199. Username = ServerUserId,
  200. Password = System.Text.Encoding.Default.GetBytes(ServerPassword)
  201. }
  202. };
  203. return options;
  204. }
  205. /// <summary>
  206. ///连接数变化
  207. /// </summary>
  208. /// <param name="chg"></param>
  209. private void ChangeCount(int chg)
  210. {
  211. lock (_lock)
  212. {
  213. ConnectedCount += chg;
  214. }
  215. }
  216. /// <summary>
  217. /// 接受消息后处理
  218. /// </summary>
  219. /// <param name="arg"></param>
  220. private void ReceivedHandlerBase(MqttApplicationMessageReceivedEventArgs arg)
  221. {
  222. string payload = System.Text.Encoding.UTF8.GetString(arg.ApplicationMessage.Payload);
  223. this.LogInfo($"RECEIVED-MSG】, topic:[{arg.ApplicationMessage.Topic}], payload: [{payload}], connectedCount:{ConnectedCount}");
  224. ReceivedHandler?.HandleApplicationMessageReceivedAsync(arg);
  225. }
  226. }
  227. }