冰河技术
导读
♻学习路线
  • 面试必问系列

    • 面试必问
  • 架构与模式

    • Java极简设计模式
    • 实战高并发设计模式
  • Java核心技术

    • Java8新特性
    • IOC核心技术
    • JVM调优技术
  • 容器化核心技术

    • Dockek核心技术
  • 分布式存储

    • Mycat核心技术
  • 数据库核心技术

    • MySQL基础篇
  • 服务器核心技术

    • Nginx核心技术
  • 渗透核心技术

    • 渗透实战技术
  • 底层技术
  • 源码分析
  • 基础案例
  • 实战案例
  • 面试
  • 系统架构
  • Spring6核心技术
  • 分布式事务

    • 分布式事务系列视频
  • SpringBoot
  • SpringCloudAlibaba
  • 🔥AI大模型项目

    • 一站式AI智能平台
    • AI智能客服系统
    • AI智能问答系统
    • 实战AI大模型
  • 中间件项目

    • 手写高性能Redis组件
    • 手写高性能脱敏组件
    • 手写线程池项目
    • 手写高性能SQL引擎
    • 手写高性能Polaris网关
    • 手写高性能RPC项目
  • 高并发项目

    • 分布式IM即时通讯系统(新)
    • 分布式Seckill秒杀系统
    • 实战高并发设计模式
  • 微服务项目

    • 简易电商脚手架项目
  • 手撕源码

    • 手撕Spring6源码
🌍知识星球
  • 总览

    • 《书籍汇总》
  • 出版图书

    • 《深入理解高并发编程:核心原理与案例实战》
    • 《深入理解高并发编程:JDK核心技术》
    • 《深入高平行開發:深度原理&專案實戰》
    • 《深入理解分布式事务:原理与实战》
    • 《MySQL技术大全:开发、优化与运维实战》
    • 《海量数据处理与大数据技术实战》
  • 电子书籍

    • 《实战高并发设计模式》
    • 《深入理解高并发编程(第2版)》
    • 《深入理解高并发编程(第1版)》
    • 《从零开始手写RPC框架(基础篇)》
    • 《SpringCloud Alibaba实战》
    • 《冰河的渗透实战笔记》
    • 《MySQL核心知识手册》
    • 《Spring IOC核心技术》
  • 关于自己
  • 关于学习
  • 关于职场
B站
Github
导读
♻学习路线
  • 面试必问系列

    • 面试必问
  • 架构与模式

    • Java极简设计模式
    • 实战高并发设计模式
  • Java核心技术

    • Java8新特性
    • IOC核心技术
    • JVM调优技术
  • 容器化核心技术

    • Dockek核心技术
  • 分布式存储

    • Mycat核心技术
  • 数据库核心技术

    • MySQL基础篇
  • 服务器核心技术

    • Nginx核心技术
  • 渗透核心技术

    • 渗透实战技术
  • 底层技术
  • 源码分析
  • 基础案例
  • 实战案例
  • 面试
  • 系统架构
  • Spring6核心技术
  • 分布式事务

    • 分布式事务系列视频
  • SpringBoot
  • SpringCloudAlibaba
  • 🔥AI大模型项目

    • 一站式AI智能平台
    • AI智能客服系统
    • AI智能问答系统
    • 实战AI大模型
  • 中间件项目

    • 手写高性能Redis组件
    • 手写高性能脱敏组件
    • 手写线程池项目
    • 手写高性能SQL引擎
    • 手写高性能Polaris网关
    • 手写高性能RPC项目
  • 高并发项目

    • 分布式IM即时通讯系统(新)
    • 分布式Seckill秒杀系统
    • 实战高并发设计模式
  • 微服务项目

    • 简易电商脚手架项目
  • 手撕源码

    • 手撕Spring6源码
🌍知识星球
  • 总览

    • 《书籍汇总》
  • 出版图书

    • 《深入理解高并发编程:核心原理与案例实战》
    • 《深入理解高并发编程:JDK核心技术》
    • 《深入高平行開發:深度原理&專案實戰》
    • 《深入理解分布式事务:原理与实战》
    • 《MySQL技术大全:开发、优化与运维实战》
    • 《海量数据处理与大数据技术实战》
  • 电子书籍

    • 《实战高并发设计模式》
    • 《深入理解高并发编程(第2版)》
    • 《深入理解高并发编程(第1版)》
    • 《从零开始手写RPC框架(基础篇)》
    • 《SpringCloud Alibaba实战》
    • 《冰河的渗透实战笔记》
    • 《MySQL核心知识手册》
    • 《Spring IOC核心技术》
  • 关于自己
  • 关于学习
  • 关于职场
B站
Github
  • RPC框架介绍

    • 【置顶】这次我设计了一款TPS百万级别的分布式、高性能、可扩展的RPC框架
  • 第一篇:整体设计

    • 第01章:开篇,从零开始手撸一个能在实际场景使用的高性能RPC框架
    • 第02章:高性能分布式RPC框架整体架构设计
    • 第03章:RPC服务核心注解的设计与实现
    • 第04章:实现RPC服务核心注解的扫描与解析
  • 第二篇:服务提供者

    • 第05章:服务提供者收发消息基础功能实现
    • 第06章:自定义网络传输协议的实现
    • 第07章:自定义网络编解码的实现
    • 第08章:模拟服务消费者与服务提供者之间的数据交互
    • 第09章:服务提供者调用真实方法的实现
    • 第10章:测试服务提供者调用真实方法
    • 第11章:服务提供者扩展支持CGLib调用真实方法
  • 第三篇:服务消费者

    • 第12章:实现服务消费者与服务提供者直接通信
    • 第13章:服务消费者异步转同步直接获取返回结果
    • 第14章:服务消费者异步转同步的自定义Future与AQS实现
    • 第15章:服务消费者同步、异步、单向调用的实现
    • 第16章:服务消费者回调方法的实现
    • 第17章:服务消费者实现动态代理功能屏蔽构建请求协议对象的细节
    • 第18章:服务消费者整合动态代理实现直接调用接口返回结果数据
    • 第19章:服务消费者动态代理实现异步调用
    • 第20章:服务消费者动态代理扩展优化
  • 第四篇:注册中心

    • 第21章:注册中心基础服务功能的实现
    • 第22章:服务提供者整合注册中心实现服务注册
    • 第23章:服务消费者整合注册中心实现服务发现
  • 第五篇:负载均衡

    • 第24章:服务消费者实现基于随机算法的负载均衡策略
  • 第六篇:SPI扩展序列化机制

    • 第25章:对标Dubbo实现SPI扩展机制的基础功能
    • 第26章:基于SPI扩展JDK序列化与反序列化机制
    • 第27章:基于SPI扩展Json序列化与反序列化机制
    • 第28章:基于SPI扩展Hessian2序列化与反序列化机制
    • 第29章:基于SPI扩展FST序列化与反序列化机制
    • 第30章:基于SPI扩展Kryo序列化与反序列化机制
    • 第31章:基于SPI扩展Protostuff序列化与反序列化机制
  • 第七篇:SPI扩展动态代理机制

    • 第32章:基于SPI扩展JDK动态代理机制
    • 第33章:基于SPI扩展CGLib动态代理机制
    • 第34章:基于SPI扩展Javassist动态代理机制
    • 第35章:基于SPI扩展ByteBuddy动态代理机制
    • 第36章:基于SPI扩展ASM动态代理机制
  • 第八篇:SPI扩展反射机制

    • 第37章:基于SPI扩展JDK反射机制调用真实方法
    • 第38章:基于SPI扩展CGLib反射机制调用真实方法
    • 第39章:基于SPI扩展Javassist反射机制调用真实方法
    • 第40章:基于SPI扩展ByteBuddy反射机制调用真实方法
    • 第41章:基于SPI扩展ASM反射机制调用真实方法
  • 第九篇:SPI扩展负载均衡策略

    • 第42章:基于SPI扩展随机算法负载均衡策略
    • 第43章:基于SPI扩展加权随机算法负载均衡策略
    • 第44章:基于SPI扩展轮询算法负载均衡策略
    • 第45章:基于SPI扩展加权轮询算法负载均衡策略
    • 第46章:基于SPI扩展Hash算法负载均衡策略
    • 第47章:基于SPI扩展加权Hash算法负载均衡策略
    • 第48章:基于SPI扩展源IP地址Hash算法负载均衡策略
    • 第49章:基于SPI扩展源IP地址加权Hash算法负载均衡策略
    • 第50章:基于SPI扩展Zookeeper的一致性Hash算法负载均衡策略
  • 第十篇:SPI扩展增强型负载均衡策略

    • 第51章:基于SPI扩展增强型加权随机算法负载均衡策略
    • 第52章:基于SPI扩展增强型加权轮询算法负载均衡策略
    • 第53章:基于SPI扩展增强型加权Hash算法负载均衡策略
    • 第54章:基于SPI扩展增强型加权源IP地址Hash算法负载均衡策略
    • 第55章:基于SPI扩展增强型Zookeeper一致性Hash算法负载均衡策略
    • 第56章:基于SPI扩展最少连接数负载均衡策略
  • 第十一篇:SPI扩展实现注册中心

    • 第57章:基于SPI扩展实现Zookeeper注册中心
    • 第57-X章:注册中心阶段性作业
  • 第十二篇:心跳机制

    • 第58章:心跳机制交互数据模型设计
    • 第59章:心跳机制增强数据模型与协议解析设计
    • 第60章:服务消费者向服务提供者发送心跳信息并接收心跳响应
    • 第61章:服务消费者心跳间隔时间配置化
    • 第62章:服务提供者向服务消费者发送心跳消息并接收心跳响应
    • 第63章:服务提供者心跳间隔时间配置化
    • 第63-X章:心跳机制阶段性作业
  • 第十三篇:增强型心跳机制

    • 第64章:服务提供者增强型心跳检测机制
    • 第65章:服务消费者增强型心跳检测机制
  • 第十四篇:重试机制

    • 第66章:服务消费者实现服务订阅的重试机制
    • 第67章:服务消费者连接服务提供者的重试机制
  • 第十五篇:整合Spring

    • 第68章:服务提供者整合Spring
    • 第69章:基于Spring XML接入服务提供者
    • 第70章:基于Spring注解接入服务提供者
    • 第71章:服务消费者整合Spring
    • 第72章:基于Spring XML接入服务消费者
    • 第73章:基于Spring注解接入服务消费者
    • 第73章-X:整合Spring阶段作业
  • 第十六篇:整合SpringBoot

    • 第74章:服务提供者整合SpringBoot
    • 第75章:基于SpringBoot接入服务提供者
    • 第76章:服务消费者整合SpringBoot
    • 第77章:基于SpringBoot接入服务消费者
    • 第77章-X:整合SpringBoot阶段作业
  • 第十七篇:整合Docker

    • 第78章:基于Docker接入服务提供者
    • 第79章:基于Docker接入服务消费者
    • 第79章-X:整合Docker阶段作业
  • 第十八篇:整合SpringCloud Alibaba

    • 第80章:整合SpringCloud Alibaba实际项目
    • 第80章-X:整合SpringCloud Alibaba阶段作业
  • 第十九篇:结果缓存

    • 第81章:结果缓存通用模型设计
    • 第82章:服务提供者支持结果缓存
    • 第83章:服务消费者支持结果缓存
    • 第83章-X:结果缓存阶段作业
  • 第二十篇:路由控制

    • 第84章:服务消费者直连某个服务提供者
    • 第85章:服务消费者直连多个服务提供者
    • 第85章-X:路由控制阶段作业
  • 第二十一篇:延迟连接

    • 第86章:服务消费者支持延迟连接服务提供者
    • 第87章:服务消费者支持非延迟连接服务提供者
    • 第87章-X:延迟连接阶段作业
  • 第二十二篇:并发控制

    • 第88章:并发控制基础模型设计
    • 第89章:服务提供者支持并发控制
    • 第90章:服务消费者支持并发控制
    • 第90章-X:并发控制阶段作业
  • 第二十三篇:流控分析

    • 第91章:流控分析后置处理器模型设计
    • 第92章:服务提供者整合流控分析
    • 第93章:服务消费者整合流控分析
    • 第93章-X:流控分析阶段作业
  • 第二十四篇:连接控制

    • 第94章:连接控制基础模型设计
    • 第95章:服务提供者整合连接控制
    • 第95章-X:连接控制阶段作业
  • 第二十五篇:SPI扩展连接淘汰策略

    • 第96章:基于SPI扩展最早连接淘汰策略
    • 第97章:基于SPI扩展最晚连接淘汰策略
    • 第98章:基于SPI扩展先进先出连接淘汰策略
    • 第99章:基于SPI扩展使用次数最少连接淘汰策略
    • 第100章:基于SPI扩展最近未被使用连接淘汰策略
    • 第101章:基于SPI扩展随机连接淘汰策略
    • 第102章:基于SPI扩展拒绝连接淘汰策略
    • 第102章-X:SPI扩展连接拒绝策略阶段作业
  • 第二十六篇:数据缓冲

    • 第103章:数据缓冲基础模型设计
    • 第104章:服务提供者整合数据缓冲
    • 第105章:服务消费者整合数据缓冲
    • 第105章-X:数据缓冲阶段作业
  • 第二十七篇:服务容错(降级)

    • 第106章:服务容错设计与研发
    • 第107章:服务容错效果测试
    • 第108章:服务容错失效问题修复
    • 第108章-X:服务容错阶段作业
  • 第二十八篇:服务限流

    • 第109章:服务限流基础模型设计
    • 第110章:服务提供者整合服务限流
    • 第111章:服务消费者整合服务限流
    • 第111章-X:服务限流阶段作业
  • 第二十九篇:基于SPI扩展限流策略

    • 第112章:基于SPI扩展Semaphore限流策略
    • 第113章:基于SPI扩展Guava限流策略
    • 第113章-X:基于SPI扩展限流策略阶段作业
  • 第三十篇:超出限流规则

    • 第114章:服务提供者超出限流上限触发的规则
    • 第115章:服务消费者超出限流上限触发的规则
    • 第115章-X:超出限流规则阶段作业
  • 第三十一篇:服务熔断

    • 第116章:服务熔断基础模型设计
    • 第117章:服务提供者整合服务熔断
    • 第118章:服务消费者整合服务熔断
    • 第118章-X:服务熔断阶段作业
  • 第三十二篇:基于SPI扩展熔断策略

    • 第119章:基于SPI扩展错误率熔断策略
    • 第119章-X:基于SPI扩展熔断策略阶段作业
  • 第三十三篇:异常监控

    • 第120章:异常监控后置处理器基础模型设计
    • 第121章:服务提供者整合异常监控
    • 第122章:服务消费者整合异常监控
    • 第122章-X:异常监控阶段作业
  • 维护篇:持续维护篇

    • 第fix-01章:修复服务消费者读取配置优先级的问题
    • 第fix-02章:修复Zookeeper一致性Hash负载均衡泛型类型不匹配的问题
    • 第fix-03章:修复自定义扫描器递归扫描文件标识不起作用的问题
    • 第fix-04章:修复基于SpringBoot启动服务消费者Netty Group多次连接的问题
    • 第fix-05章:修复基于计数器的限流策略不起作用的问题
    • 第fix-06章:修复基于SpringBoot启动服务消费者无法同时连接多个服务提供者的问题
    • 第fix-07章:更新基于Semaphore的限流策略
    • 第fix-08章:优化服务熔断半开启状态的执行逻辑
  • 番外篇

    • 《从零开始手写RPC框架》电子书重磅发布

《RPC手撸专栏》第fix-08章:优化服务熔断半开启状态的执行逻辑

作者:冰河
星球:http://m6z.cn/6aeFbs
博客1:https://binghe001.github.io
博客2:https://binghe.site
文章汇总:https://binghe.site/md/all/all.html

沉淀,成长,突破,帮助他人,成就自我。

大家好,我是冰河~~

在写《RPC手撸专栏》的过程中,针对专栏版本的代码,在书写的过程中,会提前埋一些坑进去,使各位星球的小伙伴在调试代码的过程中,能够自己去发现问题,并且分析问题,最好也能够自己解决问题。经过自己发现问题->分析问题->解决问题的过程,能够提升大家对于RPC框架源码的参与过程,更重要的是,能够不断提升大家自己发现问题、分析问题和解决问题的能力,这种能够力才是程序员最核心的竞争力。

一、问题描述

本章要解决什么问题呢?

在高并发、大流量场景下,如果熔断状态处于半开启状态时,可能会导致大量请求穿透访问后端服务的问题。

二、问题分析

这个问题是如何产生的呢?

在服务提供者和服务消费者整合服务熔断时,大体流程的伪代码如下所示。

//如果触发了熔断的规则,则直接返回降级处理数据
if (fusingInvoker.invokeFusingStrategy()){
   return 降级处理结果;
}
//请求计数加1
fusingInvoker.incrementCount();
if (逻辑处理失败){
     fusingInvoker.incrementFailureCount();
}

或者

//如果触发了熔断的规则,则直接返回降级处理数据
if (fusingInvoker.invokeFusingStrategy()){
    return 降级处理结果;
}
//请求计数加1
fusingInvoker.incrementCount();
try{
     //########执行逻辑处理###########
}catch(Throwable e){
     fusingInvoker.incrementFailureCount();
}

总体逻辑就是,先判断是否达到了熔断条件,如果已经达到熔断条件,则直接触发降级处理,不再访问真实服务。如果未达到熔断条件,则对访问请求数加1,如果逻辑处理失败或者发生异常,则对访问失败的请求数加1。

接下来,看下修改前的FusingInvoker接口及其实现类的方法。

1.FusingInvoker接口

FusingInvoker接口是服务熔断的SPI接口,源码详见:bhrpc-fusing-api工程下的io.binghe.rpc.fusing.api.FusingInvoker,修改前的源码如下所示。

@SPI(RpcConstants.DEFAULT_FUSING_INVOKER)
public interface FusingInvoker {
    /**
     * 是否会触发熔断操作,规则如下:
     * 1.断路器默认处于“关闭”状态,当错误个数或错误率到达阈值,就会触发断路器“开启”。
     * 2.断路器开启后进入熔断时间,到达熔断时间终点后重置熔断时间,进入“半开启”状态。
     * 3.在半开启状态下,如果服务能力恢复,则断路器关闭熔断状态。进而进入正常的服务状态。
     * 4.在半开启状态下,如果服务能力未能恢复,则断路器再次触发服务熔断,进入熔断时间。
     * @return 是否要触发熔断,true:触发熔断,false:不触发熔断
     */
    boolean invokeFusingStrategy();
    /**
     * 处理请求的次数
     */
    void incrementCount();

    /**
     * 处理请求失败的次数
     */
    void incrementFailureCount();
    /**
     * 在milliSeconds毫秒内错误数量或者错误百分比达到totalFailure,则触发熔断操作
     * @param totalFailure 在milliSeconds毫秒内触发熔断操作的上限值
     * @param milliSeconds 毫秒数
     */
    default void init(double totalFailure, int milliSeconds){}
}

2.FusingInvoker接口核心实现逻辑

这里,我们实现了基于错误数和错误百分比的熔断策略,不管是基于错误数的熔断策略,还是基于百分比的熔断策略,其核心逻辑都是一样的。这里,我们就将这些核心逻辑抽象出来,修改前的核心逻辑如下所示。

@SPIClass
public class XxxFusingInvoker extends AbstractFusingInvoker {

    private final Logger logger = LoggerFactory.getLogger(CounterFusingInvoker.class);

    @Override
    public boolean invokeFusingStrategy() {
        boolean result = false;
        switch (fusingStatus.get()){
            //关闭状态
            case RpcConstants.FUSING_STATUS_CLOSED:
                result =  this.invokeClosedFusingStrategy();
                break;
            //半开启状态
            case RpcConstants.FUSING_STATUS_HALF_OPEN:
                result = this.invokeHalfOpenFusingStrategy();
                break;
            //开启状态
            case RpcConstants.FUSING_STATUS_OPEN:
                result = this.invokeOpenFusingStrategy();
                break;
            default:
                result = this.invokeClosedFusingStrategy();
                break;
        }
        logger.info("execute counter fusing strategy, current fusing status is {}", fusingStatus.get());
        return result;
    }

    /**
     * 处理开启状态
     */
    private boolean invokeOpenFusingStrategy() {
        //获取当前时间
        long currentTimeStamp = System.currentTimeMillis();
        //超过一个指定的时间范围,则将状态设置为半开启状态
        if (currentTimeStamp - lastTimeStamp >= milliSeconds){
            fusingStatus.set(RpcConstants.FUSING_STATUS_HALF_OPEN);
            lastTimeStamp = currentTimeStamp;
            this.resetCount();
            return false;
        }
        return true;
    }

    /**
     * 处理半开启状态
     */
    private boolean invokeHalfOpenFusingStrategy() {
        //获取当前时间
        long currentTimeStamp = System.currentTimeMillis();
        //服务已经恢复
        if (currentFailureCounter.get() <= 0){
            fusingStatus.set(RpcConstants.FUSING_STATUS_CLOSED);
            lastTimeStamp = currentTimeStamp;
            this.resetCount();
            return false;
        }
        //服务未恢复
        fusingStatus.set(RpcConstants.FUSING_STATUS_OPEN);
        lastTimeStamp = currentTimeStamp;
        return true;
    }

    /**
     * 处理关闭状态逻辑
     */
    private boolean invokeClosedFusingStrategy() {
        //获取当前时间
        long currentTimeStamp = System.currentTimeMillis();
        //超过一个指定的时间范围
        if (currentTimeStamp - lastTimeStamp >= milliSeconds){
            lastTimeStamp = currentTimeStamp;
            this.resetCount();
            return false;
        }
        //超出配置的错误数量或百分比
        if (超出配置的错误数量或百分比 >= totalFailure){
            lastTimeStamp = currentTimeStamp;
            fusingStatus.set(RpcConstants.FUSING_STATUS_OPEN);
            return true;
        }
        return false;
    }
}

结合所有的代码分析:如果程序最初访问时触发了熔断条件,将熔断状态设置为开启。当经过一个时间周期时,如果有线程调用服务方法,则会执行invokeOpenFusingStrategy()方法中的 if 条件分支中的语句,将熔断状态设置为半开启状态,重置请求数量和时间窗口,返回false。此时,当前线程就会直接调用真实服务方法,来探测真实服务是否已经恢复。

如果探测真实服务是否恢复的线程还未返回结果时,又有其他线程来调用服务方法,此时服务状态为半开启状态,就会执行invokeHalfOpenFusingStrategy()方法,由于探测真实服务是否恢复的线程还未返回结果,所以,满足currentFailureCounter.get()小于或者等于0的条件,此时又会将熔断状态设置为关闭。后续就会有大量线程穿透熔断逻辑直接访问真实服务。此时,真实服务是否已经恢复仍未可知。

所以,服务熔断在半开启状态下存在执行逻辑漏洞。

三、问题解决

问题该如何解决呢?

分析并定位到问题后,解决起来就比较简单了,具体的修复步骤如下所示。

1.修改FusingInvoker接口

FusingInvoker接口的源码详见:bhrpc-fusing-api工程下的io.binghe.rpc.fusing.api.FusingInvoker,主要是重新定义了FusingInvoker接口中的方法,如下所示。

@SPI(RpcConstants.DEFAULT_FUSING_INVOKER)
public interface FusingInvoker {
    /**
     * 是否会触发熔断操作,规则如下:
     * 1.断路器默认处于“关闭”状态,当错误个数或错误率到达阈值,就会触发断路器“开启”。
     * 2.断路器开启后进入熔断时间,到达熔断时间终点后重置熔断时间,进入“半开启”状态。
     * 3.在半开启状态下,如果服务能力恢复,则断路器关闭熔断状态。进而进入正常的服务状态。
     * 4.在半开启状态下,如果服务能力未能恢复,则断路器再次触发服务熔断,进入熔断时间。
     * @return 是否要触发熔断,true:触发熔断,false:不触发熔断
     */
    boolean invokeFusingStrategy();

    /**
     * 处理请求的次数
     */
    void incrementCount();

    /**
     * 访问成功
     */
    void markSuccess();

    /**
     * 访问失败
     */
    void markFailed();
    /**
     * 在milliSeconds毫秒内错误数量或者错误百分比达到totalFailure,则触发熔断操作
     * @param totalFailure 在milliSeconds毫秒内触发熔断操作的上限值
     * @param milliSeconds 毫秒数
     */
    default void init(double totalFailure, int milliSeconds){}
}

可以看到,在FusingInvoker接口中,定义了markSuccess()方法和markFailed()方法,其中,markSuccess()方法是执行成功时调用的方法,markFailed()方法是执行失败调用的方法。

2.修改FusingInvoker接口核心实现逻辑

这里,我们将FusingInvoker接口的核心实现逻辑修改成如下所示。

public abstract class AbstractFusingInvoker implements FusingInvoker {

    /**
     * 熔断状态,1:关闭; 2:半开启; 3:开启
     */
    protected static final AtomicInteger fusingStatus = new AtomicInteger(RpcConstants.FUSING_STATUS_CLOSED);

    /**
     * 当前调用次数
     */
    protected final AtomicInteger currentCounter = new AtomicInteger(0);

    /**
     * 当前调用失败的次数
     */
    protected final AtomicInteger currentFailureCounter = new AtomicInteger(0);

    /**
     * 半开启状态下的等待状态
     */
    protected final AtomicInteger fusingWaitStatus = new AtomicInteger(RpcConstants.FUSING_WAIT_STATUS_INIT);

    /**
     * 熔断时间范围的开始时间点
     */
    protected volatile long lastTimeStamp = System.currentTimeMillis();

    /**
     * 在milliSeconds毫秒内触发熔断操作的上限值
     * 可能是错误个数,也可能是错误率
     */
    protected double totalFailure;

    /**
     * 毫秒数
     */
    protected int milliSeconds;

    /**
     * 获取失败策略的结果值
     */
    public abstract double getFailureStrategyValue();

    /**
     * 重置数量
     */
    protected void resetCount(){
        currentFailureCounter.set(0);
        currentCounter.set(0);
    }

    @Override
    public void incrementCount() {
        currentCounter.incrementAndGet();
    }

    @Override
    public void markSuccess() {
        if (fusingStatus.get() == RpcConstants.FUSING_STATUS_HALF_OPEN){
            fusingWaitStatus.compareAndSet(RpcConstants.FUSING_WAIT_STATUS_WAITINF, RpcConstants.FUSING_WAIT_STATUS_SUCCESS);
        }
    }

    @Override
    public void markFailed() {
        currentFailureCounter.incrementAndGet();
        if (fusingStatus.get() == RpcConstants.FUSING_STATUS_HALF_OPEN){
            fusingWaitStatus.compareAndSet(RpcConstants.FUSING_WAIT_STATUS_WAITINF, RpcConstants.FUSING_WAIT_STATUS_FAILED);
        }
    }

    @Override
    public void init(double totalFailure, int milliSeconds) {
        this.totalFailure = totalFailure <= 0 ? RpcConstants.DEFAULT_FUSING_TOTAL_FAILURE : totalFailure;
        this.milliSeconds = milliSeconds <= 0 ? RpcConstants.DEFAULT_FUSING_MILLI_SECONDS : milliSeconds;
    }

    /**
     * 处理开启状态的逻辑
     */
    protected boolean invokeOpenFusingStrategy() {
        //获取当前时间
        long currentTimeStamp = System.currentTimeMillis();
        //超过一个指定的时间范围
        if (currentTimeStamp - lastTimeStamp >= milliSeconds){
            //修改等待状态,让修改成功的线程进入半开启状态
            if (fusingWaitStatus.compareAndSet(RpcConstants.FUSING_WAIT_STATUS_INIT, RpcConstants.FUSING_WAIT_STATUS_WAITINF)){
                fusingStatus.set(RpcConstants.FUSING_STATUS_HALF_OPEN);
                lastTimeStamp = currentTimeStamp;
                this.resetCount();
                return false;
            }
        }
        return true;
    }

    /**
     * 处理半开启状态的逻辑
     */
    protected boolean invokeHalfOpenFusingStrategy() {
        //此时熔断状态还是半开启状态,等待状态可能是等待,可能是成功,可能是失败
        //获取当前时间
        long currentTimeStamp = System.currentTimeMillis();
        //成功了,表示服务已经恢复
        if (fusingWaitStatus.compareAndSet(RpcConstants.FUSING_WAIT_STATUS_SUCCESS, RpcConstants.FUSING_WAIT_STATUS_INIT)){
            fusingStatus.set(RpcConstants.FUSING_STATUS_CLOSED);
            lastTimeStamp = currentTimeStamp;
            this.resetCount();
            return false;
        }
        //失败了,表示服务还未恢复
        if (fusingWaitStatus.compareAndSet(RpcConstants.FUSING_WAIT_STATUS_FAILED, RpcConstants.FUSING_WAIT_STATUS_INIT)){
            //服务未恢复
            fusingStatus.set(RpcConstants.FUSING_STATUS_OPEN);
            lastTimeStamp = currentTimeStamp;
            return true;
        }
        //1.半开启状态的线程还未执行完逻辑,并发情况下的其他线程状态不变,直接返回true,执行熔断逻辑,此时熔断状态仍为半开启状态
        //2.并发情况下,只有一个线程会检测到服务是否已经恢复,其他线程状态不变,直接返回true,执行熔断逻辑,此时熔断状态为开启或者关闭
        //3.执行熔断逻辑的线程,不会执行真实方法的逻辑,会调用降级方法返回数据。
        return true;
    }

    /**
     * 处理关闭状态的逻辑
     */
    protected boolean invokeClosedFusingStrategy() {
        //获取当前时间
        long currentTimeStamp = System.currentTimeMillis();
        //超过一个指定的时间范围
        if (currentTimeStamp - lastTimeStamp >= milliSeconds){
            lastTimeStamp = currentTimeStamp;
            this.resetCount();
            return false;
        }
        //如果当前错误数或者百分比大于或等于配置的百分比
        if (this.getFailureStrategyValue() >= totalFailure){
            lastTimeStamp = currentTimeStamp;
            fusingStatus.set(RpcConstants.FUSING_STATUS_OPEN);
            return true;
        }
        return false;
    }
}

可以看到,服务熔断核心逻辑的实现中,新增了半开启状态下的等待状态,markSuccess()方法和markFailed中,都是在半开启状态下原子更新等待的状态。

修改后整体的执行逻辑如下所示。

(1)等待状态的初始值为初始化状态。

(2)当熔断状态处于开启,并且当前时间周期结束时,会将等待状态由初始化状态原子更新为等待状态,只有原子更新等待状态成功后,才会将熔断状态修改为半开启状态,此时意味着只有一个线程会将熔断状态由开启状态修改为半开启状态,进而重置请求数和时间窗口,并请求真实服务方法来探测真实服务是否已经恢复。其他原子更新等待状态失败的线程,则直接触发熔断条件,进行降级处理。

换句话说,熔断状态为开启,并且当前时间周期结束时,如果有线程访问真实服务的方法,则只会有一个线程访问真实服务的方法,并且此时熔断状态已经变更为半开启状态。

(3)探测真实服务是否恢复的线程,当返回正确结果时,会调用markSuccess()方法,在熔断状态为半开启状态时,将等待状态由等待状态原子更新为成功状态。当返回异常结果时,会调用markFailed()方法,将请求失败的记录数加1,在在熔断状态为半开启状态时,将等待状态由等待状态原子更新为失败状态。

换句话说,如果等待状态是等待,则探测真实服务是否恢复的线程还未返回结果数据,如果等待状态是成功或者失败状态,则探测真实服务是否恢复的线程一定返回了结果数据。

(4)如果探测真实服务是否恢复的线程在访问真实服务期间,又有其他线程触发了熔断规则,此时熔断状态为半开启状态,就会触发invokeHalfOpenFusingStrategy()方法的执行。此时,就会有三种情况:

  • 能够成功将等待状态由成功状态原子更新为初始化状态,说明真实服务已经恢复,则将熔断状态修改为关闭状态。
  • 能够成功将等待状态由失败状态原子更新为初始化状态,说明真实服务未恢复,则将熔断状态修改成开启状态,进入一个熔断时间窗口周期。
  • 如果既不能将等待状态由成功状态原子更新为初始化状态,也不能将等待状态由失败状态原子更新为初始化状态,说明探测真实服务是否恢复的线程还未返回结果数据,则此时线程不会重置请求数和时间窗口,直接触发熔断条件进行降级处理。

3.整合服务提供者和服务消费者的大体逻辑

在服务提供者和服务消费者整合修改后的服务熔断逻辑时,大体流程的伪代码如下所示。

//如果触发了熔断的规则,则直接返回降级处理数据
if (fusingInvoker.invokeFusingStrategy()){
    return 降级处理结果;
}
//请求计数加1
fusingInvoker.incrementCount();
if (逻辑处理失败){
    fusingInvoker.markFailed();
}else {
    fusingInvoker.markSuccess();
}

或者

//如果触发了熔断的规则,则直接返回降级处理数据
if (fusingInvoker.invokeFusingStrategy()){
    return 降级处理结果;
}
//请求计数加1
fusingInvoker.incrementCount();
try{
     fusingInvoker.markSuccess();
}catch(Throwable e){
    fusingInvoker.markFailed();
}

至此,服务熔断半开启状态的执行逻辑优化完毕。

注意:具体的代码大家可参见文章开头的代码工程链接,这里不再粘贴详细的代码逻辑。

四、问题总结

修改完问题不总结下怎么行?

我们自己手写的RPC框架不是一蹴而就的,它是一个不断优化和不断调整的过程,冰河也会将这些调整的过程整理好分享给各位星球的小伙伴。

总之,我们写的RPC框架正在一步步实现它该有的功能。

最后,我想说的是:学习《RPC手撸专栏》一定要塌下心来,一步一个脚印,动手实践,认真思考,遇到不懂的问题,可以直接到星球发布主题进行提问。一定要记住:纸上得来终觉浅,绝知此事要躬行的道理。否则,一味的CP,或者光看不练,不仅失去了学习的意义,到头来更是一无所获。

好了,本章就到这里吧,我是冰河,我们下一章见~~

五、关于星球

大家可以加入 冰河技术 知识星球,和星球小伙伴们一起学习《SpringCloud Alibaba实战》专栏和《RPC手撸专栏》,冰河技术知识星球的《RPC手撸专栏》是个连载大几十篇的专栏(目前已更新几十大篇章,110+篇文章,110+工程源码,120+源码Tag分支,真正的企业级、分布式、高并发、高性能、高可用,可扩展的RPC框架,仍在持续更新)。

另外,星球中《企业级大规模分布式调度系统》和《企业级大规模分布式IM系统》也已经提升日程,期待你的加入,与星球小伙伴一起开发企业级中间件项目,一起提升硬核技术!

星球提供的服务

冰河整理了星球提供的一些服务,如下所示。

加入星球,你将获得:

1.学习从零开始手撸可用于实际场景的高性能RPC框架项目

2.学习SpringCloud Alibaba实战项目—从零开发微服务项目

3.学习高并发、大流量业务场景的解决方案,体验大厂真正的高并发、大流量的业务场景

4.学习进大厂必备技能:性能调优、并发编程、分布式、微服务、框架源码、中间件开发、项目实战

5.提供站点 https://binghe.site 所有学习内容的指导、帮助

6.GitHub:https://github.com/binghe001/BingheGuide - 非常有价值的技术资料仓库,包括冰河所有的博客开放案例代码

7.提供技术问题、系统架构、学习成长、晋升答辩等各项内容的回答

8.定期的整理和分享出各类专属星球的技术小册、电子书、编程视频、PDF文件

9.定期组织技术直播分享,传道、授业、解惑,指导阶段瓶颈突破技巧

如何加入星球

加入星球:扫描优惠券二维码即可加入星球。

sa-2022-04-21-007

  • 扫码 :通过扫描优惠券二维码加入星球。
  • 链接 :打开链接 http://m6z.cn/6aeFbs 加入星球。
  • 回复 :在公众号 冰河技术 回复 星球 领取优惠券加入星球。

特别提醒: 苹果用户进圈或续费,请加微信 hacker_binghe 扫二维码,或者去公众号 冰河技术 回复 星球 扫二维码加入星球。

好了,今天就到这儿吧,我是冰河,我们下期见~~

写在最后

如果你觉得冰河写的还不错,请微信搜索并关注「 冰河技术 」微信公众号,跟冰河学习高并发、分布式、微服务、大数据、互联网和云原生技术,「 冰河技术 」微信公众号更新了大量技术专题,每一篇技术文章干货满满!不少读者已经通过阅读「 冰河技术 」微信公众号文章,吊打面试官,成功跳槽到大厂;也有不少读者实现了技术上的飞跃,成为公司的技术骨干!如果你也想像他们一样提升自己的能力,实现技术能力的飞跃,进大厂,升职加薪,那就关注「 冰河技术 」微信公众号吧,每天更新超硬核技术干货,让你对如何提升技术能力不再迷茫!

加群交流

本群的宗旨是给大家提供一个良好的技术学习交流平台,所以杜绝一切广告!由于微信群人满 100 之后无法加入,请扫描下方二维码先添加作者 “冰河” 微信(hacker_binghe),备注:学习加群。

冰河微信

公众号

分享各种编程语言、开发技术、分布式与微服务架构、分布式数据库、分布式事务、云原生、大数据与云计算技术和渗透技术。另外,还会分享各种面试题和面试技巧。

公众号:冰河技术

星球

加入星球 冰河技术,可以获得本站点所有学习内容的指导与帮助。如果你遇到不能独立解决的问题,也可以添加冰河的微信:hacker_binghe, 我们一起沟通交流。另外,在星球中不只能学到实用的硬核技术,还能学习实战项目!

关注 冰河技术公众号,回复 星球 可以获取入场优惠券。

知识星球:冰河技术

在 GitHub 上编辑此页
上次更新: 2026/4/29 16:18
Contributors: binghe001
Prev
第fix-07章:更新基于Semaphore的限流策略
阅读全文
×

扫码或搜索:冰河技术
发送:290992
即可立即永久解锁本站全部文章

星球会员
跳转链接