MqttClientKeepAliveMonitor.cs 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. using MQTTnet.Diagnostics;
  2. using MQTTnet.Internal;
  3. using System;
  4. using System.Diagnostics;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace MQTTnet.Server
  8. {
  9. public class MqttClientKeepAliveMonitor
  10. {
  11. readonly Stopwatch _lastPacketReceivedTracker = new Stopwatch();
  12. readonly string _clientId;
  13. readonly Func<Task> _keepAliveElapsedCallback;
  14. readonly IMqttNetLogger _logger;
  15. bool _isPaused;
  16. public MqttClientKeepAliveMonitor(string clientId, Func<Task> keepAliveElapsedCallback, IMqttNetLogger logger)
  17. {
  18. _clientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
  19. _keepAliveElapsedCallback = keepAliveElapsedCallback ?? throw new ArgumentNullException(nameof(keepAliveElapsedCallback));
  20. if (logger == null) throw new ArgumentNullException(nameof(logger));
  21. _logger = logger.CreateChildLogger(nameof(MqttClientKeepAliveMonitor));
  22. }
  23. public void Start(int keepAlivePeriod, CancellationToken cancellationToken)
  24. {
  25. if (keepAlivePeriod == 0)
  26. {
  27. return;
  28. }
  29. Task.Run(() => RunAsync(keepAlivePeriod, cancellationToken), cancellationToken).Forget(_logger);
  30. }
  31. public void Pause()
  32. {
  33. _isPaused = true;
  34. }
  35. public void Resume()
  36. {
  37. _isPaused = false;
  38. }
  39. public void PacketReceived()
  40. {
  41. _lastPacketReceivedTracker.Restart();
  42. }
  43. async Task RunAsync(int keepAlivePeriod, CancellationToken cancellationToken)
  44. {
  45. try
  46. {
  47. _lastPacketReceivedTracker.Restart();
  48. while (!cancellationToken.IsCancellationRequested)
  49. {
  50. // Values described here: [MQTT-3.1.2-24].
  51. // If the client sends 5 sec. the server will allow up to 7.5 seconds.
  52. // If the client sends 1 sec. the server will allow up to 1.5 seconds.
  53. if (!_isPaused && _lastPacketReceivedTracker.Elapsed.TotalSeconds >= keepAlivePeriod * 1.5D)
  54. {
  55. _logger.Warning(null, "Client '{0}': Did not receive any packet or keep alive signal.", _clientId);
  56. await _keepAliveElapsedCallback().ConfigureAwait(false);
  57. return;
  58. }
  59. // The server checks the keep alive timeout every 50 % of the overall keep alive timeout
  60. // because the server allows 1.5 times the keep alive value. This means that a value of 5 allows
  61. // up to 7.5 seconds. With an interval of 2.5 (5 / 2) the 7.5 is also affected. Waiting the whole
  62. // keep alive time will hit at 10 instead of 7.5 (but only one time instead of two times).
  63. await Task.Delay(TimeSpan.FromSeconds(keepAlivePeriod * 0.5D), cancellationToken).ConfigureAwait(false);
  64. }
  65. }
  66. catch (OperationCanceledException)
  67. {
  68. }
  69. catch (Exception exception)
  70. {
  71. _logger.Error(exception, "Client '{0}': Unhandled exception while checking keep alive timeouts.", _clientId);
  72. }
  73. finally
  74. {
  75. _logger.Verbose("Client '{0}': Stopped checking keep alive timeout.", _clientId);
  76. }
  77. }
  78. }
  79. }