| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589 |
- using System;
- using System.Linq;
- using MQTTnet.Exceptions;
- using MQTTnet.Packets;
- using MQTTnet.Protocol;
- namespace MQTTnet.Formatter.V5
- {
- public class MqttV500PacketEncoder
- {
/// <summary>Writer that receives the bytes of the packet currently being encoded.</summary>
private readonly IMqttPacketWriter _packetWriter;

/// <summary>
/// Creates an encoder backed by a new <see cref="MqttPacketWriter"/>.
/// </summary>
public MqttV500PacketEncoder()
    : this(new MqttPacketWriter())
{
}

/// <summary>
/// Creates an encoder that writes into the supplied packet writer.
/// </summary>
/// <param name="packetWriter">The writer used by all subsequent encode calls.</param>
/// <exception cref="ArgumentNullException"><paramref name="packetWriter"/> is <c>null</c>.</exception>
public MqttV500PacketEncoder(IMqttPacketWriter packetWriter)
{
    // Fail fast here instead of with a NullReferenceException on the first use of the writer.
    _packetWriter = packetWriter ?? throw new ArgumentNullException(nameof(packetWriter));
}
/// <summary>
/// Encodes a packet (fixed header byte, remaining length, body) using the internal packet writer.
/// </summary>
/// <param name="packet">The packet to encode.</param>
/// <returns>The segment of the writer's buffer that holds the encoded packet.</returns>
/// <exception cref="ArgumentNullException"><paramref name="packet"/> is <c>null</c>.</exception>
public ArraySegment<byte> Encode(MqttBasePacket packet)
{
    if (packet == null)
    {
        throw new ArgumentNullException(nameof(packet));
    }

    // Head room for the largest possible header:
    // 1 fixed header byte + up to 4 bytes for the variable-length "remaining length".
    const int reservedHeaderSize = 5;

    _packetWriter.Reset(reservedHeaderSize);
    _packetWriter.Seek(reservedHeaderSize);

    // The body is written first because its size determines how long the header is.
    var fixedHeader = EncodePacket(packet, _packetWriter);

    var bodyLength = (uint)(_packetWriter.Length - reservedHeaderSize);
    var actualHeaderSize = 1 + MqttPacketWriter.GetLengthOfVariableInteger(bodyLength);
    var startOffset = reservedHeaderSize - actualHeaderSize;

    // Go back and place the header directly in front of the body; any unused
    // reserved bytes before startOffset are excluded from the returned segment.
    _packetWriter.Seek(startOffset);
    _packetWriter.Write(fixedHeader);
    _packetWriter.WriteVariableLengthInteger(bodyLength);

    var rawBuffer = _packetWriter.GetBuffer();
    var encodedLength = _packetWriter.Length - startOffset;
    return new ArraySegment<byte>(rawBuffer, startOffset, encodedLength);
}
/// <summary>
/// Forwards to <c>FreeBuffer</c> on the underlying packet writer.
/// </summary>
public void FreeBuffer() => _packetWriter.FreeBuffer();
/// <summary>
/// Dispatches to the encoder matching the runtime type of <paramref name="packet"/>.
/// The type tests run in the order listed below.
/// </summary>
/// <param name="packet">The packet to encode.</param>
/// <param name="packetWriter">The writer that receives the packet body.</param>
/// <returns>The fixed header byte produced by the selected encoder.</returns>
/// <exception cref="MqttProtocolViolationException">No encoder exists for the packet type.</exception>
private static byte EncodePacket(MqttBasePacket packet, IMqttPacketWriter packetWriter)
{
    if (packet is MqttConnectPacket connectPacket)
    {
        return EncodeConnectPacket(connectPacket, packetWriter);
    }

    if (packet is MqttConnAckPacket connAckPacket)
    {
        return EncodeConnAckPacket(connAckPacket, packetWriter);
    }

    if (packet is MqttDisconnectPacket disconnectPacket)
    {
        return EncodeDisconnectPacket(disconnectPacket, packetWriter);
    }

    if (packet is MqttPingReqPacket)
    {
        return EncodePingReqPacket();
    }

    if (packet is MqttPingRespPacket)
    {
        return EncodePingRespPacket();
    }

    if (packet is MqttPublishPacket publishPacket)
    {
        return EncodePublishPacket(publishPacket, packetWriter);
    }

    if (packet is MqttPubAckPacket pubAckPacket)
    {
        return EncodePubAckPacket(pubAckPacket, packetWriter);
    }

    if (packet is MqttPubRecPacket pubRecPacket)
    {
        return EncodePubRecPacket(pubRecPacket, packetWriter);
    }

    if (packet is MqttPubRelPacket pubRelPacket)
    {
        return EncodePubRelPacket(pubRelPacket, packetWriter);
    }

    if (packet is MqttPubCompPacket pubCompPacket)
    {
        return EncodePubCompPacket(pubCompPacket, packetWriter);
    }

    if (packet is MqttSubscribePacket subscribePacket)
    {
        return EncodeSubscribePacket(subscribePacket, packetWriter);
    }

    if (packet is MqttSubAckPacket subAckPacket)
    {
        return EncodeSubAckPacket(subAckPacket, packetWriter);
    }

    if (packet is MqttUnsubscribePacket unsubscribePacket)
    {
        return EncodeUnsubscribePacket(unsubscribePacket, packetWriter);
    }

    if (packet is MqttUnsubAckPacket unsubAckPacket)
    {
        return EncodeUnsubAckPacket(unsubAckPacket, packetWriter);
    }

    if (packet is MqttAuthPacket authPacket)
    {
        return EncodeAuthPacket(authPacket, packetWriter);
    }

    throw new MqttProtocolViolationException("Packet type invalid.");
}
/// <summary>
/// Writes the body of a CONNECT packet: protocol name and version, connect flags,
/// keep alive, properties, client id, optional will, optional user name and password.
/// </summary>
/// <param name="packet">The CONNECT packet to encode.</param>
/// <param name="packetWriter">The writer that receives the packet body.</param>
/// <returns>The fixed header byte for a CONNECT packet.</returns>
/// <exception cref="ArgumentNullException">An argument is <c>null</c>.</exception>
/// <exception cref="MqttProtocolViolationException">
/// The client id is empty without CleanSession, or a password is set without a user name.
/// </exception>
private static byte EncodeConnectPacket(MqttConnectPacket packet, IMqttPacketWriter packetWriter)
{
    if (packet == null)
    {
        throw new ArgumentNullException(nameof(packet));
    }

    if (packetWriter == null)
    {
        throw new ArgumentNullException(nameof(packetWriter));
    }

    if (!packet.CleanSession && string.IsNullOrEmpty(packet.ClientId))
    {
        throw new MqttProtocolViolationException("CleanSession must be set if ClientId is empty [MQTT-3.1.3-7].");
    }

    packetWriter.WriteWithLengthPrefix("MQTT");
    packetWriter.Write(5); // [3.1.2.2 Protocol Version]

    var will = packet.WillMessage;

    byte connectFlags = 0x0;
    if (packet.CleanSession)
    {
        connectFlags |= 0x2;
    }

    if (will != null)
    {
        // Will flag, followed by the will QoS shifted into bits 3-4.
        connectFlags |= 0x4;
        connectFlags |= (byte)((byte)will.QualityOfServiceLevel << 3);

        if (will.Retain)
        {
            connectFlags |= 0x20;
        }
    }

    var hasPassword = packet.Password != null;
    var hasUsername = packet.Username != null;

    if (hasPassword && !hasUsername)
    {
        throw new MqttProtocolViolationException("If the User Name Flag is set to 0, the Password Flag MUST be set to 0 [MQTT-3.1.2-22].");
    }

    if (hasPassword)
    {
        connectFlags |= 0x40;
    }

    if (hasUsername)
    {
        connectFlags |= 0x80;
    }

    packetWriter.Write(connectFlags);
    packetWriter.Write(packet.KeepAlivePeriod);

    // The property section is always written, even when no properties are set.
    var connectProperties = new MqttV500PropertiesWriter();
    var properties = packet.Properties;
    if (properties != null)
    {
        connectProperties.WriteSessionExpiryInterval(properties.SessionExpiryInterval);
        connectProperties.WriteAuthenticationMethod(properties.AuthenticationMethod);
        connectProperties.WriteAuthenticationData(properties.AuthenticationData);
        connectProperties.WriteRequestProblemInformation(properties.RequestProblemInformation);
        connectProperties.WriteRequestResponseInformation(properties.RequestResponseInformation);
        connectProperties.WriteReceiveMaximum(properties.ReceiveMaximum);
        connectProperties.WriteTopicAliasMaximum(properties.TopicAliasMaximum);
        connectProperties.WriteMaximumPacketSize(properties.MaximumPacketSize);
        connectProperties.WriteUserProperties(properties.UserProperties);
    }

    connectProperties.WriteTo(packetWriter);

    packetWriter.WriteWithLengthPrefix(packet.ClientId);

    if (will != null)
    {
        // NOTE(review): TopicAlias and SubscriptionIdentifiers do not appear among the Will
        // Properties listed in MQTT 5 section 3.1.3.2 - confirm whether they belong here.
        var willProperties = new MqttV500PropertiesWriter();
        willProperties.WritePayloadFormatIndicator(will.PayloadFormatIndicator);
        willProperties.WriteMessageExpiryInterval(will.MessageExpiryInterval);
        willProperties.WriteTopicAlias(will.TopicAlias);
        willProperties.WriteResponseTopic(will.ResponseTopic);
        willProperties.WriteCorrelationData(will.CorrelationData);
        willProperties.WriteSubscriptionIdentifiers(will.SubscriptionIdentifiers);
        willProperties.WriteContentType(will.ContentType);
        willProperties.WriteUserProperties(will.UserProperties);

        // Special case: the will delay interval is stored on the CONNECT properties
        // object but is written into the will properties.
        willProperties.WriteWillDelayInterval(packet.Properties?.WillDelayInterval);

        willProperties.WriteTo(packetWriter);

        packetWriter.WriteWithLengthPrefix(will.Topic);
        packetWriter.WriteWithLengthPrefix(will.Payload);
    }

    if (hasUsername)
    {
        packetWriter.WriteWithLengthPrefix(packet.Username);
    }

    if (hasPassword)
    {
        packetWriter.WriteWithLengthPrefix(packet.Password);
    }

    return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Connect);
}
/// <summary>
/// Writes the body of a CONNACK packet: acknowledge flags, reason code and properties.
/// </summary>
/// <param name="packet">The CONNACK packet to encode.</param>
/// <param name="packetWriter">The writer that receives the packet body.</param>
/// <returns>The fixed header byte for a CONNACK packet.</returns>
/// <exception cref="ArgumentNullException">An argument is <c>null</c>.</exception>
/// <exception cref="MqttProtocolViolationException">The reason code is not set.</exception>
private static byte EncodeConnAckPacket(MqttConnAckPacket packet, IMqttPacketWriter packetWriter)
{
    if (packet == null)
    {
        throw new ArgumentNullException(nameof(packet));
    }

    if (packetWriter == null)
    {
        throw new ArgumentNullException(nameof(packetWriter));
    }

    if (!packet.ReasonCode.HasValue)
    {
        ThrowReasonCodeNotSetException();
    }

    // Bit 0 of the acknowledge flags carries "session present".
    byte acknowledgeFlags = 0x0;
    if (packet.IsSessionPresent)
    {
        acknowledgeFlags |= 0x1;
    }

    packetWriter.Write(acknowledgeFlags);
    packetWriter.Write((byte)packet.ReasonCode.Value);

    // The property section is always written, even when no properties are set.
    var connAckProperties = new MqttV500PropertiesWriter();
    var properties = packet.Properties;
    if (properties != null)
    {
        connAckProperties.WriteSessionExpiryInterval(properties.SessionExpiryInterval);
        connAckProperties.WriteAuthenticationMethod(properties.AuthenticationMethod);
        connAckProperties.WriteAuthenticationData(properties.AuthenticationData);
        connAckProperties.WriteRetainAvailable(properties.RetainAvailable);
        connAckProperties.WriteReceiveMaximum(properties.ReceiveMaximum);
        connAckProperties.WriteAssignedClientIdentifier(properties.AssignedClientIdentifier);
        connAckProperties.WriteTopicAliasMaximum(properties.TopicAliasMaximum);
        connAckProperties.WriteReasonString(properties.ReasonString);
        connAckProperties.WriteMaximumPacketSize(properties.MaximumPacketSize);
        connAckProperties.WriteWildcardSubscriptionAvailable(properties.WildcardSubscriptionAvailable);
        connAckProperties.WriteSubscriptionIdentifiersAvailable(properties.SubscriptionIdentifiersAvailable);
        connAckProperties.WriteSharedSubscriptionAvailable(properties.SharedSubscriptionAvailable);
        connAckProperties.WriteServerKeepAlive(properties.ServerKeepAlive);
        connAckProperties.WriteResponseInformation(properties.ResponseInformation);
        connAckProperties.WriteServerReference(properties.ServerReference);
        connAckProperties.WriteUserProperties(properties.UserProperties);
    }

    connAckProperties.WriteTo(packetWriter);

    return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.ConnAck);
}
/// <summary>
/// Writes the body of a PUBLISH packet (topic, optional packet identifier, properties,
/// payload) and builds the fixed header carrying the retain, QoS and dup flags.
/// </summary>
/// <param name="packet">The PUBLISH packet to encode.</param>
/// <param name="packetWriter">The writer that receives the packet body.</param>
/// <returns>The fixed header byte including the PUBLISH flags.</returns>
/// <exception cref="ArgumentNullException">An argument is <c>null</c>.</exception>
/// <exception cref="MqttProtocolViolationException">
/// Dup is set on a QoS 0 packet, the packet identifier is missing for QoS &gt; 0,
/// or a packet identifier greater than 0 is set for QoS 0.
/// </exception>
private static byte EncodePublishPacket(MqttPublishPacket packet, IMqttPacketWriter packetWriter)
{
    if (packet == null)
    {
        throw new ArgumentNullException(nameof(packet));
    }

    if (packetWriter == null)
    {
        throw new ArgumentNullException(nameof(packetWriter));
    }

    if (packet.QualityOfServiceLevel == 0 && packet.Dup)
    {
        throw new MqttProtocolViolationException("Dup flag must be false for QoS 0 packets [MQTT-3.3.1-2].");
    }

    packetWriter.WriteWithLengthPrefix(packet.Topic);

    var requiresIdentifier = packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce;
    if (requiresIdentifier)
    {
        if (!packet.PacketIdentifier.HasValue)
        {
            throw new MqttProtocolViolationException("Publish packet has no packet identifier.");
        }

        packetWriter.Write(packet.PacketIdentifier.Value);
    }
    else if (packet.PacketIdentifier > 0)
    {
        // An unset (null) identifier does not satisfy "> 0" and is therefore accepted here.
        throw new MqttProtocolViolationException("Packet identifier must be 0 if QoS == 0 [MQTT-2.3.1-5].");
    }

    // The property section is always written, even when no properties are set.
    var publishProperties = new MqttV500PropertiesWriter();
    var properties = packet.Properties;
    if (properties != null)
    {
        publishProperties.WritePayloadFormatIndicator(properties.PayloadFormatIndicator);
        publishProperties.WriteMessageExpiryInterval(properties.MessageExpiryInterval);
        publishProperties.WriteTopicAlias(properties.TopicAlias);
        publishProperties.WriteResponseTopic(properties.ResponseTopic);
        publishProperties.WriteCorrelationData(properties.CorrelationData);
        publishProperties.WriteSubscriptionIdentifiers(properties.SubscriptionIdentifiers);
        publishProperties.WriteContentType(properties.ContentType);
        publishProperties.WriteUserProperties(properties.UserProperties);
    }

    publishProperties.WriteTo(packetWriter);

    // The payload is written without a length prefix, and only when it is non-empty.
    var payload = packet.Payload;
    if (payload != null && payload.Length > 0)
    {
        packetWriter.Write(payload, 0, payload.Length);
    }

    // Flags: bit 0 = retain, bits 1-2 = QoS, bit 3 = dup.
    var flags = (byte)((byte)packet.QualityOfServiceLevel << 1);
    if (packet.Retain)
    {
        flags |= 0x01;
    }

    if (packet.Dup)
    {
        flags |= 0x08;
    }

    return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Publish, flags);
}
/// <summary>
/// Writes the body of a PUBACK packet: the packet identifier, followed by the reason code
/// and properties when there is something to report.
/// </summary>
/// <param name="packet">The PUBACK packet to encode.</param>
/// <param name="packetWriter">The writer that receives the packet body.</param>
/// <returns>The fixed header byte for a PUBACK packet.</returns>
/// <exception cref="ArgumentNullException">An argument is <c>null</c>.</exception>
/// <exception cref="MqttProtocolViolationException">
/// The packet identifier or the reason code is not set.
/// </exception>
private static byte EncodePubAckPacket(MqttPubAckPacket packet, IMqttPacketWriter packetWriter)
{
    if (packet == null) throw new ArgumentNullException(nameof(packet));
    if (packetWriter == null) throw new ArgumentNullException(nameof(packetWriter));

    if (!packet.PacketIdentifier.HasValue)
    {
        throw new MqttProtocolViolationException("PubAck packet has no packet identifier.");
    }

    if (!packet.ReasonCode.HasValue)
    {
        throw new MqttProtocolViolationException("PubAck packet must contain a reason code.");
    }

    packetWriter.Write(packet.PacketIdentifier.Value);

    var propertiesWriter = new MqttV500PropertiesWriter();
    if (packet.Properties != null)
    {
        propertiesWriter.WriteReasonString(packet.Properties.ReasonString);
        propertiesWriter.WriteUserProperties(packet.Properties.UserProperties);
    }

    // The reason code and properties may be omitted when the reason is Success and there
    // are no properties. The check must look at the properties writer: the packet writer
    // already contains the packet identifier at this point, so testing its length would
    // always be true. This matches the PubRel and PubComp encoders.
    if (propertiesWriter.Length > 0 || packet.ReasonCode.Value != MqttPubAckReasonCode.Success)
    {
        packetWriter.Write((byte)packet.ReasonCode.Value);
        propertiesWriter.WriteTo(packetWriter);
    }

    return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubAck);
}
/// <summary>
/// Writes the body of a PUBREC packet: the packet identifier, followed by the reason code
/// and properties when there is something to report.
/// </summary>
/// <param name="packet">The PUBREC packet to encode.</param>
/// <param name="packetWriter">The writer that receives the packet body.</param>
/// <returns>The fixed header byte for a PUBREC packet.</returns>
/// <exception cref="MqttProtocolViolationException">
/// The packet identifier or the reason code is not set.
/// </exception>
private static byte EncodePubRecPacket(MqttPubRecPacket packet, IMqttPacketWriter packetWriter)
{
    ThrowIfPacketIdentifierIsInvalid(packet);

    if (!packet.ReasonCode.HasValue)
    {
        ThrowReasonCodeNotSetException();
    }

    var propertiesWriter = new MqttV500PropertiesWriter();
    if (packet.Properties != null)
    {
        propertiesWriter.WriteReasonString(packet.Properties.ReasonString);
        propertiesWriter.WriteUserProperties(packet.Properties.UserProperties);
    }

    packetWriter.Write(packet.PacketIdentifier.Value);

    // The reason code and properties may be omitted when the reason is Success and there
    // are no properties. The check must look at the properties writer: the packet writer
    // already contains the packet identifier at this point, so testing its length would
    // always be true. This matches the PubRel and PubComp encoders.
    if (propertiesWriter.Length > 0 || packet.ReasonCode.Value != MqttPubRecReasonCode.Success)
    {
        packetWriter.Write((byte)packet.ReasonCode.Value);
        propertiesWriter.WriteTo(packetWriter);
    }

    return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRec);
}
/// <summary>
/// Writes the body of a PUBREL packet: the packet identifier, plus the reason code and
/// properties unless the reason is Success and no properties were written.
/// </summary>
/// <param name="packet">The PUBREL packet to encode.</param>
/// <param name="packetWriter">The writer that receives the packet body.</param>
/// <returns>The fixed header byte for a PUBREL packet (flags 0x02).</returns>
/// <exception cref="MqttProtocolViolationException">
/// The packet identifier or the reason code is not set.
/// </exception>
private static byte EncodePubRelPacket(MqttPubRelPacket packet, IMqttPacketWriter packetWriter)
{
    ThrowIfPacketIdentifierIsInvalid(packet);

    if (!packet.ReasonCode.HasValue)
    {
        ThrowReasonCodeNotSetException();
    }

    var pubRelProperties = new MqttV500PropertiesWriter();
    var properties = packet.Properties;
    if (properties != null)
    {
        pubRelProperties.WriteReasonString(properties.ReasonString);
        pubRelProperties.WriteUserProperties(properties.UserProperties);
    }

    packetWriter.Write(packet.PacketIdentifier.Value);

    var reasonCode = packet.ReasonCode.Value;
    var hasProperties = pubRelProperties.Length > 0;
    if (hasProperties || reasonCode != MqttPubRelReasonCode.Success)
    {
        packetWriter.Write((byte)reasonCode);
        pubRelProperties.WriteTo(packetWriter);
    }

    return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRel, 0x02);
}
/// <summary>
/// Writes the body of a PUBCOMP packet: the packet identifier, plus the reason code and
/// properties unless the reason is Success and no properties were written.
/// </summary>
/// <param name="packet">The PUBCOMP packet to encode.</param>
/// <param name="packetWriter">The writer that receives the packet body.</param>
/// <returns>The fixed header byte for a PUBCOMP packet.</returns>
/// <exception cref="MqttProtocolViolationException">
/// The packet identifier or the reason code is not set.
/// </exception>
private static byte EncodePubCompPacket(MqttPubCompPacket packet, IMqttPacketWriter packetWriter)
{
    ThrowIfPacketIdentifierIsInvalid(packet);

    if (!packet.ReasonCode.HasValue)
    {
        ThrowReasonCodeNotSetException();
    }

    packetWriter.Write(packet.PacketIdentifier.Value);

    var pubCompProperties = new MqttV500PropertiesWriter();
    var properties = packet.Properties;
    if (properties != null)
    {
        pubCompProperties.WriteReasonString(properties.ReasonString);
        pubCompProperties.WriteUserProperties(properties.UserProperties);
    }

    var reasonCode = packet.ReasonCode.Value;
    var hasProperties = pubCompProperties.Length > 0;
    if (hasProperties || reasonCode != MqttPubCompReasonCode.Success)
    {
        packetWriter.Write((byte)reasonCode);
        pubCompProperties.WriteTo(packetWriter);
    }

    return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubComp);
}
/// <summary>
/// Writes the body of a SUBSCRIBE packet: packet identifier, properties, then each topic
/// filter followed by its subscription options byte.
/// </summary>
/// <param name="packet">The SUBSCRIBE packet to encode.</param>
/// <param name="packetWriter">The writer that receives the packet body.</param>
/// <returns>The fixed header byte for a SUBSCRIBE packet (flags 0x02).</returns>
/// <exception cref="MqttProtocolViolationException">
/// No topic filter is set, or the packet identifier is not set.
/// </exception>
private static byte EncodeSubscribePacket(MqttSubscribePacket packet, IMqttPacketWriter packetWriter)
{
    var hasTopicFilters = packet.TopicFilters?.Any() == true;
    if (!hasTopicFilters)
    {
        throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.8.3-3].");
    }

    ThrowIfPacketIdentifierIsInvalid(packet);

    packetWriter.Write(packet.PacketIdentifier.Value);

    var subscribeProperties = new MqttV500PropertiesWriter();
    var properties = packet.Properties;
    if (properties != null)
    {
        subscribeProperties.WriteSubscriptionIdentifier(properties.SubscriptionIdentifier);
        subscribeProperties.WriteUserProperties(properties.UserProperties);
    }

    subscribeProperties.WriteTo(packetWriter);

    // The filter list is known to be non-empty here because of the check at the top.
    foreach (var filter in packet.TopicFilters)
    {
        packetWriter.WriteWithLengthPrefix(filter.Topic);

        // Options byte: bits 0-1 = QoS, bit 2 = no local, bit 3 = retain as published,
        // bits 4-5 = retain handling.
        var subscriptionOptions = (byte)filter.QualityOfServiceLevel;

        if (filter.NoLocal == true)
        {
            subscriptionOptions |= 0x04;
        }

        if (filter.RetainAsPublished == true)
        {
            subscriptionOptions |= 0x08;
        }

        if (filter.RetainHandling.HasValue)
        {
            subscriptionOptions |= (byte)((byte)filter.RetainHandling.Value << 4);
        }

        packetWriter.Write(subscriptionOptions);
    }

    return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Subscribe, 0x02);
}
/// <summary>
/// Writes the body of a SUBACK packet: packet identifier, properties, then one byte per
/// reason code.
/// </summary>
/// <param name="packet">The SUBACK packet to encode.</param>
/// <param name="packetWriter">The writer that receives the packet body.</param>
/// <returns>The fixed header byte for a SUBACK packet.</returns>
/// <exception cref="MqttProtocolViolationException">
/// No reason code is set, or the packet identifier is not set.
/// </exception>
private static byte EncodeSubAckPacket(MqttSubAckPacket packet, IMqttPacketWriter packetWriter)
{
    var hasReasonCodes = packet.ReasonCodes?.Any() == true;
    if (!hasReasonCodes)
    {
        throw new MqttProtocolViolationException("At least one reason code must be set[MQTT - 3.8.3 - 3].");
    }

    ThrowIfPacketIdentifierIsInvalid(packet);

    packetWriter.Write(packet.PacketIdentifier.Value);

    var subAckProperties = new MqttV500PropertiesWriter();
    var properties = packet.Properties;
    if (properties != null)
    {
        subAckProperties.WriteReasonString(properties.ReasonString);
        subAckProperties.WriteUserProperties(properties.UserProperties);
    }

    subAckProperties.WriteTo(packetWriter);

    foreach (var code in packet.ReasonCodes)
    {
        packetWriter.Write((byte)code);
    }

    return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.SubAck);
}
/// <summary>
/// Writes the body of an UNSUBSCRIBE packet: packet identifier, properties, then each
/// topic filter with a length prefix.
/// </summary>
/// <param name="packet">The UNSUBSCRIBE packet to encode.</param>
/// <param name="packetWriter">The writer that receives the packet body.</param>
/// <returns>The fixed header byte for an UNSUBSCRIBE packet (flags 0x02).</returns>
/// <exception cref="MqttProtocolViolationException">
/// No topic filter is set, or the packet identifier is not set.
/// </exception>
private static byte EncodeUnsubscribePacket(MqttUnsubscribePacket packet, IMqttPacketWriter packetWriter)
{
    var hasTopicFilters = packet.TopicFilters?.Any() == true;
    if (!hasTopicFilters)
    {
        throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.10.3-2].");
    }

    ThrowIfPacketIdentifierIsInvalid(packet);

    packetWriter.Write(packet.PacketIdentifier.Value);

    var unsubscribeProperties = new MqttV500PropertiesWriter();
    var properties = packet.Properties;
    if (properties != null)
    {
        unsubscribeProperties.WriteUserProperties(properties.UserProperties);
    }

    unsubscribeProperties.WriteTo(packetWriter);

    foreach (var filter in packet.TopicFilters)
    {
        packetWriter.WriteWithLengthPrefix(filter);
    }

    return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Unsubscibe, 0x02);
}
/// <summary>
/// Writes the body of an UNSUBACK packet: packet identifier, properties, then one byte per
/// reason code.
/// </summary>
/// <param name="packet">The UNSUBACK packet to encode.</param>
/// <param name="packetWriter">The writer that receives the packet body.</param>
/// <returns>The fixed header byte for an UNSUBACK packet.</returns>
/// <exception cref="MqttProtocolViolationException">
/// No reason code is set, or the packet identifier is not set.
/// </exception>
private static byte EncodeUnsubAckPacket(MqttUnsubAckPacket packet, IMqttPacketWriter packetWriter)
{
    var hasReasonCodes = packet.ReasonCodes?.Any() == true;
    if (!hasReasonCodes)
    {
        throw new MqttProtocolViolationException("At least one reason code must be set[MQTT - 3.8.3 - 3].");
    }

    ThrowIfPacketIdentifierIsInvalid(packet);

    packetWriter.Write(packet.PacketIdentifier.Value);

    var unsubAckProperties = new MqttV500PropertiesWriter();
    var properties = packet.Properties;
    if (properties != null)
    {
        unsubAckProperties.WriteReasonString(properties.ReasonString);
        unsubAckProperties.WriteUserProperties(properties.UserProperties);
    }

    unsubAckProperties.WriteTo(packetWriter);

    foreach (var code in packet.ReasonCodes)
    {
        packetWriter.Write((byte)code);
    }

    return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.UnsubAck);
}
/// <summary>
/// Writes the body of a DISCONNECT packet: the reason code followed by the properties.
/// </summary>
/// <param name="packet">The DISCONNECT packet to encode.</param>
/// <param name="packetWriter">The writer that receives the packet body.</param>
/// <returns>The fixed header byte for a DISCONNECT packet.</returns>
/// <exception cref="MqttProtocolViolationException">The reason code is not set.</exception>
private static byte EncodeDisconnectPacket(MqttDisconnectPacket packet, IMqttPacketWriter packetWriter)
{
    if (!packet.ReasonCode.HasValue)
    {
        ThrowReasonCodeNotSetException();
    }

    packetWriter.Write((byte)packet.ReasonCode.Value);

    // The property section is always written, even when no properties are set.
    var disconnectProperties = new MqttV500PropertiesWriter();
    var properties = packet.Properties;
    if (properties != null)
    {
        disconnectProperties.WriteServerReference(properties.ServerReference);
        disconnectProperties.WriteReasonString(properties.ReasonString);
        disconnectProperties.WriteSessionExpiryInterval(properties.SessionExpiryInterval);
        disconnectProperties.WriteUserProperties(properties.UserProperties);
    }

    disconnectProperties.WriteTo(packetWriter);

    return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Disconnect);
}
/// <summary>
/// Builds the fixed header byte for a PINGREQ packet; nothing is written to a packet writer.
/// </summary>
private static byte EncodePingReqPacket() =>
    MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PingReq);
/// <summary>
/// Builds the fixed header byte for a PINGRESP packet; nothing is written to a packet writer.
/// </summary>
private static byte EncodePingRespPacket() =>
    MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PingResp);
/// <summary>
/// Writes the body of an AUTH packet: the reason code followed by the properties.
/// </summary>
/// <param name="packet">The AUTH packet to encode.</param>
/// <param name="packetWriter">The writer that receives the packet body.</param>
/// <returns>The fixed header byte for an AUTH packet.</returns>
private static byte EncodeAuthPacket(MqttAuthPacket packet, IMqttPacketWriter packetWriter)
{
    packetWriter.Write((byte)packet.ReasonCode);

    // The property section is always written, even when no properties are set.
    var authProperties = new MqttV500PropertiesWriter();
    var properties = packet.Properties;
    if (properties != null)
    {
        authProperties.WriteAuthenticationMethod(properties.AuthenticationMethod);
        authProperties.WriteAuthenticationData(properties.AuthenticationData);
        authProperties.WriteReasonString(properties.ReasonString);
        authProperties.WriteUserProperties(properties.UserProperties);
    }

    authProperties.WriteTo(packetWriter);

    return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Auth);
}
/// <summary>
/// Always throws; shared by the encoders that require a reason code.
/// </summary>
/// <exception cref="MqttProtocolViolationException">Thrown on every call.</exception>
private static void ThrowReasonCodeNotSetException()
{
    const string message = "The ReasonCode must be set for MQTT version 5.";
    throw new MqttProtocolViolationException(message);
}
/// <summary>
/// Ensures that the packet carries a packet identifier.
/// </summary>
/// <param name="packet">The packet whose identifier is checked.</param>
/// <exception cref="MqttProtocolViolationException">The packet identifier has no value.</exception>
private static void ThrowIfPacketIdentifierIsInvalid(IMqttPacketWithIdentifier packet)
{
    if (packet.PacketIdentifier.HasValue)
    {
        return;
    }

    // The runtime type name is included so the message identifies the offending packet kind.
    var packetTypeName = packet.GetType().Name;
    throw new MqttProtocolViolationException($"Packet identifier is not set for {packetTypeName}.");
}
- }
- }
|