using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; namespace MQTTnet.Internal { public sealed class AsyncQueue : IDisposable { private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0); private ConcurrentQueue _queue = new ConcurrentQueue(); public int Count => _queue.Count; public void Enqueue(TItem item) { _queue.Enqueue(item); _semaphore.Release(); } public async Task> TryDequeueAsync(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) { try { await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); cancellationToken.ThrowIfCancellationRequested(); } catch (OperationCanceledException) { return new AsyncQueueDequeueResult(false, default(TItem)); } if (_queue.TryDequeue(out var item)) { return new AsyncQueueDequeueResult(true, item); } } return new AsyncQueueDequeueResult(false, default(TItem)); } public AsyncQueueDequeueResult TryDequeue() { if (_queue.TryDequeue(out var item)) { return new AsyncQueueDequeueResult(true, item); } return new AsyncQueueDequeueResult(false, default(TItem)); } public void Clear() { Interlocked.Exchange(ref _queue, new ConcurrentQueue()); } public void Dispose() { _semaphore?.Dispose(); } } }