AsyncQueue.cs 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. namespace MQTTnet.Internal
  6. {
  7. public sealed class AsyncQueue<TItem> : IDisposable
  8. {
  9. private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0);
  10. private ConcurrentQueue<TItem> _queue = new ConcurrentQueue<TItem>();
  11. public int Count => _queue.Count;
  12. public void Enqueue(TItem item)
  13. {
  14. _queue.Enqueue(item);
  15. _semaphore.Release();
  16. }
  17. public async Task<AsyncQueueDequeueResult<TItem>> TryDequeueAsync(CancellationToken cancellationToken)
  18. {
  19. while (!cancellationToken.IsCancellationRequested)
  20. {
  21. try
  22. {
  23. await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
  24. cancellationToken.ThrowIfCancellationRequested();
  25. }
  26. catch (OperationCanceledException)
  27. {
  28. return new AsyncQueueDequeueResult<TItem>(false, default(TItem));
  29. }
  30. if (_queue.TryDequeue(out var item))
  31. {
  32. return new AsyncQueueDequeueResult<TItem>(true, item);
  33. }
  34. }
  35. return new AsyncQueueDequeueResult<TItem>(false, default(TItem));
  36. }
  37. public AsyncQueueDequeueResult<TItem> TryDequeue()
  38. {
  39. if (_queue.TryDequeue(out var item))
  40. {
  41. return new AsyncQueueDequeueResult<TItem>(true, item);
  42. }
  43. return new AsyncQueueDequeueResult<TItem>(false, default(TItem));
  44. }
  45. public void Clear()
  46. {
  47. Interlocked.Exchange(ref _queue, new ConcurrentQueue<TItem>());
  48. }
  49. public void Dispose()
  50. {
  51. _semaphore?.Dispose();
  52. }
  53. }
  54. }