| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- using MQTTnet.Server;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Runtime.CompilerServices;
- using System.ServiceProcess;
- using System.Text;
- using System.Threading.Tasks;
- using ConsoleHttp.Model;
- using MqttMsgServer.Dao;
- using MqttMsgServer.Model;
- using MqttMsgServer.Service.Client;
- using MqttMsgServer.Service.Client.Dto;
- using MqttMsgServer.Tools;
- using MQTTnet;
- using MQTTnet.Protocol;
- using MqttMsgServer.Service;
namespace MqttMsgServer
{
    /// <summary>
    /// Process entry point: starts the embedded MQTT broker and then runs the Windows service.
    /// </summary>
    static class Program
    {
        /// <summary>
        /// The main entry point for the application.
        /// Reads the "MqttPort" setting, starts the MQTT server on that port and then
        /// passes a <see cref="MqttMsgServer"/> instance to <see cref="ServiceBase.Run(ServiceBase[])"/>.
        /// </summary>
        static async Task Main()
        {
            int mqttPort = AppSetting.GetInt("MqttPort");
            await MqttConfig(mqttPort);

            ServiceBase.Run(new ServiceBase[] { new MqttMsgServer() });
        }

        /// <summary>
        /// Builds the MQTT server options (connection validator, subscription interceptor,
        /// application-message interceptor), starts the server and publishes the started
        /// instance through <c>MqttHelper.Server</c>.
        /// </summary>
        /// <param name="port">TCP port for the default endpoint; defaults to 1883.</param>
        /// <returns>A task that completes once the server has been started.</returns>
        private static async Task MqttConfig(int port = 1883)
        {
            var optionBuilder = new MqttServerOptionsBuilder()
                .WithDefaultEndpoint()
                .WithDefaultEndpointPort(port)
                .WithConnectionValidator(c =>
                {
                    // NOTE: a duplicate-ClientId check (MqttHelper.CheckExistClient) used to be
                    // sketched here as commented-out code; it is not active.
                    try
                    {
                        var clientService = new ClientInfoService();
                        var credentials = new ConnectDto
                        {
                            ClientId = c.ClientId,
                            ClientName = c.Username,
                            Password = c.Password
                        };

                        ResponseResult rs = clientService.ConnectServer(credentials);
                        if (!rs.IsSuccess)
                        {
                            typeof(Program).LogInfo($"ClientId[{c.ClientId}] disconnected event fired.{rs.ErrorMessage}");
                            c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                            return;
                        }

                        c.ReasonCode = MqttConnectReasonCode.Success;
                    }
                    catch (Exception err)
                    {
                        // Fail closed: if the credential check itself blows up, the reason code
                        // would otherwise stay at its initial value (Success == 0) and the
                        // client would be admitted without having been authenticated.
                        c.ReasonCode = MqttConnectReasonCode.ServerUnavailable;

                        Console.WriteLine(err.Message);
                        typeof(Program).LogInfo($"{err.Message}");
                    }
                })
                .WithSubscriptionInterceptor(c =>
                {
                    if (c == null)
                    {
                        return;
                    }

                    // Every subscription is accepted; it is only logged.
                    c.AcceptSubscription = true;
                    typeof(Program).LogInfo($@"{DateTime.Now}:订阅者{c.ClientId},订阅:{c.TopicFilter.Topic}");
                })
                .WithApplicationMessageInterceptor(c =>
                {
                    if (c == null)
                    {
                        return;
                    }

                    // Every publish is accepted; persisting/logging below is best effort and
                    // must never prevent delivery, hence the catch-all.
                    c.AcceptPublish = true;
                    try
                    {
                        // Decode the payload as UTF-8 and append CRLF; a missing message or
                        // payload yields a null string.
                        byte[] payload = c.ApplicationMessage?.Payload;
                        string str = payload == null ? null : Encoding.UTF8.GetString(payload) + "\r\n";

                        // NOTE: a ClientInfoService.WriteCache call used to be sketched here as
                        // commented-out code; it is not active.
                        var entity = new ReceiveMessageRecord()
                        {
                            Id = Guid.NewGuid().ToString("N"),
                            ClientId = c.ClientId,
                            Topic = c.ApplicationMessage?.Topic,
                            Payload = str,
                            // NOTE(review): the stored QoS is always AtLeastOnce, not the QoS of
                            // the received message — confirm this is intended.
                            QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
                            CreatorDate = DateTime.Now,
                            CreatorUserId = 1
                        };
                        ThreadMsg.ThreadReceiveMsg.Insert(entity);
                        typeof(Program).LogInfo($@"订阅者{c.ClientId},订阅:{c.ApplicationMessage?.Topic},发送消息:{str}");
                    }
                    catch (Exception e)
                    {
                        typeof(Program).LogInfo(e);
                    }
                });

            IMqttServer server = new MqttFactory().CreateMqttServer();
            server.UseClientDisconnectedHandler(c =>
            {
                typeof(Program).LogInfo($@"{DateTime.Now}:订阅者{c.ClientId}已退出!");
            });

            await server.StartAsync(optionBuilder.Build());

            // Expose the server to the rest of the application only after it has started.
            MqttHelper.Server = server;
        }
    }
}
|