MqttMsgSubscribe.cs 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. /*
  2. Copyright (c) 2013, 2014 Paolo Patierno
  3. All rights reserved. This program and the accompanying materials
  4. are made available under the terms of the Eclipse Public License v1.0
  5. and Eclipse Distribution License v1.0 which accompany this distribution.
  6. The Eclipse Public License is available at
  7. http://www.eclipse.org/legal/epl-v10.html
  8. and the Eclipse Distribution License is available at
  9. http://www.eclipse.org/org/documents/edl-v10.php.
  10. Contributors:
  11. Paolo Patierno - initial API and implementation and/or initial documentation
  12. */
  13. using System;
  14. // if NOT .Net Micro Framework
  15. #if (!MF_FRAMEWORK_VERSION_V4_2 && !MF_FRAMEWORK_VERSION_V4_3)
  16. using System.Collections.Generic;
  17. #endif
  18. using System.Collections;
  19. using System.Text;
  20. using M2Mqtt.Exceptions;
  21. using M2Mqtt.Messages;
  22. namespace M2Mqtt.Messages
  23. {
  24. /// <summary>
  25. /// Class for SUBSCRIBE message from client to broker
  26. /// </summary>
  27. public class MqttMsgSubscribe : MqttMsgBase
  28. {
  29. #region Properties...
  30. /// <summary>
  31. /// List of topics to subscribe
  32. /// </summary>
  33. public string[] Topics
  34. {
  35. get { return topics; }
  36. set { topics = value; }
  37. }
  38. /// <summary>
  39. /// List of QOS Levels related to topics
  40. /// </summary>
  41. public byte[] QoSLevels
  42. {
  43. get { return qosLevels; }
  44. set { qosLevels = value; }
  45. }
  46. #endregion
  47. // topics to subscribe
  48. string[] topics;
  49. // QOS levels related to topics
  50. byte[] qosLevels;
  51. /// <summary>
  52. /// Constructor
  53. /// </summary>
  54. public MqttMsgSubscribe()
  55. {
  56. type = MQTT_MSG_SUBSCRIBE_TYPE;
  57. }
  58. /// <summary>
  59. /// Constructor
  60. /// </summary>
  61. /// <param name="topics">List of topics to subscribe</param>
  62. /// <param name="qosLevels">List of QOS Levels related to topics</param>
  63. public MqttMsgSubscribe(string[] topics, byte[] qosLevels)
  64. {
  65. type = MQTT_MSG_SUBSCRIBE_TYPE;
  66. topics = topics;
  67. qosLevels = qosLevels;
  68. // SUBSCRIBE message uses QoS Level 1 (not "officially" in 3.1.1)
  69. qosLevel = QOS_LEVEL_AT_LEAST_ONCE;
  70. }
  71. /// <summary>
  72. /// Parse bytes for a SUBSCRIBE message
  73. /// </summary>
  74. /// <param name="fixedHeaderFirstByte">First fixed header byte</param>
  75. /// <param name="protocolVersion">Protocol Version</param>
  76. /// <param name="channel">Channel connected to the broker</param>
  77. /// <returns>SUBSCRIBE message instance</returns>
  78. public static MqttMsgSubscribe Parse(byte fixedHeaderFirstByte, byte protocolVersion, IMqttNetworkChannel channel)
  79. {
  80. byte[] buffer;
  81. int index = 0;
  82. byte[] topicUtf8;
  83. int topicUtf8Length;
  84. MqttMsgSubscribe msg = new MqttMsgSubscribe();
  85. if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
  86. {
  87. // [v3.1.1] check flag bits
  88. if ((fixedHeaderFirstByte & MSG_FLAG_BITS_MASK) != MQTT_MSG_SUBSCRIBE_FLAG_BITS)
  89. throw new MqttClientException(MqttClientErrorCode.InvalidFlagBits);
  90. }
  91. // get remaining length and allocate buffer
  92. int remainingLength = MqttMsgBase.decodeRemainingLength(channel);
  93. buffer = new byte[remainingLength];
  94. // read bytes from socket...
  95. int received = channel.Receive(buffer);
  96. if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1)
  97. {
  98. // only 3.1.0
  99. // read QoS level from fixed header
  100. msg.qosLevel = (byte)((fixedHeaderFirstByte & QOS_LEVEL_MASK) >> QOS_LEVEL_OFFSET);
  101. // read DUP flag from fixed header
  102. msg.dupFlag = (((fixedHeaderFirstByte & DUP_FLAG_MASK) >> DUP_FLAG_OFFSET) == 0x01);
  103. // retain flag not used
  104. msg.retain = false;
  105. }
  106. // message id
  107. msg.messageId = (ushort)((buffer[index++] << 8) & 0xFF00);
  108. msg.messageId |= (buffer[index++]);
  109. // payload contains topics and QoS levels
  110. // NOTE : before, I don't know how many topics will be in the payload (so use List)
  111. // if .Net Micro Framework
  112. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3)
  113. IList tmpTopics = new ArrayList();
  114. IList tmpQosLevels = new ArrayList();
  115. // else other frameworks (.Net, .Net Compact, Mono, Windows Phone)
  116. #else
  117. IList<String> tmpTopics = new List<String>();
  118. IList<byte> tmpQosLevels = new List<byte>();
  119. #endif
  120. do
  121. {
  122. // topic name
  123. topicUtf8Length = ((buffer[index++] << 8) & 0xFF00);
  124. topicUtf8Length |= buffer[index++];
  125. topicUtf8 = new byte[topicUtf8Length];
  126. Array.Copy(buffer, index, topicUtf8, 0, topicUtf8Length);
  127. index += topicUtf8Length;
  128. tmpTopics.Add(new String(Encoding.UTF8.GetChars(topicUtf8)));
  129. // QoS level
  130. tmpQosLevels.Add(buffer[index++]);
  131. } while (index < remainingLength);
  132. // copy from list to array
  133. msg.topics = new string[tmpTopics.Count];
  134. msg.qosLevels = new byte[tmpQosLevels.Count];
  135. for (int i = 0; i < tmpTopics.Count; i++)
  136. {
  137. msg.topics[i] = (string)tmpTopics[i];
  138. msg.qosLevels[i] = (byte)tmpQosLevels[i];
  139. }
  140. return msg;
  141. }
  142. public override byte[] GetBytes(byte protocolVersion)
  143. {
  144. int fixedHeaderSize = 0;
  145. int varHeaderSize = 0;
  146. int payloadSize = 0;
  147. int remainingLength = 0;
  148. byte[] buffer;
  149. int index = 0;
  150. // topics list empty
  151. if ((topics == null) || (topics.Length == 0))
  152. throw new MqttClientException(MqttClientErrorCode.TopicsEmpty);
  153. // qos levels list empty
  154. if ((qosLevels == null) || (qosLevels.Length == 0))
  155. throw new MqttClientException(MqttClientErrorCode.QosLevelsEmpty);
  156. // topics and qos levels lists length don't match
  157. if (topics.Length != qosLevels.Length)
  158. throw new MqttClientException(MqttClientErrorCode.TopicsQosLevelsNotMatch);
  159. // message identifier
  160. varHeaderSize += MESSAGE_ID_SIZE;
  161. int topicIdx = 0;
  162. byte[][] topicsUtf8 = new byte[topics.Length][];
  163. for (topicIdx = 0; topicIdx < topics.Length; topicIdx++)
  164. {
  165. // check topic length
  166. if ((topics[topicIdx].Length < MIN_TOPIC_LENGTH) || (topics[topicIdx].Length > MAX_TOPIC_LENGTH))
  167. throw new MqttClientException(MqttClientErrorCode.TopicLength);
  168. topicsUtf8[topicIdx] = Encoding.UTF8.GetBytes(topics[topicIdx]);
  169. payloadSize += 2; // topic size (MSB, LSB)
  170. payloadSize += topicsUtf8[topicIdx].Length;
  171. payloadSize++; // byte for QoS
  172. }
  173. remainingLength += (varHeaderSize + payloadSize);
  174. // first byte of fixed header
  175. fixedHeaderSize = 1;
  176. int temp = remainingLength;
  177. // increase fixed header size based on remaining length
  178. // (each remaining length byte can encode until 128)
  179. do
  180. {
  181. fixedHeaderSize++;
  182. temp = temp / 128;
  183. } while (temp > 0);
  184. // allocate buffer for message
  185. buffer = new byte[fixedHeaderSize + varHeaderSize + payloadSize];
  186. // first fixed header byte
  187. if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
  188. buffer[index++] = (MQTT_MSG_SUBSCRIBE_TYPE << MSG_TYPE_OFFSET) | MQTT_MSG_SUBSCRIBE_FLAG_BITS; // [v.3.1.1]
  189. else
  190. {
  191. buffer[index] = (byte)((MQTT_MSG_SUBSCRIBE_TYPE << MSG_TYPE_OFFSET) |
  192. (qosLevel << QOS_LEVEL_OFFSET));
  193. buffer[index] |= dupFlag ? (byte)(1 << DUP_FLAG_OFFSET) : (byte)0x00;
  194. index++;
  195. }
  196. // encode remaining length
  197. index = encodeRemainingLength(remainingLength, buffer, index);
  198. // check message identifier assigned (SUBSCRIBE uses QoS Level 1, so message id is mandatory)
  199. if (messageId == 0)
  200. throw new MqttClientException(MqttClientErrorCode.WrongMessageId);
  201. buffer[index++] = (byte)((messageId >> 8) & 0x00FF); // MSB
  202. buffer[index++] = (byte)(messageId & 0x00FF); // LSB
  203. topicIdx = 0;
  204. for (topicIdx = 0; topicIdx < topics.Length; topicIdx++)
  205. {
  206. // topic name
  207. buffer[index++] = (byte)((topicsUtf8[topicIdx].Length >> 8) & 0x00FF); // MSB
  208. buffer[index++] = (byte)(topicsUtf8[topicIdx].Length & 0x00FF); // LSB
  209. Array.Copy(topicsUtf8[topicIdx], 0, buffer, index, topicsUtf8[topicIdx].Length);
  210. index += topicsUtf8[topicIdx].Length;
  211. // requested QoS
  212. buffer[index++] = qosLevels[topicIdx];
  213. }
  214. return buffer;
  215. }
  216. public override string ToString()
  217. {
  218. #if TRACE
  219. return GetTraceString(
  220. "SUBSCRIBE",
  221. new object[] { "messageId", "topics", "qosLevels" },
  222. new object[] { messageId, topics, qosLevels });
  223. #else
  224. return base.ToString();
  225. #endif
  226. }
  227. }
  228. }