sentinel降级背后的原理解析

  张一帆   2020年04月04日


  • 介绍

  Sentinel是阿里中间件团队开源的,面向分布式服务加购的轻量级高可用流量控制组件,主要以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度来帮助用户保护服务的稳定性。

  首先让我们比对下目前业界流行的熔断降级库Netfilix Hystrix的异同。


  • 项目结构

  这是sentinel项目的整体结构图:

sentinal结构图

  • sentinel-core 核心模块,限流、降级、系统保护等都在这里。因为这样可以单独提供sdk。
  • sentinel-dashboard 控制后台,可以对sentinal客户端进行可视化管理。
  • sentinel-transport 传输模块,提供基本的监控服务端和客户端的API接口。
  • sentinel-extension 扩展模块,主要对dataSource进行了部分扩展实现。
  • sentinel-adapter 适配器模块,主要实现了对一些常见框架的适配。
  • sentinel-demo 样例,里面教你如何使用限流、降级等。
  • sentinel-benchmark 基准测试模块,对核心代码的精确性提供基准测试。

  • 运行样例

  在sentinel-demo模块中有很多不同的样例,我们找到FlowQpsDemo,看看限流的样例。限流有很多种类的限流,先找qps限流的类来看。

  /*
有一个TimeTicker线程在做统计,每1秒钟做一次。有N个RunTask线程在模拟请求,被访问的business code被资源key保护着,根据规则,每秒只允许20个请求通过。
*/
public class FlowQpsDemo {

    //key是qps江流的隔离名称
    private static final String KEY = "abc";

    /*
    全局共享三个变量
    使用原子整形类型变量记录通过,拦截及总体数量。
    利用AtomicInteger就是因为原子性的特性,可以帮助我们防止多线程的安全问题。
    */
    private static AtomicInteger pass = new AtomicInteger();
    private static AtomicInteger block = new AtomicInteger();
    private static AtomicInteger total = new AtomicInteger();

    private static volatile boolean stop = false;

    //线程总数
    private static final int threadCount = 32;

    //时间为 100s
    private static int seconds = 60 + 40;

    public static void main(String[] args) throws Exception {
        initFlowQpsRule();

        tick();
        // first make the system run on a very low condition
        simulateTraffic();

        System.out.println("===== begin to do flow control");
        System.out.println("only 20 requests per second can pass");

    }

    /*
    初始化限流规则
    */
    private static void initFlowQpsRule() {
        List<FlowRule> rules = new ArrayList<FlowRule>();
        FlowRule rule1 = new FlowRule();
        rule1.setResource(KEY);
        //设置qps最大值20个
        rule1.setCount(20);
        //设置限流类型:根据qps
        rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
        //需要被限制的应用名称。default代表允许全部应用
        rule1.setLimitApp("default");
        rules.add(rule1);
        FlowRuleManager.loadRules(rules);
    }
    
    //模拟请求。并发32个线程运行 RunTask()任务
    private static void simulateTraffic() {
        for (int i = 0; i < threadCount; i++) {
            Thread t = new Thread(new RunTask());
            t.setName("simulate-traffic-Task");
            t.start();
        }
    }

    //启动一个线程来运行TimerTask()任务。
    private static void tick() {
        Thread timer = new Thread(new TimerTask());
        timer.setName("sentinel-timer-task");
        timer.start();
    }

    //记录相关事件时间耗时的,tick()调用的所以就运行1次。
    static class TimerTask implements Runnable {

        @Override
        public void run() {
            long start = System.currentTimeMillis();
            System.out.println("begin to statistic!!!");

            long oldTotal = 0;
            long oldPass = 0;
            long oldBlock = 0;
            /*
            循环。
            
            */
            while (!stop) {
                try {
                    //1秒执行一次
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                }
                //全局总数量
                long globalTotal = total.get();
                //一秒内的全部数量
                long oneSecondTotal = globalTotal - oldTotal;
                //
                oldTotal = globalTotal;

                long globalPass = pass.get();
                //一秒内通过的数量
                long oneSecondPass = globalPass - oldPass;
                oldPass = globalPass;

                long globalBlock = block.get();
                //一秒内拒绝的数量
                long oneSecondBlock = globalBlock - oldBlock;
                oldBlock = globalBlock;

                System.out.println(seconds + " send qps is: " + oneSecondTotal);
                System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
                    + ", pass:" + oneSecondPass
                    + ", block:" + oneSecondBlock);
                //当时间耗尽时,停止循环
                if (seconds-- <= 0) {
                    stop = true;
                }
            }
            //整个线程耗时
            long cost = System.currentTimeMillis() - start;
            System.out.println("time cost: " + cost + " ms");
            System.out.println("total:" + total.get() + ", pass:" + pass.get()
                + ", block:" + block.get());
            System.exit(0);
        }
    }

    //
    static class RunTask implements Runnable {
        @Override
        public void run() {
            while (!stop) {
                Entry entry = null;

                try {
                    //进行限流,采用key。因为已经在上面的方法里设置了限流策略了,所以在这个线程中,会自动按照限流策略进行限流的。我们只需要对结果进行监控,然后执行之后逻辑即可。
                    entry = SphU.entry(KEY);
                    // 如果没有抛出异常的话,就说明通过了。就给pass+1
                    pass.addAndGet(1);
                } catch (BlockException e1) {
                    获得了BlockExcepton就给block+1
                    block.incrementAndGet();
                } catch (Exception e2) {
                    // 这个是在业务中抛出了异常了。需要对业务逻辑异常处理
                } finally {
                    //都会走到这里,给total+1
                    total.incrementAndGet();
                    //限流策略退出
                    if (entry != null) {
                        entry.exit();
                    }
                }

                Random random2 = new Random();
                try {
                    //随机睡一会
                    TimeUnit.MILLISECONDS.sleep(random2.nextInt(50));
                } catch (InterruptedException e) {
                    // ignore
                }
            }
        }
    }
}


  运行程序后,结果如下:

  begin to statistic!!!
===== begin to do flow control
only 20 requests per second can pass
100 send qps is: 1125
1621777621479, total:1125, pass:32, block:1093
99 send qps is: 1195
1621777622478, total:1195, pass:20, block:1175
98 send qps is: 1168
1621777623481, total:1168, pass:20, block:1148
97 send qps is: 1198
1621777624481, total:1198, pass:21, block:1177
96 send qps is: 1155
1621777625486, total:1155, pass:20, block:1135
95 send qps is: 1210
1621777626491, total:1210, pass:20, block:1190
94 send qps is: 1163
1621777627495, total:1163, pass:23, block:1140
93 send qps is: 1158
1621777628498, total:1158, pass:20, block:1138

  可以看到,上面的结果中,pass的数量和我们设置的不一样。我们设定的是每秒可以pass的数量是20个。但是有很多的请求都是超过20个的。

  原因是SphU.entry(KEY)的内部是没有加锁的,所以就会在高并发的情况下,pass的数量就会高于20.由于pass、block、total是全局共享的,而多个RunTask()线程在执行SphU.entry(KEY)申请获取entry时,内部没有所保护,所以会存在pass的个数超过设定的阈值。

runTask模型

  接下来,我把threadCount改为1,只有一个线程来执行这个方法,结果如下。

  begin to statistic!!!
===== begin to do flow control
only 20 requests per second can pass
100 send qps is: 36
1621784048375, total:36, pass:26, block:10
99 send qps is: 37
1621784049379, total:37, pass:20, block:17
98 send qps is: 33
1621784050384, total:33, pass:20, block:13
97 send qps is: 39
1621784051389, total:39, pass:20, block:19
96 send qps is: 35
1621784052394, total:35, pass:20, block:15
95 send qps is: 33
1621784053398, total:33, pass:20, block:13


  • 深入原理

  一切的开始让我们从SphU.entry()开始。

  public class SphU {

    ……
    
    /*
    为给定的resource记录统计数据并且执行方法检查
    */
    public static Entry entry(String name) throws BlockException {
        return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
    }

  /*
sentinel的环境类,这个类会触发所有的sentinel的初始化
这个类有个特殊注意到的地方,为了防止程序上的deadlock。不能让其他的类的静态方法块或者静态变量不要指向这个类。
*/
public class Env {

    //主要是这块代码
    public static final Sph sph = new CtSph();

    static {
        // If init fails, the process will exit.
        InitExecutor.doInit();
    }
}

  SphU.entry中Env是一个类,我们知道在JVM的生命周期里,每个类都会被加载一次,而且加载的原则是懒加载。这个类中有个Sph类型的静态变量,指向了实例化后的CtSph()。也就是说Env.sph.entry()最终是调用了CtsSph.entry()。

  public class CtsSph {

    /*
    检查所有resource的限流规则
    每个唯一的resource会用一个ProcessorSlot做限流检查。同样的resource会用同样的ProcessorSlot。
    注意的是ProcessorSlot总数不能超过配置的MAX_SLOT_CHAIN_SIZE值。否则,所有的限流检查都会终止。也就是说同一时刻的所有请求都会通过,没有任何限流。
    */
    public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
        return entryWithPriority(resourceWrapper, count, false, args);
    }
    
    /*
    这个方法是具体的实现,也是比较重要的。
    */
    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        
        //获取当前的上线文
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
            // so here init the entry only. No rule checking will be done.
            //NullContext代表的是上下文数量已经超过了阈值,所以这里初始化CtEntry,就是不会做任何限流检查。
            return new CtEntry(resourceWrapper, null, context);
        }

        //如果没有上下文就给一个默认的上下文。
        if (context == null) {
            // Using default context.
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }

        // 开关,可以控制是否做限流检查
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }

        //这个比较重要,获取该资源对应的SlotChain
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

         //slot chain超过了Constants.MAX_SLOT_CHAIN_SIZE。所以就是说没有限流检查
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }

        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            // 执行Slot的entry方法
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            // 抛出BlockExecption
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }
}

  这个方法可以分为以下几个部分:

  • 对参数和全局配置做检测,如果不符合要求就返回一个CtEntry对象,不会再进行任何限流检测。否则就会进入检测流程。
  • 根据包装过的资源对象获取对应的SlotChain
  • 执行SlotChain的entry方法,如果抛出了BlockExecption就代表被限流了。如果没有就返回Entry对象

  • ProcessorSlot是什么

  我们先来看看ProcessorSlot到底是个什么东西。他是一个接口,详细的请看下面的程序解释。

  /*
某些流程的容器,以及流程完成时的通知方式。

直译的话就是处理器插槽,是sentinel实现限流降级、熔断降级、系统自适应降级等功能的切入点。Sentinel 提供的 ProcessorSlot 可以分为两类,一类是辅助完成资源指标数据统计的切入点,一类是实现降级功能的切入点。
*/
public interface ProcessorSlot<T> {

    //slot的入口
    void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,
               Object... args) throws Throwable;


     //触发入entry
    void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,
                   Object... args) throws Throwable;
                   
     //slot的出口
    void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);

     //触发出entry
    void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}


  通过实现关系,我们可以看到这两类ProcessorSlot分别是:

  辅助资源指标数据统计的 ProcessorSlot:

  • NodeSelectorSlot:为当前资源创建 DefaultNode
  • ClusterBuilderSlot:如果当前资源未创建 ClusterNode,则为资源创建 ClusterNode;将 ClusterNode 赋值给当前资源的 DefaultNode.clusterNode;如果调用来源(origin)不为空,则为调用来源创建 StatisticNode,用于实现按调用来源统计资源的指标数据,ClusterNode 持有每个调用来源的 StatisticNode。
  • StatisticSlot:这是 Sentinel 最为重要的类之一,用于实现指标数据统计。先是调用后续的 ProcessorSlot#entry 判断是否放行请求,再根据判断结果进行相应的指标数据统计操作。

  实现降级功能的 ProcessorSlot:

  • AuthoritySlot:实现黑白名单降级
  • SystemSlot:实现系统自适应降级
  • FlowSlot:实现限流降级
  • DegradeSlot:实现熔断降级

  • ProcessorSlot 的创建

  然后我们看看是如何将ProcessorSlot创建成链的:

   //通过slot chain builder SPI提供的创建slot链的类
public final class SlotChainProvider {

    private static volatile SlotChainBuilder slotChainBuilder = null;


     /*
     加载和选择进程不是线程安全的,但是如果只是通过lookProcessChain()方法调用就可以,因为CtSph是带锁的。一会就能看到lookProcessChain这个方法了。这个方法是有synchronized的。
     */
    public static ProcessorSlotChain newSlotChain() {
        if (slotChainBuilder != null) {
            return slotChainBuilder.build();
        }

        //通过chain builder的SPI来实现代码和业务的隔离,也就是说通过配置文件来选择可用的builder.以此builder来创建ProcessorSlotChain
        slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();

        if (slotChainBuilder == null) {
            // Should not go through here.
            //一般来说不应该为null,因为作者已经预留了文件在resources的META-INF中。但是为了万一,还是判断一下,给出个解决方案。
            RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
            slotChainBuilder = new DefaultSlotChainBuilder();
        } else {
            RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
                slotChainBuilder.getClass().getCanonicalName());
        }
        //返回,用选择的builder调用build方法,创建ProcessorSlotChain
        return slotChainBuilder.build();
    }

    private SlotChainProvider() {}
}


  我很感兴趣slotChainBuilder,于是就去看了眼这个类。其实他是一个接口,接口中就一个build()方法。目前系统提供了两种实现,一个是DefaultSlotChainBuilder,一个是DemoSlotChainBuilder。这块对后续的增加留下了扩展性。主要体现的是建造者模式思想。

  public interface SlotChainBuilder {

     //构造 processor slot chain
    ProcessorSlotChain build();
}

  那接下来我选择DemoSlotChainBuilder看下怎么实现的。

  /*
一个例子来讲述如何自定义建造一个slot chain
*/
@Spi
public class DemoSlotChainBuilder implements SlotChainBuilder {

    @Override
    public ProcessorSlotChain build() {

        ProcessorSlotChain chain = new DefaultProcessorSlotChain();

        List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
        // 过滤掉降级
        //demo不会被降级异常阻塞的
        sortedSlotList.removeIf(o -> DegradeSlot.class.equals(o.getClass()));
        for (ProcessorSlot slot : sortedSlotList) {
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }

            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }

        return chain;
    }
}

  通过上面的演示,就创建除了一个slot chain了。返回的就是ProcessorSlotChain。


  • 继续深入原理

  
//chanMap是一个原子map,map的key是ResourceWrapper封装类,value是ProcessorSlotChain
private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap
        = new HashMap<ResourceWrapper, ProcessorSlotChain>();
        
/*
获取当前资源的ProcessorSlotChain,如果没有的话就创建一个ProcessorSlotChain出来
同样的资源只要ResourceWrapper.equals(Object)相等,不论在不在同一个上下文中,都会全局共享同一个ProcessorSlotChain。
还有就是ProcessorSlot的总数不能超过Constants#MAX_SLOT_CHAIN_SIZE否则会返回null
*/
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        //获取单向链表中的ProcessorSlotChain
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        //chain是空的则进入,不为空就直接返回呗
        if (chain == null) {
            //这个就是刚才我在SlotChainProvider()方法中提到的。获取的话一定在lookProcessChain()中,因为有锁就保证了线程安全。
            synchronized (LOCK) {
                chain = chainMap.get(resourceWrapper);
                //double check。因为在锁里了。在之前查到现在查的时间内,有可能加锁前chain被别的线程注入了,那这块就可能出问题的。
                if (chain == null) {
                    // 超过预设值了,返回null吧。
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                        return null;
                    }

                    //刚才已经分析过了是如何创建slot chain的了。但是我只是用了demoSlotBuilder讲的。这块因为是用默认的DefaultSlotChainBuilder。所以我还得贴一下DefaultSlotChainBuilder的代码在下面
                    chain = SlotChainProvider.newSlotChain();
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }

  //默认的构建slot chain的建造者。请关注其中的SPI。
@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {

    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        //采用SPI,那就看看META-INF中规定的processorSlot,看看有几种规则吧。贴在下面了
        List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
        //通过下面的列表我们知道了chain中依次加入了很多slot
        for (ProcessorSlot slot : sortedSlotList) {
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }
            //DefaultProcessorSlotChain中的addLast
            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }

        return chain;
    }
}


  //文件名:sentinel/sentinel-core/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot

# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

  以上的这些slot已经被依次添加到了chain中了。然后调用DefaultProcessorSlotChain中的addLast完成责任链的创建。

  /*
DefaultProcessorSlotChain的实现。他继承了ProcessorSlotChain,继承了AbstractLinkedProcessorSlot实现了ProcessorSlot。
这一大坨的继承关系。其实仔细想想,也能明白。一个好的程序继承关系最多也就5层。不能再高了。再高了就不是一个好的设计方式了。
ProcessorSlotChain是一个抽象类,比继承的对象多了两个方法,addFirst()和addLast()。这两个方法必须在继承类中具体实现。
*/
public class DefaultProcessorSlotChain extends ProcessorSlotChain {

    /*
    构造了一个first节点。并且重写了entry和exit方法。
    注意,这个变量指向的是他的父亲
    */
    AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {

        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
            throws Throwable {
            //调用父亲的fireEntry()。这个方法会在真正进入entry时触发。是由first节点进行触发。
            super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
        }

        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            //调用父亲的fireExit()
            super.fireExit(context, resourceWrapper, count, args);
        }

    };
    
    //同理,构造一个end节点,初始值和first指向一致,下面有图。 
    AbstractLinkedProcessorSlot<?> end = first;

    @Override
    public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        protocolProcessor.setNext(first.getNext());
        first.setNext(protocolProcessor);
        if (end == first) {
            end = protocolProcessor;
        }
    }

    /*
    插入到链表中,从链表尾开始插。
    注意,一个是赋值模型的next值。 一个是更新变量的引用。
    */
    @Override
    public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        //end的下一个节点指向变量传进来的slot。也就是first也指向了这个slot。注意这里的setNext方法。
        end.setNext(protocolProcessor);
        //end的指向更新,指向了新的slot
        end = protocolProcessor;
    }

    /*
    这个是这个方法的setNext。父类也有一个setNext。
    像上面end.setNext()其实是调用的父亲的setNExt()。不是这个方法。
    只有 this.setNext()或者setNext()才是用的这个方法。
    */
    @Override
    public void setNext(AbstractLinkedProcessorSlot<?> next) {
        addLast(next);
    }
    
    …………

}


  DefaultProcessorSlotChain中有两个AbstractLinkedProcessorSlot类型的变量:first和end,这就是链表的头结点和尾节点。

  创建DefaultProcessorSlotChain对象时,首先创建了首节点,然后把首节点赋值给了尾节点,可以用下图表示:

fist&end

  当把第一个slot(NodeSelectorSlot)添加(chain.addLast())到联表中。是这样的:

fist&end2

  将所有的节点都加入到联表中后,整个联表的结构变成了如下图所示:

fist&end3

  样就将所有的Slot对象添加到了链表中去了,每一个Slot都是继承自AbstractLinkedProcessorSlot。而AbstractLinkedProcessorSlot是一种责任链的设计,每个对象中都有一个next属性,指向的是另一个AbstractLinkedProcessorSlot对象。其实责任链模式在很多框架中都有,比如Netty中是通过pipeline来实现的。

  知道了SlotChain是如何创建的了,那接下来就要看下是如何执行Slot的entry方法的了。


  • 责任链是如何执行的

  回看entryWithPriority()方法。在创建玩成chain后。调用了这个方法:

  chain.entry(context, resourceWrapper, null, count, prioritized, args);

  这个方法最终调用的是

   public class DefaultProcessorSlotChain extends ProcessorSlotChain {

    …………
    
    //其实这个方法最终调用的是first的transformEntry()
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
        throws Throwable {
        first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
    }

    …………
}

  也就是说,DefaultProcessorSlotChain的entry实际是执行的first属性的transformEntry方法。

  而transformEntry方法会执行当前节点的entry方法,在DefaultProcessorSlotChain中first节点重写了entry方法,具体如下:

  public class DefaultProcessorSlotChain extends ProcessorSlotChain {
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
            throws Throwable {
            super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
        }
}


  我们看到,实际entry()又执行了super的fireEntry()。

  public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {

    @Override
    public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        if (next != null) {
            //这个next指的就是下一个slot
            next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        }
    }
}

  从这里看到,从fireEntry()中就开始传递执行entry了,这里会执行当前节点的下一个节点的transformEntry()。上面已经分析过了,transformEntry方法会触发当前节点的entry,也就是说fireEntry方法实际是触发了下一个节点的entry方法。具体的流程如下图所示:

责任链开始了传递

  那责任链开始传递了,每次传递到的slot都会运行entry()。我们应该将目光看看entry()的实现了。


  • slot的entry()

  从上面得知,链表中在第一个位置的是NodeSelectorSlot。

  @Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {

    /**
     * {@link DefaultNode}s of the same resource in different context.
     */
    private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);

    //关注这里。实际上调用的方法。 这个entry()方法的内部逻辑
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        /*
         * It's interesting that we use context name rather resource name as the map key.
         *
         * Remember that same resource({@link ResourceWrapper#equals(Object)}) will share
         * the same {@link ProcessorSlotChain} globally, no matter in which context. So if
         * code goes into {@link #entry(Context, ResourceWrapper, DefaultNode, int, Object...)},
         * the resource name must be same but context name may not.
         *
         * If we use {@link com.alibaba.csp.sentinel.SphU#entry(String resource)} to
         * enter same resource in different context, using context name as map key can
         * distinguish the same resource. In this case, multiple {@link DefaultNode}s will be created
         * of the same resource name, for every distinct context (different context name) each.
         *
         * Consider another question. One resource may have multiple {@link DefaultNode},
         * so what is the fastest way to get total statistics of the same resource?
         * The answer is all {@link DefaultNode}s with same resource name share one
         * {@link ClusterNode}. See {@link ClusterBuilderSlot} for detail.
         */
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    node = new DefaultNode(resourceWrapper, null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    // Build invocation tree
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }

            }
        }
        context.setCurNode(node);
        //调用下一个节点的entry()
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}


  执行完业务逻辑处理后,调用了fireEntry()方法,由此触发了下一个节点的entry方法。此时我们就知道了sentinel的责任链就是这样传递的:每个Slot节点执行完自己的业务后,会调用fireEntry来触发下一个节点的entry方法。因为每个slot的逻辑都不太一样,所以这里就不展开了,我将来会写每个slot的分析的。

整个逻辑

  至此就通过SlotChain完成了对每个节点的entry()方法的调用,每个节点会根据创建的规则,进行自己的逻辑处理,当统计的结果达到设置的阈值时,就会触发限流、降级等事件,具体是抛出BlockException异常。

本文参考: