using System;
using System.Runtime.CompilerServices;
using System.Text;
using MQTTnet.Protocol;
namespace MQTTnet.Formatter
{
///
/// This is a custom implementation of a memory stream which provides only MQTTnet relevant features.
/// The goal is to avoid lots of argument checks like in the original stream. The growth rule is the
/// same as for the original MemoryStream in .net. Also this implementation allows accessing the internal
/// buffer for all platforms and .net framework versions (which is not available at the regular MemoryStream).
///
public class MqttPacketWriter : IMqttPacketWriter
{
private static readonly ArraySegment ZeroVariableLengthIntegerArray = new ArraySegment(new byte[1], 0, 1);
private static readonly ArraySegment ZeroTwoByteIntegerArray = new ArraySegment(new byte[2], 0, 2);
public static int InitialBufferSize = 128;
public static int MaxBufferSize = 4096;
private byte[] _buffer = new byte[InitialBufferSize];
private int _offset;
public int Length { get; private set; }
public static byte BuildFixedHeader(MqttControlPacketType packetType, byte flags = 0)
{
var fixedHeader = (int)packetType << 4;
fixedHeader |= flags;
return (byte)fixedHeader;
}
public static int GetLengthOfVariableInteger(uint value)
{
var result = 0;
var x = value;
do
{
x = x / 128;
result++;
} while (x > 0);
return result;
}
public static ArraySegment EncodeVariableLengthInteger(uint value)
{
if (value == 0)
{
return ZeroVariableLengthIntegerArray;
}
if (value <= 127)
{
return new ArraySegment(new[] { (byte)value }, 0, 1);
}
var buffer = new byte[4];
var bufferOffset = 0;
var x = value;
do
{
var encodedByte = x % 128;
x = x / 128;
if (x > 0)
{
encodedByte = encodedByte | 128;
}
buffer[bufferOffset] = (byte)encodedByte;
bufferOffset++;
} while (x > 0);
return new ArraySegment(buffer, 0, bufferOffset);
}
public void WriteVariableLengthInteger(uint value)
{
Write(EncodeVariableLengthInteger(value));
}
public void WriteWithLengthPrefix(string value)
{
if (string.IsNullOrEmpty(value))
{
Write(ZeroTwoByteIntegerArray);
}
else
{
WriteWithLengthPrefix(Encoding.UTF8.GetBytes(value));
}
}
public void WriteWithLengthPrefix(byte[] value)
{
if (value == null || value.Length == 0)
{
Write(ZeroTwoByteIntegerArray);
}
else
{
EnsureAdditionalCapacity(value.Length + 2);
Write((ushort)value.Length);
Write(value, 0, value.Length);
}
}
public void Write(byte @byte)
{
EnsureAdditionalCapacity(1);
_buffer[_offset] = @byte;
IncreasePosition(1);
}
public void Write(ushort value)
{
EnsureAdditionalCapacity(2);
_buffer[_offset] = (byte)(value >> 8);
IncreasePosition(1);
_buffer[_offset] = (byte)value;
IncreasePosition(1);
}
public void Write(byte[] buffer, int offset, int count)
{
if (buffer == null) throw new ArgumentNullException(nameof(buffer));
if (count == 0)
{
return;
}
EnsureAdditionalCapacity(count);
Array.Copy(buffer, offset, _buffer, _offset, count);
IncreasePosition(count);
}
public void Write(IMqttPacketWriter propertyWriter)
{
if (propertyWriter == null) throw new ArgumentNullException(nameof(propertyWriter));
if (propertyWriter is MqttPacketWriter writer)
{
if (writer.Length == 0)
{
return;
}
Write(writer._buffer, 0, writer.Length);
return;
}
throw new InvalidOperationException($"{nameof(propertyWriter)} must be of type {typeof(MqttPacketWriter).Name}");
}
public void Reset(int length)
{
Length = length;
}
public void Seek(int position)
{
EnsureCapacity(position);
_offset = position;
}
public byte[] GetBuffer()
{
return _buffer;
}
public void FreeBuffer()
{
// This method frees the used memory by shrinking the buffer. This is required because the buffer
// is used across several messages. In general this is not a big issue because subsequent Ping packages
// have the same size but a very big publish package with 100 MB of payload will increase the buffer
// a lot and the size will never reduced. So this method tries to find a size which can be held in
// memory for a long time without causing troubles.
if (_buffer.Length < MaxBufferSize)
{
return;
}
Array.Resize(ref _buffer, MaxBufferSize);
}
private void Write(ArraySegment buffer)
{
Write(buffer.Array, buffer.Offset, buffer.Count);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void EnsureAdditionalCapacity(int additionalCapacity)
{
var freeSpace = _buffer.Length - _offset;
if (freeSpace >= additionalCapacity)
{
return;
}
EnsureCapacity(_buffer.Length + additionalCapacity - freeSpace);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void EnsureCapacity(int capacity)
{
var newBufferLength = _buffer.Length;
if (newBufferLength >= capacity)
{
return;
}
while (newBufferLength < capacity)
{
newBufferLength *= 2;
}
Array.Resize(ref _buffer, newBufferLength);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void IncreasePosition(int length)
{
_offset += length;
if (_offset > Length)
{
Length = _offset;
}
}
}
}