1.STOMP协议简介常用的WebSocket协议定义了两种传输信息类型:文本信息和二进制信息。类型虽然被确定,但是他们的传输体是没有规定的,也就是说传输体可以自定义成什么样的数据格式都行,只要客户端和服务端约定好,得到数据后能够按照 约定 的语义解析数据就好。相较于Http协议约定好了必须有请求头、请求体这类语义,接收到数据后先要解析头信息,WebSocket协议为了节省带宽和减轻服务器的负担,只要建立了ws连接,双方需要采用提前定义好的传输体格式(比如JSON格式)去解析数据,没有了Http协议那些头信息的解析工作和传输,也就提升了通讯效率。 直接使用WebSocket因为没有高层级的协议,所以需要我们定义应用之间所发送消息的语义,还要确保联机的两端都能遵循。因此Stomp作为文本定向消息协议常常作为子协议用于集成WebSocket,用来定义消息的语义。请求和响应数据格式类似,STOMP帧由命令、一个或多个头信息以及负载所组成。 下图是测试使用Stomp协议从服务端主动推送给客户端信息,这整个消息被称为“帧”,其中:
<<< MESSAGE #命令message-id:6834f845-49dd-480e-8b94-84d31372d744 #头信息subscription:sub-0 #头信息content-length:136 #头信息content-type:application/json #头信息{'content':'此条内容为发送的消息--你好,世界!'} # 负载 由此可以看出Stomp既没有Http协议那样包含了太多的头信息而占用过多的网络带宽,又能提供比较丰富的格式语义方便双方的通讯。 2.Netty整合STOMP存在的问题“即便Netty提供了STOMP编辑码器,但STOMP 是为消息中间件而设计的一种消息传递协议,而Netty 提供的是异步的、事件驱动的网络应用程序框架,并非作为消息代理,因此从功能契合的意义上说Netty不支持 STOMP协议。”———— Apache ActiveMQ 开发人员 Justin Bertram 不过实际开发中如果只是单纯的采用Netty+WebSocket会导致通信形式层级过低从而导致通信缺乏可靠性,所以Spring社区就提倡将Stomp作为WebSocket子协议增强通信的消息语义,《Spring实战》也给出了如何通集成Spring、WebSocket、Stomp如何进行集成开发。但是Netty+WebSocket如何将Stomp作为子协议,本人搜索了Google、Stack Overflow后并没有具体的范例,直至后面在Netty的GitHub社区中看到了简单的范例说明,因此特意记录下来希望帮助到有需要的人。 具体代码可参考: 3.Netty消息通讯的握手原理在集成STOMP协议过程中,最重要的是官方提供的
让我们进入 public WebSocketServerProtocolHandler(String websocketPath, String subprotocols) { this(websocketPath, subprotocols, 10000L); } 此方法最终调用的是(代码3-2):
其中 public WebSocketServerProtocolConfig build() { return new WebSocketServerProtocolConfig( websocketPath, subprotocols, checkStartsWith, handshakeTimeoutMillis, forceCloseTimeoutMillis, handleCloseFrames, sendCloseFrame, dropPongFrames, decoderConfigBuilder == null ? decoderConfig : decoderConfigBuilder.build() ); } 此处为了读者能理解后续的代码执行流程先插入一个知识点,ChannelHandler 回调方法的执行顺序:
handlerAdded ()、channelRegistered ()、channelActive ()只会在生命周期执行一次,后面便不会再被执行。 具体可以查看链接: 在Netty的ChannelHandler生命周期中handlerAdded()方法的执行是在检测到新请求连接时添加一个handler处理器到链表之中。经过
在上述代码中因为handlerAdded中判断第一次连接时,Pipeline管道内不存在 最终形成 我们先不管Utf8FrameValidator,着重分析 @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { final HttpObject httpObject = (HttpObject) msg; if (httpObject instanceof HttpRequest) { //获取当前请求信息 final HttpRequest req = (HttpRequest) httpObject; //进行比对请求路径,验证协议 isWebSocketPath = isWebSocketPath(req); if (!isWebSocketPath) { ctx.fireChannelRead(msg); return; } try { ... ... //创建握手工厂,从参数可以看出:webSocketURL--webSocket的路径地址、subprotocols--添加的自定义子协议、decoderConfig解码设置 final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( getWebSocketLocation(ctx.pipeline(), req, serverConfig.websocketPath()), serverConfig.subprotocols(), serverConfig.decoderConfig()); ////创建一个握手处理器 final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req); //握手回调 final ChannelPromise localHandshakePromise = handshakePromise; if (handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { //设置处理器 WebSocketServerProtocolHandler.setHandshaker(ctx.channel(), handshaker); ctx.pipeline().remove(this); //ChannelFuture的作用是用来保存Channel异步操作的结果。在Netty中所有的I/O操作都是异步的。ChannelFuture代表了异步计算的结果,这个接口的主要方法就是检查计算是否已完成,等待计算然后返回计算结果。 final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req); //进行结果监听 handshakeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { ... ... //如果发送成功情况,保持兼容性触发事件 localHandshakePromise.trySuccess(); //如果发送成功情况,则触发握手成功事件 ctx.fireUserEventTriggered( new WebSocketServerProtocolHandler.HandshakeComplete( req.uri(), req.headers(), handshaker.selectedSubprotocol())); } }); applyHandshakeTimeout(); } } } 以下是笔者先用SpirngBoot+WebSocket整合Stomp协议做了基本模板,我们可以看到在相应头中存在sec-websocket-protocol属性,其意义就是所携带的STOMP协议和其版本信息: 如本文开头所说,STOMP协议意在让WebSocket协议变得可靠一些,通过客户端与服务端达成的语义相互确认后才进行连接通讯。因此如何获取客户端请求中的子协议信息并让服务端匹配确认,这一步骤在握手环节过程中是非常重要的。 所以,在上述代码中 wsFactory.newHandshaker(req) 、 handshaker.handshake(ctx.channel(), req) 两个方法实现了我们想要的效果。 首先是newHandshaker方法,它是属于
之后的handshaker.handshake(ctx.channel(), req)方法将会开始处理握手,因为其中传入参数req正式客户端发过来的请求,所以此方法部分作用就是用来比对客户端携带的sec-websocket-protocol属性值是否与服务端自定义属性相对应,如果是则返回握手成功的结果(代码3-6): public ChannelFuture handshake(Channel channel, FullHttpRequest req) { return handshake(channel, req, null, channel.newPromise());}public final ChannelFuture handshake(Channel channel, FullHttpRequest req, HttpHeaders responseHeaders, final ChannelPromise promise) { ... ... FullHttpResponse response = newHandshakeResponse(req, responseHeaders); ... ... return promise;} newHandshakeResponse由
在代码最后调用了selectSubprotocol(protocol),传入的参数是客户端发来的请求中携带的sec-websocket-protocol属性值(代码3-8): protected String selectSubprotocol(String requestedSubprotocols) { ... ... //将sec-websocket-protocol属性值进行分割 String[] requestedSubprotocolArray = requestedSubprotocols.split(','); for (String p: requestedSubprotocolArray) { String requestedSubprotocol = p.trim(); for (String supportedSubprotocol: subprotocols) { if (SUB_PROTOCOL_WILDCARD.equals(supportedSubprotocol) || requestedSubprotocol.equals(supportedSubprotocol)) { selectedSubprotocol = requestedSubprotocol; return requestedSubprotocol; } } } return null; } 我们可以注意到if中进行了 所以根据上述的流程,我们应当创建一个StompVersion和一个握手事件成功后触发的处理器,分别用于握手前传入 4.Netty如何兼容STOMP协议根据上述代码流程我们可以发现,当确认握手成功建立连接后,就会触发握手成功事件。因此我门只需要编写事件触发后接收事件的处理器,并添加到pipeline(返回看3-1代码)中(代码4-1):
在StompChatHandler处理器中(代码过长,直接下载文章开头的势力代码进行阅读即可),我们根据STOMP官方规定的命令(CONNECT、SUBSCRIBE、SEND、UNSUBSCRIBE、DISCONNECT)进行判断和对消息内容的处理。以下以SEND命令为例(代码4-2): private final ConcurrentMap<String, Set<StompSubscription>> chatDestinations = new ConcurrentHashMap<String, Set<StompSubscription>>(); private void onSend(ChannelHandlerContext ctx, StompFrame inboundFrame) { //获取目的地名称 String destination = inboundFrame.headers().getAsString(DESTINATION); if (destination == null) { sendErrorFrame('missed header', 'required 'destination' header missed', ctx); return; } //根据目的地名称获取相应订阅地址信息 Set<StompSubscription> subscriptions = chatDestinations.get(destination); for (StompSubscription subscription : subscriptions) { subscription.channel().writeAndFlush(transformToMessage(inboundFrame, subscription)); } } private static StompFrame transformToMessage(StompFrame sendFrame, StompSubscription subscription) { Charset charset = StandardCharsets.UTF_8; //设置消息命令为MESSAGE,消息文本内容副本,并返回DefaultStompFrame类型 StompFrame messageFrame = new DefaultStompFrame(StompCommand.MESSAGE, sendFrame.content().retainedDuplicate()); String id = UUID.randomUUID().toString(); //设置STOMP帧的头信息,具体参照文章开始STOMP帧格式 messageFrame.headers() .set(MESSAGE_ID, id) .set(SUBSCRIPTION, subscription.id()) .set(CONTENT_LENGTH, Integer.toString(messageFrame.content().readableBytes())); //获取STOMP内容类型,默认未BYTEBUF CharSequence contentType = sendFrame.headers().get(CONTENT_TYPE); if (contentType != null) { //如果有,则也设置入头中 messageFrame.headers().set(CONTENT_TYPE, contentType); } //返回设置好的消息格式 return messageFrame; } 进入DefaultStompFrame可知,我们需要发送的文本被转化成ByteBuf类型。如果默认采用此类型而不加以设置会导致当内容为中文时产生乱码。
如上问题,我们需要编写一个编码器 @Override protected WebSocketFrame convertFullFrame(StompFrame original, ByteBuf encoded) { //如果content-type消息头类型为text或则是application/json,则采用TextWebSocketFrame编码方式 if (isTextFrame(original)) { return new TextWebSocketFrame(encoded); } //否则采用BinaryWebSocketFrame(二进制)编码方式 return new BinaryWebSocketFrame(encoded); } private static boolean isTextFrame(StompHeadersSubframe headersSubframe) { //判断STOMP帧是否携带content-type消息头,并且对消息类型进行判断 String contentType = headersSubframe.headers().getAsString(StompHeaders.CONTENT_TYPE); return contentType != null && (contentType.startsWith('text') || contentType.startsWith('application/json')); } 为什么发送消息出去要用的编码器而不是解码器,我们以下面这流程图为例: 5.编写StompVersion即使Spring官方推荐采用SockJS以兼容更多的浏览器,但考虑到目前主流的浏览器都可以支持WebSocket协议,因此笔者便放弃采用SockJS+ Stomp.over()创建客户端,而是选用Stomp.client(url)。前者与后者区别主要在于Stomp.client(url)调用时采用的是浏览器自身所支持的不同的WebSocket协议,而不是教由SockJS统一实现。 根据源码可以看到STOMP当的默认版本是 ['v10.stomp', 'v11.stomp'] ,因此要让Netty兼容STOMP则必须让服务端明白当前采用STOMP协议与其对应的版本,可以看到STOMP协议版本:v10.stomp、v11.stomp。
还记得(代码3-1)与(代码4-1)中 我们需要传递附加的子协议吗?这便是为什么需要编写StompVersion(代码5-1): public enum StompVersion { STOMP_V11('1.1', 'v11.stomp'), STOMP_V12('1.2', 'v12.stomp'); //获取key为stomp_version的常量。如果没有这样的常量,将创建并返回一个新的常量。 public static final AttributeKey<StompVersion> CHANNEL_ATTRIBUTE_KEY = AttributeKey.valueOf('stomp_version'); public static final String SUB_PROTOCOLS; static { List<String> subProtocols = new ArrayList<String>(values().length); for (StompVersion stompVersion : values()) { subProtocols.add(stompVersion.subProtocol); } //将枚举类型转化为字符串格式: v11.stomp,v12.stomp SUB_PROTOCOLS = StringUtil.join(',', subProtocols).toString(); } private final String version; private final String subProtocol; StompVersion(String version, String subProtocol) { this.version = version; this.subProtocol = subProtocol; } public String version() { return version; } public String subProtocol() { return subProtocol; } //根据subProtocol返回匹配上的枚举 public static StompVersion findBySubProtocol(String subProtocol) { if (subProtocol != null) { for (StompVersion stompVersion : values()) { if (stompVersion.subProtocol().equals(subProtocol)) { return stompVersion; } } } } 在代码(代码4-1)中,即握手事件成功时,我们需要比对并获取当前Stomp协议的版本,并将该版本信息设置进CHANNEL_ATTRIBUTE_KEY中:
而当客户端真正连接服务端,发送CONNECT命令时,就会触发StompChatHandler代码中onSend方法(代码5-2): private static void onConnect(ChannelHandlerContext ctx, StompFrame inboundFrame) { String acceptVersions = inboundFrame.headers().getAsString(ACCEPT_VERSION); //获取当前采用的STOMP协议版本 StompVersion handshakeAcceptVersion = ctx.channel().attr(StompVersion.CHANNEL_ATTRIBUTE_KEY).get(); StompFrame connectedFrame = new DefaultStompFrame(StompCommand.CONNECTED); //设置头信息 connectedFrame.headers() .set(VERSION, handshakeAcceptVersion.version()) .set(SERVER, 'Netty-Server') .set(HEART_BEAT, '0,0'); ctx.writeAndFlush(connectedFrame); } 这时候客户端会接受到服务端发送来封装好的帧,而这些帧正如HTTP中的头信息携带语义,可以让客户端得知当前连接成功以及开始后续其他操作:
自此,整个Netty整合STOMP协议告一段落,因为除了Netty官网的示例外,暂时没有看到有其他博客对此有说明记录,因此尽兴了比较详细的讲解。内容或许过于分散,希望读者跟着源码进行阅读理解。 |
|