MqttPacketAwaiter.cs 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. using MQTTnet.Exceptions;
  2. using MQTTnet.Packets;
  3. using System;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. namespace MQTTnet.PacketDispatcher
  7. {
  8. public sealed class MqttPacketAwaiter<TPacket> : IMqttPacketAwaiter where TPacket : MqttBasePacket
  9. {
  10. readonly TaskCompletionSource<MqttBasePacket> _taskCompletionSource;
  11. readonly ushort? _packetIdentifier;
  12. readonly MqttPacketDispatcher _owningPacketDispatcher;
  13. public MqttPacketAwaiter(ushort? packetIdentifier, MqttPacketDispatcher owningPacketDispatcher)
  14. {
  15. _packetIdentifier = packetIdentifier;
  16. _owningPacketDispatcher = owningPacketDispatcher ?? throw new ArgumentNullException(nameof(owningPacketDispatcher));
  17. #if NET452
  18. _taskCompletionSource = new TaskCompletionSource<MqttBasePacket>();
  19. #else
  20. _taskCompletionSource = new TaskCompletionSource<MqttBasePacket>(TaskCreationOptions.RunContinuationsAsynchronously);
  21. #endif
  22. }
  23. public async Task<TPacket> WaitOneAsync(TimeSpan timeout)
  24. {
  25. using (var timeoutToken = new CancellationTokenSource(timeout))
  26. {
  27. timeoutToken.Token.Register(() => Fail(new MqttCommunicationTimedOutException()));
  28. var packet = await _taskCompletionSource.Task.ConfigureAwait(false);
  29. return (TPacket)packet;
  30. }
  31. }
  32. public void Complete(MqttBasePacket packet)
  33. {
  34. if (packet == null) throw new ArgumentNullException(nameof(packet));
  35. #if NET452
  36. // To prevent deadlocks it is required to call the _TrySetResult_ method
  37. // from a new thread because the awaiting code will not(!) be executed in
  38. // a new thread automatically (due to await). Furthermore _this_ thread will
  39. // do it. But _this_ thread is also reading incoming packets -> deadlock.
  40. // NET452 does not support RunContinuationsAsynchronously
  41. Task.Run(() => _taskCompletionSource.TrySetResult(packet));
  42. #else
  43. _taskCompletionSource.TrySetResult(packet);
  44. #endif
  45. }
  46. public void Fail(Exception exception)
  47. {
  48. if (exception == null) throw new ArgumentNullException(nameof(exception));
  49. #if NET452
  50. // To prevent deadlocks it is required to call the _TrySetResult_ method
  51. // from a new thread because the awaiting code will not(!) be executed in
  52. // a new thread automatically (due to await). Furthermore _this_ thread will
  53. // do it. But _this_ thread is also reading incoming packets -> deadlock.
  54. // NET452 does not support RunContinuationsAsynchronously
  55. Task.Run(() => _taskCompletionSource.TrySetException(exception));
  56. #else
  57. _taskCompletionSource.TrySetException(exception);
  58. #endif
  59. }
  60. public void Cancel()
  61. {
  62. #if NET452
  63. // To prevent deadlocks it is required to call the _TrySetResult_ method
  64. // from a new thread because the awaiting code will not(!) be executed in
  65. // a new thread automatically (due to await). Furthermore _this_ thread will
  66. // do it. But _this_ thread is also reading incoming packets -> deadlock.
  67. // NET452 does not support RunContinuationsAsynchronously
  68. Task.Run(() => _taskCompletionSource.TrySetCanceled());
  69. #else
  70. _taskCompletionSource.TrySetCanceled();
  71. #endif
  72. }
  73. public void Dispose()
  74. {
  75. _owningPacketDispatcher.RemoveAwaiter<TPacket>(_packetIdentifier);
  76. }
  77. }
  78. }