| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- using MQTTnet.Internal;
- using MQTTnet.Protocol;
- using System;
- using System.Threading;
- using System.Threading.Tasks;
- namespace MQTTnet.Server
- {
- public class MqttClientSessionApplicationMessagesQueue : Disposable
- {
- private readonly AsyncQueue<MqttQueuedApplicationMessage> _messageQueue = new AsyncQueue<MqttQueuedApplicationMessage>();
-
- private readonly IMqttServerOptions _options;
- public MqttClientSessionApplicationMessagesQueue(IMqttServerOptions options)
- {
- _options = options ?? throw new ArgumentNullException(nameof(options));
- }
- public int Count => _messageQueue.Count;
- public void Enqueue(MqttApplicationMessage applicationMessage, string senderClientId, MqttQualityOfServiceLevel qualityOfServiceLevel, bool isRetainedMessage)
- {
- if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
- Enqueue(new MqttQueuedApplicationMessage
- {
- ApplicationMessage = applicationMessage,
- SenderClientId = senderClientId,
- QualityOfServiceLevel = qualityOfServiceLevel,
- IsRetainedMessage = isRetainedMessage
- });
- }
- public void Clear()
- {
- _messageQueue.Clear();
- }
- public async Task<MqttQueuedApplicationMessage> TakeAsync(CancellationToken cancellationToken)
- {
- var dequeueResult = await _messageQueue.TryDequeueAsync(cancellationToken).ConfigureAwait(false);
- if (!dequeueResult.IsSuccess)
- {
- return null;
- }
- return dequeueResult.Item;
- }
- public void Enqueue(MqttQueuedApplicationMessage queuedApplicationMessage)
- {
- if (queuedApplicationMessage == null) throw new ArgumentNullException(nameof(queuedApplicationMessage));
- lock (_messageQueue)
- {
- if (_messageQueue.Count >= _options.MaxPendingMessagesPerClient)
- {
- if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
- {
- return;
- }
- if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
- {
- _messageQueue.TryDequeue();
- }
- }
- _messageQueue.Enqueue(queuedApplicationMessage);
- }
- }
- protected override void Dispose(bool disposing)
- {
- if (disposing)
- {
- _messageQueue.Dispose();
- }
- base.Dispose(disposing);
- }
- }
- }
|