| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895 |
- using System;
- using System.Collections.Generic;
- using MQTTnet.Adapter;
- using MQTTnet.Exceptions;
- using MQTTnet.Packets;
- using MQTTnet.Protocol;
- namespace MQTTnet.Formatter.V5
- {
- public class MqttV500PacketDecoder
- {
- private static readonly MqttPingReqPacket PingReqPacket = new MqttPingReqPacket();
- private static readonly MqttPingRespPacket PingRespPacket = new MqttPingRespPacket();
- public MqttBasePacket Decode(ReceivedMqttPacket receivedMqttPacket)
- {
- if (receivedMqttPacket == null) throw new ArgumentNullException(nameof(receivedMqttPacket));
- var controlPacketType = receivedMqttPacket.FixedHeader >> 4;
- if (controlPacketType < 1 || controlPacketType > 15)
- {
- throw new MqttProtocolViolationException($"The packet type is invalid ({controlPacketType}).");
- }
- switch ((MqttControlPacketType)controlPacketType)
- {
- case MqttControlPacketType.Connect: return DecodeConnectPacket(receivedMqttPacket.Body);
- case MqttControlPacketType.ConnAck: return DecodeConnAckPacket(receivedMqttPacket.Body);
- case MqttControlPacketType.Disconnect: return DecodeDisconnectPacket(receivedMqttPacket.Body);
- case MqttControlPacketType.Publish: return DecodePublishPacket(receivedMqttPacket.FixedHeader, receivedMqttPacket.Body);
- case MqttControlPacketType.PubAck: return DecodePubAckPacket(receivedMqttPacket.Body);
- case MqttControlPacketType.PubRec: return DecodePubRecPacket(receivedMqttPacket.Body);
- case MqttControlPacketType.PubRel: return DecodePubRelPacket(receivedMqttPacket.Body);
- case MqttControlPacketType.PubComp: return DecodePubCompPacket(receivedMqttPacket.Body);
- case MqttControlPacketType.PingReq: return DecodePingReqPacket();
- case MqttControlPacketType.PingResp: return DecodePingRespPacket();
- case MqttControlPacketType.Subscribe: return DecodeSubscribePacket(receivedMqttPacket.Body);
- case MqttControlPacketType.SubAck: return DecodeSubAckPacket(receivedMqttPacket.Body);
- case MqttControlPacketType.Unsubscibe: return DecodeUnsubscribePacket(receivedMqttPacket.Body);
- case MqttControlPacketType.UnsubAck: return DecodeUnsubAckPacket(receivedMqttPacket.Body);
- case MqttControlPacketType.Auth: return DecodeAuthPacket(receivedMqttPacket.Body);
- default: throw new MqttProtocolViolationException($"Packet type ({controlPacketType}) not supported.");
- }
- }
- private static MqttBasePacket DecodeConnectPacket(IMqttPacketBodyReader body)
- {
- ThrowIfBodyIsEmpty(body);
- var packet = new MqttConnectPacket();
- var protocolName = body.ReadStringWithLengthPrefix();
- var protocolVersion = body.ReadByte();
- if (protocolName != "MQTT" && protocolVersion != 5)
- {
- throw new MqttProtocolViolationException("MQTT protocol name and version do not match MQTT v5.");
- }
- var connectFlags = body.ReadByte();
- var cleanSessionFlag = (connectFlags & 0x02) > 0;
- var willMessageFlag = (connectFlags & 0x04) > 0;
- var willMessageQoS = (byte)(connectFlags >> 3 & 3);
- var willMessageRetainFlag = (connectFlags & 0x20) > 0;
- var passwordFlag = (connectFlags & 0x40) > 0;
- var usernameFlag = (connectFlags & 0x80) > 0;
- packet.CleanSession = cleanSessionFlag;
- if (willMessageFlag)
- {
- packet.WillMessage = new MqttApplicationMessage
- {
- QualityOfServiceLevel = (MqttQualityOfServiceLevel)willMessageQoS,
- Retain = willMessageRetainFlag
- };
- }
- packet.KeepAlivePeriod = body.ReadTwoByteInteger();
- var propertiesReader = new MqttV500PropertiesReader(body);
- while (propertiesReader.MoveNext())
- {
- if (packet.Properties == null)
- {
- packet.Properties = new MqttConnectPacketProperties();
- }
- if (propertiesReader.CurrentPropertyId == MqttPropertyId.SessionExpiryInterval)
- {
- packet.Properties.SessionExpiryInterval = propertiesReader.ReadSessionExpiryInterval();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.AuthenticationMethod)
- {
- packet.Properties.AuthenticationMethod = propertiesReader.ReadAuthenticationMethod();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.AuthenticationData)
- {
- packet.Properties.AuthenticationData = propertiesReader.ReadAuthenticationData();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ReceiveMaximum)
- {
- packet.Properties.ReceiveMaximum = propertiesReader.ReadReceiveMaximum();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.TopicAliasMaximum)
- {
- packet.Properties.TopicAliasMaximum = propertiesReader.ReadTopicAliasMaximum();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.MaximumPacketSize)
- {
- packet.Properties.MaximumPacketSize = propertiesReader.ReadMaximumPacketSize();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.RequestResponseInformation)
- {
- packet.Properties.RequestResponseInformation = propertiesReader.RequestResponseInformation();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.RequestProblemInformation)
- {
- packet.Properties.RequestProblemInformation = propertiesReader.RequestProblemInformation();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
- {
- if (packet.Properties.UserProperties == null)
- {
- packet.Properties.UserProperties = new List<MqttUserProperty>();
- }
- propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
- }
- else
- {
- propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttConnectPacket));
- }
- }
- packet.ClientId = body.ReadStringWithLengthPrefix();
- if (packet.WillMessage != null)
- {
- var willPropertiesReader = new MqttV500PropertiesReader(body);
- while (willPropertiesReader.MoveNext())
- {
- if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.PayloadFormatIndicator)
- {
- packet.WillMessage.PayloadFormatIndicator = propertiesReader.ReadPayloadFormatIndicator();
- }
- else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.MessageExpiryInterval)
- {
- packet.WillMessage.MessageExpiryInterval = propertiesReader.ReadMessageExpiryInterval();
- }
- else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.TopicAlias)
- {
- packet.WillMessage.TopicAlias = propertiesReader.ReadTopicAlias();
- }
- else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.ResponseTopic)
- {
- packet.WillMessage.ResponseTopic = propertiesReader.ReadResponseTopic();
- }
- else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.CorrelationData)
- {
- packet.WillMessage.CorrelationData = propertiesReader.ReadCorrelationData();
- }
- else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.SubscriptionIdentifier)
- {
- if (packet.WillMessage.SubscriptionIdentifiers == null)
- {
- packet.WillMessage.SubscriptionIdentifiers = new List<uint>();
- }
- packet.WillMessage.SubscriptionIdentifiers.Add(propertiesReader.ReadSubscriptionIdentifier());
- }
- else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.ContentType)
- {
- packet.WillMessage.ContentType = propertiesReader.ReadContentType();
- }
- else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.WillDelayInterval)
- {
- // This is a special case!
- packet.Properties.WillDelayInterval = propertiesReader.ReadWillDelayInterval();
- }
- else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
- {
- if (packet.WillMessage.UserProperties == null)
- {
- packet.WillMessage.UserProperties = new List<MqttUserProperty>();
- }
- propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
- }
- else
- {
- propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttPublishPacket));
- }
- }
- packet.WillMessage.Topic = body.ReadStringWithLengthPrefix();
- packet.WillMessage.Payload = body.ReadWithLengthPrefix();
- }
- if (usernameFlag)
- {
- packet.Username = body.ReadStringWithLengthPrefix();
- }
- if (passwordFlag)
- {
- packet.Password = body.ReadWithLengthPrefix();
- }
- return packet;
- }
- private static MqttBasePacket DecodeConnAckPacket(IMqttPacketBodyReader body)
- {
- ThrowIfBodyIsEmpty(body);
- var acknowledgeFlags = body.ReadByte();
- var packet = new MqttConnAckPacket
- {
- IsSessionPresent = (acknowledgeFlags & 0x1) > 0,
- ReasonCode = (MqttConnectReasonCode)body.ReadByte()
- };
- var propertiesReader = new MqttV500PropertiesReader(body);
- while (propertiesReader.MoveNext())
- {
- if (packet.Properties == null)
- {
- packet.Properties = new MqttConnAckPacketProperties();
- }
- if (propertiesReader.CurrentPropertyId == MqttPropertyId.SessionExpiryInterval)
- {
- packet.Properties.SessionExpiryInterval = propertiesReader.ReadSessionExpiryInterval();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.AuthenticationMethod)
- {
- packet.Properties.AuthenticationMethod = propertiesReader.ReadAuthenticationMethod();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.AuthenticationData)
- {
- packet.Properties.AuthenticationData = propertiesReader.ReadAuthenticationData();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.RetainAvailable)
- {
- packet.Properties.RetainAvailable = propertiesReader.ReadRetainAvailable();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ReceiveMaximum)
- {
- packet.Properties.ReceiveMaximum = propertiesReader.ReadReceiveMaximum();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.AssignedClientIdentifier)
- {
- packet.Properties.AssignedClientIdentifier = propertiesReader.ReadAssignedClientIdentifier();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.TopicAliasMaximum)
- {
- packet.Properties.TopicAliasMaximum = propertiesReader.ReadTopicAliasMaximum();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ReasonString)
- {
- packet.Properties.ReasonString = propertiesReader.ReadReasonString();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.MaximumPacketSize)
- {
- packet.Properties.MaximumPacketSize = propertiesReader.ReadMaximumPacketSize();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.WildcardSubscriptionAvailable)
- {
- packet.Properties.WildcardSubscriptionAvailable = propertiesReader.ReadWildcardSubscriptionAvailable();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.SubscriptionIdentifiersAvailable)
- {
- packet.Properties.SubscriptionIdentifiersAvailable = propertiesReader.ReadSubscriptionIdentifiersAvailable();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.SharedSubscriptionAvailable)
- {
- packet.Properties.SharedSubscriptionAvailable = propertiesReader.ReadSharedSubscriptionAvailable();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ServerKeepAlive)
- {
- packet.Properties.ServerKeepAlive = propertiesReader.ReadServerKeepAlive();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ResponseInformation)
- {
- packet.Properties.ResponseInformation = propertiesReader.ReadResponseInformation();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ServerReference)
- {
- packet.Properties.ServerReference = propertiesReader.ReadServerReference();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
- {
- if (packet.Properties.UserProperties == null)
- {
- packet.Properties.UserProperties = new List<MqttUserProperty>();
- }
- propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
- }
- else
- {
- propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttConnAckPacket));
- }
- }
- return packet;
- }
- private static MqttBasePacket DecodeDisconnectPacket(IMqttPacketBodyReader body)
- {
- ThrowIfBodyIsEmpty(body);
- var packet = new MqttDisconnectPacket
- {
- ReasonCode = (MqttDisconnectReasonCode)body.ReadByte()
- };
- var propertiesReader = new MqttV500PropertiesReader(body);
- while (propertiesReader.MoveNext())
- {
- if (packet.Properties == null)
- {
- packet.Properties = new MqttDisconnectPacketProperties();
- }
- if (propertiesReader.CurrentPropertyId == MqttPropertyId.SessionExpiryInterval)
- {
- packet.Properties.SessionExpiryInterval = propertiesReader.ReadSessionExpiryInterval();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ReasonString)
- {
- packet.Properties.ReasonString = propertiesReader.ReadReasonString();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ServerReference)
- {
- packet.Properties.ServerReference = propertiesReader.ReadServerReference();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
- {
- if (packet.Properties.UserProperties == null)
- {
- packet.Properties.UserProperties = new List<MqttUserProperty>();
- }
- propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
- }
- else
- {
- propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttDisconnectPacket));
- }
- }
- return packet;
- }
- private static MqttBasePacket DecodeSubscribePacket(IMqttPacketBodyReader body)
- {
- ThrowIfBodyIsEmpty(body);
- var packet = new MqttSubscribePacket
- {
- PacketIdentifier = body.ReadTwoByteInteger()
- };
- var propertiesReader = new MqttV500PropertiesReader(body);
- while (propertiesReader.MoveNext())
- {
- if (packet.Properties == null)
- {
- packet.Properties = new MqttSubscribePacketProperties();
- }
- if (propertiesReader.CurrentPropertyId == MqttPropertyId.SubscriptionIdentifier)
- {
- packet.Properties.SubscriptionIdentifier = propertiesReader.ReadSubscriptionIdentifier();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
- {
- if (packet.Properties.UserProperties == null)
- {
- packet.Properties.UserProperties = new List<MqttUserProperty>();
- }
- propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
- }
- else
- {
- propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttSubscribePacket));
- }
- }
- while (!body.EndOfStream)
- {
- var topic = body.ReadStringWithLengthPrefix();
- var options = body.ReadByte();
- var qos = (MqttQualityOfServiceLevel)(options & 3);
- var noLocal = (options & (1 << 2)) > 0;
- var retainAsPublished = (options & (1 << 3)) > 0;
- var retainHandling = (MqttRetainHandling)((options >> 4) & 3);
- packet.TopicFilters.Add(new MqttTopicFilter
- {
- Topic = topic,
- QualityOfServiceLevel = qos,
- NoLocal = noLocal,
- RetainAsPublished = retainAsPublished,
- RetainHandling = retainHandling
- });
- }
- return packet;
- }
- private static MqttBasePacket DecodeSubAckPacket(IMqttPacketBodyReader body)
- {
- ThrowIfBodyIsEmpty(body);
- var packet = new MqttSubAckPacket
- {
- PacketIdentifier = body.ReadTwoByteInteger()
- };
- var propertiesReader = new MqttV500PropertiesReader(body);
- while (propertiesReader.MoveNext())
- {
- if (packet.Properties == null)
- {
- packet.Properties = new MqttSubAckPacketProperties();
- }
- if (propertiesReader.CurrentPropertyId == MqttPropertyId.ReasonString)
- {
- packet.Properties.ReasonString = propertiesReader.ReadReasonString();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
- {
- if (packet.Properties.UserProperties == null)
- {
- packet.Properties.UserProperties = new List<MqttUserProperty>();
- }
- propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
- }
- else
- {
- propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttSubAckPacket));
- }
- }
- while (!body.EndOfStream)
- {
- var reasonCode = (MqttSubscribeReasonCode)body.ReadByte();
- packet.ReasonCodes.Add(reasonCode);
- }
- return packet;
- }
- private static MqttBasePacket DecodeUnsubscribePacket(IMqttPacketBodyReader body)
- {
- ThrowIfBodyIsEmpty(body);
- var packet = new MqttUnsubscribePacket
- {
- PacketIdentifier = body.ReadTwoByteInteger()
- };
- var propertiesReader = new MqttV500PropertiesReader(body);
- while (propertiesReader.MoveNext())
- {
- if (packet.Properties == null)
- {
- packet.Properties = new MqttUnsubscribePacketProperties();
- }
- if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
- {
- if (packet.Properties.UserProperties == null)
- {
- packet.Properties.UserProperties = new List<MqttUserProperty>();
- }
- propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
- }
- else
- {
- propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttUnsubscribePacket));
- }
- }
- while (!body.EndOfStream)
- {
- packet.TopicFilters.Add(body.ReadStringWithLengthPrefix());
- }
- return packet;
- }
- private static MqttBasePacket DecodeUnsubAckPacket(IMqttPacketBodyReader body)
- {
- ThrowIfBodyIsEmpty(body);
- var packet = new MqttUnsubAckPacket
- {
- PacketIdentifier = body.ReadTwoByteInteger()
- };
- var propertiesReader = new MqttV500PropertiesReader(body);
- while (propertiesReader.MoveNext())
- {
- if (packet.Properties == null)
- {
- packet.Properties = new MqttUnsubAckPacketProperties();
- }
- if (propertiesReader.CurrentPropertyId == MqttPropertyId.ReasonString)
- {
- packet.Properties.ReasonString = propertiesReader.ReadReasonString();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
- {
- if (packet.Properties.UserProperties == null)
- {
- packet.Properties.UserProperties = new List<MqttUserProperty>();
- }
- propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
- }
- else
- {
- propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttUnsubAckPacket));
- }
- }
- while (!body.EndOfStream)
- {
- var reasonCode = (MqttUnsubscribeReasonCode)body.ReadByte();
- packet.ReasonCodes.Add(reasonCode);
- }
- return packet;
- }
- private static MqttBasePacket DecodePingReqPacket()
- {
- return PingReqPacket;
- }
- private static MqttBasePacket DecodePingRespPacket()
- {
- return PingRespPacket;
- }
- private static MqttBasePacket DecodePublishPacket(byte header, IMqttPacketBodyReader body)
- {
- ThrowIfBodyIsEmpty(body);
- var retain = (header & 1) > 0;
- var qos = (MqttQualityOfServiceLevel)(header >> 1 & 3);
- var dup = (header >> 3 & 1) > 0;
- var packet = new MqttPublishPacket
- {
- Topic = body.ReadStringWithLengthPrefix(),
- Retain = retain,
- QualityOfServiceLevel = qos,
- Dup = dup
- };
- if (qos > 0)
- {
- packet.PacketIdentifier = body.ReadTwoByteInteger();
- }
- var propertiesReader = new MqttV500PropertiesReader(body);
- while (propertiesReader.MoveNext())
- {
- if (packet.Properties == null)
- {
- packet.Properties = new MqttPublishPacketProperties();
- }
- if (propertiesReader.CurrentPropertyId == MqttPropertyId.PayloadFormatIndicator)
- {
- packet.Properties.PayloadFormatIndicator = propertiesReader.ReadPayloadFormatIndicator();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.MessageExpiryInterval)
- {
- packet.Properties.MessageExpiryInterval = propertiesReader.ReadMessageExpiryInterval();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.TopicAlias)
- {
- packet.Properties.TopicAlias = propertiesReader.ReadTopicAlias();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ResponseTopic)
- {
- packet.Properties.ResponseTopic = propertiesReader.ReadResponseTopic();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.CorrelationData)
- {
- packet.Properties.CorrelationData = propertiesReader.ReadCorrelationData();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.SubscriptionIdentifier)
- {
- if (packet.Properties.SubscriptionIdentifiers == null)
- {
- packet.Properties.SubscriptionIdentifiers = new List<uint>();
- }
- packet.Properties.SubscriptionIdentifiers.Add(propertiesReader.ReadSubscriptionIdentifier());
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ContentType)
- {
- packet.Properties.ContentType = propertiesReader.ReadContentType();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
- {
- if (packet.Properties.UserProperties == null)
- {
- packet.Properties.UserProperties = new List<MqttUserProperty>();
- }
- propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
- }
- else
- {
- propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttPublishPacket));
- }
- }
- if (!body.EndOfStream)
- {
- packet.Payload = body.ReadRemainingData();
- }
- return packet;
- }
- private static MqttBasePacket DecodePubAckPacket(IMqttPacketBodyReader body)
- {
- ThrowIfBodyIsEmpty(body);
- var packet = new MqttPubAckPacket
- {
- PacketIdentifier = body.ReadTwoByteInteger()
- };
- if (body.EndOfStream)
- {
- packet.ReasonCode = MqttPubAckReasonCode.Success;
- return packet;
- }
- packet.ReasonCode = (MqttPubAckReasonCode)body.ReadByte();
- var propertiesReader = new MqttV500PropertiesReader(body);
- while (propertiesReader.MoveNext())
- {
- if (packet.Properties == null)
- {
- packet.Properties = new MqttPubAckPacketProperties();
- }
- if (propertiesReader.CurrentPropertyId == MqttPropertyId.ReasonString)
- {
- packet.Properties.ReasonString = propertiesReader.ReadReasonString();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
- {
- if (packet.Properties.UserProperties == null)
- {
- packet.Properties.UserProperties = new List<MqttUserProperty>();
- }
- propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
- }
- else
- {
- propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttPubAckPacket));
- }
- }
- return packet;
- }
- private static MqttBasePacket DecodePubRecPacket(IMqttPacketBodyReader body)
- {
- ThrowIfBodyIsEmpty(body);
- var packet = new MqttPubRecPacket
- {
- PacketIdentifier = body.ReadTwoByteInteger()
- };
- if (body.EndOfStream)
- {
- packet.ReasonCode = MqttPubRecReasonCode.Success;
- return packet;
- }
- packet.ReasonCode = (MqttPubRecReasonCode)body.ReadByte();
- var propertiesReader = new MqttV500PropertiesReader(body);
- while (propertiesReader.MoveNext())
- {
- if (packet.Properties == null)
- {
- packet.Properties = new MqttPubRecPacketProperties();
- }
- if (propertiesReader.CurrentPropertyId == MqttPropertyId.ReasonString)
- {
- packet.Properties.ReasonString = propertiesReader.ReadReasonString();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
- {
- if (packet.Properties.UserProperties == null)
- {
- packet.Properties.UserProperties = new List<MqttUserProperty>();
- }
- propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
- }
- else
- {
- propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttPubRecPacket));
- }
- }
- return packet;
- }
- private static MqttBasePacket DecodePubRelPacket(IMqttPacketBodyReader body)
- {
- ThrowIfBodyIsEmpty(body);
- var packet = new MqttPubRelPacket
- {
- PacketIdentifier = body.ReadTwoByteInteger()
- };
- if (body.EndOfStream)
- {
- packet.ReasonCode = MqttPubRelReasonCode.Success;
- return packet;
- }
- packet.ReasonCode = (MqttPubRelReasonCode)body.ReadByte();
- var propertiesReader = new MqttV500PropertiesReader(body);
- while (propertiesReader.MoveNext())
- {
- if (packet.Properties == null)
- {
- packet.Properties = new MqttPubRelPacketProperties();
- }
- if (propertiesReader.CurrentPropertyId == MqttPropertyId.ReasonString)
- {
- packet.Properties.ReasonString = propertiesReader.ReadReasonString();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
- {
- if (packet.Properties.UserProperties == null)
- {
- packet.Properties.UserProperties = new List<MqttUserProperty>();
- }
- propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
- }
- else
- {
- propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttPubRelPacket));
- }
- }
- return packet;
- }
- private static MqttBasePacket DecodePubCompPacket(IMqttPacketBodyReader body)
- {
- ThrowIfBodyIsEmpty(body);
- var packet = new MqttPubCompPacket
- {
- PacketIdentifier = body.ReadTwoByteInteger()
- };
- if (body.EndOfStream)
- {
- packet.ReasonCode = MqttPubCompReasonCode.Success;
- return packet;
- }
- packet.ReasonCode = (MqttPubCompReasonCode)body.ReadByte();
- var propertiesReader = new MqttV500PropertiesReader(body);
- while (propertiesReader.MoveNext())
- {
- if (packet.Properties == null)
- {
- packet.Properties = new MqttPubCompPacketProperties();
- }
- if (propertiesReader.CurrentPropertyId == MqttPropertyId.ReasonString)
- {
- packet.Properties.ReasonString = propertiesReader.ReadReasonString();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
- {
- if (packet.Properties.UserProperties == null)
- {
- packet.Properties.UserProperties = new List<MqttUserProperty>();
- }
- propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
- }
- else
- {
- propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttPubCompPacket));
- }
- }
- return packet;
- }
- private static MqttBasePacket DecodeAuthPacket(IMqttPacketBodyReader body)
- {
- ThrowIfBodyIsEmpty(body);
- var packet = new MqttAuthPacket();
- if (body.EndOfStream)
- {
- packet.ReasonCode = MqttAuthenticateReasonCode.Success;
- return packet;
- }
- packet.ReasonCode = (MqttAuthenticateReasonCode)body.ReadByte();
- var propertiesReader = new MqttV500PropertiesReader(body);
- while (propertiesReader.MoveNext())
- {
- if (packet.Properties == null)
- {
- packet.Properties = new MqttAuthPacketProperties();
- }
- if (propertiesReader.CurrentPropertyId == MqttPropertyId.AuthenticationMethod)
- {
- packet.Properties.AuthenticationMethod = propertiesReader.ReadAuthenticationMethod();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.AuthenticationData)
- {
- packet.Properties.AuthenticationData = propertiesReader.ReadAuthenticationData();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ReasonString)
- {
- packet.Properties.ReasonString = propertiesReader.ReadReasonString();
- }
- else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
- {
- if (packet.Properties.UserProperties == null)
- {
- packet.Properties.UserProperties = new List<MqttUserProperty>();
- }
- propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
- }
- else
- {
- propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttAuthPacket));
- }
- }
- return packet;
- }
- // ReSharper disable once ParameterOnlyUsedForPreconditionCheck.Local
- private static void ThrowIfBodyIsEmpty(IMqttPacketBodyReader body)
- {
- if (body == null || body.Length == 0)
- {
- throw new MqttProtocolViolationException("Data from the body is required but not present.");
- }
- }
- }
- }
|