MqttConnectionContextTest.cs 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. using Microsoft.AspNetCore.Builder;
  2. using Microsoft.AspNetCore.Connections;
  3. using Microsoft.AspNetCore.Hosting;
  4. using Microsoft.Extensions.DependencyInjection;
  5. using Microsoft.VisualStudio.TestTools.UnitTesting;
  6. using System;
  7. using System.Linq;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. using MQTTnet.Adapter;
  11. using MQTTnet.AspNetCore.Tests.Mockups;
  12. using MQTTnet.Client.Options;
  13. using MQTTnet.Exceptions;
  14. using MQTTnet.Formatter;
  15. using MQTTnet.Packets;
  16. using System.Net;
  17. namespace MQTTnet.AspNetCore.Tests
  18. {
  19. [TestClass]
  20. public class MqttConnectionContextTest
  21. {
  /// <summary>
  /// Completes the writer side of the mock receive pipe before anything was written and
  /// expects <c>ReceivePacketAsync</c> to fail with a <see cref="MqttCommunicationException"/>.
  /// </summary>
  [TestMethod]
  public async Task TestReceivePacketAsyncThrowsWhenReaderCompleted()
  {
      // Arrange: an MQTT 3.1.1 connection context on top of an in-memory duplex pipe.
      var formatter = new MqttPacketFormatterAdapter(MqttProtocolVersion.V311);
      var duplexPipe = new DuplexPipeMockup();
      var connectionContext = new DefaultConnectionContext { Transport = duplexPipe };
      var mqttContext = new MqttConnectionContext(formatter, connectionContext);

      // Act: signal "no more data" on the receive side.
      duplexPipe.Receive.Writer.Complete();

      // Assert: receiving from the completed pipe must raise a communication exception.
      Task ReceiveAsync() => mqttContext.ReceivePacketAsync(TimeSpan.Zero, CancellationToken.None);
      await Assert.ThrowsExceptionAsync<MqttCommunicationException>(() => ReceiveAsync());
  }
  /// <summary>
  /// Sends publish packets through one shared <see cref="MqttConnectionContext"/> from ten
  /// concurrently running tasks (100 packets each). The test passes when every send
  /// completes without throwing.
  /// </summary>
  [TestMethod]
  public async Task TestParallelWrites()
  {
      var formatter = new MqttPacketFormatterAdapter(MqttProtocolVersion.V311);
      var duplexPipe = new DuplexPipeMockup();
      var connectionContext = new DefaultConnectionContext { Transport = duplexPipe };
      var mqttContext = new MqttConnectionContext(formatter, connectionContext);

      // One writer: 100 sequential sends of an empty publish packet.
      async Task SendBurstAsync()
      {
          var remaining = 100;
          while (remaining > 0)
          {
              await mqttContext.SendPacketAsync(new MqttPublishPacket(), TimeSpan.Zero, CancellationToken.None).ConfigureAwait(false);
              remaining--;
          }
      }

      // Ten writers are started via Task.Run so they run concurrently.
      var writers = new Task[10];
      for (var index = 0; index < writers.Length; index++)
      {
          writers[index] = Task.Run(() => SendBurstAsync());
      }

      await Task.WhenAll(writers).ConfigureAwait(false);
  }
  /// <summary>
  /// Sends a publish packet carrying a 20,000 byte payload and checks that more than
  /// 20,000 bytes become readable on the send side of the mock pipe.
  /// </summary>
  [TestMethod]
  public async Task TestLargePacket()
  {
      const int payloadSize = 20_000;

      var formatter = new MqttPacketFormatterAdapter(MqttProtocolVersion.V311);
      var duplexPipe = new DuplexPipeMockup();
      var connectionContext = new DefaultConnectionContext { Transport = duplexPipe };
      var mqttContext = new MqttConnectionContext(formatter, connectionContext);

      var largePublish = new MqttPublishPacket { Payload = new byte[payloadSize] };
      await mqttContext.SendPacketAsync(largePublish, TimeSpan.Zero, CancellationToken.None).ConfigureAwait(false);

      // The serialized packet must be strictly larger than the raw payload
      // (presumably because of the MQTT header bytes in front of it).
      var sent = await duplexPipe.Send.Reader.ReadAsync();
      var sentLength = sent.Buffer.Length;
      Assert.IsTrue(sentLength > payloadSize);
  }
  /// <summary>
  /// Startup type handed to <c>UseStartup</c> by <c>TestEndpoint</c>; it adds nothing to the
  /// application pipeline.
  /// </summary>
  private class Startup
  {
      /// <summary>Leaves the application pipeline empty.</summary>
      /// <param name="app">The application builder; not used.</param>
      public void Configure(IApplicationBuilder app) { }
  }
  /// <summary>
  /// Starts a Kestrel listener on localhost:1883 whose connections are passed to a
  /// <see cref="ConnectionHandlerMockup"/>, connects an MQTT client to "localhost", and
  /// verifies that the mockup hands out a connection context. On .NET Core 3.1 the
  /// context's <c>Endpoint</c> must additionally parse as an <see cref="IPEndPoint"/>.
  /// </summary>
  [TestMethod]
  public async Task TestEndpoint()
  {
      var mockup = new ConnectionHandlerMockup();

      using (var host = new WebHostBuilder()
          .UseKestrel(kestrel => kestrel.ListenLocalhost(1883, listener => listener.Use((ctx, next) => mockup.OnConnectedAsync(ctx))))
          .UseStartup<Startup>()
          .ConfigureServices((hostContext, services) =>
          {
              services.AddHostedMqttServer(o => o.WithoutDefaultEndpoint());
              services.AddSingleton<IMqttServerAdapter>(mockup);
          })
          .Build())
      using (var client = new MqttFactory().CreateMqttClient())
      {
          // Start the host asynchronously instead of blocking inside an async test.
          await host.StartAsync();

          await client.ConnectAsync(new MqttClientOptionsBuilder()
              .WithTcpServer("localhost")
              .Build(), CancellationToken.None);

          var ctx = await mockup.Context.Task;

          // The null check has to run before ctx is dereferenced; otherwise a missing
          // context shows up as a NullReferenceException instead of a failed assertion.
          Assert.IsNotNull(ctx);

#if NETCOREAPP3_1
          var ep = IPEndPoint.Parse(ctx.Endpoint);
          Assert.IsNotNull(ep);
#endif
      }
  }
  95. }
  96. }