klzhangweiya 1 тиждень тому
батько
коміт
bfc286ab7c

+ 40 - 1
SERVER/ChickenFarmV3/vb-collection-modules/vb-collection-core/src/main/java/cn/vber/collection/core/DatabaseDrivenModbusCollector.java

@@ -1,23 +1,29 @@
 package cn.vber.collection.core;
 
 import cn.vber.collection.domain.CollectRuleEntity;
+import cn.vber.collection.domain.CollectedDataPoint;
 import cn.vber.collection.domain.CollectorEntity;
 import cn.vber.collection.handler.DataPointHandler;
 import cn.vber.collection.utils.ModbusTcpHelper;
 import lombok.extern.slf4j.Slf4j;
 
 import java.math.BigDecimal;
+import java.time.LocalDateTime;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.*;
 import java.util.stream.Collectors;
 
+import static cn.hutool.core.convert.Convert.toDouble;
+
 @Slf4j
 public class DatabaseDrivenModbusCollector {
 
     private final Map<Long, RuleLoader.CollectorWithRules> collectorGroups;
     private final ScheduledExecutorService scheduler;
     private final ThreadPoolExecutor executor;
+    // 2. 数据处理线程池(必须新增!)
+    private final ExecutorService dataHandlerExecutor;
     private final DataPointHandler dataHandler;
 
 
@@ -35,6 +41,13 @@ public class DatabaseDrivenModbusCollector {
                 new ArrayBlockingQueue<>(groups.size()),
                 new ThreadPoolExecutor.CallerRunsPolicy()
         );
+        // 初始化数据处理线程池 👇
+        this.dataHandlerExecutor = new ThreadPoolExecutor(
+                2, 4, 60L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(1000),
+                r -> new Thread(r, "modbus-data-handler"),
+                new ThreadPoolExecutor.CallerRunsPolicy()
+        );
     }
     public void start() {
         for (var entry : collectorGroups.entrySet()) {
@@ -49,6 +62,31 @@ public class DatabaseDrivenModbusCollector {
             );
         }
     }
+    // 在 DatabaseDrivenModbusCollector.pollCollectorGroup 中
+    private void processRule(CollectRuleEntity rule, Object rawValue, BigDecimal convertA, BigDecimal convertB) {
+        // 1. 计算转换值
+        double x = toDouble(rawValue);
+        double converted = convertA.doubleValue() * x + convertB.doubleValue();
+
+        // 2. 创建不可变快照(此时固化时间戳!)
+        CollectedDataPoint dataPoint = CollectedDataPoint.builder()
+                .collectorId(rule.getCollectorId())
+                .ruleId(rule.getId())
+                .rawValue(converted)
+                .stringValue(String.valueOf(converted))
+                .dataUnit(rule.getDataUnit())
+                .insertTime(LocalDateTime.now()) // ⚠️ 关键:在此刻固化时间!
+                .build();
+
+        // 3. 异步提交给处理器(避免阻塞采集线程)
+        CompletableFuture.runAsync(() -> {
+            try {
+                dataHandler.handle(dataPoint); // 传入的是独立副本
+            } catch (Exception e) {
+                log.error("数据处理失败, ruleId={}", rule.getId(), e);
+            }
+        }, dataHandlerExecutor); // 使用独立线程池
+    }
     private void pollCollectorGroup(RuleLoader.CollectorWithRules group) {
         executor.execute(() -> {
             CollectorEntity collector = group.collector();
@@ -70,7 +108,8 @@ public class DatabaseDrivenModbusCollector {
                         try {
                             Object rawValue = readRawValue(helper, rule);
                             double converted = applyLinearTransform(rawValue, rule);
-                            dataHandler.handle(rule, rawValue, converted);
+                            //dataHandler.handle(rule, rawValue, converted);
+                            processRule(rule, rawValue, rule.getConvertA(), rule.getConvertB());
                         } catch (Exception e) {
                             log.error("规则 [{}] 读取失败: {}", rule.getId(), e.getMessage());
                         }

+ 24 - 0
SERVER/ChickenFarmV3/vb-collection-modules/vb-collection-core/src/main/java/cn/vber/collection/domain/CollectedDataPoint.java

@@ -0,0 +1,24 @@
+package cn.vber.collection.domain;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class CollectedDataPoint {
+    private Long collectorId;
+    private Long ruleId;
+    private double rawValue;
+    private String stringValue;
+    private String dataUnit;
+    private LocalDateTime insertTime;
+
+
+
+}

+ 2 - 1
SERVER/ChickenFarmV3/vb-collection-modules/vb-collection-core/src/main/java/cn/vber/collection/handler/DataPointHandler.java

@@ -1,7 +1,8 @@
 package cn.vber.collection.handler;
 
 import cn.vber.collection.domain.CollectRuleEntity;
+import cn.vber.collection.domain.CollectedDataPoint;
 
 public interface DataPointHandler {
-    void handle(CollectRuleEntity rule, Object rawValue, double convertedValue);
+    void handle(CollectedDataPoint dataPoint);
 }

+ 37 - 0
SERVER/ChickenFarmV3/vb-collection-modules/vb-collection-core/src/main/java/cn/vber/collection/handler/DatabaseDataHandler.java

@@ -0,0 +1,37 @@
+package cn.vber.collection.handler;
+
+import cn.vber.collection.domain.CollectRuleEntity;
+import cn.vber.collection.domain.CollectedDataPoint;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+@RequiredArgsConstructor
+@Service
+@Slf4j
+public class DatabaseDataHandler implements DataPointHandler {
+
+
+    @Override
+    public void handle(CollectedDataPoint dataPoint) {
+        //1.
+//        try {
+//            String sql = """
+//                INSERT INTO d_modbus_data_history
+//                (collector_id, rule_id, value, data_unit, insert_time)
+//                VALUES (?, ?, ?, ?, ?)
+//                """;
+//
+//            jdbcTemplate.update(sql,
+//                    dataPoint.getCollectorId(),
+//                    dataPoint.getRuleId(),
+//                    dataPoint.getRawValue(),
+//                    dataPoint.getDataUnit(),
+//                    Timestamp.valueOf(dataPoint.getInsertTime())
+//            );
+//
+//        } catch (Exception e) {
+//            throw new RuntimeException("数据库写入失败", e);
+//        }
+    }
+}

+ 17 - 2
SERVER/ChickenFarmV3/vb-collection-modules/vb-collection-core/src/main/java/cn/vber/collection/handler/RedisProducerHandler.java

@@ -1,10 +1,25 @@
 package cn.vber.collection.handler;
 
 import cn.vber.collection.domain.CollectRuleEntity;
+import cn.vber.collection.domain.CollectedDataPoint;
+import cn.vber.common.json.utils.JsonUtils;
+import cn.vber.common.redis.utils.RedisUtils;
+import lombok.extern.slf4j.Slf4j;
 
+@Slf4j
 public class RedisProducerHandler implements DataPointHandler{
-    @Override
-    public void handle(CollectRuleEntity rule, Object rawValue, double convertedValue) {
+    private static final String QUEUE_KEY = "modbus:queue";
 
+    @Override
+    public void handle(CollectedDataPoint dataPoint) {
+        try {
+            String json = JsonUtils.toJsonString(dataPoint);
+            // 2. 使用 RedisUtils 写入队列(LPUSH)
+            // 注意:这里使用 RDeque 的 push() = LPUSH
+            RedisUtils.pushToListHead(QUEUE_KEY, json);
+        } catch (Exception e) {
+            log.error("写入 Redis 失败", e);
+            // 可选:降级写入本地文件 or 告警
+        }
     }
 }

+ 10 - 0
SERVER/ChickenFarmV3/vb-collection-modules/vb-collection-core/src/main/java/cn/vber/collection/service/ModbusDataConsumeService.java

@@ -0,0 +1,10 @@
+package cn.vber.collection.service;
+
+public interface ModbusDataConsumeService {
+    /**
+     * 消费 Redis 队列中的 Modbus 数据
+     * @param batchSize 批量大小(可选)
+     * @return 消费成功的记录数
+     */
+    int consumeModbusQueue(int batchSize);
+}

+ 58 - 0
SERVER/ChickenFarmV3/vb-collection-modules/vb-collection-core/src/main/java/cn/vber/collection/service/impl/ModbusDataConsumeServiceImpl.java

@@ -0,0 +1,58 @@
+package cn.vber.collection.service.impl;
+
+import cn.vber.collection.domain.CollectedDataPoint;
+import cn.vber.collection.handler.DatabaseDataHandler;
+import cn.vber.collection.service.ModbusDataConsumeService;
+import cn.vber.common.json.utils.JsonUtils;
+import cn.vber.common.redis.utils.RedisUtils;
+import io.swagger.v3.core.util.Json;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.List;
+
+@RequiredArgsConstructor
+@Service
+@Slf4j
+public class ModbusDataConsumeServiceImpl implements ModbusDataConsumeService {
+
+    private static final String QUEUE_KEY = "modbus:queue";
+    private static final String DLQ_KEY = "modbus:dlq";
+
+
+    private final DatabaseDataHandler databaseHandler; // 你的 DB 处理器
+    @Override
+    @Transactional(rollbackFor = Exception.class)
+    public int consumeModbusQueue(int batchSize) {
+        if (batchSize <= 0) batchSize = 1000;
+
+        // 1. 从 Redis 弹出数据
+        List<String> jsonList = RedisUtils.popBatchFromListHead(QUEUE_KEY, batchSize);
+        if (jsonList.isEmpty()) {
+            log.info("Modbus 队列为空,无需处理");
+            return 0;
+        }
+
+        log.info("开始处理 {} 条 Modbus 数据", jsonList.size());
+        int successCount = 0;
+
+        // 2. 批量处理
+        for (String json : jsonList) {
+            try {
+                CollectedDataPoint point = JsonUtils.parseObject(json, CollectedDataPoint.class);
+                databaseHandler.handle(point); // 写入数据库
+                successCount++;
+            } catch (Exception e) {
+                log.error("处理数据失败,已移至 DLQ: {}", json, e);
+                // 3. 失败数据进入死信队列
+                RedisUtils.pushToListHead(DLQ_KEY, json);
+            }
+        }
+
+        log.info("Modbus 数据消费完成,成功: {}, 失败: {}",
+                successCount, jsonList.size() - successCount);
+        return successCount;
+    }
+}