using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Threading.Tasks;
using System.Timers;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Formatter;
using MQTTnet.Protocol;
using MQTTnet.Server;

namespace MQTTClientTest
{
    /// <summary>
    /// Opens several MQTT client connections against the server described in the
    /// application settings, subscribes each of them to the configured topic and
    /// logs connect / disconnect / message events to the console together with a
    /// running count of connected clients.
    /// </summary>
    class MultiClients
    {
        // Clients that connected and subscribed successfully. Touched from the
        // background start task, from stop() and from the timer callback, so every
        // access goes through _lock.
        private List<IMqttClient> _mClientList = new List<IMqttClient>();

        // Timer intended for periodic publishing through timer_elapsed. It is
        // currently not armed; to enable it set Interval (the original value was
        // 30000 ms), attach timer_elapsed to Elapsed and set Enabled = true.
        private Timer _timer = new Timer();

        // Number of clients currently reported as connected by their handlers.
        private Int32 _connectedCount = 0;

        // Guards _connectedCount and _mClientList.
        private object _lock = new object();

        string serverIP = ConfigurationManager.AppSettings["MQTTServer.Addr"];
        string serverPort = ConfigurationManager.AppSettings["MQTTServer.Port"];
        string serverUserId = ConfigurationManager.AppSettings["MQTTServer.UserId"];
        string serverPasswd = ConfigurationManager.AppSettings["MQTTServer.Passwd"];
        string serverTopic = ConfigurationManager.AppSettings["MQTTServer.Topic"];

        /// <summary>
        /// Starts connecting <paramref name="clientsCount"/> clients on a background
        /// task and returns to the caller without waiting for them.
        /// </summary>
        /// <param name="clientsCount">Number of clients to create and connect.</param>
        /// <remarks>
        /// The method is fire-and-forget (<c>async void</c>), so any failure while
        /// connecting is caught here and written to the console instead of being
        /// lost or tearing down the process. The loop stops at the first failure.
        /// </remarks>
        public async void start(int clientsCount)
        {
            try
            {
                // Task.Run unwraps the returned Task, so this await really covers
                // the whole connect loop (Task.Factory.StartNew with an async
                // lambda would only await the creation of the inner task).
                await Task.Run(() => ConnectClientsAsync(clientsCount));
            }
            catch (Exception ex)
            {
                System.Console.WriteLine("Failed to start clients, exception=" + ex.Message);
            }
        }//start

        /// <summary>
        /// Disconnects and disposes every client that was started, then forgets them.
        /// </summary>
        /// <remarks>
        /// Errors while disconnecting a single client are logged and do not prevent
        /// the remaining clients from being stopped.
        /// </remarks>
        public async void stop()
        {
            List<IMqttClient> clients;
            lock (_lock)
            {
                // Work on a snapshot so a still-running start() cannot invalidate
                // the enumeration.
                clients = new List<IMqttClient>(_mClientList);
                _mClientList.Clear();
            }

            foreach (IMqttClient c in clients)
            {
                try
                {
                    if (c.IsConnected)
                    {
                        await c.DisconnectAsync();
                    }
                }
                catch (Exception ex)
                {
                    System.Console.WriteLine("Failed to disconnect client, exception=" + ex.Message);
                }
                finally
                {
                    c.Dispose();
                }
            }
        }//stop

        /// <summary>
        /// Creates, connects and subscribes the requested number of clients one
        /// after another, pausing 200 ms between clients.
        /// </summary>
        /// <param name="clientsCount">Number of clients to create and connect.</param>
        private async Task ConnectClientsAsync(int clientsCount)
        {
            // Parsed once; the value does not change between iterations.
            int port = Convert.ToInt32(serverPort);

            for (int i = 0; i < clientsCount; i++)
            {
                IMqttClient client = CreateClient();

                // NOTE(review): every client uses the configured user id as its
                // client id (the original had Guid.NewGuid() commented out). MQTT
                // brokers normally drop an existing connection when a second client
                // connects with the same id - confirm this is intended.
                string clientId = serverUserId;

                var options = new MqttClientOptions() { ClientId = clientId };
                options.ChannelOptions = new MqttClientTcpOptions() { Server = serverIP, Port = port };
                options.Credentials = new MqttClientCredentials()
                {
                    Username = serverUserId,
                    // UTF-8 keeps the password bytes identical on every platform;
                    // Encoding.Default depends on the machine's ANSI code page.
                    Password = serverPasswd == null ? null : System.Text.Encoding.UTF8.GetBytes(serverPasswd)
                };

                try
                {
                    await client.ConnectAsync(options);
                    await client.SubscribeAsync(serverTopic, MqttQualityOfServiceLevel.AtLeastOnce);
                }
                catch
                {
                    // The client never made it into the list, so nobody else
                    // would release it.
                    client.Dispose();
                    throw;
                }

                lock (_lock)
                {
                    _mClientList.Add(client);
                }

                // Non-blocking pause between connection attempts.
                await Task.Delay(200);
            }//for
        }

        /// <summary>
        /// Creates a client and attaches the console-logging handlers for the
        /// connected, disconnected and message-received events.
        /// </summary>
        /// <returns>A new, not yet connected client.</returns>
        private IMqttClient CreateClient()
        {
            IMqttClient c = new MqttFactory().CreateMqttClient();

            c.ConnectedHandler = new MqttClientConnectedHandlerDelegate(arg =>
            {
                int count = ChangeCount(1);
                System.Console.WriteLine("Client Connected, connectedCount = " + count.ToString());
            });

            c.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(arg =>
            {
                // Only clients that were actually connected have been counted, so
                // only those may be subtracted again (a failed connection attempt
                // must not drive the counter negative).
                int count = arg.ClientWasConnected ? ChangeCount(-1) : ChangeCount(0);

                string strMsg = "Client disconnected";
                if (arg.Exception != null)
                {
                    strMsg = strMsg + ", exception=" + arg.Exception.Message;
                }
                if (arg.AuthenticateResult != null)
                {
                    strMsg = strMsg + ", authReason=" + arg.AuthenticateResult.ReasonString;
                }
                System.Console.WriteLine(strMsg + ", connectedCount=" + count.ToString());
            });

            c.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(arg =>
            {
                // A message may carry no payload at all; treat that as empty text
                // instead of letting GetString throw on null.
                byte[] raw = arg.ApplicationMessage.Payload;
                string payload = raw == null ? string.Empty : System.Text.Encoding.UTF8.GetString(raw);
                System.Console.WriteLine("Message received, topic [" + arg.ApplicationMessage.Topic + "], payload [" + payload + "], connectedCount=" + ChangeCount(0).ToString());
            });

            return c;
        }

        /// <summary>
        /// Timer callback: publishes the current local date/time as text to the
        /// configured topic through the first started client, if there is one.
        /// </summary>
        /// <param name="sender">Timer raising the event (unused).</param>
        /// <param name="e">Event data (unused).</param>
        private async void timer_elapsed(object sender, ElapsedEventArgs e)
        {
            IMqttClient c;
            lock (_lock)
            {
                if (_mClientList.Count < 1)
                {
                    return;
                }
                c = _mClientList[0];
            }

            string strPayload = DateTime.Now.ToString();
            try
            {
                await c.PublishAsync(serverTopic, strPayload);
            }
            catch (Exception ex)
            {
                // An exception escaping an async void handler would be unhandled.
                System.Console.WriteLine("Publish failed, exception=" + ex.Message);
            }
        }

        /// <summary>
        /// Adds <paramref name="chg"/> to the connected-client counter under the lock.
        /// </summary>
        /// <param name="chg">Amount to add; pass 0 to only read the counter.</param>
        /// <returns>The counter value after the change.</returns>
        private Int32 ChangeCount(Int16 chg)
        {
            lock (_lock)
            {
                _connectedCount += chg;
                return _connectedCount;
            }
        }
    }
}