// ManagedMqttClient.cs
  1. using MQTTnet.Client;
  2. using MQTTnet.Client.Connecting;
  3. using MQTTnet.Client.Disconnecting;
  4. using MQTTnet.Client.Publishing;
  5. using MQTTnet.Client.Receiving;
  6. using MQTTnet.Diagnostics;
  7. using MQTTnet.Exceptions;
  8. using MQTTnet.Internal;
  9. using MQTTnet.Protocol;
  10. using MQTTnet.Server;
  11. using System;
  12. using System.Collections.Generic;
  13. using System.Linq;
  14. using System.Threading;
  15. using System.Threading.Tasks;
  16. namespace MQTTnet.Extensions.ManagedClient
  17. {
  18. public class ManagedMqttClient : Disposable, IManagedMqttClient
  19. {
  // ---- Wrapped client and diagnostics ----

  /// <summary>The low level client that this managed client keeps connected.</summary>
  private readonly IMqttClient _mqttClient;

  /// <summary>Child logger created from the logger passed to the constructor.</summary>
  private readonly IMqttNetLogger _logger;

  // ---- Outgoing application messages ----

  /// <summary>Application messages that are waiting to be published.</summary>
  private readonly BlockingQueue<ManagedMqttApplicationMessage> _messageQueue = new BlockingQueue<ManagedMqttApplicationMessage>();

  /// <summary>
  /// Taken by both the public publish path and the background publisher while they
  /// modify <see cref="_messageQueue"/> together with the optional storage.
  /// </summary>
  private readonly AsyncLock _messageQueueLock = new AsyncLock();

  /// <summary>Only set when the options passed to <see cref="StartAsync"/> provide a storage.</summary>
  private ManagedMqttClientStorageManager _storageManager;

  // ---- Subscriptions ----

  /// <summary>
  /// Subscriptions live in two separate buckets:
  /// <see cref="_subscriptions"/> and <see cref="_unsubscriptions"/> collect the requests made during normal
  /// operation and are copied into <see cref="_reconnectSubscriptions"/> once they get processed. They may be
  /// touched by any thread and are therefore always accessed under <c>lock (_subscriptions)</c>.
  /// <see cref="_reconnectSubscriptions"/> is sent to the broker at reconnect and is solely owned by
  /// <see cref="MaintainConnectionAsync"/>.
  /// </summary>
  private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
  private readonly HashSet<string> _unsubscriptions = new HashSet<string>();
  private readonly Dictionary<string, MqttQualityOfServiceLevel> _reconnectSubscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();

  /// <summary>Released by every subscribe/unsubscribe request to wake up the subscription publisher.</summary>
  private readonly SemaphoreSlim _subscriptionsQueuedSignal = new SemaphoreSlim(0);

  // ---- Background work ----

  /// <summary>Cancels the connection maintenance loop; <c>null</c> while the client is not started.</summary>
  private CancellationTokenSource _connectionCancellationToken;

  /// <summary>Cancels the background publisher; <c>null</c> while no publisher is running.</summary>
  private CancellationTokenSource _publishingCancellationToken;

  /// <summary>The task running <see cref="MaintainConnectionAsync"/>.</summary>
  private Task _maintainConnectionTask;
  /// <summary>
  /// Creates a managed client on top of an existing low level client.
  /// </summary>
  /// <param name="mqttClient">The client that performs the actual MQTT communication.</param>
  /// <param name="logger">The parent logger; a child logger named after this class is created from it.</param>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="mqttClient"/> or <paramref name="logger"/> is <c>null</c>.
  /// </exception>
  public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger)
  {
      if (mqttClient == null)
      {
          throw new ArgumentNullException(nameof(mqttClient));
      }

      if (logger == null)
      {
          throw new ArgumentNullException(nameof(logger));
      }

      _mqttClient = mqttClient;
      _logger = logger.CreateChildLogger(nameof(ManagedMqttClient));
  }
  /// <summary>Gets whether the wrapped client currently reports being connected.</summary>
  public bool IsConnected
  {
      get { return _mqttClient.IsConnected; }
  }

  /// <summary>Gets whether a connection cancellation source currently exists, i.e. the client was started and not stopped.</summary>
  public bool IsStarted
  {
      get { return _connectionCancellationToken != null; }
  }

  /// <summary>Gets the number of application messages currently held in the internal queue.</summary>
  public int PendingApplicationMessagesCount
  {
      get { return _messageQueue.Count; }
  }

  /// <summary>Gets the options passed to the most recent <see cref="StartAsync"/> call (<c>null</c> before the first start).</summary>
  public IManagedMqttClientOptions Options { get; private set; }

  /// <summary>Gets or sets the connected handler; the value is stored on the wrapped client.</summary>
  public IMqttClientConnectedHandler ConnectedHandler
  {
      get { return _mqttClient.ConnectedHandler; }
      set { _mqttClient.ConnectedHandler = value; }
  }

  /// <summary>Gets or sets the disconnected handler; the value is stored on the wrapped client.</summary>
  public IMqttClientDisconnectedHandler DisconnectedHandler
  {
      get { return _mqttClient.DisconnectedHandler; }
      set { _mqttClient.DisconnectedHandler = value; }
  }

  /// <summary>Gets or sets the handler for received application messages; the value is stored on the wrapped client.</summary>
  public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler
  {
      get { return _mqttClient.ApplicationMessageReceivedHandler; }
      set { _mqttClient.ApplicationMessageReceivedHandler = value; }
  }

  /// <summary>Gets or sets the handler invoked after every attempt to publish a queued message (successful or not).</summary>
  public IApplicationMessageProcessedHandler ApplicationMessageProcessedHandler { get; set; }

  /// <summary>Gets or sets the handler invoked when a message is dropped because the internal queue is full.</summary>
  public IApplicationMessageSkippedHandler ApplicationMessageSkippedHandler { get; set; }

  /// <summary>Gets or sets the handler invoked when connecting to the broker throws.</summary>
  public IConnectingFailedHandler ConnectingFailedHandler { get; set; }

  /// <summary>Gets or sets the handler invoked when sending subscriptions or unsubscriptions to the broker throws.</summary>
  public ISynchronizingSubscriptionsFailedHandler SynchronizingSubscriptionsFailedHandler { get; set; }
  /// <summary>
  /// Starts the managed client: stores the options, loads persisted messages (when a storage is
  /// configured) into the internal queue and launches the background connection maintenance loop.
  /// </summary>
  /// <param name="options">The managed client options; <c>ClientOptions</c> must be set.</param>
  /// <exception cref="ArgumentNullException"><paramref name="options"/> is <c>null</c>.</exception>
  /// <exception cref="ArgumentException">The client options inside <paramref name="options"/> are not set.</exception>
  /// <exception cref="InvalidOperationException">A previous maintenance task is still running.</exception>
  public async Task StartAsync(IManagedMqttClientOptions options)
  {
      ThrowIfDisposed();

      if (options == null) throw new ArgumentNullException(nameof(options));
      if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options));

      if (_maintainConnectionTask != null && !_maintainConnectionTask.IsCompleted)
      {
          throw new InvalidOperationException("The managed client is already started.");
      }

      Options = options;

      if (Options.Storage != null)
      {
          _storageManager = new ManagedMqttClientStorageManager(Options.Storage);
          var messages = await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false);

          foreach (var message in messages)
          {
              _messageQueue.Enqueue(message);
          }
      }

      // Capture the token in a local BEFORE scheduling the task. The lambda runs later on a
      // thread pool thread; reading the field there would race with StopMaintainingConnection,
      // which disposes the source and sets the field to null (NullReferenceException /
      // ObjectDisposedException inside the task).
      var cancellationTokenSource = new CancellationTokenSource();
      var cancellationToken = cancellationTokenSource.Token;
      _connectionCancellationToken = cancellationTokenSource;

      _maintainConnectionTask = Task.Run(() => MaintainConnectionAsync(cancellationToken), cancellationToken);
      _maintainConnectionTask.Forget(_logger);

      _logger.Info("Started");
  }
  /// <summary>
  /// Stops the background publisher and the connection maintenance loop, clears the internal
  /// message queue and waits until the maintenance task has finished.
  /// </summary>
  public async Task StopAsync()
  {
      ThrowIfDisposed();

      StopPublishing();
      StopMaintainingConnection();

      _messageQueue.Clear();

      if (_maintainConnectionTask != null)
      {
          // WhenAny completes successfully regardless of how the inner task ended, so a cancelled
          // or faulted maintenance task does not make StopAsync throw.
          // ConfigureAwait(false) matches every other await in this class and avoids resuming on
          // a captured synchronization context.
          await Task.WhenAny(_maintainConnectionTask).ConfigureAwait(false);
          _maintainConnectionTask = null;
      }
  }
  /// <summary>Forwards a ping request to the wrapped client.</summary>
  /// <param name="cancellationToken">Passed through to the wrapped client.</param>
  /// <returns>The task returned by the wrapped client.</returns>
  public Task PingAsync(CancellationToken cancellationToken) => _mqttClient.PingAsync(cancellationToken);
  /// <summary>
  /// Wraps the application message into a managed message and hands it to the managed
  /// publish overload, which places it in the internal queue.
  /// </summary>
  /// <param name="applicationMessage">The message to enqueue.</param>
  /// <param name="cancellationToken">Not used by this method.</param>
  /// <returns>A newly created, default <see cref="MqttClientPublishResult"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="applicationMessage"/> is <c>null</c>.</exception>
  public async Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken)
  {
      ThrowIfDisposed();

      if (applicationMessage == null)
      {
          throw new ArgumentNullException(nameof(applicationMessage));
      }

      var managedMessage = new ManagedMqttApplicationMessageBuilder()
          .WithApplicationMessage(applicationMessage)
          .Build();

      await PublishAsync(managedMessage).ConfigureAwait(false);

      return new MqttClientPublishResult();
  }
  /// <summary>
  /// Places a managed application message in the internal queue (and in the storage, when one is
  /// configured). When the queue already holds <c>Options.MaxPendingMessages</c> entries the
  /// configured overflow strategy decides whether the new or the oldest message is dropped; the
  /// dropped message is reported to <see cref="ApplicationMessageSkippedHandler"/>.
  /// </summary>
  /// <param name="applicationMessage">The managed message to enqueue.</param>
  /// <exception cref="ArgumentNullException"><paramref name="applicationMessage"/> is <c>null</c>.</exception>
  /// <exception cref="InvalidOperationException"><see cref="StartAsync"/> has not been called yet.</exception>
  public async Task PublishAsync(ManagedMqttApplicationMessage applicationMessage)
  {
      ThrowIfDisposed();

      if (applicationMessage == null)
      {
          throw new ArgumentNullException(nameof(applicationMessage));
      }

      if (Options == null)
      {
          throw new InvalidOperationException("call StartAsync before publishing messages");
      }

      MqttTopicValidator.ThrowIfInvalid(applicationMessage.ApplicationMessage.Topic);

      ManagedMqttApplicationMessage droppedMessage = null;
      ApplicationMessageSkippedEventArgs skippedEventArgs = null;

      try
      {
          using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false))
          {
              var queueIsFull = _messageQueue.Count >= Options.MaxPendingMessages;
              if (queueIsFull)
              {
                  switch (Options.PendingMessagesOverflowStrategy)
                  {
                      case MqttPendingMessagesOverflowStrategy.DropNewMessage:
                          _logger.Verbose("Skipping publish of new application message because internal queue is full.");
                          skippedEventArgs = new ApplicationMessageSkippedEventArgs(applicationMessage);
                          // The finally block below still reports the skipped message.
                          return;

                      case MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage:
                          droppedMessage = _messageQueue.RemoveFirst();
                          _logger.Verbose("Removed oldest application message from internal queue because it is full.");
                          skippedEventArgs = new ApplicationMessageSkippedEventArgs(droppedMessage);
                          break;
                  }
              }

              _messageQueue.Enqueue(applicationMessage);

              if (_storageManager != null)
              {
                  // Keep the storage in step with the queue: drop the evicted entry first.
                  if (droppedMessage != null)
                  {
                      await _storageManager.RemoveAsync(droppedMessage).ConfigureAwait(false);
                  }

                  await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
              }
          }
      }
      finally
      {
          // Notify outside of the queue lock, and only when a message was actually dropped.
          var skippedHandler = skippedEventArgs != null ? ApplicationMessageSkippedHandler : null;
          if (skippedHandler != null)
          {
              await skippedHandler.HandleApplicationMessageSkippedAsync(skippedEventArgs).ConfigureAwait(false);
          }
      }
  }
  /// <summary>
  /// Records the given topic filters as pending subscriptions and signals the background
  /// loop. Nothing is sent to the broker by this method itself.
  /// </summary>
  /// <param name="topicFilters">The topic filters to subscribe to.</param>
  /// <returns>An already completed task.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="topicFilters"/> is <c>null</c>.</exception>
  public Task SubscribeAsync(IEnumerable<MqttTopicFilter> topicFilters)
  {
      ThrowIfDisposed();

      if (topicFilters == null)
      {
          throw new ArgumentNullException(nameof(topicFilters));
      }

      lock (_subscriptions)
      {
          foreach (var filter in topicFilters)
          {
              var topic = filter.Topic;

              // A new subscription replaces an existing entry and cancels any pending
              // unsubscription for the same topic.
              _subscriptions[topic] = filter.QualityOfServiceLevel;
              _unsubscriptions.Remove(topic);
          }
      }

      _subscriptionsQueuedSignal.Release();

      return Task.FromResult(0);
  }
  /// <summary>
  /// Records the given topics as pending unsubscriptions and signals the background loop.
  /// Nothing is sent to the broker by this method itself.
  /// </summary>
  /// <param name="topics">The topics to unsubscribe from.</param>
  /// <returns>An already completed task.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="topics"/> is <c>null</c>.</exception>
  public Task UnsubscribeAsync(IEnumerable<string> topics)
  {
      ThrowIfDisposed();

      if (topics == null)
      {
          throw new ArgumentNullException(nameof(topics));
      }

      lock (_subscriptions)
      {
          foreach (var topicToRemove in topics)
          {
              // An unsubscription cancels any subscription for the topic that is still pending.
              _subscriptions.Remove(topicToRemove);
              _unsubscriptions.Add(topicToRemove);
          }
      }

      _subscriptionsQueuedSignal.Release();

      return Task.FromResult(0);
  }
  /// <summary>
  /// Stops all background work, waits for the connection maintenance task to finish and then
  /// disposes the queue, the queue lock, the wrapped client and the subscription signal.
  /// </summary>
  /// <param name="disposing"><c>true</c> when called from <c>Dispose()</c>; only then are managed resources released.</param>
  protected override void Dispose(bool disposing)
  {
      if (disposing)
      {
          StopPublishing();
          StopMaintainingConnection();

          if (_maintainConnectionTask != null)
          {
              try
              {
                  _maintainConnectionTask.GetAwaiter().GetResult();
              }
              catch (OperationCanceledException)
              {
                  // The task was started with a cancellation token. If the token is cancelled
                  // before the task begins to run, the task ends in the Canceled state and
                  // GetResult() throws. That is an expected outcome of stopping and must not
                  // escape from Dispose or prevent the cleanup below.
              }

              _maintainConnectionTask = null;
          }

          _messageQueue.Dispose();
          _messageQueueLock.Dispose();
          _mqttClient.Dispose();
          _subscriptionsQueuedSignal.Dispose();
      }

      base.Dispose(disposing);
  }
  /// <summary>
  /// Background loop: repeatedly runs <see cref="TryMaintainConnectionAsync"/> until cancellation
  /// is requested. On exit it disconnects the wrapped client (unless this instance is disposed)
  /// and clears all subscription bookkeeping.
  /// </summary>
  /// <param name="cancellationToken">Signals that the loop should end.</param>
  private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
  {
      try
      {
          while (!cancellationToken.IsCancellationRequested)
          {
              await TryMaintainConnectionAsync(cancellationToken).ConfigureAwait(false);
          }
      }
      catch (Exception loopException)
      {
          // Cancellation is the regular way to leave the loop; everything else gets logged.
          if (!(loopException is OperationCanceledException))
          {
              _logger.Error(loopException, "Error exception while maintaining connection.");
          }
      }
      finally
      {
          if (!IsDisposed)
          {
              try
              {
                  await _mqttClient.DisconnectAsync().ConfigureAwait(false);
              }
              catch (Exception disconnectException)
              {
                  _logger.Error(disconnectException, "Error while disconnecting.");
              }

              _logger.Info("Stopped");
          }

          _reconnectSubscriptions.Clear();

          // The pending buckets are shared with the public subscribe/unsubscribe methods.
          lock (_subscriptions)
          {
              _subscriptions.Clear();
              _unsubscriptions.Clear();
          }
      }
  }
  /// <summary>
  /// Performs one iteration of connection maintenance: (re)connects when required and then
  /// reacts to the outcome. Exceptions are logged (cancellation is ignored) and never propagate.
  /// </summary>
  /// <param name="cancellationToken">Cancels the reconnect delay and the wait for subscription requests.</param>
  private async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
  {
      try
      {
          var reconnectionResult = await ReconnectIfRequiredAsync().ConfigureAwait(false);

          switch (reconnectionResult)
          {
              case ReconnectionResult.NotConnected:
                  // No connection: stop the publisher and wait before the next attempt.
                  StopPublishing();
                  await Task.Delay(Options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
                  break;

              case ReconnectionResult.Reconnected:
                  // Connected without an existing session: send the known subscriptions again.
                  await PublishReconnectSubscriptionsAsync().ConfigureAwait(false);
                  StartPublishing();
                  break;

              case ReconnectionResult.Recovered:
                  // Connected with an existing session: only the publisher is restarted.
                  StartPublishing();
                  break;

              case ReconnectionResult.StillConnected:
                  // Already connected: process subscription requests for up to one check interval.
                  await PublishSubscriptionsAsync(Options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
                  break;
          }
      }
      catch (OperationCanceledException)
      {
      }
      catch (MqttCommunicationException communicationException)
      {
          _logger.Warning(communicationException, "Communication error while maintaining connection.");
      }
      catch (Exception unexpectedException)
      {
          _logger.Error(unexpectedException, "Error exception while maintaining connection.");
      }
  }
  /// <summary>
  /// Background publisher: while connected and not cancelled, takes the message at the head of
  /// the queue and tries to publish it.
  /// </summary>
  /// <param name="cancellationToken">Stops the publisher.</param>
  private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
  {
      try
      {
          while (!cancellationToken.IsCancellationRequested && _mqttClient.IsConnected)
          {
              // The head of the queue is only peeked at, not dequeued. Dequeueing and re-enqueueing
              // after a failed publish would let the queue grow past the configured cap and would
              // shuffle the message order, so that DropOldestQueuedMessage could no longer tell
              // which message really is the oldest.
              var nextMessage = _messageQueue.PeekAndWait(cancellationToken);

              if (nextMessage != null)
              {
                  cancellationToken.ThrowIfCancellationRequested();

                  await TryPublishQueuedMessageAsync(nextMessage).ConfigureAwait(false);
              }
          }
      }
      catch (OperationCanceledException)
      {
      }
      catch (Exception publishLoopException)
      {
          _logger.Error(publishLoopException, "Error while publishing queued application messages.");
      }
      finally
      {
          _logger.Verbose("Stopped publishing messages.");
      }
  }
  /// <summary>
  /// Publishes one queued message through the wrapped client and removes it from the queue and
  /// the storage on success. On a communication error only QoS 0 messages are removed; on any
  /// other error the message is left where it is. In every case
  /// <see cref="ApplicationMessageProcessedHandler"/> is informed about the outcome.
  /// </summary>
  /// <param name="message">The message to publish.</param>
  private async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage message)
  {
      Exception publishError = null;

      try
      {
          await _mqttClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false);

          // While this message was being published, PublishAsync may already have evicted it
          // from the queue to make room for another one (cap with DropOldestQueuedMessage).
          // The removal therefore only takes the head of the queue if it still is this message.
          await RemoveFromQueueAndStorageAsync(message).ConfigureAwait(false);
      }
      catch (MqttCommunicationException communicationException)
      {
          publishError = communicationException;
          _logger.Warning(communicationException, $"Publishing application ({message.Id}) message failed.");

          // QoS 0 messages must not stay on the queue after a failed attempt.
          // QoS 1 and 2 messages stay for a retry; with a cap they may have been evicted by
          // PublishAsync in the meantime, in which case they are not retried. That contradicts
          // the expectation for QoS 1 and 2, but so does using a queue cap at all, so it is
          // consistent with the prior behavior.
          var isAtMostOnce = message.ApplicationMessage.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce;
          if (isAtMostOnce)
          {
              await RemoveFromQueueAndStorageAsync(message).ConfigureAwait(false);
          }
      }
      catch (Exception unexpectedException)
      {
          publishError = unexpectedException;
          _logger.Error(unexpectedException, $"Error while publishing application message ({message.Id}).");
      }
      finally
      {
          var processedHandler = ApplicationMessageProcessedHandler;
          if (processedHandler != null)
          {
              var processedEventArgs = new ApplicationMessageProcessedEventArgs(message, publishError);
              await processedHandler.HandleApplicationMessageProcessedAsync(processedEventArgs).ConfigureAwait(false);
          }
      }
  }

  /// <summary>
  /// Removes the message from the head of the queue (if the head has the same id) and from the
  /// storage, holding the queue lock to avoid conflicts with the public publish path.
  /// </summary>
  /// <param name="message">The message to remove.</param>
  private async Task RemoveFromQueueAndStorageAsync(ManagedMqttApplicationMessage message)
  {
      using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false))
      {
          _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));

          if (_storageManager != null)
          {
              await _storageManager.RemoveAsync(message).ConfigureAwait(false);
          }
      }
  }
  /// <summary>
  /// Waits up to <paramref name="timeout"/> for subscribe/unsubscribe requests and sends every
  /// batch to the broker. Each batch is also applied to <see cref="_reconnectSubscriptions"/> so
  /// that it can be sent again after a reconnect.
  /// </summary>
  /// <param name="timeout">Total time this method keeps waiting for requests.</param>
  /// <param name="cancellationToken">Cancels the wait for requests.</param>
  private async Task PublishSubscriptionsAsync(TimeSpan timeout, CancellationToken cancellationToken)
  {
      var deadline = DateTime.UtcNow + timeout;

      while (await _subscriptionsQueuedSignal.WaitAsync(GetRemainingTime(deadline), cancellationToken).ConfigureAwait(false))
      {
          List<MqttTopicFilter> pendingSubscriptions;
          HashSet<string> pendingUnsubscriptions;

          // Take a snapshot of both buckets and empty them while holding the lock.
          lock (_subscriptions)
          {
              pendingSubscriptions = new List<MqttTopicFilter>(_subscriptions.Count);
              foreach (var entry in _subscriptions)
              {
                  pendingSubscriptions.Add(new MqttTopicFilter { Topic = entry.Key, QualityOfServiceLevel = entry.Value });
              }

              _subscriptions.Clear();

              pendingUnsubscriptions = new HashSet<string>(_unsubscriptions);
              _unsubscriptions.Clear();
          }

          // The signal can be released more often than there are batches to process.
          if (pendingSubscriptions.Count == 0 && pendingUnsubscriptions.Count == 0)
          {
              continue;
          }

          _logger.Verbose($"Publishing subscriptions ({pendingSubscriptions.Count} subscriptions and {pendingUnsubscriptions.Count} unsubscriptions)");

          foreach (var removedTopic in pendingUnsubscriptions)
          {
              _reconnectSubscriptions.Remove(removedTopic);
          }

          foreach (var addedFilter in pendingSubscriptions)
          {
              _reconnectSubscriptions[addedFilter.Topic] = addedFilter.QualityOfServiceLevel;
          }

          try
          {
              if (pendingUnsubscriptions.Count > 0)
              {
                  await _mqttClient.UnsubscribeAsync(pendingUnsubscriptions.ToArray()).ConfigureAwait(false);
              }

              if (pendingSubscriptions.Count > 0)
              {
                  await _mqttClient.SubscribeAsync(pendingSubscriptions.ToArray()).ConfigureAwait(false);
              }
          }
          catch (Exception synchronizationException)
          {
              await HandleSubscriptionExceptionAsync(synchronizationException).ConfigureAwait(false);
          }
      }
  }
  /// <summary>
  /// Sends all subscriptions remembered in <see cref="_reconnectSubscriptions"/> to the broker.
  /// Failures are routed to <see cref="HandleSubscriptionExceptionAsync"/>.
  /// </summary>
  private async Task PublishReconnectSubscriptionsAsync()
  {
      _logger.Info("Publishing subscriptions at reconnect");

      try
      {
          if (_reconnectSubscriptions.Count == 0)
          {
              return;
          }

          var topicFilters = _reconnectSubscriptions
              .Select(entry => new MqttTopicFilter { Topic = entry.Key, QualityOfServiceLevel = entry.Value })
              .ToArray();

          await _mqttClient.SubscribeAsync(topicFilters).ConfigureAwait(false);
      }
      catch (Exception synchronizationException)
      {
          await HandleSubscriptionExceptionAsync(synchronizationException).ConfigureAwait(false);
      }
  }
  /// <summary>
  /// Logs a failed subscription synchronization and forwards the exception to
  /// <see cref="SynchronizingSubscriptionsFailedHandler"/> when one is set.
  /// </summary>
  /// <param name="exception">The exception thrown while subscribing or unsubscribing.</param>
  private async Task HandleSubscriptionExceptionAsync(Exception exception)
  {
      _logger.Warning(exception, "Synchronizing subscriptions failed.");

      // Read the property exactly once and test the captured reference. Testing the property
      // again could see a different value than the one that is invoked if the handler is
      // changed concurrently (NullReferenceException or a silently skipped handler).
      var synchronizingSubscriptionsFailedHandler = SynchronizingSubscriptionsFailedHandler;
      if (synchronizingSubscriptionsFailedHandler != null)
      {
          await synchronizingSubscriptionsFailedHandler.HandleSynchronizingSubscriptionsFailedAsync(new ManagedProcessFailedEventArgs(exception)).ConfigureAwait(false);
      }
  }
  /// <summary>
  /// Connects the wrapped client when it is not connected.
  /// </summary>
  /// <returns>
  /// <c>StillConnected</c> when the client was already connected; <c>Recovered</c> when a new
  /// connection reports an existing session; <c>Reconnected</c> when it does not;
  /// <c>NotConnected</c> when connecting threw (the exception is passed to
  /// <see cref="ConnectingFailedHandler"/> when one is set).
  /// </returns>
  private async Task<ReconnectionResult> ReconnectIfRequiredAsync()
  {
      if (_mqttClient.IsConnected)
      {
          return ReconnectionResult.StillConnected;
      }

      try
      {
          var connectResult = await _mqttClient.ConnectAsync(Options.ClientOptions).ConfigureAwait(false);

          if (connectResult.IsSessionPresent)
          {
              return ReconnectionResult.Recovered;
          }

          return ReconnectionResult.Reconnected;
      }
      catch (Exception connectException)
      {
          var failedHandler = ConnectingFailedHandler;
          if (failedHandler != null)
          {
              var failedEventArgs = new ManagedProcessFailedEventArgs(connectException);
              await failedHandler.HandleConnectingFailedAsync(failedEventArgs).ConfigureAwait(false);
          }

          return ReconnectionResult.NotConnected;
      }
  }
  /// <summary>
  /// (Re)starts the background publisher: stops a publisher that may still be running and
  /// launches <see cref="PublishQueuedMessagesAsync"/> with a fresh cancellation source.
  /// </summary>
  private void StartPublishing()
  {
      // StopPublishing is a no-op when no publisher is active.
      StopPublishing();

      var cts = new CancellationTokenSource();

      // Obtain the token now. Reading cts.Token inside the lambda happens later on a thread
      // pool thread and throws ObjectDisposedException if StopPublishing has disposed the
      // source in the meantime.
      var cancellationToken = cts.Token;
      _publishingCancellationToken = cts;

      Task.Run(() => PublishQueuedMessagesAsync(cancellationToken), cancellationToken).Forget(_logger);
  }
  /// <summary>
  /// Cancels and disposes the cancellation source of the background publisher, if there is one.
  /// </summary>
  private void StopPublishing()
  {
      // This method is reached from the maintenance loop as well as from StopAsync/Dispose.
      // Taking the source out of the field atomically guarantees that exactly one caller
      // cancels and disposes it (no Cancel on an already disposed source).
      var cancellationTokenSource = Interlocked.Exchange(ref _publishingCancellationToken, null);
      if (cancellationTokenSource == null)
      {
          return;
      }

      try
      {
          cancellationTokenSource.Cancel(false);
      }
      finally
      {
          // Cancel(false) may throw exceptions raised by registered callbacks; the source is
          // disposed regardless.
          cancellationTokenSource.Dispose();
      }
  }
  /// <summary>
  /// Cancels and disposes the cancellation source of the connection maintenance loop, if there
  /// is one. Afterwards <see cref="IsStarted"/> reports <c>false</c>.
  /// </summary>
  private void StopMaintainingConnection()
  {
      // StopAsync and Dispose may both end up here. Taking the source out of the field
      // atomically guarantees that exactly one caller cancels and disposes it.
      var cancellationTokenSource = Interlocked.Exchange(ref _connectionCancellationToken, null);
      if (cancellationTokenSource == null)
      {
          return;
      }

      try
      {
          cancellationTokenSource.Cancel(false);
      }
      finally
      {
          // Cancel(false) may throw exceptions raised by registered callbacks; the source is
          // disposed regardless.
          cancellationTokenSource.Dispose();
      }
  }
  /// <summary>
  /// Computes the time left until <paramref name="endTime"/>, measured against the current UTC time.
  /// </summary>
  /// <param name="endTime">The deadline as a UTC time.</param>
  /// <returns>The remaining time, or <see cref="TimeSpan.Zero"/> when the deadline has passed.</returns>
  private TimeSpan GetRemainingTime(DateTime endTime)
  {
      var timeLeft = endTime - DateTime.UtcNow;

      if (timeLeft < TimeSpan.Zero)
      {
          // Never return a negative duration.
          return TimeSpan.Zero;
      }

      return timeLeft;
  }
  498. }
  499. }