本文作者芋艿,原题“芋道 Spring Boot WebSocket 入门”(原文链接见文末)。即时通讯网收录时有修订和改动,感谢原作者。
cover.png (9.15 KB, 下载次数: 1398)
下载附件 保存到相册
3 年前 上传
这里有个误区,WebSocket 相比普通的 Socket 来说,仅仅是借助 HTTP 协议完成握手,创建连接。后续的所有通信,都和 HTTP 协议无关。
x2.png (60.07 KB, 下载次数: 1361)
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 [url=http://maven.apache.org/xsd/maven-4.0.0.xsd]http://maven.apache.org/xsd/maven-4.0.0.xsd[/url]"> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.10.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>lab-25-01</artifactId> <dependencies> <!-- 实现对 WebSocket 相关依赖的引入,方便~ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <!-- 引入 Fastjson ,实现对 JSON 的序列化,因为后续我们会使用它解析消息 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency> </dependencies> </project>
// WebsocketServerEndpoint.java @Controller @ServerEndpoint("/") public class WebsocketServerEndpoint { private Logger logger = LoggerFactory.getLogger(getClass()); @OnOpen public void onOpen(Session session, EndpointConfig config) { logger.info("[onOpen][session({}) 接入]", session); } @OnMessage public void onMessage(Session session, String message) { logger.info("[onOpen][session({}) 接收到一条消息({})]", session, message); // 生产环境下,请设置成 debug 级别 } @OnClose public void onClose(Session session, CloseReason closeReason) { logger.info("[onClose][session({}) 连接关闭。关闭原因是({})}]", session, closeReason); } @OnError public void onError(Session session, Throwable throwable) { logger.info("[onClose][session({}) 发生异常]", session, throwable); } }
// WebSocketConfiguration.java @Configuration // @EnableWebSocket // 无需添加该注解,因为我们并不是使用 Spring WebSocket public class WebSocketConfiguration { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
// Application.java @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
1.png (32.27 KB, 下载次数: 1435)
{ type: "", // 消息类型 body: {} // 消息体 }
// Message.java public interface Message { }
// AuthRequest.java public class AuthRequest implements Message { public static final String TYPE = "AUTH_REQUEST"; /** * 认证 Token */ private String accessToken; // ... 省略 set/get 方法 }
// AuthResponse.java public class AuthResponse implements Message { public static final String TYPE = "AUTH_RESPONSE"; /** * 响应状态码 */ private Integer code; /** * 响应提示 */ private String message; // ... 省略 set/get 方法 }
// UserJoinNoticeRequest.java public class UserJoinNoticeRequest implements Message { public static final String TYPE = "USER_JOIN_NOTICE_REQUEST"; /** * 昵称 */ private String nickname; // ... 省略 set/get 方法 }
// SendToOneRequest.java public class SendToOneRequest implements Message { public static final String TYPE = "SEND_TO_ONE_REQUEST"; /** * 发送给的用户 */ private String toUser; /** * 消息编号 */ private String msgId; /** * 内容 */ private String content; // ... 省略 set/get 方法 }
// SendToAllRequest.java public class SendToAllRequest implements Message { public static final String TYPE = "SEND_TO_ALL_REQUEST"; /** * 消息编号 */ private String msgId; /** * 内容 */ private String content; // ... 省略 set/get 方法 }
// SendResponse.java public class SendResponse implements Message { public static final String TYPE = "SEND_RESPONSE"; /** * 消息编号 */ private String msgId; /** * 响应状态码 */ private Integer code; /** * 响应提示 */ private String message; // ... 省略 set/get 方法 }
// SendResponse.java public class SendToUserRequest implements Message { public static final String TYPE = "SEND_TO_USER_REQUEST"; /** * 消息编号 */ private String msgId; /** * 内容 */ private String content; // ... 省略 set/get 方法 }
// MessageHandler.java public interface MessageHandler<T extends Message> { /** * 执行处理消息 * * @param session 会话 * @param message 消息 */ void execute(Session session, T message); /** * @return 消息类型,即每个 Message 实现类上的 TYPE 静态字段 */ String getType(); }
// AuthMessageHandler.java @Component public class AuthMessageHandler implements MessageHandler<AuthRequest> { @Override public void execute(Session session, AuthRequest message) { // 如果未传递 accessToken if (StringUtils.isEmpty(message.getAccessToken())) { WebSocketUtil.send(session, AuthResponse.TYPE, new AuthResponse().setCode(1).setMessage("认证 accessToken 未传入")); return; } // 添加到 WebSocketUtil 中 WebSocketUtil.addSession(session, message.getAccessToken()); // 考虑到代码简化,我们先直接使用 accessToken 作为 User // 判断是否认证成功。这里,假装直接成功 WebSocketUtil.send(session, AuthResponse.TYPE, new AuthResponse().setCode(0)); // 通知所有人,某个人加入了。这个是可选逻辑,仅仅是为了演示 WebSocketUtil.broadcast(UserJoinNoticeRequest.TYPE, new UserJoinNoticeRequest().setNickname(message.getAccessToken())); // 考虑到代码简化,我们先直接使用 accessToken 作为 User } @Override public String getType() { return AuthRequest.TYPE; } }
// SendToOneRequest.java @Component public class SendToOneHandler implements MessageHandler<SendToOneRequest> { @Override public void execute(Session session, SendToOneRequest message) { // 这里,假装直接成功 SendResponse sendResponse = new SendResponse().setMsgId(message.getMsgId()).setCode(0); WebSocketUtil.send(session, SendResponse.TYPE, sendResponse); // 创建转发的消息 SendToUserRequest sendToUserRequest = new SendToUserRequest().setMsgId(message.getMsgId()) .setContent(message.getContent()); // 广播发送 WebSocketUtil.send(message.getToUser(), SendToUserRequest.TYPE, sendToUserRequest); } @Override public String getType() { return SendToOneRequest.TYPE; } }
// SendToAllRequest.java @Component public class SendToAllHandler implements MessageHandler<SendToAllRequest> { @Override public void execute(Session session, SendToAllRequest message) { // 这里,假装直接成功 SendResponse sendResponse = new SendResponse().setMsgId(message.getMsgId()).setCode(0); WebSocketUtil.send(session, SendResponse.TYPE, sendResponse); // 创建转发的消息 SendToUserRequest sendToUserRequest = new SendToUserRequest().setMsgId(message.getMsgId()) .setContent(message.getContent()); // 广播发送 WebSocketUtil.broadcast(SendToUserRequest.TYPE, sendToUserRequest); } @Override public String getType() { return SendToAllRequest.TYPE; } }
x1.png (35.3 KB, 下载次数: 1418)
// WebsocketServerEndpoint.java /** * 消息类型与 MessageHandler 的映射 * * 注意,这里设置成静态变量。虽然说 WebsocketServerEndpoint 是单例,但是 Spring Boot 还是会为每个 WebSocket 创建一个 WebsocketServerEndpoint Bean 。 */ private static final Map<String, MessageHandler> HANDLERS = new HashMap<>(); @Autowired private ApplicationContext applicationContext; @Override public void afterPropertiesSet() throws Exception { // 通过 ApplicationContext 获得所有 MessageHandler Bean applicationContext.getBeansOfType(MessageHandler.class).values() // 获得所有 MessageHandler Bean .forEach(messageHandler -> HANDLERS.put(messageHandler.getType(), messageHandler)); // 添加到 handlers 中 logger.info("[afterPropertiesSet][消息处理器数量:{}]", HANDLERS.size()); }
// WebsocketServerEndpoint.java @OnOpen public void onOpen(Session session, EndpointConfig config) { logger.info("[onOpen][session({}) 接入]", session); // <1> 解析 accessToken List<String> accessTokenValues = session.getRequestParameterMap().get("accessToken"); String accessToken = !CollectionUtils.isEmpty(accessTokenValues) ? accessTokenValues.get(0) : null; // <2> 创建 AuthRequest 消息类型 AuthRequest authRequest = new AuthRequest().setAccessToken(accessToken); // <3> 获得消息处理器 MessageHandler<AuthRequest> messageHandler = HANDLERS.get(AuthRequest.TYPE); if (messageHandler == null) { logger.error("[onOpen][认证消息类型,不存在消息处理器]"); return; } messageHandler.execute(session, authRequest); }
2.png (65.55 KB, 下载次数: 1399)
// WebsocketServerEndpoint.java @OnMessage public void onMessage(Session session, String message) { logger.info("[onOpen][session({}) 接收到一条消息({})]", session, message); // 生产环境下,请设置成 debug 级别 try { // <1> 获得消息类型 JSONObject jsonMessage = JSON.parseObject(message); String messageType = jsonMessage.getString("type"); // <2> 获得消息处理器 MessageHandler messageHandler = HANDLERS.get(messageType); if (messageHandler == null) { logger.error("[onMessage][消息类型({}) 不存在消息处理器]", messageType); return; } // <3> 解析消息 Class<? extends Message> messageClass = this.getMessageClass(messageHandler); // <4> 处理消息 Message messageObj = JSON.parseObject(jsonMessage.getString("body"), messageClass); messageHandler.execute(session, messageObj); } catch (Throwable throwable) { logger.info("[onMessage][session({}) message({}) 发生异常]", session, throwable); } }
// WebsocketServerEndpoint.java private Class<? extends Message> getMessageClass(MessageHandler handler) { // 获得 Bean 对应的 Class 类名。因为有可能被 AOP 代理过。 Class<?> targetClass = AopProxyUtils.ultimateTargetClass(handler); // 获得接口的 Type 数组 Type[] interfaces = targetClass.getGenericInterfaces(); Class<?> superclass = targetClass.getSuperclass(); while ((Objects.isNull(interfaces) || 0 == interfaces.length) && Objects.nonNull(superclass)) { // 此处,是以父类的接口为准 interfaces = superclass.getGenericInterfaces(); superclass = targetClass.getSuperclass(); } if (Objects.nonNull(interfaces)) { // 遍历 interfaces 数组 for (Type type : interfaces) { // 要求 type 是泛型参数 if (type instanceof ParameterizedType) { ParameterizedType parameterizedType = (ParameterizedType) type; // 要求是 MessageHandler 接口 if (Objects.equals(parameterizedType.getRawType(), MessageHandler.class)) { Type[] actualTypeArguments = parameterizedType.getActualTypeArguments(); // 取首个元素 if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) { return (Class<Message>) actualTypeArguments[0]; } else { throw new IllegalStateException(String.format("类型(%s) 获得不到消息类型", handler)); } } } } } throw new IllegalStateException(String.format("类型(%s) 获得不到消息类型", handler)); }
{ type: "SEND_TO_ONE_REQUEST", body: { toUser: "番茄", msgId: "eaef4a3c-35dd-46ee-b548-f9c4eb6396fe", content: "我是一条单聊消息" } }
{ type: "SEND_TO_ALL_REQUEST", body: { msgId: "838e97e1-6ae9-40f9-99c3-f7127ed64747", content: "我是一条群聊消息" } }
3.png (77.71 KB, 下载次数: 1362)
// WebsocketServerEndpoint.java @OnClose public void onClose(Session session, CloseReason closeReason) { logger.info("[onClose][session({}) 连接关闭。关闭原因是({})}]", session, closeReason); WebSocketUtil.removeSession(session); }
// WebsocketServerEndpoint.java @OnError public void onError(Session session, Throwable throwable) { logger.info("[onClose][session({}) 发生异常]", session, throwable); }
x3.png (64.04 KB, 下载次数: 1407)
// DemoWebSocketShakeInterceptor.java public class DemoWebSocketShakeInterceptor extends HttpSessionHandshakeInterceptor { @Override // 拦截 Handshake 事件 public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { // 获得 accessToken if (request instanceof ServletServerHttpRequest) { ServletServerHttpRequest serverRequest = (ServletServerHttpRequest) request; attributes.put("accessToken", serverRequest.getServletRequest().getParameter("accessToken")); } // 调用父方法,继续执行逻辑 return super.beforeHandshake(request, response, wsHandler, attributes); } }
x4.png (39.39 KB, 下载次数: 1429)
// WebSocketConfiguration.java @Configuration @EnableWebSocket // 开启 Spring WebSocket public class WebSocketConfiguration implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(this.webSocketHandler(), "/") // 配置处理器 .addInterceptors(new DemoWebSocketShakeInterceptor()) // 配置拦截器 .setAllowedOrigins("*"); // 解决跨域问题 } @Bean public DemoWebSocketHandler webSocketHandler() { return new DemoWebSocketHandler(); } @Bean public DemoWebSocketShakeInterceptor webSocketShakeInterceptor() { return new DemoWebSocketShakeInterceptor(); } }
来源:即时通讯网 - 即时通讯开发者社区!
轻量级开源移动端即时通讯框架。
快速入门 / 性能 / 指南 / 提问
轻量级Web端即时通讯框架。
详细介绍 / 精编源码 / 手册教程
移动端实时音视频框架。
详细介绍 / 性能测试 / 安装体验
基于MobileIMSDK的移动IM系统。
详细介绍 / 产品截图 / 安装体验
一套产品级Web端IM系统。
详细介绍 / 产品截图 / 演示视频
引用此评论
精华主题数超过100个。
连续任职达2年以上的合格正式版主
为论区做出突出贡献的开发者、版主等。
Copyright © 2014-2024 即时通讯网 - 即时通讯开发者社区 / 版本 V4.4
苏州网际时代信息科技有限公司 (苏ICP备16005070号-1)
Processed in 0.152334 second(s), 46 queries , Gzip On.