using System; using System.IO; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Timers; using System.Configuration; using uPLibrary.Networking.M2Mqtt; using uPLibrary.Networking.M2Mqtt.Messages; namespace MQTTClientTest { class M2MqttClient { private MqttClient _client = null; private Timer _timer = new Timer(); private byte[] _buffer = null; string serverIP = ConfigurationManager.AppSettings["MQTTServer.Addr"]; string serverPort = ConfigurationManager.AppSettings["MQTTServer.Port"]; string serverClientId = ConfigurationManager.AppSettings["MQTTServer.ClientId"]; string serverUserId = ConfigurationManager.AppSettings["MQTTServer.UserId"]; string serverPasswd = ConfigurationManager.AppSettings["MQTTServer.Passwd"]; string serverTopic = ConfigurationManager.AppSettings["MQTTServer.Topic"]; public void start() { read_payload(); _client = new MqttClient(serverIP, Convert.ToInt32(serverPort), false, null, null, MqttSslProtocols.None); string clientId = serverClientId; // Guid.NewGuid().ToString(); int loop = 3; while (loop-- > 0) { try { // 建立连接 byte con_ret = _client.Connect(clientId, serverUserId, serverPasswd); if (con_ret == 0) { System.Console.WriteLine("Connect to MQTTSever ok!"); // 订阅 ushort sub_ret = _client.Subscribe(new string[] { serverTopic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE }); break; } } catch (Exception e) { System.Console.WriteLine(e.Message); } } if (loop <= 0) return; // 回调 _client.ConnectionClosed += client_ConnectionClosed; _client.MqttMsgSubscribed += client_MsgSubscribed; _client.MqttMsgPublishReceived += client_MsgPublishReceived; // 启动定时器 _timer.Interval = 2 * 1000; _timer.Elapsed += new ElapsedEventHandler(timer_elapsed); _timer.Enabled = true; } public void stop() { if (_client != null) { _timer.Enabled = false; _client.Disconnect(); } } private void read_payload() { string src = "d:\\1.txt"; FileStream fs = new FileStream(src, FileMode.Open); //获取文件大小 long size = fs.Length; _buffer = new byte[size]; //将文件读到byte数组中 fs.Read(_buffer, 0, _buffer.Length); fs.Close(); } private void client_MsgSubscribed(object sender, MqttMsgSubscribedEventArgs e) { System.Console.WriteLine("Subscribed " + e.MessageId); } private void client_ConnectionClosed(object sender, EventArgs e) { System.Console.WriteLine("Connection lost"); } // 处理接收到的消息 private void client_MsgPublishReceived(object sender, MqttMsgPublishEventArgs e) { string msg = System.Text.Encoding.Default.GetString(e.Message); System.Console.WriteLine("Message received, " + msg); } private void timer_elapsed(object sender, ElapsedEventArgs e) { string strPayload = DateTime.Now.ToString(); //byte[] buffer = System.Text.Encoding.Default.GetBytes(strPayload); if (_buffer != null) { _client.Publish(serverTopic, _buffer, MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE, false); } } } }