| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- using System;
- using System.Threading;
- using System.Threading.Tasks;
- using MQTTnet.Channel;
- using MQTTnet.Exceptions;
- namespace MQTTnet.Formatter
- {
/// <summary>
/// Reads the fixed header of a packet (one flags byte followed by the variable-length
/// "remaining length" field) from an <see cref="IMqttChannel"/>.
/// </summary>
public class MqttPacketReader
{
    // One flags byte plus at least one remaining-length byte.
    private const int MinimumFixedHeaderLength = 2;

    // The first length byte arrives with the fixed header; at most three more may follow.
    private const int MaxAdditionalLengthBytes = 3;

    // Low seven bits of a length byte carry data, the high bit signals "another byte follows".
    private const int LengthValueMask = 127;
    private const int LengthContinuationBit = 128;

    // Scratch buffer reused for every additional length byte pulled from the channel.
    private readonly byte[] _singleByteBuffer = new byte[1];
    private readonly IMqttChannel _channel;

    /// <summary>
    /// Creates a reader on top of the given channel.
    /// </summary>
    /// <param name="channel">The channel all bytes are read from.</param>
    /// <exception cref="ArgumentNullException"><paramref name="channel"/> is null.</exception>
    public MqttPacketReader(IMqttChannel channel)
    {
        _channel = channel ?? throw new ArgumentNullException(nameof(channel));
    }

    /// <summary>
    /// Fills <paramref name="fixedHeaderBuffer"/> from the channel and decodes the flags byte
    /// and the remaining length of the packet.
    /// </summary>
    /// <param name="fixedHeaderBuffer">
    /// Buffer receiving the first header bytes. It is filled completely before decoding starts;
    /// index 0 is used as the flags byte and index 1 as the first length byte.
    /// NOTE(review): callers presumably pass a buffer of exactly 2 bytes - confirm.
    /// </param>
    /// <param name="cancellationToken">Token passed to every channel read and polled after each read.</param>
    /// <returns>
    /// <c>null</c> when cancellation is requested while filling the buffer; a result with
    /// <c>ConnectionClosed = true</c> when a channel read yields 0 bytes or the length could not
    /// be read completely; otherwise a result carrying the decoded <c>FixedHeader</c>.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="fixedHeaderBuffer"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="fixedHeaderBuffer"/> is shorter than 2 bytes.</exception>
    /// <exception cref="MqttProtocolViolationException">The remaining length uses more than four bytes.</exception>
    public async Task<ReadFixedHeaderResult> ReadFixedHeaderAsync(byte[] fixedHeaderBuffer, CancellationToken cancellationToken)
    {
        // Validate up front so a bad buffer fails with a descriptive exception before any
        // byte has been consumed from the channel.
        if (fixedHeaderBuffer == null)
        {
            throw new ArgumentNullException(nameof(fixedHeaderBuffer));
        }

        if (fixedHeaderBuffer.Length < MinimumFixedHeaderLength)
        {
            throw new ArgumentException("The fixed header buffer must be at least 2 bytes long.", nameof(fixedHeaderBuffer));
        }

        // The MQTT fixed header contains 1 byte of flags and at least 1 byte for the remaining data length.
        // So in all cases at least 2 bytes must be read for a complete MQTT packet.
        var buffer = fixedHeaderBuffer;
        var totalBytesRead = 0;

        // A single read may return fewer bytes than requested, so keep reading until the buffer is full.
        while (totalBytesRead < buffer.Length)
        {
            var bytesRead = await _channel.ReadAsync(buffer, totalBytesRead, buffer.Length - totalBytesRead, cancellationToken).ConfigureAwait(false);

            if (cancellationToken.IsCancellationRequested)
            {
                return null;
            }

            // A read of 0 bytes is reported to the caller as a closed connection.
            if (bytesRead == 0)
            {
                return new ReadFixedHeaderResult
                {
                    ConnectionClosed = true
                };
            }

            totalBytesRead += bytesRead;
        }

        // A first length byte of 0 means the packet has no body and no further length bytes.
        var hasRemainingLength = buffer[1] != 0;
        if (!hasRemainingLength)
        {
            return new ReadFixedHeaderResult
            {
                FixedHeader = new MqttFixedHeader(buffer[0], 0, totalBytesRead)
            };
        }

        var bodyLength = await ReadBodyLengthAsync(buffer[1], cancellationToken).ConfigureAwait(false);
        if (!bodyLength.HasValue)
        {
            // The helper yields no value on cancellation as well as on a 0-byte read;
            // both cases are reported as a closed connection here.
            return new ReadFixedHeaderResult
            {
                ConnectionClosed = true
            };
        }

        // NOTE(review): additional length bytes consumed by ReadBodyLengthAsync are not added
        // to the total passed to MqttFixedHeader - confirm this is what its consumers expect.
        totalBytesRead += bodyLength.Value;
        return new ReadFixedHeaderResult
        {
            FixedHeader = new MqttFixedHeader(buffer[0], bodyLength.Value, totalBytesRead)
        };
    }

    /// <summary>
    /// Decodes the variable-length remaining length, reading further length bytes one at a
    /// time from the channel for as long as the continuation bit is set.
    /// </summary>
    /// <param name="initialEncodedByte">The first length byte, already read with the fixed header.</param>
    /// <param name="cancellationToken">Token passed to the channel reads and polled around each read.</param>
    /// <returns>The decoded length, or <c>null</c> when cancellation is requested or a read yields 0 bytes.</returns>
    /// <exception cref="MqttProtocolViolationException">More than three additional length bytes are announced.</exception>
    private async Task<int?> ReadBodyLengthAsync(byte initialEncodedByte, CancellationToken cancellationToken)
    {
        var offset = 0;
        var multiplier = 128;
        var value = initialEncodedByte & LengthValueMask;
        int encodedByte = initialEncodedByte;

        while ((encodedByte & LengthContinuationBit) != 0)
        {
            offset++;
            if (offset > MaxAdditionalLengthBytes)
            {
                throw new MqttProtocolViolationException("Remaining length is invalid.");
            }

            if (cancellationToken.IsCancellationRequested)
            {
                return null;
            }

            var readCount = await _channel.ReadAsync(_singleByteBuffer, 0, 1, cancellationToken).ConfigureAwait(false);

            if (cancellationToken.IsCancellationRequested)
            {
                return null;
            }

            if (readCount == 0)
            {
                return null;
            }

            // Each further byte contributes seven more bits, weighted by the next power of 128.
            encodedByte = _singleByteBuffer[0];
            value += (encodedByte & LengthValueMask) * multiplier;
            multiplier *= 128;
        }

        return value;
    }
}
- }
|