// WeEngineMsgClientManager.cs
using System;
using System.Configuration;
using System.Reflection;
using Abp.Configuration;
using Abp.Dependency;
using Abp.Runtime.Caching;
using IwbZero.ToolCommon;
using IwbZero.ToolCommon.LogHelpers;
using IwbZero.ToolCommon.StringModel;
using uPLibrary.Networking.M2Mqtt.Messages;
using WeEngine.CommonDto;
using WeEngine.Enum;
using WeMessageService;
  13. namespace WeEngine.Message
  14. {
  15. public class WeEngineMsgClientManager : ISingletonDependency
  16. {
  17. public WeEngineMsgClientManager(ISettingManager settingManager, ICacheManager cacheManager)
  18. {
  19. SettingManager = settingManager;
  20. CacheManager = cacheManager;
  21. if (Client.NotValidate)
  22. {
  23. RegisterClient();
  24. }
  25. }
  26. private static M2MqttMsgClient Client = new M2MqttMsgClient();
  27. //public IIwbMsgClient Client { get; set; }
  28. private ISettingManager SettingManager { get; }
  29. private ICacheManager CacheManager { get; }
  30. public string ClientId { get; set; }
  31. /// <summary>
  32. /// 强制刷新客户端
  33. /// </summary>
  34. public void RefreshClient()
  35. {
  36. this.LogInfo($"客户端{ClientId}开始强制刷新!");
  37. Client.DisConnect();
  38. Client = new M2MqttMsgClient();
  39. RegisterClient();
  40. SendMessage(ClientId, $"客户端{ClientId}已强制刷新!");
  41. SendMessage("WeApp", $"客户端{ClientId}已强制刷新!");
  42. }
  43. /// <summary>
  44. /// 注册客户端
  45. /// </summary>
  46. public void RegisterClient()
  47. {
  48. ClientId = "WeEngine";
  49. #if DEBUG
  50. ClientId += "-TEST";
  51. var result = new MsgClientDto()
  52. {
  53. AppId = ClientId,
  54. Name = "智慧推演引擎",
  55. SecretKey = "WePlatformWeEngine",
  56. ServerIp = "localhost",
  57. ServerPort = 1885,
  58. };
  59. #else
  60. string url = "";//SettingManager.GetSettingValue(IwbSettingNames.DateCenterUrl);
  61. url = url.IsEmpty()
  62. ? ConfigurationManager.AppSettings["DataCenter.Platform.BaseUrl"] ?? "http://shvber.com:5025/"
  63. : url;
  64. this.LogInfo(url);
  65. var result = GetMsgClientDto(ClientId, url);
  66. if (result == null)
  67. {
  68. //throw new UserFriendlyException("未查询到消息客户端配置!");
  69. this.LogError($"未查询到消息客户端{ClientId}的配置!");
  70. return;
  71. }
  72. #endif
  73. ClientId = result.AppId;
  74. Client.SetOptions(result.ServerIp, result.ServerPort, result.AppId, result.Name, result.SecretKey);
  75. Client.SetReceivedHandler(ReceivedHandler);
  76. Client.HeartbeatTopic.Add("WeApp");
  77. Subscribe();
  78. }
  79. private void Subscribe()
  80. {
  81. if (Client.CheckClient())
  82. {
  83. Client.SubscribeTopic(ClientId, false);
  84. Client.SubscribeTopic("Heartbeat");
  85. var t = typeof(MessageType);
  86. var fts = t.GetFields(BindingFlags.Static | BindingFlags.Public);
  87. foreach (var f in fts)
  88. {
  89. string name = f.Name;
  90. Client.SubscribeTopic($"{name}/+");
  91. //AsyncHelper.RunSync(() => Client.SubscribeTopic(name));
  92. }
  93. }
  94. }
  95. public void SendMessage(WeMessage message, MessageType type, string clientId = null)
  96. {
  97. if (Client.NotValidate)
  98. {
  99. RegisterClient();
  100. }
  101. var topic = message.RunningId.IsEmpty() ? type.ToString() : $"{type}/{message.RunningId}";
  102. Client.SendMsg(topic, message.Obj2String(), clientId);
  103. }
  104. public void SendMessage(WeMessage message)
  105. {
  106. if (Client.NotValidate)
  107. {
  108. RegisterClient();
  109. }
  110. var topic = message.RunningId.IsEmpty() ? message.Type.ToString() : $"{message.Type}/{message.RunningId}";
  111. Client.SendMsg(topic, message.Obj2String(), message.TargetClientId);
  112. }
  113. public void SendMessage(string msg)
  114. {
  115. if (Client.NotValidate)
  116. {
  117. RegisterClient();
  118. }
  119. Client.SendMessage(ClientId, msg.Obj2String());
  120. }
  121. public void SendMessage(string topic, string msg)
  122. {
  123. if (Client.NotValidate)
  124. {
  125. RegisterClient();
  126. }
  127. Client.SendMessage(topic, msg.Obj2String());
  128. }
  129. /// <summary>
  130. /// 消息接受回调处理
  131. /// </summary>
  132. /// <param name="msg"></param>
  133. private void ReceiveMsgHandler(string msg)
  134. {
  135. var message = msg.Str2Obj<WeMessage>();
  136. if (message != null)
  137. {
  138. switch (message.Type)
  139. {
  140. case MessageType.Environment:
  141. break;
  142. case MessageType.RunningInfo:
  143. break;
  144. case MessageType.CommonMessage:
  145. break;
  146. case MessageType.GuideInfo:
  147. break;
  148. }
  149. }
  150. else
  151. {
  152. typeof(WeEngineMsgClientManager).LogError("MSG ERROR:" + msg);
  153. }
  154. }
  155. /// <summary>
  156. /// 消息接受回调处理
  157. /// </summary>
  158. /// <param name="sender"></param>
  159. /// <param name="e"></param>
  160. private void ReceivedHandler(object sender, MqttMsgPublishEventArgs e)
  161. {
  162. if (e.Topic == ClientId || e.Topic == $"{ClientId}/Heartbeat")
  163. {
  164. return;
  165. }
  166. string msg = System.Text.Encoding.UTF8.GetString(e.Message);
  167. ReceiveMsgHandler(msg);
  168. }
  169. //private static IMqttApplicationMessageReceivedHandler ReceivedHandler => new MqttApplicationMessageReceivedHandlerDelegate(
  170. // (arg) =>
  171. // {
  172. // string payload = System.Text.Encoding.UTF8.GetString(arg.ApplicationMessage.Payload);
  173. // var message = payload.Str2Obj<WeMessage>();
  174. // if (message != null)
  175. // {
  176. // switch (message.MsgType)
  177. // {
  178. // case MessageType.Environment:
  179. // break;
  180. // case MessageType.RunningInfo:
  181. // break;
  182. // case MessageType.CommonMessage:
  183. // break;
  184. // case MessageType.GuideInfo:
  185. // break;
  186. // }
  187. // }
  188. // else
  189. // {
  190. // typeof(WeEngineMsgClientManager).LogError("Message ERROR:" + payload);
  191. // }
  192. // });
  193. /// <summary>
  194. /// 获取消息客户端信息
  195. /// </summary>
  196. /// <returns></returns>
  197. public MsgClientDto GetMsgClientDto(string appid, string urlPrev)
  198. {
  199. return CacheManager.GetCache("WeEngineClient").Get($"{appid}", () =>
  200. {
  201. var url =
  202. $"{urlPrev.Ew("/")}api/services/WePlatform/MessageServer/GetClient?appid={appid}";
  203. var result = url.RequestPost("");
  204. var dto = result.Str2Obj<MsgClientDto>();
  205. return dto;
  206. });
  207. }
  208. }
  209. }