MultiClients.cs 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Timers;
  5. using System.Threading.Tasks;
  6. using System.Configuration;
  7. using MQTTnet;
  8. using MQTTnet.Client;
  9. using MQTTnet.Client.Connecting;
  10. using MQTTnet.Client.Disconnecting;
  11. using MQTTnet.Client.Options;
  12. using MQTTnet.Client.Receiving;
  13. using MQTTnet.Extensions.ManagedClient;
  14. using MQTTnet.Formatter;
  15. using MQTTnet.Protocol;
  16. using MQTTnet.Server;
  17. namespace MQTTClientTest
  18. {
  19. class MultiClients
  20. {
  21. //private List<IManagedMqttClient> _mClientList = new List<IManagedMqttClient>();
  22. private List<IMqttClient> _mClientList = new List<IMqttClient>();
  23. private Timer _timer = new Timer();
  24. private Int32 _connectedCount = 0;
  25. private object _lock = new object();
  26. string serverIP = ConfigurationManager.AppSettings["MQTTServer.Addr"];
  27. string serverPort = ConfigurationManager.AppSettings["MQTTServer.Port"];
  28. string serverUserId = ConfigurationManager.AppSettings["MQTTServer.UserId"];
  29. string serverPasswd = ConfigurationManager.AppSettings["MQTTServer.Passwd"];
  30. string serverTopic = ConfigurationManager.AppSettings["MQTTServer.Topic"];
  31. // start multiple clients
  32. public async void start(int clientsCount)
  33. {
  34. await Task.Factory.StartNew(async () =>
  35. {
  36. for (int i = 0; i < clientsCount; i++)
  37. {
  38. //var c = new MqttFactory().CreateManagedMqttClient();
  39. var c = new MqttFactory().CreateMqttClient();
  40. c.ConnectedHandler = new MqttClientConnectedHandlerDelegate(arg =>
  41. {
  42. ChangeCount(1);
  43. System.Console.WriteLine("Client Connected, connectedCount = " + _connectedCount.ToString());
  44. });
  45. c.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(arg =>
  46. {
  47. ChangeCount(-1);
  48. string strMsg = "Client disconnected, ";
  49. if (arg.Exception != null)
  50. strMsg = strMsg + "exception=" + arg.Exception.Message.ToString();
  51. if (arg.AuthenticateResult != null)
  52. strMsg = strMsg + "authReason=" + arg.AuthenticateResult.ReasonString;
  53. System.Console.WriteLine(strMsg + ", connectedCount=" + _connectedCount.ToString());
  54. });
  55. c.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(arg =>
  56. {
  57. string payload = System.Text.Encoding.UTF8.GetString(arg.ApplicationMessage.Payload);
  58. System.Console.WriteLine("Message received, topic [" + arg.ApplicationMessage.Topic + "], payload [" + payload + "], connectedCount=" + _connectedCount.ToString());
  59. });
  60. string clientId = serverUserId; // Guid.NewGuid().ToString();
  61. /*var options = new ManagedMqttClientOptionsBuilder()
  62. .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
  63. .WithClientOptions(new MqttClientOptionsBuilder()
  64. .WithClientId(clientId)
  65. .WithCleanSession(true)
  66. .WithTcpServer(serverIP, Convert.ToInt32(serverPort))
  67. .WithCredentials(serverUserId, serverPasswd)
  68. .Build()
  69. )
  70. .Build();
  71. await c.SubscribeAsync(serverTopic, MqttQualityOfServiceLevel.AtLeastOnce);
  72. await c.StartAsync(options);*/
  73. var options = new MqttClientOptions() { ClientId = clientId };
  74. options.ChannelOptions = new MqttClientTcpOptions()
  75. {
  76. Server = serverIP,
  77. Port = Convert.ToInt32(serverPort)
  78. };
  79. options.Credentials = new MqttClientCredentials()
  80. {
  81. Username = serverUserId,
  82. Password = System.Text.Encoding.Default.GetBytes(serverPasswd)
  83. };
  84. await c.ConnectAsync(options);
  85. await c.SubscribeAsync(serverTopic, MqttQualityOfServiceLevel.AtLeastOnce);
  86. _mClientList.Add(c);
  87. System.Threading.Thread.Sleep(200);
  88. }//for
  89. });
  90. //_timer.Interval = 30000;
  91. // _timer.Elapsed += new ElapsedEventHandler(timer_elapsed);
  92. //_timer.Enabled = true;
  93. }//start
  94. public async void stop()
  95. {
  96. /*foreach(IManagedMqttClient c in _mClientList)
  97. {
  98. if (c.IsStarted)
  99. {
  100. await c.StopAsync();
  101. }
  102. }*/
  103. }//stop
  104. private async void timer_elapsed(object sender, ElapsedEventArgs e)
  105. {
  106. if (_mClientList.Count < 1)
  107. return;
  108. string strPayload = DateTime.Now.ToString();
  109. IMqttClient c = _mClientList[0];
  110. await c.PublishAsync(serverTopic, strPayload);
  111. }
  112. private void ChangeCount(Int16 chg)
  113. {
  114. lock(_lock)
  115. {
  116. _connectedCount += chg;
  117. }
  118. }
  119. }
  120. }