using MQTTnet; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using MQTTnet.Protocol; using MQTTnet.Server; namespace MqttMsgServer.Tools { public class MqttHelper { public static IMqttServer Server { get; set; } public static void PublishAsync(string topic, byte[] payLoad) { Server?.PublishAsync(new MqttApplicationMessage() { Topic = topic, Payload = payLoad }, new System.Threading.CancellationToken(false)); } public static void PublishAsync(string topic, string payload) { Server?.PublishAsync(new MQTTnet.MqttApplicationMessage() { Topic = topic, Payload = Encoding.UTF8.GetBytes(payload) }, new System.Threading.CancellationToken(false)); } public static async Task CheckExistClient(string clientId) { var list = await Server.GetClientStatusAsync(); var client = list.FirstOrDefault(i => i.ClientId == clientId); //if (client != null) //{ // await client.DisconnectAsync(); //} return client != null; } public static async Task SubscribeAsync(string clientId, string topic) { var topicFilter = new MqttTopicFilter() { Topic = topic, QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce, NoLocal = true }; await SubscribeAsync(clientId, topicFilter); } public static async Task SubscribeAsync(string clientId, MqttTopicFilter topicFilter) { List topicFilters = new List() { topicFilter }; await SubscribeAsync(clientId, topicFilters); } public static async Task SubscribeAsync(string clientId, List topicFilters) { if (await CheckExistClient(clientId)) { Server?.SubscribeAsync(clientId, topicFilters); } } public static async Task UnsubscribeAsync(string clientId, List topicFilters) { if (await CheckExistClient(clientId)) { Server?.UnsubscribeAsync(clientId, topicFilters); } } public static async Task UnSubscribeAsync(string clientId, string topic) { await UnsubscribeAsync(clientId, new List() { topic }); } } }