BlockingQueue.cs 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading;
  namespace MQTTnet.Internal
  {
      /// <summary>
      /// A thread-safe FIFO queue whose consumers can block until an item becomes available.
      /// </summary>
      /// <typeparam name="TItem">The type of the queued items.</typeparam>
      public sealed class BlockingQueue<TItem> : IDisposable
      {
          readonly object _syncRoot = new object();
          readonly LinkedList<TItem> _items = new LinkedList<TItem>();

          // Signaled by Enqueue and reset by a consumer that finds the queue empty.
          // Set to null by Dispose; only read or replaced while holding _syncRoot.
          ManualResetEventSlim _gate = new ManualResetEventSlim(false);

          /// <summary>
          /// Gets the number of items currently stored in the queue.
          /// </summary>
          public int Count
          {
              get
              {
                  lock (_syncRoot)
                  {
                      return _items.Count;
                  }
              }
          }

          /// <summary>
          /// Appends an item to the end of the queue and wakes up waiting consumers.
          /// </summary>
          /// <param name="item">The item to add; must not be null.</param>
          /// <exception cref="ArgumentNullException"><paramref name="item"/> is null.</exception>
          public void Enqueue(TItem item)
          {
              if (item == null) throw new ArgumentNullException(nameof(item));

              lock (_syncRoot)
              {
                  _items.AddLast(item);

                  // After Dispose the gate is gone; the item is still stored so that it can be drained.
                  _gate?.Set();
              }
          }

          /// <summary>
          /// Removes and returns the first item, blocking until one is available.
          /// </summary>
          /// <param name="cancellationToken">Token used to abort the wait.</param>
          /// <returns>The item that was at the head of the queue.</returns>
          /// <exception cref="OperationCanceledException">The token was cancelled before an item could be returned.</exception>
          /// <exception cref="ObjectDisposedException">The queue is empty and has been disposed, so no wait is possible.</exception>
          public TItem Dequeue(CancellationToken cancellationToken = default)
          {
              return WaitForFirst(true, cancellationToken);
          }

          /// <summary>
          /// Returns the first item without removing it, blocking until one is available.
          /// </summary>
          /// <param name="cancellationToken">Token used to abort the wait.</param>
          /// <returns>The item at the head of the queue.</returns>
          /// <exception cref="OperationCanceledException">The token was cancelled before an item could be returned.</exception>
          /// <exception cref="ObjectDisposedException">The queue is empty and has been disposed, so no wait is possible.</exception>
          public TItem PeekAndWait(CancellationToken cancellationToken = default)
          {
              return WaitForFirst(false, cancellationToken);
          }

          /// <summary>
          /// Removes the first item only if it satisfies the given predicate. Does nothing when the queue is empty.
          /// </summary>
          /// <param name="match">Predicate evaluated against the first item while the queue lock is held.</param>
          /// <exception cref="ArgumentNullException"><paramref name="match"/> is null.</exception>
          public void RemoveFirst(Predicate<TItem> match)
          {
              if (match == null) throw new ArgumentNullException(nameof(match));

              lock (_syncRoot)
              {
                  var first = _items.First;
                  if (first != null && match(first.Value))
                  {
                      _items.RemoveFirst();
                  }
              }
          }

          /// <summary>
          /// Removes and returns the first item without waiting.
          /// </summary>
          /// <returns>The item that was at the head of the queue.</returns>
          /// <exception cref="InvalidOperationException">The queue is empty.</exception>
          public TItem RemoveFirst()
          {
              lock (_syncRoot)
              {
                  var first = _items.First;

                  // LinkedList<T>.RemoveFirst throws InvalidOperationException on an empty list,
                  // which happens before the (then null) node is dereferenced below.
                  _items.RemoveFirst();

                  return first.Value;
              }
          }

          /// <summary>
          /// Removes all items from the queue.
          /// </summary>
          public void Clear()
          {
              lock (_syncRoot)
              {
                  _items.Clear();
              }
          }

          /// <summary>
          /// Releases the wait handle. Consumers that are blocked on an empty queue are woken up and
          /// fail with <see cref="ObjectDisposedException"/>. Calling this method more than once is safe.
          /// </summary>
          public void Dispose()
          {
              ManualResetEventSlim gate;
              lock (_syncRoot)
              {
                  // Swap under the lock so that Enqueue and waiting consumers never see a half-disposed gate.
                  gate = _gate;
                  _gate = null;
              }

              if (gate == null)
              {
                  return;
              }

              // Release consumers that are currently blocked; otherwise they would wait until
              // their token is cancelled because nobody can signal the gate anymore.
              gate.Set();
              gate.Dispose();
          }

          // Shared wait loop for Dequeue (remove == true) and PeekAndWait (remove == false).
          TItem WaitForFirst(bool remove, CancellationToken cancellationToken)
          {
              while (true)
              {
                  // Throws an OperationCanceledException that carries the caller's token.
                  cancellationToken.ThrowIfCancellationRequested();

                  ManualResetEventSlim gate;
                  lock (_syncRoot)
                  {
                      var first = _items.First;
                      if (first != null)
                      {
                          var item = first.Value;
                          if (remove)
                          {
                              _items.RemoveFirst();
                          }

                          return item;
                      }

                      gate = _gate;
                      if (gate == null)
                      {
                          // Without a gate there is nothing to wait on; failing is better than spinning.
                          throw new ObjectDisposedException(nameof(BlockingQueue<TItem>));
                      }

                      // Reset while still holding the lock so that an Enqueue which runs right after
                      // the lock is released sets the gate again and the wait below returns immediately.
                      gate.Reset();
                  }

                  gate.Wait(cancellationToken);
              }
          }
      }
  }