| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- using System;
- using System.IO;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- using System.Timers;
- using System.Configuration;
- using uPLibrary.Networking.M2Mqtt;
- using uPLibrary.Networking.M2Mqtt.Messages;
- namespace MQTTClientTest
- {
- class M2MqttClient
- {
- private MqttClient _client = null;
- private Timer _timer = new Timer();
- private byte[] _buffer = null;
- string serverIP = ConfigurationManager.AppSettings["MQTTServer.Addr"];
- string serverPort = ConfigurationManager.AppSettings["MQTTServer.Port"];
- string serverClientId = ConfigurationManager.AppSettings["MQTTServer.ClientId"];
- string serverUserId = ConfigurationManager.AppSettings["MQTTServer.UserId"];
- string serverPasswd = ConfigurationManager.AppSettings["MQTTServer.Passwd"];
- string serverTopic = ConfigurationManager.AppSettings["MQTTServer.Topic"];
- public void start()
- {
- read_payload();
- _client = new MqttClient(serverIP, Convert.ToInt32(serverPort), false, null, null, MqttSslProtocols.None);
- string clientId = serverClientId; // Guid.NewGuid().ToString();
- int loop = 3;
- while (loop-- > 0)
- {
- try
- {
- // 建立连接
- byte con_ret = _client.Connect(clientId, serverUserId, serverPasswd);
- if (con_ret == 0)
- {
- System.Console.WriteLine("Connect to MQTTSever ok!");
- // 订阅
- ushort sub_ret = _client.Subscribe(new string[] { serverTopic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE });
- break;
- }
- }
- catch (Exception e)
- {
- System.Console.WriteLine(e.Message);
- }
- }
- if (loop <= 0)
- return;
- // 回调
- _client.ConnectionClosed += client_ConnectionClosed;
- _client.MqttMsgSubscribed += client_MsgSubscribed;
- _client.MqttMsgPublishReceived += client_MsgPublishReceived;
- // 启动定时器
- _timer.Interval = 2 * 1000;
- _timer.Elapsed += new ElapsedEventHandler(timer_elapsed);
- _timer.Enabled = true;
- }
- public void stop()
- {
- if (_client != null)
- {
- _timer.Enabled = false;
- _client.Disconnect();
- }
- }
- private void read_payload()
- {
- string src = "d:\\1.txt";
- FileStream fs = new FileStream(src, FileMode.Open);
- //获取文件大小
- long size = fs.Length;
- _buffer = new byte[size];
- //将文件读到byte数组中
- fs.Read(_buffer, 0, _buffer.Length);
- fs.Close();
- }
- private void client_MsgSubscribed(object sender, MqttMsgSubscribedEventArgs e)
- {
- System.Console.WriteLine("Subscribed " + e.MessageId);
- }
- private void client_ConnectionClosed(object sender, EventArgs e)
- {
- System.Console.WriteLine("Connection lost");
- }
- // 处理接收到的消息
- private void client_MsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
- {
- string msg = System.Text.Encoding.Default.GetString(e.Message);
- System.Console.WriteLine("Message received, " + msg);
- }
- private void timer_elapsed(object sender, ElapsedEventArgs e)
- {
- string strPayload = DateTime.Now.ToString();
- //byte[] buffer = System.Text.Encoding.Default.GetBytes(strPayload);
- if (_buffer != null)
- {
- _client.Publish(serverTopic, _buffer, MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE, false);
- }
- }
- }
- }
|