vue3(stompjs)+springboot3.2实现websocket STOMP订阅发布 作者:马育民 • 2026-04-16 10:46 • 阅读:10000 # 介绍 在企业后台、物联网、告警通知等场景中,后端经常需要主动向前端推送实时数据。相比传统轮询,WebSocket + STOMP 是目前更高效、更主流的方案,无需前端频繁请求,后端可主动推送,大幅提升性能和用户体验。 ### 一、STOMP 是什么? STOMP 是基于 WebSocket 的简单文本消息协议,核心作用是提供标准的订阅/发布(Pub/Sub)模式。它运行在 WebSocket 长连接之上,相当于给“连接通道”定了一套通信规则,让前端可以订阅指定主题,后端向主题发送消息时,所有订阅该主题的前端会自动接收消息,无需手动管理连接和消息分发逻辑。 ### 二、方案优势 - 长连接特性,低延迟、轻量高效,避免传统轮询的资源浪费; - 自带广播、单点推送、主题订阅能力,无需手动开发分发逻辑; - SpringBoot 官方原生支持,Vue3 生态适配成熟,集成成本低; - 无需手动维护前端连接状态,框架自动管理,降低开发和维护成本。 ### 三、整体实现流程 ##### 1. 后端(SpringBoot3)实现 开启 STOMP 消息代理,配置连接端点和消息主题规则,暴露前端可连接的地址,同时提供消息推送接口,可向指定主题或单个用户推送消息,无需关注前端连接细节。 ##### 2. 前端(Vue3)实现 基于 Vue3 环境,通过 STOMP 相关依赖建立与后端的长连接,无需手动处理 WebSocket 底层逻辑;连接成功后,订阅后端定义的消息主题,即可实时接收后端推送的消息;同时支持主动向后端发送消息,满足双向通信需求。 前端核心逻辑简洁:初始化连接 → 连接成功后订阅指定主题 → 监听并处理接收的实时消息 → 断开连接时释放资源,全程无需关注连接管理和消息分发,框架自动完成。 ### 四、适用场景 该方案广泛应用于企业级开发,尤其适合以下场景: - 管理后台实时通知、系统告警推送(如设备异常、审批提醒); - 物联网平台(如智慧农业、工业监控)的设备状态、实时数据推送; - 聊天室、系统公告广播、多端数据同步等场景。 # 后端 ### 添加依赖 ``` org.springframework.boot spring-boot-starter-websocket ``` ### 修改原授权校验 修改 `SecurityConfig` 类, 放行 WebSocket 校验,添加下面代码: ``` // 放行 WebSocket 端点(SockJS 和 STOMP) .requestMatchers("/ws/**").permitAll() .requestMatchers("/ws").permitAll() ``` ### 添加websocket授权校验 添加 `WebSocketAuthInterceptor` 类,校验逻辑与过滤器类似,**校验token**,内容如下: ``` package com.smartagriculture.config; import com.smartagriculture.utils.JwtUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.simp.stomp.StompCommand; import org.springframework.messaging.simp.stomp.StompHeaderAccessor; import org.springframework.messaging.support.ChannelInterceptor; import org.springframework.messaging.support.MessageHeaderAccessor; import org.springframework.stereotype.Component; /** * WebSocket 认证拦截器 * 在客户端连接时验证 JWT token */ @Slf4j @Component public class WebSocketAuthInterceptor implements ChannelInterceptor { @Autowired private JwtUtil jwtUtil; @Override public Message> preSend(Message> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) { // 从连接头中获取 token String token = accessor.getFirstNativeHeader("Authorization"); if (token != null && token.startsWith("Bearer ")) { token = token.substring(7); try { // 验证 token if (jwtUtil.validateToken(token)) { String username = jwtUtil.getUsernameFromToken(token); accessor.setUser(() -> username); log.info("WebSocket 连接认证成功,用户: {}", username); } else { log.warn("WebSocket 连接认证失败:无效的 token"); throw new RuntimeException("无效的 token"); } } catch (Exception e) { log.error("WebSocket 连接认证失败: {}", e.getMessage()); throw new RuntimeException("Token 验证失败"); } } else { log.warn("WebSocket 连接缺少 Authorization 头"); throw new RuntimeException("缺少认证信息"); } } return message; } } ``` ### 添加websocket配置类 创建 `WebSocketConfig` 类,内容如下: ``` package com.smartagriculture.config; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.ChannelRegistration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; /** * WebSocket 配置类(使用 STOMP 协议) */ @Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Autowired private WebSocketAuthInterceptor webSocketAuthInterceptor; @Override public void configureMessageBroker(MessageBrokerRegistry config) { // 启用简单消息代理,用于广播消息 config.enableSimpleBroker("/topic"); // 配置应用前缀,用于处理客户端发送的消息 config.setApplicationDestinationPrefixes("/app"); // 配置用户前缀,用于一对一消息 config.setUserDestinationPrefix("/user"); } @Override public void registerStompEndpoints(StompEndpointRegistry registry) { // 注册 STOMP 端点,只使用原生 WebSocket registry.addEndpoint("/ws") .setAllowedOrigins("*"); } @Override public void configureClientInboundChannel(ChannelRegistration registration) { // 添加认证拦截器 registration.interceptors(webSocketAuthInterceptor); } } ``` # 前端 ### 安装依赖 ``` npm install @stomp/stompjs ``` **提示:**不要使用 `sockjs-client `,与vite不兼容,运行可行,打包报错 ### 添加服务 创建文件 `src/services/websocket.js` 内容如下: ``` /** * WebSocket 服务封装 * 使用原生 WebSocket + STOMP.js 与后端通信 */ import { Stomp } from '@stomp/stompjs'; class WebSocketService { constructor() { this.stompClient = null; this.subscriptions = new Map(); this.isConnected = false; this.reconnectAttempts = 0; this.maxReconnectAttempts = 5; this.reconnectDelay = 3000; this.connectionPromise = null; this.messageQueue = []; } /** * 获取 WebSocket 连接 URL */ getWebSocketUrl() { // 使用与 API 相同的基础 URL,但替换为 WebSocket 端点 const baseURL = 'http://localhost:9999'; // 将 http 改为 ws,https 改为 wss const wsURL = baseURL.replace('http://', 'ws://').replace('https://', 'wss://'); return `${wsURL}/ws`; } /** * 获取 JWT Token */ getToken() { return localStorage.getItem('token') || ''; } /** * 建立 WebSocket 连接 * @returns {Promise} 连接成功的 Promise */ connect() { if (this.connectionPromise) { return this.connectionPromise; } if (this.isConnected && this.stompClient) { return Promise.resolve(); } this.connectionPromise = new Promise((resolve, reject) => { const token = this.getToken(); if (!token) { reject(new Error('未找到认证 token')); return; } const wsUrl = this.getWebSocketUrl(); const socket = new WebSocket(wsUrl); this.stompClient = Stomp.over(socket); // 配置 STOMP 客户端 this.stompClient.configure({ reconnectDelay: this.reconnectDelay, maxReconnectAttempts: this.maxReconnectAttempts, onStompError: (frame) => { console.error('STOMP 错误:', frame); }, onWebSocketError: (error) => { console.error('WebSocket 错误:', error); this.isConnected = false; this.connectionPromise = null; reject(new Error('WebSocket 连接错误')); }, onDisconnect: () => { console.log('WebSocket 连接断开'); this.isConnected = false; this.stompClient = null; this.connectionPromise = null; } }); // 连接到 STOMP 服务器 this.stompClient.connect( { 'Authorization': `Bearer ${token}` }, () => { console.log('STOMP 连接成功'); console.log('当前连接状态:', this.stompClient.connected); // 连接成功后的处理 console.log('WebSocket 连接成功'); this.isConnected = true; this.reconnectAttempts = 0; this.connectionPromise = null; // 处理队列中的消息 this.flushMessageQueue(); resolve(); }, (error) => { console.error('STOMP 连接失败:', error); this.isConnected = false; this.connectionPromise = null; reject(new Error('STOMP 连接失败')); } ); }); return this.connectionPromise; } /** * 断开 WebSocket 连接 */ disconnect() { if (this.stompClient) { this.stompClient.disconnect(() => { console.log('STOMP 连接已断开'); }); this.stompClient = null; } this.subscriptions.clear(); this.isConnected = false; this.connectionPromise = null; this.messageQueue = []; console.log('WebSocket 连接已断开'); } /** * 订阅主题 * @param {string} topic 订阅主题,如 '/topic/device/realtime/123' * @param {function} callback 收到消息时的回调函数 * @returns {string} 订阅ID,用于取消订阅 */ subscribe(topic, callback) { if (!topic || typeof callback !== 'function') { console.error('订阅参数无效'); return null; } // 如果已经订阅过,先取消 if (this.subscriptions.has(topic)) { this.unsubscribe(topic); } if (this.isConnected && this.stompClient) { const subscription = this.stompClient.subscribe(topic, (message) => { try { const data = JSON.parse(message.body); callback(data); } catch (error) { console.error('解析消息失败:', error); } }); this.subscriptions.set(topic, subscription); console.log(`订阅成功: ${topic}`); return topic; } else { // 加入消息队列 this.messageQueue.push({ type: 'SUBSCRIBE', topic, callback }); console.log(`订阅请求加入队列 (等待连接): ${topic}`); // 尝试连接 if (!this.connectionPromise) { this.connect().catch(error => { console.error('连接失败,订阅失败:', error); }); } return topic; } } /** * 取消订阅 * @param {string} topic 订阅主题 */ unsubscribe(topic) { if (this.subscriptions.has(topic)) { const subscription = this.subscriptions.get(topic); if (subscription) { subscription.unsubscribe(); console.log(`取消订阅: ${topic}`); } this.subscriptions.delete(topic); } } /** * 发送消息 * @param {string} topic 发送主题 * @param {object} data 消息数据 */ send(topic, data = {}) { if (this.isConnected && this.stompClient) { this.stompClient.send(topic, {}, JSON.stringify(data)); console.log(`发送消息到: ${topic}`, data); } else { // 加入消息队列 this.messageQueue.push({ type: 'SEND', topic, data }); console.log(`消息加入队列 (等待连接): ${topic}`, data); // 尝试连接 if (!this.connectionPromise) { this.connect().catch(error => { console.error('连接失败,消息发送失败:', error); }); } } } /** * 处理队列中的消息 */ flushMessageQueue() { while (this.messageQueue.length > 0) { const message = this.messageQueue.shift(); if (this.isConnected && this.stompClient) { if (message.type === 'SUBSCRIBE') { this.subscribe(message.topic, message.callback); } else if (message.type === 'SEND') { this.send(message.topic, message.data); } } } } /** * 请求设备历史数据 * @param {number} deviceId 设备ID * @param {number} limit 每页数量 * @param {number} offset 偏移量 */ requestDeviceHistory(deviceId, limit = 100, offset = 0) { const topic = `/app/device/history/${deviceId}`; this.send(topic, { limit, offset }); } /** * 订阅设备历史数据响应 * @param {function} callback 收到数据时的回调函数 */ subscribeDeviceHistory(callback) { return this.subscribe('/user/queue/device/history', callback); } /** * 订阅设备实时数据 * @param {number} deviceId 设备ID * @param {function} callback 收到数据时的回调函数 */ subscribeDeviceRealtime(deviceId, callback) { return this.subscribe(`/topic/device/realtime/${deviceId}`, callback); } } // 导出单例实例 export const websocketService = new WebSocketService(); export default websocketService; ``` ### 修改页面 在需要调用 websocket 的页面上加下面代码: ``` import websocketService from '../services/websocket' ``` ``` // 初始化WebSocket连接 async initWebSocketConnection() { if (!this.selectedDevice || !this.selectedDevice.id) return console.log('开始初始化WebSocket连接,设备ID:', this.selectedDevice.id) try { // 建立连接 console.log('正在建立WebSocket连接...') await websocketService.connect() this.isConnected = true console.log('WebSocket连接建立成功') // 订阅实时数据 console.log('正在订阅设备实时数据,主题:', `/topic/device/realtime/${this.selectedDevice.id}`) this.subscriptionId = websocketService.subscribeDeviceRealtime(this.selectedDevice.id, (data) => { this.handleRealtimeData(data) }) console.log('订阅成功,订阅ID:', this.subscriptionId) // 初始获取一次数据(可选,确保页面加载时有数据) this.fetchInitialData() } catch (error) { console.error('WebSocket 连接失败:', error) this.isConnected = false } }, // 断开WebSocket连接 disconnectWebSocket() { if (this.subscriptionId) { websocketService.unsubscribe(this.subscriptionId) this.subscriptionId = null } websocketService.disconnect() this.isConnected = false }, // 处理实时数据 handleRealtimeData(data) { console.log('收到实时数据:', data) if (data) { this.$emit('update:deviceData', data) } }, ``` 原文出处:http://malaoshi.top/show_1GW38nTcMYZ2.html