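Sentinel Source Code, Part 6: Implementing Circuit Breaking and Metric Statistics

Outline

1. How DegradeSlot implements circuit breaking: principles and source code

2. The sliding window algorithm behind Sentinel's metric statistics

1. How DegradeSlot implements circuit breaking: principles and source code

(1) A configuration demo for the circuit-breaking (degrade) rule DegradeRule

(2) Registering the circuit-breaking listener and loading circuit-breaking rules

(3) How DegradeSlot checks requests against circuit-breaking rules

(4) Summary

(1) A configuration demo for the circuit-breaking (degrade) rule DegradeRule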

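Circuit-breaking rules are typically applied in two scenarios:

Scenario 1: In a microservice architecture, when one service fails, a circuit-breaking rule keeps the failure from spreading and protects the stability of the whole system.

Scenario 2: When calling a third-party API, a circuit-breaking rule keeps the instability of that API from destabilizing your own system.

As the figure below shows, a circuit-breaking rule has the following properties:

Property 1: circuit-breaking strategy (grade)

This is the type of the circuit-breaking rule. The possible values are:

RuleConstant.DEGRADE_GRADE_RT (slow-call ratio)
RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO (exception ratio)
RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT (exception count)

By default a rule uses the slow-call ratio strategy, that is, the default value is:

RuleConstant.DEGRADE_GRADE_RT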

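Property 2: threshold (count)

The meaning of DegradeRule.count depends on the value of DegradeRule.grade.

If grade is slow-call ratio, count is the maximum allowed response time in milliseconds; a request whose RT exceeds it is counted as a slow call.
If grade is exception ratio, count is the exception ratio threshold.
If grade is exception count, count is the exception count threshold.

Property 3: circuit-breaking duration (timeWindow)

This is how long, in seconds, the degradation lasts once the breaker has opened. During this period the resource stays degraded.

Property 4: minimum request amount (minRequestAmount)

This is the minimum total number of requests within one statistics interval. The breaker is evaluated against grade and count only once the number of requests in the interval reaches this value. The default value is:

RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT

Property 5: slow-call ratio threshold (slowRatioThreshold)

This property only takes effect when grade is slow-call ratio. It is a decimal between 0 and 1: the breaker opens once the proportion of slow calls among all requests exceeds it.

Property 6: statistics interval (statIntervalMs)

This is the statistics interval in milliseconds; the default is 1000 ms (1 second). Sentinel collects request statistics over this interval to decide whether to open the breaker.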

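public class DegradeRule extends AbstractRule {
    //Circuit-breaking strategy, i.e. the type of this rule
    private int grade = RuleConstant.DEGRADE_GRADE_RT;

    //Threshold; its meaning depends on the value of grade:
    //slow-call ratio: maximum RT in ms, a request slower than this counts as a slow call
    //exception ratio: exception ratio threshold
    //exception count: exception count threshold
    private double count;

    //Circuit-breaking duration in seconds: how long the resource stays degraded after the breaker opens
    private int timeWindow;

    //Minimum request amount: grade and count are only evaluated once the requests in the interval reach this value
    private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT;

    //Slow-call ratio threshold, only effective when grade is slow-call ratio
    private double slowRatioThreshold = 1.0d;

    //Statistics interval in ms: the period over which Sentinel collects the statistics used to decide whether to open the breaker
    private int statIntervalMs = 1000;
    ...
}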
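
To make the interplay of count, slowRatioThreshold and minRequestAmount concrete for the slow-call ratio strategy, here is a minimal sketch that uses only the JDK. SlowRatioSketch and shouldOpen are hypothetical names chosen for illustration, not Sentinel APIs, and the strict comparisons are an assumption about boundary behavior.

```java
import java.util.List;

// Hypothetical model of the slow-call ratio decision; not Sentinel code.
final class SlowRatioSketch {
    // rtsMs: response times (ms) observed within one statistics interval
    // maxRtMs: plays the role of DegradeRule.count for the slow-call strategy
    // slowRatioThreshold / minRequestAmount: play the roles of the same-named rule properties
    static boolean shouldOpen(List<Long> rtsMs, long maxRtMs,
                              double slowRatioThreshold, int minRequestAmount) {
        int total = rtsMs.size();
        if (total < minRequestAmount) {
            return false; // too few requests in the interval to judge
        }
        long slow = rtsMs.stream().filter(rt -> rt > maxRtMs).count();
        double ratio = (double) slow / total;
        return ratio > slowRatioThreshold;
    }

    public static void main(String[] args) {
        List<Long> rts = List.of(40L, 55L, 58L, 59L);
        // 3 of 4 requests are slower than 50 ms, so the ratio is 0.75
        System.out.println(shouldOpen(rts, 50, 0.6, 4));
        // same data, but fewer requests than minRequestAmount
        System.out.println(shouldOpen(rts, 50, 0.6, 100));
    }
}
```

The first call reports that the breaker should open, the second does not, because the minimum request amount guards against judging on too small a sample.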

The following demo configures a DegradeRule:

//Run this demo, and the output will be like:
//1529399827825,total:0, pass:0, block:0
//1529399828825,total:4263, pass:100, block:4164
//1529399829825,total:19179, pass:4, block:19176 // circuit breaker opens
//1529399830824,total:19806, pass:0, block:19806
//1529399831825,total:19198, pass:0, block:19198
//1529399832824,total:19481, pass:0, block:19481
//1529399833826,total:19241, pass:0, block:19241
//1529399834826,total:17276, pass:0, block:17276
//1529399835826,total:18722, pass:0, block:18722
//1529399836826,total:19490, pass:0, block:19492
//1529399837828,total:19355, pass:0, block:19355
//1529399838827,total:11388, pass:0, block:11388
//1529399839829,total:14494, pass:104, block:14390 // After 10 seconds, the system restored
//1529399840854,total:18505, pass:0, block:18505
//1529399841854,total:19673, pass:0, block:19676
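import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker.State;
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreakerStrategy;
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.EventObserverRegistry;
import com.alibaba.csp.sentinel.util.TimeUtil;

public class SlowRatioCircuitBreakerDemo {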
    private static final String KEY = "some_method";
    private static volatile boolean stop = false;
    private static int seconds = 120;
    private static AtomicInteger total = new AtomicInteger();
    private static AtomicInteger pass = new AtomicInteger();
    private static AtomicInteger block = new AtomicInteger();

    public static void main(String[] args) throws Exception {
        initDegradeRule();
        registerStateChangeObserver();
        startTick();

        int concurrency = 8;
        for (int i = 0; i < concurrency; i++) {
            Thread entryThread = new Thread(() -> {
                while (true) {
                    Entry entry = null;
                    try {
                        entry = SphU.entry(KEY);
                        pass.incrementAndGet();
                        //RT: [40ms, 60ms)
                        sleep(ThreadLocalRandom.current().nextInt(40, 60));
                    } catch (BlockException e) {
                        block.incrementAndGet();
                        sleep(ThreadLocalRandom.current().nextInt(5, 10));
                    } finally {
                        total.incrementAndGet();
                        if (entry != null) {
                            entry.exit();
                        }
                    }
                }
            });
            entryThread.setName("sentinel-simulate-traffic-task-" + i);
            entryThread.start();
        }
    }

    private static void registerStateChangeObserver() {
        EventObserverRegistry.getInstance().addStateChangeObserver("logging",
            (prevState, newState, rule, snapshotValue) -> {
                if (newState == State.OPEN) {
                    System.err.println(String.format("%s -> OPEN at %d, snapshotValue=%.2f", prevState.name(), TimeUtil.currentTimeMillis(), snapshotValue));
                } else {
                    System.err.println(String.format("%s -> %s at %d", prevState.name(), newState.name(), TimeUtil.currentTimeMillis()));
                }
            }
        );
    }

    private static void initDegradeRule() {
        List<DegradeRule> rules = new ArrayList<>();
        DegradeRule rule = new DegradeRule(KEY)
            .setGrade(CircuitBreakerStrategy.SLOW_REQUEST_RATIO.getType())
            //Max allowed response time (in ms)
            .setCount(50)
            //Retry timeout (in second)
            .setTimeWindow(10)
            //Circuit breaker opens when slow request ratio > 60%
            .setSlowRatioThreshold(0.6)
            .setMinRequestAmount(100)
            .setStatIntervalMs(20000);
        rules.add(rule);
        DegradeRuleManager.loadRules(rules);
        System.out.println("Degrade rule loaded: " + rules);
    }

    private static void sleep(int timeMs) {
        try {
            TimeUnit.MILLISECONDS.sleep(timeMs);
        } catch (InterruptedException e) {
            // ignore
        }
    }

    private static void startTick() {
        Thread timer = new Thread(new TimerTask());
        timer.setName("sentinel-timer-tick-task");
        timer.start();
    }

    static class TimerTask implements Runnable {
        @Override
        public void run() {
            long start = System.currentTimeMillis();
            System.out.println("Begin to run! Go go go!");
            System.out.println("See corresponding metrics.log for accurate statistic data");

            long oldTotal = 0;
            long oldPass = 0;
            long oldBlock = 0;

            while (!stop) {
                sleep(1000);

                long globalTotal = total.get();
                long oneSecondTotal = globalTotal - oldTotal;
                oldTotal = globalTotal;

                long globalPass = pass.get();
                long oneSecondPass = globalPass - oldPass;
                oldPass = globalPass;

                long globalBlock = block.get();
                long oneSecondBlock = globalBlock - oldBlock;
                oldBlock = globalBlock;

                System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal + ", pass:" + oneSecondPass + ", block:" + oneSecondBlock);
                if (seconds-- <= 0) {
                    stop = true;
                }
            }

            long cost = System.currentTimeMillis() - start;
            System.out.println("time cost: " + cost + " ms");
            System.out.println("total: " + total.get() + ", pass:" + pass.get() + ", block:" + block.get());
            System.exit(0);
        }
    }
}

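(2) Registering the circuit-breaking listener and loading circuit-breaking rules

I. A review of the core code of Sentinel's listener pattern

II. Registering the circuit-breaking listener and loading circuit-breaking rules

III. The circuit breaker interface behind Sentinel's circuit breaking

I. A review of the core code of Sentinel's listener pattern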

Sentinel's listener pattern involves three roles:

Role 1: the listener, PropertyListener

Role 2: the listener manager, SentinelProperty

Role 3: the rule manager, RuleManager

First, when a rule manager is initialized, it calls addListener() on the SentinelProperty to register its PropertyListener.

Then, to apply concrete rules, the caller invokes the rule manager's loadRules() method. Loading rules calls updateValue() on the SentinelProperty, which notifies every PropertyListener; each listener's configUpdate() method then loads the rules into the rule manager's local state.
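
The register-then-notify flow described above can be sketched with the JDK alone. All names here (ListenerPatternSketch, Property, Listener, RuleManager) are hypothetical stand-ins for illustration; rules are modeled as plain strings, and this is a simplified model rather than Sentinel's implementation.

```java
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical miniature of the listener pattern; not Sentinel code.
final class ListenerPatternSketch {
    interface Listener<T> {
        void configLoad(T value);
        void configUpdate(T value);
    }

    // Plays the role of the listener manager.
    static final class Property<T> {
        private final Set<Listener<T>> listeners = new CopyOnWriteArraySet<>();
        private T value;

        void addListener(Listener<T> listener) {
            listeners.add(listener);
            listener.configLoad(value); // initialize the listener with the current value
        }

        boolean updateValue(T newValue) {
            if (Objects.equals(value, newValue)) {
                return false; // unchanged: nobody is notified
            }
            value = newValue;
            for (Listener<T> listener : listeners) {
                listener.configUpdate(newValue);
            }
            return true;
        }
    }

    // Plays the role of the rule manager: registers a listener, keeps a local copy of the rules.
    static final class RuleManager {
        private final Property<List<String>> property = new Property<>();
        private volatile List<String> localRules = List.of();

        RuleManager() {
            property.addListener(new Listener<List<String>>() {
                @Override public void configLoad(List<String> v) { reload(v); }
                @Override public void configUpdate(List<String> v) { reload(v); }
            });
        }

        private void reload(List<String> v) {
            localRules = (v == null) ? List.of() : List.copyOf(v);
        }

        boolean loadRules(List<String> rules) { return property.updateValue(rules); }

        List<String> rules() { return localRules; }
    }

    public static void main(String[] args) {
        RuleManager manager = new RuleManager();
        System.out.println(manager.loadRules(List.of("ruleA"))); // value changed, listener notified
        System.out.println(manager.loadRules(List.of("ruleA"))); // same value, nothing happens
        System.out.println(manager.rules());
    }
}
```

Keeping the property separate from the manager means a different rule source can push values through the same updateValue() path without the manager knowing where they come from.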

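II. Registering the circuit-breaking listener and loading circuit-breaking rules

DegradeRuleManager holds two global HashMaps: circuitBreakers maps each resource to its circuit breakers, and ruleMap maps each resource to its circuit-breaking rules.

The type of circuit breaker is determined by the strategy DegradeRule.grade. For the slow-call ratio strategy the breaker is a ResponseTimeCircuitBreaker; for the exception ratio and exception count strategies it is an ExceptionCircuitBreaker.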

public class DynamicSentinelProperty<T> implements SentinelProperty<T> {
    protected Set<PropertyListener<T>> listeners = new CopyOnWriteArraySet<>();
    private T value = null;
    ...
    //Add the listener to the set
    @Override
    public void addListener(PropertyListener<T> listener) {
        listeners.add(listener);
        //Call back the listener's configLoad() to initialize the rule configuration
        listener.configLoad(value);
    }

    //Update the value
    @Override
    public boolean updateValue(T newValue) {
        //Return immediately if the value has not changed
        if (isEqual(value, newValue)) {
            return false;
        }
        RecordLog.info("[DynamicSentinelProperty] Config will be updated to: {}", newValue);

        //The value has changed: call back configUpdate() on every listener with the new value
        value = newValue;
        for (PropertyListener<T> listener : listeners) {
            listener.configUpdate(newValue);
        }
        return true;
    }
    ...
}

public final class DegradeRuleManager {
    //Maps each resource to its circuit breakers; the breaker type is determined by the strategy DegradeRule.grade
    private static volatile Map<String, List<CircuitBreaker>> circuitBreakers = new HashMap<>();
    //Maps each resource to its circuit-breaking rules
    private static volatile Map<String, Set<DegradeRule>> ruleMap = new HashMap<>();
    private static final RulePropertyListener LISTENER = new RulePropertyListener();
    private static SentinelProperty<List<DegradeRule>> currentProperty = new DynamicSentinelProperty<>();

    static {
        currentProperty.addListener(LISTENER);
    }
    ...
    
    private static class RulePropertyListener implements PropertyListener<List<DegradeRule>> {
        @Override
        public void configUpdate(List<DegradeRule> conf) {
            reloadFrom(conf);
            RecordLog.info("[DegradeRuleManager] Degrade rules has been updated to: {}", ruleMap);
        }
       
        @Override
        public void configLoad(List<DegradeRule> conf) {
            reloadFrom(conf);
            RecordLog.info("[DegradeRuleManager] Degrade rules loaded: {}", ruleMap);
        }
       
        private synchronized void reloadFrom(List<DegradeRule> list) {
            //Build the circuit breakers
            Map<String, List<CircuitBreaker>> cbs = buildCircuitBreakers(list);
            Map<String, Set<DegradeRule>> rm = new HashMap<>(cbs.size());
            for (Map.Entry<String, List<CircuitBreaker>> e : cbs.entrySet()) {
                assert e.getValue() != null && !e.getValue().isEmpty();
                Set<DegradeRule> rules = new HashSet<>(e.getValue().size());
                for (CircuitBreaker cb : e.getValue()) {
                    rules.add(cb.getRule());
                }
                rm.put(e.getKey(), rules);
            }
            DegradeRuleManager.circuitBreakers = cbs;
            DegradeRuleManager.ruleMap = rm;
        }

        private Map<String, List<CircuitBreaker>> buildCircuitBreakers(List<DegradeRule> list) {
            Map<String, List<CircuitBreaker>> cbMap = new HashMap<>(8);
            if (list == null || list.isEmpty()) {
                return cbMap;
            }
            for (DegradeRule rule : list) {
                if (!isValidRule(rule)) {
                    RecordLog.warn("[DegradeRuleManager] Ignoring invalid rule when loading new rules: {}", rule);
                    continue;
                }
                if (StringUtil.isBlank(rule.getLimitApp())) {
                    rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
                }
                CircuitBreaker cb = getExistingSameCbOrNew(rule);
                if (cb == null) {
                    RecordLog.warn("[DegradeRuleManager] Unknown circuit breaking strategy, ignoring: {}", rule);
                    continue;
                }
                String resourceName = rule.getResource();
                List<CircuitBreaker> cbList = cbMap.get(resourceName);
                if (cbList == null) {
                    cbList = new ArrayList<>();
                    cbMap.put(resourceName, cbList);
                }
                cbList.add(cb);
            }
            return cbMap;
        }
    }

    private static CircuitBreaker getExistingSameCbOrNew(DegradeRule rule) {
        List<CircuitBreaker> cbs = getCircuitBreakers(rule.getResource());
        if (cbs == null || cbs.isEmpty()) {
            return newCircuitBreakerFrom(rule);
        }
        for (CircuitBreaker cb : cbs) {
            if (rule.equals(cb.getRule())) {
                //Reuse the circuit breaker if the rule remains unchanged.
                return cb;
            }
        }
        return newCircuitBreakerFrom(rule);
    }
    
    static List<CircuitBreaker> getCircuitBreakers(String resourceName) {
        return circuitBreakers.get(resourceName);
    }
    
    //Create a circuit breaker instance from provided circuit breaking rule.
    private static CircuitBreaker newCircuitBreakerFrom(DegradeRule rule) {
        switch (rule.getGrade()) {
            case RuleConstant.DEGRADE_GRADE_RT:
                return new ResponseTimeCircuitBreaker(rule);
            case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO:
            case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT:
                return new ExceptionCircuitBreaker(rule);
            default:
                return null;
        }
    }
}

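III. The circuit breaker interface behind Sentinel's circuit breaking

When a request is checked against the circuit-breaking rules, the circuit breakers registered for the resource are looked up; each was created according to the strategy of its rule. Each breaker's tryPass() method then decides whether the current request may pass.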

//Circuit breaker interface behind Sentinel's circuit breaking
public interface CircuitBreaker {
    //Get the associated circuit breaking rule.
    DegradeRule getRule();

    //Acquires permission of an invocation only if it is available at the time of invoking.
    //CLOSED: the request is allowed.
    //OPEN: the request is rejected, unless the recovery time has been reached, in which case one probe request is allowed.
    //HALF_OPEN: the request is rejected while the probe is still in flight.
    boolean tryPass(Context context);

    //Get current state of the circuit breaker (OPEN, HALF_OPEN, CLOSED).
    State currentState();

    //Record a completed request with the context and handle state transformation of the circuit breaker.
    //Called when a passed invocation finished; this is where the breaker's statistics are updated.
    void onRequestComplete(Context context);

    //Circuit breaker state.
    enum State {
        //In OPEN state, all requests will be rejected until the next recovery time point.
        OPEN,

        //In HALF_OPEN state, the circuit breaker will allow a "probe" invocation.
        //If the invocation is abnormal according to the strategy (e.g. it's slow),
        //the circuit breaker will re-transform to the OPEN state and wait for the next recovery time point;
        //otherwise the resource will be regarded as "recovered" and the circuit breaker will cease cutting off requests and transform to CLOSED state.
        HALF_OPEN,

        //In CLOSED state, all requests are permitted.
        //When current metric value exceeds the threshold, the circuit breaker will transform to OPEN state.
        CLOSED
    }
}

public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker {
    ...
    ...
}

public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {
    ...
    ...
}

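(3) How DegradeSlot checks requests against circuit-breaking rules

I. entry() checks the request against the circuit-breaking rules

II. exit() triggers state changes of the circuit breaker

Rule checking for a request starts with a call to SphU.entry(). Once the request has finished, Entry.exit() must be called as well.

I. entry() checks the request against the circuit-breaking rules

In DegradeSlot.entry(), the check is performed by DegradeSlot.performChecking(). It first fetches the circuit breakers for the resource name from DegradeRuleManager, then calls tryPass() on each breaker to see whether that breaker has tripped. If every breaker returns true, the request is let through. If any breaker returns false, performChecking() throws a DegradeException and the request is blocked.

AbstractCircuitBreaker.tryPass() first checks whether the breaker is CLOSED. If so, the breaker has not tripped and the method returns true. If the breaker is OPEN, the method checks whether the recovery time has been reached: if the current time is at or past nextRetryTimestamp and the breaker is successfully switched from OPEN to HALF_OPEN, the request is let through as a probe; otherwise it is rejected. If the breaker is already HALF_OPEN, the request is rejected.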

@Spi(order = Constants.ORDER_DEGRADE_SLOT)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        //Check the request against the circuit-breaking rules
        performChecking(context, resourceWrapper);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    void performChecking(Context context, ResourceWrapper r) throws BlockException {
        //Look up the circuit breakers for the resource name in DegradeRuleManager.circuitBreakers (a Map<String, List<CircuitBreaker>>)
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
            return;
        }
        //Call tryPass() on every circuit breaker to check whether the current request may pass
        for (CircuitBreaker cb : circuitBreakers) {
            if (!cb.tryPass(context)) {
                throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
            }
        }
    }
    ...
}

public abstract class AbstractCircuitBreaker implements CircuitBreaker {
    protected final DegradeRule rule;
    private final EventObserverRegistry observerRegistry;
    //Current state of the circuit breaker
    protected final AtomicReference<State> currentState = new AtomicReference<>(State.CLOSED);
    //Timestamp of the next recovery attempt
    protected volatile long nextRetryTimestamp;
    ...

    @Override
    public boolean tryPass(Context context) {
        //CLOSED means the breaker has not tripped, so no circuit breaking applies:
        //return true and let the request through
        if (currentState.get() == State.CLOSED) {
            return true;
        }
        //The breaker is OPEN
        if (currentState.get() == State.OPEN) {
            //Let the request through only if the recovery time has been reached (current time >= nextRetryTimestamp)
            //and this thread succeeds in switching the breaker from OPEN to HALF_OPEN; otherwise reject it
            return retryTimeoutArrived() && fromOpenToHalfOpen(context);
        }
        return false;
    }

    protected boolean retryTimeoutArrived() {
        //Whether the current time is at or past the next recovery attempt
        return TimeUtil.currentTimeMillis() >= nextRetryTimestamp;
    }

    protected boolean fromOpenToHalfOpen(Context context) {
        //Switch the breaker from OPEN to HALF_OPEN with a CAS
        if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
            //Notify the observers (observer pattern)
            notifyObservers(State.OPEN, State.HALF_OPEN, null);
            ...
            return true;
        }
        return false;
    }
    
    private void notifyObservers(CircuitBreaker.State prevState, CircuitBreaker.State newState, Double snapshotValue) {
        for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
            observer.onStateChange(prevState, newState, rule, snapshotValue);
        }
    }
    ...
}
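
The gate logic of tryPass() can be tried out in isolation with the following sketch. TryPassSketch, its open() helper and the injected clock are hypothetical constructs added so the example runs deterministically without Sentinel; only the shape of the state check mirrors the excerpt above.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

// Hypothetical, simplified model of the tryPass() gate; not Sentinel's class.
final class TryPassSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
    private final LongSupplier clock;   // injected clock (assumption, for deterministic runs)
    private final long recoveryMs;      // plays the role of the circuit-breaking duration
    private volatile long nextRetryTimestamp;

    TryPassSketch(LongSupplier clock, long recoveryMs) {
        this.clock = clock;
        this.recoveryMs = recoveryMs;
    }

    // Trips the breaker: publish the retry time first, then the OPEN state.
    void open() {
        nextRetryTimestamp = clock.getAsLong() + recoveryMs;
        state.set(State.OPEN);
    }

    boolean tryPass() {
        State s = state.get();
        if (s == State.CLOSED) {
            return true;
        }
        if (s == State.OPEN) {
            // Only the caller that wins the CAS after the recovery time becomes the probe.
            return clock.getAsLong() >= nextRetryTimestamp
                && state.compareAndSet(State.OPEN, State.HALF_OPEN);
        }
        return false; // HALF_OPEN: a probe is already in flight
    }

    State state() { return state.get(); }

    public static void main(String[] args) {
        long[] now = {0};
        TryPassSketch cb = new TryPassSketch(() -> now[0], 10_000);
        System.out.println(cb.tryPass()); // breaker is CLOSED
        cb.open();
        System.out.println(cb.tryPass()); // OPEN, recovery time not reached
        now[0] = 10_000;
        System.out.println(cb.tryPass()); // recovery time reached: probe request
        System.out.println(cb.tryPass()); // HALF_OPEN: further requests wait for the probe
    }
}
```

Because the OPEN to HALF_OPEN switch is a compare-and-set, concurrent callers arriving after the recovery time cannot all become probes; exactly one wins.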

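II. exit() triggers state changes of the circuit breaker

Question 1: tryPass() starts by checking the breaker's state. So when does the breaker open, and when does it close?

Answer: That is specified when the DegradeRule is configured. The rule in the figure below, for example, specifies that if 2 out of 10 requests within the last 10000 ms are abnormal, the breaker trips.

Question 2: When the breaker is OPEN, tryPass() compares the time of the next recovery attempt with the current time. If the current time has passed that point, the circuit-breaking duration is over and the request may be let through. So when is nextRetryTimestamp updated?

Answer: Both the collection of exception data and the update of the next recovery time are triggered by DegradeSlot.exit(). Rule checking for a request starts with SphU.entry(); once the request has finished, Entry.exit() is called, and Entry.exit() eventually reaches DegradeSlot.exit().

DegradeSlot.exit() calls onRequestComplete() on each circuit breaker, which updates the counters and changes the breaker's state.

For example, ExceptionCircuitBreaker.onRequestComplete() first updates the exception count errorCount and the total request count totalCount, then checks against the rule whether the threshold for opening or closing the breaker has been reached, and finally calls, for instance, AbstractCircuitBreaker.transformToOpen() to open the breaker.

AbstractCircuitBreaker.transformToOpen() does three things: it switches the breaker's state to OPEN, updates nextRetryTimestamp, and notifies the observers through the observer pattern.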

@Spi(order = Constants.ORDER_DEGRADE_SLOT)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    ...
    @Override
    public void exit(Context context, ResourceWrapper r, int count, Object... args) {
        Entry curEntry = context.getCurEntry();
        if (curEntry.getBlockError() != null) {
            fireExit(context, r, count, args);
            return;
        }
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
            fireExit(context, r, count, args);
            return;
        }

        //If the request was not blocked (it passed), call onRequestComplete() on each circuit breaker to record it
        if (curEntry.getBlockError() == null) {
            // passed request
            for (CircuitBreaker circuitBreaker : circuitBreakers) {
                circuitBreaker.onRequestComplete(context);
            }
        }

        fireExit(context, r, count, args);
    }
    ...
}

public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {
    ...
    @Override
    public void onRequestComplete(Context context) {
        Entry entry = context.getCurEntry();
        if (entry == null) {
            return;
        }
        Throwable error = entry.getError();
        //Get the counter of the current window
        SimpleErrorCounter counter = stat.currentWindow().value();
        //If this request threw an error, increment errorCount
        if (error != null) {
            counter.getErrorCount().add(1);
        }
        //Increment totalCount, which is used to compute the exception ratio
        counter.getTotalCount().add(1);

        handleStateChangeWhenThresholdExceeded(error);
    }

    private void handleStateChangeWhenThresholdExceeded(Throwable error) {
        //Nothing to do if the breaker is already OPEN
        if (currentState.get() == State.OPEN) {
            return;
        }
        //The breaker is HALF_OPEN
        if (currentState.get() == State.HALF_OPEN) {
            //The probe request completed without an exception, so the breaker can be closed
            if (error == null) {
                //Call AbstractCircuitBreaker.fromHalfOpenToClose() to close the breaker
                fromHalfOpenToClose();
            } else {
                //The probe request still failed, so keep circuit breaking:
                //call AbstractCircuitBreaker.fromHalfOpenToOpen() to reopen the breaker
                fromHalfOpenToOpen(1.0d);
            }
            return;
        }

        List<SimpleErrorCounter> counters = stat.values();
        //Number of exceptions
        long errCount = 0;
        //Total number of requests
        long totalCount = 0;
        for (SimpleErrorCounter counter : counters) {
            errCount += counter.errorCount.sum();
            totalCount += counter.totalCount.sum();
        }
        //Below the minimum request amount: return without changing the state
        if (totalCount < minRequestAmount) {
            return;
        }
        double curCount = errCount;
        //The strategy is exception ratio
        if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
            //Compute the ratio of exceptions to total requests
            curCount = errCount * 1.0d / totalCount;
        }
        //Open the breaker when the exception ratio or exception count exceeds the threshold
        if (curCount > threshold) {
            //Call AbstractCircuitBreaker.transformToOpen() to open the breaker
            transformToOpen(curCount);
        }
    }
    ...
}

public abstract class AbstractCircuitBreaker implements CircuitBreaker {
    private final EventObserverRegistry observerRegistry;
    //Current state of the circuit breaker
    protected final AtomicReference<State> currentState = new AtomicReference<>(State.CLOSED);
    //Timestamp of the next recovery attempt
    protected volatile long nextRetryTimestamp;
    ...
    
    protected void transformToOpen(double triggerValue) {
        State cs = currentState.get();
        switch (cs) {
            case CLOSED:
                fromCloseToOpen(triggerValue);
                break;
            case HALF_OPEN:
                fromHalfOpenToOpen(triggerValue);
                break;
            default:
                break;
        }
    }
    
    protected boolean fromHalfOpenToClose() {
        if (currentState.compareAndSet(State.HALF_OPEN, State.CLOSED)) {
            resetStat();
            notifyObservers(State.HALF_OPEN, State.CLOSED, null);
            return true;
        }
        return false;
    }
    
    protected boolean fromHalfOpenToOpen(double snapshotValue) {
        if (currentState.compareAndSet(State.HALF_OPEN, State.OPEN)) {
            updateNextRetryTimestamp();
            notifyObservers(State.HALF_OPEN, State.OPEN, snapshotValue);
            return true;
        }
        return false;
    }
    
    private void notifyObservers(CircuitBreaker.State prevState, CircuitBreaker.State newState, Double snapshotValue) {
        for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
            observer.onStateChange(prevState, newState, rule, snapshotValue);
        }
    }
    ...
}
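
The exit-side decision logic can be condensed into a small model that runs without Sentinel. ExceptionBreakerSketch and admitProbe() are hypothetical names; the model is single-threaded and its counters never expire, so it illustrates only the threshold check and the state transitions, not the windowed statistics.

```java
// Hypothetical single-threaded model of the exit-side logic; not Sentinel code.
final class ExceptionBreakerSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }
    enum Strategy { EXCEPTION_RATIO, EXCEPTION_COUNT }

    private final Strategy strategy;
    private final double threshold;
    private final int minRequestAmount;
    private State state = State.CLOSED;
    private long errorCount;
    private long totalCount;

    ExceptionBreakerSketch(Strategy strategy, double threshold, int minRequestAmount) {
        this.strategy = strategy;
        this.threshold = threshold;
        this.minRequestAmount = minRequestAmount;
    }

    State state() { return state; }

    // Stands in for a successful tryPass() after the recovery time: OPEN -> HALF_OPEN.
    void admitProbe() {
        if (state == State.OPEN) {
            state = State.HALF_OPEN;
        }
    }

    void onRequestComplete(boolean failed) {
        if (failed) {
            errorCount++;
        }
        totalCount++;
        if (state == State.OPEN) {
            return;
        }
        if (state == State.HALF_OPEN) {
            if (failed) {
                state = State.OPEN;      // probe failed: keep breaking
            } else {
                state = State.CLOSED;    // probe succeeded: recover and reset statistics
                errorCount = 0;
                totalCount = 0;
            }
            return;
        }
        if (totalCount < minRequestAmount) {
            return;                      // not enough requests to judge
        }
        double current = (strategy == Strategy.EXCEPTION_RATIO)
            ? (double) errorCount / totalCount
            : errorCount;
        if (current > threshold) {
            state = State.OPEN;
        }
    }

    public static void main(String[] args) {
        ExceptionBreakerSketch cb = new ExceptionBreakerSketch(Strategy.EXCEPTION_RATIO, 0.5, 4);
        boolean[] outcomes = {false, false, true, true, true}; // true = request failed
        for (boolean failed : outcomes) {
            cb.onRequestComplete(failed);
            System.out.println(cb.state());
        }
        cb.admitProbe();
        cb.onRequestComplete(false);
        System.out.println(cb.state());
    }
}
```

In this run the breaker stays CLOSED at a ratio of exactly 0.5 (the comparison is strict) and opens on the fifth request, when the ratio reaches 0.6; the successful probe then closes it again.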

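(4) Summary

I. The two circuit breaker implementations in Sentinel

II. The circuit-breaking flow in Sentinel

III. The three states of a circuit breaker

IV. Transitions among the three states

I. The two circuit breaker implementations in Sentinel

Breaker 1: the exception circuit breaker, ExceptionCircuitBreaker

It tracks the number and ratio of errors, covering the exception count and exception ratio strategies. Its core job is to update the counters when a request completes, recording the number of exceptions and the total number of requests. When the threshold is exceeded, the breaker switches from CLOSED to OPEN.

Breaker 2: the slow-call ratio circuit breaker, ResponseTimeCircuitBreaker

It tracks the response time (RT). It computes the difference between the request's completion time and its start time and compares it with the maximum RT set by the user (count); a request that exceeds it counts as a slow call. When the ratio of slow calls exceeds slowRatioThreshold, the breaker switches from CLOSED to OPEN.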

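II. The circuit-breaking flow in Sentinel

Step 1: Count

For example, count the errors or record the response time (RT).

Step 2: Compare

Compare the counted result with the threshold set by the user; if the threshold is exceeded, open the breaker.

Step 3: Check

When a request arrives, the breaker's state is checked directly. If the breaker is CLOSED, the request is let through. If it is OPEN, the request is rejected, unless the circuit-breaking duration has elapsed, in which case the breaker switches to HALF_OPEN and this one request passes as a probe. If it is HALF_OPEN, the request is rejected until the probe has completed.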

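III. The three states of a circuit breaker

State 1: CLOSED

In the CLOSED state the system is running normally and no circuit breaking is in effect. The breaker counts requests as usual. Once the statistics show that the exception count, exception ratio, or slow-call ratio has exceeded the configured threshold, the breaker switches to OPEN.

State 2: OPEN

In the OPEN state circuit breaking is in effect. The breaker rejects incoming requests, and the caller falls back to its predefined degradation logic. After the circuit-breaking duration set by the user (timeWindow) has elapsed, the next request switches the breaker from OPEN to HALF_OPEN to probe whether the system has recovered.

State 3: HALF_OPEN

In the HALF_OPEN state a single probe request is let through. If the probe succeeds, the breaker considers the system recovered and switches back to CLOSED. If the probe is abnormal, the breaker considers the system not yet recovered and switches back to OPEN to keep breaking.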

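IV. Transitions among the three states

Transition 1: from CLOSED to OPEN

When the exception count, exception ratio, or slow-call ratio exceeds the threshold.

Transition 2: from OPEN to HALF_OPEN

When the circuit-breaking duration has elapsed and a request arrives to probe for recovery.

Transition 3: from HALF_OPEN to CLOSED

When the probe request succeeds.

Transition 4: from HALF_OPEN to OPEN

When the probe request is still abnormal. HALF_OPEN thus acts as an intermediate state.

These transitions ensure that the breaker protects the system automatically when something goes wrong and restores normal operation promptly once the system has recovered.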

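2. The sliding window algorithm behind Sentinel's metric statistics

(1) Introduction to sliding windows

(2) How StatisticSlot uses the sliding window algorithm for statistics

(1) Introduction to sliding windows

I. How a sliding window works

A sliding window has no fixed start and end. Instead, the moment a request is handled is taken as the end of that request's time window, and the start is the point one window length before that end.

II. The performance problem of sliding windows (solved by sample windows)

Every incoming request shifts the statistics window. If two consecutive requests arrive within one window length of each other, their windows overlap, and counting the requests in each window separately recounts the overlapping part. If a large number of requests arrive within one window length, the same range is recounted many times, which wastes resources.

To solve this, the counting is done at a finer granularity by introducing sample windows. A sample window is shorter than the sliding window; typically the sliding window length is an integer multiple of the sample window length. Each sample window records the number of requests that fall within it. Computing the count for a request's time window then only requires summing the recorded counts of the sample windows it covers.

So composing a sliding window out of several sample windows solves the performance problem.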
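
The idea of summing reusable sample windows can be sketched as follows. SampleWindowSketch is a hypothetical, single-threaded illustration with explicit timestamps; the way a timestamp is mapped to a slot and the validity check are assumptions of this sketch, not a copy of Sentinel's LeapArray.

```java
import java.util.Arrays;

// Hypothetical single-threaded sketch of a sliding window built from sample windows.
final class SampleWindowSketch {
    private final int windowLengthMs;  // length of one sample window
    private final int intervalMs;      // length of the whole sliding window
    private final long[] windowStart;  // start time of the sample window held in each slot
    private final long[] count;        // request count of that sample window

    SampleWindowSketch(int sampleCount, int intervalMs) {
        this.windowLengthMs = intervalMs / sampleCount;
        this.intervalMs = intervalMs;
        this.windowStart = new long[sampleCount];
        this.count = new long[sampleCount];
        Arrays.fill(windowStart, -1L); // -1 marks a slot that has never been used
    }

    // Record n requests at time nowMs (nowMs >= 0).
    void add(long nowMs, long n) {
        int idx = (int) ((nowMs / windowLengthMs) % count.length);
        long start = nowMs - nowMs % windowLengthMs;
        if (windowStart[idx] != start) {
            // The slot still holds an outdated sample window: reuse it for the new one.
            windowStart[idx] = start;
            count[idx] = 0;
        }
        count[idx] += n;
    }

    // Sum the sample windows that still belong to the sliding window ending at nowMs.
    long sum(long nowMs) {
        long total = 0;
        for (int i = 0; i < count.length; i++) {
            if (windowStart[i] >= 0 && nowMs - windowStart[i] < intervalMs) {
                total += count[i];
            }
        }
        return total;
    }

    public static void main(String[] args) {
        SampleWindowSketch w = new SampleWindowSketch(2, 1000); // two 500 ms sample windows
        w.add(100, 1);
        w.add(600, 2);
        System.out.println(w.sum(700));   // both sample windows are valid
        w.add(1100, 5);                   // reuses the slot of the window that started at 0
        System.out.println(w.sum(1100));
        System.out.println(w.sum(1600));  // the window that started at 500 has expired
    }
}
```

Each request touches exactly one counter, and a query sums at most sampleCount counters, regardless of how many requests arrived in the interval.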

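(2) How StatisticSlot uses the sliding window algorithm for statistics

I. How StatisticNode is designed to collect statistics

II. How LeapArray implements the sliding window statistics

I. How StatisticNode is designed to collect statistics

StatisticSlot.entry() calls DefaultNode.addPassRequest(), which in turn calls StatisticNode.addPassRequest(), and that method records the data with the sliding window algorithm.

StatisticNode defines an ArrayMetric object to hold the data. By default the object is created with 2 sample windows and a time window length of 1000 ms. The data is actually stored in the ArrayMetric's data field, which is a LeapArray.

A LeapArray records the sample window length, the number of sample windows, the sliding window length, and the array of sample windows. Its array field holds WindowWrap elements, one per sample window, in which the statistics are collected and kept.

MetricBucket, the value wrapped by each WindowWrap, has a neat design: it stores its statistics in an array of LongAdder rather than a single LongAdder. The statistics cover several dimensions, and the MetricEvent enum defines these dimensions, so using the ordinal of each MetricEvent value as the array index stores all dimensions in one LongAdder array.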
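
The enum-ordinal-as-index idea can be sketched with the JDK alone. BucketSketch and its Event enum are hypothetical names with an illustrative set of dimensions; they are not Sentinel's MetricBucket or MetricEvent.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical miniature of an enum-indexed counter array; not Sentinel code.
final class BucketSketch {
    // Illustrative dimensions only.
    enum Event { PASS, BLOCK, EXCEPTION, SUCCESS }

    private final LongAdder[] counters;

    BucketSketch() {
        Event[] events = Event.values();
        counters = new LongAdder[events.length];
        for (Event event : events) {
            counters[event.ordinal()] = new LongAdder(); // one counter per dimension
        }
    }

    void add(Event event, long n) {
        counters[event.ordinal()].add(n);
    }

    long get(Event event) {
        return counters[event.ordinal()].sum();
    }

    public static void main(String[] args) {
        BucketSketch bucket = new BucketSketch();
        bucket.add(Event.PASS, 3);
        bucket.add(Event.BLOCK, 1);
        bucket.add(Event.PASS, 2);
        System.out.println(bucket.get(Event.PASS) + " passed, " + bucket.get(Event.BLOCK) + " blocked");
    }
}
```

Adding a new dimension only requires a new enum constant; the array is sized from Event.values(), so no other code changes.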

@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        ...
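        //Run the following ProcessorSlots first, which perform the rule checks and so on
        fireEntry(context, resourceWrapper, node, count, prioritized, args);

        //The request passed the checks of the following ProcessorSlots:
        //increment the thread count and the passed-request count of the current resource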
        node.increaseThreadNum();
        node.addPassRequest(count);
        ...
    }
}

//A Node used to hold statistics for specific resource name in the specific context.
//Each distinct resource in each distinct Context will corresponding to a DefaultNode.
//This class may have a list of sub DefaultNodes. 
//Child nodes will be created when calling SphU.entry() or SphO.entry() multiple times in the same Context.
public class DefaultNode extends StatisticNode {
    //Associated cluster node.
    private ClusterNode clusterNode;
    ...
    
    //DefaultNode holds the statistics of one resource within Contexts of the same name
    //EntranceNode holds the statistics of all resources within Contexts of the same name, i.e. per entrance
    //ClusterNode holds the statistics of one resource across all Contexts
    @Override
    public void addPassRequest(int count) {
        //Update the DefaultNode of the current resource
        super.addPassRequest(count);
        //Update the global statistics in the ClusterNode of the current resource
        this.clusterNode.addPassRequest(count);
    }
    ...
}

//The statistic node keep three kinds of real-time statistics metrics:
//1.metrics in second level rollingCounterInSecond
//2.metrics in minute level rollingCounterInMinute
//3.thread count

//Sentinel use sliding window to record and count the resource statistics in real-time.
//The sliding window infrastructure behind the ArrayMetric is LeapArray.

//case 1: When the first request comes in, 
//Sentinel will create a new window bucket of a specified time-span to store running statics, 
//such as total response time(rt), incoming request(QPS), block request(bq), etc. 
//And the time-span is defined by sample count.
//     0      100ms
//  +-------+--→ Sliding Windows
//         ^
//         |
//       request
//Sentinel use the statics of the valid buckets to decide whether this request can be passed.
//For example, if a rule defines that only 100 requests can be passed,
//it will sum all qps in valid buckets, and compare it to the threshold defined in rule.

//case 2: continuous requests
//  0    100ms    200ms    300ms
//  +-------+-------+-------+-----→ Sliding Windows
//                      ^
//                      |
//                   request

//case 3: requests keeps coming, and previous buckets become invalid
//  0    100ms    200ms      800ms       900ms  1000ms    1300ms
//  +-------+-------+ ...... +-------+-------+ ...... +-------+-----→ Sliding Windows
//                                                      ^
//                                                      |
//                                                    request

//The sliding window should become:
// 300ms     800ms  900ms  1000ms  1300ms
//  + ...... +-------+ ...... +-------+-----→ Sliding Windows
//                                                      ^
//                                                      |
//                                                    request
public class StatisticNode implements Node {
    //Holds statistics of the recent INTERVAL milliseconds.
    //The INTERVAL is divided into time spans by given sampleCount.
    //ArrayMetric that stores the data; by default 2 sample windows (SAMPLE_COUNT) and a 1000 ms time window (INTERVAL)
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
    
    //Holds statistics of the recent 60 seconds. 
    //The windowLengthInMs is deliberately set to 1000 milliseconds,
    //meaning each bucket per second, in this way we can get accurate statistics of each second.
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
    ...
    @Override
    public void addPassRequest(int count) {
        //Call ArrayMetric.addPass() to add the count of the current request
        rollingCounterInSecond.addPass(count);
        rollingCounterInMinute.addPass(count);
    }
    ...
}

//The basic metric class in Sentinel using a BucketLeapArray internal.
public class ArrayMetric implements Metric {
    //Stores the statistics
    private final LeapArray<MetricBucket> data;
    ...
    
    @Override
    public void addPass(int count) {
        //1. Get the sample window that contains the current time via LeapArray.currentWindow()
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        //2. Call MetricBucket.addPass() to add the count of the current request to that sample window's statistics
        wrap.value().addPass(count);
    }
    ...
}

//Basic data structure for statistic metrics in Sentinel.
//Leap array use sliding window algorithm to count data. 
//Each bucket cover windowLengthInMs time span, and the total time span is intervalInMs, 
//so the total bucket amount is: sampleCount = intervalInMs / windowLengthInMs.
public abstract class LeapArray<T> {
    //Length of one sample window
    protected int windowLengthInMs;
    //Number of sample windows in one sliding window: intervalInMs / windowLengthInMs
    protected int sampleCount;
    //Length of the sliding window
    protected int intervalInMs;
    //Length of the sliding window as well, in seconds
    private double intervalInSecond;
    //Array of sample windows; each element is a WindowWrap<T>, where T is in practice MetricBucket
    //LeapArray acts as the manager of the sample windows, WindowWrap<T> is the sample window itself
    protected final AtomicReferenceArray<WindowWrap<T>> array;

    //The total bucket count is: sampleCount = intervalInMs / windowLengthInMs.
    //@param sampleCount  bucket count of the sliding window
    //@param intervalInMs the total time interval of this LeapArray in milliseconds
    public LeapArray(int sampleCount, int intervalInMs) {
        ...
        this.windowLengthInMs = intervalInMs / sampleCount;//500 ms by default
        this.intervalInMs = intervalInMs;//1000 ms by default
        this.intervalInSecond = intervalInMs / 1000.0;//1 by default
        this.sampleCount = sampleCount;//2 by default
        this.array = new AtomicReferenceArray<>(sampleCount);
    }

    //Get the bucket at current timestamp.
    //i.e. the sample window that contains the current point in time
    public WindowWrap<T> currentWindow() {
        return currentWindow(TimeUtil.currentTimeMillis());
    }
    ...
}

//Wrapper entity class for a period of time window.
//Sample window class; the generic type T is, for example, MetricBucket
public class WindowWrap<T> {
    //Time length of a single sample window (bucket) in milliseconds.
    private final long windowLengthInMs;
    //Start timestamp of the sample window in milliseconds.
    private long windowStart;
    //Statistic data of this sample window, of type MetricBucket.
    private T value;
    ...
    //Returns the wrapped statistics object, for example a MetricBucket
    public T value() {
        return value;
    }
}

//Represents metrics data in a period of time span.
//Wrapper class for the statistics
public class MetricBucket {
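    //The statistics are stored in an array of LongAdder.
    //An array is used instead of a single LongAdder because the statistics are multi-dimensional,
    //and the MetricEvent enum defines those dimensions.
    //Each MetricEvent's ordinal is used as the array index, so all dimensions fit into one LongAdder array.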
    private final LongAdder[] counters;
    private volatile long minRt;
    
    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();
    }
    
    private void initMinRt() {
        this.minRt = SentinelConfig.statisticMaxRt();
    }
    
    public void addPass(int n) {
        add(MetricEvent.PASS, n);
    }
    
    public MetricBucket add(MetricEvent event, long n) {
        //Accumulate the value into the counter of the given event
        counters[event.ordinal()].add(n);
        return this;
    }
    ...
}

public enum MetricEvent {
    PASS,
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,
    OCCUPIED_PASS
}
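
To make the enum-indexed counter array concrete, here is a minimal standalone sketch. The class name BucketSketch and its trimmed-down Event enum are hypothetical and not part of Sentinel; only the idea of indexing a LongAdder array by ordinal() is taken from MetricBucket above.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical, simplified stand-in for MetricBucket (not Sentinel code).
final class BucketSketch {
    // Trimmed-down stand-in for MetricEvent.
    enum Event { PASS, BLOCK, EXCEPTION }

    // One LongAdder per event; the enum ordinal is the array index.
    private final LongAdder[] counters;

    BucketSketch() {
        Event[] events = Event.values();
        counters = new LongAdder[events.length];
        for (Event event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
    }

    // Adds n to the counter of the given event and returns this for chaining.
    BucketSketch add(Event event, long n) {
        counters[event.ordinal()].add(n);
        return this;
    }

    // Reads the current total of the given event.
    long get(Event event) {
        return counters[event.ordinal()].sum();
    }

    public static void main(String[] args) {
        BucketSketch bucket = new BucketSketch();
        bucket.add(Event.PASS, 3).add(Event.PASS, 2).add(Event.BLOCK, 1);
        // prints "pass=5, block=1"
        System.out.println("pass=" + bucket.get(Event.PASS) + ", block=" + bucket.get(Event.BLOCK));
    }
}
```

Adding a new dimension only requires a new enum constant; the constructor sizes the array from Event.values(), so no other field has to change.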

II. How LeapArray implements sliding-window statistics

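Collecting statistics through ArrayMetric.addPass() works as follows: first, LeapArray.currentWindow() obtains the sample window that contains the current time; then MetricBucket.addPass() adds the count to that sample window's statistics.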

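LeapArray.currentWindow() resolves the sample window for the current time as follows. Here the "computed window start" is the start time calculated from the current timestamp, and the "old window" is whatever is currently stored at the computed array index:

Case 1: The old window has not been created yet (the slot is null), so a new sample window is initialized.

Case 2: The old window's start time equals the computed window start, so both refer to the same sample window and it is returned directly.

Case 3: The computed window start is later than the old window's start time, so the old window is stale and is reset to the new window start. Note that LeapArray.array is a circular array.

Case 4: The computed window start is earlier than the old window's start time. This normally does not happen because time does not go backwards, unless the system clock is changed manually and rolls back.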
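
The decision between these cases depends only on the start time stored in the slot and the start time computed from the timestamp. The following sketch isolates that decision; the class WindowCaseSketch, the Case enum and the classify() method are hypothetical names introduced for illustration and do not exist in Sentinel.

```java
// Hypothetical helper that isolates the branch decision of currentWindow() (not Sentinel code).
final class WindowCaseSketch {
    enum Case { ABSENT, UP_TO_DATE, STALE, CLOCK_WENT_BACK }

    // oldWindowStart is null when the array slot is still empty.
    static Case classify(Long oldWindowStart, long timeMillis, int windowLengthInMs) {
        // Same arithmetic as calculateWindowStart(): round down to a window boundary.
        long windowStart = timeMillis - timeMillis % windowLengthInMs;
        if (oldWindowStart == null) {
            return Case.ABSENT;            // create a new window
        }
        long oldStart = oldWindowStart.longValue();
        if (windowStart == oldStart) {
            return Case.UP_TO_DATE;        // reuse the stored window
        }
        if (windowStart > oldStart) {
            return Case.STALE;             // stored window belongs to an earlier lap, reset it
        }
        return Case.CLOCK_WENT_BACK;       // stored window is newer than the computed one
    }

    public static void main(String[] args) {
        // With timeMillis = 1600 and windowLengthInMs = 500 the computed window start is 1500.
        System.out.println(classify(null, 1600, 500));   // prints "ABSENT"
        System.out.println(classify(1500L, 1600, 500));  // prints "UP_TO_DATE"
        System.out.println(classify(500L, 1600, 500));   // prints "STALE"
        System.out.println(classify(2000L, 1600, 500));  // prints "CLOCK_WENT_BACK"
    }
}
```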

public abstract class LeapArray<T> {
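    //Length of a sample window
    protected int windowLengthInMs;
    //Number of sample windows in one sliding window: intervalInMs / windowLengthInMs, i.e. sliding window length / sample window length
    protected int sampleCount;
    //Length of the sliding window
    protected int intervalInMs;
    //Also the length of the sliding window, but in seconds
    private double intervalInSecond;
    //The array of sample windows: each element is a WindowWrap<T>, where T is actually MetricBucket
    //LeapArray acts as a manager of sample windows, while the sample window itself is WindowWrap<T>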
    protected final AtomicReferenceArray<WindowWrap<T>> array;
    ...
    //Assume timeMillis = 1600, windowLengthInMs = 500 and array.length = 2; then timeId = 3 and the method returns 1
    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
        long timeId = timeMillis / windowLengthInMs;
        //Calculate current index so we can map the timestamp to the leap array.
        return (int)(timeId % array.length());
    }

    //Assume timeMillis = 1600 and windowLengthInMs = 500; then the method returns 1500
    protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
    }

    //Get bucket item at provided timestamp.
    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }

        //Calculate the id of the sample window that contains the current time, i.e. its index in LeapArray.array
        int idx = calculateTimeIdx(timeMillis);

        //Calculate current bucket start time, i.e. the start of the sample window that contains timeMillis.
        long windowStart = calculateWindowStart(timeMillis);

        //Get bucket item at given time from the array.
        //(1) Bucket is absent, then just create a new bucket and CAS update to circular array.
        //(2) Bucket is up-to-date, then just return the bucket.
        //(3) Bucket is deprecated, then reset current bucket.
        while (true) {
            //Get the window currently stored at the computed index (the "old" window)
            WindowWrap<T> old = array.get(idx);

            //If no window is stored at this index yet, one has to be created
            if (old == null) {
                //Create a sample window
                //     B0       B1      B2    NULL      B4
                // ||_______|_______|_______|_______|_______||___
                // 200     400     600     800     1000    1200  timestamp
                //                             ^
                //                          time=888
                //            bucket is empty, so create new and update
                //If the old bucket is absent, then we create a new bucket at windowStart,
                //then try to update circular array via a CAS operation. 
                //Only one thread can succeed to update, while other threads yield its time slice.
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                //Put the newly created window into the LeapArray via CAS
                if (array.compareAndSet(idx, null, window)) {
                    //Successfully updated, return the created bucket.
                    return window;
                } else {
                    //Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            }
            //If the old window's start time equals the computed window start, both refer to the same sample window, so return it directly
            else if (windowStart == old.windowStart()) {
                //     B0       B1      B2     B3      B4
                // ||_______|_______|_______|_______|_______||___
                // 200     400     600     800     1000    1200  timestamp
                //                             ^
                //                          time=888
                //            startTime of Bucket 3: 800, so it's up-to-date
                //If current windowStart is equal to the start timestamp of old bucket,
                //that means the time is within the bucket, so directly return the bucket.
                return old;
            }
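            //If the computed window start is later than the old window's start time, the old window is stale and must be reset to the new window start
            //The array is circular, not infinitely long: for example, if it holds 1 s split into 1000 sample windows, the 1000 windows of the next second overwrite those of the previous second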
            else if (windowStart > old.windowStart()) {
                //   (old)
                //             B0       B1      B2    NULL      B4
                // |_______||_______|_______|_______|_______|_______||___
                // ...    1200     1400    1600    1800    2000    2200  timestamp
                //                              ^
                //                           time=1676
                //          startTime of Bucket 2: 400, deprecated, should be reset
                //If the start timestamp of old bucket is behind provided time, that means the bucket is deprecated. 
                //We have to reset the bucket to current windowStart.
                //Note that the reset and clean-up operations are hard to be atomic,
                //so we need a update lock to guarantee the correctness of bucket update.
                //The update lock is conditional (tiny scope) and will take effect only when bucket is deprecated, 
                //so in most cases it won't lead to performance loss.
                if (updateLock.tryLock()) {
                    try {
                        //Successfully get the update lock, now we reset the bucket.
                        //Reset the stale sample window
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    //Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            }
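            //If the computed window start is earlier than the old window's start time
            //This normally does not happen because time does not go backwards, unless the system clock is changed manually and rolls back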
            else if (windowStart < old.windowStart()) {
                //Should not go through here, as the provided time is already behind.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }
    ...
}
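
To see the circular array in action end to end, here is a minimal sketch of a sliding-window counter built on the same index and window-start arithmetic. It is a simplification under stated assumptions, not Sentinel's implementation: the class SlidingWindowSketch and its methods are hypothetical, timestamps are passed in explicitly so the behavior is reproducible, a stale window is replaced by a fresh object through CAS instead of being reset in place under updateLock, and a window is counted as valid when it started less than intervalInMs ago.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical, simplified sliding-window counter (not Sentinel code).
final class SlidingWindowSketch {
    // A sample window: start timestamp plus a single pass counter.
    static final class Window {
        final long start;
        final LongAdder pass = new LongAdder();
        Window(long start) { this.start = start; }
    }

    private final int windowLengthInMs;
    private final int intervalInMs;
    private final AtomicReferenceArray<Window> array;

    SlidingWindowSketch(int sampleCount, int intervalInMs) {
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }

    // Maps a timestamp to a slot of the circular array.
    int indexOf(long timeMillis) {
        return (int) ((timeMillis / windowLengthInMs) % array.length());
    }

    // Rounds a timestamp down to the start of its sample window.
    long windowStartOf(long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
    }

    Window currentWindow(long timeMillis) {
        int idx = indexOf(timeMillis);
        long start = windowStartOf(timeMillis);
        while (true) {
            Window old = array.get(idx);
            if (old == null || start > old.start) {
                // Absent or stale slot: install a fresh window via CAS (assumption: replace, not reset).
                Window fresh = new Window(start);
                if (array.compareAndSet(idx, old, fresh)) {
                    return fresh;
                }
                // Another thread won the race; loop and re-read the slot.
            } else if (start == old.start) {
                return old; // up to date
            } else {
                return new Window(start); // clock went backwards: detached window
            }
        }
    }

    void addPass(long timeMillis, int n) {
        currentWindow(timeMillis).pass.add(n);
    }

    // Sums the windows that started less than intervalInMs before timeMillis.
    long passInInterval(long timeMillis) {
        long sum = 0;
        for (int i = 0; i < array.length(); i++) {
            Window w = array.get(i);
            if (w != null && timeMillis - w.start < intervalInMs) {
                sum += w.pass.sum();
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        SlidingWindowSketch counter = new SlidingWindowSketch(2, 1000);
        counter.addPass(100, 1);   // slot 0, window start 0
        counter.addPass(600, 2);   // slot 1, window start 500
        System.out.println(counter.passInInterval(600));   // prints 3
        counter.addPass(1600, 4);  // slot 1 again, window start 1500 replaces 500
        System.out.println(counter.passInInterval(1600));  // prints 4
    }
}
```

At time 1600 the window that started at 0 is still stored in slot 0 but is older than one interval, so it is skipped when summing; this is why the second result is 4 rather than 5.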
