MqttClient.cs 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767
  1. using MQTTnet.Adapter;
  2. using MQTTnet.Client.Connecting;
  3. using MQTTnet.Client.Disconnecting;
  4. using MQTTnet.Client.ExtendedAuthenticationExchange;
  5. using MQTTnet.Client.Options;
  6. using MQTTnet.Client.Publishing;
  7. using MQTTnet.Client.Receiving;
  8. using MQTTnet.Client.Subscribing;
  9. using MQTTnet.Client.Unsubscribing;
  10. using MQTTnet.Diagnostics;
  11. using MQTTnet.Exceptions;
  12. using MQTTnet.Internal;
  13. using MQTTnet.PacketDispatcher;
  14. using MQTTnet.Packets;
  15. using MQTTnet.Protocol;
  16. using System;
  17. using System.Diagnostics;
  18. using System.Threading;
  19. using System.Threading.Tasks;
  20. namespace MQTTnet.Client
  21. {
  22. public class MqttClient : Disposable, IMqttClient
  23. {
  24. readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
  25. readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
  26. readonly Stopwatch _sendTracker = new Stopwatch();
  27. readonly Stopwatch _receiveTracker = new Stopwatch();
  28. readonly object _disconnectLock = new object();
  29. readonly IMqttClientAdapterFactory _adapterFactory;
  30. readonly IMqttNetLogger _logger;
  31. CancellationTokenSource _backgroundCancellationTokenSource;
  32. Task _packetReceiverTask;
  33. Task _keepAlivePacketsSenderTask;
  34. Task _publishPacketReceiverTask;
  35. AsyncQueue<MqttPublishPacket> _publishPacketReceiverQueue;
  36. IMqttChannelAdapter _adapter;
  37. bool _cleanDisconnectInitiated;
  38. long _isDisconnectPending;
  39. bool _isConnected;
  40. public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logger)
  41. {
  42. if (logger == null) throw new ArgumentNullException(nameof(logger));
  43. _adapterFactory = channelFactory ?? throw new ArgumentNullException(nameof(channelFactory));
  44. _logger = logger.CreateChildLogger(nameof(MqttClient));
  45. }
  46. public IMqttClientConnectedHandler ConnectedHandler { get; set; }
  47. public IMqttClientDisconnectedHandler DisconnectedHandler { get; set; }
  48. public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler { get; set; }
  49. public bool IsConnected
  50. {
  51. get
  52. {
  53. return _isConnected && Interlocked.Read(ref _isDisconnectPending) == 0;
  54. }
  55. }
  56. public IMqttClientOptions Options { get; private set; }
  57. public async Task<MqttClientAuthenticateResult> ConnectAsync(IMqttClientOptions options, CancellationToken cancellationToken)
  58. {
  59. if (options == null) throw new ArgumentNullException(nameof(options));
  60. if (options.ChannelOptions == null) throw new ArgumentException("ChannelOptions are not set.");
  61. ThrowIfConnected("It is not allowed to connect with a server after the connection is established.");
  62. ThrowIfDisposed();
  63. MqttClientAuthenticateResult authenticateResult = null;
  64. try
  65. {
  66. Options = options;
  67. _packetIdentifierProvider.Reset();
  68. _packetDispatcher.Reset();
  69. _backgroundCancellationTokenSource = new CancellationTokenSource();
  70. var backgroundCancellationToken = _backgroundCancellationTokenSource.Token;
  71. _isDisconnectPending = 0;
  72. var adapter = _adapterFactory.CreateClientAdapter(options, _logger);
  73. _adapter = adapter;
  74. using (var combined = CancellationTokenSource.CreateLinkedTokenSource(backgroundCancellationToken, cancellationToken))
  75. {
  76. _logger.Verbose($"Trying to connect with server '{options.ChannelOptions}' (Timeout={options.CommunicationTimeout}).");
  77. await _adapter.ConnectAsync(options.CommunicationTimeout, combined.Token).ConfigureAwait(false);
  78. _logger.Verbose("Connection with server established.");
  79. _publishPacketReceiverQueue = new AsyncQueue<MqttPublishPacket>();
  80. _publishPacketReceiverTask = Task.Run(() => ProcessReceivedPublishPackets(backgroundCancellationToken), backgroundCancellationToken);
  81. _packetReceiverTask = Task.Run(() => TryReceivePacketsAsync(backgroundCancellationToken), backgroundCancellationToken);
  82. authenticateResult = await AuthenticateAsync(adapter, options.WillMessage, combined.Token).ConfigureAwait(false);
  83. }
  84. _sendTracker.Restart();
  85. _receiveTracker.Restart();
  86. if (Options.KeepAlivePeriod != TimeSpan.Zero)
  87. {
  88. _keepAlivePacketsSenderTask = Task.Run(() => TrySendKeepAliveMessagesAsync(backgroundCancellationToken), backgroundCancellationToken);
  89. }
  90. _isConnected = true;
  91. _logger.Info("Connected.");
  92. var connectedHandler = ConnectedHandler;
  93. if (connectedHandler != null)
  94. {
  95. await connectedHandler.HandleConnectedAsync(new MqttClientConnectedEventArgs(authenticateResult)).ConfigureAwait(false);
  96. }
  97. return authenticateResult;
  98. }
  99. catch (Exception exception)
  100. {
  101. _logger.Error(exception, "Error while connecting with server.");
  102. if (!DisconnectIsPending())
  103. {
  104. await DisconnectInternalAsync(null, exception, authenticateResult).ConfigureAwait(false);
  105. }
  106. throw;
  107. }
  108. }
  109. public async Task DisconnectAsync(MqttClientDisconnectOptions options, CancellationToken cancellationToken)
  110. {
  111. if (options is null) throw new ArgumentNullException(nameof(options));
  112. ThrowIfDisposed();
  113. try
  114. {
  115. _cleanDisconnectInitiated = true;
  116. if (IsConnected)
  117. {
  118. var disconnectPacket = _adapter.PacketFormatterAdapter.DataConverter.CreateDisconnectPacket(options);
  119. await SendAsync(disconnectPacket, cancellationToken).ConfigureAwait(false);
  120. }
  121. }
  122. finally
  123. {
  124. if (!DisconnectIsPending())
  125. {
  126. await DisconnectInternalAsync(null, null, null).ConfigureAwait(false);
  127. }
  128. }
  129. }
  130. public async Task PingAsync(CancellationToken cancellationToken)
  131. {
  132. await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket(), cancellationToken).ConfigureAwait(false);
  133. }
  134. public Task SendExtendedAuthenticationExchangeDataAsync(MqttExtendedAuthenticationExchangeData data, CancellationToken cancellationToken)
  135. {
  136. if (data == null) throw new ArgumentNullException(nameof(data));
  137. ThrowIfDisposed();
  138. ThrowIfNotConnected();
  139. return SendAsync(new MqttAuthPacket
  140. {
  141. Properties = new MqttAuthPacketProperties
  142. {
  143. // This must always be equal to the value from the CONNECT packet. So we use it here to ensure that.
  144. AuthenticationMethod = Options.AuthenticationMethod,
  145. AuthenticationData = data.AuthenticationData,
  146. ReasonString = data.ReasonString,
  147. UserProperties = data.UserProperties
  148. }
  149. }, cancellationToken);
  150. }
  151. public async Task<MqttClientSubscribeResult> SubscribeAsync(MqttClientSubscribeOptions options, CancellationToken cancellationToken)
  152. {
  153. if (options == null) throw new ArgumentNullException(nameof(options));
  154. ThrowIfDisposed();
  155. ThrowIfNotConnected();
  156. var subscribePacket = _adapter.PacketFormatterAdapter.DataConverter.CreateSubscribePacket(options);
  157. subscribePacket.PacketIdentifier = _packetIdentifierProvider.GetNextPacketIdentifier();
  158. var subAckPacket = await SendAndReceiveAsync<MqttSubAckPacket>(subscribePacket, cancellationToken).ConfigureAwait(false);
  159. return _adapter.PacketFormatterAdapter.DataConverter.CreateClientSubscribeResult(subscribePacket, subAckPacket);
  160. }
  161. public async Task<MqttClientUnsubscribeResult> UnsubscribeAsync(MqttClientUnsubscribeOptions options, CancellationToken cancellationToken)
  162. {
  163. if (options == null) throw new ArgumentNullException(nameof(options));
  164. ThrowIfDisposed();
  165. ThrowIfNotConnected();
  166. var unsubscribePacket = _adapter.PacketFormatterAdapter.DataConverter.CreateUnsubscribePacket(options);
  167. unsubscribePacket.PacketIdentifier = _packetIdentifierProvider.GetNextPacketIdentifier();
  168. var unsubAckPacket = await SendAndReceiveAsync<MqttUnsubAckPacket>(unsubscribePacket, cancellationToken).ConfigureAwait(false);
  169. return _adapter.PacketFormatterAdapter.DataConverter.CreateClientUnsubscribeResult(unsubscribePacket, unsubAckPacket);
  170. }
  171. public Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken)
  172. {
  173. if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
  174. MqttTopicValidator.ThrowIfInvalid(applicationMessage.Topic);
  175. ThrowIfDisposed();
  176. ThrowIfNotConnected();
  177. var publishPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePublishPacket(applicationMessage);
  178. switch (applicationMessage.QualityOfServiceLevel)
  179. {
  180. case MqttQualityOfServiceLevel.AtMostOnce:
  181. {
  182. return PublishAtMostOnce(publishPacket, cancellationToken);
  183. }
  184. case MqttQualityOfServiceLevel.AtLeastOnce:
  185. {
  186. return PublishAtLeastOnceAsync(publishPacket, cancellationToken);
  187. }
  188. case MqttQualityOfServiceLevel.ExactlyOnce:
  189. {
  190. return PublishExactlyOnceAsync(publishPacket, cancellationToken);
  191. }
  192. default:
  193. {
  194. throw new NotSupportedException();
  195. }
  196. }
  197. }
  198. void Cleanup()
  199. {
  200. _backgroundCancellationTokenSource?.Cancel(false);
  201. _backgroundCancellationTokenSource?.Dispose();
  202. _backgroundCancellationTokenSource = null;
  203. _publishPacketReceiverQueue?.Dispose();
  204. _publishPacketReceiverQueue = null;
  205. _adapter?.Dispose();
  206. }
  207. protected override void Dispose(bool disposing)
  208. {
  209. if (disposing)
  210. {
  211. Cleanup();
  212. DisconnectedHandler = null;
  213. }
  214. base.Dispose(disposing);
  215. }
  216. async Task<MqttClientAuthenticateResult> AuthenticateAsync(IMqttChannelAdapter channelAdapter, MqttApplicationMessage willApplicationMessage, CancellationToken cancellationToken)
  217. {
  218. var connectPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnectPacket(
  219. willApplicationMessage,
  220. Options);
  221. var connAckPacket = await SendAndReceiveAsync<MqttConnAckPacket>(connectPacket, cancellationToken).ConfigureAwait(false);
  222. var result = channelAdapter.PacketFormatterAdapter.DataConverter.CreateClientConnectResult(connAckPacket);
  223. if (result.ResultCode != MqttClientConnectResultCode.Success)
  224. {
  225. throw new MqttConnectingFailedException(result);
  226. }
  227. _logger.Verbose("Authenticated MQTT connection with server established.");
  228. return result;
  229. }
  230. void ThrowIfNotConnected()
  231. {
  232. if (!IsConnected || Interlocked.Read(ref _isDisconnectPending) == 1) throw new MqttCommunicationException("The client is not connected.");
  233. }
  234. void ThrowIfConnected(string message)
  235. {
  236. if (IsConnected) throw new MqttProtocolViolationException(message);
  237. }
  238. async Task DisconnectInternalAsync(Task sender, Exception exception, MqttClientAuthenticateResult authenticateResult)
  239. {
  240. var clientWasConnected = IsConnected;
  241. TryInitiateDisconnect();
  242. _isConnected = false;
  243. try
  244. {
  245. if (_adapter != null)
  246. {
  247. _logger.Verbose("Disconnecting [Timeout={0}]", Options.CommunicationTimeout);
  248. await _adapter.DisconnectAsync(Options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
  249. }
  250. _logger.Verbose("Disconnected from adapter.");
  251. }
  252. catch (Exception adapterException)
  253. {
  254. _logger.Warning(adapterException, "Error while disconnecting from adapter.");
  255. }
  256. try
  257. {
  258. var receiverTask = WaitForTaskAsync(_packetReceiverTask, sender);
  259. var publishPacketReceiverTask = WaitForTaskAsync(_publishPacketReceiverTask, sender);
  260. var keepAliveTask = WaitForTaskAsync(_keepAlivePacketsSenderTask, sender);
  261. await Task.WhenAll(receiverTask, publishPacketReceiverTask, keepAliveTask).ConfigureAwait(false);
  262. _publishPacketReceiverQueue?.Dispose();
  263. }
  264. catch (Exception e)
  265. {
  266. _logger.Warning(e, "Error while waiting for internal tasks.");
  267. }
  268. finally
  269. {
  270. Cleanup();
  271. _cleanDisconnectInitiated = false;
  272. _logger.Info("Disconnected.");
  273. var disconnectedHandler = DisconnectedHandler;
  274. if (disconnectedHandler != null)
  275. {
  276. // This handler must be executed in a new thread because otherwise a dead lock may happen
  277. // when trying to reconnect in that handler etc.
  278. Task.Run(() => disconnectedHandler.HandleDisconnectedAsync(new MqttClientDisconnectedEventArgs(clientWasConnected, exception, authenticateResult))).Forget(_logger);
  279. }
  280. }
  281. }
  282. void TryInitiateDisconnect()
  283. {
  284. lock (_disconnectLock)
  285. {
  286. try
  287. {
  288. if (_backgroundCancellationTokenSource?.IsCancellationRequested == true)
  289. {
  290. return;
  291. }
  292. _backgroundCancellationTokenSource?.Cancel(false);
  293. }
  294. catch (Exception exception)
  295. {
  296. _logger.Warning(exception, "Error while initiating disconnect.");
  297. }
  298. }
  299. }
  300. private Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken)
  301. {
  302. if (cancellationToken.IsCancellationRequested)
  303. {
  304. return Task.FromResult(0);
  305. }
  306. _sendTracker.Restart();
  307. return _adapter.SendPacketAsync(packet, Options.CommunicationTimeout, cancellationToken);
  308. }
  309. async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket, CancellationToken cancellationToken) where TResponsePacket : MqttBasePacket
  310. {
  311. cancellationToken.ThrowIfCancellationRequested();
  312. _sendTracker.Restart();
  313. ushort identifier = 0;
  314. if (requestPacket is IMqttPacketWithIdentifier packetWithIdentifier && packetWithIdentifier.PacketIdentifier.HasValue)
  315. {
  316. identifier = packetWithIdentifier.PacketIdentifier.Value;
  317. }
  318. using (var packetAwaiter = _packetDispatcher.AddAwaiter<TResponsePacket>(identifier))
  319. {
  320. try
  321. {
  322. await _adapter.SendPacketAsync(requestPacket, Options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
  323. }
  324. catch (Exception e)
  325. {
  326. _logger.Warning(e, "Error when sending packet of type '{0}'.", typeof(TResponsePacket).Name);
  327. packetAwaiter.Cancel();
  328. }
  329. try
  330. {
  331. var response = await packetAwaiter.WaitOneAsync(Options.CommunicationTimeout).ConfigureAwait(false);
  332. _receiveTracker.Restart();
  333. return response;
  334. }
  335. catch (Exception exception)
  336. {
  337. if (exception is MqttCommunicationTimedOutException)
  338. {
  339. _logger.Warning(null, "Timeout while waiting for packet of type '{0}'.", typeof(TResponsePacket).Name);
  340. }
  341. throw;
  342. }
  343. }
  344. }
  345. async Task TrySendKeepAliveMessagesAsync(CancellationToken cancellationToken)
  346. {
  347. try
  348. {
  349. _logger.Verbose("Start sending keep alive packets.");
  350. while (!cancellationToken.IsCancellationRequested)
  351. {
  352. // Values described here: [MQTT-3.1.2-24].
  353. var keepAliveSendInterval = TimeSpan.FromSeconds(Options.KeepAlivePeriod.TotalSeconds * 0.75);
  354. if (Options.KeepAliveSendInterval.HasValue)
  355. {
  356. keepAliveSendInterval = Options.KeepAliveSendInterval.Value;
  357. }
  358. var waitTimeSend = keepAliveSendInterval - _sendTracker.Elapsed;
  359. var waitTimeReceive = keepAliveSendInterval - _receiveTracker.Elapsed;
  360. if (waitTimeSend <= TimeSpan.Zero || waitTimeReceive <= TimeSpan.Zero)
  361. {
  362. await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket(), cancellationToken).ConfigureAwait(false);
  363. }
  364. await Task.Delay(keepAliveSendInterval, cancellationToken).ConfigureAwait(false);
  365. }
  366. }
  367. catch (Exception exception)
  368. {
  369. if (_cleanDisconnectInitiated)
  370. {
  371. return;
  372. }
  373. if (exception is OperationCanceledException)
  374. {
  375. }
  376. else if (exception is MqttCommunicationException)
  377. {
  378. _logger.Warning(exception, "Communication error while sending/receiving keep alive packets.");
  379. }
  380. else
  381. {
  382. _logger.Error(exception, "Error exception while sending/receiving keep alive packets.");
  383. }
  384. if (!DisconnectIsPending())
  385. {
  386. await DisconnectInternalAsync(_keepAlivePacketsSenderTask, exception, null).ConfigureAwait(false);
  387. }
  388. }
  389. finally
  390. {
  391. _logger.Verbose("Stopped sending keep alive packets.");
  392. }
  393. }
  394. async Task TryReceivePacketsAsync(CancellationToken cancellationToken)
  395. {
  396. try
  397. {
  398. _logger.Verbose("Start receiving packets.");
  399. while (!cancellationToken.IsCancellationRequested)
  400. {
  401. var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false);
  402. if (cancellationToken.IsCancellationRequested)
  403. {
  404. return;
  405. }
  406. if (packet == null)
  407. {
  408. if (!DisconnectIsPending())
  409. {
  410. await DisconnectInternalAsync(_packetReceiverTask, null, null).ConfigureAwait(false);
  411. }
  412. return;
  413. }
  414. await TryProcessReceivedPacketAsync(packet, cancellationToken).ConfigureAwait(false);
  415. }
  416. }
  417. catch (Exception exception)
  418. {
  419. if (_cleanDisconnectInitiated)
  420. {
  421. return;
  422. }
  423. if (exception is OperationCanceledException)
  424. {
  425. }
  426. else if (exception is MqttCommunicationException)
  427. {
  428. _logger.Warning(exception, "Communication error while receiving packets.");
  429. }
  430. else
  431. {
  432. _logger.Error(exception, "Error while receiving packets.");
  433. }
  434. _packetDispatcher.Dispatch(exception);
  435. if (!DisconnectIsPending())
  436. {
  437. await DisconnectInternalAsync(_packetReceiverTask, exception, null).ConfigureAwait(false);
  438. }
  439. }
  440. finally
  441. {
  442. _logger.Verbose("Stopped receiving packets.");
  443. }
  444. }
  445. async Task TryProcessReceivedPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
  446. {
  447. try
  448. {
  449. _receiveTracker.Restart();
  450. if (packet is MqttPublishPacket publishPacket)
  451. {
  452. EnqueueReceivedPublishPacket(publishPacket);
  453. }
  454. else if (packet is MqttPubRelPacket pubRelPacket)
  455. {
  456. await SendAsync(new MqttPubCompPacket
  457. {
  458. PacketIdentifier = pubRelPacket.PacketIdentifier,
  459. ReasonCode = MqttPubCompReasonCode.Success
  460. }, cancellationToken).ConfigureAwait(false);
  461. }
  462. else if (packet is MqttPingReqPacket)
  463. {
  464. await SendAsync(new MqttPingRespPacket(), cancellationToken).ConfigureAwait(false);
  465. }
  466. else if (packet is MqttDisconnectPacket)
  467. {
  468. // Also dispatch disconnect to waiting threads to generate a proper exception.
  469. _packetDispatcher.Dispatch(packet);
  470. await DisconnectAsync(null, cancellationToken).ConfigureAwait(false);
  471. }
  472. else if (packet is MqttAuthPacket authPacket)
  473. {
  474. var extendedAuthenticationExchangeHandler = Options.ExtendedAuthenticationExchangeHandler;
  475. if (extendedAuthenticationExchangeHandler != null)
  476. {
  477. await extendedAuthenticationExchangeHandler.HandleRequestAsync(new MqttExtendedAuthenticationExchangeContext(authPacket, this)).ConfigureAwait(false);
  478. }
  479. }
  480. else
  481. {
  482. _packetDispatcher.Dispatch(packet);
  483. }
  484. }
  485. catch (Exception exception)
  486. {
  487. if (_cleanDisconnectInitiated)
  488. {
  489. return;
  490. }
  491. if (exception is OperationCanceledException)
  492. {
  493. }
  494. else if (exception is MqttCommunicationException)
  495. {
  496. _logger.Warning(exception, "Communication error while receiving packets.");
  497. }
  498. else
  499. {
  500. _logger.Error(exception, "Error while receiving packets.");
  501. }
  502. _packetDispatcher.Dispatch(exception);
  503. if (!DisconnectIsPending())
  504. {
  505. await DisconnectInternalAsync(_packetReceiverTask, exception, null).ConfigureAwait(false);
  506. }
  507. }
  508. }
  509. void EnqueueReceivedPublishPacket(MqttPublishPacket publishPacket)
  510. {
  511. try
  512. {
  513. _publishPacketReceiverQueue.Enqueue(publishPacket);
  514. }
  515. catch (Exception exception)
  516. {
  517. _logger.Error(exception, "Error while enqueueing application message.");
  518. }
  519. }
  520. async Task ProcessReceivedPublishPackets(CancellationToken cancellationToken)
  521. {
  522. while (!cancellationToken.IsCancellationRequested)
  523. {
  524. try
  525. {
  526. var publishPacketDequeueResult = await _publishPacketReceiverQueue.TryDequeueAsync(cancellationToken);
  527. if (!publishPacketDequeueResult.IsSuccess)
  528. {
  529. return;
  530. }
  531. var publishPacket = publishPacketDequeueResult.Item;
  532. if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
  533. {
  534. await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false);
  535. }
  536. else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
  537. {
  538. if (await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false))
  539. {
  540. await SendAsync(new MqttPubAckPacket
  541. {
  542. PacketIdentifier = publishPacket.PacketIdentifier,
  543. ReasonCode = MqttPubAckReasonCode.Success
  544. }, cancellationToken).ConfigureAwait(false);
  545. }
  546. }
  547. else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
  548. {
  549. if (await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false))
  550. {
  551. var pubRecPacket = new MqttPubRecPacket
  552. {
  553. PacketIdentifier = publishPacket.PacketIdentifier,
  554. ReasonCode = MqttPubRecReasonCode.Success
  555. };
  556. await SendAsync(pubRecPacket, cancellationToken).ConfigureAwait(false);
  557. }
  558. }
  559. else
  560. {
  561. throw new MqttProtocolViolationException("Received a not supported QoS level.");
  562. }
  563. }
  564. catch (Exception exception)
  565. {
  566. _logger.Error(exception, "Error while handling application message.");
  567. }
  568. }
  569. }
  570. async Task<MqttClientPublishResult> PublishAtMostOnce(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
  571. {
  572. // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
  573. await SendAsync(publishPacket, cancellationToken).ConfigureAwait(false);
  574. return _adapter.PacketFormatterAdapter.DataConverter.CreatePublishResult(null);
  575. }
  576. async Task<MqttClientPublishResult> PublishAtLeastOnceAsync(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
  577. {
  578. publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNextPacketIdentifier();
  579. var response = await SendAndReceiveAsync<MqttPubAckPacket>(publishPacket, cancellationToken).ConfigureAwait(false);
  580. return _adapter.PacketFormatterAdapter.DataConverter.CreatePublishResult(response);
  581. }
  582. async Task<MqttClientPublishResult> PublishExactlyOnceAsync(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
  583. {
  584. publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNextPacketIdentifier();
  585. var pubRecPacket = await SendAndReceiveAsync<MqttPubRecPacket>(publishPacket, cancellationToken).ConfigureAwait(false);
  586. var pubRelPacket = new MqttPubRelPacket
  587. {
  588. PacketIdentifier = publishPacket.PacketIdentifier,
  589. ReasonCode = MqttPubRelReasonCode.Success
  590. };
  591. var pubCompPacket = await SendAndReceiveAsync<MqttPubCompPacket>(pubRelPacket, cancellationToken).ConfigureAwait(false);
  592. return _adapter.PacketFormatterAdapter.DataConverter.CreatePublishResult(pubRecPacket, pubCompPacket);
  593. }
  594. async Task<bool> HandleReceivedApplicationMessageAsync(MqttPublishPacket publishPacket)
  595. {
  596. var applicationMessage = _adapter.PacketFormatterAdapter.DataConverter.CreateApplicationMessage(publishPacket);
  597. var handler = ApplicationMessageReceivedHandler;
  598. if (handler != null)
  599. {
  600. var eventArgs = new MqttApplicationMessageReceivedEventArgs(Options.ClientId, applicationMessage);
  601. await handler.HandleApplicationMessageReceivedAsync(eventArgs).ConfigureAwait(false);
  602. return !eventArgs.ProcessingFailed;
  603. }
  604. return true;
  605. }
  606. async Task WaitForTaskAsync(Task task, Task sender)
  607. {
  608. if (task == null)
  609. {
  610. return;
  611. }
  612. if (task == sender)
  613. {
  614. // Return here to avoid deadlocks, but first any eventual exception in the task
  615. // must be handled to avoid not getting an unhandled task exception
  616. if (!task.IsFaulted)
  617. {
  618. return;
  619. }
  620. // By accessing the Exception property the exception is considered handled and will
  621. // not result in an unhandled task exception later by the finalizer
  622. _logger.Warning(task.Exception, "Error while waiting for background task.");
  623. return;
  624. }
  625. try
  626. {
  627. await task.ConfigureAwait(false);
  628. }
  629. catch (OperationCanceledException)
  630. {
  631. }
  632. }
  633. bool DisconnectIsPending()
  634. {
  635. return Interlocked.CompareExchange(ref _isDisconnectPending, 1, 0) != 0;
  636. }
  637. }
  638. }