// MqttClientSessionApplicationMessagesQueue.cs (2.8 KB)

  1. using MQTTnet.Internal;
  2. using MQTTnet.Protocol;
  3. using System;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  namespace MQTTnet.Server
  {
      /// <summary>
      /// Queue of <see cref="MqttQueuedApplicationMessage"/> items whose size is capped by
      /// <c>MaxPendingMessagesPerClient</c> from the supplied <see cref="IMqttServerOptions"/>.
      /// When the cap is reached, <c>PendingMessagesOverflowStrategy</c> decides whether the
      /// new message or the oldest queued message is dropped.
      /// </summary>
      public class MqttClientSessionApplicationMessagesQueue : Disposable
      {
          private readonly AsyncQueue<MqttQueuedApplicationMessage> _messageQueue = new AsyncQueue<MqttQueuedApplicationMessage>();
          private readonly IMqttServerOptions _options;

          /// <summary>
          /// Creates an empty queue bound to the given server options.
          /// </summary>
          /// <param name="options">Options providing the pending message limit and the overflow strategy.</param>
          /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
          public MqttClientSessionApplicationMessagesQueue(IMqttServerOptions options)
          {
              if (options == null)
              {
                  throw new ArgumentNullException(nameof(options));
              }

              _options = options;
          }

          /// <summary>
          /// Gets the number of messages currently reported by the underlying queue.
          /// </summary>
          public int Count
          {
              get { return _messageQueue.Count; }
          }

          /// <summary>
          /// Wraps the application message and its metadata in a <see cref="MqttQueuedApplicationMessage"/>
          /// and forwards it to <see cref="Enqueue(MqttQueuedApplicationMessage)"/>.
          /// </summary>
          /// <param name="applicationMessage">The message to queue.</param>
          /// <param name="senderClientId">Stored as <c>SenderClientId</c> of the queued item.</param>
          /// <param name="qualityOfServiceLevel">Stored as <c>QualityOfServiceLevel</c> of the queued item.</param>
          /// <param name="isRetainedMessage">Stored as <c>IsRetainedMessage</c> of the queued item.</param>
          /// <exception cref="ArgumentNullException"><paramref name="applicationMessage"/> is null.</exception>
          public void Enqueue(MqttApplicationMessage applicationMessage, string senderClientId, MqttQualityOfServiceLevel qualityOfServiceLevel, bool isRetainedMessage)
          {
              if (applicationMessage == null)
              {
                  throw new ArgumentNullException(nameof(applicationMessage));
              }

              var queuedMessage = new MqttQueuedApplicationMessage
              {
                  ApplicationMessage = applicationMessage,
                  SenderClientId = senderClientId,
                  QualityOfServiceLevel = qualityOfServiceLevel,
                  IsRetainedMessage = isRetainedMessage
              };

              Enqueue(queuedMessage);
          }

          /// <summary>
          /// Adds a message to the queue, applying the configured overflow strategy when the
          /// queue already holds <c>MaxPendingMessagesPerClient</c> or more messages.
          /// </summary>
          /// <param name="queuedApplicationMessage">The item to add.</param>
          /// <exception cref="ArgumentNullException"><paramref name="queuedApplicationMessage"/> is null.</exception>
          public void Enqueue(MqttQueuedApplicationMessage queuedApplicationMessage)
          {
              if (queuedApplicationMessage == null)
              {
                  throw new ArgumentNullException(nameof(queuedApplicationMessage));
              }

              // The capacity check and the insert run under one lock on the queue instance,
              // which serializes concurrent Enqueue calls.
              lock (_messageQueue)
              {
                  var limitReached = _messageQueue.Count >= _options.MaxPendingMessagesPerClient;
                  if (limitReached)
                  {
                      switch (_options.PendingMessagesOverflowStrategy)
                      {
                          case MqttPendingMessagesOverflowStrategy.DropNewMessage:
                              // The incoming message is discarded; the queue stays untouched.
                              return;

                          case MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage:
                              // Remove one queued message before the new one is added.
                              _messageQueue.TryDequeue();
                              break;

                          // Any other strategy value falls through and the message is
                          // enqueued even though the limit has been reached.
                      }
                  }

                  _messageQueue.Enqueue(queuedApplicationMessage);
              }
          }

          /// <summary>
          /// Awaits <c>TryDequeueAsync</c> on the underlying queue and returns the dequeued item.
          /// </summary>
          /// <param name="cancellationToken">Token passed through to the underlying dequeue operation.</param>
          /// <returns>The dequeued item, or <c>null</c> when the dequeue result is not successful.</returns>
          public async Task<MqttQueuedApplicationMessage> TakeAsync(CancellationToken cancellationToken)
          {
              var result = await _messageQueue.TryDequeueAsync(cancellationToken).ConfigureAwait(false);
              return result.IsSuccess ? result.Item : null;
          }

          /// <summary>
          /// Clears the underlying queue.
          /// </summary>
          public void Clear() => _messageQueue.Clear();

          /// <summary>
          /// Disposes the underlying queue when <paramref name="disposing"/> is true,
          /// then calls the base implementation.
          /// </summary>
          /// <param name="disposing">True when called from an explicit dispose.</param>
          protected override void Dispose(bool disposing)
          {
              if (disposing)
              {
                  _messageQueue.Dispose();
              }

              base.Dispose(disposing);
          }
      }
  }