《RPC手撸专栏》第fix-08章:优化服务熔断半开启状态的执行逻辑
作者:冰河
星球:http://m6z.cn/6aeFbs
博客1:https://binghe001.github.io
博客2:https://binghe.site
文章汇总:https://binghe.site/md/all/all.html
沉淀,成长,突破,帮助他人,成就自我。
大家好,我是冰河~~
在写《RPC手撸专栏》的过程中,针对专栏版本的代码,在书写的过程中,会提前埋一些坑进去,使各位星球的小伙伴在调试代码的过程中,能够自己去发现问题,并且分析问题,最好也能够自己解决问题。经过自己发现问题->分析问题->解决问题的过程,能够提升大家对于RPC框架源码的参与过程,更重要的是,能够不断提升大家自己发现问题、分析问题和解决问题的能力,这种能够力才是程序员最核心的竞争力。
一、问题描述
本章要解决什么问题呢?
在高并发、大流量场景下,如果熔断状态处于半开启状态时,可能会导致大量请求穿透访问后端服务的问题。
二、问题分析
这个问题是如何产生的呢?
在服务提供者和服务消费者整合服务熔断时,大体流程的伪代码如下所示。
//如果触发了熔断的规则,则直接返回降级处理数据
if (fusingInvoker.invokeFusingStrategy()){
return 降级处理结果;
}
//请求计数加1
fusingInvoker.incrementCount();
if (逻辑处理失败){
fusingInvoker.incrementFailureCount();
}
或者
//如果触发了熔断的规则,则直接返回降级处理数据
if (fusingInvoker.invokeFusingStrategy()){
return 降级处理结果;
}
//请求计数加1
fusingInvoker.incrementCount();
try{
//########执行逻辑处理###########
}catch(Throwable e){
fusingInvoker.incrementFailureCount();
}
总体逻辑就是,先判断是否达到了熔断条件,如果已经达到熔断条件,则直接触发降级处理,不再访问真实服务。如果未达到熔断条件,则对访问请求数加1,如果逻辑处理失败或者发生异常,则对访问失败的请求数加1。
接下来,看下修改前的FusingInvoker接口及其实现类的方法。
1.FusingInvoker接口
FusingInvoker接口是服务熔断的SPI接口,源码详见:bhrpc-fusing-api工程下的io.binghe.rpc.fusing.api.FusingInvoker,修改前的源码如下所示。
@SPI(RpcConstants.DEFAULT_FUSING_INVOKER)
public interface FusingInvoker {
/**
* 是否会触发熔断操作,规则如下:
* 1.断路器默认处于“关闭”状态,当错误个数或错误率到达阈值,就会触发断路器“开启”。
* 2.断路器开启后进入熔断时间,到达熔断时间终点后重置熔断时间,进入“半开启”状态。
* 3.在半开启状态下,如果服务能力恢复,则断路器关闭熔断状态。进而进入正常的服务状态。
* 4.在半开启状态下,如果服务能力未能恢复,则断路器再次触发服务熔断,进入熔断时间。
* @return 是否要触发熔断,true:触发熔断,false:不触发熔断
*/
boolean invokeFusingStrategy();
/**
* 处理请求的次数
*/
void incrementCount();
/**
* 处理请求失败的次数
*/
void incrementFailureCount();
/**
* 在milliSeconds毫秒内错误数量或者错误百分比达到totalFailure,则触发熔断操作
* @param totalFailure 在milliSeconds毫秒内触发熔断操作的上限值
* @param milliSeconds 毫秒数
*/
default void init(double totalFailure, int milliSeconds){}
}
2.FusingInvoker接口核心实现逻辑
这里,我们实现了基于错误数和错误百分比的熔断策略,不管是基于错误数的熔断策略,还是基于百分比的熔断策略,其核心逻辑都是一样的。这里,我们就将这些核心逻辑抽象出来,修改前的核心逻辑如下所示。
@SPIClass
public class XxxFusingInvoker extends AbstractFusingInvoker {
private final Logger logger = LoggerFactory.getLogger(CounterFusingInvoker.class);
@Override
public boolean invokeFusingStrategy() {
boolean result = false;
switch (fusingStatus.get()){
//关闭状态
case RpcConstants.FUSING_STATUS_CLOSED:
result = this.invokeClosedFusingStrategy();
break;
//半开启状态
case RpcConstants.FUSING_STATUS_HALF_OPEN:
result = this.invokeHalfOpenFusingStrategy();
break;
//开启状态
case RpcConstants.FUSING_STATUS_OPEN:
result = this.invokeOpenFusingStrategy();
break;
default:
result = this.invokeClosedFusingStrategy();
break;
}
logger.info("execute counter fusing strategy, current fusing status is {}", fusingStatus.get());
return result;
}
/**
* 处理开启状态
*/
private boolean invokeOpenFusingStrategy() {
//获取当前时间
long currentTimeStamp = System.currentTimeMillis();
//超过一个指定的时间范围,则将状态设置为半开启状态
if (currentTimeStamp - lastTimeStamp >= milliSeconds){
fusingStatus.set(RpcConstants.FUSING_STATUS_HALF_OPEN);
lastTimeStamp = currentTimeStamp;
this.resetCount();
return false;
}
return true;
}
/**
* 处理半开启状态
*/
private boolean invokeHalfOpenFusingStrategy() {
//获取当前时间
long currentTimeStamp = System.currentTimeMillis();
//服务已经恢复
if (currentFailureCounter.get() <= 0){
fusingStatus.set(RpcConstants.FUSING_STATUS_CLOSED);
lastTimeStamp = currentTimeStamp;
this.resetCount();
return false;
}
//服务未恢复
fusingStatus.set(RpcConstants.FUSING_STATUS_OPEN);
lastTimeStamp = currentTimeStamp;
return true;
}
/**
* 处理关闭状态逻辑
*/
private boolean invokeClosedFusingStrategy() {
//获取当前时间
long currentTimeStamp = System.currentTimeMillis();
//超过一个指定的时间范围
if (currentTimeStamp - lastTimeStamp >= milliSeconds){
lastTimeStamp = currentTimeStamp;
this.resetCount();
return false;
}
//超出配置的错误数量或百分比
if (超出配置的错误数量或百分比 >= totalFailure){
lastTimeStamp = currentTimeStamp;
fusingStatus.set(RpcConstants.FUSING_STATUS_OPEN);
return true;
}
return false;
}
}
结合所有的代码分析:如果程序最初访问时触发了熔断条件,将熔断状态设置为开启。当经过一个时间周期时,如果有线程调用服务方法,则会执行invokeOpenFusingStrategy()方法中的 if 条件分支中的语句,将熔断状态设置为半开启状态,重置请求数量和时间窗口,返回false。此时,当前线程就会直接调用真实服务方法,来探测真实服务是否已经恢复。
如果探测真实服务是否恢复的线程还未返回结果时,又有其他线程来调用服务方法,此时服务状态为半开启状态,就会执行invokeHalfOpenFusingStrategy()方法,由于探测真实服务是否恢复的线程还未返回结果,所以,满足currentFailureCounter.get()小于或者等于0的条件,此时又会将熔断状态设置为关闭。后续就会有大量线程穿透熔断逻辑直接访问真实服务。此时,真实服务是否已经恢复仍未可知。
所以,服务熔断在半开启状态下存在执行逻辑漏洞。
三、问题解决
问题该如何解决呢?
分析并定位到问题后,解决起来就比较简单了,具体的修复步骤如下所示。
1.修改FusingInvoker接口
FusingInvoker接口的源码详见:bhrpc-fusing-api工程下的io.binghe.rpc.fusing.api.FusingInvoker,主要是重新定义了FusingInvoker接口中的方法,如下所示。
@SPI(RpcConstants.DEFAULT_FUSING_INVOKER)
public interface FusingInvoker {
/**
* 是否会触发熔断操作,规则如下:
* 1.断路器默认处于“关闭”状态,当错误个数或错误率到达阈值,就会触发断路器“开启”。
* 2.断路器开启后进入熔断时间,到达熔断时间终点后重置熔断时间,进入“半开启”状态。
* 3.在半开启状态下,如果服务能力恢复,则断路器关闭熔断状态。进而进入正常的服务状态。
* 4.在半开启状态下,如果服务能力未能恢复,则断路器再次触发服务熔断,进入熔断时间。
* @return 是否要触发熔断,true:触发熔断,false:不触发熔断
*/
boolean invokeFusingStrategy();
/**
* 处理请求的次数
*/
void incrementCount();
/**
* 访问成功
*/
void markSuccess();
/**
* 访问失败
*/
void markFailed();
/**
* 在milliSeconds毫秒内错误数量或者错误百分比达到totalFailure,则触发熔断操作
* @param totalFailure 在milliSeconds毫秒内触发熔断操作的上限值
* @param milliSeconds 毫秒数
*/
default void init(double totalFailure, int milliSeconds){}
}
可以看到,在FusingInvoker接口中,定义了markSuccess()方法和markFailed()方法,其中,markSuccess()方法是执行成功时调用的方法,markFailed()方法是执行失败调用的方法。
2.修改FusingInvoker接口核心实现逻辑
这里,我们将FusingInvoker接口的核心实现逻辑修改成如下所示。
public abstract class AbstractFusingInvoker implements FusingInvoker {
/**
* 熔断状态,1:关闭; 2:半开启; 3:开启
*/
protected static final AtomicInteger fusingStatus = new AtomicInteger(RpcConstants.FUSING_STATUS_CLOSED);
/**
* 当前调用次数
*/
protected final AtomicInteger currentCounter = new AtomicInteger(0);
/**
* 当前调用失败的次数
*/
protected final AtomicInteger currentFailureCounter = new AtomicInteger(0);
/**
* 半开启状态下的等待状态
*/
protected final AtomicInteger fusingWaitStatus = new AtomicInteger(RpcConstants.FUSING_WAIT_STATUS_INIT);
/**
* 熔断时间范围的开始时间点
*/
protected volatile long lastTimeStamp = System.currentTimeMillis();
/**
* 在milliSeconds毫秒内触发熔断操作的上限值
* 可能是错误个数,也可能是错误率
*/
protected double totalFailure;
/**
* 毫秒数
*/
protected int milliSeconds;
/**
* 获取失败策略的结果值
*/
public abstract double getFailureStrategyValue();
/**
* 重置数量
*/
protected void resetCount(){
currentFailureCounter.set(0);
currentCounter.set(0);
}
@Override
public void incrementCount() {
currentCounter.incrementAndGet();
}
@Override
public void markSuccess() {
if (fusingStatus.get() == RpcConstants.FUSING_STATUS_HALF_OPEN){
fusingWaitStatus.compareAndSet(RpcConstants.FUSING_WAIT_STATUS_WAITINF, RpcConstants.FUSING_WAIT_STATUS_SUCCESS);
}
}
@Override
public void markFailed() {
currentFailureCounter.incrementAndGet();
if (fusingStatus.get() == RpcConstants.FUSING_STATUS_HALF_OPEN){
fusingWaitStatus.compareAndSet(RpcConstants.FUSING_WAIT_STATUS_WAITINF, RpcConstants.FUSING_WAIT_STATUS_FAILED);
}
}
@Override
public void init(double totalFailure, int milliSeconds) {
this.totalFailure = totalFailure <= 0 ? RpcConstants.DEFAULT_FUSING_TOTAL_FAILURE : totalFailure;
this.milliSeconds = milliSeconds <= 0 ? RpcConstants.DEFAULT_FUSING_MILLI_SECONDS : milliSeconds;
}
/**
* 处理开启状态的逻辑
*/
protected boolean invokeOpenFusingStrategy() {
//获取当前时间
long currentTimeStamp = System.currentTimeMillis();
//超过一个指定的时间范围
if (currentTimeStamp - lastTimeStamp >= milliSeconds){
//修改等待状态,让修改成功的线程进入半开启状态
if (fusingWaitStatus.compareAndSet(RpcConstants.FUSING_WAIT_STATUS_INIT, RpcConstants.FUSING_WAIT_STATUS_WAITINF)){
fusingStatus.set(RpcConstants.FUSING_STATUS_HALF_OPEN);
lastTimeStamp = currentTimeStamp;
this.resetCount();
return false;
}
}
return true;
}
/**
* 处理半开启状态的逻辑
*/
protected boolean invokeHalfOpenFusingStrategy() {
//此时熔断状态还是半开启状态,等待状态可能是等待,可能是成功,可能是失败
//获取当前时间
long currentTimeStamp = System.currentTimeMillis();
//成功了,表示服务已经恢复
if (fusingWaitStatus.compareAndSet(RpcConstants.FUSING_WAIT_STATUS_SUCCESS, RpcConstants.FUSING_WAIT_STATUS_INIT)){
fusingStatus.set(RpcConstants.FUSING_STATUS_CLOSED);
lastTimeStamp = currentTimeStamp;
this.resetCount();
return false;
}
//失败了,表示服务还未恢复
if (fusingWaitStatus.compareAndSet(RpcConstants.FUSING_WAIT_STATUS_FAILED, RpcConstants.FUSING_WAIT_STATUS_INIT)){
//服务未恢复
fusingStatus.set(RpcConstants.FUSING_STATUS_OPEN);
lastTimeStamp = currentTimeStamp;
return true;
}
//1.半开启状态的线程还未执行完逻辑,并发情况下的其他线程状态不变,直接返回true,执行熔断逻辑,此时熔断状态仍为半开启状态
//2.并发情况下,只有一个线程会检测到服务是否已经恢复,其他线程状态不变,直接返回true,执行熔断逻辑,此时熔断状态为开启或者关闭
//3.执行熔断逻辑的线程,不会执行真实方法的逻辑,会调用降级方法返回数据。
return true;
}
/**
* 处理关闭状态的逻辑
*/
protected boolean invokeClosedFusingStrategy() {
//获取当前时间
long currentTimeStamp = System.currentTimeMillis();
//超过一个指定的时间范围
if (currentTimeStamp - lastTimeStamp >= milliSeconds){
lastTimeStamp = currentTimeStamp;
this.resetCount();
return false;
}
//如果当前错误数或者百分比大于或等于配置的百分比
if (this.getFailureStrategyValue() >= totalFailure){
lastTimeStamp = currentTimeStamp;
fusingStatus.set(RpcConstants.FUSING_STATUS_OPEN);
return true;
}
return false;
}
}
可以看到,服务熔断核心逻辑的实现中,新增了半开启状态下的等待状态,markSuccess()方法和markFailed中,都是在半开启状态下原子更新等待的状态。
修改后整体的执行逻辑如下所示。
(1)等待状态的初始值为初始化状态。
(2)当熔断状态处于开启,并且当前时间周期结束时,会将等待状态由初始化状态原子更新为等待状态,只有原子更新等待状态成功后,才会将熔断状态修改为半开启状态,此时意味着只有一个线程会将熔断状态由开启状态修改为半开启状态,进而重置请求数和时间窗口,并请求真实服务方法来探测真实服务是否已经恢复。其他原子更新等待状态失败的线程,则直接触发熔断条件,进行降级处理。
换句话说,熔断状态为开启,并且当前时间周期结束时,如果有线程访问真实服务的方法,则只会有一个线程访问真实服务的方法,并且此时熔断状态已经变更为半开启状态。
(3)探测真实服务是否恢复的线程,当返回正确结果时,会调用markSuccess()方法,在熔断状态为半开启状态时,将等待状态由等待状态原子更新为成功状态。当返回异常结果时,会调用markFailed()方法,将请求失败的记录数加1,在在熔断状态为半开启状态时,将等待状态由等待状态原子更新为失败状态。
换句话说,如果等待状态是等待,则探测真实服务是否恢复的线程还未返回结果数据,如果等待状态是成功或者失败状态,则探测真实服务是否恢复的线程一定返回了结果数据。
(4)如果探测真实服务是否恢复的线程在访问真实服务期间,又有其他线程触发了熔断规则,此时熔断状态为半开启状态,就会触发invokeHalfOpenFusingStrategy()方法的执行。此时,就会有三种情况:
- 能够成功将等待状态由成功状态原子更新为初始化状态,说明真实服务已经恢复,则将熔断状态修改为关闭状态。
- 能够成功将等待状态由失败状态原子更新为初始化状态,说明真实服务未恢复,则将熔断状态修改成开启状态,进入一个熔断时间窗口周期。
- 如果既不能将等待状态由成功状态原子更新为初始化状态,也不能将等待状态由失败状态原子更新为初始化状态,说明探测真实服务是否恢复的线程还未返回结果数据,则此时线程不会重置请求数和时间窗口,直接触发熔断条件进行降级处理。
3.整合服务提供者和服务消费者的大体逻辑
在服务提供者和服务消费者整合修改后的服务熔断逻辑时,大体流程的伪代码如下所示。
//如果触发了熔断的规则,则直接返回降级处理数据
if (fusingInvoker.invokeFusingStrategy()){
return 降级处理结果;
}
//请求计数加1
fusingInvoker.incrementCount();
if (逻辑处理失败){
fusingInvoker.markFailed();
}else {
fusingInvoker.markSuccess();
}
或者
//如果触发了熔断的规则,则直接返回降级处理数据
if (fusingInvoker.invokeFusingStrategy()){
return 降级处理结果;
}
//请求计数加1
fusingInvoker.incrementCount();
try{
fusingInvoker.markSuccess();
}catch(Throwable e){
fusingInvoker.markFailed();
}
至此,服务熔断半开启状态的执行逻辑优化完毕。
注意:具体的代码大家可参见文章开头的代码工程链接,这里不再粘贴详细的代码逻辑。
四、问题总结
修改完问题不总结下怎么行?
我们自己手写的RPC框架不是一蹴而就的,它是一个不断优化和不断调整的过程,冰河也会将这些调整的过程整理好分享给各位星球的小伙伴。
总之,我们写的RPC框架正在一步步实现它该有的功能。
最后,我想说的是:学习《RPC手撸专栏》一定要塌下心来,一步一个脚印,动手实践,认真思考,遇到不懂的问题,可以直接到星球发布主题进行提问。一定要记住:纸上得来终觉浅,绝知此事要躬行的道理。否则,一味的CP,或者光看不练,不仅失去了学习的意义,到头来更是一无所获。
好了,本章就到这里吧,我是冰河,我们下一章见~~
五、关于星球
大家可以加入 冰河技术 知识星球,和星球小伙伴们一起学习《SpringCloud Alibaba实战》专栏和《RPC手撸专栏》,冰河技术知识星球的《RPC手撸专栏》是个连载大几十篇的专栏(目前已更新几十大篇章,110+篇文章,110+工程源码,120+源码Tag分支,真正的企业级、分布式、高并发、高性能、高可用,可扩展的RPC框架,仍在持续更新)。
另外,星球中《企业级大规模分布式调度系统》和《企业级大规模分布式IM系统》也已经提升日程,期待你的加入,与星球小伙伴一起开发企业级中间件项目,一起提升硬核技术!
星球提供的服务
冰河整理了星球提供的一些服务,如下所示。
加入星球,你将获得:
1.学习从零开始手撸可用于实际场景的高性能RPC框架项目
2.学习SpringCloud Alibaba实战项目—从零开发微服务项目
3.学习高并发、大流量业务场景的解决方案,体验大厂真正的高并发、大流量的业务场景
4.学习进大厂必备技能:性能调优、并发编程、分布式、微服务、框架源码、中间件开发、项目实战
5.提供站点 https://binghe.site 所有学习内容的指导、帮助
6.GitHub:https://github.com/binghe001/BingheGuide - 非常有价值的技术资料仓库,包括冰河所有的博客开放案例代码
7.提供技术问题、系统架构、学习成长、晋升答辩等各项内容的回答
8.定期的整理和分享出各类专属星球的技术小册、电子书、编程视频、PDF文件
9.定期组织技术直播分享,传道、授业、解惑,指导阶段瓶颈突破技巧
如何加入星球
加入星球:扫描优惠券二维码即可加入星球。

- 扫码 :通过扫描优惠券二维码加入星球。
- 链接 :打开链接 http://m6z.cn/6aeFbs 加入星球。
- 回复 :在公众号 冰河技术 回复 星球 领取优惠券加入星球。
特别提醒: 苹果用户进圈或续费,请加微信 hacker_binghe 扫二维码,或者去公众号 冰河技术 回复 星球 扫二维码加入星球。
好了,今天就到这儿吧,我是冰河,我们下期见~~
写在最后
如果你觉得冰河写的还不错,请微信搜索并关注「 冰河技术 」微信公众号,跟冰河学习高并发、分布式、微服务、大数据、互联网和云原生技术,「 冰河技术 」微信公众号更新了大量技术专题,每一篇技术文章干货满满!不少读者已经通过阅读「 冰河技术 」微信公众号文章,吊打面试官,成功跳槽到大厂;也有不少读者实现了技术上的飞跃,成为公司的技术骨干!如果你也想像他们一样提升自己的能力,实现技术能力的飞跃,进大厂,升职加薪,那就关注「 冰河技术 」微信公众号吧,每天更新超硬核技术干货,让你对如何提升技术能力不再迷茫!
加群交流
本群的宗旨是给大家提供一个良好的技术学习交流平台,所以杜绝一切广告!由于微信群人满 100 之后无法加入,请扫描下方二维码先添加作者 “冰河” 微信(hacker_binghe),备注:学习加群。

公众号
分享各种编程语言、开发技术、分布式与微服务架构、分布式数据库、分布式事务、云原生、大数据与云计算技术和渗透技术。另外,还会分享各种面试题和面试技巧。

星球
加入星球 冰河技术,可以获得本站点所有学习内容的指导与帮助。如果你遇到不能独立解决的问题,也可以添加冰河的微信:hacker_binghe, 我们一起沟通交流。另外,在星球中不只能学到实用的硬核技术,还能学习实战项目!
关注 冰河技术公众号,回复 星球 可以获取入场优惠券。

