| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- using MQTTnet.Adapter;
- using MQTTnet.Client.Options;
- using MQTTnet.Diagnostics;
- using MQTTnet.Packets;
- using System;
- using System.Threading;
- using System.Threading.Tasks;
- namespace MQTTnet.LowLevelClient
- {
- public sealed class LowLevelMqttClient : ILowLevelMqttClient
- {
- readonly IMqttNetLogger _logger;
- readonly IMqttClientAdapterFactory _clientAdapterFactory;
- IMqttChannelAdapter _adapter;
- IMqttClientOptions _options;
- public LowLevelMqttClient(IMqttClientAdapterFactory clientAdapterFactory, IMqttNetLogger logger)
- {
- if (clientAdapterFactory is null) throw new ArgumentNullException(nameof(clientAdapterFactory));
- if (logger is null) throw new ArgumentNullException(nameof(logger));
- _clientAdapterFactory = clientAdapterFactory;
- _logger = logger.CreateChildLogger(nameof(LowLevelMqttClient));
- }
- bool IsConnected => _adapter != null;
- public async Task ConnectAsync(IMqttClientOptions options, CancellationToken cancellationToken)
- {
- if (options is null) throw new ArgumentNullException(nameof(options));
- if (_adapter != null)
- {
- throw new InvalidOperationException("Low level MQTT client is already connected. Disconnect first before connecting again.");
- }
- var newAdapter = _clientAdapterFactory.CreateClientAdapter(options, _logger);
- try
- {
- _logger.Verbose($"Trying to connect with server '{options.ChannelOptions}' (Timeout={options.CommunicationTimeout}).");
- await newAdapter.ConnectAsync(options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
- _logger.Verbose("Connection with server established.");
- _options = options;
- }
- catch (Exception)
- {
- _adapter.Dispose();
- throw;
- }
- _adapter = newAdapter;
- }
- public async Task DisconnectAsync(CancellationToken cancellationToken)
- {
- if (_adapter == null)
- {
- return;
- }
- await SafeDisconnect(cancellationToken).ConfigureAwait(false);
- _adapter = null;
- }
- public async Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken)
- {
- if (packet is null) throw new ArgumentNullException(nameof(packet));
- if (_adapter == null)
- {
- throw new InvalidOperationException("Low level MQTT client is not connected.");
- }
- try
- {
- await _adapter.SendPacketAsync(packet, _options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
- }
- catch (Exception)
- {
- await SafeDisconnect(cancellationToken).ConfigureAwait(false);
- throw;
- }
- }
- public async Task<MqttBasePacket> ReceiveAsync(CancellationToken cancellationToken)
- {
- if (_adapter == null)
- {
- throw new InvalidOperationException("Low level MQTT client is not connected.");
- }
- try
- {
- return await _adapter.ReceivePacketAsync(_options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
- }
- catch (Exception)
- {
- await SafeDisconnect(cancellationToken).ConfigureAwait(false);
- throw;
- }
- }
- public void Dispose()
- {
- _adapter?.Dispose();
- }
- async Task SafeDisconnect(CancellationToken cancellationToken)
- {
- try
- {
- await _adapter.DisconnectAsync(_options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
- }
- catch (Exception exception)
- {
- _logger.Error(exception, "Error while disconnecting.");
- }
- finally
- {
- _adapter.Dispose();
- }
- }
- }
- }
|