M2MqttClient.cs 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. using System;
  2. using System.IO;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Text;
  6. using System.Threading.Tasks;
  7. using System.Timers;
  8. using System.Configuration;
  9. using uPLibrary.Networking.M2Mqtt;
  10. using uPLibrary.Networking.M2Mqtt.Messages;
  11. namespace MQTTClientTest
  12. {
  13. class M2MqttClient
  14. {
  15. private MqttClient _client = null;
  16. private Timer _timer = new Timer();
  17. private byte[] _buffer = null;
  18. string serverIP = ConfigurationManager.AppSettings["MQTTServer.Addr"];
  19. string serverPort = ConfigurationManager.AppSettings["MQTTServer.Port"];
  20. string serverClientId = ConfigurationManager.AppSettings["MQTTServer.ClientId"];
  21. string serverUserId = ConfigurationManager.AppSettings["MQTTServer.UserId"];
  22. string serverPasswd = ConfigurationManager.AppSettings["MQTTServer.Passwd"];
  23. string serverTopic = ConfigurationManager.AppSettings["MQTTServer.Topic"];
  24. public void start()
  25. {
  26. read_payload();
  27. _client = new MqttClient(serverIP, Convert.ToInt32(serverPort), false, null, null, MqttSslProtocols.None);
  28. string clientId = serverClientId; // Guid.NewGuid().ToString();
  29. int loop = 3;
  30. while (loop-- > 0)
  31. {
  32. try
  33. {
  34. // 建立连接
  35. byte con_ret = _client.Connect(clientId, serverUserId, serverPasswd);
  36. if (con_ret == 0)
  37. {
  38. System.Console.WriteLine("Connect to MQTTSever ok!");
  39. // 订阅
  40. ushort sub_ret = _client.Subscribe(new string[] { serverTopic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE });
  41. break;
  42. }
  43. }
  44. catch (Exception e)
  45. {
  46. System.Console.WriteLine(e.Message);
  47. }
  48. }
  49. if (loop <= 0)
  50. return;
  51. // 回调
  52. _client.ConnectionClosed += client_ConnectionClosed;
  53. _client.MqttMsgSubscribed += client_MsgSubscribed;
  54. _client.MqttMsgPublishReceived += client_MsgPublishReceived;
  55. // 启动定时器
  56. _timer.Interval = 2 * 1000;
  57. _timer.Elapsed += new ElapsedEventHandler(timer_elapsed);
  58. _timer.Enabled = true;
  59. }
  60. public void stop()
  61. {
  62. if (_client != null)
  63. {
  64. _timer.Enabled = false;
  65. _client.Disconnect();
  66. }
  67. }
  68. private void read_payload()
  69. {
  70. string src = "d:\\1.txt";
  71. FileStream fs = new FileStream(src, FileMode.Open);
  72. //获取文件大小
  73. long size = fs.Length;
  74. _buffer = new byte[size];
  75. //将文件读到byte数组中
  76. fs.Read(_buffer, 0, _buffer.Length);
  77. fs.Close();
  78. }
  79. private void client_MsgSubscribed(object sender, MqttMsgSubscribedEventArgs e)
  80. {
  81. System.Console.WriteLine("Subscribed " + e.MessageId);
  82. }
  83. private void client_ConnectionClosed(object sender, EventArgs e)
  84. {
  85. System.Console.WriteLine("Connection lost");
  86. }
  87. // 处理接收到的消息
  88. private void client_MsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
  89. {
  90. string msg = System.Text.Encoding.Default.GetString(e.Message);
  91. System.Console.WriteLine("Message received, " + msg);
  92. }
  93. private void timer_elapsed(object sender, ElapsedEventArgs e)
  94. {
  95. string strPayload = DateTime.Now.ToString();
  96. //byte[] buffer = System.Text.Encoding.Default.GetBytes(strPayload);
  97. if (_buffer != null)
  98. {
  99. _client.Publish(serverTopic, _buffer, MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE, false);
  100. }
  101. }
  102. }
  103. }