// IwbMsgClient.cs
  1. using System.Threading.Tasks;
  2. using Abp.Threading;
  3. using Abp.UI;
  4. using IwbZero.ToolCommon.LogHelpers;
  5. using IwbZero.ToolCommon.StringModel;
  6. using MQTTnet;
  7. using MQTTnet.Client;
  8. using MQTTnet.Client.Connecting;
  9. using MQTTnet.Client.Disconnecting;
  10. using MQTTnet.Client.Options;
  11. using MQTTnet.Client.Receiving;
  12. namespace IwbZero.MqttClient
  13. {
  14. public class IwbMsgClient : IIwbMsgClient
  15. {
  /// <summary>
  /// Creates a client whose connection counter and reconnect-attempt counter both start at zero.
  /// No connection is opened here.
  /// </summary>
  public IwbMsgClient()
  {
      ConnectedCount = 0;
      HasReconnectCount = 0;
  }
  // Underlying MQTT client; stays null until a connection is first requested.
  private IMqttClient Client { get; set; }

  // Net number of connected events minus disconnected events; only changed through ChangeCount.
  private int ConnectedCount { get; set; }

  // Guards updates to ConnectedCount.
  private readonly object _lock = new object();

  // Disconnect events seen since the last successful connect; reset to zero by the connected handler.
  private int HasReconnectCount { get; set; }

  /// <summary>Broker host name or IP address.</summary>
  public string ServerIp { get; set; }

  /// <summary>Broker TCP port.</summary>
  public int ServerPort { get; set; }

  /// <summary>MQTT client id; also used as the prefix of every topic this client subscribes or publishes to.</summary>
  public string ClientId { get; set; }

  /// <summary>User name sent to the broker.</summary>
  public string ServerUserId { get; set; }

  /// <summary>Password sent to the broker.</summary>
  public string ServerPassword { get; set; }

  /// <summary>Optional handler that receives every incoming application message.</summary>
  public IMqttApplicationMessageReceivedHandler ReceivedHandler { get; set; }

  /// <summary>
  /// <c>true</c> when the connection settings are incomplete or unusable: any of the string
  /// settings is empty, or the port lies outside the valid TCP range 1-65535.
  /// </summary>
  public bool NotValidate =>
      ServerIp.IsEmpty() || ServerPort <= 0 || ServerPort > 65535 ||
      ClientId.IsEmpty() || ServerUserId.IsEmpty() || ServerPassword.IsEmpty();
  /// <summary>
  /// Stores the connection settings on this instance. Nothing is connected or validated here.
  /// </summary>
  /// <param name="serverIp">Broker host name or IP address.</param>
  /// <param name="serverPort">Broker TCP port.</param>
  /// <param name="clientId">MQTT client id, also used as the topic prefix.</param>
  /// <param name="serverUserId">User name sent to the broker.</param>
  /// <param name="serverPassword">Password sent to the broker.</param>
  /// <param name="receivedHandler">Handler for incoming application messages; may be null.</param>
  public void SetOptions(string serverIp, int serverPort, string clientId, string serverUserId, string serverPassword, IMqttApplicationMessageReceivedHandler receivedHandler)
  {
      // Broker endpoint.
      ServerIp = serverIp;
      ServerPort = serverPort;

      // Identity and credentials.
      ClientId = clientId;
      ServerUserId = serverUserId;
      ServerPassword = serverPassword;

      // Inbound message callback.
      ReceivedHandler = receivedHandler;
  }
  /// <summary>
  /// Subscribes to <paramref name="topic"/> under this client's own prefix,
  /// i.e. to the topic <c>{ClientId}/{topic}</c>.
  /// </summary>
  /// <param name="topic">Topic name without the client-id prefix.</param>
  public async Task SubscribeTopic(string topic)
  {
      // CheckClient connects on demand and throws when no connection can be established.
      if (!await CheckClient())
      {
          return;
      }

      var prefixedTopic = $"{ClientId}/{topic}";
      await Client.SubscribeAsync(prefixedTopic);
  }
  /// <summary>
  /// Publishes <paramref name="msg"/> to <paramref name="topic"/> under this client's own id prefix.
  /// </summary>
  /// <param name="topic">Topic name without the client-id prefix.</param>
  /// <param name="msg">Text payload.</param>
  /// <returns>A task that completes once the message has been published.</returns>
  public async Task SendMsg(string topic, string msg) => await SendMsg(topic, msg, null);
  /// <summary>
  /// Publishes <paramref name="msg"/> to the topic <c>{clientId}/{topic}</c> and logs the send.
  /// </summary>
  /// <param name="topic">Topic name without the client-id prefix.</param>
  /// <param name="msg">Text payload.</param>
  /// <param name="clientId">Prefix to publish under; <c>null</c> means this client's own <see cref="ClientId"/>.</param>
  public async Task SendMsg(string topic, string msg, string clientId)
  {
      if (clientId == null)
      {
          clientId = ClientId;
      }

      // CheckClient connects on demand and throws when no connection can be established.
      if (!await CheckClient())
      {
          return;
      }

      var builder = new MqttApplicationMessageBuilder()
          .WithTopic($"{clientId}/{topic}")
          .WithPayload(msg);
      var outgoing = builder.Build();

      await Client.PublishAsync(outgoing);
      this.LogInfo($"【SEND-MSG:[{topic}][{msg}]");
  }
  /// <summary>
  /// Makes sure a connected MQTT client is available before an operation uses it.
  /// </summary>
  /// <remarks>
  /// On first use the client is created and connected; an existing client that is no longer
  /// connected is reconnected. Both paths are awaited rather than blocked on, and both subscribe
  /// the client's own topic (<see cref="ClientId"/>) afterwards, as <see cref="ClientConnect"/>
  /// does for the initial connection.
  /// </remarks>
  /// <returns><c>true</c> when the client is connected.</returns>
  /// <exception cref="UserFriendlyException">
  /// The settings are incomplete, or the client is still not connected after the attempt.
  /// </exception>
  private async Task<bool> CheckClient()
  {
      if (NotValidate)
      {
          throw new UserFriendlyException("客户端配置参数不合法,请检查后再试!");
      }

      if (Client == null)
      {
          Client = CreateClient();
          await Client.ConnectAsync(CreateClientOptions());
          await SubscribeOwnTopicAsync();
      }
      else if (!Client.IsConnected)
      {
          await Client.ReconnectAsync();
          // NOTE(review): the options built by CreateClientOptions leave the session flag at the
          // library default; assuming that default is a clean session, the broker drops the
          // subscriptions on reconnect, so the own-topic subscription is restored here. Topics
          // added through SubscribeTopic are not tracked and are not restored - confirm.
          await SubscribeOwnTopicAsync();
      }

      if (!Client.IsConnected)
      {
          throw new UserFriendlyException("消息客户端未连接,请检查!");
      }

      return Client.IsConnected;
  }

  // Subscribes to the topic named after this client's id, when an id is configured.
  private async Task SubscribeOwnTopicAsync()
  {
      if (ClientId.IsNotEmpty())
      {
          await Client.SubscribeAsync(ClientId);
      }
  }
  // Number of disconnect events after which the handler stops reconnecting and only logs.
  private const int MaxReconnectAttempts = 5;

  /// <summary>
  /// Creates a new, not yet connected MQTT client and wires up its connected, disconnected and
  /// message-received handlers.
  /// </summary>
  /// <returns>The configured client. It is not assigned to <see cref="Client"/> here.</returns>
  private IMqttClient CreateClient()
  {
      var client = new MqttFactory().CreateMqttClient();

      client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(arg =>
      {
          // A successful connect ends the current run of reconnect attempts.
          HasReconnectCount = 0;
          ChangeCount(1);
          this.LogInfo($"Client Connected, Count:{ConnectedCount}");
      });

      client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(async arg =>
      {
          // NOTE(review): MQTTnet v3 appears to raise this handler for failed connect attempts
          // as well (ClientWasConnected == false) - confirm for the version in use. Only a
          // client that was really connected has been counted, so only then is the count reduced;
          // otherwise every failed retry would push the count below zero.
          if (arg.ClientWasConnected)
          {
              ChangeCount(-1);
          }

          HasReconnectCount++;
          if (HasReconnectCount < MaxReconnectAttempts)
          {
              if (Client == null)
              {
                  Client = CreateClient();
                  ClientConnect();
              }
              else
              {
                  await Client.ReconnectAsync();
                  // Restore the own-topic subscription that ClientConnect sets up on the first
                  // connect. NOTE(review): assumes the broker did not keep the session - confirm.
                  if (ClientId.IsNotEmpty())
                  {
                      await Client.SubscribeAsync(ClientId);
                  }
              }

              return;
          }

          // Retry budget used up: report the disconnect instead of reconnecting again.
          string strMsg = $"Client Disconnected, Count:{ConnectedCount}\r\n ";
          if (arg.Exception != null)
          {
              strMsg += $"exception:{arg.Exception.Message}\r\n";
          }

          if (arg.AuthenticateResult != null)
          {
              strMsg += $"authReason:{arg.AuthenticateResult.ReasonString}\r\n";
          }

          this.LogInfo(strMsg);
      });

      client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(ReceivedHandlerBase);
      return client;
  }
  /// <summary>
  /// Connects to the broker synchronously and then subscribes to the topic named after
  /// <see cref="ClientId"/>. The client is created first if it does not exist yet, so this
  /// method can be called directly without a prior send or subscribe.
  /// </summary>
  /// <exception cref="UserFriendlyException">The connection settings are incomplete.</exception>
  public void ClientConnect()
  {
      // Build the options first so invalid settings fail before a client is created.
      var options = CreateClientOptions();

      if (Client == null)
      {
          Client = CreateClient();
      }

      AsyncHelper.RunSync(() => Client.ConnectAsync(options));
      if (ClientId.IsNotEmpty())
      {
          AsyncHelper.RunSync(() => Client.SubscribeAsync(ClientId));
      }
  }
  /// <summary>
  /// Builds the MQTT connection options (client id, TCP endpoint and credentials) from the
  /// current settings.
  /// </summary>
  /// <returns>The options to pass to the client's connect call.</returns>
  /// <exception cref="UserFriendlyException">The connection settings are incomplete.</exception>
  private MqttClientOptions CreateClientOptions()
  {
      if (NotValidate)
      {
          throw new UserFriendlyException("客户端配置参数不合法,请检查后再试!");
      }

      var tcpEndpoint = new MqttClientTcpOptions() { Server = ServerIp, Port = ServerPort };

      var credentials = new MqttClientCredentials()
      {
          Username = ServerUserId,
          // UTF-8 gives the same bytes on every platform; Encoding.Default depends on the
          // runtime and OS code page, so a non-ASCII password could be encoded differently
          // from one machine to the next.
          Password = System.Text.Encoding.UTF8.GetBytes(ServerPassword)
      };

      return new MqttClientOptions
      {
          ClientId = ClientId,
          ChannelOptions = tcpEndpoint,
          Credentials = credentials
      };
  }
  /// <summary>
  /// Adjusts the connection counter by <paramref name="chg"/> while holding the counter lock.
  /// </summary>
  /// <param name="chg">Amount to add; negative values decrease the counter.</param>
  private void ChangeCount(int chg)
  {
      lock (_lock)
      {
          ConnectedCount = ConnectedCount + chg;
      }
  }
  /// <summary>
  /// Logs every incoming message and then passes it on to <see cref="ReceivedHandler"/>, if one is set.
  /// </summary>
  /// <param name="arg">Event data carrying the received application message.</param>
  private void ReceivedHandlerBase(MqttApplicationMessageReceivedEventArgs arg)
  {
      var message = arg.ApplicationMessage;

      // A message can arrive without any payload; decode that as empty text instead of letting
      // GetString throw on a null buffer.
      var rawPayload = message.Payload;
      string payload = rawPayload == null
          ? string.Empty
          : System.Text.Encoding.UTF8.GetString(rawPayload);

      this.LogInfo($"Received-Message】, topic:[{message.Topic}], payload: [{payload}], connectedCount:{ConnectedCount}");

      // The handler runs asynchronously and is not awaited here; attach a fault-only
      // continuation so an exception inside it is logged rather than silently lost.
      var handling = ReceivedHandler?.HandleApplicationMessageReceivedAsync(arg);
      handling?.ContinueWith(
          t => this.LogInfo($"Received-Message handler failed: {t.Exception?.GetBaseException().Message}"),
          TaskContinuationOptions.OnlyOnFaulted);
  }
  218. }
  219. }