// HttpServerBase.cs (3.9 KB)
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Net;
  5. using System.Text;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using MqttMsgServer.Tools;
  namespace MqttMsgServer.HttpService
  {
      /// <summary>
      /// Base class for a small multi-threaded HTTP server built on <see cref="HttpListener"/>.
      /// One listener thread accepts requests and places them on a queue; a fixed set of
      /// worker threads takes them off the queue and hands each one to
      /// <see cref="ProcessHttpRequest"/>.
      /// </summary>
      /// <remarks>
      /// An instance can be started only once. <see cref="Start(int)"/>, <see cref="Stop"/> and
      /// <see cref="Dispose"/> are not synchronized with each other; call them from a single
      /// controlling thread.
      /// </remarks>
      public abstract class HttpServerBase : IDisposable
      {
          private readonly HttpListener _listener;            // HTTP protocol listener
          private readonly Thread _listenerThread;            // thread that accepts requests
          private readonly Thread[] _workers;                 // worker thread pool
          private readonly ManualResetEvent _stop, _ready;    // "stop requested" / "queue has work" signals
          private readonly Queue<HttpListenerContext> _queue; // pending requests; also the lock target

          private bool _started;                // set once Start has brought the listener up
          private volatile bool _stopRequested; // read by listener callbacks running on other threads
          private bool _disposed;

          /// <summary>
          /// Creates the server without opening any port; call <see cref="Start(int)"/> to begin serving.
          /// </summary>
          /// <param name="maxThreads">Number of worker threads to create; must be at least 1.</param>
          /// <exception cref="ArgumentOutOfRangeException">
          /// <paramref name="maxThreads"/> is less than 1.
          /// </exception>
          protected HttpServerBase(int maxThreads)
          {
              if (maxThreads < 1)
              {
                  // With no workers the server would accept requests and never answer them.
                  throw new ArgumentOutOfRangeException(
                      nameof(maxThreads), maxThreads, "At least one worker thread is required.");
              }

              _workers = new Thread[maxThreads];
              _queue = new Queue<HttpListenerContext>();
              _stop = new ManualResetEvent(false);
              _ready = new ManualResetEvent(false);
              _listener = new HttpListener();
              _listenerThread = new Thread(HandleRequests);
          }

          /// <summary>
          /// Starts listening on <c>http://*:{port}/</c> and launches the listener and worker threads.
          /// </summary>
          /// <param name="port">TCP port to listen on.</param>
          /// <exception cref="ObjectDisposedException">The server has been disposed.</exception>
          /// <exception cref="InvalidOperationException">The server has already been started.</exception>
          public void Start(int port)
          {
              if (_disposed)
              {
                  throw new ObjectDisposedException(GetType().Name);
              }

              if (_started)
              {
                  throw new InvalidOperationException("The HTTP server can only be started once.");
              }

              // Bring up the HTTP service.
              _listener.Prefixes.Add($"http://*:{port}/");
              _listener.Start();
              _listenerThread.Start();

              // Only mark the server as started once the listener is really running, so that
              // Stop/Dispose after a failed Start do not try to join threads that never ran.
              _started = true;

              // Launch the worker threads.
              for (int i = 0; i < _workers.Length; i++)
              {
                  _workers[i] = new Thread(Worker);
                  _workers[i].Start();
              }
          }

          /// <summary>
          /// Handles a single HTTP request. Invoked on a worker thread.
          /// </summary>
          /// <param name="ctx">The request/response pair to process.</param>
          protected abstract void ProcessHttpRequest(HttpListenerContext ctx);

          /// <summary>
          /// Stops the server (if it is running) and releases the listener and the wait handles.
          /// Safe to call more than once.
          /// </summary>
          public void Dispose()
          {
              if (_disposed)
              {
                  return;
              }

              Stop();
              _disposed = true;

              _listener.Close();
              _stop.Dispose();
              _ready.Dispose();
          }

          /// <summary>
          /// Signals all threads to stop, waits for them to finish, then stops the listener.
          /// Does nothing if the server was never started, is already stopped, or is disposed.
          /// </summary>
          /// <remarks>
          /// This method joins the worker threads, so it must not be called from inside
          /// <see cref="ProcessHttpRequest"/>.
          /// </remarks>
          public void Stop()
          {
              if (_disposed || !_started || _stopRequested)
              {
                  return;
              }

              _stopRequested = true;
              _stop.Set();

              _listenerThread.Join();
              foreach (Thread worker in _workers)
              {
                  // Entries stay null if Start failed part-way through creating the workers.
                  if (worker != null)
                  {
                      worker.Join();
                  }
              }

              _listener.Stop();
          }

          /// <summary>
          /// Listener thread body: keeps one asynchronous accept outstanding until the stop
          /// signal is set or the listener is no longer listening.
          /// </summary>
          private void HandleRequests()
          {
              while (_listener.IsListening)
              {
                  IAsyncResult pending = _listener.BeginGetContext(ContextReady, null);
                  WaitHandle[] handles = { _stop, pending.AsyncWaitHandle };

                  // Index 0 is the stop signal; index 1 means a request arrived, so loop again.
                  if (WaitHandle.WaitAny(handles) == 0)
                  {
                      return;
                  }
              }
          }

          /// <summary>
          /// Completion callback for <see cref="HttpListener.BeginGetContext"/>: queues the
          /// accepted request and signals the workers.
          /// </summary>
          /// <param name="ar">The asynchronous result passed in by the listener.</param>
          private void ContextReady(IAsyncResult ar)
          {
              try
              {
                  HttpListenerContext context = _listener.EndGetContext(ar);
                  lock (_queue)
                  {
                      _queue.Enqueue(context);
                      _ready.Set();
                  }
              }
              catch (Exception e)
              {
                  // A failure after Stop was requested is the outstanding accept being cancelled
                  // by the shutdown, not an error worth reporting.
                  if (_stopRequested)
                  {
                      return;
                  }

                  this.LogError($"[HttpServerBase::ContextReady]err:{e.Message}");
              }
          }

          /// <summary>
          /// Worker thread body: takes queued requests one at a time and processes them until
          /// the stop signal is set and no work is flagged as ready.
          /// </summary>
          private void Worker()
          {
              // _ready comes first: when both events are set WaitAny returns the lowest index,
              // so queued requests are still drained before the stop signal ends the loop.
              WaitHandle[] signals = { _ready, _stop };
              while (WaitHandle.WaitAny(signals) == 0)
              {
                  HttpListenerContext context;
                  lock (_queue)
                  {
                      if (_queue.Count == 0)
                      {
                          // Nothing left; clear the signal so the workers block again.
                          _ready.Reset();
                          continue;
                      }

                      context = _queue.Dequeue();
                  }

                  try
                  {
                      ProcessHttpRequest(context);
                  }
                  catch (Exception e)
                  {
                      this.LogError($"[HttpServerBase::Worker]err:{e.Message}");
                      AbortResponse(context);
                  }
              }
          }

          /// <summary>
          /// Drops the client connection of a request whose handler failed, so the connection
          /// is not left open waiting for a response that will never be sent.
          /// </summary>
          /// <param name="context">The request whose response should be aborted.</param>
          private void AbortResponse(HttpListenerContext context)
          {
              try
              {
                  context.Response.Abort();
              }
              catch (Exception e)
              {
                  this.LogError($"[HttpServerBase::Worker]abort err:{e.Message}");
              }
          }
      }
  }