using MQTTnet.Server; using System; using System.Collections.Generic; using System.Linq; using System.Runtime.CompilerServices; using System.ServiceProcess; using System.Text; using System.Threading.Tasks; using ConsoleHttp.Model; using MqttMsgServer.Dao; using MqttMsgServer.Model; using MqttMsgServer.Service.Client; using MqttMsgServer.Service.Client.Dto; using MqttMsgServer.Tools; using MQTTnet; using MQTTnet.Protocol; using MqttMsgServer.Service; namespace MqttMsgServer { static class Program { /// /// 应用程序的主入口点。 /// static async Task Main() { int mqttPort = AppSetting.GetInt("MqttPort"); await MqttConfig(mqttPort); ServiceBase[] ServicesToRun; ServicesToRun = new ServiceBase[] { new MqttMsgServer() }; ServiceBase.Run(ServicesToRun); } private static async Task MqttConfig(int port = 1883) { IMqttServer server; var optionBuilder = new MqttServerOptionsBuilder(). WithDefaultEndpoint().WithDefaultEndpointPort(port).WithConnectionValidator( c => { //if (await MqttHelper.CheckExistClient(c.ClientId)) //{ // typeof(Program).LogInfo($"ClientId[{c.ClientId}] has connected.please disconneted!"); // c.ReasonCode = MqttConnectReasonCode.UseAnotherServer; // return; //} try { ClientInfoService cs = new ClientInfoService(); ResponseResult rs = cs.ConnectServer(new ConnectDto() { ClientId = c.ClientId, ClientName = c.Username, Password = c.Password }); if (!rs.IsSuccess) { typeof(Program).LogInfo($"ClientId[{c.ClientId}] disconnected event fired.{rs.ErrorMessage}"); c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; return; } c.ReasonCode = MqttConnectReasonCode.Success; } catch(Exception err) { Console.WriteLine(err.Message); typeof(Program).LogInfo($"{err.Message}"); } }).WithSubscriptionInterceptor(c => { if (c == null) return; c.AcceptSubscription = true; typeof(Program).LogInfo($@"{DateTime.Now}:订阅者{c.ClientId},订阅:{c.TopicFilter.Topic}"); }).WithApplicationMessageInterceptor(c => { if (c == null) return; c.AcceptPublish = true; try { string str = c.ApplicationMessage?.Payload == null ? null : Encoding.UTF8.GetString(c.ApplicationMessage?.Payload) + "\r\n"; //ClientInfoService cs = new ClientInfoService(); //cs.WriteCache(c.ClientId, c.ApplicationMessage?.Topic, str); var entity = new ReceiveMessageRecord() { Id = Guid.NewGuid().ToString("N"), ClientId = c.ClientId, Topic = c.ApplicationMessage?.Topic, Payload = str, QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce, CreatorDate = DateTime.Now, CreatorUserId = 1 }; ThreadMsg.ThreadReceiveMsg.Insert(entity); typeof(Program).LogInfo($@"订阅者{c.ClientId},订阅:{c.ApplicationMessage?.Topic},发送消息:{str}"); } catch (Exception e) { typeof(Program).LogInfo(e); } }); server = new MqttFactory().CreateMqttServer(); server.UseClientDisconnectedHandler(c => { typeof(Program).LogInfo($@"{DateTime.Now}:订阅者{c.ClientId}已退出!"); }); await server.StartAsync(optionBuilder.Build()); MqttHelper.Server = server; } } }