Преглед изворни кода

Update 优化sse 修复相同token历史连接未关闭问题;新增心跳监测,关闭无效连接

Yue пре 1 недеља
родитељ
комит
5634bb5dcb

+ 57 - 0
SERVER/VberAdminPlusV3/vber-common/vber-common-sse/src/main/java/com/vber/common/sse/core/SseEmitterManager.java

@@ -1,14 +1,21 @@
 package com.vber.common.sse.core;
 
+import cn.hutool.core.collection.CollUtil;
 import cn.hutool.core.map.MapUtil;
+
+import com.vber.common.core.utils.SpringUtils;
 import com.vber.common.redis.utils.RedisUtils;
 import com.vber.common.sse.dto.SseMessageDto;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 /**
@@ -26,6 +33,12 @@ public class SseEmitterManager {
 
     private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
 
+    public SseEmitterManager() {
+        // 定时执行 SSE 心跳检测
+        SpringUtils.getBean(ScheduledExecutorService.class)
+                .scheduleWithFixedDelay(this::sseMonitor, 60L, 60L, TimeUnit.SECONDS);
+    }
+
     /**
      * 建立与指定用户的 SSE 连接
      *
@@ -38,6 +51,12 @@ public class SseEmitterManager {
         // 每个用户可以有多个 SSE 连接,通过 token 进行区分
         Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
 
+        // 关闭已存在的SseEmitter,防止超过最大连接数
+        SseEmitter oldEmitter = emitters.remove(token);
+        if (oldEmitter != null) {
+            oldEmitter.complete();
+        }
+
         // 创建一个新的 SseEmitter 实例,超时时间设置为一天 避免连接之后直接关闭浏览器导致连接停滞
         SseEmitter emitter = new SseEmitter(86400000L);
 
@@ -97,6 +116,44 @@ public class SseEmitterManager {
         }
     }
 
+    /**
+     * SSE心跳检测,关闭无效连接
+     */
+    public void sseMonitor() {
+        final SseEmitter.SseEventBuilder heartbeat = SseEmitter.event().comment("heartbeat");
+        // 记录需要移除的用户ID
+        List<Long> toRemoveUsers = new ArrayList<>();
+
+        USER_TOKEN_EMITTERS.forEach((userId, emitterMap) -> {
+            if (CollUtil.isEmpty(emitterMap)) {
+                toRemoveUsers.add(userId);
+                return;
+            }
+
+            emitterMap.entrySet().removeIf(entry -> {
+                try {
+                    entry.getValue().send(heartbeat);
+                    return false;
+                } catch (Exception ex) {
+                    try {
+                        entry.getValue().complete();
+                    } catch (Exception ignore) {
+                        // 忽略重复关闭异常
+                    }
+                    return true; // 发送失败 → 移除该连接
+                }
+            });
+
+            // 移除空连接用户
+            if (emitterMap.isEmpty()) {
+                toRemoveUsers.add(userId);
+            }
+        });
+
+        // 循环结束后统一清理空用户,避免并发修改异常
+        toRemoveUsers.forEach(USER_TOKEN_EMITTERS::remove);
+    }
+
     /**
      * 订阅SSE消息主题,并提供一个消费者函数来处理接收到的消息
      *