本篇内容主要讲解“怎么实现Java异步延迟消息队列”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“怎么实现Java异步延迟消息队列”吧!
1. 一条链路调用
系统在收到一个请求后,完整链路同步顺序调用,实现起来简单易懂,这也是所有功能在实现时最初选择的方案。这种方案实现起来简单,在并发量不高且调用链路不长的情况下,是最好的选择方案,因为简单所以不容易出错和容易维护。如图所示,一个请求按照1-2-3-4的顺序。

2. 异步消息队列方案
由于系统是账户系统,每天的交易量大概在50-100w左右,且大部分交易集中在某个时间点,这就致使系统必须要支持高并发,异步方案也是该系统在最早设计时就已经选择好了。异步化后的请求必须要保证成功且需要有持久化的能力,基于此选择了消息中间件,在对比了几款中间件后,发现rabbitmq比较符合现有的业务。
如图所示,同颜色的线属于一个同步流程,不同线的表示异步,流程讲解如下:
主流程1->2->3:应用端发起交易,账户系统走完主流程后,将调用外部系统的请求放入MQ消息队列,则直接响应应用端成功,应用端不关心后续异步操作
MQ通知①:MQ服务器收到消息后,回调账户系统,通知已收到消息。
消息消费⑴->⑵->⑶:监听到队列有新消息,调用外部系统,完成请求。
优点:快速响应应用端,提高并发量。
缺点:在主流程的第2步,可能由于网络原因导致MQ没收到消息,造成主流程成功响应应用端成功,但是由于消息丢失造成后续的异步处理失败。

3. 100%可靠发送及幂等消费
基于方案2的缺点,为了避免由于网络异常造成的发送消息成功但实际MQ没收到消息的情况,增加了一种发送成功后保证MQ能100%收到消息的机制,即使MQ宕机也能知道哪些消息是MQ没收到的。
如图所示,与方案2的对比,不同的是增加了缓存标志,即在发送消息到MQ前,先将该消息缓存到redis作为一个标志(如 2 保存标志),表明发送进行中,等到MQ收到消息并回调通知成功时才将该标志删除(如 ② 删除标志);
增加了定时任务:

4.基于死信队列实现的延迟队列
基于方案3,基本能保证不会受网络异常的影响导致消息丢失的情况出现,至此,发送端的保证已经完成,但是消费端还有些不理想。
正常情况下,消费者在消费完消息后,会通知MQ告知已经消费成功,MQ收到后则从队列删除消息。如果告知消费失败,则该消息会重新回到队列重新被消费者监听并且获取。对应到RabbitMQ有以下三种情况:
基于以上三种情况,如果消息消费失败时,希望的是消息重回队列,隔一段时间后再被消费,也就是消息具备延迟的效果,但是找遍了官网,发现不支持延迟的机制。如果不延迟消费,那么消息一回到队列又会马上被消费,如果外部系统在一段时间内没有修复,那么在这段时间内的重复消费都属于无效重试且浪费性能。
脑壳疼ing。。。下班回家在地铁上头脑风暴时,终于灵机一动,联想到了死信队列的一个功能,那就是可以设置消息在指定时间内没被消费的话,就认定为是死亡消息,则该消息会被转到对应的死信队列。比如,正常队列A,B作为A的死信队列,设置A队列的消息的死亡时间为n秒,如果n秒内没被消费,则会自动转移到B队列。如图当时马上在备忘录记下来。

如图所示,在一个消息消费失败后的做法如下:
消息消费失败发送no_ack,让消息回到队列,并记录失败次数
重复消费失败超过三次后,发送rejected,让消息转移到死信队列B
由于死信队列B无消费者,所以消息在n秒后会转移到死信队列C(在这一步起到延迟的效果)
队列C的消费者消费死亡消息,将消息重新发送到正常队列A

总结
基于以上的最终方案,在测试同事的压测下,大概500TPS/秒,不过没有模拟数据库方面的瓶颈(往数据库插入一定量级的数据)。
由于代码跟项目有关,所以就暂时不发源码,等我把公司业务相关的移除掉后,再基于最终方案做个demo发到git上。
后续会进行优化,比如:


消费者虽然恢复了,但是一时间堆积了海量的数据,消费完需要一定的时间。为了不对正常的MQ造成影响,一种解决方案是先快速消费调堆积的消息,但原来的消费者是带有逻辑的,处理完一条消息可能需要200ms左右,所以为了快速消费,就先启动一个临时消费者,只做转发逻辑大概消费一条消息只需要10ms。临时消费者将消息转发到一个临时MQ,接着再启动n个原来带逻辑的消费者去消费,这样可以达到快速消费堆积消息的效果。




到此,相信大家对“怎么实现Java异步延迟消息队列”有了更深的了解,不妨来实际操作一番吧!这里是天达云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!