MqttV500PacketDecoder.cs 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895
  1. using System;
  2. using System.Collections.Generic;
  3. using MQTTnet.Adapter;
  4. using MQTTnet.Exceptions;
  5. using MQTTnet.Packets;
  6. using MQTTnet.Protocol;
  7. namespace MQTTnet.Formatter.V5
  8. {
  9. public class MqttV500PacketDecoder
  10. {
  11. private static readonly MqttPingReqPacket PingReqPacket = new MqttPingReqPacket();
  12. private static readonly MqttPingRespPacket PingRespPacket = new MqttPingRespPacket();
  13. public MqttBasePacket Decode(ReceivedMqttPacket receivedMqttPacket)
  14. {
  15. if (receivedMqttPacket == null) throw new ArgumentNullException(nameof(receivedMqttPacket));
  16. var controlPacketType = receivedMqttPacket.FixedHeader >> 4;
  17. if (controlPacketType < 1 || controlPacketType > 15)
  18. {
  19. throw new MqttProtocolViolationException($"The packet type is invalid ({controlPacketType}).");
  20. }
  21. switch ((MqttControlPacketType)controlPacketType)
  22. {
  23. case MqttControlPacketType.Connect: return DecodeConnectPacket(receivedMqttPacket.Body);
  24. case MqttControlPacketType.ConnAck: return DecodeConnAckPacket(receivedMqttPacket.Body);
  25. case MqttControlPacketType.Disconnect: return DecodeDisconnectPacket(receivedMqttPacket.Body);
  26. case MqttControlPacketType.Publish: return DecodePublishPacket(receivedMqttPacket.FixedHeader, receivedMqttPacket.Body);
  27. case MqttControlPacketType.PubAck: return DecodePubAckPacket(receivedMqttPacket.Body);
  28. case MqttControlPacketType.PubRec: return DecodePubRecPacket(receivedMqttPacket.Body);
  29. case MqttControlPacketType.PubRel: return DecodePubRelPacket(receivedMqttPacket.Body);
  30. case MqttControlPacketType.PubComp: return DecodePubCompPacket(receivedMqttPacket.Body);
  31. case MqttControlPacketType.PingReq: return DecodePingReqPacket();
  32. case MqttControlPacketType.PingResp: return DecodePingRespPacket();
  33. case MqttControlPacketType.Subscribe: return DecodeSubscribePacket(receivedMqttPacket.Body);
  34. case MqttControlPacketType.SubAck: return DecodeSubAckPacket(receivedMqttPacket.Body);
  35. case MqttControlPacketType.Unsubscibe: return DecodeUnsubscribePacket(receivedMqttPacket.Body);
  36. case MqttControlPacketType.UnsubAck: return DecodeUnsubAckPacket(receivedMqttPacket.Body);
  37. case MqttControlPacketType.Auth: return DecodeAuthPacket(receivedMqttPacket.Body);
  38. default: throw new MqttProtocolViolationException($"Packet type ({controlPacketType}) not supported.");
  39. }
  40. }
  41. private static MqttBasePacket DecodeConnectPacket(IMqttPacketBodyReader body)
  42. {
  43. ThrowIfBodyIsEmpty(body);
  44. var packet = new MqttConnectPacket();
  45. var protocolName = body.ReadStringWithLengthPrefix();
  46. var protocolVersion = body.ReadByte();
  47. if (protocolName != "MQTT" && protocolVersion != 5)
  48. {
  49. throw new MqttProtocolViolationException("MQTT protocol name and version do not match MQTT v5.");
  50. }
  51. var connectFlags = body.ReadByte();
  52. var cleanSessionFlag = (connectFlags & 0x02) > 0;
  53. var willMessageFlag = (connectFlags & 0x04) > 0;
  54. var willMessageQoS = (byte)(connectFlags >> 3 & 3);
  55. var willMessageRetainFlag = (connectFlags & 0x20) > 0;
  56. var passwordFlag = (connectFlags & 0x40) > 0;
  57. var usernameFlag = (connectFlags & 0x80) > 0;
  58. packet.CleanSession = cleanSessionFlag;
  59. if (willMessageFlag)
  60. {
  61. packet.WillMessage = new MqttApplicationMessage
  62. {
  63. QualityOfServiceLevel = (MqttQualityOfServiceLevel)willMessageQoS,
  64. Retain = willMessageRetainFlag
  65. };
  66. }
  67. packet.KeepAlivePeriod = body.ReadTwoByteInteger();
  68. var propertiesReader = new MqttV500PropertiesReader(body);
  69. while (propertiesReader.MoveNext())
  70. {
  71. if (packet.Properties == null)
  72. {
  73. packet.Properties = new MqttConnectPacketProperties();
  74. }
  75. if (propertiesReader.CurrentPropertyId == MqttPropertyId.SessionExpiryInterval)
  76. {
  77. packet.Properties.SessionExpiryInterval = propertiesReader.ReadSessionExpiryInterval();
  78. }
  79. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.AuthenticationMethod)
  80. {
  81. packet.Properties.AuthenticationMethod = propertiesReader.ReadAuthenticationMethod();
  82. }
  83. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.AuthenticationData)
  84. {
  85. packet.Properties.AuthenticationData = propertiesReader.ReadAuthenticationData();
  86. }
  87. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ReceiveMaximum)
  88. {
  89. packet.Properties.ReceiveMaximum = propertiesReader.ReadReceiveMaximum();
  90. }
  91. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.TopicAliasMaximum)
  92. {
  93. packet.Properties.TopicAliasMaximum = propertiesReader.ReadTopicAliasMaximum();
  94. }
  95. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.MaximumPacketSize)
  96. {
  97. packet.Properties.MaximumPacketSize = propertiesReader.ReadMaximumPacketSize();
  98. }
  99. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.RequestResponseInformation)
  100. {
  101. packet.Properties.RequestResponseInformation = propertiesReader.RequestResponseInformation();
  102. }
  103. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.RequestProblemInformation)
  104. {
  105. packet.Properties.RequestProblemInformation = propertiesReader.RequestProblemInformation();
  106. }
  107. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
  108. {
  109. if (packet.Properties.UserProperties == null)
  110. {
  111. packet.Properties.UserProperties = new List<MqttUserProperty>();
  112. }
  113. propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
  114. }
  115. else
  116. {
  117. propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttConnectPacket));
  118. }
  119. }
  120. packet.ClientId = body.ReadStringWithLengthPrefix();
  121. if (packet.WillMessage != null)
  122. {
  123. var willPropertiesReader = new MqttV500PropertiesReader(body);
  124. while (willPropertiesReader.MoveNext())
  125. {
  126. if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.PayloadFormatIndicator)
  127. {
  128. packet.WillMessage.PayloadFormatIndicator = propertiesReader.ReadPayloadFormatIndicator();
  129. }
  130. else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.MessageExpiryInterval)
  131. {
  132. packet.WillMessage.MessageExpiryInterval = propertiesReader.ReadMessageExpiryInterval();
  133. }
  134. else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.TopicAlias)
  135. {
  136. packet.WillMessage.TopicAlias = propertiesReader.ReadTopicAlias();
  137. }
  138. else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.ResponseTopic)
  139. {
  140. packet.WillMessage.ResponseTopic = propertiesReader.ReadResponseTopic();
  141. }
  142. else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.CorrelationData)
  143. {
  144. packet.WillMessage.CorrelationData = propertiesReader.ReadCorrelationData();
  145. }
  146. else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.SubscriptionIdentifier)
  147. {
  148. if (packet.WillMessage.SubscriptionIdentifiers == null)
  149. {
  150. packet.WillMessage.SubscriptionIdentifiers = new List<uint>();
  151. }
  152. packet.WillMessage.SubscriptionIdentifiers.Add(propertiesReader.ReadSubscriptionIdentifier());
  153. }
  154. else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.ContentType)
  155. {
  156. packet.WillMessage.ContentType = propertiesReader.ReadContentType();
  157. }
  158. else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.WillDelayInterval)
  159. {
  160. // This is a special case!
  161. packet.Properties.WillDelayInterval = propertiesReader.ReadWillDelayInterval();
  162. }
  163. else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
  164. {
  165. if (packet.WillMessage.UserProperties == null)
  166. {
  167. packet.WillMessage.UserProperties = new List<MqttUserProperty>();
  168. }
  169. propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
  170. }
  171. else
  172. {
  173. propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttPublishPacket));
  174. }
  175. }
  176. packet.WillMessage.Topic = body.ReadStringWithLengthPrefix();
  177. packet.WillMessage.Payload = body.ReadWithLengthPrefix();
  178. }
  179. if (usernameFlag)
  180. {
  181. packet.Username = body.ReadStringWithLengthPrefix();
  182. }
  183. if (passwordFlag)
  184. {
  185. packet.Password = body.ReadWithLengthPrefix();
  186. }
  187. return packet;
  188. }
  189. private static MqttBasePacket DecodeConnAckPacket(IMqttPacketBodyReader body)
  190. {
  191. ThrowIfBodyIsEmpty(body);
  192. var acknowledgeFlags = body.ReadByte();
  193. var packet = new MqttConnAckPacket
  194. {
  195. IsSessionPresent = (acknowledgeFlags & 0x1) > 0,
  196. ReasonCode = (MqttConnectReasonCode)body.ReadByte()
  197. };
  198. var propertiesReader = new MqttV500PropertiesReader(body);
  199. while (propertiesReader.MoveNext())
  200. {
  201. if (packet.Properties == null)
  202. {
  203. packet.Properties = new MqttConnAckPacketProperties();
  204. }
  205. if (propertiesReader.CurrentPropertyId == MqttPropertyId.SessionExpiryInterval)
  206. {
  207. packet.Properties.SessionExpiryInterval = propertiesReader.ReadSessionExpiryInterval();
  208. }
  209. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.AuthenticationMethod)
  210. {
  211. packet.Properties.AuthenticationMethod = propertiesReader.ReadAuthenticationMethod();
  212. }
  213. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.AuthenticationData)
  214. {
  215. packet.Properties.AuthenticationData = propertiesReader.ReadAuthenticationData();
  216. }
  217. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.RetainAvailable)
  218. {
  219. packet.Properties.RetainAvailable = propertiesReader.ReadRetainAvailable();
  220. }
  221. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ReceiveMaximum)
  222. {
  223. packet.Properties.ReceiveMaximum = propertiesReader.ReadReceiveMaximum();
  224. }
  225. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.AssignedClientIdentifier)
  226. {
  227. packet.Properties.AssignedClientIdentifier = propertiesReader.ReadAssignedClientIdentifier();
  228. }
  229. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.TopicAliasMaximum)
  230. {
  231. packet.Properties.TopicAliasMaximum = propertiesReader.ReadTopicAliasMaximum();
  232. }
  233. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ReasonString)
  234. {
  235. packet.Properties.ReasonString = propertiesReader.ReadReasonString();
  236. }
  237. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.MaximumPacketSize)
  238. {
  239. packet.Properties.MaximumPacketSize = propertiesReader.ReadMaximumPacketSize();
  240. }
  241. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.WildcardSubscriptionAvailable)
  242. {
  243. packet.Properties.WildcardSubscriptionAvailable = propertiesReader.ReadWildcardSubscriptionAvailable();
  244. }
  245. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.SubscriptionIdentifiersAvailable)
  246. {
  247. packet.Properties.SubscriptionIdentifiersAvailable = propertiesReader.ReadSubscriptionIdentifiersAvailable();
  248. }
  249. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.SharedSubscriptionAvailable)
  250. {
  251. packet.Properties.SharedSubscriptionAvailable = propertiesReader.ReadSharedSubscriptionAvailable();
  252. }
  253. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ServerKeepAlive)
  254. {
  255. packet.Properties.ServerKeepAlive = propertiesReader.ReadServerKeepAlive();
  256. }
  257. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ResponseInformation)
  258. {
  259. packet.Properties.ResponseInformation = propertiesReader.ReadResponseInformation();
  260. }
  261. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ServerReference)
  262. {
  263. packet.Properties.ServerReference = propertiesReader.ReadServerReference();
  264. }
  265. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
  266. {
  267. if (packet.Properties.UserProperties == null)
  268. {
  269. packet.Properties.UserProperties = new List<MqttUserProperty>();
  270. }
  271. propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
  272. }
  273. else
  274. {
  275. propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttConnAckPacket));
  276. }
  277. }
  278. return packet;
  279. }
  280. private static MqttBasePacket DecodeDisconnectPacket(IMqttPacketBodyReader body)
  281. {
  282. ThrowIfBodyIsEmpty(body);
  283. var packet = new MqttDisconnectPacket
  284. {
  285. ReasonCode = (MqttDisconnectReasonCode)body.ReadByte()
  286. };
  287. var propertiesReader = new MqttV500PropertiesReader(body);
  288. while (propertiesReader.MoveNext())
  289. {
  290. if (packet.Properties == null)
  291. {
  292. packet.Properties = new MqttDisconnectPacketProperties();
  293. }
  294. if (propertiesReader.CurrentPropertyId == MqttPropertyId.SessionExpiryInterval)
  295. {
  296. packet.Properties.SessionExpiryInterval = propertiesReader.ReadSessionExpiryInterval();
  297. }
  298. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ReasonString)
  299. {
  300. packet.Properties.ReasonString = propertiesReader.ReadReasonString();
  301. }
  302. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ServerReference)
  303. {
  304. packet.Properties.ServerReference = propertiesReader.ReadServerReference();
  305. }
  306. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
  307. {
  308. if (packet.Properties.UserProperties == null)
  309. {
  310. packet.Properties.UserProperties = new List<MqttUserProperty>();
  311. }
  312. propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
  313. }
  314. else
  315. {
  316. propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttDisconnectPacket));
  317. }
  318. }
  319. return packet;
  320. }
  321. private static MqttBasePacket DecodeSubscribePacket(IMqttPacketBodyReader body)
  322. {
  323. ThrowIfBodyIsEmpty(body);
  324. var packet = new MqttSubscribePacket
  325. {
  326. PacketIdentifier = body.ReadTwoByteInteger()
  327. };
  328. var propertiesReader = new MqttV500PropertiesReader(body);
  329. while (propertiesReader.MoveNext())
  330. {
  331. if (packet.Properties == null)
  332. {
  333. packet.Properties = new MqttSubscribePacketProperties();
  334. }
  335. if (propertiesReader.CurrentPropertyId == MqttPropertyId.SubscriptionIdentifier)
  336. {
  337. packet.Properties.SubscriptionIdentifier = propertiesReader.ReadSubscriptionIdentifier();
  338. }
  339. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
  340. {
  341. if (packet.Properties.UserProperties == null)
  342. {
  343. packet.Properties.UserProperties = new List<MqttUserProperty>();
  344. }
  345. propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
  346. }
  347. else
  348. {
  349. propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttSubscribePacket));
  350. }
  351. }
  352. while (!body.EndOfStream)
  353. {
  354. var topic = body.ReadStringWithLengthPrefix();
  355. var options = body.ReadByte();
  356. var qos = (MqttQualityOfServiceLevel)(options & 3);
  357. var noLocal = (options & (1 << 2)) > 0;
  358. var retainAsPublished = (options & (1 << 3)) > 0;
  359. var retainHandling = (MqttRetainHandling)((options >> 4) & 3);
  360. packet.TopicFilters.Add(new MqttTopicFilter
  361. {
  362. Topic = topic,
  363. QualityOfServiceLevel = qos,
  364. NoLocal = noLocal,
  365. RetainAsPublished = retainAsPublished,
  366. RetainHandling = retainHandling
  367. });
  368. }
  369. return packet;
  370. }
  371. private static MqttBasePacket DecodeSubAckPacket(IMqttPacketBodyReader body)
  372. {
  373. ThrowIfBodyIsEmpty(body);
  374. var packet = new MqttSubAckPacket
  375. {
  376. PacketIdentifier = body.ReadTwoByteInteger()
  377. };
  378. var propertiesReader = new MqttV500PropertiesReader(body);
  379. while (propertiesReader.MoveNext())
  380. {
  381. if (packet.Properties == null)
  382. {
  383. packet.Properties = new MqttSubAckPacketProperties();
  384. }
  385. if (propertiesReader.CurrentPropertyId == MqttPropertyId.ReasonString)
  386. {
  387. packet.Properties.ReasonString = propertiesReader.ReadReasonString();
  388. }
  389. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
  390. {
  391. if (packet.Properties.UserProperties == null)
  392. {
  393. packet.Properties.UserProperties = new List<MqttUserProperty>();
  394. }
  395. propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
  396. }
  397. else
  398. {
  399. propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttSubAckPacket));
  400. }
  401. }
  402. while (!body.EndOfStream)
  403. {
  404. var reasonCode = (MqttSubscribeReasonCode)body.ReadByte();
  405. packet.ReasonCodes.Add(reasonCode);
  406. }
  407. return packet;
  408. }
  409. private static MqttBasePacket DecodeUnsubscribePacket(IMqttPacketBodyReader body)
  410. {
  411. ThrowIfBodyIsEmpty(body);
  412. var packet = new MqttUnsubscribePacket
  413. {
  414. PacketIdentifier = body.ReadTwoByteInteger()
  415. };
  416. var propertiesReader = new MqttV500PropertiesReader(body);
  417. while (propertiesReader.MoveNext())
  418. {
  419. if (packet.Properties == null)
  420. {
  421. packet.Properties = new MqttUnsubscribePacketProperties();
  422. }
  423. if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
  424. {
  425. if (packet.Properties.UserProperties == null)
  426. {
  427. packet.Properties.UserProperties = new List<MqttUserProperty>();
  428. }
  429. propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
  430. }
  431. else
  432. {
  433. propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttUnsubscribePacket));
  434. }
  435. }
  436. while (!body.EndOfStream)
  437. {
  438. packet.TopicFilters.Add(body.ReadStringWithLengthPrefix());
  439. }
  440. return packet;
  441. }
  442. private static MqttBasePacket DecodeUnsubAckPacket(IMqttPacketBodyReader body)
  443. {
  444. ThrowIfBodyIsEmpty(body);
  445. var packet = new MqttUnsubAckPacket
  446. {
  447. PacketIdentifier = body.ReadTwoByteInteger()
  448. };
  449. var propertiesReader = new MqttV500PropertiesReader(body);
  450. while (propertiesReader.MoveNext())
  451. {
  452. if (packet.Properties == null)
  453. {
  454. packet.Properties = new MqttUnsubAckPacketProperties();
  455. }
  456. if (propertiesReader.CurrentPropertyId == MqttPropertyId.ReasonString)
  457. {
  458. packet.Properties.ReasonString = propertiesReader.ReadReasonString();
  459. }
  460. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
  461. {
  462. if (packet.Properties.UserProperties == null)
  463. {
  464. packet.Properties.UserProperties = new List<MqttUserProperty>();
  465. }
  466. propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
  467. }
  468. else
  469. {
  470. propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttUnsubAckPacket));
  471. }
  472. }
  473. while (!body.EndOfStream)
  474. {
  475. var reasonCode = (MqttUnsubscribeReasonCode)body.ReadByte();
  476. packet.ReasonCodes.Add(reasonCode);
  477. }
  478. return packet;
  479. }
  480. private static MqttBasePacket DecodePingReqPacket()
  481. {
  482. return PingReqPacket;
  483. }
  484. private static MqttBasePacket DecodePingRespPacket()
  485. {
  486. return PingRespPacket;
  487. }
  488. private static MqttBasePacket DecodePublishPacket(byte header, IMqttPacketBodyReader body)
  489. {
  490. ThrowIfBodyIsEmpty(body);
  491. var retain = (header & 1) > 0;
  492. var qos = (MqttQualityOfServiceLevel)(header >> 1 & 3);
  493. var dup = (header >> 3 & 1) > 0;
  494. var packet = new MqttPublishPacket
  495. {
  496. Topic = body.ReadStringWithLengthPrefix(),
  497. Retain = retain,
  498. QualityOfServiceLevel = qos,
  499. Dup = dup
  500. };
  501. if (qos > 0)
  502. {
  503. packet.PacketIdentifier = body.ReadTwoByteInteger();
  504. }
  505. var propertiesReader = new MqttV500PropertiesReader(body);
  506. while (propertiesReader.MoveNext())
  507. {
  508. if (packet.Properties == null)
  509. {
  510. packet.Properties = new MqttPublishPacketProperties();
  511. }
  512. if (propertiesReader.CurrentPropertyId == MqttPropertyId.PayloadFormatIndicator)
  513. {
  514. packet.Properties.PayloadFormatIndicator = propertiesReader.ReadPayloadFormatIndicator();
  515. }
  516. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.MessageExpiryInterval)
  517. {
  518. packet.Properties.MessageExpiryInterval = propertiesReader.ReadMessageExpiryInterval();
  519. }
  520. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.TopicAlias)
  521. {
  522. packet.Properties.TopicAlias = propertiesReader.ReadTopicAlias();
  523. }
  524. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ResponseTopic)
  525. {
  526. packet.Properties.ResponseTopic = propertiesReader.ReadResponseTopic();
  527. }
  528. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.CorrelationData)
  529. {
  530. packet.Properties.CorrelationData = propertiesReader.ReadCorrelationData();
  531. }
  532. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.SubscriptionIdentifier)
  533. {
  534. if (packet.Properties.SubscriptionIdentifiers == null)
  535. {
  536. packet.Properties.SubscriptionIdentifiers = new List<uint>();
  537. }
  538. packet.Properties.SubscriptionIdentifiers.Add(propertiesReader.ReadSubscriptionIdentifier());
  539. }
  540. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ContentType)
  541. {
  542. packet.Properties.ContentType = propertiesReader.ReadContentType();
  543. }
  544. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
  545. {
  546. if (packet.Properties.UserProperties == null)
  547. {
  548. packet.Properties.UserProperties = new List<MqttUserProperty>();
  549. }
  550. propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
  551. }
  552. else
  553. {
  554. propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttPublishPacket));
  555. }
  556. }
  557. if (!body.EndOfStream)
  558. {
  559. packet.Payload = body.ReadRemainingData();
  560. }
  561. return packet;
  562. }
  563. private static MqttBasePacket DecodePubAckPacket(IMqttPacketBodyReader body)
  564. {
  565. ThrowIfBodyIsEmpty(body);
  566. var packet = new MqttPubAckPacket
  567. {
  568. PacketIdentifier = body.ReadTwoByteInteger()
  569. };
  570. if (body.EndOfStream)
  571. {
  572. packet.ReasonCode = MqttPubAckReasonCode.Success;
  573. return packet;
  574. }
  575. packet.ReasonCode = (MqttPubAckReasonCode)body.ReadByte();
  576. var propertiesReader = new MqttV500PropertiesReader(body);
  577. while (propertiesReader.MoveNext())
  578. {
  579. if (packet.Properties == null)
  580. {
  581. packet.Properties = new MqttPubAckPacketProperties();
  582. }
  583. if (propertiesReader.CurrentPropertyId == MqttPropertyId.ReasonString)
  584. {
  585. packet.Properties.ReasonString = propertiesReader.ReadReasonString();
  586. }
  587. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
  588. {
  589. if (packet.Properties.UserProperties == null)
  590. {
  591. packet.Properties.UserProperties = new List<MqttUserProperty>();
  592. }
  593. propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
  594. }
  595. else
  596. {
  597. propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttPubAckPacket));
  598. }
  599. }
  600. return packet;
  601. }
  602. private static MqttBasePacket DecodePubRecPacket(IMqttPacketBodyReader body)
  603. {
  604. ThrowIfBodyIsEmpty(body);
  605. var packet = new MqttPubRecPacket
  606. {
  607. PacketIdentifier = body.ReadTwoByteInteger()
  608. };
  609. if (body.EndOfStream)
  610. {
  611. packet.ReasonCode = MqttPubRecReasonCode.Success;
  612. return packet;
  613. }
  614. packet.ReasonCode = (MqttPubRecReasonCode)body.ReadByte();
  615. var propertiesReader = new MqttV500PropertiesReader(body);
  616. while (propertiesReader.MoveNext())
  617. {
  618. if (packet.Properties == null)
  619. {
  620. packet.Properties = new MqttPubRecPacketProperties();
  621. }
  622. if (propertiesReader.CurrentPropertyId == MqttPropertyId.ReasonString)
  623. {
  624. packet.Properties.ReasonString = propertiesReader.ReadReasonString();
  625. }
  626. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
  627. {
  628. if (packet.Properties.UserProperties == null)
  629. {
  630. packet.Properties.UserProperties = new List<MqttUserProperty>();
  631. }
  632. propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
  633. }
  634. else
  635. {
  636. propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttPubRecPacket));
  637. }
  638. }
  639. return packet;
  640. }
  641. private static MqttBasePacket DecodePubRelPacket(IMqttPacketBodyReader body)
  642. {
  643. ThrowIfBodyIsEmpty(body);
  644. var packet = new MqttPubRelPacket
  645. {
  646. PacketIdentifier = body.ReadTwoByteInteger()
  647. };
  648. if (body.EndOfStream)
  649. {
  650. packet.ReasonCode = MqttPubRelReasonCode.Success;
  651. return packet;
  652. }
  653. packet.ReasonCode = (MqttPubRelReasonCode)body.ReadByte();
  654. var propertiesReader = new MqttV500PropertiesReader(body);
  655. while (propertiesReader.MoveNext())
  656. {
  657. if (packet.Properties == null)
  658. {
  659. packet.Properties = new MqttPubRelPacketProperties();
  660. }
  661. if (propertiesReader.CurrentPropertyId == MqttPropertyId.ReasonString)
  662. {
  663. packet.Properties.ReasonString = propertiesReader.ReadReasonString();
  664. }
  665. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
  666. {
  667. if (packet.Properties.UserProperties == null)
  668. {
  669. packet.Properties.UserProperties = new List<MqttUserProperty>();
  670. }
  671. propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
  672. }
  673. else
  674. {
  675. propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttPubRelPacket));
  676. }
  677. }
  678. return packet;
  679. }
  680. private static MqttBasePacket DecodePubCompPacket(IMqttPacketBodyReader body)
  681. {
  682. ThrowIfBodyIsEmpty(body);
  683. var packet = new MqttPubCompPacket
  684. {
  685. PacketIdentifier = body.ReadTwoByteInteger()
  686. };
  687. if (body.EndOfStream)
  688. {
  689. packet.ReasonCode = MqttPubCompReasonCode.Success;
  690. return packet;
  691. }
  692. packet.ReasonCode = (MqttPubCompReasonCode)body.ReadByte();
  693. var propertiesReader = new MqttV500PropertiesReader(body);
  694. while (propertiesReader.MoveNext())
  695. {
  696. if (packet.Properties == null)
  697. {
  698. packet.Properties = new MqttPubCompPacketProperties();
  699. }
  700. if (propertiesReader.CurrentPropertyId == MqttPropertyId.ReasonString)
  701. {
  702. packet.Properties.ReasonString = propertiesReader.ReadReasonString();
  703. }
  704. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
  705. {
  706. if (packet.Properties.UserProperties == null)
  707. {
  708. packet.Properties.UserProperties = new List<MqttUserProperty>();
  709. }
  710. propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
  711. }
  712. else
  713. {
  714. propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttPubCompPacket));
  715. }
  716. }
  717. return packet;
  718. }
  719. private static MqttBasePacket DecodeAuthPacket(IMqttPacketBodyReader body)
  720. {
  721. ThrowIfBodyIsEmpty(body);
  722. var packet = new MqttAuthPacket();
  723. if (body.EndOfStream)
  724. {
  725. packet.ReasonCode = MqttAuthenticateReasonCode.Success;
  726. return packet;
  727. }
  728. packet.ReasonCode = (MqttAuthenticateReasonCode)body.ReadByte();
  729. var propertiesReader = new MqttV500PropertiesReader(body);
  730. while (propertiesReader.MoveNext())
  731. {
  732. if (packet.Properties == null)
  733. {
  734. packet.Properties = new MqttAuthPacketProperties();
  735. }
  736. if (propertiesReader.CurrentPropertyId == MqttPropertyId.AuthenticationMethod)
  737. {
  738. packet.Properties.AuthenticationMethod = propertiesReader.ReadAuthenticationMethod();
  739. }
  740. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.AuthenticationData)
  741. {
  742. packet.Properties.AuthenticationData = propertiesReader.ReadAuthenticationData();
  743. }
  744. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ReasonString)
  745. {
  746. packet.Properties.ReasonString = propertiesReader.ReadReasonString();
  747. }
  748. else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty)
  749. {
  750. if (packet.Properties.UserProperties == null)
  751. {
  752. packet.Properties.UserProperties = new List<MqttUserProperty>();
  753. }
  754. propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties);
  755. }
  756. else
  757. {
  758. propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttAuthPacket));
  759. }
  760. }
  761. return packet;
  762. }
  763. // ReSharper disable once ParameterOnlyUsedForPreconditionCheck.Local
  764. private static void ThrowIfBodyIsEmpty(IMqttPacketBodyReader body)
  765. {
  766. if (body == null || body.Length == 0)
  767. {
  768. throw new MqttProtocolViolationException("Data from the body is required but not present.");
  769. }
  770. }
  771. }
  772. }