using System;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Adapter;
using MQTTnet.AspNetCore.Tests.Mockups;
using MQTTnet.Client.Options;
using MQTTnet.Exceptions;
using MQTTnet.Formatter;
using MQTTnet.Packets;

namespace MQTTnet.AspNetCore.Tests
{
    /// <summary>
    /// Tests for <see cref="MqttConnectionContext"/>: receiving from a completed pipe,
    /// concurrent sends, sending a large payload, and endpoint reporting when hosted in Kestrel.
    /// </summary>
    [TestClass]
    public class MqttConnectionContextTest
    {
        /// <summary>
        /// Completes the writer side of the receive pipe and expects
        /// <c>ReceivePacketAsync</c> to fail instead of returning a packet.
        /// </summary>
        [TestMethod]
        public async Task TestReceivePacketAsyncThrowsWhenReaderCompleted()
        {
            var serializer = new MqttPacketFormatterAdapter(MqttProtocolVersion.V311);
            var pipe = new DuplexPipeMockup();
            var connection = new DefaultConnectionContext();
            connection.Transport = pipe;
            var ctx = new MqttConnectionContext(serializer, connection);

            // No more data will ever arrive on the receive side.
            pipe.Receive.Writer.Complete();

            // ThrowsExceptionAsync requires an explicit exception type; it cannot be inferred.
            // NOTE(review): MqttCommunicationException is assumed to be the type thrown for a
            // completed reader - confirm against MqttConnectionContext.ReceivePacketAsync.
            await Assert.ThrowsExceptionAsync<MqttCommunicationException>(
                () => ctx.ReceivePacketAsync(TimeSpan.Zero, CancellationToken.None));
        }

        /// <summary>
        /// Starts 10 tasks that each send 100 publish packets through the same context.
        /// The test passes when all sends complete without throwing.
        /// </summary>
        [TestMethod]
        public async Task TestParallelWrites()
        {
            var serializer = new MqttPacketFormatterAdapter(MqttProtocolVersion.V311);
            var pipe = new DuplexPipeMockup();
            var connection = new DefaultConnectionContext();
            connection.Transport = pipe;
            var ctx = new MqttConnectionContext(serializer, connection);

            var tasks = Enumerable.Range(1, 10).Select(_ => Task.Run(async () =>
            {
                for (int i = 0; i < 100; i++)
                {
                    await ctx.SendPacketAsync(new MqttPublishPacket(), TimeSpan.Zero, CancellationToken.None).ConfigureAwait(false);
                }
            }));

            await Task.WhenAll(tasks).ConfigureAwait(false);
        }

        /// <summary>
        /// Sends a publish packet carrying a 20,000 byte payload and checks that more than
        /// 20,000 bytes can be read from the send side of the pipe.
        /// </summary>
        [TestMethod]
        public async Task TestLargePacket()
        {
            var serializer = new MqttPacketFormatterAdapter(MqttProtocolVersion.V311);
            var pipe = new DuplexPipeMockup();
            var connection = new DefaultConnectionContext();
            connection.Transport = pipe;
            var ctx = new MqttConnectionContext(serializer, connection);

            await ctx.SendPacketAsync(new MqttPublishPacket()
            {
                Payload = new byte[20_000]
            }, TimeSpan.Zero, CancellationToken.None).ConfigureAwait(false);

            var readResult = await pipe.Send.Reader.ReadAsync();

            // The written data must be strictly larger than the payload alone.
            Assert.IsTrue(readResult.Buffer.Length > 20000);
        }

        /// <summary>
        /// Minimal startup class for the web host used by <see cref="TestEndpoint"/>;
        /// it configures no request pipeline.
        /// </summary>
        private class Startup
        {
            public void Configure(IApplicationBuilder app)
            {
            }
        }

        /// <summary>
        /// Hosts Kestrel on localhost port 1883 with a connection middleware that hands each
        /// connection to a <see cref="ConnectionHandlerMockup"/>, connects an MQTT client to it,
        /// and verifies that the mockup captured a context. On NETCOREAPP3_1 it additionally
        /// verifies that the context's <c>Endpoint</c> parses as an <see cref="IPEndPoint"/>.
        /// </summary>
        [TestMethod]
        public async Task TestEndpoint()
        {
            var mockup = new ConnectionHandlerMockup();

            using (var host = new WebHostBuilder()
                .UseKestrel(kestrel => kestrel.ListenLocalhost(1883, listener => listener.Use((ctx, next) => mockup.OnConnectedAsync(ctx))))
                .UseStartup<Startup>()
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddHostedMqttServer(o => o.WithoutDefaultEndpoint());
                    services.AddSingleton(mockup);
                })
                .Build())
            using (var client = new MqttFactory().CreateMqttClient())
            {
                host.Start();

                await client.ConnectAsync(new MqttClientOptionsBuilder()
                    .WithTcpServer("localhost")
                    .Build(), CancellationToken.None);

                var ctx = await mockup.Context.Task;

                // Check for null before touching ctx.Endpoint so a missing context is reported
                // as an assertion failure rather than a NullReferenceException.
                Assert.IsNotNull(ctx);

#if NETCOREAPP3_1
                var ep = IPEndPoint.Parse(ctx.Endpoint);
                Assert.IsNotNull(ep);
#endif
            }
        }
    }
}