using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace WeEngine.Message
{
    /// <summary>
    /// Queues <see cref="ScenePushMessage"/> instances and hands them, one at a time,
    /// to the subscribers of <see cref="SendMsg"/> from a background listener task.
    /// </summary>
    /// <remarks>
    /// The listener loop has no exception handling: an exception thrown by a
    /// <see cref="SendMsg"/> subscriber faults the listener task and ends the loop.
    /// </remarks>
    public class ScenePusher
    {
        /// <summary>
        /// Upper bound, in milliseconds, on how long the listener waits on an empty
        /// queue before re-checking for cancellation.
        /// </summary>
        private const int IdleWaitMilliseconds = 5000;

        /// <summary>
        /// Guards the start/stop state (<see cref="_switch"/> and <see cref="_cts"/>).
        /// A dedicated object is used because the queue reference is replaced by
        /// <see cref="Stop"/> and therefore cannot serve as a stable monitor.
        /// </summary>
        private readonly object _stateLock = new object();

        /// <summary>
        /// Pending messages. The queue itself is thread-safe; the field is volatile
        /// because <see cref="Stop"/> swaps in a fresh instance to discard the backlog.
        /// </summary>
        private volatile ConcurrentQueue<ScenePushMessage> _messages;

        private CancellationTokenSource _cts;

        /// <summary>True while a listener has been started and not yet stopped.</summary>
        private bool _switch;

        /// <summary>
        /// Raised on the listener task for every non-null message taken from the queue.
        /// </summary>
        public event SendMsgEventHandle SendMsg;

        /// <summary>
        /// Signature of the handlers invoked for each dispatched message.
        /// </summary>
        /// <param name="msg">The message taken from the queue.</param>
        public delegate void SendMsgEventHandle(ScenePushMessage msg);

        /// <summary>
        /// Creates a stopped pusher with an empty queue.
        /// </summary>
        public ScenePusher()
        {
            _switch = false;
            _messages = new ConcurrentQueue<ScenePushMessage>();
            _cts = new CancellationTokenSource();
        }

        /// <summary>
        /// Gets a value indicating whether no handler is currently subscribed to
        /// <see cref="SendMsg"/>.
        /// </summary>
        public bool NeedHandel => SendMsg == null;

        /// <summary>
        /// Appends a message to the pending queue.
        /// </summary>
        /// <param name="message">The message to enqueue.</param>
        public void Push(ScenePushMessage message)
        {
            // ConcurrentQueue is safe for concurrent producers; no extra lock is needed.
            _messages.Enqueue(message);
        }

        /// <summary>
        /// Cancels the listener and discards every message that is still queued.
        /// </summary>
        public void Stop()
        {
            lock (_stateLock)
            {
                _switch = false;
                _cts.Cancel();
            }

            // Replacing the queue drops the backlog in one step.
            _messages = new ConcurrentQueue<ScenePushMessage>();
        }

        /// <summary>
        /// Starts the background listener if it is not already running.
        /// </summary>
        /// <returns>
        /// <c>false</c> when this call started the listener; <c>true</c> when a
        /// listener had already been started and nothing was done.
        /// </returns>
        public bool Start()
        {
            lock (_stateLock)
            {
                if (_switch)
                {
                    return true;
                }

                _switch = true;

                // A cancelled source cannot be reused, so create a new one after Stop().
                if (_cts.IsCancellationRequested)
                {
                    _cts = new CancellationTokenSource();
                }

                Listener(_cts.Token);
                return false;
            }
        }

        /// <summary>
        /// Launches the task that drains the queue and raises <see cref="SendMsg"/>
        /// until cancellation is requested.
        /// </summary>
        /// <param name="cancellationToken">Token that ends the listener loop.</param>
        private void Listener(CancellationToken cancellationToken)
        {
            Task.Factory.StartNew(
                () =>
                {
                    // Keep dispatching messages for as long as the listener has not been cancelled.
                    while (!cancellationToken.IsCancellationRequested)
                    {
                        var msg = DequeueS();
                        if (msg != null)
                        {
                            SendMsg?.Invoke(msg);
                            continue;
                        }

                        // Nothing to send: wait until a message arrives, cancellation is
                        // requested, or the timeout elapses, then re-evaluate the loop.
                        SpinWait.SpinUntil(
                            () => cancellationToken.IsCancellationRequested || !_messages.IsEmpty,
                            IdleWaitMilliseconds);
                    }
                },
                cancellationToken,
                TaskCreationOptions.LongRunning, // the loop occupies its thread for the listener's lifetime
                TaskScheduler.Default);
        }

        /// <summary>
        /// Takes the next pending message from the queue.
        /// </summary>
        /// <returns>The next message, or <c>null</c> when the queue is empty.</returns>
        private ScenePushMessage DequeueS()
        {
            return _messages.TryDequeue(out var msg) ? msg : null;
        }
    }
}