klzhangweiya 2 лет назад
Родитель
Сommit
66646ad8a7

BIN
VB_DSM_V2.1/mock/mock_data.db


+ 1 - 1
VB_DSM_V2.1/vbdsm-data-upload/vbdsm-hj212/vbdsm-hj212-modbus/src/main/java/cn/vbdsm/hj212/modbus/coder/MyEncoder.java → VB_DSM_V2.1/vbdsm-data-upload/vbdsm-hj212/vbdsm-hj212-modbus/src/main/java/cn/vbdsm/hj212/modbus/coder/CodeEncoder.java

@@ -6,7 +6,7 @@ import io.netty.handler.codec.MessageToByteEncoder;
 
 import java.nio.charset.StandardCharsets;
 
-public class MyEncoder extends MessageToByteEncoder<String> {
+public class CodeEncoder extends MessageToByteEncoder<String> {
     @Override
     protected void encode(ChannelHandlerContext channelHandlerContext, String s, ByteBuf byteBuf) throws Exception {
         byteBuf.writeBytes(s.getBytes(StandardCharsets.UTF_8));

+ 34 - 0
VB_DSM_V2.1/vbdsm-data-upload/vbdsm-hj212/vbdsm-hj212-modbus/src/main/java/cn/vbdsm/hj212/modbus/handler/ClientHeartbeatHandler.java

@@ -0,0 +1,34 @@
+package cn.vbdsm.hj212.modbus.handler;
+
+import cn.vbdsm.hj212.modbus.model.NettyMsg;
+import cn.vbdsm.hj212.modbus.model.common.ServiceCodeEnum;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class ClientHeartbeatHandler extends IdleStateHandler  {
+    // 设置写事件为5s
+    // 如果5s没有写事件发生 就会触发下面的IdleStateEvent
+    private static final int WRITE_IDLE_TIME = 5;
+
+
+    public ClientHeartbeatHandler() {
+        super(0, WRITE_IDLE_TIME, 0, TimeUnit.SECONDS);
+    }
+
+    @Override
+    protected void  channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
+        // 指定时间内没有写事件发送 就会触发 IdleState.WRITER_IDLE 类型事件
+        // 我们就可以对该连接进行处理 主动发送心跳
+        if(evt.state()== IdleState.WRITER_IDLE){
+            //log.info("{} 秒内没有发送数据,再不发送小心和服务端断开连接", WRITE_IDLE_TIME);
+            ctx.writeAndFlush(new NettyMsg(ServiceCodeEnum.Heartbeat_TYPE,"我还活着,不要断开连接"));
+        }
+    }
+
+}

+ 14 - 0
VB_DSM_V2.1/vbdsm-data-upload/vbdsm-hj212/vbdsm-hj212-modbus/src/main/java/cn/vbdsm/hj212/modbus/model/NettyMsg.java

@@ -0,0 +1,14 @@
+package cn.vbdsm.hj212.modbus.model;
+
+import cn.vbdsm.hj212.modbus.model.common.ServiceCodeEnum;
+import lombok.Data;
+
+@Data
+public class NettyMsg {
+    private ServiceCodeEnum code;
+    private String msg;
+    public NettyMsg(ServiceCodeEnum code,String msg){
+        this.code = code;
+        this.msg = msg;
+    }
+}

+ 12 - 0
VB_DSM_V2.1/vbdsm-data-upload/vbdsm-hj212/vbdsm-hj212-modbus/src/main/java/cn/vbdsm/hj212/modbus/model/common/ServiceCodeEnum.java

@@ -0,0 +1,12 @@
+package cn.vbdsm.hj212.modbus.model.common;
+
+public enum ServiceCodeEnum {
+    Heartbeat_TYPE(0,"心跳"),;
+    private int code;
+    private String msg;
+
+    ServiceCodeEnum(int code,String msg){
+        this.code = code;
+        this.msg = msg;
+    }
+}

+ 131 - 45
VB_DSM_V2.1/vbdsm-data-upload/vbdsm-hj212/vbdsm-hj212-modbus/src/main/java/cn/vbdsm/hj212/modbus/server/TcpSocketClient.java

@@ -1,22 +1,28 @@
 package cn.vbdsm.hj212.modbus.server;
 
-import cn.vbdsm.hj212.modbus.coder.MyEncoder;
+import cn.vbdsm.hj212.modbus.coder.CodeEncoder;
 import cn.vbdsm.hj212.modbus.config.ClientAttributeKey;
+import cn.vbdsm.hj212.modbus.handler.ClientHeartbeatHandler;
 import cn.vbdsm.hj212.modbus.handler.TcpClientHandler;
 import cn.vbdsm.hj212.modbus.ws.data.TerminalData;
 import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.*;
 import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
+//import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
+import org.springframework.scheduling.support.CronTrigger;
 
+import java.io.*;
 import java.net.InetSocketAddress;
-import java.util.Scanner;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeUnit;
 
 
@@ -44,24 +50,27 @@ public class TcpSocketClient {
         this.host = host;
         this.port = port;
         this.terminalData = terminalData;
-        init();
+        //init();
     }
 
-    public void init(){
-        EventLoopGroup group = new NioEventLoopGroup();
-        this.bootstrap = new Bootstrap();
-        bootstrap.group(group).channel(NioSocketChannel.class)
-                .option(ChannelOption.TCP_NODELAY, true)
-                .handler(new ChannelInitializer<SocketChannel>() {
-                    @Override
-                    protected void initChannel(SocketChannel ch) throws Exception {
-                        ChannelPipeline pipeline = ch.pipeline();
-                        pipeline.addLast("encoder",new MyEncoder());
-                        pipeline.addLast("handler",new TcpClientHandler());
-                    }
-                });
-
-    }
+//    public void init(){
+//        EventLoopGroup group = new NioEventLoopGroup(1);
+//        this.bootstrap = new Bootstrap();
+//        bootstrap.group(group).channel(NioSocketChannel.class)
+//                .option(ChannelOption.TCP_NODELAY, true)
+//                .option(ChannelOption.SO_KEEPALIVE, true)
+//                .option(ChannelOption.SO_SNDBUF, 1024 * 1024)
+//                .handler(new ChannelInitializer<SocketChannel>() {
+//                    @Override
+//                    protected void initChannel(SocketChannel ch) throws Exception {
+//                        ChannelPipeline pipeline = ch.pipeline();
+//                        pipeline.addLast("encoder",new CodeEncoder());
+//                        pipeline.addLast("handler",new TcpClientHandler());
+//                        pipeline.addLast("heartbeat",new ClientHeartbeatHandler());
+//                    }
+//                });
+//
+//    }
 
 
     public void Connect(){
@@ -73,40 +82,117 @@ public class TcpSocketClient {
             this.channel = f.sync().channel();
             this.channel.attr(ClientAttributeKey.terminalKey).set(terminalData.getTerminalId());
             isConnect = true;
-            //断线重连
-            f.addListener(new ChannelFutureListener() {
+            this.channel.eventLoop().schedule(new Runnable() {
                 @Override
-                public void operationComplete(ChannelFuture channelFuture) throws Exception {
-                    if (!channelFuture.isSuccess()) {
-                        isConnect = false;
-                        final EventLoop loop = channelFuture.channel().eventLoop();
-                        loop.schedule(new Runnable() {
-                            @Override
-                            public void run() {
-                                log.info("设备号:"+terminalData.getTerminalId()+"  not connect service");
-                                Connect();
-                            }
-                        }, 15, TimeUnit.SECONDS);
-                    } else {
-                        isConnect = true;
-                        channel = channelFuture.channel();
-                        System.out.println("设备号:"+terminalData.getTerminalId()+" connected!");
-                    }
+                public void run() {
+                    channel.writeAndFlush("数据读取完成!");
                 }
-            });
+            }, 5, TimeUnit.MINUTES);
+            //断线重连
+//            f.addListener(new ChannelFutureListener() {
+//                @Override
+//                public void operationComplete(ChannelFuture channelFuture) throws Exception {
+//                    if (!channelFuture.isSuccess()) {
+//                        isConnect = false;
+//                        final EventLoop loop = channelFuture.channel().eventLoop();
+//                        loop.schedule(new Runnable() {
+//                            @Override
+//                            public void run() {
+//                                log.info("设备号:"+terminalData.getTerminalId()+"  not connect service");
+//                                Connect();
+//                            }
+//                        }, 15, TimeUnit.SECONDS);
+//                    } else {
+//                        isConnect = true;
+//                        channel = channelFuture.channel();
+//                        System.out.println("设备号:"+terminalData.getTerminalId()+" connected!");
+//                    }
+//                }
+//            });
         } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+            log.error("->服务端连接失败...");
+        }finally {
+            //释放线程组资源
+            //group.shutdownGracefully();
         }
         //释放线程组资源
         //group.shutdownGracefully();
     }
 
-    public void sendData(String msg){
+//    public void sendData(String msg){
+//        try {
+//            this.channel.writeAndFlush(msg);
+//        } catch (Exception e) {
+//            throw new RuntimeException(e);
+//        }
+//    }
+
+    public void sendData(String msg)   {
+//        try {
+//            Socket socket = new Socket(this.host, this.port);
+//            log.info("Connected to server...");
+//            InputStream input = socket.getInputStream();
+//            OutputStream output = socket.getOutputStream();
+//            //PrintWriter writer = new PrintWriter(output, true);
+//            //BufferedReader reader = new BufferedReader(new InputStreamReader(input));
+//            //writer.println(msg);
+//            output.write(msg.getBytes(StandardCharsets.UTF_8));
+//            InputStreamReader in = new InputStreamReader(input, StandardCharsets.UTF_8);
+//            //获取输入流里面数据并存储数据
+//            byte[] b = new byte[1024];
+//            StringBuilder sb = new StringBuilder()          ;
+//            String s;
+//            if (in.read() != -1) {
+//                s = new String(b);
+//                //System.out.println(s);
+//                sb.append(s);
+//            }
+//            log.info("来自服务器的数据:" + sb.toString());
+//            output.flush();
+//            output.close();
+//            in.close();
+//            socket.close();
+//        } catch (IOException e) {
+//            log.error("->服务端发送失败!!,{}",e.getMessage());
+//        }
+        SocketChannel socketChannel = null;
         try {
-            this.channel.writeAndFlush(msg);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
+            socketChannel = SocketChannel.open();
+            socketChannel.connect(new InetSocketAddress(this.host, this.port));
+            // 写入数据到socket
+            ByteBuffer buffer = ByteBuffer.allocate(2048);
+            buffer.put(msg.getBytes(StandardCharsets.UTF_8));
+            buffer.flip();
+            while (buffer.hasRemaining()) {
+                socketChannel.write(buffer);
+            }
+
+            // 从socket读取数据
+            buffer.clear();
+            if (socketChannel.read(buffer) > 0) {
+                buffer.flip();
+                ByteBuf byteBuf = Unpooled.wrappedBuffer(buffer);
+                // 在这里处理你的ByteBuf数据
+                byte[] bytes = new byte[byteBuf.readableBytes()];
+                byteBuf.readBytes(bytes);
+                log.info("设备:{},接收到客户端数据{}",this.terminalData.getTerminalId(),new String(bytes));
+                buffer.clear();
+            }
+        } catch (IOException e) {
+            log.error("->服务端发送失败!!,{}",e.getMessage());
+        }finally {
+            if (socketChannel != null) {
+                try {
+                    socketChannel.close();
+                } catch (IOException e) {
+                    log.error("->服务端发送失败!!,关闭channel失败!{}",e.getMessage());
+                }
+            }
         }
+
+
+
+
     }
 
 }

+ 18 - 9
VB_DSM_V2.1/vbdsm-data-upload/vbdsm-hj212/vbdsm-hj212-modbus/src/main/java/cn/vbdsm/hj212/modbus/ws/MockUploadTcpService.java

@@ -2,6 +2,7 @@ package cn.vbdsm.hj212.modbus.ws;
 
 import cn.hutool.core.util.ByteUtil;
 import cn.hutool.core.util.CharsetUtil;
+import cn.hutool.core.util.NumberUtil;
 import cn.hutool.core.util.StrUtil;
 import cn.vbdsm.hj212.modbus.server.TcpSocketClient;
 import cn.vbdsm.hj212.modbus.utils.AESUtil;
@@ -15,6 +16,7 @@ import org.springframework.stereotype.Component;
 
 import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
@@ -49,19 +51,22 @@ public class MockUploadTcpService {
                 TerminalData td = getTerminal(cts[0],t);
                 TcpSocketClient client = new TcpSocketClient(host,Integer.parseInt(port),td);
                 clients.add(client);
-                client.Connect();
+                //client.Connect();
             }
         }
     }
 
+    private Integer getCurDay(){
+        DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyyMMdd");
+        LocalDate localDate = LocalDate.now();
+        return NumberUtil.parseInt(localDate.format(df));
+    }
 
     public void SendAllData() {
         //定时任务启动前,需要先将客户端全部连接成功!遍历客户端发送数据!
         for(TcpSocketClient client : clients){
             TerminalData td = client.getTerminalData();
-            if(td==null){
-                continue;
-            }
+            td.setUploadDate(getCurDay());
             RandomPower power = new RandomPower();
             setEngine(power,td);
             try {
@@ -69,6 +74,7 @@ public class MockUploadTcpService {
                 long now = Long.parseLong(nowStr);
                 now = now - (now - (now / 10000) * 10000) % 500;
                 uploadData(client,td,power,now);
+                Thread.sleep(500);
             }catch (Exception e){
                 log.error("uploadData error:" + e.getMessage());
             }
@@ -84,8 +90,8 @@ public class MockUploadTcpService {
     private void setEngine(RandomPower power, TerminalData td){
         var dp = RandomHelper.getRandom(22000,3000,100);
         var de = RandomHelper.getRandom(6600,600,100);
-        var pe = td.getDayTotal_p().equals(new BigDecimal(0))?new BigDecimal(0):td.getDayTotal_p().add(dp);
-        var qe = td.getDayTotal_e().equals(new BigDecimal(0))?new BigDecimal(0):td.getDayTotal_e().add(de);
+        var pe = td.getDayTotal_p().equals(new BigDecimal(0))?dp:td.getDayTotal_p().add(dp);
+        var qe = td.getDayTotal_e().equals(new BigDecimal(0))?de:td.getDayTotal_e().add(de);
         var ps = td.getTotal_p().add(dp);
         var qs = td.getTotal_e().add(de);
         power.setEngine(pe,qe,ps,qs);
@@ -106,9 +112,12 @@ public class MockUploadTcpService {
         result += "\r\n";
         log.info("终端:{},send data:{}" ,td.getTerminalId(), result);
         //当前客户端如果没有保持在线状态,不发送
-        if(client.isConnect()){
-            client.sendData(result);
-        }
+//        if(client.isConnect()){
+//            client.sendData(result);
+//            log.info("终端:{},send data success!" ,td.getTerminalId());
+//        }
+        client.sendData(result);
+        log.info("终端:{},send data success!" ,td.getTerminalId());
     }
 
     private String genStr(TerminalData td,RandomPower power){

+ 1 - 1
VB_DSM_V2.1/vbdsm-data-upload/vbdsm-hj212/vbdsm-hj212-modbus/src/main/resources/application.yml

@@ -10,7 +10,7 @@ vbdsm:
             workThreadNum: 4 #netty work线程数,默认为4
         client:
             port: 7777
-            bindIp: 127.0.0.1
+            bindIp: 192.168.0.104
     ws:
         #url: http://192.168.0.104:7100/services/UploadData?wsdl;http://192.168.0.104:7100/services/UploadData?wsdl
         url: http://192.168.0.81:7101/services/UploadData?wsdl