目录 |
-
介绍
Sentinel是阿里中间件团队开源的,面向分布式服务加购的轻量级高可用流量控制组件,主要以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度来帮助用户保护服务的稳定性。
首先让我们比对下目前业界流行的熔断降级库Netfilix Hystrix的异同。
-
项目结构
这是sentinel项目的整体结构图:
- sentinel-core 核心模块,限流、降级、系统保护等都在这里。因为这样可以单独提供sdk。
- sentinel-dashboard 控制后台,可以对sentinal客户端进行可视化管理。
- sentinel-transport 传输模块,提供基本的监控服务端和客户端的API接口。
- sentinel-extension 扩展模块,主要对dataSource进行了部分扩展实现。
- sentinel-adapter 适配器模块,主要实现了对一些常见框架的适配。
- sentinel-demo 样例,里面教你如何使用限流、降级等。
- sentinel-benchmark 基准测试模块,对核心代码的精确性提供基准测试。
-
运行样例
在sentinel-demo模块中有很多不同的样例,我们找到FlowQpsDemo,看看限流的样例。限流有很多种类的限流,先找qps限流的类来看。
/*
有一个TimeTicker线程在做统计,每1秒钟做一次。有N个RunTask线程在模拟请求,被访问的business code被资源key保护着,根据规则,每秒只允许20个请求通过。
*/
public class FlowQpsDemo {
//key是qps江流的隔离名称
private static final String KEY = "abc";
/*
全局共享三个变量
使用原子整形类型变量记录通过,拦截及总体数量。
利用AtomicInteger就是因为原子性的特性,可以帮助我们防止多线程的安全问题。
*/
private static AtomicInteger pass = new AtomicInteger();
private static AtomicInteger block = new AtomicInteger();
private static AtomicInteger total = new AtomicInteger();
private static volatile boolean stop = false;
//线程总数
private static final int threadCount = 32;
//时间为 100s
private static int seconds = 60 + 40;
public static void main(String[] args) throws Exception {
initFlowQpsRule();
tick();
// first make the system run on a very low condition
simulateTraffic();
System.out.println("===== begin to do flow control");
System.out.println("only 20 requests per second can pass");
}
/*
初始化限流规则
*/
private static void initFlowQpsRule() {
List<FlowRule> rules = new ArrayList<FlowRule>();
FlowRule rule1 = new FlowRule();
rule1.setResource(KEY);
//设置qps最大值20个
rule1.setCount(20);
//设置限流类型:根据qps
rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
//需要被限制的应用名称。default代表允许全部应用
rule1.setLimitApp("default");
rules.add(rule1);
FlowRuleManager.loadRules(rules);
}
//模拟请求。并发32个线程运行 RunTask()任务
private static void simulateTraffic() {
for (int i = 0; i < threadCount; i++) {
Thread t = new Thread(new RunTask());
t.setName("simulate-traffic-Task");
t.start();
}
}
//启动一个线程来运行TimerTask()任务。
private static void tick() {
Thread timer = new Thread(new TimerTask());
timer.setName("sentinel-timer-task");
timer.start();
}
//记录相关事件时间耗时的,tick()调用的所以就运行1次。
static class TimerTask implements Runnable {
@Override
public void run() {
long start = System.currentTimeMillis();
System.out.println("begin to statistic!!!");
long oldTotal = 0;
long oldPass = 0;
long oldBlock = 0;
/*
循环。
*/
while (!stop) {
try {
//1秒执行一次
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
//全局总数量
long globalTotal = total.get();
//一秒内的全部数量
long oneSecondTotal = globalTotal - oldTotal;
//
oldTotal = globalTotal;
long globalPass = pass.get();
//一秒内通过的数量
long oneSecondPass = globalPass - oldPass;
oldPass = globalPass;
long globalBlock = block.get();
//一秒内拒绝的数量
long oneSecondBlock = globalBlock - oldBlock;
oldBlock = globalBlock;
System.out.println(seconds + " send qps is: " + oneSecondTotal);
System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
+ ", pass:" + oneSecondPass
+ ", block:" + oneSecondBlock);
//当时间耗尽时,停止循环
if (seconds-- <= 0) {
stop = true;
}
}
//整个线程耗时
long cost = System.currentTimeMillis() - start;
System.out.println("time cost: " + cost + " ms");
System.out.println("total:" + total.get() + ", pass:" + pass.get()
+ ", block:" + block.get());
System.exit(0);
}
}
//
static class RunTask implements Runnable {
@Override
public void run() {
while (!stop) {
Entry entry = null;
try {
//进行限流,采用key。因为已经在上面的方法里设置了限流策略了,所以在这个线程中,会自动按照限流策略进行限流的。我们只需要对结果进行监控,然后执行之后逻辑即可。
entry = SphU.entry(KEY);
// 如果没有抛出异常的话,就说明通过了。就给pass+1
pass.addAndGet(1);
} catch (BlockException e1) {
获得了BlockExcepton就给block+1
block.incrementAndGet();
} catch (Exception e2) {
// 这个是在业务中抛出了异常了。需要对业务逻辑异常处理
} finally {
//都会走到这里,给total+1
total.incrementAndGet();
//限流策略退出
if (entry != null) {
entry.exit();
}
}
Random random2 = new Random();
try {
//随机睡一会
TimeUnit.MILLISECONDS.sleep(random2.nextInt(50));
} catch (InterruptedException e) {
// ignore
}
}
}
}
}
运行程序后,结果如下:
begin to statistic!!!
===== begin to do flow control
only 20 requests per second can pass
100 send qps is: 1125
1621777621479, total:1125, pass:32, block:1093
99 send qps is: 1195
1621777622478, total:1195, pass:20, block:1175
98 send qps is: 1168
1621777623481, total:1168, pass:20, block:1148
97 send qps is: 1198
1621777624481, total:1198, pass:21, block:1177
96 send qps is: 1155
1621777625486, total:1155, pass:20, block:1135
95 send qps is: 1210
1621777626491, total:1210, pass:20, block:1190
94 send qps is: 1163
1621777627495, total:1163, pass:23, block:1140
93 send qps is: 1158
1621777628498, total:1158, pass:20, block:1138
可以看到,上面的结果中,pass的数量和我们设置的不一样。我们设定的是每秒可以pass的数量是20个。但是有很多的请求都是超过20个的。
原因是SphU.entry(KEY)
的内部是没有加锁的,所以就会在高并发的情况下,pass的数量就会高于20.由于pass、block、total是全局共享的,而多个RunTask()线程在执行SphU.entry(KEY)申请获取entry时,内部没有所保护,所以会存在pass的个数超过设定的阈值。
接下来,我把threadCount改为1,只有一个线程来执行这个方法,结果如下。
begin to statistic!!!
===== begin to do flow control
only 20 requests per second can pass
100 send qps is: 36
1621784048375, total:36, pass:26, block:10
99 send qps is: 37
1621784049379, total:37, pass:20, block:17
98 send qps is: 33
1621784050384, total:33, pass:20, block:13
97 send qps is: 39
1621784051389, total:39, pass:20, block:19
96 send qps is: 35
1621784052394, total:35, pass:20, block:15
95 send qps is: 33
1621784053398, total:33, pass:20, block:13
-
深入原理
一切的开始让我们从SphU.entry()开始。
public class SphU {
……
/*
为给定的resource记录统计数据并且执行方法检查
*/
public static Entry entry(String name) throws BlockException {
return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
}
/*
sentinel的环境类,这个类会触发所有的sentinel的初始化
这个类有个特殊注意到的地方,为了防止程序上的deadlock。不能让其他的类的静态方法块或者静态变量不要指向这个类。
*/
public class Env {
//主要是这块代码
public static final Sph sph = new CtSph();
static {
// If init fails, the process will exit.
InitExecutor.doInit();
}
}
SphU.entry中Env是一个类,我们知道在JVM的生命周期里,每个类都会被加载一次,而且加载的原则是懒加载。这个类中有个Sph类型的静态变量,指向了实例化后的CtSph()。也就是说Env.sph.entry()最终是调用了CtsSph.entry()。
public class CtsSph {
/*
检查所有resource的限流规则
每个唯一的resource会用一个ProcessorSlot做限流检查。同样的resource会用同样的ProcessorSlot。
注意的是ProcessorSlot总数不能超过配置的MAX_SLOT_CHAIN_SIZE值。否则,所有的限流检查都会终止。也就是说同一时刻的所有请求都会通过,没有任何限流。
*/
public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
return entryWithPriority(resourceWrapper, count, false, args);
}
/*
这个方法是具体的实现,也是比较重要的。
*/
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
//获取当前的上线文
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
// The {@link NullContext} indicates that the amount of context has exceeded the threshold,
// so here init the entry only. No rule checking will be done.
//NullContext代表的是上下文数量已经超过了阈值,所以这里初始化CtEntry,就是不会做任何限流检查。
return new CtEntry(resourceWrapper, null, context);
}
//如果没有上下文就给一个默认的上下文。
if (context == null) {
// Using default context.
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
// 开关,可以控制是否做限流检查
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
//这个比较重要,获取该资源对应的SlotChain
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
//slot chain超过了Constants.MAX_SLOT_CHAIN_SIZE。所以就是说没有限流检查
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
// 执行Slot的entry方法
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
// 抛出BlockExecption
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
}
这个方法可以分为以下几个部分:
- 对参数和全局配置做检测,如果不符合要求就返回一个CtEntry对象,不会再进行任何限流检测。否则就会进入检测流程。
- 根据包装过的资源对象获取对应的SlotChain
- 执行SlotChain的entry方法,如果抛出了BlockExecption就代表被限流了。如果没有就返回Entry对象
-
ProcessorSlot是什么
我们先来看看ProcessorSlot到底是个什么东西。他是一个接口,详细的请看下面的程序解释。
/*
某些流程的容器,以及流程完成时的通知方式。
直译的话就是处理器插槽,是sentinel实现限流降级、熔断降级、系统自适应降级等功能的切入点。Sentinel 提供的 ProcessorSlot 可以分为两类,一类是辅助完成资源指标数据统计的切入点,一类是实现降级功能的切入点。
*/
public interface ProcessorSlot<T> {
//slot的入口
void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,
Object... args) throws Throwable;
//触发入entry
void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,
Object... args) throws Throwable;
//slot的出口
void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
//触发出entry
void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}
通过实现关系,我们可以看到这两类ProcessorSlot分别是:
辅助资源指标数据统计的 ProcessorSlot:
- NodeSelectorSlot:为当前资源创建 DefaultNode
- ClusterBuilderSlot:如果当前资源未创建 ClusterNode,则为资源创建 ClusterNode;将 ClusterNode 赋值给当前资源的 DefaultNode.clusterNode;如果调用来源(origin)不为空,则为调用来源创建 StatisticNode,用于实现按调用来源统计资源的指标数据,ClusterNode 持有每个调用来源的 StatisticNode。
- StatisticSlot:这是 Sentinel 最为重要的类之一,用于实现指标数据统计。先是调用后续的 ProcessorSlot#entry 判断是否放行请求,再根据判断结果进行相应的指标数据统计操作。
实现降级功能的 ProcessorSlot:
- AuthoritySlot:实现黑白名单降级
- SystemSlot:实现系统自适应降级
- FlowSlot:实现限流降级
- DegradeSlot:实现熔断降级
-
ProcessorSlot 的创建
然后我们看看是如何将ProcessorSlot创建成链的:
//通过slot chain builder SPI提供的创建slot链的类
public final class SlotChainProvider {
private static volatile SlotChainBuilder slotChainBuilder = null;
/*
加载和选择进程不是线程安全的,但是如果只是通过lookProcessChain()方法调用就可以,因为CtSph是带锁的。一会就能看到lookProcessChain这个方法了。这个方法是有synchronized的。
*/
public static ProcessorSlotChain newSlotChain() {
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
//通过chain builder的SPI来实现代码和业务的隔离,也就是说通过配置文件来选择可用的builder.以此builder来创建ProcessorSlotChain
slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
if (slotChainBuilder == null) {
// Should not go through here.
//一般来说不应该为null,因为作者已经预留了文件在resources的META-INF中。但是为了万一,还是判断一下,给出个解决方案。
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
slotChainBuilder.getClass().getCanonicalName());
}
//返回,用选择的builder调用build方法,创建ProcessorSlotChain
return slotChainBuilder.build();
}
private SlotChainProvider() {}
}
我很感兴趣slotChainBuilder,于是就去看了眼这个类。其实他是一个接口,接口中就一个build()方法。目前系统提供了两种实现,一个是DefaultSlotChainBuilder,一个是DemoSlotChainBuilder。这块对后续的增加留下了扩展性。主要体现的是建造者模式思想。
public interface SlotChainBuilder {
//构造 processor slot chain
ProcessorSlotChain build();
}
那接下来我选择DemoSlotChainBuilder看下怎么实现的。
/*
一个例子来讲述如何自定义建造一个slot chain
*/
@Spi
public class DemoSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
// 过滤掉降级
//demo不会被降级异常阻塞的
sortedSlotList.removeIf(o -> DegradeSlot.class.equals(o.getClass()));
for (ProcessorSlot slot : sortedSlotList) {
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
continue;
}
chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
return chain;
}
}
通过上面的演示,就创建除了一个slot chain了。返回的就是ProcessorSlotChain。
-
继续深入原理
//chanMap是一个原子map,map的key是ResourceWrapper封装类,value是ProcessorSlotChain
private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap
= new HashMap<ResourceWrapper, ProcessorSlotChain>();
/*
获取当前资源的ProcessorSlotChain,如果没有的话就创建一个ProcessorSlotChain出来
同样的资源只要ResourceWrapper.equals(Object)相等,不论在不在同一个上下文中,都会全局共享同一个ProcessorSlotChain。
还有就是ProcessorSlot的总数不能超过Constants#MAX_SLOT_CHAIN_SIZE否则会返回null
*/
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
//获取单向链表中的ProcessorSlotChain
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
//chain是空的则进入,不为空就直接返回呗
if (chain == null) {
//这个就是刚才我在SlotChainProvider()方法中提到的。获取的话一定在lookProcessChain()中,因为有锁就保证了线程安全。
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
//double check。因为在锁里了。在之前查到现在查的时间内,有可能加锁前chain被别的线程注入了,那这块就可能出问题的。
if (chain == null) {
// 超过预设值了,返回null吧。
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
//刚才已经分析过了是如何创建slot chain的了。但是我只是用了demoSlotBuilder讲的。这块因为是用默认的DefaultSlotChainBuilder。所以我还得贴一下DefaultSlotChainBuilder的代码在下面
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
//默认的构建slot chain的建造者。请关注其中的SPI。
@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
//采用SPI,那就看看META-INF中规定的processorSlot,看看有几种规则吧。贴在下面了
List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
//通过下面的列表我们知道了chain中依次加入了很多slot
for (ProcessorSlot slot : sortedSlotList) {
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
continue;
}
//DefaultProcessorSlotChain中的addLast
chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
return chain;
}
}
//文件名:sentinel/sentinel-core/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot
# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
以上的这些slot已经被依次添加到了chain中了。然后调用DefaultProcessorSlotChain中的addLast完成责任链的创建。
/*
DefaultProcessorSlotChain的实现。他继承了ProcessorSlotChain,继承了AbstractLinkedProcessorSlot实现了ProcessorSlot。
这一大坨的继承关系。其实仔细想想,也能明白。一个好的程序继承关系最多也就5层。不能再高了。再高了就不是一个好的设计方式了。
ProcessorSlotChain是一个抽象类,比继承的对象多了两个方法,addFirst()和addLast()。这两个方法必须在继承类中具体实现。
*/
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
/*
构造了一个first节点。并且重写了entry和exit方法。
注意,这个变量指向的是他的父亲
*/
AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
//调用父亲的fireEntry()。这个方法会在真正进入entry时触发。是由first节点进行触发。
super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
//调用父亲的fireExit()
super.fireExit(context, resourceWrapper, count, args);
}
};
//同理,构造一个end节点,初始值和first指向一致,下面有图。
AbstractLinkedProcessorSlot<?> end = first;
@Override
public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
protocolProcessor.setNext(first.getNext());
first.setNext(protocolProcessor);
if (end == first) {
end = protocolProcessor;
}
}
/*
插入到链表中,从链表尾开始插。
注意,一个是赋值模型的next值。 一个是更新变量的引用。
*/
@Override
public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
//end的下一个节点指向变量传进来的slot。也就是first也指向了这个slot。注意这里的setNext方法。
end.setNext(protocolProcessor);
//end的指向更新,指向了新的slot
end = protocolProcessor;
}
/*
这个是这个方法的setNext。父类也有一个setNext。
像上面end.setNext()其实是调用的父亲的setNExt()。不是这个方法。
只有 this.setNext()或者setNext()才是用的这个方法。
*/
@Override
public void setNext(AbstractLinkedProcessorSlot<?> next) {
addLast(next);
}
…………
}
DefaultProcessorSlotChain中有两个AbstractLinkedProcessorSlot类型的变量:first和end,这就是链表的头结点和尾节点。
创建DefaultProcessorSlotChain对象时,首先创建了首节点,然后把首节点赋值给了尾节点,可以用下图表示:
当把第一个slot(NodeSelectorSlot
)添加(chain.addLast()
)到联表中。是这样的:
将所有的节点都加入到联表中后,整个联表的结构变成了如下图所示:
样就将所有的Slot对象添加到了链表中去了,每一个Slot都是继承自AbstractLinkedProcessorSlot。而AbstractLinkedProcessorSlot是一种责任链的设计,每个对象中都有一个next属性,指向的是另一个AbstractLinkedProcessorSlot对象。其实责任链模式在很多框架中都有,比如Netty中是通过pipeline来实现的。
知道了SlotChain是如何创建的了,那接下来就要看下是如何执行Slot的entry方法的了。
-
责任链是如何执行的
回看entryWithPriority()方法。在创建玩成chain后。调用了这个方法:
chain.entry(context, resourceWrapper, null, count, prioritized, args);
这个方法最终调用的是
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
…………
//其实这个方法最终调用的是first的transformEntry()
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
}
…………
}
也就是说,DefaultProcessorSlotChain的entry实际是执行的first属性的transformEntry方法。
而transformEntry方法会执行当前节点的entry方法,在DefaultProcessorSlotChain中first节点重写了entry方法,具体如下:
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
}
}
我们看到,实际entry()又执行了super的fireEntry()。
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
if (next != null) {
//这个next指的就是下一个slot
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
}
从这里看到,从fireEntry()中就开始传递执行entry了,这里会执行当前节点的下一个节点的transformEntry()。上面已经分析过了,transformEntry方法会触发当前节点的entry,也就是说fireEntry方法实际是触发了下一个节点的entry方法。具体的流程如下图所示:
那责任链开始传递了,每次传递到的slot都会运行entry()。我们应该将目光看看entry()的实现了。
-
slot的entry()
从上面得知,链表中在第一个位置的是NodeSelectorSlot。
@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
/**
* {@link DefaultNode}s of the same resource in different context.
*/
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
//关注这里。实际上调用的方法。 这个entry()方法的内部逻辑
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
/*
* It's interesting that we use context name rather resource name as the map key.
*
* Remember that same resource({@link ResourceWrapper#equals(Object)}) will share
* the same {@link ProcessorSlotChain} globally, no matter in which context. So if
* code goes into {@link #entry(Context, ResourceWrapper, DefaultNode, int, Object...)},
* the resource name must be same but context name may not.
*
* If we use {@link com.alibaba.csp.sentinel.SphU#entry(String resource)} to
* enter same resource in different context, using context name as map key can
* distinguish the same resource. In this case, multiple {@link DefaultNode}s will be created
* of the same resource name, for every distinct context (different context name) each.
*
* Consider another question. One resource may have multiple {@link DefaultNode},
* so what is the fastest way to get total statistics of the same resource?
* The answer is all {@link DefaultNode}s with same resource name share one
* {@link ClusterNode}. See {@link ClusterBuilderSlot} for detail.
*/
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
context.setCurNode(node);
//调用下一个节点的entry()
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
执行完业务逻辑处理后,调用了fireEntry()方法,由此触发了下一个节点的entry方法。此时我们就知道了sentinel的责任链就是这样传递的:每个Slot节点执行完自己的业务后,会调用fireEntry来触发下一个节点的entry方法。因为每个slot的逻辑都不太一样,所以这里就不展开了,我将来会写每个slot的分析的。
至此就通过SlotChain完成了对每个节点的entry()方法的调用,每个节点会根据创建的规则,进行自己的逻辑处理,当统计的结果达到设置的阈值时,就会触发限流、降级等事件,具体是抛出BlockException异常。
本文参考: