klzhangweiya 1 周之前
父节点
当前提交
b578b78e13

+ 2 - 2
SERVER/ChickenFarmV3/vb-collection-modules/vb-collection-core/src/main/java/cn/vber/collection/core/DatabaseDrivenModbusCollector.java

@@ -72,11 +72,11 @@ public class DatabaseDrivenModbusCollector {
                             double converted = applyLinearTransform(rawValue, rule);
                             dataHandler.handle(rule, rawValue, converted);
                         } catch (Exception e) {
-                            log.error("规则 [" + rule.getId() + "] 读取失败: " + e.getMessage());
+                            log.error("规则 [{}] 读取失败: {}", rule.getId(), e.getMessage());
                         }
                     }
                 } catch (Exception e) {
-                    log.error("采集器 [" + collector.getId() + "] 连接失败: " + e.getMessage());
+                    log.error("采集器 [{}] 连接失败: {}", collector.getId(), e.getMessage());
                 }
             }
         });

+ 10 - 0
SERVER/ChickenFarmV3/vb-collection-modules/vb-collection-core/src/main/java/cn/vber/collection/handler/RedisProducerHandler.java

@@ -0,0 +1,10 @@
+package cn.vber.collection.handler;
+
+import cn.vber.collection.domain.CollectRuleEntity;
+
+public class RedisProducerHandler implements DataPointHandler{
+    @Override
+    public void handle(CollectRuleEntity rule, Object rawValue, double convertedValue) {
+
+    }
+}

+ 28 - 4
SERVER/ChickenFarmV3/vb-common/vb-common-redis/src/main/java/cn/vber/common/redis/utils/RedisUtils.java

@@ -7,10 +7,7 @@ import org.redisson.api.*;
 import org.redisson.api.options.KeysScanOptions;
 
 import java.time.Duration;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -584,4 +581,31 @@ public class RedisUtils {
         RKeys rKeys = CLIENT.getKeys();
         return rKeys.countExists(key) > 0;
     }
+    /**
+     * 从 List 头部弹出一个元素(LPOP)
+     */
+    public static <T> T popFromListHead(final String key){
+        RDeque<T> rDeque  = CLIENT.getDeque(key);
+        return rDeque.poll(); //RDeque.poll() 在所有版本都存在
+    }
+    /**
+     * 批量弹出
+     */
+    public static <T> List<T> popBatchFromListHead(final String key, int count) {
+        RDeque<T> rDeque = CLIENT.getDeque(key);
+        List<T> result = new ArrayList<>(count);
+        for (int i = 0; i < count; i++) {
+            T item = rDeque.poll();
+            if (item == null) break;
+            result.add(item);
+        }
+        return result;
+    }
+    /**
+     * 写入队列(LPUSH)
+     */
+    public static <T> void pushToListHead(final String key, T value) {
+        RDeque<T> rDeque = CLIENT.getDeque(key);
+        rDeque.push(value); // = LPUSH
+    }
 }