|
|
@@ -1,179 +0,0 @@
|
|
|
-package com.iwbnet.iot.mqtt;
|
|
|
-
|
|
|
-import com.alibaba.fastjson2.JSON;
|
|
|
-import com.iwbnet.iot.config.MqttConfig;
|
|
|
-import com.iwbnet.iot.model.TopicMsgDto;
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.eclipse.paho.client.mqttv3.*;
|
|
|
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
|
-import org.springframework.beans.factory.annotation.Autowired;
|
|
|
-import org.springframework.stereotype.Component;
|
|
|
-
|
|
|
-import javax.annotation.PreDestroy;
|
|
|
-
|
|
|
-
|
|
|
-@Slf4j
|
|
|
-@Component
|
|
|
-public class MqttConnect {
|
|
|
-
|
|
|
- @Autowired
|
|
|
- private MqttConfig mqttConfig;
|
|
|
-
|
|
|
- private MqttClient mqttClient;
|
|
|
-
|
|
|
-
|
|
|
- public boolean isConnect() {
|
|
|
- return mqttClient.isConnected();
|
|
|
- }
|
|
|
- /**
|
|
|
- * 客户端connect连接mqtt服务器
|
|
|
- *
|
|
|
- * @param userName 用户名
|
|
|
- * @param passWord 密码
|
|
|
- * @param mqttCallback 回调函数
|
|
|
- **/
|
|
|
- public void connect(String userName, String passWord, MqttCallback mqttCallback){
|
|
|
- if(userName.equals("") || passWord.equals("")) {
|
|
|
- userName = mqttConfig.getUsername();
|
|
|
- passWord = mqttConfig.getPassword();
|
|
|
- }
|
|
|
-
|
|
|
- try {
|
|
|
- MqttConnectOptions options = mqttConnectOptions(userName, passWord);
|
|
|
- if (mqttCallback == null) {
|
|
|
- mqttClient.setCallback(new MqCallback());
|
|
|
- } else {
|
|
|
- mqttClient.setCallback(mqttCallback);
|
|
|
- }
|
|
|
- mqttClient.connect(options);
|
|
|
- } catch (MqttException e) {
|
|
|
- log.error(e.getMessage(), e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 订阅默认主题
|
|
|
- */
|
|
|
- public void subInitTopic(){
|
|
|
- try {
|
|
|
-
|
|
|
- mqttClient.subscribe(mqttConfig.getInitSub());
|
|
|
- } catch (MqttException e) {
|
|
|
- log.error(e.getMessage(), e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * MQTT连接参数设置
|
|
|
- */
|
|
|
- private MqttConnectOptions mqttConnectOptions(String userName, String passWord) throws MqttException {
|
|
|
- String host = "tcp://127.0.0.1:1883";
|
|
|
- String clientId = "vbdsm-hj212-iot";
|
|
|
- if(mqttConfig != null && mqttConfig.getHost() != null && !"".equals(mqttConfig.getHost())) {
|
|
|
- host = mqttConfig.getHost();
|
|
|
- clientId = mqttConfig.getClientId();
|
|
|
- }
|
|
|
- mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
|
|
|
- MqttConnectOptions options = new MqttConnectOptions();
|
|
|
- options.setUserName(userName);
|
|
|
- options.setPassword(passWord.toCharArray());
|
|
|
- options.setConnectionTimeout(10);///默认:30
|
|
|
- options.setAutomaticReconnect(true);//默认:false
|
|
|
- options.setCleanSession(false);//默认:true
|
|
|
- //options.setKeepAliveInterval(20);//默认:60
|
|
|
- return options;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 关闭MQTT连接
|
|
|
- */
|
|
|
- @PreDestroy
|
|
|
- public void close() throws MqttException {
|
|
|
- log.info("关闭MQTT连接");
|
|
|
- mqttClient.disconnect();
|
|
|
- mqttClient.close();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 向某个主题发布消息 默认qos:1
|
|
|
- *
|
|
|
- * @param topic:发布的主题
|
|
|
- * @param msg:发布的消息
|
|
|
- */
|
|
|
- public boolean pub(String topic, String msg) {
|
|
|
- return pub(topic, msg, 1);
|
|
|
- }
|
|
|
-
|
|
|
- public boolean pub(TopicMsgDto input) {
|
|
|
- String topic = input.getTopic();
|
|
|
- String payLoad = input.getPayload();
|
|
|
- if (payLoad == null) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- return pub(topic, payLoad, input.getQos());
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 向某个主题发布消息
|
|
|
- *
|
|
|
- * @param topic: 发布的主题
|
|
|
- * @param msg: 发布的消息
|
|
|
- * @param qos: 消息质量 Qos:0、1、2
|
|
|
- */
|
|
|
- public boolean pub(String topic, String msg, int qos) {
|
|
|
- MqttMessage mqttMessage = new MqttMessage();
|
|
|
- mqttMessage.setQos(qos);
|
|
|
- mqttMessage.setPayload(msg.getBytes());
|
|
|
- MqttTopic mqttTopic = mqttClient.getTopic(topic);
|
|
|
- try{
|
|
|
- MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
|
|
|
- token.waitForCompletion();
|
|
|
- return true;
|
|
|
- }catch (MqttException e) {
|
|
|
- log.error(e.getMessage(), e);
|
|
|
- }
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 订阅某一个主题 ,此方法默认的的Qos等级为:1
|
|
|
- *
|
|
|
- * @param topic 主题
|
|
|
- */
|
|
|
- public boolean sub(String topic) {
|
|
|
- try{
|
|
|
- mqttClient.subscribe(topic);
|
|
|
- return true;
|
|
|
- }catch (MqttException e) {
|
|
|
- log.error(e.getMessage(), e);
|
|
|
- }
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 订阅某一个主题,可携带Qos
|
|
|
- *
|
|
|
- * @param topic 所要订阅的主题
|
|
|
- * @param qos 消息质量:0、1、2
|
|
|
- */
|
|
|
- public boolean sub(String topic, int qos) {
|
|
|
- try{
|
|
|
- mqttClient.subscribe(topic, qos);
|
|
|
- return true;
|
|
|
- }catch (MqttException e) {
|
|
|
- log.error(e.getMessage(), e);
|
|
|
- }
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-// public void destroy() {
|
|
|
-// try {
|
|
|
-// this.close();
|
|
|
-// } catch (MqttException e) {
|
|
|
-// log.error(e.getMessage(), e);
|
|
|
-// }
|
|
|
-// }
|
|
|
-}
|