|
这是第327篇不掺水的,想要了解更多,请戳下方卡片关注我们吧~什么是重试重试是指,当在一个程序运行过程中,突然遇到了例如网络延迟,中断等情况时,为了保证程序容错性,可用性,一致性等的一个措施,目前主流的框架大多都有一套自己的重试机制,例如 dubbo,mq,Spring 等概要Spring 也自己实现了一套重试机制,Spring Retry 是从 Spring batch 中独立出来的一个功能,主要功能点在于重试和熔断,目前已经广泛应用于 Spring Batch,Spring Integration, Spring for Apache Hadoop 等 Spring 项目。spring retry 提供了注解和编程 两种支持,提供了 RetryTemplate 支持,类似 RestTemplate。整个流程如下:使用介绍Maven 依赖org.springframework.retryspring-retryorg.springframeworkspring-aspects注解使用开启 Retry 功能,需在启动类中使用@EnableRetry注解@SpringBootApplication@EnableRetry@EnableSchedulingpublicclassDemoApplication{publicstaticvoidmain(String[]args){SpringApplication.run(DemoApplication.class,args);}}注解@Retryable需要在重试的代码中加入重试注解@Retryable@Retryable(value=RuntimeException.class)publicvoidtestRetry01()throwsMyException{System.out.println("测试-value属性");thrownewRuntimeException("出现了异常");}默认情况下,会重试 3 次,间隔 1 秒重试配置通过@Retryable注解的属性 可以实现重试配置Value()includevalue 与 include 含义相同,表示可重试的异常类型。默认为空,如果同时 exclude 也为空则会重试所有异常。但在使用时需要注意@Retryable(value=RuntimeException.class)publicvoidtestRetry01()throwsMyException{System.out.println("测试-value属性");thrownewRuntimeException("出现了异常");}例:testRetry01 只会在程序抛出 RuntimeException 时,开启重试exclude不可重试的异常类型。默认为空(如果 include 也为为空,将重试所有异常)。如果 include 为空但 exclude 不为空,则重试非 exclude 中的异常@Retryable(exclude=RuntimeException.class)publicvoidtestRetry02()throwsMyException{System.out.println("测试-value属性");thrownewMyException("出现了异常");}例:testRetry02 在程序抛出 MyException 时,不会开启重试maxAttempts最大重试次数,默认为 3maxAttemptsExpression最大尝试次数的表达式,表达式一旦设置了值,则会覆盖 maxAttempts 的值,maxAttemptsExpression 可以读取 application.yml 配置文件里的数据,也可以通过 SpEL 表达式计算对应的值@Retryable(value=MyException.class,maxAttemptsExpression="${maxAttempts}")publicvoidtestRetry03()throwsMyException{System.out.println("测试-maxAttemptsExpression属性");thrownewMyException("出现了异常");}例:testRetry03 会去读 properties 配置文件获取属性名为 maxAttempts 的值@Retryable(value=MyException.class,maxAttemptsExpression="#{2+3}")publicvoidtestRetry04()throwsMyException{System.out.println("测试-maxAttemptsExpression属性");thrownewMyException("出现了异常");}例:testRetry04 会去通过 SqlEL 计算出对应的重试值exceptionExpression异常处理表达式,ExpressionRetryPolicy 中使用,执行完父类的 canRetry 之后,需要校验 exceptionExpression 的值,为 true 则可以重试@Retryable(value=MyException.class,exceptionExpression="#{@retryService.isRetry()}")publicvoidtestRetry05()throwsMyException{System.out.println("测试-exceptionExpression");thrownewMyException("出现了异常");}例:这个表达式的意思就是,如果 testRetry05 方法出现异常 会调用 retryService.isRetry() 方法,根据返回结果判断是否重试@Recover兜底方法当 @Retryable 方法重试失败之后,最后就会调用 @Recover 方法。用于 @Retryable 失败时的“兜底”处理方法。 @Recover 的方法必须要与 @Retryable 注解的方法保持一致,第一入参为要重试的异常,其他参数与 @Retryable 保持一致,返回值也要一样,否则无法执行!@Retryable(value=MyException.class)publicvoidtestRetry06()throwsMyException{System.out.println("测试兜底方法");thrownewMyException("出现了异常");}@Recoverpublicvoidrecover06(MyExceptione){System.out.println("兜底方法开启,异常信息:"+e.getMessage());}熔断模式@CircuitBreaker指在具体的重试机制下失败后打开断路器,过了一段时间,断路器进入半开状态,允许一个进入重试,若失败再次进入断路器,成功则关闭断路器,注解为@CircuitBreaker,具体包括熔断打开时间、重置过期时间@CircuitBreaker(openTimeout=1000,resetTimeout=3000,value=MyException.class)publicvoidtestRetry07()throwsMyException{System.out.println("测试CircuitBreaker注解");thrownewMyException("出现了异常");}例:openTimeout 时间范围内失败 maxAttempts 次数后,熔断打开 resetTimeout 时长 这个方法的意思就是方法在一秒内失败三次时,触发熔断,下次在有请求过来时,直接进入重试策略SimpleRetryPolicy 默认最多重试 3 次TimeoutRetryPolicy 默认在 1 秒内失败都会重试ExpressionRetryPolicy 符合表达式就会重试CircuitBreakerRetryPolicy 增加了熔断的机制,如果不在熔断状态,则允许重试CompositeRetryPolicy 可以组合多个重试策略NeverRetryPolicy 从不重试(也是一种重试策略哈)AlwaysRetryPolicy 总是重试退避策略退避策略退避是指怎么去做下一次的重试,在这里其实就是等待多长时间。通过 @Backoff 注解实现,那么我们首先看一下@Backoff 的参数@Backoff 参数value默认为 1000, 与 delay 作用相同,表示延迟的毫秒数。当 delay 非 0 时,此参数忽略。delay默认为 0。在指数情况下用作初始值,在统一情况下用作*的最小值。当此元素的值为 0 时,将采用元素 value 的值,否则将采用此元素的值,并且将忽略 value。maxDelay默认为 0。重试之间的最大等待时间(以毫秒为单位)。如果小于 delay,那么将应用默认值为 30000Lmultipler默认为 0。如果为正,则用作乘法器以生成下一个退避延迟。返回一个乘法器,用于计算下一个退避延迟delayExpression评估标准退避期的表达式。在指数情况下用作初始值*,在均匀情况下用作最小值。覆盖 delay。maxDelayExpression该表达式计算重试之间的最大等待时间(以毫秒为单位)。 如果小于 delay,那么将应用 30000L 为默认值。覆盖 maxDelay。multiplierExpression评估为用作乘数的值,以生成退避的下一个延迟。覆盖 multiplier。 返回一个乘数表达式,用于计算下一个退避延迟random默认为 false,在指数情况下 multiplier> 0 将此值设置为 true 可以使后退延迟随机化,从而使最大延迟乘以前一延迟,并且两个值之间的分布是均匀的。@Retryable(value=MyException.class,maxAttempts=4,backoff=@Backoff(delay=2000,multiplier=2,maxDelay=5000))publicvoidtestRetry08()throwsMyException{System.out.println("测试-backoff属性");thrownewMyException("出现了异常");}@Backoff 的参数会影响我们使用哪种退避策略FixedBackOffPolicy默认退避策略,每 1 秒重试 1 次ExponentialBackOffPolicy指数退避策略,当设置 multiplier 时使用,每次重试时间间隔为 当前延迟时间 * multiplier。例如:默认初始 0.1 秒,系数是 2,那么下次延迟 0.2 秒,再下次就是延迟 0.4 秒,如此类推,最大 30 秒。ExponentialRandomBackOffPolicy指数随机退避策略。在指数退避策略的基础上增加了随机性。UniformRandomBackOffPolicy均匀随机策略,设置 maxDely 但没有设置 multiplier 时使用,重试间隔会在 maxDelay 和 delay 间随机原理切入点@EnableRetry@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@EnableAspectJAutoProxy(proxyTargetClass=false)@Import(RetryConfiguration.class)@Documentedpublic@interfaceEnableRetry{/***Indicatewhethersubclass-based(CGLIB)proxiesaretobecreatedasopposedto*standardJavainterface-basedproxies.Thedefaultis{@codefalse}.*@returnwhethertoproxyornottoproxytheclass*/booleanproxyTargetClass()defaultfalse;}@EnablRetry 中使用了两个特殊的注解@EnableAspectJAutoProxy这个注解的作用是开启 aop 的功能,默认使用 jdk 的动态代理。如果 proxyTargetClass 参数为 true,则使用 cglib 的动态代理。@ImportImport 引入了 RetryConfiguration 的 bean 。我们重点看下这个 bean。@Role(BeanDefinition.ROLE_INFRASTRUCTURE)@ComponentpublicclassRetryConfigurationextendsAbstractPointcutAdvisorimplementsIntroductionAdvisor,BeanFactoryAware,InitializingBean{privateAdviceadvice;privatePointcutpointcut;我们可以看到 RetryConfiguration 继承了 AbstractPointcutAdvisor,所以 RetryConfiguration 需要实现 getAdvice() 和 getPointcut() 接口,所以这个 bean 的作用就是为 @Retryable 注解注册 pointcut 切点和 advice 增强。我们再来看他的 初始化方法@OverridepublicvoidafterPropertiesSet()throwsException{this.retryContextCache=findBean(RetryContextCache.class);this.methodArgumentsKeyGenerator=findBean(MethodArgumentsKeyGenerator.class);this.newMethodArgumentsIdentifier=findBean(NewMethodArgumentsIdentifier.class);this.retryListeners=findBeans(RetryListener.class);this.sleeper=findBean(Sleeper.class);Set>retryableAnnotationTypes=newLinkedHashSet>(1);retryableAnnotationTypes.add(Retryable.class);this.pointcut=buildPointcut(retryableAnnotationTypes);//创建pointcutthis.advice=buildAdvice();//创建adviceif(this.adviceinstanceofBeanFactoryAware){((BeanFactoryAware)this.advice).setBeanFactory(this.beanFactory);}}protectedAdvicebuildAdvice(){AnnotationAwareRetryOperationsInterceptorinterceptor=newAnnotationAwareRetryOperationsInterceptor();if(this.retryContextCache!=null){interceptor.setRetryContextCache(this.retryContextCache);}if(this.retryListeners!=null){interceptor.setListeners(this.retryListeners);}if(this.methodArgumentsKeyGenerator!=null){interceptor.setKeyGenerator(this.methodArgumentsKeyGenerator);}if(this.newMethodArgumentsIdentifier!=null){interceptor.setNewItemIdentifier(this.newMethodArgumentsIdentifier);}if(this.sleeper!=null){interceptor.setSleeper(this.sleeper);}returninterceptor;}上面代码用到了 AnnotationClassOrMethodPointcut,其实它最终还是用到了 AnnotationMethodMatcher 来根据注解进行切入点的过滤。这里就是 @Retryable 注解了下面来看 AnnotationAwareRetryOperationsInterceptor 的 invoke() 方法@OverridepublicObjectinvoke(MethodInvocationinvocation)throwsThrowable{//获取真正的代理类MethodInterceptordelegate=getDelegate(invocation.getThis(),invocation.getMethod());if(delegate!=null){//代理类存在,则执行代理类的invoke()方法returndelegate.invoke(invocation);}else{//否则,直接执行目标方法returninvocation.proceed();}}这里 getDelegate() 会处理 @Retryable 的相关参数以及决定使用哪种重试策略和退避策略。privateMethodInterceptorgetDelegate(Objecttarget,Methodmethod){ConcurrentMapcachedMethods=this.delegates.get(target);if(cachedMethods==null){cachedMethods=newConcurrentHashMap();}MethodInterceptordelegate=cachedMethods.get(method);if(delegate==null){//获取方法上的Retryable注解MethodInterceptorinterceptor=NULL_INTERCEPTOR;Retryableretryable=AnnotatedElementUtils.findMergedAnnotation(method,Retryable.class);if(retryable==null){//获取类上的Retryable注解retryable=AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(),Retryable.class);}if(retryable==null){//获取目标类或者方法上的Retryable注解retryable=findAnnotationOnTarget(target,method,Retryable.class);}if(retryable!=null){if(StringUtils.hasText(retryable.interceptor())){//是否实现了自定义拦截,优先级最高interceptor=this.beanFactory.getBean(retryable.interceptor(),MethodInterceptor.class);}elseif(retryable.stateful()){//有状态的拦截interceptor=getStatefulInterceptor(target,method,retryable);}else{//无状态的拦截interceptor=getStatelessInterceptor(target,method,retryable);}}cachedMethods.putIfAbsent(method,interceptor);delegate=cachedMethods.get(method);}this.delegates.putIfAbsent(target,cachedMethods);returndelegate==NULL_INTERCEPTORnull:delegate;}该方法会返回 @Retryable 最终使用的处理类,我们重点看一下 getStatelessInterceptor 的处理,getStatefulInterceptor 中多了 @CircuitBreaker 熔断相关的处理。privateMethodInterceptorgetStatelessInterceptor(Objecttarget,Methodmethod,Retryableretryable){//生成RetryTemplate,同时主持listenerRetryTemplatetemplate=createTemplate(retryable.listeners());//设置重试策略template.setRetryPolicy(getRetryPolicy(retryable));//设置退避策略template.setBackOffPolicy(getBackoffPolicy(retryable.backoff()));//通过StatelessRetryInterceptorBuilder创建RetryOperationsInterceptor拦截,初始化重试模板等信息returnRetryInterceptorBuilder.stateless().retryOperations(template).label(retryable.label()).recoverer(getRecoverer(target,method)).build();}在回头看看 getStatefulInterceptor 方法privateMethodInterceptorgetStatefulInterceptor(Objecttarget,Methodmethod,Retryableretryable){RetryTemplatetemplate=createTemplate(retryable.listeners());template.setRetryContextCache(this.retryContextCache);//获取方法上的CircuitBreaker注解CircuitBreakercircuit=AnnotatedElementUtils.findMergedAnnotation(method,CircuitBreaker.class);if(circuit==null){//如果熔断参数不为空,则处理相关参数,返回响应的拦截处理方,如果为空,则处理非熔断的有状态重试circuit=findAnnotationOnTarget(target,method,CircuitBreaker.class);}if(circuit!=null){//处理CircuitBreaker注解中的retryable相关参数,获得重试策略RetryPolicypolicy=getRetryPolicy(circuit);CircuitBreakerRetryPolicybreaker=newCircuitBreakerRetryPolicy(policy);breaker.setOpenTimeout(getOpenTimeout(circuit));breaker.setResetTimeout(getResetTimeout(circuit));template.setRetryPolicy(breaker);template.setBackOffPolicy(newNoBackOffPolicy());Stringlabel=circuit.label();if(!StringUtils.hasText(label)){label=method.toGenericString();}returnRetryInterceptorBuilder.circuitBreaker().keyGenerator(newFixedKeyGenerator("circuit")).retryOperations(template).recoverer(getRecoverer(target,method)).label(label).build();}RetryPolicypolicy=getRetryPolicy(retryable);template.setRetryPolicy(policy);template.setBackOffPolicy(getBackoffPolicy(retryable.backoff()));Stringlabel=retryable.label();returnRetryInterceptorBuilder.stateful().keyGenerator(this.methodArgumentsKeyGenerator).newMethodArgumentsIdentifier(this.newMethodArgumentsIdentifier).retryOperations(template).label(label).recoverer(getRecoverer(target,method)).build();}重试逻辑及策略实现RetryTemplate 的 doExecute 方法。protectedTdoExecute(RetryCallbackretryCallback,RecoveryCallbackrecoveryCallback,RetryStatestate)throwsE,ExhaustedRetryException{//获得重试策略RetryPolicyretryPolicy=this.retryPolicy;//退避策略BackOffPolicybackOffPolicy=this.backOffPolicy;//新建一个RetryContext来保存本轮重试的上下文,允许重试策略自行初始化RetryContextcontext=open(retryPolicy,state);if(this.logger.isTraceEnabled()){this.logger.trace("RetryContextretrieved:"+context);}//Makesurethecontextisavailablegloballyforclientswhoneed//it...RetrySynchronizationManager.register(context);ThrowablelastException=null;booleanexhausted=false;try{//给监听器发送一条信息。booleanrunning=doOpenInterceptors(retryCallback,context);if(!running){thrownewTerminatedRetryException("Retryterminatedabnormallybyinterceptorbeforefirstattempt");}//GetorStartthebackoffcontext...BackOffContextbackOffContext=null;Objectresource=context.getAttribute("backOffContext");if(resourceinstanceofBackOffContext){backOffContext=(BackOffContext)resource;}if(backOffContext==null){backOffContext=backOffPolicy.start(context);if(backOffContext!=null){context.setAttribute("backOffContext",backOffContext);}}//判断能否重试,就是调用RetryPolicy的canRetry方法来判断。//这个循环会直到原方法不抛出异常,或不需要再重试while(canRetry(retryPolicy,context)&!context.isExhaustedOnly()){try{if(this.logger.isDebugEnabled()){this.logger.debug("Retry:count="+context.getRetryCount());}lastException=null;returnretryCallback.doWithRetry(context);}catch(Throwablee){//方法抛出了异常lastException=e;try{//记录异常信息registerThrowable(retryPolicy,state,context,e);}catch(Exceptionex){thrownewTerminatedRetryException("Couldnotregisterthrowable",ex);}finally{//调用RetryListener的onError方法doOnErrorInterceptors(retryCallback,context,e);}//再次判断能否重试if(canRetry(retryPolicy,context)&!context.isExhaustedOnly()){try{//如果可以重试则走退避策略backOffPolicy.backOff(backOffContext);}catch(BackOffInterruptedExceptionex){lastException=e;//backoffwaspreventedbyanotherthread-failtheretryif(this.logger.isDebugEnabled()){this.logger.debug("Abortretrybecauseinterrupted:count="+context.getRetryCount());}throwex;}}if(this.logger.isDebugEnabled()){this.logger.debug("Checkingforrethrow:count="+context.getRetryCount());}if(shouldRethrow(retryPolicy,context,state)){if(this.logger.isDebugEnabled()){this.logger.debug("Rethrowinretryforpolicy:count="+context.getRetryCount());}throwRetryTemplate.wrapIfNecessary(e);}}/**Astatefulattemptthatcanretrymayrethrowtheexceptionbeforenow,*butifwegetthisfarinastatefulretrythere'sareasonforit,*likeacircuitbreakerorarollbackclassifier.*/if(state!=null&context.hasAttribute(GLOBAL_STATE)){break;}}if(state==null&this.logger.isDebugEnabled()){this.logger.debug("Retryfailedlastattempt:count="+context.getRetryCount());}exhausted=true;//这里会查看是否有兜底方法,有就执行,没有就抛出异常returnhandleRetryExhausted(recoveryCallback,context,state);}catch(Throwablee){throwRetryTemplate.wrapIfNecessary(e);}finally{close(retryPolicy,context,state,lastException==null||exhausted);//关闭RetryListenerdoCloseInterceptors(retryCallback,context,lastException);RetrySynchronizationManager.clear();}}主要核心重试逻辑就是上面的代码了,看上去还是挺简单的。下面看 RetryPolicy 的 canRetry 方法和 BackOffPolicy 的 backOff 方法,以及这两个 Policy 是怎么来的。我们回头看看getStatelessInterceptor方法中的getRetryPolicy和getRetryPolicy方法。privateRetryPolicygetRetryPolicy(Annotationretryable){Mapattrs=AnnotationUtils.getAnnotationAttributes(retryable);@SuppressWarnings("unchecked")Class[]includes=(Class[])attrs.get("value");//通过注解属性判断重试策略这里判断如果value注解内容为空才去获取include注解的内容可得出value的优先级大于includeStringexceptionExpression=(String)attrs.get("exceptionExpression");booleanhasExpression=StringUtils.hasText(exceptionExpression);if(includes.length==0){@SuppressWarnings("unchecked")Class[]value=(Class[])attrs.get("include");includes=value;}@SuppressWarnings("unchecked")Class[]excludes=(Class[])attrs.get("exclude");IntegermaxAttempts=(Integer)attrs.get("maxAttempts");StringmaxAttemptsExpression=(String)attrs.get("maxAttemptsExpression");if(StringUtils.hasText(maxAttemptsExpression)){maxAttempts=PARSER.parseExpression(resolve(maxAttemptsExpression),PARSER_CONTEXT).getValue(this.evaluationContext,Integer.class);}if(includes.length==0&excludes.length==0){SimpleRetryPolicysimple=hasExpressionnewExpressionRetryPolicy(resolve(exceptionExpression)).withBeanFactory(this.beanFactory):newSimpleRetryPolicy();simple.setMaxAttempts(maxAttempts);returnsimple;}Map,Boolean>policyMap=newHashMap,Boolean>();for(Classtype:includes){policyMap.put(type,true);}for(Classtype:excludes){policyMap.put(type,false);}booleanretryNotExcluded=includes.length==0;if(hasExpression){returnnewExpressionRetryPolicy(maxAttempts,policyMap,true,exceptionExpression,retryNotExcluded).withBeanFactory(this.beanFactory);}else{returnnewSimpleRetryPolicy(maxAttempts,policyMap,true,retryNotExcluded);}}总结一下:就是通过 @Retryable 注解中的参数,来判断具体使用文章开头说到的哪个重试策略,是 SimpleRetryPolicy 还是 ExpressionRetryPolicy 等。privateBackOffPolicygetBackoffPolicy(Backoffbackoff){longmin=backoff.delay()==0backoff.value():backoff.delay();if(StringUtils.hasText(backoff.delayExpression())){min=PARSER.parseExpression(resolve(backoff.delayExpression()),PARSER_CONTEXT).getValue(this.evaluationContext,Long.class);}longmax=backoff.maxDelay();if(StringUtils.hasText(backoff.maxDelayExpression())){max=PARSER.parseExpression(resolve(backoff.maxDelayExpression()),PARSER_CONTEXT).getValue(this.evaluationContext,Long.class);}doublemultiplier=backoff.multiplier();if(StringUtils.hasText(backoff.multiplierExpression())){multiplier=PARSER.parseExpression(resolve(backoff.multiplierExpression()),PARSER_CONTEXT).getValue(this.evaluationContext,Double.class);}if(multiplier>0){ExponentialBackOffPolicypolicy=newExponentialBackOffPolicy();if(backoff.random()){policy=newExponentialRandomBackOffPolicy();}policy.setInitialInterval(min);policy.setMultiplier(multiplier);policy.setMaxInterval(max>minmax:ExponentialBackOffPolicy.DEFAULT_MAX_INTERVAL);if(this.sleeper!=null){policy.setSleeper(this.sleeper);}returnpolicy;}if(max>min){UniformRandomBackOffPolicypolicy=newUniformRandomBackOffPolicy();policy.setMinBackOffPeriod(min);policy.setMaxBackOffPeriod(max);if(this.sleeper!=null){policy.setSleeper(this.sleeper);}returnpolicy;}FixedBackOffPolicypolicy=newFixedBackOffPolicy();policy.setBackOffPeriod(min);if(this.sleeper!=null){policy.setSleeper(this.sleeper);}returnpolicy;}就是通过 @Backoff 注解中的参数,来判断具体使用文章开头说到的哪个退避策略,是 FixedBackOffPolicy 还是 UniformRandomBackOffPolicy 等。那么每个 RetryPolicy 都会重写 canRetry 方法,然后在 RetryTemplate 判断是否需要重试。我们看看 SimpleRetryPolicy 的@OverridepublicbooleancanRetry(RetryContextcontext){Throwablet=context.getLastThrowable();//判断抛出的异常是否符合重试的异常//还有,是否超过了重试的次数return(t==null||retryForException(t))&context.getRetryCount()
|
|