|
一、前言1、如果我们的app类似于股票这种,数据很多很快,之前用的tomcat自带的websocket又或者spring-boot-starter-websocke集成,但是性能在数据并发很大时就会存在问题。2、我前面写的一篇关于springBoot+webosket的,没有使用netty的文章springBoot使用webSocket的几种方式以及在高并发出现的问题及解决,其中就包含了以下者两种方式,都有说明,大家如果量不大,下面这两种方式也是可以的。tomcat自带的websocketspring-boot-starter-websocke集成二、使用Netty完成webSocket1、如何使用,可以参考netty+webSocket+SpringBott是参考文章SpringBoot整合Netty处理WebSocket(支持url参数)这篇文章是,说的已经很ok了2、但是上面那篇文章还是有所不足,因为我需要加上token认证,只有认证了,才可以建立链接,上面那篇的文章,只是获取参数,在认证方面,还是有所不足,满足不了这个条件。后续我可以把我的方式,写一篇文章放出来2.1、RequestUriUtils的getBasePath方法2、比如你的链接是ws://192.168.172.139:1234/ws/id=1,使用它文章中的获取后得到/ws/,建议改成如下,获取之后是/ws/***获取URI中参数以外部分路径**@paramuri*@return*/publicstaticStringgetBasePath(StringuriStr){StringpathWithSlash="";try{//使用URI解析URL字符串URIuri=newURI(uriStr);//获取路径部分pathWithSlash=uri.getPath();//去掉末尾的斜杠returnpathWithSlash.replaceAll("/$","");}catch(URISyntaxExceptione){log.error("解析path错误",e);}returnpathWithSlash;}12345678910111213141516171819202.2、WebSocketChannelInitializer中的ChannelPipeline说明在WebSocket服务器的构建中添加.addLast(newHttpServerCodec())的主要原因是WebSocket握手是基于HTTP协议的,WebSocket连接的建立需要经过以下步骤:客户端向服务器发送一个HTTP请求,请求升级到WebSocket协议。服务器收到这个请求后,需要进行协议升级处理,将HTTP协议切换到WebSocket协议。一旦升级成功,WebSocket连接建立,客户端和服务器之间可以通过WebSocket协议进行双向通信。因此,WebSocket握手的开始阶段仍然是HTTP请求和响应。为了处理这个初始的HTTP请求,需要在Netty的ChannelPipeline中添加.addLast(newHttpServerCodec()),以确保能够解析和处理这个HTTP请求,并在需要时将其升级为WebSocket连接。简而言之,.addLast(newHttpServerCodec())的作用是为了使WebSocket服务器能够正确地处理WebSocket握手之前的HTTP请求和响应,确保WebSocket连接能够成功建立。一旦WebSocket连接建立,就可以通过WebSocket协议进行实时双向通信。这是WebSocket服务器构建中的一个标准操作。websocket协议本身是基于http协议的,所以这边也要使用http解编码器2.3、addLast(newChunkedWriteHandler()).addLast(newChunkedWriteHandler())是Netty中的一个ChannelHandler,它的主要作用是支持异步写大数据流(例如文件传输)。在某些情况下,你可能需要向客户端发送大量的数据,例如文件的内容,而不是一次性将整个数据写入缓冲区,因为这可能会导致内存占用过高。相反,你可以将数据分成小块(chunk)并逐块写入客户端,以避免内存问题。ChunkedWriteHandler的作用如下:支持大数据流的异步写入:它允许你将数据切割成小块并异步地将这些块写入客户端。这对于传输大型文件或大量数据非常有用,因为它可以避免将整个数据加载到内存中。维护写入顺序:它确保数据块按照它们添加到Channel的顺序进行写入。这有助于保持数据的有序性。提高性能:通过异步写入数据块,ChunkedWriteHandler可以提高网络性能,因为它不会阻塞线程等待数据传输完成。这个处理器通常与其他处理器一起使用,以完成完整的数据传输过程。例如,如果你要实现文件传输,通常会使用ChunkedWriteHandler将文件数据切割成小块,然后使用其他处理器来处理文件的传输,例如文件块的编码和解码。总之,.addLast(newChunkedWriteHandler())的作用是支持异步写大数据流,以提高性能并降低内存使用,尤其在需要传输大量数据时非常有用。2.4、addLast(newHttpObjectAggregator(1024*64))将HttpMessage和HttpContents聚合到一个完成的FullHttpRequest或FullHttpResponse中,具体是FullHttpRequest对象还是FullHttpResponse对象取决于是请求还是响应.addLast(newHttpObjectAggregator(1024*64))是Netty中的一个ChannelHandler,主要用于将HTTP请求或响应的多个部分聚合成一个完整的HTTP消息。这对于处理HTTP消息非常有用,特别是当你需要处理大量的HTTP数据时。以下是.addLast(newHttpObjectAggregator(1024*64))的主要作用:消息聚合:在HTTP通信中,请求或响应可能会分成多个部分(例如,HTTP请求头和HTTP请求体)。HttpObjectAggregator负责将这些部分聚合成一个完整的FullHttpRequest或FullHttpResponse,以便更容易处理和操作。内存管理:这个处理器还具有内存管理功能。你可以在构造函数中指定一个最大的聚合字节数(在示例中是64KB)。如果接收到的HTTP数据超过了这个大小,HttpObjectAggregator将抛出异常以防止内存泄漏。简化HTTP消息处理:聚合HTTP消息使得你可以更容易地处理完整的HTTP请求和响应,而不必手动处理每个部分。这对于构建Web服务器或HTTP代理非常有用。示例使用:pipeline.addLast(newHttpServerCodec());//添加HTTP编解码器pipeline.addLast(newHttpObjectAggregator(1024*64));//聚合HTTP消息,最大64KBpipeline.addLast(newMyHttpRequestHandler());//自定义的HTTP请求处理器123在上面的示例中,首先使用HttpServerCodec添加了HTTP编解码器,然后使用HttpObjectAggregator聚合HTTP消息,最后添加了一个自定义的HTTP请求处理器。总之,.addLast(newHttpObjectAggregator(1024*64))的作用是将HTTP请求或响应的多个部分聚合成一个完整的HTTP消息,以简化和改善处理HTTP消息的流程,并提供内存管理功能。这在构建支持HTTP的应用程序中非常有用。2.5、addLast(newWebSocketServerCompressionHandler())webSocket数据压缩扩展,当添加这个的时候WebSocketServerProtocolHandler的第三个参数需要设置成true.addLast(newWebSocketServerCompressionHandler())是Netty中的一个ChannelHandler,用于支持WebSocket消息的压缩和解压缩。WebSocket消息压缩可以减小消息的大小,提高网络传输效率,尤其在低带宽环境下非常有用。以下是.addLast(newWebSocketServerCompressionHandler())的主要作用:WebSocket消息压缩:当客户端和服务器之间通过WebSocket协议传输大量数据时,可以使用压缩技术将消息压缩为更小的尺寸,以减少网络带宽的使用。WebSocketServerCompressionHandler负责处理消息的压缩。WebSocket消息解压缩:对于接收到的已压缩的WebSocket消息,服务器需要将其解压缩以获取原始消息。WebSocketServerCompressionHandler也负责解压缩已压缩的消息。支持多种压缩算法:WebSocketServerCompressionHandler支持多种压缩算法,包括通常的DEFLATE和GZIP压缩算法,以及自定义的压缩算法。在WebSocket应用程序中,通常需要在WebSocket连接建立时协商是否启用压缩,以及使用哪种压缩算法。如果客户端和服务器都支持压缩,那么它们可以在消息传输过程中启用压缩。要使用.addLast(newWebSocketServerCompressionHandler()),你需要在WebSocket服务器的处理管道中添加该处理器。例如:pipeline.addLast(newHttpServerCodec());//添加HTTP编解码器pipeline.addLast(newHttpObjectAggregator(1024*64));//聚合HTTP消息,最大64KBpipeline.addLast(newWebSocketServerCompressionHandler());//添加WebSocket消息压缩处理器pipeline.addLast(newMyWebSocketHandler());//自定义的WebSocket处理器1234在上面的示例中,首先使用HttpServerCodec添加了HTTP编解码器,然后使用HttpObjectAggregator聚合HTTP消息,接下来添加了WebSocketServerCompressionHandler以支持WebSocket消息压缩,最后添加了一个自定义的WebSocket处理器。总之,.addLast(newWebSocketServerCompressionHandler())的作用是为WebSocket服务器添加消息压缩和解压缩的功能,以减小消息大小并提高网络传输效率。这在需要传输大量数据的WebSocket应用中非常有用。2.6、.addLast(newMyWebSocketHandler())自定义处理器-处理websocket消息(消息的父类是WebSocketFrame,旗下有很多子类,比如BinaryWebSocketFrameTextWebSocketFrame等等)如果你使用的是父类是WebSocketFrame,则需要在其内部,判断是什么类型的数据,如果你使用的具体的子类,那么只有具体的消息类型会到哪里2.7、.addLast(newWebSocketServerProtocolHandler(WebSocketProperties.path,null,true,10485760));服务器端向外暴露的websocket端点,当客户端传递比较大的对象时,maxFrameSize参数的值需要调大WebSocketServerProtocolHandler是Netty中的一个关键组件,用于处理WebSocket握手和协议升级,以及管理WebSocket连接的生命周期。它的主要作用如下:WebSocket握手处理:当客户端通过HTTP请求发起WebSocket握手时,WebSocketServerProtocolHandler负责识别并处理这些握手请求。它可以检查HTTP请求中的升级标头和协议头,以确定是否需要升级到WebSocket协议。WebSocket握手协议升级:如果客户端发送了符合WebSocket握手规范的请求,WebSocketServerProtocolHandler会处理协议升级,将连接从HTTP协议切换到WebSocket协议。这个过程包括升级响应的构建和升级握手的处理。WebSocket生命周期管理:一旦WebSocket握手成功,WebSocketServerProtocolHandler管理WebSocket连接的生命周期。它会处理连接的打开、关闭、异常和消息传递等事件。Ping/Pong处理:WebSocket协议支持Ping和Pong消息,用于保持连接的活动状态。WebSocketServerProtocolHandler会自动处理这些心跳消息,以确保连接保持活动状态。以下是一个示例,展示了如何在Netty中使用WebSocketServerProtocolHandler:pipeline.addLast(newHttpServerCodec());//添加HTTP编解码器pipeline.addLast(newHttpObjectAggregator(1024*64));//聚合HTTP消息,最大64KBpipeline.addLast(newWebSocketServerProtocolHandler("/websocket"));//添加WebSocket握手处理器pipeline.addLast(newMyWebSocketHandler());//自定义的WebSocket处理器1234在上面的示例中,WebSocketServerProtocolHandler被添加到处理管道中,并指定了WebSocket的路径(在示例中是"/websocket")。一旦握手成功,连接将切换到WebSocket协议,并且可以在MyWebSocketHandler中处理WebSocket消息。总之,WebSocketServerProtocolHandler是用于处理WebSocket握手和协议升级的关键组件,它使得在Netty中创建WebSocket服务器变得更加容易。三、WebSocket性能对比——SpringBootvsTomcatvsNetty参考文章WebSocket性能对比——SpringBootvsTomcatvsNetty说的很ok了。四、使用四种框架分别实现百万websocket常连接的服务器(写的很好,必看)1、文章包含了一些线上的参数调整,都是干活原文地址:https://colobu.com/2015/05/22/implement-C1000K-servers-by-spray-netty-undertow-and-node-js/五、七种WebSocket框架的性能比较原文地址:https://colobu.com/2015/07/14/performance-comparison-of-7-websocket-frameworks/六、使用python脚本测试1、主要测试两部分大量客户端同时在线,查看性能,内存消耗问题大量客户端同时在线,数据发送效率2、python安装这里就不再说了3、本文的第四节和第五节请务必了解,需要修改对应的服务器tcp链接数等等参数。4、我的webSocket链接格式是ws://192.168.172.226:7081/ws/token最后的那个token,用于线上的认证,只有认证了的用户,才会建立通道,这里为了方便测试,直接用数值代替,如下,这样,就代表用户id好了,毕竟后续我要是测试50w个客户端,总不能先生成50w个token吧。ws://192.168.172.226:7081/ws/1ws://192.168.172.226:7081/ws/26.1、python脚本1、脚本内容importthreadingimporttimeimportwebsocket#定义带有顺序编号的WebSocketURLurl_base="ws://192.168.172.226:7081/ws/"num_connections=10000#要模拟的连接数running_connections=0#跟踪当前正在运行的连接数#创建线程本地存储对象来存储每个线程的文件名local=threading.local()#建立WebSocket连接的函数defconnect_websocket():globalrunning_connectionstry:#使用顺序编号生成URLurl=url_base+str(running_connections)#为当前线程创建文件名local.filename=f"{running_connections}.txt"whileTrue:#创建WebSocket连接ws=websocket.create_connection(url)whileTrue:#接收来自服务端的消息message=ws.recv()#保存消息到文件withopen(local.filename,"a")asfile:file.write(message+"\n")exceptExceptionase:print(f"WebSocket连接失败:{e}")running_connections-=1#开始模拟WebSocket连接whilerunning_connections0:time.sleep(1)print("所有WebSocket连接完成。")12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152532、运行#安装websocket-clientpipinstallwebsocket-client#运行test.py文件pythontest.py123453、说明脚本作用是生成指定num_connections的webSocket连接数,并一直监听服务端返回的消息,如果服务端有消息就会保存到对应链接的文件夹下面,包含其服务端返回的内容。6.2、netty服务端1、具体的链接的代码我这里就不说了2、主要需要写两个接口,一个接口是向所有在线的客户端发送一条消息,另一个接口是向所有在线的客户端发送指定数量mockCount的消息packagecn.jt.thermalapi.common.controller;importcn.jt.thermalapi.response.Response;importcn.jt.thermalapi.websocket.session.SessionFactory;importio.swagger.annotations.Api;importlombok.extern.slf4j.Slf4j;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.PathVariable;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;importspringfox.documentation.annotations.ApiIgnore;importjava.util.concurrent.atomic.AtomicInteger;/***@authorGXM*@version1.0.0*@DescriptionTODO*@createTime2023年10月13日*/@ApiIgnore@Api(tags="测试api")@RestController@RequestMapping("/test")@Slf4jpublicclassTestController{@GetMapping("mockOne")publicResponsemockOne(){AtomicIntegercount=newAtomicInteger(0);SessionFactory.getSession().broadcast(count.getAndIncrement()+"");returnResponse.buildSuccess();}@GetMapping("mockMany/{mockCount}")publicResponsemockMany(@PathVariable("mockCount")intmockCount){AtomicIntegercount=newAtomicInteger(0);while(count.getAndIncrement()
|
|