// MqttHelper.cs
  1. using MQTTnet;
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Text;
  6. using System.Threading.Tasks;
  7. using MQTTnet.Protocol;
  8. using MQTTnet.Server;
  9. namespace MqttMsgServer.Tools
  10. {
  /// <summary>
  /// Static convenience wrappers around a shared MQTTnet <see cref="IMqttServer"/> instance.
  /// </summary>
  /// <remarks>
  /// <see cref="Server"/> must be assigned before the helpers have any effect. While it is
  /// <c>null</c>, publish/subscribe/unsubscribe calls do nothing and
  /// <see cref="CheckExistClient"/> returns <c>false</c>.
  /// </remarks>
  public class MqttHelper
  {
      /// <summary>
      /// The server instance every helper in this class operates on.
      /// </summary>
      public static IMqttServer Server { get; set; }

      /// <summary>
      /// Publishes a message with a raw byte payload through <see cref="Server"/>.
      /// </summary>
      /// <param name="topic">Topic assigned to the outgoing message.</param>
      /// <param name="payLoad">Payload bytes assigned to the outgoing message.</param>
      /// <remarks>
      /// Fire-and-forget: the method returns <c>void</c>, so the task produced by the
      /// server is not awaited and its outcome is not reported to the caller.
      /// </remarks>
      public static void PublishAsync(string topic, byte[] payLoad)
      {
          // Read the static property once so the null check and the call see the same instance.
          var server = Server;
          if (server == null)
          {
              return;
          }

          var message = new MqttApplicationMessage
          {
              Topic = topic,
              Payload = payLoad
          };
          server.PublishAsync(message, System.Threading.CancellationToken.None);
      }

      /// <summary>
      /// Publishes a message whose payload is the UTF-8 encoding of <paramref name="payload"/>.
      /// </summary>
      /// <param name="topic">Topic assigned to the outgoing message.</param>
      /// <param name="payload">Text that is UTF-8 encoded into the message payload.</param>
      /// <remarks>
      /// Fire-and-forget: the method returns <c>void</c>, so the task produced by the
      /// server is not awaited and its outcome is not reported to the caller.
      /// </remarks>
      public static void PublishAsync(string topic, string payload)
      {
          var server = Server;
          if (server == null)
          {
              return;
          }

          var message = new MqttApplicationMessage
          {
              Topic = topic,
              Payload = Encoding.UTF8.GetBytes(payload)
          };
          server.PublishAsync(message, System.Threading.CancellationToken.None);
      }

      /// <summary>
      /// Checks whether the server's client status list contains the given client id.
      /// </summary>
      /// <param name="clientId">Client id to look for.</param>
      /// <returns>
      /// <c>true</c> when a status entry with a matching <c>ClientId</c> exists;
      /// <c>false</c> otherwise, including when <see cref="Server"/> is <c>null</c>.
      /// </returns>
      /// <remarks>The client is only looked up; it is never disconnected.</remarks>
      public static async Task<bool> CheckExistClient(string clientId)
      {
          var server = Server;
          if (server == null)
          {
              // No server means no clients; previously this path threw NullReferenceException.
              return false;
          }

          var statuses = await server.GetClientStatusAsync();
          return statuses.Any(status => status.ClientId == clientId);
      }

      /// <summary>
      /// Subscribes a client to a single topic, using QoS <c>AtLeastOnce</c> and
      /// <c>NoLocal = true</c> on the generated filter.
      /// </summary>
      /// <param name="clientId">Id of the client to subscribe.</param>
      /// <param name="topic">Topic placed in the generated filter.</param>
      /// <returns>A task that completes when the subscription request has finished.</returns>
      public static async Task SubscribeAsync(string clientId, string topic)
      {
          var filter = new MqttTopicFilter
          {
              Topic = topic,
              QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
              NoLocal = true
          };
          await SubscribeAsync(clientId, filter);
      }

      /// <summary>
      /// Subscribes a client using a single, caller-supplied topic filter.
      /// </summary>
      /// <param name="clientId">Id of the client to subscribe.</param>
      /// <param name="topicFilter">Filter describing the subscription.</param>
      /// <returns>A task that completes when the subscription request has finished.</returns>
      public static async Task SubscribeAsync(string clientId, MqttTopicFilter topicFilter)
      {
          var filters = new List<MqttTopicFilter> { topicFilter };
          await SubscribeAsync(clientId, filters);
      }

      /// <summary>
      /// Subscribes a client to the given topic filters, but only when the client is
      /// present in the server's client status list.
      /// </summary>
      /// <param name="clientId">Id of the client to subscribe.</param>
      /// <param name="topicFilters">Filters describing the subscriptions.</param>
      /// <returns>
      /// A task that completes once the server has processed the request. It completes
      /// without doing anything when <see cref="Server"/> is <c>null</c> or the client
      /// is not found.
      /// </returns>
      public static async Task SubscribeAsync(string clientId, List<MqttTopicFilter> topicFilters)
      {
          var server = Server;
          if (server == null)
          {
              return;
          }

          if (await CheckExistClient(clientId))
          {
              // Await the server call so the returned task reflects the real outcome;
              // without it, failures were dropped on an unobserved task.
              await server.SubscribeAsync(clientId, topicFilters);
          }
      }

      /// <summary>
      /// Removes the given topic subscriptions from a client, but only when the client
      /// is present in the server's client status list.
      /// </summary>
      /// <param name="clientId">Id of the client to unsubscribe.</param>
      /// <param name="topicFilters">Topics to unsubscribe from.</param>
      /// <returns>
      /// A task that completes once the server has processed the request. It completes
      /// without doing anything when <see cref="Server"/> is <c>null</c> or the client
      /// is not found.
      /// </returns>
      public static async Task UnsubscribeAsync(string clientId, List<string> topicFilters)
      {
          var server = Server;
          if (server == null)
          {
              return;
          }

          if (await CheckExistClient(clientId))
          {
              // Awaited for the same reason as in SubscribeAsync: surface failures to the caller.
              await server.UnsubscribeAsync(clientId, topicFilters);
          }
      }

      /// <summary>
      /// Removes a single topic subscription from a client.
      /// </summary>
      /// <param name="clientId">Id of the client to unsubscribe.</param>
      /// <param name="topic">Topic to unsubscribe from.</param>
      /// <returns>A task that completes when the unsubscribe request has finished.</returns>
      /// <remarks>
      /// The capital "S" in the name differs from the list-based overload; it is kept
      /// as-is so existing callers continue to compile.
      /// </remarks>
      public static async Task UnSubscribeAsync(string clientId, string topic)
      {
          await UnsubscribeAsync(clientId, new List<string> { topic });
      }
  }
  70. }