对于消息中间件来说,producer 将消息发送到mq的服务器上,但并不希望这条消息马上被消费,而是推迟到当前时间节点之后的某个时间点,再将消息投递到 queue 中让 consumer 进行消费。
二、延迟消息的使用场景 延迟消息的使用场景很多,一种比较常见的场景就是在电商系统中,订单创建后,会有一个等待用户支付的时间窗口,一般为30分钟,30分钟后 customer 会收到这条订单消息,然后程序去订单表中检查当前这条订单的支付状态,如果是未支付的状态,则自动清理掉(取消订单释放库存),这样就不需要使用定时任务的方式去处理了。
三、Rocket的延迟消息 使用限制级别RocketMQ 支持定时的延迟消息,但是不支持任意时间精度,仅支持特定的 level(共18级),例如定时 5s, 10s, 1m 等。其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推。
支持的level如下:level=0 级表示不延时,level=1 表示 延时1s,level=2 表示 延时5s
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h四、延迟消息 的整合实现 4.1创建Springboot项目,添加rockermq 依赖
4.2配置rocketmqorg.apache.rocketmq rocketmq-spring-boot-starter 2.2.1
# 端口 server: port: 8083 # 配置 rocketmq rocketmq: name-server: 127.0.0.1:9876 #生产者 producer: #生产者组名,规定在一个应用里面必须唯一 group: group1 #消息发送的超时时间 默认3000ms send-message-timeout: 3000 #消息达到4096字节的时候,消息就会被压缩。默认 4096 compress-message-body-threshold: 4096 #最大的消息限制,默认为128K max-message-size: 4194304 #同步消息发送失败重试次数 retry-times-when-send-failed: 3 #在内部发送失败时是否重试其他代理,这个参数在有多个broker时才生效 retry-next-server: true #异步消息发送失败重试的次数 retry-times-when-send-async-failed: 34.3 生产者 发送延时消息
package com.example.springbootrocketdemo.controller; import com.alibaba.fastjson.JSONObject; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.util.HashMap; import java.util.Map; @RestController public class RocketMQDelayCOntroller { @Autowired private RocketMQTemplate rocketMQTemplate; @RequestMapping("/testDelaySend") public void testDelaySend(){ Map4.4 消费者 监听延时消息,消费消息orderMap = new HashMap<>(); orderMap.put("orderNumber","1357890"); orderMap.put("createTime", LocalDateTime.now()); //参数一:topic 如果想添加tag,可以使用"topic:tag"的写法 //参数二:Message> //参数三:消息发送超时时间 //参数四:delayLevel 延时level messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h rocketMQTemplate.syncSend("test-topic-delay", MessageBuilder.withPayload(JSONObject.toJSONString(orderMap)).build(),3000,3); } }
package com.example.springbootrocketdemo.config; import com.alibaba.fastjson.JSONObject; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; import java.util.Map; @Service @RocketMQMessageListener(consumerGroup = "test-delay",topic = "test-topic-delay") public class RocketMQDelayConsumerListener implements RocketMQListener{ @Override public void onMessage(String s) { Map orderMap = JSONObject.parseObject(s,Map.class); String orderNumber = String.valueOf(orderMap.get("orderNumber")); String createTime = String.valueOf(orderMap.get("createTime")); //根据orderNumber 查询订单状态,若为未支付,则消息订单并修改库存 //.... System.out.println("consumer 延时消息消费 orderNumber:"+orderNumber+",createTime:"+createTime); } }
消费者类要实现RocketMQListener接口,以及动态指定消息类型String。
类上要加上@RocketMQMessageListener注解,指定topic主题test-topic-delay,以及消费者组test-delay
延时消息发送与接收搭建完毕!
4.5启动服务,测试延时消息测试OK,成功消费!