我们在Stream里有什么方法来应对异常呢?
应付异常情况无非就是下面三个思路:
- 重试 再来一次,简单方便。但是要留意程序逻辑是否实现了幂等性
- 降级 这个方案需要规划降级链路,但是对于那种“必须成功”的场景(比如银行转账),降级不是一个好的选择
- 人工介入 这个方式很多,可以归集异常信息然后触发系统报警。人工介入是异常处理的最后一个环节,我们在系统设计环节应该尽量提高应用的self-heal能力,也就是容错能力,尽量减少人工介入的频率
大部分的小型应用系统只会使用第一种方案,也就是重试,能用到降级策略的一般都是有一定规模的应用。
对于重试和降级都不能解决的异常情况来说,我们要么直接丢弃,要么就要打上某些特殊标记,然后通知人工介入处理。
在Stream中对以上三个情况都有相应的方案。
直接发起重试
我们先来看一下最简单的重试机制,在Stream中进行重试有两个方法
Consumer 端重试
重试只发生在 Consumer 端,Stream 将失败的消息原封不动的发给当前这个 Consumer。也就是说,重试只会发生在当前机器,并不会换一个节点进行重试。
当然我们不能让重试无休止的进行下去,Stream 默认情况下会进行 3 次重试,如果我们想增加或减少重试次数,可以给某个 Consumer 指定重试次数。
Re-queue重试
不同于上一种的重试方式,在 Re-queue 的方案下 Stream 会将失败的消息重新发送到消息组件中,这种方式和前面的 Consumer 端重试有两个不同之处:
- 消息来源
Re-queue重试的发起点是消息组件,Stream 通过 Re-queue 机制向队列中添加了一个新的消息,因此 Consumer 相当于从消息组件中处理了一个新消息。 而 Consumer 重试的触发点是 Stream 本身,只是从业务逻辑层面进行重试,消费的是同一个消息 - 消费地点 Re-queue 添加的新消息可以被其他 Consumer 进行消费,而 Consumer 重试只发生在当前这个 Consumer 上,并不会跑到其他节点进行重试
降级
降级就是一段自定义的异常处理流程,在 Stream 中我们可以借助 spring-integration 的一个注解来完成,相关的配置非常简单
@ServiceActivator(inputChannel = "xxxx")
上面这个注解和 Hystrix 的 Fallback 类似,都是将异常请求转交给 Fallback 逻辑进行处理。
人工处理
人工介入的方式五花八门,但归根结底我要知道在哪里才能找到需要人工介入的数据,这其实就是一个归集异常数据的功能。
在Stream里我们可以通过下面两个常用的方式,将异常消息归集到某个消息队列中,等待人工介入:
- 异常队列 将异常消息丢入Error Queue,如果做的再细致一些,可以分门别类地将不同错误类型的消息放到不同的异常队列中去
- 死信队列 又叫 DLQ(Dead Letter Queue),是 RabbitMQ 专门为各种异常情况设置的队列,Stream 天然集成了 DLQ ,在 Stream 中不仅可以将 Re-queue 的路径指向 DLQ,也可以设置 DLQ 中消息的过期时间(对某些有时效性要求的消息)。在消息到达死信队列后,我们也可以人工将消息移动到其他队列中进行重新消费。
文档信息
- 本文作者:Piter Jia
- 本文链接:https://piterjia.github.io/2020/06/21/spring-cloud-stream-error/
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)