| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Timers;
- using System.Threading.Tasks;
- using System.Configuration;
- using MQTTnet;
- using MQTTnet.Client;
- using MQTTnet.Client.Connecting;
- using MQTTnet.Client.Disconnecting;
- using MQTTnet.Client.Options;
- using MQTTnet.Client.Receiving;
- using MQTTnet.Extensions.ManagedClient;
- using MQTTnet.Formatter;
- using MQTTnet.Protocol;
- using MQTTnet.Server;
- namespace MQTTClientTest
- {
- class MultiClients
- {
- //private List<IManagedMqttClient> _mClientList = new List<IManagedMqttClient>();
- private List<IMqttClient> _mClientList = new List<IMqttClient>();
- private Timer _timer = new Timer();
- private Int32 _connectedCount = 0;
- private object _lock = new object();
- string serverIP = ConfigurationManager.AppSettings["MQTTServer.Addr"];
- string serverPort = ConfigurationManager.AppSettings["MQTTServer.Port"];
- string serverUserId = ConfigurationManager.AppSettings["MQTTServer.UserId"];
- string serverPasswd = ConfigurationManager.AppSettings["MQTTServer.Passwd"];
- string serverTopic = ConfigurationManager.AppSettings["MQTTServer.Topic"];
- // start multiple clients
- public async void start(int clientsCount)
- {
- await Task.Factory.StartNew(async () =>
- {
- for (int i = 0; i < clientsCount; i++)
- {
- //var c = new MqttFactory().CreateManagedMqttClient();
- var c = new MqttFactory().CreateMqttClient();
- c.ConnectedHandler = new MqttClientConnectedHandlerDelegate(arg =>
- {
- ChangeCount(1);
- System.Console.WriteLine("Client Connected, connectedCount = " + _connectedCount.ToString());
- });
- c.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(arg =>
- {
- ChangeCount(-1);
- string strMsg = "Client disconnected, ";
- if (arg.Exception != null)
- strMsg = strMsg + "exception=" + arg.Exception.Message.ToString();
- if (arg.AuthenticateResult != null)
- strMsg = strMsg + "authReason=" + arg.AuthenticateResult.ReasonString;
- System.Console.WriteLine(strMsg + ", connectedCount=" + _connectedCount.ToString());
- });
- c.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(arg =>
- {
- string payload = System.Text.Encoding.UTF8.GetString(arg.ApplicationMessage.Payload);
- System.Console.WriteLine("Message received, topic [" + arg.ApplicationMessage.Topic + "], payload [" + payload + "], connectedCount=" + _connectedCount.ToString());
- });
- string clientId = serverUserId; // Guid.NewGuid().ToString();
- /*var options = new ManagedMqttClientOptionsBuilder()
- .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
- .WithClientOptions(new MqttClientOptionsBuilder()
- .WithClientId(clientId)
- .WithCleanSession(true)
- .WithTcpServer(serverIP, Convert.ToInt32(serverPort))
- .WithCredentials(serverUserId, serverPasswd)
- .Build()
- )
- .Build();
- await c.SubscribeAsync(serverTopic, MqttQualityOfServiceLevel.AtLeastOnce);
- await c.StartAsync(options);*/
- var options = new MqttClientOptions() { ClientId = clientId };
- options.ChannelOptions = new MqttClientTcpOptions()
- {
- Server = serverIP,
- Port = Convert.ToInt32(serverPort)
- };
- options.Credentials = new MqttClientCredentials()
- {
- Username = serverUserId,
- Password = System.Text.Encoding.Default.GetBytes(serverPasswd)
- };
- await c.ConnectAsync(options);
- await c.SubscribeAsync(serverTopic, MqttQualityOfServiceLevel.AtLeastOnce);
- _mClientList.Add(c);
- System.Threading.Thread.Sleep(200);
- }//for
- });
- //_timer.Interval = 30000;
- // _timer.Elapsed += new ElapsedEventHandler(timer_elapsed);
- //_timer.Enabled = true;
- }//start
- public async void stop()
- {
- /*foreach(IManagedMqttClient c in _mClientList)
- {
- if (c.IsStarted)
- {
- await c.StopAsync();
- }
- }*/
- }//stop
- private async void timer_elapsed(object sender, ElapsedEventArgs e)
- {
- if (_mClientList.Count < 1)
- return;
- string strPayload = DateTime.Now.ToString();
- IMqttClient c = _mClientList[0];
- await c.PublishAsync(serverTopic, strPayload);
- }
- private void ChangeCount(Int16 chg)
- {
- lock(_lock)
- {
- _connectedCount += chg;
- }
- }
- }
- }
|