ScenePusher.cs 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. using System.Collections.Concurrent;
  2. using System.Threading;
  3. using System.Threading.Tasks;
  4. namespace WeEngine.Message
  5. {
  6. public class ScenePusher
  7. {
  8. private volatile ConcurrentQueue<ScenePushMessage> _messages;
  9. private CancellationTokenSource _cts;
  10. private bool _switch;
  11. /// <summary>
  12. /// 发送消息事件
  13. /// </summary>
  14. public event SendMsgEventHandle SendMsg;
  15. public delegate void SendMsgEventHandle(ScenePushMessage msg);
  16. public ScenePusher()
  17. {
  18. _switch = false;
  19. _messages = new ConcurrentQueue<ScenePushMessage>();
  20. _cts = new CancellationTokenSource();
  21. }
  22. public bool NeedHandel => SendMsg == null;
  23. public void Push(ScenePushMessage message)
  24. {
  25. lock (_messages)
  26. {
  27. _messages.Enqueue(message);
  28. }
  29. }
  30. public void Stop()
  31. {
  32. lock (_messages)
  33. {
  34. _messages = new ConcurrentQueue<ScenePushMessage>();
  35. }
  36. _switch = false;
  37. _cts.Cancel();
  38. }
  39. public bool Start()
  40. {
  41. if (!_switch)
  42. {
  43. _switch = true;
  44. if (_cts.IsCancellationRequested)
  45. {
  46. _cts = new CancellationTokenSource();
  47. }
  48. Listener(_cts.Token);
  49. return false;
  50. }
  51. return true;
  52. }
  53. /// <summary>
  54. /// 监听
  55. /// </summary>
  56. /// <param name="cancellationToken"></param>
  57. private void Listener(CancellationToken cancellationToken)
  58. {
  59. Task.Factory.StartNew(() =>
  60. {
  61. while (!cancellationToken.IsCancellationRequested)//如果没有取消线程,则一直监听执行发送消息
  62. {
  63. // ReSharper disable InconsistentlySynchronizedField
  64. if (_messages.Count > 0)
  65. {
  66. var msg = DequeueS();
  67. if (msg != null)
  68. {
  69. SendMsg?.Invoke(msg);
  70. }
  71. }
  72. else
  73. {
  74. SpinWait.SpinUntil(() => _messages.Count >= 1, 5000);//自旋等待直到消息队列有>=1的记录或超时5S后再进入下一轮的判断
  75. }
  76. }
  77. }, cancellationToken);
  78. }
  79. /// <summary>
  80. /// 取待发送消息
  81. /// </summary>
  82. /// <returns></returns>
  83. private ScenePushMessage DequeueS()
  84. {
  85. lock (_messages)
  86. {
  87. if (_messages.Count <= 0)
  88. {
  89. return null;
  90. }
  91. _messages.TryDequeue(out var msg);
  92. return msg;
  93. }
  94. }
  95. }
  96. }