| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243 |
- using System;
- using System.IO;
- using System.Net;
- using System.Net.Sockets;
- using System.Threading;
- using System.Threading.Tasks;
- namespace MQTTnet.Implementations
- {
- public sealed class CrossPlatformSocket : IDisposable
- {
- readonly Socket _socket;
- NetworkStream _networkStream;
- public CrossPlatformSocket(AddressFamily addressFamily)
- {
- _socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp);
- }
- public CrossPlatformSocket()
- {
- // Having this contructor is important because avoiding the address family as parameter
- // will make use of dual mode in the .net framework.
- _socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
- }
- public CrossPlatformSocket(Socket socket)
- {
- _socket = socket ?? throw new ArgumentNullException(nameof(socket));
- _networkStream = new NetworkStream(socket, true);
- }
- public bool NoDelay
- {
- get
- {
- return (int)_socket.GetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay) > 0;
- }
- set
- {
- _socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, value ? 1 : 0);
- }
- }
- public bool DualMode
- {
- get
- {
- return _socket.DualMode;
- }
- set
- {
- _socket.DualMode = value;
- }
- }
- public int ReceiveBufferSize
- {
- get
- {
- return _socket.ReceiveBufferSize;
- }
- set
- {
- _socket.ReceiveBufferSize = value;
- }
- }
- public int SendBufferSize
- {
- get
- {
- return _socket.SendBufferSize;
- }
- set
- {
- _socket.SendBufferSize = value;
- }
- }
- public EndPoint RemoteEndPoint
- {
- get
- {
- return _socket.RemoteEndPoint;
- }
- }
- public bool ReuseAddress
- {
- get
- {
- return (int)_socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress) != 0;
- }
- set
- {
- _socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, value ? 1 : 0);
- }
- }
- public async Task<CrossPlatformSocket> AcceptAsync()
- {
- try
- {
- #if NET452 || NET461
- var clientSocket = await Task.Factory.FromAsync(_socket.BeginAccept, _socket.EndAccept, null).ConfigureAwait(false);
- return new CrossPlatformSocket(clientSocket);
- #else
- var clientSocket = await _socket.AcceptAsync().ConfigureAwait(false);
- return new CrossPlatformSocket(clientSocket);
- #endif
- }
- catch (ObjectDisposedException)
- {
- // This will happen when _socket.EndAccept gets called by Task library but the socket is already disposed.
- return null;
- }
- }
- public void Bind(EndPoint localEndPoint)
- {
- if (localEndPoint is null) throw new ArgumentNullException(nameof(localEndPoint));
- _socket.Bind(localEndPoint);
- }
- public void Listen(int connectionBacklog)
- {
- _socket.Listen(connectionBacklog);
- }
- public async Task ConnectAsync(string host, int port, CancellationToken cancellationToken)
- {
- if (host is null) throw new ArgumentNullException(nameof(host));
- try
- {
- _networkStream?.Dispose();
- // Workaround for: https://github.com/dotnet/corefx/issues/24430
- using (cancellationToken.Register(() => _socket.Dispose()))
- {
- cancellationToken.ThrowIfCancellationRequested();
- #if NET452 || NET461
- await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, host, port, null).ConfigureAwait(false);
- #else
- await _socket.ConnectAsync(host, port).ConfigureAwait(false);
- #endif
- _networkStream = new NetworkStream(_socket, true);
- }
- }
- catch (ObjectDisposedException)
- {
- // This will happen when _socket.EndConnect gets called by Task library but the socket is already disposed.
- }
- }
- public async Task SendAsync(ArraySegment<byte> buffer, SocketFlags socketFlags)
- {
- try
- {
- #if NET452 || NET461
- await Task.Factory.FromAsync(SocketWrapper.BeginSend, _socket.EndSend, new SocketWrapper(_socket, buffer, socketFlags)).ConfigureAwait(false);
- #else
- await _socket.SendAsync(buffer, socketFlags).ConfigureAwait(false);
- #endif
- }
- catch (ObjectDisposedException)
- {
- // This will happen when _socket.EndConnect gets called by Task library but the socket is already disposed.
- }
- }
- public async Task<int> ReceiveAsync(ArraySegment<byte> buffer, SocketFlags socketFlags)
- {
- try
- {
- #if NET452 || NET461
- return await Task.Factory.FromAsync(SocketWrapper.BeginReceive, _socket.EndReceive, new SocketWrapper(_socket, buffer, socketFlags)).ConfigureAwait(false);
- #else
- return await _socket.ReceiveAsync(buffer, socketFlags).ConfigureAwait(false);
- #endif
- }
- catch (ObjectDisposedException)
- {
- // This will happen when _socket.EndReceive gets called by Task library but the socket is already disposed.
- return -1;
- }
- }
- public NetworkStream GetStream()
- {
- var networkStream = _networkStream;
- if (networkStream == null)
- {
- throw new IOException("The socket is not connected.");
- }
- return networkStream;
- }
- public void Dispose()
- {
- _networkStream?.Dispose();
- _socket?.Dispose();
- }
- #if NET452 || NET461
- class SocketWrapper
- {
- readonly Socket _socket;
- readonly ArraySegment<byte> _buffer;
- readonly SocketFlags _socketFlags;
- public SocketWrapper(Socket socket, ArraySegment<byte> buffer, SocketFlags socketFlags)
- {
- _socket = socket;
- _buffer = buffer;
- _socketFlags = socketFlags;
- }
- public static IAsyncResult BeginSend(AsyncCallback callback, object state)
- {
- var socketWrapper = (SocketWrapper)state;
- return socketWrapper._socket.BeginSend(socketWrapper._buffer.Array, socketWrapper._buffer.Offset, socketWrapper._buffer.Count, socketWrapper._socketFlags, callback, state);
- }
- public static IAsyncResult BeginReceive(AsyncCallback callback, object state)
- {
- var socketWrapper = (SocketWrapper)state;
- return socketWrapper._socket.BeginReceive(socketWrapper._buffer.Array, socketWrapper._buffer.Offset, socketWrapper._buffer.Count, socketWrapper._socketFlags, callback, state);
- }
- }
- #endif
- }
- }
|