MqttClientConnection.cs 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507
  1. using MQTTnet.Adapter;
  2. using MQTTnet.Client;
  3. using MQTTnet.Diagnostics;
  4. using MQTTnet.Exceptions;
  5. using MQTTnet.Formatter;
  6. using MQTTnet.Implementations;
  7. using MQTTnet.Internal;
  8. using MQTTnet.PacketDispatcher;
  9. using MQTTnet.Packets;
  10. using MQTTnet.Protocol;
  11. using MQTTnet.Server.Status;
  12. using System;
  13. using System.Collections.Generic;
  14. using System.Threading;
  15. using System.Threading.Tasks;
  16. namespace MQTTnet.Server
  17. {
  18. public sealed class MqttClientConnection : IDisposable
  19. {
  20. readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
  21. readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
  22. readonly CancellationTokenSource _cancellationToken = new CancellationTokenSource();
  23. readonly IMqttRetainedMessagesManager _retainedMessagesManager;
  24. readonly Func<Task> _onStart;
  25. readonly Func<MqttClientDisconnectType, Task> _onStop;
  26. readonly MqttClientKeepAliveMonitor _keepAliveMonitor;
  27. readonly MqttClientSessionsManager _sessionsManager;
  28. readonly IMqttNetLogger _logger;
  29. readonly IMqttServerOptions _serverOptions;
  30. readonly IMqttChannelAdapter _channelAdapter;
  31. readonly IMqttDataConverter _dataConverter;
  32. readonly string _endpoint;
  33. readonly DateTime _connectedTimestamp;
  34. volatile Task _packageReceiverTask;
  35. DateTime _lastPacketReceivedTimestamp;
  36. DateTime _lastNonKeepAlivePacketReceivedTimestamp;
  37. long _receivedPacketsCount;
  38. long _sentPacketsCount = 1; // Start with 1 because the CONNECT packet is not counted anywhere.
  39. long _receivedApplicationMessagesCount;
  40. long _sentApplicationMessagesCount;
  41. volatile bool _isTakeover;
  42. public MqttClientConnection(
  43. MqttConnectPacket connectPacket,
  44. IMqttChannelAdapter channelAdapter,
  45. MqttClientSession session,
  46. IMqttServerOptions serverOptions,
  47. MqttClientSessionsManager sessionsManager,
  48. IMqttRetainedMessagesManager retainedMessagesManager,
  49. Func<Task> onStart,
  50. Func<MqttClientDisconnectType, Task> onStop,
  51. IMqttNetLogger logger)
  52. {
  53. Session = session ?? throw new ArgumentNullException(nameof(session));
  54. _serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions));
  55. _sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager));
  56. _retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager));
  57. _onStart = onStart ?? throw new ArgumentNullException(nameof(onStart));
  58. _onStop = onStop ?? throw new ArgumentNullException(nameof(onStop));
  59. _channelAdapter = channelAdapter ?? throw new ArgumentNullException(nameof(channelAdapter));
  60. _dataConverter = _channelAdapter.PacketFormatterAdapter.DataConverter;
  61. _endpoint = _channelAdapter.Endpoint;
  62. ConnectPacket = connectPacket ?? throw new ArgumentNullException(nameof(connectPacket));
  63. if (logger == null) throw new ArgumentNullException(nameof(logger));
  64. _logger = logger.CreateChildLogger(nameof(MqttClientConnection));
  65. _keepAliveMonitor = new MqttClientKeepAliveMonitor(ConnectPacket.ClientId, () => StopAsync(), _logger);
  66. _connectedTimestamp = DateTime.UtcNow;
  67. _lastPacketReceivedTimestamp = _connectedTimestamp;
  68. _lastNonKeepAlivePacketReceivedTimestamp = _lastPacketReceivedTimestamp;
  69. }
  70. public MqttConnectPacket ConnectPacket { get; }
  71. public string ClientId => ConnectPacket.ClientId;
  72. public MqttClientSession Session { get; }
  73. public Task StopAsync(bool isTakeover = false)
  74. {
  75. _isTakeover = isTakeover;
  76. var task = _packageReceiverTask;
  77. StopInternal();
  78. if (task != null)
  79. {
  80. return task;
  81. }
  82. return PlatformAbstractionLayer.CompletedTask;
  83. }
  84. public void ResetStatistics()
  85. {
  86. _channelAdapter.ResetStatistics();
  87. }
  88. public void FillStatus(MqttClientStatus status)
  89. {
  90. status.ClientId = ClientId;
  91. status.Endpoint = _endpoint;
  92. status.ProtocolVersion = _channelAdapter.PacketFormatterAdapter.ProtocolVersion;
  93. status.ReceivedApplicationMessagesCount = Interlocked.Read(ref _receivedApplicationMessagesCount);
  94. status.SentApplicationMessagesCount = Interlocked.Read(ref _sentApplicationMessagesCount);
  95. status.ReceivedPacketsCount = Interlocked.Read(ref _receivedPacketsCount);
  96. status.SentPacketsCount = Interlocked.Read(ref _sentPacketsCount);
  97. status.ConnectedTimestamp = _connectedTimestamp;
  98. status.LastPacketReceivedTimestamp = _lastPacketReceivedTimestamp;
  99. status.LastNonKeepAlivePacketReceivedTimestamp = _lastNonKeepAlivePacketReceivedTimestamp;
  100. status.BytesSent = _channelAdapter.BytesSent;
  101. status.BytesReceived = _channelAdapter.BytesReceived;
  102. }
  103. public void Dispose()
  104. {
  105. _cancellationToken.Dispose();
  106. }
  107. public Task RunAsync(MqttConnectionValidatorContext connectionValidatorContext)
  108. {
  109. _packageReceiverTask = RunInternalAsync(connectionValidatorContext);
  110. return _packageReceiverTask;
  111. }
  112. async Task RunInternalAsync(MqttConnectionValidatorContext connectionValidatorContext)
  113. {
  114. var disconnectType = MqttClientDisconnectType.NotClean;
  115. try
  116. {
  117. await _onStart();
  118. _logger.Info("Client '{0}': Session started.", ClientId);
  119. _channelAdapter.ReadingPacketStartedCallback = OnAdapterReadingPacketStarted;
  120. _channelAdapter.ReadingPacketCompletedCallback = OnAdapterReadingPacketCompleted;
  121. Session.WillMessage = ConnectPacket.WillMessage;
  122. Task.Run(() => SendPendingPacketsAsync(_cancellationToken.Token), _cancellationToken.Token).Forget(_logger);
  123. // TODO: Change to single thread in SessionManager. Or use SessionManager and stats from KeepAliveMonitor.
  124. _keepAliveMonitor.Start(ConnectPacket.KeepAlivePeriod, _cancellationToken.Token);
  125. await SendAsync(
  126. _channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnAckPacket(connectionValidatorContext)
  127. ).ConfigureAwait(false);
  128. Session.IsCleanSession = false;
  129. while (!_cancellationToken.IsCancellationRequested)
  130. {
  131. var packet = await _channelAdapter.ReceivePacketAsync(TimeSpan.Zero, _cancellationToken.Token).ConfigureAwait(false);
  132. if (packet == null)
  133. {
  134. // The client has closed the connection gracefully.
  135. break;
  136. }
  137. Interlocked.Increment(ref _sentPacketsCount);
  138. _lastPacketReceivedTimestamp = DateTime.UtcNow;
  139. if (!(packet is MqttPingReqPacket || packet is MqttPingRespPacket))
  140. {
  141. _lastNonKeepAlivePacketReceivedTimestamp = _lastPacketReceivedTimestamp;
  142. }
  143. _keepAliveMonitor.PacketReceived();
  144. if (packet is MqttPublishPacket publishPacket)
  145. {
  146. await HandleIncomingPublishPacketAsync(publishPacket).ConfigureAwait(false);
  147. continue;
  148. }
  149. if (packet is MqttPubRelPacket pubRelPacket)
  150. {
  151. var pubCompPacket = new MqttPubCompPacket
  152. {
  153. PacketIdentifier = pubRelPacket.PacketIdentifier,
  154. ReasonCode = MqttPubCompReasonCode.Success
  155. };
  156. await SendAsync(pubCompPacket).ConfigureAwait(false);
  157. continue;
  158. }
  159. if (packet is MqttSubscribePacket subscribePacket)
  160. {
  161. await HandleIncomingSubscribePacketAsync(subscribePacket).ConfigureAwait(false);
  162. continue;
  163. }
  164. if (packet is MqttUnsubscribePacket unsubscribePacket)
  165. {
  166. await HandleIncomingUnsubscribePacketAsync(unsubscribePacket).ConfigureAwait(false);
  167. continue;
  168. }
  169. if (packet is MqttPingReqPacket)
  170. {
  171. await SendAsync(new MqttPingRespPacket()).ConfigureAwait(false);
  172. continue;
  173. }
  174. if (packet is MqttDisconnectPacket)
  175. {
  176. Session.WillMessage = null;
  177. disconnectType = MqttClientDisconnectType.Clean;
  178. StopInternal();
  179. break;
  180. }
  181. _packetDispatcher.Dispatch(packet);
  182. }
  183. }
  184. catch (OperationCanceledException)
  185. {
  186. }
  187. catch (Exception exception)
  188. {
  189. if (exception is MqttCommunicationException)
  190. {
  191. _logger.Warning(exception, "Client '{0}': Communication exception while receiving client packets.", ClientId);
  192. }
  193. else
  194. {
  195. _logger.Error(exception, "Client '{0}': Error while receiving client packets.", ClientId);
  196. }
  197. StopInternal();
  198. }
  199. finally
  200. {
  201. if (_isTakeover)
  202. {
  203. disconnectType = MqttClientDisconnectType.Takeover;
  204. }
  205. if (Session.WillMessage != null)
  206. {
  207. _sessionsManager.DispatchApplicationMessage(Session.WillMessage, this);
  208. Session.WillMessage = null;
  209. }
  210. _packetDispatcher.Reset();
  211. _channelAdapter.ReadingPacketStartedCallback = null;
  212. _channelAdapter.ReadingPacketCompletedCallback = null;
  213. _logger.Info("Client '{0}': Connection stopped.", ClientId);
  214. _packageReceiverTask = null;
  215. try
  216. {
  217. await _onStop(disconnectType);
  218. }
  219. catch (Exception e)
  220. {
  221. _logger.Error(e, "client '{0}': Error while cleaning up", ClientId);
  222. }
  223. }
  224. }
  225. void StopInternal()
  226. {
  227. _cancellationToken.Cancel(false);
  228. }
  229. async Task EnqueueSubscribedRetainedMessagesAsync(ICollection<MqttTopicFilter> topicFilters)
  230. {
  231. var retainedMessages = await _retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters).ConfigureAwait(false);
  232. foreach (var applicationMessage in retainedMessages)
  233. {
  234. Session.EnqueueApplicationMessage(applicationMessage, ClientId, true);
  235. }
  236. }
  237. async Task HandleIncomingSubscribePacketAsync(MqttSubscribePacket subscribePacket)
  238. {
  239. // TODO: Let the channel adapter create the packet.
  240. var subscribeResult = await Session.SubscriptionsManager.SubscribeAsync(subscribePacket, ConnectPacket).ConfigureAwait(false);
  241. await SendAsync(subscribeResult.ResponsePacket).ConfigureAwait(false);
  242. if (subscribeResult.CloseConnection)
  243. {
  244. StopInternal();
  245. return;
  246. }
  247. await EnqueueSubscribedRetainedMessagesAsync(subscribePacket.TopicFilters).ConfigureAwait(false);
  248. }
  249. async Task HandleIncomingUnsubscribePacketAsync(MqttUnsubscribePacket unsubscribePacket)
  250. {
  251. // TODO: Let the channel adapter create the packet.
  252. var unsubscribeResult = await Session.SubscriptionsManager.UnsubscribeAsync(unsubscribePacket).ConfigureAwait(false);
  253. await SendAsync(unsubscribeResult).ConfigureAwait(false);
  254. }
  255. Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket)
  256. {
  257. Interlocked.Increment(ref _sentApplicationMessagesCount);
  258. switch (publishPacket.QualityOfServiceLevel)
  259. {
  260. case MqttQualityOfServiceLevel.AtMostOnce:
  261. {
  262. return HandleIncomingPublishPacketWithQoS0Async(publishPacket);
  263. }
  264. case MqttQualityOfServiceLevel.AtLeastOnce:
  265. {
  266. return HandleIncomingPublishPacketWithQoS1Async(publishPacket);
  267. }
  268. case MqttQualityOfServiceLevel.ExactlyOnce:
  269. {
  270. return HandleIncomingPublishPacketWithQoS2Async(publishPacket);
  271. }
  272. default:
  273. {
  274. throw new MqttCommunicationException("Received a not supported QoS level.");
  275. }
  276. }
  277. }
  278. Task HandleIncomingPublishPacketWithQoS0Async(MqttPublishPacket publishPacket)
  279. {
  280. var applicationMessage = _dataConverter.CreateApplicationMessage(publishPacket);
  281. _sessionsManager.DispatchApplicationMessage(applicationMessage, this);
  282. return PlatformAbstractionLayer.CompletedTask;
  283. }
  284. Task HandleIncomingPublishPacketWithQoS1Async(MqttPublishPacket publishPacket)
  285. {
  286. var applicationMessage = _dataConverter.CreateApplicationMessage(publishPacket);
  287. _sessionsManager.DispatchApplicationMessage(applicationMessage, this);
  288. var pubAckPacket = _dataConverter.CreatePubAckPacket(publishPacket);
  289. return SendAsync(pubAckPacket);
  290. }
  291. Task HandleIncomingPublishPacketWithQoS2Async(MqttPublishPacket publishPacket)
  292. {
  293. var applicationMessage = _dataConverter.CreateApplicationMessage(publishPacket);
  294. _sessionsManager.DispatchApplicationMessage(applicationMessage, this);
  295. var pubRecPacket = new MqttPubRecPacket
  296. {
  297. PacketIdentifier = publishPacket.PacketIdentifier,
  298. ReasonCode = MqttPubRecReasonCode.Success
  299. };
  300. return SendAsync(pubRecPacket);
  301. }
  302. async Task SendPendingPacketsAsync(CancellationToken cancellationToken)
  303. {
  304. MqttQueuedApplicationMessage queuedApplicationMessage = null;
  305. MqttPublishPacket publishPacket = null;
  306. try
  307. {
  308. while (!cancellationToken.IsCancellationRequested)
  309. {
  310. queuedApplicationMessage = await Session.ApplicationMessagesQueue.TakeAsync(cancellationToken).ConfigureAwait(false);
  311. if (queuedApplicationMessage == null)
  312. {
  313. return;
  314. }
  315. if (cancellationToken.IsCancellationRequested)
  316. {
  317. return;
  318. }
  319. publishPacket = _dataConverter.CreatePublishPacket(queuedApplicationMessage.ApplicationMessage);
  320. publishPacket.QualityOfServiceLevel = queuedApplicationMessage.QualityOfServiceLevel;
  321. // Set the retain flag to true according to [MQTT-3.3.1-8] and [MQTT-3.3.1-9].
  322. publishPacket.Retain = queuedApplicationMessage.IsRetainedMessage;
  323. if (publishPacket.QualityOfServiceLevel > 0)
  324. {
  325. publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNextPacketIdentifier();
  326. }
  327. if (_serverOptions.ClientMessageQueueInterceptor != null)
  328. {
  329. var context = new MqttClientMessageQueueInterceptorContext(
  330. queuedApplicationMessage.SenderClientId,
  331. ClientId,
  332. queuedApplicationMessage.ApplicationMessage);
  333. if (_serverOptions.ClientMessageQueueInterceptor != null)
  334. {
  335. await _serverOptions.ClientMessageQueueInterceptor.InterceptClientMessageQueueEnqueueAsync(context).ConfigureAwait(false);
  336. }
  337. if (!context.AcceptEnqueue || context.ApplicationMessage == null)
  338. {
  339. return;
  340. }
  341. publishPacket.Topic = context.ApplicationMessage.Topic;
  342. publishPacket.Payload = context.ApplicationMessage.Payload;
  343. publishPacket.QualityOfServiceLevel = context.ApplicationMessage.QualityOfServiceLevel;
  344. }
  345. if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
  346. {
  347. await SendAsync(publishPacket).ConfigureAwait(false);
  348. }
  349. else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
  350. {
  351. var awaiter = _packetDispatcher.AddAwaiter<MqttPubAckPacket>(publishPacket.PacketIdentifier);
  352. await SendAsync(publishPacket).ConfigureAwait(false);
  353. await awaiter.WaitOneAsync(_serverOptions.DefaultCommunicationTimeout).ConfigureAwait(false);
  354. }
  355. else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
  356. {
  357. using (var awaiter1 = _packetDispatcher.AddAwaiter<MqttPubRecPacket>(publishPacket.PacketIdentifier))
  358. using (var awaiter2 = _packetDispatcher.AddAwaiter<MqttPubCompPacket>(publishPacket.PacketIdentifier))
  359. {
  360. await SendAsync(publishPacket).ConfigureAwait(false);
  361. await awaiter1.WaitOneAsync(_serverOptions.DefaultCommunicationTimeout).ConfigureAwait(false);
  362. await SendAsync(new MqttPubRelPacket { PacketIdentifier = publishPacket.PacketIdentifier }).ConfigureAwait(false);
  363. await awaiter2.WaitOneAsync(_serverOptions.DefaultCommunicationTimeout).ConfigureAwait(false);
  364. }
  365. }
  366. _logger.Verbose("Queued application message sent (ClientId: {0}).", ClientId);
  367. }
  368. }
  369. catch (Exception exception)
  370. {
  371. if (exception is MqttCommunicationTimedOutException)
  372. {
  373. _logger.Warning(exception, "Sending publish packet failed: Timeout (ClientId: {0}).", ClientId);
  374. }
  375. else if (exception is MqttCommunicationException)
  376. {
  377. _logger.Warning(exception, "Sending publish packet failed: Communication exception (ClientId: {0}).", ClientId);
  378. }
  379. else if (exception is OperationCanceledException)
  380. {
  381. }
  382. else
  383. {
  384. _logger.Error(exception, "Sending publish packet failed (ClientId: {0}).", ClientId);
  385. }
  386. if (publishPacket?.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
  387. {
  388. queuedApplicationMessage.IsDuplicate = true;
  389. Session.ApplicationMessagesQueue.Enqueue(queuedApplicationMessage);
  390. }
  391. if (!_cancellationToken.Token.IsCancellationRequested)
  392. {
  393. await StopAsync().ConfigureAwait(false);
  394. }
  395. }
  396. }
  397. async Task SendAsync(MqttBasePacket packet)
  398. {
  399. await _channelAdapter.SendPacketAsync(packet, _serverOptions.DefaultCommunicationTimeout, _cancellationToken.Token).ConfigureAwait(false);
  400. Interlocked.Increment(ref _receivedPacketsCount);
  401. if (packet is MqttPublishPacket)
  402. {
  403. Interlocked.Increment(ref _receivedApplicationMessagesCount);
  404. }
  405. }
  406. void OnAdapterReadingPacketCompleted()
  407. {
  408. _keepAliveMonitor?.Resume();
  409. }
  410. void OnAdapterReadingPacketStarted()
  411. {
  412. _keepAliveMonitor?.Pause();
  413. }
  414. }
  415. }