فهرست منبع

feat(stompClient):WebSocket结合STOMP协议实现消息推送

HMY 10 ماه پیش
والد
کامیت
54e84e3324

+ 33 - 0
admin/src/main/java/com/dcs/hnyz/config/WebSocketStompConfig.java

@@ -0,0 +1,33 @@
+package com.dcs.hnyz.config;
+
+import org.springframework.context.annotation.Configuration;
+import org.springframework.messaging.simp.config.MessageBrokerRegistry;
+import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
+import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
+import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
+
+/**
+ * WebSocketConfig
+ * websocket配置
+ * @author: hmy
+ * @date: 2025/5/30 15:45
+ */
+@Configuration
+@EnableWebSocketMessageBroker
+public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
+
+    @Override
+    //定义客户端连接 WebSocket 的入口
+    public void registerStompEndpoints(StompEndpointRegistry registry) {
+        registry.addEndpoint("/ws/configurationPage")
+                .setAllowedOriginPatterns("*")
+                .withSockJS();
+    }
+
+    @Override
+    //配置消息代理规则
+    public void configureMessageBroker(MessageBrokerRegistry registry) {
+        registry.enableSimpleBroker("/topic"); // 推送路径
+        registry.setApplicationDestinationPrefixes("/app"); // 接收路径前缀
+    }
+}

+ 27 - 0
admin/src/main/java/com/dcs/hnyz/webSocket/PageWebSocketController.java

@@ -0,0 +1,27 @@
+package com.dcs.hnyz.webSocket;
+
+import com.dcs.hnyz.domain.dto.PageDeviceSubscribeDTO;
+import com.dcs.hnyz.webSocket.handler.WebSocketSessionManager;
+import org.springframework.messaging.handler.annotation.MessageMapping;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.CrossOrigin;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.security.Principal;
+
+/**
+ * PageWebSocketController
+ * 处理客户端的订阅请求,记录pageCode和sessionId
+ * @author: hmy
+ * @date: 2025/5/30 15:59
+ */
+@CrossOrigin(origins = "*", allowCredentials = "false")
+@Controller
+public class PageWebSocketController {
+
+    @MessageMapping("/subscribe/pageDevice")
+    public void subscribePageDevice(@Payload PageDeviceSubscribeDTO dto, Principal user) {
+        WebSocketSessionManager.register(dto.getPageCode(), user.getName(), dto.getDeviceGroupMap());
+    }
+}

+ 32 - 0
admin/src/main/java/com/dcs/hnyz/webSocket/handler/WebSocketSessionManager.java

@@ -0,0 +1,32 @@
+package com.dcs.hnyz.webSocket.handler;
+
+import com.dcs.hnyz.cache.CacheCenter;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * WebSocketSessionManager
+ * 管理页面订阅关系
+ * @author: hmy
+ * @date: 2025/5/30 16:04
+ */
+public class WebSocketSessionManager {
+    // pageCode -> 用户名 -> 大设备 -> 小设备列表
+    private static final Map<String, Map<String, Map<String, List<String>>>> sessionMap = new ConcurrentHashMap<>();
+
+    public static void register(String pageCode, String username, Map<String, List<String>> deviceGroupMap) {
+        sessionMap
+                .computeIfAbsent(pageCode, k -> new ConcurrentHashMap<>())
+                .put(username, deviceGroupMap);
+        // 同步更新全局缓存
+        CacheCenter.pageDeviceGroupMap.put(pageCode, deviceGroupMap);
+    }
+
+    public static Map<String, Map<String, List<String>>> getPageSessions(String pageCode) {
+        return sessionMap.getOrDefault(pageCode, Collections.emptyMap());
+    }
+}
+

+ 1 - 0
framework/src/main/java/com/dcs/framework/config/ResourcesConfig.java

@@ -62,6 +62,7 @@ public class ResourcesConfig implements WebMvcConfigurer
         config.addAllowedHeader("*");
         // 设置访问源请求方法
         config.addAllowedMethod("*");
+        config.setAllowCredentials(true); // 允许带认证信息的跨域请求
         // 有效期 1800秒
         config.setMaxAge(1800L);
         // 添加映射路径,拦截一切请求

+ 2 - 0
framework/src/main/java/com/dcs/framework/config/SecurityConfig.java

@@ -118,6 +118,8 @@ public class SecurityConfig
                         .antMatchers("/webSocket/**", "/modbus/getDataTwinEquipmentValues/**", "/datatwin/**", "/influxdb/**"
 //                                , "/**"
                         ).permitAll()
+                        .antMatchers("/ws/**").permitAll()
+
                         // 除上面外的所有请求全部需要鉴权认证
                         .anyRequest().authenticated();
             })

+ 94 - 0
ui/src/utils/ws/stompClient.js

@@ -0,0 +1,94 @@
+// src/utils/stompClient.js
+import SockJS from 'sockjs-client';
+import { Client } from '@stomp/stompjs';
+import { wsConfig } from '@/config';
+
+class StompClient {
+  constructor(url) {
+    this.url = url;
+    this.client = null;
+    this.subscriptions = new Map(); // pageCode -> subscription
+    this.connectedCallbackQueue = [];
+    this.connected = false;
+  }
+
+  // 初始化连接
+  connect() {
+    if (this.client) return;
+
+    const socket = new SockJS(this.url);
+    this.client = new Client({
+      webSocketFactory: () => socket,
+      reconnectDelay: 3000, // 断线重连时间
+      // debug: (str) => console.log('[STOMP]', str),
+      onConnect: () => {
+        this.connected = true;
+        console.log('STOMP 连接成功');
+        this.connectedCallbackQueue.forEach((cb) => cb());
+        this.connectedCallbackQueue = [];
+      },
+      onStompError: (frame) => {
+        console.error('STOMP 错误', frame.headers['message'], frame.body);
+      },
+      onDisconnect: () => {
+        this.connected = false;
+        console.warn('STOMP 连接断开');
+      },
+    });
+
+    this.client.activate();
+  }
+
+  // 订阅页面数据
+  subscribeToPage(pageCode, onMessage) {
+    if (!pageCode || typeof onMessage !== 'function') return;
+
+    const doSubscribe = () => {
+      const topic = `/topic/page/${pageCode}`;
+      if (this.subscriptions.has(pageCode)) {
+        console.warn(`已订阅过页面 ${pageCode}`);
+        return;
+      }
+
+      const subscription = this.client.subscribe(topic, (msg) => {
+        const data = JSON.parse(msg.body);
+        // console.log(`接收到页面 ${pageCode} 数据:`, data);
+        onMessage(data);
+      });
+
+      this.subscriptions.set(pageCode, subscription);
+      console.log(`订阅页面 ${pageCode} 成功`);
+    };
+
+    if (this.connected) {
+      doSubscribe();
+    } else {
+      this.connectedCallbackQueue.push(doSubscribe);
+      this.connect();
+    }
+  }
+
+  // 取消订阅
+  unsubscribeFromPage(pageCode) {
+    const subscription = this.subscriptions.get(pageCode);
+    if (subscription) {
+      subscription.unsubscribe();
+      this.subscriptions.delete(pageCode);
+      console.log(`已取消订阅页面 ${pageCode}`);
+    }
+  }
+
+  // 主动断开连接
+  disconnect() {
+    if (this.client) {
+      this.client.deactivate();
+      this.client = null;
+      this.connected = false;
+      this.subscriptions.clear();
+      console.log('STOMP 客户端已断开');
+    }
+  }
+}
+
+// 实例导出
+export const stompClient = new StompClient(wsConfig.stompEndpoint);