|
|
@@ -0,0 +1,179 @@
|
|
|
+package com.iwbnet.iot.mqtt;
|
|
|
+
|
|
|
+import com.alibaba.fastjson2.JSON;
|
|
|
+import com.iwbnet.iot.config.MqttConfig;
|
|
|
+import com.iwbnet.iot.model.TopicMsgDto;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.eclipse.paho.client.mqttv3.*;
|
|
|
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import javax.annotation.PreDestroy;
|
|
|
+
|
|
|
+
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class MqttConnect {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private MqttConfig mqttConfig;
|
|
|
+
|
|
|
+ private MqttClient mqttClient;
|
|
|
+
|
|
|
+
|
|
|
+ public boolean isConnect() {
|
|
|
+ return mqttClient.isConnected();
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * 客户端connect连接mqtt服务器
|
|
|
+ *
|
|
|
+ * @param userName 用户名
|
|
|
+ * @param passWord 密码
|
|
|
+ * @param mqttCallback 回调函数
|
|
|
+ **/
|
|
|
+ public void connect(String userName, String passWord, MqttCallback mqttCallback){
|
|
|
+ if(userName.equals("") || passWord.equals("")) {
|
|
|
+ userName = mqttConfig.getUsername();
|
|
|
+ passWord = mqttConfig.getPassword();
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ MqttConnectOptions options = mqttConnectOptions(userName, passWord);
|
|
|
+ if (mqttCallback == null) {
|
|
|
+ mqttClient.setCallback(new MqCallback());
|
|
|
+ } else {
|
|
|
+ mqttClient.setCallback(mqttCallback);
|
|
|
+ }
|
|
|
+ mqttClient.connect(options);
|
|
|
+ } catch (MqttException e) {
|
|
|
+ log.error(e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 订阅默认主题
|
|
|
+ */
|
|
|
+ public void subInitTopic(){
|
|
|
+ try {
|
|
|
+
|
|
|
+ mqttClient.subscribe(mqttConfig.getInitSub());
|
|
|
+ } catch (MqttException e) {
|
|
|
+ log.error(e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * MQTT连接参数设置
|
|
|
+ */
|
|
|
+ private MqttConnectOptions mqttConnectOptions(String userName, String passWord) throws MqttException {
|
|
|
+ String host = "tcp://127.0.0.1:1883";
|
|
|
+ String clientId = "vbdsm-hj212-iot";
|
|
|
+ if(mqttConfig != null && mqttConfig.getHost() != null && !"".equals(mqttConfig.getHost())) {
|
|
|
+ host = mqttConfig.getHost();
|
|
|
+ clientId = mqttConfig.getClientId();
|
|
|
+ }
|
|
|
+ mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
|
|
|
+ MqttConnectOptions options = new MqttConnectOptions();
|
|
|
+ options.setUserName(userName);
|
|
|
+ options.setPassword(passWord.toCharArray());
|
|
|
+ options.setConnectionTimeout(10);///默认:30
|
|
|
+ options.setAutomaticReconnect(true);//默认:false
|
|
|
+ options.setCleanSession(false);//默认:true
|
|
|
+ //options.setKeepAliveInterval(20);//默认:60
|
|
|
+ return options;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 关闭MQTT连接
|
|
|
+ */
|
|
|
+ @PreDestroy
|
|
|
+ public void close() throws MqttException {
|
|
|
+ log.info("关闭MQTT连接");
|
|
|
+ mqttClient.disconnect();
|
|
|
+ mqttClient.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 向某个主题发布消息 默认qos:1
|
|
|
+ *
|
|
|
+ * @param topic:发布的主题
|
|
|
+ * @param msg:发布的消息
|
|
|
+ */
|
|
|
+ public boolean pub(String topic, String msg) {
|
|
|
+ return pub(topic, msg, 1);
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean pub(TopicMsgDto input) {
|
|
|
+ String topic = input.getTopic();
|
|
|
+ String payLoad = input.getPayload();
|
|
|
+ if (payLoad == null) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return pub(topic, payLoad, input.getQos());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 向某个主题发布消息
|
|
|
+ *
|
|
|
+ * @param topic: 发布的主题
|
|
|
+ * @param msg: 发布的消息
|
|
|
+ * @param qos: 消息质量 Qos:0、1、2
|
|
|
+ */
|
|
|
+ public boolean pub(String topic, String msg, int qos) {
|
|
|
+ MqttMessage mqttMessage = new MqttMessage();
|
|
|
+ mqttMessage.setQos(qos);
|
|
|
+ mqttMessage.setPayload(msg.getBytes());
|
|
|
+ MqttTopic mqttTopic = mqttClient.getTopic(topic);
|
|
|
+ try{
|
|
|
+ MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
|
|
|
+ token.waitForCompletion();
|
|
|
+ return true;
|
|
|
+ }catch (MqttException e) {
|
|
|
+ log.error(e.getMessage(), e);
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 订阅某一个主题 ,此方法默认的的Qos等级为:1
|
|
|
+ *
|
|
|
+ * @param topic 主题
|
|
|
+ */
|
|
|
+ public boolean sub(String topic) {
|
|
|
+ try{
|
|
|
+ mqttClient.subscribe(topic);
|
|
|
+ return true;
|
|
|
+ }catch (MqttException e) {
|
|
|
+ log.error(e.getMessage(), e);
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 订阅某一个主题,可携带Qos
|
|
|
+ *
|
|
|
+ * @param topic 所要订阅的主题
|
|
|
+ * @param qos 消息质量:0、1、2
|
|
|
+ */
|
|
|
+ public boolean sub(String topic, int qos) {
|
|
|
+ try{
|
|
|
+ mqttClient.subscribe(topic, qos);
|
|
|
+ return true;
|
|
|
+ }catch (MqttException e) {
|
|
|
+ log.error(e.getMessage(), e);
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+// public void destroy() {
|
|
|
+// try {
|
|
|
+// this.close();
|
|
|
+// } catch (MqttException e) {
|
|
|
+// log.error(e.getMessage(), e);
|
|
|
+// }
|
|
|
+// }
|
|
|
+}
|