MqttPacketDispatcher.cs 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. using MQTTnet.Exceptions;
  2. using MQTTnet.Packets;
  3. using System;
  4. using System.Collections.Concurrent;
  namespace MQTTnet.PacketDispatcher
  {
      /// <summary>
      /// Keeps track of pending packet awaiters and routes received packets, failures
      /// and cancellations to them. Awaiters are keyed by packet identifier and by the
      /// exact type of the expected response packet.
      /// </summary>
      public sealed class MqttPacketDispatcher
      {
          // Key = (packet identifier, response packet type). The identifier 0 is used
          // for packets/awaiters that carry no identifier.
          private readonly ConcurrentDictionary<Tuple<ushort, Type>, IMqttPacketAwaiter> _awaiters =
              new ConcurrentDictionary<Tuple<ushort, Type>, IMqttPacketAwaiter>();

          /// <summary>
          /// Fails every registered awaiter with the given exception and unregisters it.
          /// </summary>
          /// <param name="exception">The exception handed to each awaiter.</param>
          /// <exception cref="ArgumentNullException"><paramref name="exception"/> is null.</exception>
          public void Dispatch(Exception exception)
          {
              if (exception == null) throw new ArgumentNullException(nameof(exception));

              DrainAwaiters(awaiter => awaiter.Fail(exception));
          }

          /// <summary>
          /// Routes a received packet to the awaiter registered for its identifier and type.
          /// A disconnect packet instead fails (and unregisters) all pending awaiters.
          /// </summary>
          /// <param name="packet">The received packet.</param>
          /// <exception cref="ArgumentNullException"><paramref name="packet"/> is null.</exception>
          /// <exception cref="MqttProtocolViolationException">
          /// No awaiter is registered for the packet's identifier and type.
          /// </exception>
          public void Dispatch(MqttBasePacket packet)
          {
              if (packet == null) throw new ArgumentNullException(nameof(packet));

              if (packet is MqttDisconnectPacket disconnectPacket)
              {
                  // Every awaiter receives its own exception instance. The awaiters are
                  // also unregistered so a later packet cannot be matched to an awaiter
                  // that has already been failed.
                  DrainAwaiters(awaiter => awaiter.Fail(new MqttUnexpectedDisconnectReceivedException(disconnectPacket)));
                  return;
              }

              ushort? identifier = null;
              if (packet is IMqttPacketWithIdentifier packetWithIdentifier)
              {
                  identifier = packetWithIdentifier.PacketIdentifier;
              }

              // The lookup uses the packet's exact runtime type, so an awaiter registered
              // for a base type does not match a packet of a derived type.
              var key = CreateKey(identifier, packet.GetType());
              if (_awaiters.TryRemove(key, out var matchingAwaiter))
              {
                  matchingAwaiter.Complete(packet);
                  return;
              }

              throw new MqttProtocolViolationException($"Received packet '{packet}' at an unexpected time.");
          }

          /// <summary>
          /// Cancels every registered awaiter and unregisters it.
          /// </summary>
          public void Reset()
          {
              DrainAwaiters(awaiter => awaiter.Cancel());
          }

          /// <summary>
          /// Creates and registers an awaiter for a response packet of type
          /// <typeparamref name="TResponsePacket"/> with the given identifier.
          /// </summary>
          /// <param name="identifier">The packet identifier; null is treated as 0.</param>
          /// <returns>The newly registered awaiter.</returns>
          /// <exception cref="InvalidOperationException">
          /// An awaiter for the same identifier and packet type is already registered.
          /// </exception>
          public MqttPacketAwaiter<TResponsePacket> AddAwaiter<TResponsePacket>(ushort? identifier) where TResponsePacket : MqttBasePacket
          {
              // Normalize up front so the awaiter is created with the same identifier
              // value (0 when none was supplied) that the registry key uses.
              identifier = identifier.GetValueOrDefault();

              var awaiter = new MqttPacketAwaiter<TResponsePacket>(identifier, this);
              var key = CreateKey(identifier, typeof(TResponsePacket));

              if (!_awaiters.TryAdd(key, awaiter))
              {
                  throw new InvalidOperationException($"The packet dispatcher already has an awaiter for packet of type '{key.Item2.Name}' with identifier {key.Item1}.");
              }

              return awaiter;
          }

          /// <summary>
          /// Unregisters the awaiter for the given identifier and response packet type.
          /// Does nothing when no such awaiter is registered.
          /// </summary>
          /// <param name="identifier">The packet identifier; null is treated as 0.</param>
          public void RemoveAwaiter<TResponsePacket>(ushort? identifier) where TResponsePacket : MqttBasePacket
          {
              _awaiters.TryRemove(CreateKey(identifier, typeof(TResponsePacket)), out _);
          }

          // Builds the registry key; a missing identifier maps to 0.
          private static Tuple<ushort, Type> CreateKey(ushort? identifier, Type packetType)
          {
              return new Tuple<ushort, Type>(identifier.GetValueOrDefault(), packetType);
          }

          // Removes the registered awaiters one by one and passes each removed awaiter
          // to the given action. Entries are removed individually instead of calling
          // Clear() after the loop: the dictionary enumerator is not a snapshot, so
          // Clear() could drop an awaiter that was registered after the loop passed it
          // by, leaving that awaiter without any signal.
          private void DrainAwaiters(Action<IMqttPacketAwaiter> signal)
          {
              foreach (var entry in _awaiters)
              {
                  if (_awaiters.TryRemove(entry.Key, out var removedAwaiter))
                  {
                      signal(removedAwaiter);
                  }
              }
          }
      }
  }