// LowLevelMqttClient.cs (4.1 KB)
  1. using MQTTnet.Adapter;
  2. using MQTTnet.Client.Options;
  3. using MQTTnet.Diagnostics;
  4. using MQTTnet.Packets;
  5. using System;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  namespace MQTTnet.LowLevelClient
  {
      /// <summary>
      /// Low level MQTT client which forwards connect, disconnect, send and receive
      /// requests to a single channel adapter obtained from the configured adapter factory.
      /// </summary>
      public sealed class LowLevelMqttClient : ILowLevelMqttClient
      {
          readonly IMqttNetLogger _logger;
          readonly IMqttClientAdapterFactory _clientAdapterFactory;

          // Adapter of the current connection; null while no connection has been established.
          IMqttChannelAdapter _adapter;

          // Options passed to the last successful ConnectAsync call (source of the communication timeout).
          IMqttClientOptions _options;

          /// <summary>
          /// Creates a new low level client.
          /// </summary>
          /// <param name="clientAdapterFactory">Factory used to create the channel adapter on connect.</param>
          /// <param name="logger">Parent logger; a child logger named after this class is created from it.</param>
          /// <exception cref="ArgumentNullException">Any argument is null.</exception>
          public LowLevelMqttClient(IMqttClientAdapterFactory clientAdapterFactory, IMqttNetLogger logger)
          {
              if (clientAdapterFactory is null) throw new ArgumentNullException(nameof(clientAdapterFactory));
              if (logger is null) throw new ArgumentNullException(nameof(logger));

              _clientAdapterFactory = clientAdapterFactory;
              _logger = logger.CreateChildLogger(nameof(LowLevelMqttClient));
          }

          // True while an adapter is stored in this instance.
          bool IsConnected => _adapter != null;

          /// <summary>
          /// Creates a new channel adapter and connects it using the communication timeout from
          /// <paramref name="options"/>. The adapter and options are stored only after the connect succeeded.
          /// </summary>
          /// <param name="options">Client options used to create and connect the adapter.</param>
          /// <param name="cancellationToken">Token forwarded to the adapter's connect call.</param>
          /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
          /// <exception cref="InvalidOperationException">An adapter is already stored (client already connected).</exception>
          public async Task ConnectAsync(IMqttClientOptions options, CancellationToken cancellationToken)
          {
              if (options is null) throw new ArgumentNullException(nameof(options));

              if (_adapter != null)
              {
                  throw new InvalidOperationException("Low level MQTT client is already connected. Disconnect first before connecting again.");
              }

              var newAdapter = _clientAdapterFactory.CreateClientAdapter(options, _logger);

              try
              {
                  _logger.Verbose($"Trying to connect with server '{options.ChannelOptions}' (Timeout={options.CommunicationTimeout}).");
                  await newAdapter.ConnectAsync(options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
                  _logger.Verbose("Connection with server established.");

                  _options = options;
              }
              catch (Exception)
              {
                  // _adapter is guaranteed to be null at this point (see the guard above), so the
                  // adapter that must be released is the one created for this attempt. Disposing
                  // _adapter here would throw a NullReferenceException and hide the real error.
                  newAdapter.Dispose();
                  throw;
              }

              _adapter = newAdapter;
          }

          /// <summary>
          /// Disconnects and disposes the current adapter and clears it from this instance.
          /// Does nothing when no adapter is stored. Errors raised while disconnecting are logged, not thrown.
          /// </summary>
          /// <param name="cancellationToken">Token forwarded to the adapter's disconnect call.</param>
          public async Task DisconnectAsync(CancellationToken cancellationToken)
          {
              if (_adapter == null)
              {
                  return;
              }

              await SafeDisconnect(cancellationToken).ConfigureAwait(false);
              _adapter = null;
          }

          /// <summary>
          /// Sends a packet through the current adapter using the stored communication timeout.
          /// When sending fails the adapter is disconnected and disposed and the original exception is rethrown.
          /// </summary>
          /// <param name="packet">Packet to send.</param>
          /// <param name="cancellationToken">Token forwarded to the adapter.</param>
          /// <exception cref="ArgumentNullException"><paramref name="packet"/> is null.</exception>
          /// <exception cref="InvalidOperationException">No adapter is stored (client not connected).</exception>
          public async Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken)
          {
              if (packet is null) throw new ArgumentNullException(nameof(packet));

              if (_adapter == null)
              {
                  throw new InvalidOperationException("Low level MQTT client is not connected.");
              }

              try
              {
                  await _adapter.SendPacketAsync(packet, _options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
              }
              catch (Exception)
              {
                  // NOTE: the adapter field is not cleared here; only DisconnectAsync resets it.
                  await SafeDisconnect(cancellationToken).ConfigureAwait(false);
                  throw;
              }
          }

          /// <summary>
          /// Receives the next packet from the current adapter using the stored communication timeout.
          /// When receiving fails the adapter is disconnected and disposed and the original exception is rethrown.
          /// </summary>
          /// <param name="cancellationToken">Token forwarded to the adapter.</param>
          /// <returns>The packet returned by the adapter.</returns>
          /// <exception cref="InvalidOperationException">No adapter is stored (client not connected).</exception>
          public async Task<MqttBasePacket> ReceiveAsync(CancellationToken cancellationToken)
          {
              if (_adapter == null)
              {
                  throw new InvalidOperationException("Low level MQTT client is not connected.");
              }

              try
              {
                  return await _adapter.ReceivePacketAsync(_options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
              }
              catch (Exception)
              {
                  // NOTE: the adapter field is not cleared here; only DisconnectAsync resets it.
                  await SafeDisconnect(cancellationToken).ConfigureAwait(false);
                  throw;
              }
          }

          /// <summary>
          /// Disposes the current adapter, if there is one.
          /// </summary>
          public void Dispose()
          {
              _adapter?.Dispose();
          }

          // Disconnects the current adapter with the stored communication timeout. A failing
          // disconnect is logged instead of propagated, and the adapter is disposed in every case.
          async Task SafeDisconnect(CancellationToken cancellationToken)
          {
              try
              {
                  await _adapter.DisconnectAsync(_options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
              }
              catch (Exception exception)
              {
                  _logger.Error(exception, "Error while disconnecting.");
              }
              finally
              {
                  _adapter.Dispose();
              }
          }
      }
  }