WeEngineMsgClientManager.cs 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. using System.Configuration;
  2. using System.Reflection;
  3. using Abp.Configuration;
  4. using Abp.Dependency;
  5. using Abp.Runtime.Caching;
  6. using IwbZero.ToolCommon;
  7. using IwbZero.ToolCommon.LogHelpers;
  8. using IwbZero.ToolCommon.StringModel;
  9. using uPLibrary.Networking.M2Mqtt.Messages;
  10. using WeEngine.CommonDto;
  11. using WeEngine.Enum;
  12. using WeMessageService;
  13. namespace WeEngine.Message
  14. {
  /// <summary>
  /// Singleton manager that owns the process-wide MQTT message client used by the engine:
  /// it registers/configures the client, subscribes to topics, publishes messages and
  /// dispatches received messages.
  /// </summary>
  public class WeEngineMsgClientManager : ISingletonDependency
  {
      /// <summary>
      /// Stores the injected services and registers the shared client if it is not yet valid.
      /// </summary>
      /// <param name="settingManager">Setting manager (currently only stored).</param>
      /// <param name="cacheManager">Cache manager used to cache the client configuration.</param>
      public WeEngineMsgClientManager(ISettingManager settingManager, ICacheManager cacheManager)
      {
          SettingManager = settingManager;
          CacheManager = cacheManager;
          EnsureClientRegistered();
      }

      // Shared by all instances; replaced wholesale by RefreshClient().
      private static M2MqttMsgClient Client = new M2MqttMsgClient();

      private ISettingManager SettingManager { get; }

      private ICacheManager CacheManager { get; }

      /// <summary>
      /// Identifier of this client. Set by <see cref="RegisterClient"/>; also used as the
      /// client's own topic.
      /// </summary>
      public string ClientId { get; set; }

      /// <summary>
      /// Forces a refresh of the client: disconnects the current client, creates and registers
      /// a new one, then publishes a notification to this client's topic and to "WeApp".
      /// </summary>
      public void RefreshClient()
      {
          this.LogInfo($"客户端{ClientId}开始强制刷新!");
          Client.DisConnect();
          Client = new M2MqttMsgClient();
          RegisterClient();
          SendMessage(ClientId, $"客户端{ClientId}已强制刷新!");
          SendMessage("WeApp", $"客户端{ClientId}已强制刷新!");
      }

      /// <summary>
      /// Registers the client: resolves its configuration, applies the connection options,
      /// attaches the receive handler and subscribes to the topics.
      /// In non-DEBUG builds, logs an error and returns without configuring the client when
      /// no configuration is found.
      /// </summary>
      public void RegisterClient()
      {
          ClientId = "WeEngine";
#if DEBUG
          ClientId += "-TEST";
#endif
          var config = ResolveClientConfig(ClientId);
          if (config == null)
          {
              this.LogError($"未查询到消息客户端{ClientId}的配置!");
              return;
          }

          ClientId = config.AppId;
          Client.SetOptions(config.ServerIp, config.ServerPort, config.AppId, config.Name, config.SecretKey);
          Client.SetReceivedHandler(ReceivedHandler);
          Client.HeartbeatTopic.Add("WeApp");
          Subscribe();
      }

      /// <summary>
      /// Produces the client configuration for <paramref name="appId"/>.
      /// DEBUG builds use a fixed test configuration; other builds query the data-center
      /// service through <see cref="GetMsgClientDto"/>.
      /// </summary>
      /// <param name="appId">Application id of the client to configure.</param>
      /// <returns>The configuration, or whatever <see cref="GetMsgClientDto"/> returns (possibly null).</returns>
      private MsgClientDto ResolveClientConfig(string appId)
      {
#if DEBUG
          // NOTE(review): connection credentials are hard-coded for DEBUG builds;
          // consider moving them to configuration.
          return new MsgClientDto()
          {
              AppId = appId,
              Name = "智慧推演引擎",
              SecretKey = "WePlatformWeEngine",
              ServerIp = "shvber.com",
              ServerPort = 1885,
          };
#else
          // TODO: the base URL was once meant to come from SettingManager
          // (IwbSettingNames.DateCenterUrl); only the app setting is consulted for now.
          var configuredUrl = ConfigurationManager.AppSettings["DataCenter.Platform.BaseUrl"];
          // Fall back to the default when the setting is missing or blank.
          var url = string.IsNullOrWhiteSpace(configuredUrl)
              ? "http://shvber.com:5025/"
              : configuredUrl;
          this.LogInfo(url);
          return GetMsgClientDto(appId, url);
#endif
      }

      /// <summary>
      /// Registers the shared client when it reports itself as not valid.
      /// </summary>
      private void EnsureClientRegistered()
      {
          if (Client.NotValidate)
          {
              RegisterClient();
          }
      }

      /// <summary>
      /// Subscribes to this client's own topic, the "Heartbeat" topic and one
      /// "<c>{name}/+</c>" topic per <see cref="MessageType"/> member. Does nothing when
      /// <c>Client.CheckClient()</c> returns false.
      /// </summary>
      private void Subscribe()
      {
          if (!Client.CheckClient())
          {
              return;
          }

          Client.SubscribeTopic(ClientId, false);
          Client.SubscribeTopic("Heartbeat");

          // NOTE(review): in MQTT "+" matches exactly one topic level, so messages published
          // to the bare "{type}" topic (empty RunningId) would not match these filters;
          // confirm this is intended.
          // "System.Enum" is fully qualified because "Enum" alone resolves to the
          // WeEngine.Enum namespace here.
          foreach (var typeName in System.Enum.GetNames(typeof(MessageType)))
          {
              Client.SubscribeTopic($"{typeName}/+");
          }
      }

      /// <summary>
      /// Publishes <paramref name="message"/> under the topic "<c>{type}</c>", or
      /// "<c>{type}/{RunningId}</c>" when the message has a running id.
      /// </summary>
      /// <param name="message">Message to serialize and send.</param>
      /// <param name="type">Message type that determines the topic.</param>
      /// <param name="clientId">Optional client id forwarded to the underlying client.</param>
      /// <exception cref="System.ArgumentNullException"><paramref name="message"/> is null.</exception>
      public void SendMessage(WeMessage message, MessageType type, string clientId = null)
      {
          if (message == null)
          {
              throw new System.ArgumentNullException(nameof(message));
          }

          EnsureClientRegistered();
          var topic = message.RunningId.IsEmpty() ? type.ToString() : $"{type}/{message.RunningId}";
          Client.SendMsg(topic, message.Obj2String(), clientId);
      }

      /// <summary>
      /// Publishes <paramref name="message"/> using its own <c>Type</c>, <c>RunningId</c> and
      /// <c>TargetClientId</c> to determine the topic and target.
      /// </summary>
      /// <param name="message">Message to serialize and send.</param>
      /// <exception cref="System.ArgumentNullException"><paramref name="message"/> is null.</exception>
      public void SendMessage(WeMessage message)
      {
          if (message == null)
          {
              throw new System.ArgumentNullException(nameof(message));
          }

          EnsureClientRegistered();
          var topic = message.RunningId.IsEmpty() ? message.Type.ToString() : $"{message.Type}/{message.RunningId}";
          Client.SendMsg(topic, message.Obj2String(), message.TargetClientId);
      }

      /// <summary>
      /// Publishes a text message to this client's own topic (<see cref="ClientId"/>).
      /// </summary>
      /// <param name="msg">Text to send.</param>
      public void SendMessage(string msg)
      {
          // Register first: registration may change ClientId.
          EnsureClientRegistered();
          Client.SendMessage(ClientId, msg.Obj2String());
      }

      /// <summary>
      /// Publishes a text message to the given topic.
      /// </summary>
      /// <param name="topic">Topic to publish to.</param>
      /// <param name="msg">Text to send.</param>
      public void SendMessage(string topic, string msg)
      {
          EnsureClientRegistered();
          Client.SendMessage(topic, msg.Obj2String());
      }

      /// <summary>
      /// Handles the text payload of a received message: deserializes it into a
      /// <see cref="WeMessage"/> and dispatches on its type. Logs an error when the payload
      /// cannot be converted.
      /// </summary>
      /// <param name="msg">Message payload as text.</param>
      private void ReceiveMsgHandler(string msg)
      {
          var message = msg.Str2Obj<WeMessage>();
          if (message == null)
          {
              typeof(WeEngineMsgClientManager).LogError("MSG ERROR:" + msg);
              return;
          }

          // No per-type handling is implemented yet; the cases are placeholders.
          switch (message.Type)
          {
              case MessageType.Environment:
                  break;
              case MessageType.RunningInfo:
                  break;
              case MessageType.CommonMessage:
                  break;
              case MessageType.GuideInfo:
                  break;
          }
      }

      /// <summary>
      /// Callback for messages received by the client. Messages on this client's own topic
      /// or its "<c>{ClientId}/Heartbeat</c>" topic are ignored; everything else is decoded
      /// as UTF-8 and passed to <see cref="ReceiveMsgHandler"/>.
      /// </summary>
      /// <param name="sender">Event source.</param>
      /// <param name="e">Published-message event data (topic and payload).</param>
      private void ReceivedHandler(object sender, MqttMsgPublishEventArgs e)
      {
          if (e.Topic == ClientId || e.Topic == $"{ClientId}/Heartbeat")
          {
              return;
          }

          string msg = System.Text.Encoding.UTF8.GetString(e.Message);
          ReceiveMsgHandler(msg);
      }

      /// <summary>
      /// Gets the message-client configuration for <paramref name="appid"/>, using the
      /// "WeEngineClient" cache and requesting it from the service at
      /// <paramref name="urlPrev"/> on a cache miss.
      /// </summary>
      /// <param name="appid">Application id of the client; also the cache key.</param>
      /// <param name="urlPrev">Base URL of the service that provides the configuration.</param>
      /// <returns>The deserialized client configuration.</returns>
      public MsgClientDto GetMsgClientDto(string appid, string urlPrev)
      {
          return CacheManager.GetCache("WeEngineClient").Get($"{appid}", () =>
          {
              var baseUrl = urlPrev.Ew("/");
              // Escape the id so reserved characters cannot corrupt the query string.
              var encodedAppId = System.Uri.EscapeDataString(appid ?? string.Empty);
              var url = $"{baseUrl}api/services/WePlatform/MessageServer/GetClient?appid={encodedAppId}";
              var response = url.RequestPost("");
              return response.Str2Obj<MsgClientDto>();
          });
      }
  }
  225. }