国内高清视频素材网站,网站做两个版本,建湖企业做网站多少钱,wordpress广告图片自动轮播代码目录 一、序言二、开启RabbitMQ外部消息代理三、代码示例1、Maven依赖项2、相关实体3、自定义用户认证拦截器4、Websocket外部消息代理配置5、ChatController6、前端页面chat.html 四、测试示例1、群聊、私聊、后台定时推送测试2、登录RabbitMQ控制台查看队列信息 五、结语 一、… 目录 一、序言二、开启RabbitMQ外部消息代理三、代码示例1、Maven依赖项2、相关实体3、自定义用户认证拦截器4、Websocket外部消息代理配置5、ChatController6、前端页面chat.html 四、测试示例1、群聊、私聊、后台定时推送测试2、登录RabbitMQ控制台查看队列信息 五、结语 一、序言
上节我们在 WebSocket的那些事4-Spring中的STOMP支持详解 中详细说明了通过Spring内置消息代理结合STOMP子协议进行Websocket通信以及相关注解的使用及原理。
但是Spring内置消息代理会有一些限制比如只支持STOMP协议的一部分命令像acks、receipts命令都是不支持的还有由于内置消息代理把消息存储在内存当应用不可用时客户端也就订阅不到到后台推送的消息。
这节我们将会使用支持STOMP协议的外部消息代理RabbitMQ进行Websocket通信。 二、开启RabbitMQ外部消息代理
服务端路由发送消息以及客户端订阅消息都要通过STOMP协议与RabbitMQ进行交互由于RabbitMQ默认没有启动STOMP插件因此我们需要先启用该插件。
rabbitmq-plugins enable rabbitmq_stomp启动该插件后RabbitMQ中STOMP适配器默认会监听61613端口如果是云服务器需要把该端口在安全组中放开。
关于该插件说明请参考RabbitMQ中STOMP插件说明。 三、代码示例
我们在 WebSocket的那些事4-Spring中的STOMP支持详解中写了一个简单的聊天Demo示例下面我们对该聊天Demo示例进行改造将Spring内置消息代理替换成RabbitMQ外部消息代理。
1、Maven依赖项
服务端和客户端与外部消息代理都是通过TCP进行通信Spring底层默认使用的是Netty和Reactor因此需要引入相关依赖项。
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-websocket/artifactId
/dependency
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-reactor-netty/artifactId
/dependency
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-thymeleaf/artifactId
/dependency2、相关实体
(1) 请求消息参数
Data
public class WebSocketMsgDTO {private String name;private String content;
}
(2) 响应消息内容
Data
Builder
AllArgsConstructor
NoArgsConstructor
public class WebSocketMsgVO {private String content;
}
(3) 自定义认证用户信息
Data
AllArgsConstructor
NoArgsConstructor
public class StompAuthenticatedUser implements Principal {/*** 用户唯一ID*/private String userId;/*** 用户昵称*/private String nickName;/*** 用于指定用户消息推送的标识* return*/Overridepublic String getName() {return this.userId;}}3、自定义用户认证拦截器
Slf4j
public class UserAuthenticationChannelInterceptor implements ChannelInterceptor {private static final String USER_ID User-ID;private static final String USER_NAME User-Name;Overridepublic Message? preSend(Message? message, MessageChannel channel) {StompHeaderAccessor accessor MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);// 如果是连接请求记录userIdif (StompCommand.CONNECT.equals(accessor.getCommand())) {String userID accessor.getFirstNativeHeader(USER_ID);String username accessor.getFirstNativeHeader(USER_NAME);log.info(Stomp User-Related headers found, userID: {}, username:{}, userID, username);accessor.setUser(new StompAuthenticatedUser(userID, username));}return message;}}
4、Websocket外部消息代理配置
Spring中与外部消息代理通信的中间方被称之为Broker Relay它会维护一个系统共享的单一TCP连接和外部消息代理进行通信该TCP连接仅仅适用于服务端用来发送消息而不是接收消息通过Broker Relay的systemLogin和systemPasscode属性可以设置该连接的认证信息。
Broker Relay也会为每个连接的Websocket客户端创建一个TCP连接该连接用来接收消息通过clientLogin和clientPasscode属性可以设置连接的认证信息。
/*** Websocket连接外部消息代理配置* author Nick Liu* date 2023/9/6*/
Configuration
EnableWebSocketMessageBroker
public class WebsocketExternalMessageBrokerConfig implements WebSocketMessageBrokerConfigurer {Overridepublic void configureClientInboundChannel(ChannelRegistration registration) {// 拦截器配置registration.interceptors(new UserAuthenticationChannelInterceptor());}Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {registry.addEndpoint(/websocket) // WebSocket握手端口.addInterceptors(new HttpSessionHandshakeInterceptor()).setAllowedOriginPatterns(*) // 设置跨域.withSockJS(); // 开启SockJS回退机制}Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {registry.setApplicationDestinationPrefixes(/app) // 发送到服务端目的地前缀.enableStompBrokerRelay(/topic) // 开启外部消息代理指定消息订阅前缀.setRelayHost(localhost) // 外部消息代理Host.setRelayPort(61613) // 外部消息代理STOMP端口.setSystemLogin(admin) // 共享系统连接用户名该连接主要用来发送消息.setSystemPasscode(admin) // 共享系统连接密码该连接主要用来发送消息.setClientLogin(admin) // 客户端连接用户名该连接主要用来接收消息.setClientPasscode(admin) // 客户端连接密码该连接主要用来接收消息.setVirtualHost(/stomp); // RabbitMQ虚拟主机}
} 备注我们可以为服务端与客户端的连接设置不同的用户针对客户端连接用户进行权限管控保证系统的安全性在这里为了方便测试我们统一用一个用户。 5、ChatController
STOMP协议并没有规定消息代理必须支持哪种类型的Destinations(目的地)但是RabbitMQ STOMP适配器只支持一些指定的目的地类型如下图
/exchange指定交换机和路由key发送和订阅来自队列的消息。/queue发送和订阅受STOMP网关管理的队列的消息最多只有一个订阅者能到消息。/amq/queue发送和订阅不受STOMP网关管理的队列的消息。/topic发送和订阅来自临时或者持久Topic的消息多个订阅者都能接收到消息。/temp-queue/发送和订阅来自临时队列的消息。 参考文档见RabbitMQ中STOMP插件说明。 在下面的示例中我们选用了/topic的开头的消息发送和订阅前缀目的地格式只能为/topic/{routing-key}routing-key不能有斜杠否则会报错。
Slf4j
Controller
RequiredArgsConstructor
public class ChatController {private final SimpUserRegistry simpUserRegistry;private final SimpMessagingTemplate simpMessagingTemplate;/*** 模板引擎为Thymeleaf需要加上spring-boot-starter-thymeleaf依赖* return*/GetMapping(/page/chat)public ModelAndView turnToChatPage() {return new ModelAndView(chat);}/*** 群聊消息处理* 这里我们通过SendTo注解指定消息目的地为/topic/chat/group如果不加该注解则会自动发送到/topic /chat/group* param webSocketMsgDTO 请求参数消息处理器会自动将JSON字符串转换为对象* return 消息内容方法返回值将会广播给所有订阅/topic/chat/group的客户端*/MessageMapping(/chat/group)SendTo(/topic/chat-group)public WebSocketMsgVO groupChat(WebSocketMsgDTO webSocketMsgDTO) {log.info(Group chat message received: {}, FastJsonUtils.toJsonString(webSocketMsgDTO));String content String.format(来自[%s]的群聊消息: %s, webSocketMsgDTO.getName(), webSocketMsgDTO.getContent());return WebSocketMsgVO.builder().content(content).build();}/*** 私聊消息处理* 这里我们通过SendToUser注解指定消息目的地为/topic/chat/private发送目的地默认会拼接上/user/前缀* 实际发送目的地为/user/topic/chat/private* param webSocketMsgDTO 请求参数消息处理器会自动将JSON字符串转换为对象* return 消息内容方法返回值将会基于SessionID单播给指定用户*/MessageMapping(/chat/private)SendToUser(/topic/chat-private)public WebSocketMsgVO privateChat(WebSocketMsgDTO webSocketMsgDTO) {log.info(Private chat message received: {}, FastJsonUtils.toJsonString(webSocketMsgDTO));String content 私聊消息回复 webSocketMsgDTO.getContent();return WebSocketMsgVO.builder().content(content).build();}/*** 定时消息推送这里我们会列举所有在线的用户然后单播给指定用户。* 通过SimpMessagingTemplate实例可以在任何地方推送消息。*/Scheduled(fixedRate 10 * 1000)public void pushMessageAtFixedRate() {log.info(当前在线人数: {}, simpUserRegistry.getUserCount());if (simpUserRegistry.getUserCount() 0) {return;}// 这里的Principal为StompAuthenticatedUser实例SetStompAuthenticatedUser users simpUserRegistry.getUsers().stream().map(simpUser - StompAuthenticatedUser.class.cast(simpUser.getPrincipal())).collect(Collectors.toSet());users.forEach(authenticatedUser - {String userId authenticatedUser.getUserId();String nickName authenticatedUser.getNickName();WebSocketMsgVO webSocketMsgVO new WebSocketMsgVO();webSocketMsgVO.setContent(String.format(定时推送的私聊消息, 接收人: %s, 时间: %s, nickName, LocalDateTime.now()));log.info(开始推送消息给指定用户, userId: {}, 消息内容:{}, userId, FastJsonUtils.toJsonString(webSocketMsgVO));simpMessagingTemplate.convertAndSendToUser(userId, /topic/chat-push, webSocketMsgVO);});}}6、前端页面chat.html
!DOCTYPE html
html langen xmlns:thhttp://www.thymeleaf.org
headmeta charsetUTF-8titlechat/titlescript srchttps://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.6.1/sockjs.min.js/scriptscript srchttps://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js/scriptscript srchttps://cdnjs.cloudflare.com/ajax/libs/jquery/3.6.4/jquery.min.js/scriptstyle#mainWrapper {width: 600px;margin: auto;}/style
/head
body
div idmainWrapperdivlabel forusername stylemargin-right: 5px姓名/labelinput idusername typetext//divdiv idmsgWrapperp stylevertical-align: top发送的消息:/ptextarea idmsgSent stylewidth: 600px;height: 100px/textareap stylevertical-align: top收到的群聊消息:/ptextarea idgroupMsgReceived stylewidth: 600px;height: 100px/textareap stylevertical-align: top收到的私聊消息:/ptextarea idprivateMsgReceived stylewidth: 600px;height: 200px/textarea/divdiv stylemargin-top: 5px;button onclickconnect()连接/buttonbutton onclicksendGroupMessage()发送群聊消息/buttonbutton onclicksendPrivateMessage()发送私聊消息/buttonbutton onclickdisconnect()断开连接/button/div
/div
script typetext/javascript$(() {$(#msgSent).val();$(#groupMsgReceived).val();$(#privateMsgReceived).val();});let stompClient null;// 连接服务器const connect () {const header {User-ID: new Date().getTime().toString(), User-Name: $(#username).val()};const ws new SockJS(http://localhost:8080/websocket);stompClient Stomp.over(ws);stompClient.connect(header, () subscribeTopic());}// 订阅主题const subscribeTopic () {alert(连接成功!);// 订阅广播消息stompClient.subscribe(/topic/chat-group, function (message) {console.log(Group message received : ${message.body});const resp JSON.parse(message.body);const previousMsg $(#groupMsgReceived).val();$(#groupMsgReceived).val(${previousMsg}${resp.content}\n);});// 订阅单播消息stompClient.subscribe(/user/topic/chat-private, message {console.log(Private message received : ${message.body});const resp JSON.parse(message.body);const previousMsg $(#privateMsgReceived).val();$(#privateMsgReceived).val(${previousMsg}${resp.content}\n);});// 订阅定时推送的单播消息stompClient.subscribe(/user/topic/chat-push, message {console.log(Private message received : ${message.body});const resp JSON.parse(message.body);const previousMsg $(#privateMsgReceived).val();$(#privateMsgReceived).val(${previousMsg}${resp.content}\n);});};// 断连const disconnect () {stompClient.disconnect(() {$(#msgReceived).val(Disconnected from WebSocket server);});}// 发送群聊消息const sendGroupMessage () {const msg {name: $(#username).val(), content: $(#msgSent).val()};stompClient.send(/app/chat/group, {}, JSON.stringify(msg));}// 发送私聊消息const sendPrivateMessage () {const msg {name: $(#username).val(), content: $(#msgSent).val()};stompClient.send(/app/chat/private, {}, JSON.stringify(msg));}
/script
/body
/html四、测试示例
1、群聊、私聊、后台定时推送测试
启动应用程序日志打印显示系统连接建立成功如下 打开浏览器访问http://localhost:8080/page/chat可进入聊天页同时打开两个窗口访问。 2、登录RabbitMQ控制台查看队列信息 可以看到所有消息都发送到了amq.topic交换机上Topic类型 RabbitMQ会为每个连接的客户端创建3个队列。
因为我们在ChatController中定义了三个目的地Routing Key分别是/topic/chat-group、/topic/chat-private、/topic/chat-push。群聊消息目的地/topic/chat-group绑定了两个队列用于实现广播订阅其它两个Routing Key分别绑定到了不同的队列上实现唯一订阅。 五、结语
下一节我们将会详细说明RabbitMQ STOMP适配器支持的各种消息目的地类型的区别以及适用场景。