找回密码
 会员注册
查看: 7|回复: 0

深入剖析SpringWebFlux

[复制链接]

2万

主题

0

回帖

7万

积分

超级版主

积分
72632
发表于 2024-10-5 09:55:56 | 显示全部楼层 |阅读模式
Zhou Changqing一、WebFlux 简介WebFlux 是 Spring Framework5.0 中引入的一种新的反应式Web框架。通过Reactor项目实现Reactive Streams规范,完全异步和非阻塞框架。本身不会加快程序执行速度,但在高并发情况下借助异步IO能够以少量而稳定的线程处理更高的吞吐,规避文件IO/网络IO阻塞带来的线程堆积。1.1 WebFlux 的特性WebFlux 具有以下特性:异步非阻塞 - 可以举一个上传例子。相对于 Spring MVC 是同步阻塞IO模型,Spring WebFlux这样处理:线程发现文件数据没传输好,就先做其他事情,当文件准备好时通知线程来处理(这里就是输入非阻塞方式),当接收完并写入磁盘(该步骤也可以采用异步非阻塞方式)完毕后再通知线程来处理响应(这里就是输出非阻塞方式)。响应式函数编程 - 相对于Java8 Stream 同步、阻塞的Pull模式,Spring Flux 采用Reactor Stream 异步、非阻塞Push模式。书写采用 Java lambda 方式,接近自然语言形式且容易理解。不拘束于Servlet - 可以运行在传统的Servlet 容器(3.1+版本),还能运行在Netty、Undertow等NIO容器中。1.2 WebFlux 的设计目标适用高并发高吞吐量可伸缩性二、Spring WebFlux 组件介绍2.1 HTTPHandler一个简单的处理请求和响应的抽象,用来适配不同HTTP服务容器的API。2.2 WebHandler一个用于处理业务请求抽象接口,定义了一系列处理行为。相关核心实现类如下;2.3 DispatcherHandler请求处理的总控制器,实际工作是由多个可配置的组件来处理。WebFlux是兼容Spring MVC 基于@Controller,@RequestMapping等注解的编程开发方式的,可以做到平滑切换。2.4 Functional Endpoints这是一个轻量级函数编程模型。是基于@Controller,@RequestMapping等注解的编程模型的替代方案,提供一套函数式API 用于创建Router,Handler和Filter。调用处理组件如下:简单的RouterFuntion 路由注册和业务处理过程:@Beanpublic RouterFunction initRouterFunction() { return RouterFunctions.route() .GET("/hello/{name}", serverRequest -> { String name = serverRequest.pathVariable("name"); return ServerResponse.ok().bodyValue(name); }).build();}请求转发处理过程:2.5 Reactive Stream这是一个重要的组件,WebFlux 就是利用Reactor 来重写了传统Spring MVC 逻辑。其中Flux和Mono 是Reactor中两个关键概念。掌握了这两个概念才能理解WebFlux工作方式。Flux和Mono 都实现了Reactor的Publisher接口,属于时间发布者,对消费者提供订阅接口,当有事件发生的时候,Flux或者Mono会通过回调消费者的相应的方法来通知消费者相应的事件。这就是所谓的响应式编程模型。Mono工作流程图只会在发送出单个结果后完成。Flux工作流程图发送出零个或者多个,可能无限个结果后才完成。对于流式媒体类型:application/stream+json 或者 text/event-stream ,可以让调用端获得服务器滚动结果。对于非流类型:application/json WebFlux 默认JSON编码器会将序列化的JSON 一次性刷新到网络,这并不意味着阻塞,因为结果Flux 是以反应式方式写入网络的,没有任何障碍。三、WebFlux 工作原理3.1 组件装配过程流程相关源码解析-WebFluxAutoConfiguration@Configuration//条件装配 只有启动的类型是REACTIVE时加载@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)//只有存在 WebFluxConfigurer实例 时加载@ConditionalOnClass(WebFluxConfigurer.class)//在不存在 WebFluxConfigurationSupport实例时 加载@ConditionalOnMissingBean({ WebFluxConfigurationSupport.class })//在之后装配@AutoConfigureAfter({ ReactiveWebServerFactoryAutoConfiguration.class, CodecsAutoConfiguration.class, ValidationAutoConfiguration.class })//自动装配顺序@AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE + 10)public class WebFluxAutoConfiguration { @Configuration @EnableConfigurationProperties({ ResourceProperties.class, WebFluxProperties.class }) //接口编程 在装配WebFluxConfig 之前要先 装配EnableWebFluxConfiguration @Import({ EnableWebFluxConfiguration.class }) public static class WebFluxConfig implements WebFluxConfigurer { //隐藏部分源码 /** * Configuration equivalent to {@code @EnableWebFlux}. */ } @Configuration public static class EnableWebFluxConfiguration extends DelegatingWebFluxConfiguration { //隐藏部分代码 } @Configuration @ConditionalOnEnabledResourceChain static class ResourceChainCustomizerConfiguration { //隐藏部分代码 } private static class ResourceChainResourceHandlerRegistrationCustomizer implements ResourceHandlerRegistrationCustomizer { //隐藏部分代码 }WebFluxAutoConfiguration 自动装配时先自动装配EnableWebFluxConfiguration而EnableWebFluxConfiguration->-> DelegatingWebFluxConfiguration-> WebFluxConfigurationSupport。最终WebFluxConfigurationSupport 不仅配置DispatcherHandler 还同时配置了其他很多WebFlux核心组件包括 异常处理器WebExceptionHandler,映射处理器处理器HandlerMapping,请求适配器HandlerAdapter,响应处理器HandlerResultHandler 等。DispatcherHandler 创建初始化过程如下;public class WebFluxConfigurationSupport implements ApplicationContextAware { //隐藏部分代码 @Nullable public final ApplicationContext getApplicationContext() { return this.applicationContext; } //隐藏部分代码 @Bean public DispatcherHandler webHandler() { return new DispatcherHandler(); }public class DispatcherHandler implements WebHandler, ApplicationContextAware { @Nullable private List handlerMappings; @Nullable private List handlerAdapters; @Nullable private List resultHandlers; @Override public void setApplicationContext(ApplicationContext applicationContext) { initStrategies(applicationContext); } protected void initStrategies(ApplicationContext context) { //注入handlerMappings Map mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors( context, HandlerMapping.class, true, false); ArrayList mappings = new ArrayList(mappingBeans.values()); AnnotationAwareOrderComparator.sort(mappings); this.handlerMappings = Collections.unmodifiableList(mappings); //注入handlerAdapters Map adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors( context, HandlerAdapter.class, true, false); this.handlerAdapters = new ArrayList(adapterBeans.values()); AnnotationAwareOrderComparator.sort(this.handlerAdapters); //注入resultHandlers Map beans = BeanFactoryUtils.beansOfTypeIncludingAncestors( context, HandlerResultHandler.class, true, false); this.resultHandlers = new ArrayList(beans.values()); AnnotationAwareOrderComparator.sort(this.resultHandlers); }流程相关源码解析HTTPHandlerAutoConfiguration上面已讲解过WebFlux 核心组件装载过程,那么这些组件又是什么时候注入到对应的容器上下文中的呢?其实是在刷新容器上下文时注入进去的。org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#onRefreshpublic class ReactiveWebServerApplicationContext extends GenericReactiveWebApplicationContext implements ConfigurableWebServerApplicationContext { @Override protected void onRefresh() { super.onRefresh(); try { createWebServer(); } catch (Throwable ex) { throw new ApplicationContextException("Unable to start reactive web server", ex); } } private void createWebServer() { WebServerManager serverManager = this.serverManager; if (serverManager == null) { String webServerFactoryBeanName = getWebServerFactoryBeanName(); ReactiveWebServerFactory webServerFactory = getWebServerFactory(webServerFactoryBeanName); boolean lazyInit = getBeanFactory().getBeanDefinition(webServerFactoryBeanName).isLazyInit(); // 这里创建容器管理时注入httpHandler this.serverManager = new WebServerManager(this, webServerFactory, this::getHttpHandler, lazyInit); getBeanFactory().registerSingleton("webServerGracefulShutdown", new WebServerGracefulShutdownLifecycle(this.serverManager)); // 注册一个 web容器启动服务类,该类继承了SmartLifecycle getBeanFactory().registerSingleton("webServerStartStop", new WebServerStartStopLifecycle(this.serverManager)); } initPropertySources(); } protected HttpHandler getHttpHandler() { String[] beanNames = getBeanFactory().getBeanNamesForType(HttpHandler.class); if (beanNames.length == 0) { throw new ApplicationContextException( "Unable to start ReactiveWebApplicationContext due to missing HttpHandler bean."); } if (beanNames.length > 1) { throw new ApplicationContextException( "Unable to start ReactiveWebApplicationContext due to multiple HttpHandler beans : " + StringUtils.arrayToCommaDelimitedString(beanNames)); } //容器上下文获取httpHandler return getBeanFactory().getBean(beanNames[0], HttpHandler.class); }而这个HTTPHandler是由HTTPHandlerAutoConfiguration装配进去的。@Configuration@ConditionalOnClass({ DispatcherHandler.class, HttpHandler.class })@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)@ConditionalOnMissingBean(HttpHandler.class)@AutoConfigureAfter({ WebFluxAutoConfiguration.class })@AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE + 10)public class HttpHandlerAutoConfiguration { @Configuration public static class AnnotationConfig { private ApplicationContext applicationContext; public AnnotationConfig(ApplicationContext applicationContext) { this.applicationContext = applicationContext; } //构建WebHandler @Bean public HttpHandler httpHandler() { return WebHttpHandlerBuilder.applicationContext(this.applicationContext) .build(); } }流程相关源码解析-web容器org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#createWebServer 。在创建WebServerManager 容器管理器时会获取对应web容器实例,并注入响应的HTTPHandler。class WebServerManager { private final ReactiveWebServerApplicationContext applicationContext; private final DelayedInitializationHttpHandler handler; private final WebServer webServer; WebServerManager(ReactiveWebServerApplicationContext applicationContext, ReactiveWebServerFactory factory, Supplier handlerSupplier, boolean lazyInit) { this.applicationContext = applicationContext; Assert.notNull(factory, "Factory must not be null"); this.handler = new DelayedInitializationHttpHandler(handlerSupplier, lazyInit); this.webServer = factory.getWebServer(this.handler); }}以Tomcat 容器为例展示创建过程,使用的是 TomcatHTTPHandlerAdapter 来连接Servlet 请求到HTTPHandler组件。public class TomcatReactiveWebServerFactory extends AbstractReactiveWebServerFactory implements ConfigurableTomcatWebServerFactory { //隐藏部分代码 @Override public WebServer getWebServer(HttpHandler httpHandler) { if (this.disableMBeanRegistry) { Registry.disableRegistry(); } Tomcat tomcat = new Tomcat(); File baseDir = (this.baseDirectory != null)this.baseDirectory : createTempDir("tomcat"); tomcat.setBaseDir(baseDir.getAbsolutePath()); Connector connector = new Connector(this.protocol); connector.setThrowOnFailure(true); tomcat.getService().addConnector(connector); customizeConnector(connector); tomcat.setConnector(connector); tomcat.getHost().setAutoDeploy(false); configureEngine(tomcat.getEngine()); for (Connector additionalConnector : this.additionalTomcatConnectors) { tomcat.getService().addConnector(additionalConnector); } TomcatHttpHandlerAdapter servlet = new TomcatHttpHandlerAdapter(httpHandler); prepareContext(tomcat.getHost(), servlet); return getTomcatWebServer(tomcat); }}最后Spring容器加载后通过SmartLifecycle实现类 WebServerStartStopLifecycle 来启动Web容器。WebServerStartStopLifecycle 注册过程详见:org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#createWebServer3.2 完整请求处理流程(引用自:https://blog.csdn.net)该图给出了一个HTTP请求处理的调用链路。是采用Reactor Stream 方式书写,只有最终调用 subscirbe 才真正执行业务逻辑。基于WebFlux 开发时要避免controller 中存在阻塞逻辑。列举下面例子可以看到Spring MVC 和Spring Webflux 之间的请求处理区别。@RestControllerpublicclass TestController { private Logger logger = LoggerFactory.getLogger(this.getClass()); @GetMapping("sync") public String sync() { logger.info("sync method start"); String result = this.execute(); logger.info("sync method end"); return result; } @GetMapping("async/mono") public Mono asyncMono() { logger.info("async method start"); Mono result = Mono.fromSupplier(this::execute); logger.info("async method end"); return result; } private String execute() { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }}日志输出2021-05-31 20:14:52.384 INFO 3508 --- [nio-8080-exec-2] c.v.internet.webflux.web.TestController : sync method start2021-05-31 20:14:57.385 INFO 3508 --- [nio-8080-exec-2] c.v.internet.webflux.web.TestController : sync method end2021-05-31 20:15:09.659 INFO 3508 --- [nio-8080-exec-3] c.v.internet.webflux.web.TestController : async method start2021-05-31 20:15:09.660 INFO 3508 --- [nio-8080-exec-3] c.v.internet.webflux.web.TestController : async method end从上面例子可以看出sync() 方法阻塞了请求,而asyncMono() 没有阻塞请求并立刻返回的。asyncMono() 方法具体业务逻辑 被包裹在了Mono 中Supplier中的了。当execute 处理完业务逻辑后通过回调方式响应给浏览器。四、存储支持一旦控制层使用了 Spring Webflux 则安全认证层、数据访问层都必须使用 Reactive API 才真正实现异步非阻塞。NOSQL DatabaseMongoDB(org.springframework.boot:spring-boot-starter-data-mongodb-reactive)。Redis(org.springframework.boot:spring-boot-starter-data-redis-reactive)。Relational DatabaseH2 (io.r2dbc:r2dbc-h2)MariaDB (org.mariadb:r2dbc-mariadb)Microsoft SQL Server (io.r2dbc:r2dbc-mssql)MySQL (dev.miku:r2dbc-mysql)jasync-sql MySQL (com.github.jasync-sql:jasync-r2dbc-mysql)Postgres (io.r2dbc:r2dbc-postgresql)Oracle (com.oracle.database.r2dbcracle-r2dbc)五、总结关于Spring MVC 和Spring WebFlux 测评很多,本文引用下做简单说明。参考:《Spring: Blocking vs non-blocking: R2DBC vs JDBC and WebFlux vs Web MVC》。基本依赖 org.springframework.boot spring-boot-starter-data-r2dbc io.r2dbc r2dbc-pool dev.miku r2dbc- mysql org.springframework.boot spring-boot-starter-data-jdbc org.springframework.boot spring-boot-starter-webflux 相同数据下效果如下;Spring MVC + JDBC 在低并发下表现最好,但 WebFlux + R2DBC 在高并发下每个处理请求使用的内存最少。Spring WebFlux + R2DBC 在高并发下,吞吐量表现优异。END猜你喜欢 神秘又强大的@SpringBootApplication注解复杂多变场景下的Groovy脚本引擎实战详解Apache Dubbo的SPI实现机制
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2025-1-11 20:41 , Processed in 0.614117 second(s), 25 queries .

Powered by Discuz! X3.5

© 2001-2025 Discuz! Team.

快速回复 返回顶部 返回列表