// Program.cs
  1. using MQTTnet.Server;
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Runtime.CompilerServices;
  6. using System.ServiceProcess;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using ConsoleHttp.Model;
  10. using MqttMsgServer.Dao;
  11. using MqttMsgServer.Model;
  12. using MqttMsgServer.Service.Client;
  13. using MqttMsgServer.Service.Client.Dto;
  14. using MqttMsgServer.Tools;
  15. using MQTTnet;
  16. using MQTTnet.Protocol;
  17. using MqttMsgServer.Service;
  namespace MqttMsgServer
  {
      /// <summary>
      /// Process entry point: starts the embedded MQTT broker and then hands control
      /// to the Windows service runtime.
      /// </summary>
      static class Program
      {
          /// <summary>
          /// The main entry point for the application.
          /// </summary>
          static async Task Main()
          {
              // The broker is started first, so a start-up failure surfaces before
              // the service is handed to ServiceBase.Run.
              int mqttPort = AppSetting.GetInt("MqttPort");
              await MqttConfig(mqttPort);

              ServiceBase.Run(new ServiceBase[] { new MqttMsgServer() });
          }

          /// <summary>
          /// Builds the broker options (connection validation, subscription interception and
          /// publish interception), starts the MQTT server and stores it in
          /// <c>MqttHelper.Server</c>.
          /// </summary>
          /// <param name="port">Port of the default endpoint; defaults to 1883.</param>
          /// <returns>A task that completes once the server has been started and stored.</returns>
          private static async Task MqttConfig(int port = 1883)
          {
              var optionBuilder = new MqttServerOptionsBuilder()
                  .WithDefaultEndpoint()
                  .WithDefaultEndpointPort(port)
                  .WithConnectionValidator(ctx =>
                  {
                      // NOTE(review): a duplicate-ClientId rejection based on
                      // MqttHelper.CheckExistClient used to be sketched here as commented-out
                      // code; it is still not active.
                      try
                      {
                          var clientService = new ClientInfoService();
                          ResponseResult result = clientService.ConnectServer(new ConnectDto
                          {
                              ClientId = ctx.ClientId,
                              ClientName = ctx.Username,
                              Password = ctx.Password
                          });

                          if (!result.IsSuccess)
                          {
                              typeof(Program).LogInfo($"ClientId[{ctx.ClientId}] disconnected event fired.{result.ErrorMessage}");
                              ctx.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                              return;
                          }

                          ctx.ReasonCode = MqttConnectReasonCode.Success;
                      }
                      catch (Exception err)
                      {
                          // Fail closed: when the credential check itself blows up, the client
                          // must not be admitted. Without this assignment the context keeps
                          // the reason code it had on entry (presumably Success in MQTTnet 3.x
                          // - TODO confirm), which would accept an unauthenticated client.
                          // Set before logging so a logging failure cannot skip it.
                          ctx.ReasonCode = MqttConnectReasonCode.ServerUnavailable;

                          Console.WriteLine(err.Message);
                          // Log the exception object (as the publish interceptor does) so the
                          // stack trace is not lost.
                          typeof(Program).LogInfo(err);
                      }
                  })
                  .WithSubscriptionInterceptor(ctx =>
                  {
                      if (ctx == null)
                      {
                          return;
                      }

                      // Every subscription is accepted; the interceptor is only used for logging.
                      ctx.AcceptSubscription = true;
                      typeof(Program).LogInfo($@"{DateTime.Now}:订阅者{ctx.ClientId},订阅:{ctx.TopicFilter.Topic}");
                  })
                  .WithApplicationMessageInterceptor(ctx =>
                  {
                      if (ctx == null)
                      {
                          return;
                      }

                      // Publishing is always allowed; persistence below is best-effort and
                      // never blocks delivery.
                      ctx.AcceptPublish = true;
                      try
                      {
                          var message = ctx.ApplicationMessage;
                          byte[] payload = message?.Payload;
                          string topic = message?.Topic;

                          // A missing payload is recorded as null; otherwise the UTF-8 text
                          // gets a trailing CRLF appended.
                          string str = payload == null ? null : Encoding.UTF8.GetString(payload) + "\r\n";

                          var entity = new ReceiveMessageRecord
                          {
                              Id = Guid.NewGuid().ToString("N"),
                              ClientId = ctx.ClientId,
                              Topic = topic,
                              Payload = str,
                              // NOTE(review): the QoS is hard-coded rather than taken from the
                              // received message - confirm this is intended.
                              QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
                              CreatorDate = DateTime.Now,
                              CreatorUserId = 1
                          };

                          ThreadMsg.ThreadReceiveMsg.Insert(entity);
                          typeof(Program).LogInfo($@"订阅者{ctx.ClientId},订阅:{topic},发送消息:{str}");
                      }
                      catch (Exception e)
                      {
                          typeof(Program).LogInfo(e);
                      }
                  });

              IMqttServer server = new MqttFactory().CreateMqttServer();
              server.UseClientDisconnectedHandler(args =>
              {
                  typeof(Program).LogInfo($@"{DateTime.Now}:订阅者{args.ClientId}已退出!");
              });

              await server.StartAsync(optionBuilder.Build());

              // Stored only after a successful start, so MqttHelper.Server never refers to a
              // server whose StartAsync threw.
              MqttHelper.Server = server;
          }
      }
  }