MqttClientSession.cs 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. using MQTTnet.Diagnostics;
  2. using MQTTnet.Server.Status;
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Threading.Tasks;
  6. namespace MQTTnet.Server
  7. {
  8. public class MqttClientSession
  9. {
  10. readonly IMqttNetLogger _logger;
  11. readonly DateTime _createdTimestamp = DateTime.UtcNow;
  12. readonly IMqttRetainedMessagesManager _retainedMessagesManager;
  13. public MqttClientSession(string clientId, IDictionary<object, object> items, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions, IMqttRetainedMessagesManager retainedMessagesManager, IMqttNetLogger logger)
  14. {
  15. ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
  16. Items = items ?? throw new ArgumentNullException(nameof(items));
  17. _retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager));
  18. SubscriptionsManager = new MqttClientSubscriptionsManager(this, eventDispatcher, serverOptions);
  19. ApplicationMessagesQueue = new MqttClientSessionApplicationMessagesQueue(serverOptions);
  20. if (logger == null) throw new ArgumentNullException(nameof(logger));
  21. _logger = logger.CreateChildLogger(nameof(MqttClientSession));
  22. }
  23. public string ClientId { get; }
  24. public bool IsCleanSession { get; set; } = true;
  25. public MqttApplicationMessage WillMessage { get; set; }
  26. public MqttClientSubscriptionsManager SubscriptionsManager { get; }
  27. public MqttClientSessionApplicationMessagesQueue ApplicationMessagesQueue { get; }
  28. /// <summary>
  29. /// Gets or sets a key/value collection that can be used to share data within the scope of this session.
  30. /// </summary>
  31. public IDictionary<object, object> Items { get; }
  32. public void EnqueueApplicationMessage(MqttApplicationMessage applicationMessage, string senderClientId, bool isRetainedApplicationMessage)
  33. {
  34. var checkSubscriptionsResult = SubscriptionsManager.CheckSubscriptions(applicationMessage.Topic, applicationMessage.QualityOfServiceLevel);
  35. if (!checkSubscriptionsResult.IsSubscribed)
  36. {
  37. return;
  38. }
  39. _logger.Verbose("Queued application message with topic '{0}' (ClientId: {1}).", applicationMessage.Topic, ClientId);
  40. ApplicationMessagesQueue.Enqueue(applicationMessage, senderClientId, checkSubscriptionsResult.QualityOfServiceLevel, isRetainedApplicationMessage);
  41. }
  42. public async Task SubscribeAsync(ICollection<MqttTopicFilter> topicFilters)
  43. {
  44. if (topicFilters is null) throw new ArgumentNullException(nameof(topicFilters));
  45. await SubscriptionsManager.SubscribeAsync(topicFilters).ConfigureAwait(false);
  46. var matchingRetainedMessages = await _retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters).ConfigureAwait(false);
  47. foreach (var matchingRetainedMessage in matchingRetainedMessages)
  48. {
  49. EnqueueApplicationMessage(matchingRetainedMessage, null, true);
  50. }
  51. }
  52. public Task UnsubscribeAsync(IEnumerable<string> topicFilters)
  53. {
  54. if (topicFilters is null) throw new ArgumentNullException(nameof(topicFilters));
  55. return SubscriptionsManager.UnsubscribeAsync(topicFilters);
  56. }
  57. public void FillStatus(MqttSessionStatus status)
  58. {
  59. status.ClientId = ClientId;
  60. status.CreatedTimestamp = _createdTimestamp;
  61. status.PendingApplicationMessagesCount = ApplicationMessagesQueue.Count;
  62. status.Items = Items;
  63. }
  64. }
  65. }