using System;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Channel;
using MQTTnet.Exceptions;

namespace MQTTnet.Formatter
{
    /// <summary>
    /// Reads the fixed header of an MQTT packet from an <see cref="IMqttChannel"/>.
    /// </summary>
    public class MqttPacketReader
    {
        // Bit 7 of every length byte signals that another length byte follows.
        private const int ContinuationFlag = 128;

        // The low 7 bits of every length byte carry the payload of the encoded value.
        private const int ValueMask = 127;

        // Number of length bytes that may follow the first one before the value is rejected.
        private const int MaxAdditionalLengthBytes = 3;

        // Smallest buffer that can hold the flags byte and the first length byte.
        private const int MinimumFixedHeaderSize = 2;

        // Reused scratch buffer for reading the additional length bytes one at a time.
        private readonly byte[] _singleByteBuffer = new byte[1];

        private readonly IMqttChannel _channel;

        /// <summary>
        /// Creates a reader on top of the given channel.
        /// </summary>
        /// <param name="channel">The channel the packet bytes are read from.</param>
        /// <exception cref="ArgumentNullException"><paramref name="channel"/> is null.</exception>
        public MqttPacketReader(IMqttChannel channel)
        {
            _channel = channel ?? throw new ArgumentNullException(nameof(channel));
        }

        /// <summary>
        /// Fills <paramref name="fixedHeaderBuffer"/> from the channel and decodes the fixed header
        /// (flags byte plus the variable-length "remaining length" field).
        /// </summary>
        /// <param name="fixedHeaderBuffer">
        /// Buffer that receives the first header bytes. It is filled completely and must hold at least 2 bytes.
        /// </param>
        /// <param name="cancellationToken">Token forwarded to every channel read.</param>
        /// <returns>
        /// <c>null</c> when cancellation is observed after a read while the buffer is being filled;
        /// a result with <c>ConnectionClosed = true</c> when a read returns 0 bytes or the length could not be read;
        /// otherwise a result carrying the decoded <c>MqttFixedHeader</c>.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="fixedHeaderBuffer"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="fixedHeaderBuffer"/> is shorter than 2 bytes.</exception>
        /// <exception cref="MqttProtocolViolationException">The remaining length uses too many bytes.</exception>
        public async Task<ReadFixedHeaderResult> ReadFixedHeaderAsync(byte[] fixedHeaderBuffer, CancellationToken cancellationToken)
        {
            if (fixedHeaderBuffer == null)
            {
                throw new ArgumentNullException(nameof(fixedHeaderBuffer));
            }

            // The second byte is inspected below, so a shorter buffer can never produce a valid header.
            if (fixedHeaderBuffer.Length < MinimumFixedHeaderSize)
            {
                throw new ArgumentException("The fixed header buffer must contain at least 2 bytes.", nameof(fixedHeaderBuffer));
            }

            // The MQTT fixed header contains 1 byte of flags and at least 1 byte for the remaining data length.
            // So in all cases at least 2 bytes must be read for a complete MQTT packet.
            var buffer = fixedHeaderBuffer;
            var totalBytesRead = 0;

            // A single read may deliver fewer bytes than requested, so keep reading until the buffer is full.
            while (totalBytesRead < buffer.Length)
            {
                var bytesRead = await _channel.ReadAsync(buffer, totalBytesRead, buffer.Length - totalBytesRead, cancellationToken).ConfigureAwait(false);

                if (cancellationToken.IsCancellationRequested)
                {
                    return null;
                }

                // A read of 0 bytes is reported to the caller as a closed connection.
                if (bytesRead == 0)
                {
                    return new ReadFixedHeaderResult
                    {
                        ConnectionClosed = true
                    };
                }

                totalBytesRead += bytesRead;
            }

            var hasRemainingLength = buffer[1] != 0;
            if (!hasRemainingLength)
            {
                return new ReadFixedHeaderResult
                {
                    FixedHeader = new MqttFixedHeader(buffer[0], 0, totalBytesRead)
                };
            }

            var bodyLength = await ReadBodyLengthAsync(buffer[1], cancellationToken).ConfigureAwait(false);
            if (!bodyLength.HasValue)
            {
                // The helper returns no value both on cancellation and on a 0-byte read;
                // either way the caller is told the connection is closed.
                return new ReadFixedHeaderResult
                {
                    ConnectionClosed = true
                };
            }

            // NOTE(review): the total only adds the body length to the bytes placed in the buffer; any additional
            // length bytes consumed by ReadBodyLengthAsync are not counted. Confirm this is what consumers expect.
            totalBytesRead += bodyLength.Value;

            return new ReadFixedHeaderResult
            {
                FixedHeader = new MqttFixedHeader(buffer[0], bodyLength.Value, totalBytesRead)
            };
        }

        /// <summary>
        /// Decodes the variable-length "remaining length" value, reading additional bytes from the channel
        /// for as long as the continuation bit of the previous byte is set.
        /// </summary>
        /// <param name="initialEncodedByte">The first length byte, already read by the caller.</param>
        /// <param name="cancellationToken">Token forwarded to every channel read.</param>
        /// <returns>
        /// The decoded length, or <c>null</c> when cancellation is observed or a read returns 0 bytes.
        /// </returns>
        /// <exception cref="MqttProtocolViolationException">
        /// More than 3 additional length bytes would be required.
        /// </exception>
        private async Task<int?> ReadBodyLengthAsync(byte initialEncodedByte, CancellationToken cancellationToken)
        {
            var offset = 0;
            var multiplier = 128;
            var value = initialEncodedByte & ValueMask;
            int encodedByte = initialEncodedByte;

            while ((encodedByte & ContinuationFlag) != 0)
            {
                offset++;
                if (offset > MaxAdditionalLengthBytes)
                {
                    throw new MqttProtocolViolationException("Remaining length is invalid.");
                }

                if (cancellationToken.IsCancellationRequested)
                {
                    return null;
                }

                var readCount = await _channel.ReadAsync(_singleByteBuffer, 0, 1, cancellationToken).ConfigureAwait(false);

                if (cancellationToken.IsCancellationRequested)
                {
                    return null;
                }

                if (readCount == 0)
                {
                    return null;
                }

                encodedByte = _singleByteBuffer[0];

                // Each following byte contributes its low 7 bits, weighted by the next power of 128.
                value += (encodedByte & ValueMask) * multiplier;
                multiplier *= 128;
            }

            return value;
        }
    }
}