// MqttPacketReader.cs (3.5 KB)
  1. using System;
  2. using System.Threading;
  3. using System.Threading.Tasks;
  4. using MQTTnet.Channel;
  5. using MQTTnet.Exceptions;
  6. namespace MQTTnet.Formatter
  7. {
  /// <summary>
  /// Reads the MQTT fixed header (the flags byte followed by the variable-length
  /// "remaining length" field) from an <see cref="IMqttChannel"/>.
  /// </summary>
  /// <remarks>
  /// A single scratch buffer is shared by all calls on one instance, so concurrent
  /// header reads on the same reader would overwrite each other's length bytes.
  /// </remarks>
  public class MqttPacketReader
  {
      // The fixed header is one flags byte plus at least one remaining-length byte.
      private const int MinimumFixedHeaderSize = 2;

      // Each remaining-length byte carries 7 value bits; the top bit says "another byte follows".
      private const int LengthValueMask = 127;
      private const int LengthContinuationBit = 128;

      // At most 3 bytes may follow the first remaining-length byte (4 bytes in total).
      private const int MaxAdditionalLengthBytes = 3;

      // Scratch buffer reused for every additional remaining-length byte.
      private readonly byte[] _singleByteBuffer = new byte[1];
      private readonly IMqttChannel _channel;

      /// <summary>
      /// Creates a reader on top of the given channel.
      /// </summary>
      /// <param name="channel">The channel all header bytes are read from.</param>
      /// <exception cref="ArgumentNullException"><paramref name="channel"/> is null.</exception>
      public MqttPacketReader(IMqttChannel channel)
      {
          _channel = channel ?? throw new ArgumentNullException(nameof(channel));
      }

      /// <summary>
      /// Fills <paramref name="fixedHeaderBuffer"/> from the channel and decodes the remaining length.
      /// </summary>
      /// <param name="fixedHeaderBuffer">
      /// Receives the first header bytes. The whole buffer is filled; index 0 is used as the flags
      /// byte and index 1 as the first remaining-length byte, so it must hold at least 2 bytes.
      /// NOTE(review): callers presumably pass exactly 2 bytes; a larger buffer makes this method
      /// consume more bytes from the channel - confirm against callers.
      /// </param>
      /// <param name="cancellationToken">Token observed between and during channel reads.</param>
      /// <returns>
      /// <c>null</c> when cancellation is observed while filling the buffer; a result with
      /// <c>ConnectionClosed = true</c> when the channel returns 0 bytes, or when the remaining-length
      /// read ends early (which includes cancellation during that phase); otherwise a result
      /// carrying the decoded <c>FixedHeader</c>.
      /// </returns>
      /// <exception cref="ArgumentNullException"><paramref name="fixedHeaderBuffer"/> is null.</exception>
      /// <exception cref="ArgumentException"><paramref name="fixedHeaderBuffer"/> is shorter than 2 bytes.</exception>
      /// <exception cref="MqttProtocolViolationException">The remaining length uses more than 4 bytes.</exception>
      public async Task<ReadFixedHeaderResult> ReadFixedHeaderAsync(byte[] fixedHeaderBuffer, CancellationToken cancellationToken)
      {
          // Fail fast with a descriptive exception instead of a NullReferenceException or an
          // IndexOutOfRangeException raised after bytes were already taken from the channel.
          if (fixedHeaderBuffer == null)
          {
              throw new ArgumentNullException(nameof(fixedHeaderBuffer));
          }

          if (fixedHeaderBuffer.Length < MinimumFixedHeaderSize)
          {
              throw new ArgumentException("The fixed header buffer must contain at least 2 bytes.", nameof(fixedHeaderBuffer));
          }

          // The MQTT fixed header contains 1 byte of flags and at least 1 byte for the remaining data length.
          // So in all cases at least 2 bytes must be read for a complete MQTT packet.
          var buffer = fixedHeaderBuffer;
          var totalBytesRead = 0;

          // A single read may return fewer bytes than requested, so keep reading until the buffer is full.
          while (totalBytesRead < buffer.Length)
          {
              var bytesRead = await _channel.ReadAsync(buffer, totalBytesRead, buffer.Length - totalBytesRead, cancellationToken).ConfigureAwait(false);

              if (cancellationToken.IsCancellationRequested)
              {
                  return null;
              }

              // Zero bytes is treated as "the other side closed the connection".
              if (bytesRead == 0)
              {
                  return new ReadFixedHeaderResult
                  {
                      ConnectionClosed = true
                  };
              }

              totalBytesRead += bytesRead;
          }

          // A zero length byte means the packet has no body and no further length bytes.
          var hasRemainingLength = buffer[1] != 0;
          if (!hasRemainingLength)
          {
              return new ReadFixedHeaderResult
              {
                  FixedHeader = new MqttFixedHeader(buffer[0], 0, totalBytesRead)
              };
          }

          var bodyLength = await ReadBodyLengthAsync(buffer[1], cancellationToken).ConfigureAwait(false);
          if (!bodyLength.HasValue)
          {
              return new ReadFixedHeaderResult
              {
                  ConnectionClosed = true
              };
          }

          // The total is the buffer bytes plus the decoded body length; any additional
          // remaining-length bytes read by ReadBodyLengthAsync are not counted here.
          totalBytesRead += bodyLength.Value;

          return new ReadFixedHeaderResult
          {
              FixedHeader = new MqttFixedHeader(buffer[0], bodyLength.Value, totalBytesRead)
          };
      }

      /// <summary>
      /// Decodes the variable-length remaining-length field, reading continuation bytes
      /// one at a time from the channel while the continuation bit is set.
      /// </summary>
      /// <param name="initialEncodedByte">The first remaining-length byte, already read by the caller.</param>
      /// <param name="cancellationToken">Token checked before and after every single-byte read.</param>
      /// <returns>
      /// The decoded length, or <c>null</c> when cancellation was requested or the channel returned 0 bytes.
      /// </returns>
      /// <exception cref="MqttProtocolViolationException">More than 3 continuation bytes are announced.</exception>
      private async Task<int?> ReadBodyLengthAsync(byte initialEncodedByte, CancellationToken cancellationToken)
      {
          var offset = 0;
          var multiplier = 128;
          var value = initialEncodedByte & LengthValueMask;
          int encodedByte = initialEncodedByte;

          while ((encodedByte & LengthContinuationBit) != 0)
          {
              offset++;
              if (offset > MaxAdditionalLengthBytes)
              {
                  throw new MqttProtocolViolationException("Remaining length is invalid.");
              }

              if (cancellationToken.IsCancellationRequested)
              {
                  return null;
              }

              var readCount = await _channel.ReadAsync(_singleByteBuffer, 0, 1, cancellationToken).ConfigureAwait(false);

              if (cancellationToken.IsCancellationRequested)
              {
                  return null;
              }

              if (readCount == 0)
              {
                  return null;
              }

              // Each following byte contributes its low 7 bits, weighted by 128^position.
              encodedByte = _singleByteBuffer[0];
              value += (encodedByte & LengthValueMask) * multiplier;
              multiplier *= 128;
          }

          return value;
      }
  }
  93. }