Sentinel源码—2.Context和处理链的初始化

大纲

1.Sentinel底层的核心概念

2.Sentinel中Context的设计思想与源码实现

3.Java SPI机制的引入

4.Java SPI机制在Sentinel处理链中的应用

5.Sentinel默认处理链ProcessorSlot的构建

1.Sentinel底层的核心概念

(1)资源和规则

(2)Context

(3)ProcessorSlot

(4)Node

(1)资源和规则

一.什么是资源、规则和Entry

二.使用责任链模式 + 过滤器来收集资源的调用数据

三.如何设计Entry资源访问类

四.如何设计Entry资源访问类的子类

一.什么是资源、规则和Entry

资源:可以是一个方法、一个接口或一段代码。资源是被Sentinel保护和管理的对象。

规则:用来定义资源应遵循的约束条件。Sentinel支持多种规则类型,如流控规则、熔断降级规则等。

Entry:可以理解为是对资源的一次访问。

在Sentinel中,资源和规则是紧密相互关联的。Sentinel会根据配置的规则,对资源的调用进行控制,从而保证系统稳定。

二.使用责任链模式 + 过滤器来收集资源的调用数据

Sentinel要实现流控、熔断降级等功能,首先需要收集资源的调用数据。比如:QPS、请求失败次数、请求成功次数、线程数、响应时间等。

为了收集资源的这些调用数据,可以使用过滤器。将需要限制的资源配置在配置文件中,然后创建一个过滤器来拦截请求。当发现当前请求的接口已在配置文件中配置时,便需要进行数据收集。比如每次收到请求都将totalQps字段值 + 1,请求成功则再将successQps字段值 + 1。但使用一个过滤器是不够的,因为不仅有限制QPS的规则,也可能有限制异常比例的规则等。因此还需要使用责任链模式,让每一种规则都对应一个过滤器节点。这样拦截请求进行过滤时,便会形成一个过滤器处理链:开始 -> 流控过滤器 -> 黑白名单过滤器 -> 熔断降级过滤器 -> 业务 -> 结束。

三.如何设计Entry资源访问类

Entry可以理解为是对资源的一次访问。在使用责任链模式之前,需要先设计Entry资源访问类。

已知资源可以是一个方法、一个接口甚至一段代码,那么资源就肯定要有资源名称,比如方法名、接口名、其他自定义名称。因此,可以设计如下Entry资源访问类:

//资源访问类
public class Entry {
    //资源名称
    private final String name;
}

一个资源作为API提供给第三方调用时,此时它属于入口流量。一个资源也会主动请求第三方API,此时它属于出口流量。第三方调用自己时,需要限制QPS。自己调用第三方时,第三方也有QPS限制。所以需要一个EntryType类:

public enum EntryType {
    //入口流量,资源被调用
    IN,
    //出口流量,资源调用其他接口
    OUT;
}

然后,还需要一个ResourceTypeConstants类来记录资源的类型,表明资源是HTTP接口、RPC接口、还是Gateway网关服务等。

//资源类
public final class ResourceTypeConstants {
    //默认类型
    public static final int COMMON = 0;
    //Web类型,也就是最常见的HTTP类型
    public static final int COMMON_WEB = 1;
    //RPC类型,如Dubbo RPC,Grpc,Thrift等
    public static final int COMMON_RPC = 2;
    //API网关
    public static final int COMMON_API_GATEWAY = 3;
    //数据库SQL操作
    public static final int COMMON_DB_SQL = 4;
}

因此,一个资源起码有三个字段:名称(name)、资源类型(resourceType)以及请求类型(entryType)。将这三个字段包装成一个ResourceWrapper类,如下所示:

//资源类(资源包装类)
public class ResourceWrapper {
    //资源名称
    protected final String name;
    //资源类型:入口流量还是出口流量
    protected final EntryType entryType;
    //请求类型:HTTP类型、RPC类型、API网关
    protected final int resourceType;
    
    public ResourceWrapper(String name, EntryType entryType, int resourceType) {
        this.name = name;
        this.entryType = entryType;
        this.resourceType = resourceType;
    }
}

接着,将这个ResourceWrapper资源类放入到Entry资源访问类中:

public class Entry {
    //封装了:名称(name)、请求类型(entryType)以及资源类型(resourceType)三个字段
    protected final ResourceWrapper resourceWrapper;
    
    public Entry(ResourceWrapper resourceWrapper) {
        this.resourceWrapper = resourceWrapper;
    }
}

类图如下:

同时,还需记录资源访问的开始时间和完成时间:

public class Entry {
    //资源访问的开始时间
    private final long createTimestamp;
    //资源访问的完成时间
    private long completeTimestamp;
    //封装了:名称(name)、请求类型(entryType)以及资源类型(resourceType)三个字段
    protected final ResourceWrapper resourceWrapper;
    
    public Entry(ResourceWrapper resourceWrapper) {
        this.resourceWrapper = resourceWrapper;
        //给开始时间赋值为当前系统时间
        this.createTimestamp = TimeUtil.currentTimeMillis();
    }
}

然后,还需记录这个资源总的请求数、请求成功、请求失败等指标。可以将获取这些指标的方法放到一个interface里,然后通过interface提供的方法获取这些指标,可称这个interface为Node。

//用于统计资源的各项指标数据
public interface Node {
    //总的请求数
    long totalRequest();
    //请求成功数
    long totalSuccess();
    ...
}

于是,Entry资源访问类变成如下:

public class Entry {
    //资源访问的开始时间
    private final long createTimestamp;
    //资源访问的完成时间
    private long completeTimestamp;
    //统计资源的各项数据指标
    private Node curNode;
    //封装了:名称(name)、请求类型(entryType)以及资源类型(resourceType)三个字段
    protected final ResourceWrapper resourceWrapper;
    
    public Entry(ResourceWrapper resourceWrapper) {
        this.resourceWrapper = resourceWrapper;
        //给开始时间赋值为当前系统时间
        this.createTimestamp = TimeUtil.currentTimeMillis();
    }
}

此外,当触发预设规则的阈值时,系统将抛出警告提示。因此还要自定义一个异常类BlockException。所以,Entry资源访问类最终如下:

//Each SphU.entry() will return an Entry.
//This class holds information of current invocation:
//createTime, the create time of this entry, using for rt statistics.
//current Node, that is statistics of the resource in current context.
//origin Node, that is statistics for the specific origin. 
//Usually the origin could be the Service Consumer's app name, see ContextUtil.enter(String name, String origin).
//ResourceWrapper, that is resource name.

//A invocation tree will be created if we invoke SphU.entry() multi times in the same Context,
//so parent or child entry may be held by this to form the tree. 
//Since Context always holds the current entry in the invocation tree, 
//every Entry.exit() call should modify Context.setCurEntry(Entry) as parent entry of this.
public abstract class Entry {
    //资源访问的开始时间
    private final long createTimestamp;
    //资源访问的完成时间
    private long completeTimestamp;
    //统计资源的各项数据指标
    private Node curNode;
    //异常
    private BlockException blockError;
    //封装了:名称(name)、请求类型(entryType)以及资源类型(resourceType)三个字段
    protected final ResourceWrapper resourceWrapper;
    
    public Entry(ResourceWrapper resourceWrapper) {
        this.resourceWrapper = resourceWrapper;
        //给开始时间赋值为当前系统时间
        this.createTimestamp = TimeUtil.currentTimeMillis();
    }
}

Entry资源访问类的设计总结:

每次资源被访问时都会创建一个Entry资源访问对象。Entry资源访问对象会包含:资源名称、资源类型、请求类型、操作资源的开始和结束时间、统计各项指标的接口以及自定义流控异常。

根据这些字段可以完成以下需求:统计一个资源的出口流量、成功次数、失败次数等指标。根据规则配置判断是否超出阈值,若是则抛出自定义异常BlockException。这样的Entry资源访问类,能灵活方便管理和监控资源,满足不同的需求。

为了方便子类继承,可以将Entry资源访问类设计为抽象类。

四.如何设计Entry资源访问类的子类

假设需要提供一个外部接口(sms/send)用于发送短信服务,具体的短信发送操作是通过第三方云服务提供商提供的API或SDK完成的,第三方云服务提供商需要使用方对其提供的API或SKD进行QPS限流。

当用户请求接口sms/send到达系统时,会生成一个名为sms/send的Entry。由于当接口sms/send请求第三方API时,也需要进行限流操作,因此还会生成一个名为yun/sms/api的Entry。

可以发现这两个Entry都属于同一次请求,即这两个Entry是有关联的。

Entry(name=sms/send)的子节点是Entry(name=yun/sms/api)
Entry(name=yun/sms/api)的父节点是Entry(name=sms/send)

这显然是父子关系,可以使用双向链表表示,因此需要两个指针。

此外,由于这两个Entry属于同一个请求,即它们位于同一个作用范围内。因此,需要一个字段来表示作用范围。

最后,由于需要通过设置一系列的过滤器来采集Entry的各规则下的指标。所以,还需增加一个存放处理链(处理器插槽链条)类型的字段。

综上,可以设计Entry的子类CtEntry如下:

//Entry的子类
class CtEntry extends Entry {
    //指向上一个节点,父节点,类型为Entry
    protected Entry parent = null;
    //指向下一个节点,子节点,类型为Entry
    protected Entry child = null;
    //作用域,上下文
    protected Context context;
    //处理链(处理器插槽链条)
    protected ProcessorSlot<Object> chain;
}

(2)Context

Context对象的用途是存储请求调用链中关联的Entry信息。在Sentinel中,每个请求都有一个与之关联的Context实例。当系统接收到一个请求时,就会创建出一个Context。请求的处理过程中可能会涉及多个资源,Context便会在多个资源中传递。

Context类的设计如下:

public class Context {
    //名称
    private final String name;
    //处理到哪个Entry了
    private Entry curEntry;
    //来源,比如请求 IP
    private String origin = "";
}

可以发现Context比较简单,它就像一个容器一样。Context会关联此次请求的所有资源,即包含一个Entry双向链表。一个Context的生命周期内可能有多个资源操作,并非是一个接口对应一个Context,可以是多个接口对应一个Context。比如A调用B,B调用C,那么A、B、C三个资源就属于同一个Context。Context生命周期内的最后一个资源退出时就会清理并结束该Context。

Context类的设计总结:

在处理一个请求对应的一个资源时或者多个资源时,这些资源的操作必须要建立在一个Context环境下,而且每一个资源的操作必须要通过Entry对象来完成。也就是一个请求对应一个Context,一个请求可能操作多个资源。而多个资源又对应多个Entry,但这些Entry却都属于同一个Context。

(3)ProcessorSlot

ProcessorSlot就是一个用于负责数据采集的处理链(处理器插槽链条)的节点。每当一个资源被调用时都会创建一个Entry资源访问对象。Entry资源访问对象中就有一条节点为ProcessorSlot的处理链,对应于Entry对象的chain属性。这意味着在创建Entry对象时,也会生成一系列ProcessorSlot处理槽,这些ProcessorSlot处理槽各自承担不同的职责。

一.NodeSelectorSlot

NodeSelectorSlot负责构建资源的调用路径,然后将这些资源的调用路径以树状结构存储起来,方便后续根据资源的调用路径进行数据统计和限流降级。

二.ClusterBuilderSlot

ClusterBuilderSlot负责实现集群限流功能,ClusterBuilderSlot会将请求的流量信息汇报到集群的统计节点,然后根据集群限流规则决定是否应该限制请求。

三.StatisticSlot

StatisticSlot负责记录资源的访问统计信息,比如通过的请求数、阻塞的请求数、响应时间等。StatisticSlot会将每次资源访问的信息记录在资源的统计节点中,这些统计信息是Sentinel执行流量控制(如限流、熔断降级等)的重要指标。

四.SystemSlot

SystemSlot用于实现系统保护功能,它会提供基于系统负载、系统平均响应时间和系统入口QPS的保护策略。

五.AuthoritySlot

AuthoritySlot用于实现授权规则功能,比如基于黑白名单的访问权限。

六.FlowSlot

FlowSlot用于实现流量控制功能,比如对不同来源的流量进行限制、基于调用关系对流量进行控制。

七.DegradeSlot

DegradeSlot用于实现熔断降级功能,可以支持基于异常比例、异常数和响应时间的降级策略,处理链(处理器插槽链条)之间的关系图如下:

ProcessorSlot的用途总结:负责构建调用路径树、进行数据采集、实施流量控制和熔断降级等规则。

(4)Node

Node的用途很简单,就是基于处理槽采集的数据进行统计。比如统计总的请求量、请求成功量、请求失败量等指标。因此可以定义如下接口:

public interface Node {
    //获取总请求量
    long totalRequest();
    //获取请求成功量
    long successRequest();
    //获取请求失败量
    long failedRequest();
    ...
}

即然Node是一个接口,那么需要提供一个具体的实现类。由于Node接口主要关注统计相关功能,因此可将实现类命名为StatisticNode,如下所示:

public class StatisticNode implements Node {
    ...
}

通过StatisticNode类可完成Node接口定义的各种性能指标的收集和计算。但为了更多维度的计算,比如:上下文Context维度、资源维度等,还需要额外设计如下三个子类:

一.DefaultNode(单机里的资源维度)

默认节点,用于统计名字相同的Context下的某个资源的调用数据,意味着DefaultNode是以Context和ResourceWrapper为维度进行统计。

二.EntranceNode(接口维度)

继承自DefaultNode,是名字相同的Context的入口节点。用于统计名字相同的Context下的所有资源的调用数据,维度为Context。注意:默认创建的Context都是名字相同的,一个线程对应一个Context。所以EntranceNode可以理解为统计某接口被所有线程访问的调用数据。

三.ClusterNode(集群中的资源维度)

ClusterNode保存的是同一个资源的相关的统计信息,ClusterNode是以资源为维度的,不区分Context。

//Holds real-time statistics for resources.
public interface Node extends OccupySupport, DebugSupport {
    ...
}

public class StatisticNode implements Node {
    ...
}

//A Node used to hold statistics for specific resource name in the specific context.
//Each distinct resource in each distinct Context will corresponding to a DefaultNode.
//This class may have a list of sub DefaultNodes.
//Child nodes will be created when calling SphU.entry() multiple times in the same Context.
public class DefaultNode extends StatisticNode {
    ...
}

//A Node represents the entrance of the invocation tree.
//One Context will related to a EntranceNode, which represents the entrance of the invocation tree. 
//New EntranceNode will be created if current context does't have one. 
//Note that same context name will share same EntranceNode globally.
public class EntranceNode extends DefaultNode {
    ...
}

//This class stores summary runtime statistics of the resource, including rt, thread count, qps and so on. 
//Same resource shares the same ClusterNode globally, no matter in which Context.
public class ClusterNode extends StatisticNode {
    ...
}

//The ClusterNode is uniquely identified by the ResourceId.
//The DefaultNode is identified by both the resource id and Context. 
//In other words, one resource id will generate multiple DefaultNode for each distinct context,
//but only one ClusterNode.

三者的区别:DefaultNode统计的是名字相同的Context下的某个资源的调用数据,EntranceNode统计的是名字相同的Context下的全部资源的调用数据,ClusterNode统计的是某个资源在所有Context下的调用数据。

注意:资源可以是一个方法、一个接口或一段代码。一个请求会对应一个Context,一个请求可能包含多个方法,所以一个请求可能操作多个资源,一个Context可能包含多个资源。

Node的用途总结:统计各种维度的各种数据指标。

(5)总结

一.资源访问对象Entry

每次访问资源都会创建一个Entry资源访问对象。每个Entry对象都会包含资源的基本信息(如名称、请求类型、资源类型等)、数据采集链和获取指标信息的方法(如QPS、成功请求数、失败请求数等)。由于一次请求可能涉及多个资源,因此Entry资源访问对象采用双向链表结构。

二.管理资源对象的上下文Context

资源的操作要在一个Context环境下进行,一个Context可包含多个资源。

三.请求、Entry、Context、ProcessorSlot、Node之间的关系

每次访问资源都要创建一个Entry对象,资源操作要建立在一个Context环境下。一个请求对应一个Context,一个Context可以包含多个资源,也就是一个Context会包含一条完整请求链路中涉及的所有资源。每个资源都通过ProcessorSlot进行数据采集和规则验证,而采集完的数据交由Node去做聚合统计分析。

总之:每个请求都需要与Context绑定,一个Context可以关联多个资源。每个资源都通过处理链ProcessorSlot进行数据采集和规则验证。ProcessorSlot数据采集完成后,会通过Node进行统计和分析。

2.Sentinel中Context的设计思想与源码实现

(1)初始化Entry和Context的设计思想

(2)初始化Entry的源码—将处理链、Context与Entry对象绑定

(3)初始化Context的源码—如何创建Context和EntranceNode

(4)总结

(1)初始化Entry和Context的设计思想

一.Context对象的name属性一般取默认值

Context对象是有一个name属性的,所以如果没有指定Context对象的name属性,则Context对象的name默认为sentinel_default_context。

Context context = InternalContextUtil.internalEnter("sentinel_default_context");

二.使用ThreadLocal绑定线程和Context对象

由于一个请求要与一个Context对象进行绑定,一个请求由一个线程处理。所以可以定义一个ThreadLocal变量,让一个线程与一个Context对象绑定。

//存放线程与Context的绑定关系
private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();

三.如何初始化Entry对象

由于一个请求涉及多个资源,即一个Context对象会包含多个Entry对象,所以每个Entry对象必然属于某个Context对象。因此初始化Entry对象时需要将Context对象绑定到Entry对象中,这可以在Entry的构造方法中传入一个Context对象。当然Entry的构造方法也会传入一个ResourceWrapper对象,因为Entry的基本属性(名称、请求类型、资源类型)会封装在ResourceWrapper对象中。当然,Entry对象还需要的关键属性有:ProcessorSlot和Node。

//初始化Entry的基本属性
ResourceWrapper resource = new ResourceWrapper("/hello/world", EntryType.OUT, ResourceTypeConstants.COMMON);
//初始化Context
Context context = InternalContextUtil.internalEnter("sentinel_default_context");
//放到Entry当中
CtEntry entry = new CtEntry(resourceWrapper, context);

(2)初始化Entry的源码—将处理链、Context与Entry对象绑定

Sentinel创建资源访问对象的入口是:SphU.entry("/hello/world"),在执行SphU.entry()方法时会初始化一个Context对象,其中便会调用CtSph.entry()方法创建一个Entry资源访问对象。

在CtSph的entry()方法中,首先会创建一个StringResourceWrapper对象,然后调用CtSph的entryWithPriority()方法执行如下的处理逻辑:初始化Context -> 将Context与线程绑定 -> 初始化Entry -> 将Context和ResourceWrapper放入Entry中。

在CtSph的entryWithPriority()方法中,由于创建一个Entry资源访问对象时,需要传入当前线程对应的Context,所以首先会调用ContextUtil的getContext()方法从当前线程中获取Context。如果获取到的Context为空,也就是当前线程没有绑定Context,那么就调用InternalContextUtil的internalEnter()方法创建一个Context对象,也就是调用ContextUtil的trueEnter()方法创建一个Context对象,并把这个Context对象放入ThreadLocal线程变量contextHolder中。然后调用CtSph的lookProcessChain()方法初始化处理链,接着创建一个Entry资源访问对象并将处理链、Context与该Entry资源访问对象绑定,最后调用ProcessorSlot的entry()方法执行处理链节点的数据采集 + 规则验证。

//The fundamental Sentinel API for recording statistics and performing rule checking for resources.
public class SphU {
    private static final Object[] OBJECTS0 = new Object[0];
    ...
    //Record statistics and perform rule checking for the given resource.
    //@param name the unique name of the protected resource
    public static Entry entry(String name) throws BlockException {
        //调用CtSph.entry()方法创建一个Entry资源访问对象,默认的请求类型为OUT
        return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
    }
    ...
}

//Sentinel Env. This class will trigger all initialization for Sentinel.
public class Env {
    //创建一个CtSph对象
    public static final Sph sph = new CtSph();
    static {
        //If init fails, the process will exit.
        InitExecutor.doInit();
    }
}

public class CtSph implements Sph {
    ...
    //Record statistics and perform rule checking for the given resource.
    //@param name the unique name for the protected resource
    //@param type the traffic type (inbound, outbound or internal).
    //This is used to mark whether it can be blocked when the system is unstable, only inbound traffic could be blocked by SystemRule
    //@param count the amount of calls within the invocation (e.g. batchCount=2 means request for 2 tokens)
    //@param args args for parameter flow control or customized slots
    @Override
    public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
        //StringResourceWrapper是ResourceWrapper的子类,且StringResourceWrapper的构造方法默认了资源类型为COMMON
        StringResourceWrapper resource = new StringResourceWrapper(name, type);
        return entry(resource, count, args);
    }
    
    //Do all {@link Rule}s checking about the resource.
    public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
        //调用CtSph.entryWithPriority()方法,执行如下处理:
        //初始化Context -> 将Context与线程绑定 -> 初始化Entry -> 将Context和ResourceWrapper放入Entry中
        return entryWithPriority(resourceWrapper, count, false, args);
    }
    
    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {
        //从当前线程中获取Context
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            return new CtEntry(resourceWrapper, null, context);
        }
        //如果没获取到Context
        if (context == null) {
            //Using default context.
            //创建一个名为sentinel_default_context的Context,并且与当前线程绑定
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }
        //Global switch is close, no rule checking will do.
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }
        //调用CtSph.lookProcessChain()方法初始化处理链(处理器插槽链条)
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }
        //创建出一个Entry资源访问对象,将处理链(处理器插槽链条)、Context与Entry资源访问对象绑定
        //其中会将Entry的三个基础属性(封装在resourceWrapper里)以及当前Entry所属的Context作为参数传入CtEntry的构造方法
        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            //处理链(处理器插槽链条)入口,负责采集数据,规则验证
            //调用DefaultProcessorSlotChain.entry()方法执行处理链每个节点的逻辑(数据采集+规则验证)
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            //规则验证失败,比如:被流控、被熔断降级、触发黑白名单等
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }
    ...
    private final static class InternalContextUtil extends ContextUtil {
        static Context internalEnter(String name) {
            //调用ContextUtil.trueEnter()方法创建一个Context对象
            return trueEnter(name, "");
        }
        static Context internalEnter(String name, String origin) {
            return trueEnter(name, origin);
        }
    }
}

public class StringResourceWrapper extends ResourceWrapper {
    public StringResourceWrapper(String name, EntryType e) {
        //调用父类构造方法,且默认资源类型为COMMON
        super(name, e, ResourceTypeConstants.COMMON);
    }
    ...
}

初始化Context的核心源码总结:

1.从当前ThreadLocal里获取Context

2.获取不到Context,则创建Context并将Context放到ThreadLocal里,与当前请求线程绑定

3.初始化处理链(处理器插槽链条)

4.将处理链(处理器插槽链条)、Context与Entry资源访问对象绑定

5.执行每一个链条的逻辑(数据采集 + 规则验证)

6.验证失败抛出BlockException

(3)初始化Context的源码—如何创建Context和EntranceNode

ContextUtil的trueEnter()方法会尝试从ThreadLocal获取一个Context对象。如果获取不到,那么再创建一个Context对象然后放入到ThreadLocal中。

由于当前线程可能会涉及创建多个Entry,所以该方法需要注意并发问题。但是并非在创建Context时需要注意并发,因为一个线程本就需要一个Context。而是在创建Context对象所需的EntranceNode对象时才需注意并发,因为相同名字的Context对象会共用同一个EntranceNode对象。默认情况下,创建的Context对象都是有相同名字的,这样一个EntranceNode对象就可以对当前机器的所有请求进行统计。

其中在创建EntranceNode对象时,会使用Double Check + 锁的机制,先从缓存EntranceNode对象的Map中尝试获取已存在的EntranceNode。如果获取不到EntranceNode,那么再去创建EntranceNode对象,然后使用写时复制 + 锁去更新缓存EntranceNode对象的Map。

//Utility class to get or create Context in current thread.
//Each SphU.entry() should be in a Context.
//If we don't invoke ContextUtil.enter() explicitly, DEFAULT context will be used.
public class ContextUtil {
    //Store the context in ThreadLocal for easy access.
    //存放线程与Context的绑定关系
    //每个请求对应一个线程,每个线程绑定一个Context,所以每个请求对应一个Context
    private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();
    //Holds all EntranceNode. Each EntranceNode is associated with a distinct context name.
    //以Context的name作为key,EntranceNode作为value缓存到HashMap中
    private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();
    private static final ReentrantLock LOCK = new ReentrantLock();
    private static final Context NULL_CONTEXT = new NullContext();
    ...
    //ContextUtil.trueEnter()方法会尝试从ThreadLocal获取一个Context对象
    //如果获取不到,再创建一个Context对象然后放入ThreadLocal中
    //入参name其实一般就是默认的Constants.CONTEXT_DEFAULT_NAME=sentinel_default_context
    //由于当前线程可能会涉及创建多个Entry资源访问对象,所以trueEnter()方法需要注意并发问题
    protected static Context trueEnter(String name, String origin) {
        //从ThreadLocal中获取当前线程绑定的Context对象
        Context context = contextHolder.get();
        //如果当前线程还没绑定Context对象,则初始化Context对象并且与当前线程进行绑定
        if (context == null) {
            //首先要获取或创建Context对象所需要的EntranceNode对象,EntranceNode会负责统计名字相同的Context下的指标数据
            //将全局缓存contextNameNodeMap赋值给一个临时变量localCacheNameMap
            //因为后续会对contextNameNodeMap的内容进行修改,所以这里需要将原来的contextNameNodeMap复制一份出来
            //从而避免后续对contextNameNodeMap的内容进行修改时,可能造成对接下来读取contextNameNodeMap内容的影响
            Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;

            //从缓存副本localCacheNameMap中获取EntranceNode
            //这个name其实一般就是默认的sentinel_default_context
            DefaultNode node = localCacheNameMap.get(name);
            //如果获取的EntranceNode为空
            if (node == null) {
                //为了防止缓存无限制地增长,导致内存占用过高,需要设置一个上限,只要超过上限,就直接返回NULL_CONTEXT
                if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                    setNullContext();
                    return NULL_CONTEXT;
                } else {
                    //如果Context还没创建,缓存里也没有当前Context名称对应的EntranceNode,并且缓存数量尚未达到2000
                    //那么就创建一个EntranceNode,创建EntranceNode时需要加锁,否则会有线程不安全问题
                    //毕竟需要修改HashMap类型的contextNameNodeMap

                    //通过加锁 + 缓存 + 写时复制更新缓存,避免并发情况下创建出多个EntranceNode对象
                    //一个线程对应一个Context对象,多个线程对应多个Context对象
                    //这些Context对象会使用ThreadLocal进行隔离,但它们的name默认都是sentinel_default_context
                    //根据下面的代码逻辑:
                    //多个线程(对应多个Context的name默认都是sentinel_default_context)会共用同一个EntranceNode
                    //于是可知,多个Context对象会共用一个EntranceNode对象
                    LOCK.lock();
                    try {
                        //从缓存中获取EntranceNode
                        node = contextNameNodeMap.get(name);
                        //对node进行Double Check
                        //如果没获取到EntranceNode
                        if (node == null) {
                            if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                                setNullContext();
                                return NULL_CONTEXT;
                            } else {
                                //创建EntranceNode,缓存到contextNameNodeMap当中
                                node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                                //Add entrance node.
                                //将新创建的EntranceNode添加到ROOT中,ROOT就是每个Node的根结点
                                Constants.ROOT.addChild(node);

                                //写时复制,将新创建的EntranceNode添加到缓存中
                                Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                                newMap.putAll(contextNameNodeMap);
                                newMap.put(name, node);
                                contextNameNodeMap = newMap;
                            }
                        }
                    } finally {
                        //解锁
                        LOCK.unlock();
                    }
                }
            }
            //此处可能会有多个线程同时执行到此处,并发创建多个Context对象
            //但这是允许的,因为一个请求对应一个Context,一个请求对应一个线程,所以一个线程本来就需要创建一个Context对象

            //初始化Context,将刚获取到或刚创建的EntranceNode放到Context的entranceNode属性中
            context = new Context(node, name);
            context.setOrigin(origin);
            //将创建出来的Context对象放入ThreadLocal变量contextHolder中,实现Context对象与当前线程的绑定
            contextHolder.set(context);
        }
        return context;
    }
    ...
}

(4)总结

初始化Entry和Context的整体流程:

3.Java SPI机制的引入

(1)Java SPI机制的简介

(2)基于Java SPI机制开发日志框架的示例

(3)ServiceLoader的原理简介

(1)Java SPI机制的简介

SPI(Service Provider Interface)是Java提供的一种轻量级的服务发现机制,可以让开发者通过约定的方式,在程序运行时动态加载和替换接口的实现,从而提高程序的扩展性和灵活性。

比如在写框架时可以先写一个接口,然后内置几种实现类(实现不同算法)。但是业务系统接入这个框架时应该选择哪种算法、采取哪个实现类呢?这时就可以通过配置来完成。

首先将接口实现类的全限定名配置在配置文件中,然后当业务系统启动时,框架会读取配置文件并解析出配置类的全限定名,接着通过反射机制在运行时动态替换接口的默认实现类。

(2)基于Java SPI机制开发日志框架的示例

假设要开发一个日志框架,其中需要实现一个Logger接口,用于记录日志。我们希望在框架中预置两个Logger实现类:ConsoleLogger和FileLogger,同时也希望用户可以根据自己的需要扩展Logger实现类。

步骤一:定义Logger接口

public interface Logger {
    void log(String message);
}

步骤二:在框架中内置ConsoleLogger和FileLogger实现类

public class ConsoleLogger implements Logger {
    @Override
    public void log(String message) {
        //直接打印到控制台
        System.out.println("[ConsoleLogger] " + message);
    }
}

public class FileLogger implements Logger {
    @Override
    public void log(String message) {
        //将日志写入文件
    }
}

步骤三:实现SPI机制 + 扩展Logger实现类

首先,在META-INF/services目录下创建文件"com.example.Logger",文件内容为Logger实现类的全限定名。例如,如下的MyLogger就是用户自定义的Logger实现类。

com.example.MyLogger

然后,在框架中读取Logger实现类的全限定名,并实例化Logger实现类,这可以通过以下代码实现。这段代码会从META-INF/services目录下读取Logger实现类的全限定名,然后通过Java反射机制实例化对应的Logger实现类,并调用其log()方法。

public static void main(String[] args){
    ServiceLoader<Logger> loggerServiceLoader = ServiceLoader.load(Logger.class);
    for (Logger logger : loggerServiceLoader) {
        logger.log("Hello, Java SPI!");
    }
}

(3)ServiceLoader的原理简介

ServiceLoader是Java提供的一种基于SPI机制的服务加载器,它可以在程序运行时动态加载和实例化实现了某个接口的类。所以通过ServiceLoader机制,可以方便地扩展和替换程序中的实现类。

ServiceLoader的使用非常简单,只要首先按照规范创建接口和实现类,然后在META-INF/services目录下创建以接口全限定名命名的文件,接着在文件中写入实现类的全限定名,最后调用ServiceLoader的load()方法便可以加载并实例化实现类。

ServiceLoader的load()方法会根据传入的接口获取接口的全类名,然后将前缀/META-INF/services与接口的全类名拼接来定位到配置文件,接着读取配置文件的内容获取文件中的字符串,最后解析字符串得到实现类的全类名并添加到一个数组。

ServiceLoader实现了迭代器Iterable接口,当遍历ServiceLoader时,ServiceLoader会调用Class的forName()方法加载类并通过反射创建实例。如果没有指定类加载器,则会使用当前线程的上下文类加载器来加载类。通过这种方式,就可以方便地扩展和替换程序中的实现类。

(4)总结

Java SPI是一种轻量级的服务发现机制,可以动态加载和替换接口实现类。通过约定的方式,开发者可在程序运行时动态地加载和替换接口的实现,从而提高程序的扩展性和灵活性。

Java SPI机制通常与ServiceLoader一起使用,ServiceLoader会从META-INF/services目录下的配置文件中,读取接口实现类的全限定名,并通过Java反射机制实例化对应的类,从而实现对接口实现类的动态加载和替换。

SPI机制在开源项目中被广泛使用,例如SpringBoot、RocketMQ、Dubbo、Sentinel等。

Java SPI的使用流程图如下:

4.Java SPI机制在Sentinel处理链中的应用

(1)初始化Entry会初始化处理链

(2)初始化处理链的设计分析

(3)Sentinel初始化处理链的完整流程

(4)Sentinel初始化处理任链之加载SPI文件

(5)Sentinel初始化处理链之实例化Class

(1)初始化Entry会初始化处理链

初始化Entry时会调用两个与处理链相关的核心方法:一是调用CtSph的lookProcessChain()方法初始化处理链,二是调用ProcessorSlot的entry()方法执行处理链节点的逻辑。

public class CtSph implements Sph {
    ...
    //Record statistics and perform rule checking for the given resource.
    //@param name the unique name for the protected resource
    //@param type the traffic type (inbound, outbound or internal).
    //This is used to mark whether it can be blocked when the system is unstable, only inbound traffic could be blocked by SystemRule
    //@param count the amount of calls within the invocation (e.g. batchCount=2 means request for 2 tokens)
    //@param args args for parameter flow control or customized slots
    @Override
    public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
        //StringResourceWrapper是ResourceWrapper的子类,且StringResourceWrapper的构造方法默认了资源类型为COMMON
        StringResourceWrapper resource = new StringResourceWrapper(name, type);
        return entry(resource, count, args);
    }
    
    //Do all {@link Rule}s checking about the resource.
    public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
        //调用CtSph.entryWithPriority()方法,执行如下处理:
        //初始化Context -> 将Context与线程绑定 -> 初始化Entry -> 将Context和ResourceWrapper放入Entry中
        return entryWithPriority(resourceWrapper, count, false, args);
    }
    
    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {
        //从当前线程中获取Context
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            return new CtEntry(resourceWrapper, null, context);
        }
        //如果没获取到Context
        if (context == null) {
            //Using default context.
            //创建一个名为sentinel_default_context的Context,并且与当前线程绑定
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }
        //Global switch is close, no rule checking will do.
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }
        //调用CtSph.lookProcessChain()方法初始化处理链(处理器插槽链条)
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }
        //创建出一个Entry对象,将处理链(处理器插槽链条)、Context与Entry绑定
        //其中会将Entry的三个基础属性(封装在resourceWrapper里)以及当前Entry所属的Context作为参数传入CtEntry的构造方法
        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            //处理链(处理器插槽链条)入口,负责采集数据,规则验证
            //调用DefaultProcessorSlotChain.entry()方法执行处理链每个节点的逻辑(数据采集+规则验证)
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            //规则验证失败,比如:被流控、被熔断降级、触发黑白名单等
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }
    ...
    private final static class InternalContextUtil extends ContextUtil {
        static Context internalEnter(String name) {
            //调用ContextUtil.trueEnter()方法创建一个Context对象
            return trueEnter(name, "");
        }
        
        static Context internalEnter(String name, String origin) {
            return trueEnter(name, origin);
        }
    }
}

(2)初始化处理链的设计分析

一.初始化处理链需要加锁确保线程安全

由于每个线程在执行CtSph的entry()方法创建一个Entry对象时,都需要首先调用CtSph的lookProcessChain()方法获取一个处理链,然后再根据这个处理链去初始化一个Entry对象,所以多个同样的Entry对象会共用一个处理链对象。

需要注意:即便是多个线程访问同样的资源(ResourceWrapper对象的属性一样),多个线程也会对应多个Entry对象(Entry对象之间的基本属性一样),多个线程也会对应多个Context对象(使用ThreadLocal存放Context对象),从而多个Entry对象会对应各自的Context对象。多个Entry对象会共用一个处理链对象(使用HashMap来缓存处理链对象),多个Context对象会共用一个Node对象(使用HashMap来缓存Node对象),多个Entry对象会共用一个Node对象(使用HashMap来缓存Node对象)。

因此当出现多线程并发创建多个Entry对象时,CtSph的lookProcessChain()方法自然就会被多线程并发调用。所以在初始化处理链时,需要考虑线程安全和性能问题。

为了确保线程安全,可以采用加锁的方式来初始化处理链,然后将处理链缓存到HashMap中来提高性能,如下伪代码所示:

//缓存处理链,key为资源,value为处理链
//多个线程创建多个Entry对象时,也会创建多个ResourceWrapper对象
//但这些ResourceWrapper对象,在同一个资源下,其属性是一样的
private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap = new HashMap<ResourceWrapper, ProcessorSlotChain>();

//加锁synchronized
synchronized ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        //构建(初始化)处理链(处理器插槽链条)
        chain = SlotChainProvider.newSlotChain();
        //将处理链(处理器插槽链条)放到缓存中
        chainMap.put(resourceWrapper, chain);
    }
    return chain;
}

二.初始化处理链通过SPI机制动态管理节点

初始化处理链时可使用List硬编码添加每个节点。

public class DefaultSlotChainBuilder implements SlotChainBuilder {
    @Override
    public ProcessorSlotChain build() {
        //硬编码
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());
        return chain;
    }
}

但硬编码的缺点是无法动态增减ProcessorSlot。比如不需要授权AuthoritySlot,硬编码时就不好删除了。比如内置的几个ProcessorSlot不符合业务需求,硬编码难以实现自定义。因此,需要使用SPI机制来解决硬编码带来的问题。也就是在配置文件中注册多个实现类,然后通过迭代器的方式逐个加载。

具体就是在META-INF/services目录下创建一个接口文件,然后将需要注册的实现类的全类名每行一个写入该文件中。框架启动时依次读取该文件中的实现类,并按照文件中注册的顺序加载。如果业务系统也创建了META-INF/services目录以及一个接口文件,那么业务系统的实现类并不会覆盖框架内置的实现类,而是叠加起来使用。

利用SPI机制,可以实现责任链模式的可插拔式扩展,处理链(处理器插槽链条)的每个节点都可以动态添加、删除、替换。

三.使用SPI机制的两大步骤

步骤一:创建META-INF/services目录,然后在该目录下创建一个文件名为如下的文件,文件内容为定义了处理链由哪些节点组成的类的类名全路径。

com.alibaba.csp.sentinel.slotchain.SlotChainBuilder

步骤二:通过ServiceLoader初始化处理链节点。

public static void main(String[] args){
    ServiceLoader<SlotChainBuilder> serviceLoader = ServiceLoader.load(SlotChainBuilder.class);
    ...
}

四.初始化处理链的设计要点总结

要点一:初始化处理链时加锁

要点二:使用HashMap缓存处理链

要点三:使用SPI机制初始化处理链

(3)Sentinel初始化处理链的完整流程

一.将处理链存储在全局缓存 + 使用锁初始化处理链

如果对CtSph的lookProcessChain()方法加锁,则锁的粒度过大。所以可以在操作缓存处理链的HashMap时才加synchronized锁,而且在操作缓存处理链的HashMap时,使用了Double Check + 写时复制。

二.利用SPI机制完成处理链的初始化

为了增加扩展性,Sentinel初始化处理链时使用了SPI机制两次。第一次使用SPI机制,是为了可以自定义处理链的节点编排。第二次使用SPI机制,是为了可以自定义处理链各节点的具体逻辑。

public class CtSph implements Sph {
    //Same resource will share the same ProcessorSlotChain}, no matter in which Context.
    //Same resource is that ResourceWrapper#equals(Object).
    private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap = new HashMap<ResourceWrapper, ProcessorSlotChain>();
    private static final Object LOCK = new Object();
    ...
    //Get ProcessorSlotChain of the resource.
    //new ProcessorSlotChain will be created if the resource doesn't relate one.
    //Same resource will share the same ProcessorSlotChain globally, no matter in which Context.
    //Same resource is that ResourceWrapper#equals(Object).
    //Note that total ProcessorSlot count must not exceed Constants.MAX_SLOT_CHAIN_SIZE, otherwise null will return.
    ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        if (chain == null) {
            //操作chainMap时才加锁
            synchronized (LOCK) {
                chain = chainMap.get(resourceWrapper);
                if (chain == null) {//Double Check
                    //Entry size limit.
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                        return null;
                    }
                    //初始化处理链(处理器插槽链条)
                    chain = SlotChainProvider.newSlotChain();
                    //写时复制
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }
    ...
}

//A provider for creating slot chains via resolved slot chain builder SPI.
public final class SlotChainProvider {
    private static volatile SlotChainBuilder slotChainBuilder = null;

    //The load and pick process is not thread-safe,
    //but it's okay since the method should be only invoked via CtSph.lookProcessChain() under lock.
    public static ProcessorSlotChain newSlotChain() {
        //如果存在,则直接返回
        if (slotChainBuilder != null) {
            return slotChainBuilder.build();
        }

        //Resolve the slot chain builder SPI.
        //第一次使用SPI: 通过SPI机制初始化SlotChainBuilder
        slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();

        if (slotChainBuilder == null) {
            //Should not go through here.
            RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
            slotChainBuilder = new DefaultSlotChainBuilder();
        } else {
            RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}", slotChainBuilder.getClass().getCanonicalName());
        }
        return slotChainBuilder.build();
    }
    private SlotChainProvider() {}
}

public final class SpiLoader<S> {
    //Cache the SpiLoader instances, key: classname of Service, value: SpiLoader instance
    private static final ConcurrentHashMap<String, SpiLoader> SPI_LOADER_MAP = new ConcurrentHashMap<>();
    //Cache the classes of Provider
    private final List<Class<? extends S>> classList = Collections.synchronizedList(new ArrayList<Class<? extends S>>());
    //Cache the sorted classes of Provider
    private final List<Class<? extends S>> sortedClassList = Collections.synchronizedList(new ArrayList<Class<? extends S>>());
    ...
    //Create SpiLoader instance via Service class Cached by className, and load from cache first
    public static <T> SpiLoader<T> of(Class<T> service) {
        AssertUtil.notNull(service, "SPI class cannot be null");
        AssertUtil.isTrue(service.isInterface() || Modifier.isAbstract(service.getModifiers()), "SPI class[" + service.getName() + "] must be interface or abstract class");

        String className = service.getName();
        SpiLoader<T> spiLoader = SPI_LOADER_MAP.get(className);
        if (spiLoader == null) {
            synchronized (SpiLoader.class) {
                spiLoader = SPI_LOADER_MAP.get(className);
                if (spiLoader == null) {//Double Check
                    SPI_LOADER_MAP.putIfAbsent(className, new SpiLoader<>(service));
                    spiLoader = SPI_LOADER_MAP.get(className);
                }
            }
        }
        return spiLoader;
    }
    
    //Load the first-found Provider instance,if not found, return default Provider instance
    public S loadFirstInstanceOrDefault() {
        //SPI机制加载Class,然后将加载的Class放到classList数组里
        load();

        //循环遍历,根据classList里的Class来初始化对应的实例
        for (Class<? extends S> clazz : classList) {
            if (defaultClass == null || clazz != defaultClass) {
                return createInstance(clazz);
            }
        }
        //初始化默认的DefaultSlotChainBuilder
        return loadDefaultInstance();
    }
    
    //Load all Provider instances of the specified Service, sorted by order value in class's {@link Spi} annotation
    public List<S> loadInstanceListSorted() {
        //比如读取com.alibaba.csp.sentinel.slotchain.ProcessorSlot文件里的Class名字
        //然后根据这些Class名字加载Class
        //接着将Class放到sortedClassList集合中
        load();

        //实例化sortedClassList集合里的每个Class
        return createInstanceList(sortedClassList);
    }
    ...
}

//Builder for a default {@link ProcessorSlotChain}.
@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {
    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        //通过SPI机制加载责任链的节点ProcessorSlot实现类
        //然后按照@Spi注解的order属性进行排序并进行实例化
        //最后将ProcessorSlot实例放到sortedSlotList中
        List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
        //遍历已排好序的ProcessorSlot集合
        for (ProcessorSlot slot : sortedSlotList) {
            //安全检查,防止业务系统也写了一个SPI文件,但没按规定继承AbstractLinkedProcessorSlot
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }
            //调用DefaultProcessorSlotChain.addLast()方法构建单向链表
            //将责任链的节点ProcessorSlot实例放入DefaultProcessorSlotChain中
            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }
        //返回单向链表
        return chain;
    }
}

第一次使用SPI机制是初始化SlotChainBuilder,在com.alibaba.csp.sentinel.slotchain.SlotChainBuilder文件中,框架默认的是com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder。

在SpiLoader的loadFirstInstanceOrDefault()方法中,根据"if (defaultClass == null || clazz != defaultClass)"可知,系统接入Sentinel时可以自定义SPI接口文件来替换DefaultSlotChainBuilder。

比如想删除AuthoritySlot,那么可以在META-INF/services目录下,创建文件名如下,文件值为自定义接口的实现类全路径,如下:

文件名:com.alibaba.csp.sentinel.slotchain.SlotChainBuilder
文件值:com.example.MyCustomSlotChainBuilder

然后在MyCustomSlotChainBuilder中就可以自定义一套处理链的规则。

public class MyCustomSlotChainBuilder implements SlotChainBuilder {
    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new FlowSlot());
        return chain;
    }
}

第二次使用SPI机制是执行DefaultSlotChainBuilder的build()方法初始化处理链,其中的核心代码是SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted()。所调用的方法会读取com.alibaba.csp.sentinel.slotchain.ProcessorSlot文件内容,然后根据文件内容加载Class,接着将Class放到sortedClassList集合中,最后实例化sortedClassList集合中的Class并返回ProcessorSlot实例列表。

注意:ProcessorSlot实现类是有序的,即处理链的节点是有序的。比如ClusterBuilderSlot的前一个节点必须是NodeSelectorSlot,StatisticSlot的前一个节点必须是ClusterBuilderSlot等。

(4)Sentinel初始化处理链之加载SPI文件

Sentinel初始化处理链时会先后调用如下两个方法,这两个方法都会调用SpiLoader的load()方法来加载SPI文件。

SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault()
SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted()

SpiLoader的load()方法的执行过程就是加载SPI文件的过程。但是由于SPI文件可能会有很多个,比如负责管控整个处理链的Builder对应的SPI文件:

com.alibaba.csp.sentinel.slotchain.SlotChainBuilder

以及处理链节点的ProcessorSlot对应的SPI文件:

com.alibaba.csp.sentinel.slotchain.ProcessorSlot

虽然它们都隶属于META-INF/services下,但其文件名是不一样的。所以按理应在load()方法添加一个参数如:load(String fileName)。但是Sentinel却没这么做,因为Sentinel使用配置去替代参数。

具体做法就是:先在SpiLoader内定义常量SPI_FILE_PREFIX = "META-INF/services/"。有了SPI文件夹后,还需接口全路径名为名称的文件,就可读取文件了。

于是SpiLoader提供了一个静态的of()方法,来指定要加载那一类接口。也就是SpiLoader的of()方法会返回一个指定加载某种接口的SpiLoader实例。

然后在执行指定加载某一类接口的SpiLoader实例的load()方法时,就能将指定要加载的接口的名称拼接上SPI_FILE_PREFIX文件夹前缀,得到一份完整的文件路径,接着就可通过IO流读取文件里的内容。

从文件中获取到指定加载的某一类接口的实现类类名之后,就可以通过Class.forName() + 线程上下文类加载器去加载对应的实现类,加载到的实现类会放入到classList和sortedClassList两个列表中。

注意:Sentinel的SPI机制没有使用JDK内置的ServiceLoader,而是自己实现。因为Sentinel的SPI有一些定制化逻辑,比如@Spi注解的order属性可以指定实例化类时的顺序。但本质上和JDK内置的ServiceLoader一致,只是多了个性化的逻辑。

public final class SpiLoader<S> {
    //Default path for the folder of Provider configuration file
    //SPI文件夹路径
    private static final String SPI_FILE_PREFIX = "META-INF/services/";
    //Cache the SpiLoader instances, key: classname of Service, value: SpiLoader instance
    //每个接口Class对应的SpiLoader,只需new一次即可,new完就可以将其缓存起来
    private static final ConcurrentHashMap<String, SpiLoader> SPI_LOADER_MAP = new ConcurrentHashMap<>();
    //Cache the classes of Provider
    private final List<Class<? extends S>> classList = Collections.synchronizedList(new ArrayList<Class<? extends S>>());
    //Cache the sorted classes of Provider
    private final List<Class<? extends S>> sortedClassList = Collections.synchronizedList(new ArrayList<Class<? extends S>>());
    //The Service class, must be interface or abstract class
    //指定要使用SPI进行加载的实现类的接口的Class,比如SlotChainBuilder.class、ProcessorSlot.class
    private Class<S> service;
    ...
    private SpiLoader(Class<S> service) {
        this.service = service;
    }
    
    //Create SpiLoader instance via Service class
    //Cached by className, and load from cache first
    //@param service Service class 指定要使用SPI加载的实现类的接口的Class
    public static <T> SpiLoader<T> of(Class<T> service) {
        //判断是不是null
        AssertUtil.notNull(service, "SPI class cannot be null");
        //判断是不是interface类型
        AssertUtil.isTrue(service.isInterface() || Modifier.isAbstract(service.getModifiers()), "SPI class[" + service.getName() + "] must be interface or abstract class");


        //获取接口的全路径名,判断缓存里是否已经存在
        String className = service.getName();
        SpiLoader<T> spiLoader = SPI_LOADER_MAP.get(className);
        //缓存里没有,则使用Double Check + 锁机制去初始化SpiLoader,然后将初始化好的SpiLoader实例放到缓存
        if (spiLoader == null) {
            synchronized (SpiLoader.class) {
                spiLoader = SPI_LOADER_MAP.get(className);
                if (spiLoader == null) {
                    //new SpiLoader<>(service)初始化SpiLoader实例
                    SPI_LOADER_MAP.putIfAbsent(className, new SpiLoader<>(service));
                    spiLoader = SPI_LOADER_MAP.get(className);
                }
            }
        }
        //返回SpiLoader实例
        return spiLoader;
    }
    
    //Load the first-found Provider instance,if not found, return default Provider instance
    public S loadFirstInstanceOrDefault() {
        //SPI机制加载Class,然后将加载的Class放到classList数组里
        load();
        //循环遍历,根据classList里的Class来初始化对应的实例
        for (Class<? extends S> clazz : classList) {
            if (defaultClass == null || clazz != defaultClass) {
                return createInstance(clazz);
            }
        }
        //初始化默认的DefaultSlotChainBuilder
        return loadDefaultInstance();
    }
    
    //Load all Provider instances of the specified Service, sorted by order value in class's {@link Spi} annotation
    public List<S> loadInstanceListSorted() {
        //比如读取com.alibaba.csp.sentinel.slotchain.ProcessorSlot文件里的Class名字
        //然后根据这些Class名字加载Class
        //接着将Class放到sortedClassList集合中
        load();
        //实例化sortedClassList集合里的每个Class
        return createInstanceList(sortedClassList);
    }
    
    //Load the Provider class from Provider configuration file
    public void load() {
        ...
        //IO流读取文件内容
        while (urls.hasMoreElements()) {
            ...
            clazz = (Class<S>) Class.forName(line, false, classLoader);
            //加入到集合中
            classList.add(clazz);
            ...
        }
        ...
        //生成新的按照order排序的集合
        sortedClassList.addAll(classList);
        //进行排序
        Collections.sort(sortedClassList, new Comparator<Class<? extends S>>() {
            @Override
            public int compare(Class<? extends S> o1, Class<? extends S> o2) {
                //获取Spi注解
                Spi spi1 = o1.getAnnotation(Spi.class);
                //获取Spi注解的order属性
                int order1 = spi1 == null ? 0 : spi1.order();
                Spi spi2 = o2.getAnnotation(Spi.class);
                int order2 = spi2 == null ? 0 : spi2.order();
                return Integer.compare(order1, order2);
            }
        });
    }
    ...
}

(5)Sentinel初始化处理链之实例化Class

执行完SpiLoader的load()方法加载好指定的实现类的Class后,就会调用SpiLoader的createInstance()方法或createInstanceList()方法实例化Class。

SpiLoader放入createInstance(clazz)方法内部会判断是否是单例,如果是单例则放到缓存当中,如果不是单例则每次都通过clazz.newInstance()方法创建一个新的对象。

public final class SpiLoader<S> {
    //Cache the singleton instance of Provider, key: classname of Provider, value: Provider instance
    private final ConcurrentHashMap<String, S> singletonMap = new ConcurrentHashMap<>();
    ...
    //Create Provider instance list
    private List<S> createInstanceList(List<Class<? extends S>> clazzList) {
        if (clazzList == null || clazzList.size() == 0) {
            return Collections.emptyList();
        }

        List<S> instances = new ArrayList<>(clazzList.size());
        for (Class<? extends S> clazz : clazzList) {
            S instance = createInstance(clazz);
            instances.add(instance);
        }
        return instances;
    }
    
    //Create Provider instance
    private S createInstance(Class<? extends S> clazz) {
        Spi spi = clazz.getAnnotation(Spi.class);
        boolean singleton = true;
        if (spi != null) {
            singleton = spi.isSingleton();
        }
        return createInstance(clazz, singleton);
    }
    
    //Create Provider instance
    private S createInstance(Class<? extends S> clazz, boolean singleton) {
        S instance = null;
        try {
            if (singleton) {
                instance = singletonMap.get(clazz.getName());
                if (instance == null) {
                    synchronized (this) {
                        instance = singletonMap.get(clazz.getName());
                        if (instance == null) {
                            instance = service.cast(clazz.newInstance());
                            singletonMap.put(clazz.getName(), instance);
                        }
                    }
                }
            } else {
                instance = service.cast(clazz.newInstance());
            }
        } catch (Throwable e) {
            fail(clazz.getName() + " could not be instantiated");
        }
        return instance;
    }
    ...
}

(6)总结

一.初始化处理链时需要加锁和使用缓存

初始化一个Entry对象时需要根据一个处理链对象进行初始化,并发请求同一接口时的多个同样的Entry对象会共用一个处理链对象,所以初始化处理链的过程中需要加锁来保证线程安全性,初始化完处理链后需要将其放到全局缓存里来提高性能。

二.初始化处理链时分两步使用SPI

首先初始化Builder,负责管控处理链整体即编排处理链节点。Sentinel要求Builder只能存在一个,而且是外部系统优先原则。可以在com.alibaba.csp.sentinel.slotchain.SlotChainBuilder文件中,指定替代默认的DefaultSlotChainBuilder的自定义接口实现类。

然后通过Builder初始化完整的处理链,这里也是通过SPI机制进行初始化。因此可以进行扩展,比如外部系统自定义ProcessorSlot添加到处理链。注意ProcessorSlot是有顺序的,如果顺序没指定正确,则可能造成异常。ProcessorSlot的顺序可以通过@Spi注解的order属性设置。为了防止出现bug,建议直接将自定义的ProcessorSlot放到最后。

三.Sentinel通过配置的方式来替代传入参数

SpiLoader的of()方法会返回一个指定加载某种实现类的SpiLoader实例,这样就可以使用类似SpiLoader.of().load()的方式通过SPI进行加载。

四.Sentinel没有采用JDK内置的ServiceLoader来实现SPI机制

Sentinel单独写一套SPI的实现逻辑,核心原因是需要支持个性化的配置。比如@Spi注解支持order排序属性以及isSingleton是否单例属性。如果是单例,则每个类全局只能实例化一次(通过Map缓存实现)。如果不是单例,则每次都new一个新的对象。

五.Sentinel的这套SPI机制可以当作工具类拷贝到业务系统中

因为没有其他外部依赖,它被单独放到一个包里。

5.Sentinel默认处理链ProcessorSlot的构建

(1)Sentinel默认处理链的节点

(2)Sentinel默认处理链的构建

(1)Sentinel默认处理链的节点

Sentinel中的ProcessorSlot是流量控制处理过程中的处理器,每个ProcessorSlot处理器子类负责处理特定的任务。Sentinel默认的处理链会基于SPI配置在sentinel-core模块的如下文件中:

# Sentinel-1.8.6/sentinel-core/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot
# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

一共8个Slot,每个Slot都是一个过滤器,各自承担不同的职责。这些Slot整体可以划分为两类:指标数据采集的Slot和规则验证的Slot。其中规则验证的Slot包括:授权验证、流控验证、熔断降级验证。

每个Slot都有自己在整个处理链(处理器插槽链条)中的顺序,具体的顺序(也就是实例化顺序)并不是根据SPI文件的编写顺序来确定的,而是基于修饰每个Slot的@Spi注解来确定的。

@Spi注解有一个order属性用于设置顺序。SpiLoader的load()方法在读取SPI文件时,会按order属性对Slot进行排序,并将排好序的ProcessorSlot实现类放入sortedClassList中。这样后续就可以遍历sortedClassList,按照顺序实例化这些Slot的实现类。

一.NodeSelectorSlot

负责创建和维护资源调用树(资源调用关系),同时为资源访问对象Entry关联对应的统计节点DefaultNode。这样不仅可实现对资源调用链路的监控,还能统计每个资源的调用信息。

二.ClusterBuilderSlot

负责根据资源的统计信息,计算集群维度的统计数据。集群维度统计数据是从资源维度的统计数据中整合得到的,这些统计数据用于实现资源在集群中的流量控制、熔断降级等功能。

三.LogSlot

负责记录请求异常时的日志,可用于故障排查。

四.StatisticsSlot

负责统计资源的调用数据,如成功调用次数、异常次数、响应时间等。这些数据可用于分析资源的性能,或驱动其他Slot(如限流降级Slot)运行。

五.AuthoritySlot

负责进行授权控制,根据资源的授权规则来判断是否允许请求进行访问。如果请求不被允许访问,AuthoritySlot将抛出AuthorityException异常。

六.SystemSlot

负责进行系统保护,根据系统保护规则(如CPU使用率、负载等)判断请求是否需要被限制。如果需要被限制,SystemSlot将抛出SystemException异常。

七.FlowSlot

负责进行流量控制,根据资源的流量控制规则(如QPS限制等)来判断请求是否需要被限流。如果需要被限流,FlowSlot将抛出一个FlowException异常。

八.DegradeSlot

负责进行熔断降级,根据资源的熔断降级规则(如异常比例等)来判断请求是否需要被降级。如果需要被降级,DegradeSlot将抛出一个DegradeException异常。

(2)Sentinel默认处理链的构建

处理链(处理器插槽链条)ProcessorSlotChain其实就是一条责任链,由于责任链是典型的单向链表结构,所以每个Slot必须要有一个next属性,用于指向下一个节点。

为了实现单向链表结构:可以利用已排好序的Slot列表以及每个Slot都有的next引用的特性。只需遍历已排序的Slot列表,让每个Slot的next引用指向下一个Slot即可。这是一个非常简单的单向链表操作,可将这个过程封装到DefaultProcessorSlotChain类中的addLast()方法中。

//Builder for a default {@link ProcessorSlotChain}.
@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {
    @Override
    public ProcessorSlotChain build() {
        //创建一个DefaultProcessorSlotChain对象实例
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        //通过SPI机制加载责任链的节点ProcessorSlot实现类
        //然后按照@Spi注解的order属性进行排序并进行实例化
        //最后将ProcessorSlot实例放到sortedSlotList中
        List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
        //遍历已排好序的ProcessorSlot列表
        for (ProcessorSlot slot : sortedSlotList) {
            //安全检查,防止业务系统也写了一个SPI文件,但没按规定继承AbstractLinkedProcessorSlot
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }
            //调用DefaultProcessorSlotChain.addLast()方法构建单向链表
            //将责任链的节点ProcessorSlot实例放入DefaultProcessorSlotChain中
            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }
        //返回单向链表
        return chain;
    }
}

public class DefaultProcessorSlotChain extends ProcessorSlotChain {
    AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable {
            super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
        }

        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            super.fireExit(context, resourceWrapper, count, args);
        }
    };
    
    AbstractLinkedProcessorSlot<?> end = first;

    @Override
    public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        protocolProcessor.setNext(first.getNext());
        first.setNext(protocolProcessor);
        if (end == first) {
            end = protocolProcessor;
        }
    }
    
    @Override
    public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        end.setNext(protocolProcessor);
        end = protocolProcessor;
    }
    
    @Override
    public void setNext(AbstractLinkedProcessorSlot<?> next) {
        addLast(next);
    }
    
    @Override
    public AbstractLinkedProcessorSlot<?> getNext() {
        return first.getNext();
    }
    
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable {
        first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
    }
    
    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        first.exit(context, resourceWrapper, count, args);
    }
}

public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
    private AbstractLinkedProcessorSlot<?> next = null;
    
    @Override
    public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
        if (next != null) {
            next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        }
    }
    
    @SuppressWarnings("unchecked")
    void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) throws Throwable {
        T t = (T)o;
        entry(context, resourceWrapper, t, count, prioritized, args);
    }
    
    @Override
    public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        if (next != null) {
            next.exit(context, resourceWrapper, count, args);
        }
    }
    
    public AbstractLinkedProcessorSlot<?> getNext() {
        return next;
    }
    
    public void setNext(AbstractLinkedProcessorSlot<?> next) {
        this.next = next;
    }
}

后端技术栈的基础修养 文章被收录于专栏

详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等

全部评论

相关推荐

评论
1
1
分享

创作者周榜

更多
牛客网
牛客企业服务