找回密码
 会员注册
查看: 18|回复: 0

关于Sentinel的那些事

[复制链接]

2万

主题

0

回帖

7万

积分

超级版主

积分
72245
发表于 2024-10-6 20:36:04 | 显示全部楼层 |阅读模式
前言随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel是面向分布式服务架构的流量控制组件,主要以流量为切入点,从流量控制、熔断降级、系统自适应保护等多个维度来帮助您保障微服务的稳定性。所以合理的使用Sentinel,并掌握其背后的底层原理就显得尤为重要!近日,在进行线上稳定性巡检的时候,发现线上服务偶尔会出现调用Ab接口熔断的情况。查看线上配置的Sentinel熔断规则如下?发现似乎是没有问题的。查看线上日志,发现熔断之前的慢调用也不是很多,按照1s的统计窗口来计算的话,远远达不到配置的50%的比例。很是不解,于是决定看下源码一探究竟。为了帮大家节省时间,这里先直接上结论:?Sentinel的慢调用比例熔断规则统计的时候,不是等到滑动窗口结束了再去根据这一整个窗口的数据来做判断,而是每次请求都会做判断。比如拿最上面的配置规则来做例子的话,如果当前窗口的刚开始的前几个请求中(大于5)慢调用比例刚好超过了50%,那么就会触发熔断,断路器直接打开,3s内的所有请求都走降级,然后3s后断路器进入半开状态,如果下一个请求正常了,那么断路器就关闭。?下面是源码探险之旅?梦开始的地方:@SentinelResource(value = "pandora.abService.callAb", blockHandler = "callAb", blockHandlerClass = SentinelFallbackHandler.class) public String callAb(Long userId, String key) { ? ?//省略... } 项目中通过@SentinelResource注解来进行资源管控,如果触发熔断或者降级会走注解中配置的blockHandlerClass的blockHandler降级方法,这是我们最常用的Sentinel使用方式。然后再SentinelAutoConfiguration类中可以看到会注入一个切面处理的类SentinelResourceAspect。@Bean @ConditionalOnMissingBean public SentinelResourceAspect sentinelResourceAspect() { ? return new SentinelResourceAspect(); } 顾名思义,这个类就是处理@SentinelResource注解包裹的资源的。下面来看下这个SentinelResourceAspect。可以发现SphU.entry() 是一个核心的逻辑, 从SphU.entry()方法往下执行会进入到Sph.entry(),Sph的默认实现类是CtSph,而最终会进入CtSph的entry方法:可以发现,这里主要就是对资源进行封装,然后调用entryWithPriority()。private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) ? ?throws BlockException { ? ?// 获取上下文 ? ?Context context = ContextUtil.getContext(); ? ?// 省略部分代码... ? ?// 获取处理责任链 ? ?ProcessorSlot chain = lookProcessChain(resourceWrapper); ? ?// 省略部分代码... ? ?new CtEntry(resourceWrapper, chain, context); ? ?try { ? ? ? ?// 链式处理资源 ? ? ? ?chain.entry(context, resourceWrapper, null, count, prioritized, args); ? ?} catch (BlockException e1) { ? ? ? ?e.exit(count, args); ? ? ? ?throw e1; ? ?} catch (Throwable e1) { ? ? ? ?// This should not happen, unless there are errors existing in Sentinel internal. ? ? ? ?RecordLog.info("Sentinel unexpected exception", e1); ? ?} ? ?return e; } 看了这段代码,可以发现,这里会先去获取对应的资源(也就是@SentinelResource包裹的方法,在Sentinel中把这类方法抽象成一个resource)处理的责任链,然后通过对资源进行链式处理。我们先看下lookProcessChain(resourceWrapper) 获取了哪些处理链:public class DefaultSlotChainBuilder implements SlotChainBuilder { ? ?@Override ? ?public ProcessorSlotChain build() { ? ? ? ?// 处理链 ? ? ? ?ProcessorSlotChain chain = new DefaultProcessorSlotChain(); ? ? ? ?chain.addLast(new NodeSelectorSlot()); ? ? ? ?chain.addLast(new ClusterBuilderSlot()); ? ? ? ?chain.addLast(new LogSlot()); ? ? ? ?chain.addLast(new StatisticSlot()); ? ? ? ?chain.addLast(new SystemSlot()); ? ? ? ?chain.addLast(new AuthoritySlot()); ? ? ? ?chain.addLast(new FlowSlot()); ? ? ? ?chain.addLast(new DegradeSlot()); ? ? ? ?return chain; ? ?} } @Override public voidaddLast(AbstractLinkedProcessorSlotprotocolProcessor) { end.setNext(protocolProcessor); end= protocolProcessor; } 走到这里就很清晰了,发现这里其实就和Sentinel官网的一张图对应上了。这里就是一个典型的责任链模式。我们先接着往下看责任链是如何工作的,也就是chain.entry(),发现chain.entry()处理的时候都调用fireEntry(),调用责任链的next节点进行处理。现在我们重点关注下慢查询和熔断相关的逻辑根据官网的介绍,也就是下面二个ProcessorSlot。StatisticSlot:用于记录、统计不同纬度的 runtime 指标监控信息。DegradeSlot:通过统计信息以及预设的规则,来做熔断降级。StatisticSlot:先来看StatisTicSlot对应entry()。@Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, ? ? ? ? ? ? ? ? ?boolean prioritized, Object... args) throws Throwable { ? ?try { ? ? ? ?// 先走下面的规则校验的slot ? ? ? ?// Do some checking. ? ? ? ?fireEntry(context, resourceWrapper, node, count, prioritized, args); ? ? ? ?// 校验通过就会走下面的node的两个统计方法 ? ? ? ?// Request passed, add thread count and pass count. ? ? ? ?node.increaseThreadNum(); ? ? ? ?node.addPassRequest(count); ? ? ? ?// 省略部分代码... ? ?} catch (BlockException e) { ? ? ? ?// 后续slot如果校验不通过,也就是触发了限流或者熔断就会走到这里 ? ? ? ?// Blocked, set block exception to current entry. ? ? ? ?context.getCurEntry().setBlockError(e); ? ? ? ?// 统计被block的次数 ? ? ? ?// Add block count. ? ? ? ?node.increaseBlockQps(count); ? ? ? ?if (context.getCurEntry().getOriginNode() != null) { ? ? ? ? ? ?context.getCurEntry().getOriginNode().increaseBlockQps(count); ? ? ? ?} ? ? ? ?// 省略部分代码... ? ? ? ?throw e; ? ?} catch (Throwable e) { ? ? ? ?// Unexpected internal error, set error to current entry. ? ? ? ?context.getCurEntry().setError(e); ? ? ? ?throw e; ? ?} } 看下entry方法,该方法首先会触发后续slot的entry方法,即SystemSlot、FlowSlot、DegradeSlot等的规则,如果规则不通过,就会抛出BlockException,则会在node中统计被block的数量。反之会在node中统计通过的请求数和线程数等信息。我们可以看到node.addPassRequest()这段代码是在fireEntry执行之后执行的,这意味着,当前请求通过了sentinel的流控等规则,此时需要将当次请求记录下来,也就是执行 node.addPassRequest()这行代码,我们跟进去看看:@Override public void addPassRequest(int count) { ? ?rollingCounterInSecond.addPass(count); ? ?rollingCounterInMinute.addPass(count); } 这里就是我们发现Sentinel内部有两个窗口的概念,一种是秒级别的,一种是分钟级别的。继续跟进去,发现会进入ArrayMetric。private final LeapArray data; @Override public void addPass(int count) { ? ?WindowWrap wrap = data.currentWindow(); ? ?wrap.value().addPass(count); } 这里就发现了统计的两个核心类LeapArray和MetricBucket,我们先看MetricBucket。?private final LongAdder[] counters; public MetricBucket add(MetricEvent event, long n) { ? ?counters[event.ordinal()].add(n); ? ?return this; } public enum MetricEvent { ? ?PASS, ? ?BLOCK, ? ?EXCEPTION, ? ?SUCCESS, ? ?RT, ? ?OCCUPIED_PASS } 发现MetricBucket会用MetricEvent来区分统计的信息类别,并用LongAdder来记录信息确保线程安全。然后我们回过头去下看data.currentWindow()这个获取当前窗口的处理,也就是LeapArray类的逻辑。?public WindowWrap currentWindow() { ? ?return currentWindow(TimeUtil.currentTimeMillis()); } // timeMillis就是当前时间戳public WindowWrap currentWindow(long timeMillis) { ? ?if (timeMillis old = array.get(idx); ? ? ? ?if (old == null) { ? ? ? ? ? ?// 如果为空,说明之前还没初始化,就初始化一个新的窗口 ? ? ? ? ? ?WindowWrap window = new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); ? ? ? ? ? ?// cas更新到array中 ? ? ? ? ? ?if (array.compareAndSet(idx, null, window)) { ? ? ? ? ? ? ? ?return window; ? ? ? ? ? ?} else { ? ? ? ? ? ? ? ?Thread.yield(); ? ? ? ? ? ?} ? ? ? ?} ?// 如果old窗口的开始时间和计算得到的开始时间相等,说明old窗口就是当前时间窗口 ? ? ? ? ? ?else if (windowStart == old.windowStart()) { ? ? ? ? ? ?return old; ? ? ? ?} else if (windowStart > old.windowStart()) { ? ? ? ? ? ?// 如果old窗口的开始时间比计算得到的开始时间要小,说明old已经过期了,需要重新设置一个时间窗口 ? ? ? ? ? ?if (updateLock.tryLock()) { ? ? ? ? ? ? ? ?try { ? ? ? ? ? ? ? ? ? ?// 重设时间窗口 ? ? ? ? ? ? ? ? ? ?return resetWindowTo(old, windowStart); ? ? ? ? ? ? ? ?} finally { ? ? ? ? ? ? ? ? ? ?updateLock.unlock(); ? ? ? ? ? ? ? ?} ? ? ? ? ? ?} else { ? ? ? ? ? ? ? ?Thread.yield(); ? ? ? ? ? ?} ? ? ? ?} else if (windowStart (windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); ? ? ? ?} ? ?} } 还有一点补充一下,秒级别的滑动窗口,在内部会初始化两个小的窗口,每个窗口的时间为500ms,代码如下:// 这里SampleCountProperty.SAMPLE_COUNT默认为2 private transient volatile Metric rollingCounterInSecond = new ?ArrayMetric(SampleCountProperty.SAMPLE_COUNT, ? ?IntervalProperty.INTERVAL); public ArrayMetric(int sampleCount, int intervalInMs) { ? ?this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); } public LeapArray(int sampleCount, int intervalInMs) { ? ?// 窗口个数,1s默认为两个,所以每个小窗口长度为500ms ? ?this.windowLengthInMs = intervalInMs / sampleCount; ? ?this.intervalInMs = intervalInMs; ? ?this.sampleCount = sampleCount; ? ?this.array = new AtomicReferenceArray(sampleCount); } 统计数据的StatisticSlot搞明白了,最后我们看熔断降级相关的规则判断DegradeSlot。DegradeSlot:熔断相关的逻辑主要在DegradeSlot.exit()方法中。public ResponseTimeCircuitBreaker(DegradeRule rule) { ? ?// 慢调用比例的滑动窗口 ? ?// 可以发现小窗口只有一个,窗口大小是熔断规则配置的长度,这里是30s;这点比StatisticSlot的滑动窗口要简单一些 ? ?this(rule, new SlowRequestLeapArray(1, rule.getStatIntervalMs())); } @Override public void exit(Context context, ResourceWrapper r, int count, Object... args) { ? ?Entry curEntry = context.getCurEntry(); ? ? ? ?// 省略部分代码… ? ? ? ?if (curEntry.getBlockError() == null) { ? ? ? ?// passed request ? ? ? ?for (CircuitBreaker circuitBreaker : circuitBreakers) { ? ? ? ? ? ? ? ? ? ?// 熔断规则校验 ? ? ? ? ? ?circuitBreaker.onRequestComplete(context); ? ? ? ?} ? ?} ? ? ? ?fireExit(context, r, count, args); } // 慢调用比例熔断规则校验ResponseTimeCircuitBreaker.onRequestComplete @Override public void onRequestComplete(Context context) { ? ?// 这里小窗口只有1个 ? ?SlowRequestCounter counter = slidingCounter.currentWindow().value(); ? ? ? ?Entry entry = context.getCurEntry(); ? ?if (entry == null) { ? ? ? ?return; ? ?} ? ?long completeTime = entry.getCompleteTimestamp(); ? ?if (completeTime maxAllowedRt) { ? ? ? ?counter.slowCount.add(1); ? ?} ? ? ? ?// 总调用次数+1 ? ?counter.totalCount.add(1); ? ?// 慢调用熔断规则判断 ? ?handleStateChangeWhenThresholdExceeded(rt); } // 慢调用熔断规则判断 private void handleStateChangeWhenThresholdExceeded(long rt) { ? ?if (currentState.get() == State.OPEN) { ? ? ? ?return; ? ?} ? ? ? ?if (currentState.get() == State.HALF_OPEN) { ? ? ? ?// In detecting request ? ? ? ?// TODO: improve logic for half-open recovery ? ? ? ?if (rt > maxAllowedRt) { ? ? ? ? ? ?fromHalfOpenToOpen(1.0d); ? ? ? ?} else { ? ? ? ? ? ?fromHalfOpenToClose(); ? ? ? ?} ? ? ? ?return; ? ?} ? ?List counters = slidingCounter.values(); ? ?long slowCount = 0; ? ?long totalCount = 0; ? ?for (SlowRequestCounter counter : counters) { ? ? ? ?slowCount += counter.slowCount.sum(); ? ? ? ?totalCount += counter.totalCount.sum(); ? ?} ? ?// 如果当前窗口的总调用次数小于配置的最小调用数(也就是管理后台配置的5)直接return,不做校验 ? ?if (totalCount maxSlowRequestRatio) { ? ? ? ?transformToOpen(currentRatio); ? ?} } // 慢调用比例熔断处理,打开熔断开关,下次调用直接被熔断protected void transformToOpen(double triggerValue) { ? ?// 一开始熔断器肯定是关闭状态也就是CLOSE ? ?State cs = currentState.get(); ? ?switch (cs) { ? ? ? ?case CLOSED: ? ? ? ? ? ?// 关闭状态 —> 打开状态 ? ? ? ? ? ?fromCloseToOpen(triggerValue); ? ? ? ? ? ?break; ? ? ? ?case HALF_OPEN: ? ? ? ? ? ?// 半开状态 --> 打开状态 ? ? ? ? ? ?fromHalfOpenToOpen(triggerValue); ? ? ? ? ? ?break; ? ? ? ?default: ? ? ? ? ? ?break; ? ?} } // 熔断器打开的时候会设置下一次重试的时机(当前时间+熔断时长),也就是半开状态的时机,此时会放行一个请求,如果这个请求不再是慢调用请求,那么熔断器关闭,否则熔断器打开。protected void updateNextRetryTimestamp() { ? ?this.nextRetryTimestamp = TimeUtil.currentTimeMillis() + recoveryTimeoutMs; } // 在DegradeSlot.entry()会调用tryPass方法,这里会判断是否把熔断器从打开状态 —-> 半开状态// 熔断器打开的状态下,这里会先判断当前时间是否大于上面设置的下次重试时间,如果大于就把熔断器状态变为半开。?public boolean tryPass(Context context) { ? ?// Template implementation. ? ?if (currentState.get() == State.CLOSED) { ? ? ? ?return true; ? ?} ? ?if (currentState.get() == State.OPEN) { ? ? ? ?// For half-open state we allow a request for probing. ? ? ? ?return retryTimeoutArrived() & fromOpenToHalfOpen(context); ? ?} ? ?return false; } 上面的代码很长,总结一下。在给一个资源配置慢调用比例类型的熔断规则前提下:?1. Sentinel会根据我们配置的规则,创建对应的滑动窗口来统计相应的慢调用请求数和总请求数;这里的滑动窗口的统计窗口个数为1,窗口长度为我们设置的长度;在本文的案例中,窗口长度为3s,小窗口个数为1。2. 对应的资源每次请求结束的时候,会在DegradeSlot.exit() 方法中做统计,先获取当前滑动窗口,然后根据rt来判断是否是慢调用,然后记录在1中的统计窗口中。3. 如果当前请求是慢调用就会走慢调用规则判断,默认一开始熔断器是close关闭状态,此时会计算当前窗口的总调用次数是否不小于配置的最小调用数(5),如果是的话就会进行慢调用比例判断,如果慢调用比例刚好也大于我们配置的慢调用比例(50%),此时就会把熔断器状态从close->open,并根据我们配置的熔断时长,设置下一次半开状态的时间(当前时间+熔断时长(3s))。4. 下次该资源的请求过来的时候先走DegradeSlot.entry()中的判断逻辑。如果熔断器是close状态,那么直接放行该请求;如果熔断器是open状态,且熔断时间还没结束那么该请求就会直接熔断,抛出DegradeException熔断异常,在最外层的会catch住该异常走我们配置的降级逻辑;如果熔断器是open状态,且熔断时间已经结束,那么就会把熔断状态从open->half-open也就是改为半开状态,此时会放行一个请求,如果该请求rt不再是慢调用请求,那么就吧熔断状态改为close关闭状态。真相大白到此就真相大白了,通过以上源码分析可以找到线上Ab接口为什么会偶发熔断了。Sentinel的慢调用比例熔断规则统计的时候,不是等到滑动窗口结束了再去根据这一整个窗口的数据来做判断,而是每次请求都会做判断。比如拿最上面的配置规则来做例子的话,如果当前窗口的刚开始的前几个请求中(大于5)慢调用比例刚好超过了50%,那么就会触发熔断,断路器直接打开,3s内的所有请求都走降级,然后3s后断路器进入半开状态,如果下一个请求正常了,那么断路器就关闭。?*文/mmmooo
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2025-1-11 05:55 , Processed in 0.481415 second(s), 26 queries .

Powered by Discuz! X3.5

© 2001-2025 Discuz! Team.

快速回复 返回顶部 返回列表