网站首页 > 精选文章 / 正文
RabbitMQ实现延迟对类有两种方式:
一、使用死信队列
二、使用延迟队列插件
使用死信队列
首先引入对应的三方依赖包:
implementation 'org.springframework.cloud:spring-cloud-starter-bus-amqp:2.1.0.RELEASE' //可根据自己喜好引入版本
然后创建连接:
private static final String HOST = "127.0.0.1";
private static final int PORT = 5672;
private static final String USERNAME = "guest";
private static final String PASSWORD = "guest";
private static final String VIRTUAL_HOST = "/";
private Connection getConnection() {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setVirtualHost(VIRTUAL_HOST);
Connection connection = null;
try {
connection = connectionFactory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return connection;
}
创建死信消息完整过程如下:
@PostMapping("/createDelayQueue")
public String createDelayQueue() {
Connection connection = getConnection();
Channel channel = null;
try {
channel = connection.createChannel();
channel.exchangeDeclare("delay_exchange", BuiltinExchangeType.DIRECT);
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "handler_exchange");//消息过期后转发到的交换机
arguments.put("x-message-ttl", 5000);//消息过期时间
arguments.put("x-dead-letter-routing-key", "handler_routing_key");//消息过期后要转发到的队列的routingKey
channel.queueDeclare("delay_queue", true, false, false, arguments);
channel.queueBind("delay_queue", "delay_exchange", "delay_routing_key");
for (int i = 0; i < 200; i++) {
AMQP.BasicProperties properties = new AMQP.BasicProperties();
channel.basicPublish("delay_exchange", "delay_routing_key", properties, ("delay_queue" + i + ":").getBytes());
}
} catch (IOException e) {
e.printStackTrace();
}
return "Success";
}
我们定义了一个交换机 delay_exchange:
channel.exchangeDeclare("delay_exchange", BuiltinExchangeType.DIRECT);
并定义了 delay_queue 的死信交换机:
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "handler_exchange");//消息过期后转发到的交换机
arguments.put("x-message-ttl", 5000);//消息过期时间
arguments.put("x-dead-letter-routing-key", "handler_routing_key");//消息过期后要转发到的队列的routingKey
声明 delay_queue 队列并与 delay_exchange 交换机进行绑定:
channel.queueDeclare("delay_queue", true, false, false, arguments);
channel.queueBind("delay_queue", "delay_exchange", "delay_routing_key");
那么死信消息的创建就完成了,接下来进行消息推送:
for (int i = 0; i < 200; i++) {
AMQP.BasicProperties properties = new AMQP.BasicProperties();
channel.basicPublish("delay_exchange", "delay_routing_key", properties, ("delay_queue" + i + ":").getBytes());
}
由于我们还未给死信交换机 handler_exchange 绑定死信队列,那么往 delay_queue 发送的200条消息在5秒钟超时后将会丢失。
接下来我们绑定一下死信队列:
@PostMapping("/createDelayQueueConsumer")
public String createDelayQueueConsumer() {
Connection connection = getConnection();
Channel channel = null;
try {
channel = connection.createChannel();
channel.exchangeDeclare("handler_exchange", BuiltinExchangeType.DIRECT);
channel.queueDeclare("handler_queue", true, false, false, null);
channel.queueBind("handler_queue", "handler_exchange", "handler_routing_key");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleConsumeOk(String consumerTag) {
System.out.println("handleConsumeOk:" + consumerTag);
super.handleConsumeOk(consumerTag);
}
@Override
public void handleCancelOk(String consumerTag) {
System.out.println("handleCancelOk:" + consumerTag);
super.handleCancelOk(consumerTag);
}
@Override
public void handleCancel(String consumerTag) throws IOException {
System.out.println("handleCancel:" + consumerTag);
super.handleCancel(consumerTag);
}
@Override
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
System.out.println("handleShutdownSignal:" + consumerTag);
super.handleShutdownSignal(consumerTag, sig);
}
@Override
public void handleRecoverOk(String consumerTag) {
System.out.println("handleRecoverOk:" + consumerTag);
super.handleRecoverOk(consumerTag);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("handleDelivery : consumerTag : " + consumerTag);
System.out.println("handleDelivery:" + new String(body));
getChannel().basicAck(envelope.getDeliveryTag(), false);
}
@Override
public Channel getChannel() {
return super.getChannel();
}
@Override
public String getConsumerTag() {
return super.getConsumerTag();
}
};
channel.basicConsume("handler_queue", consumer);
} catch (Exception e) {
e.printStackTrace();
}
return "Success";
}
核心代码如下:
channel.exchangeDeclare("handler_exchange", BuiltinExchangeType.DIRECT);
channel.queueDeclare("handler_queue", true, false, false, null);
channel.queueBind("handler_queue", "handler_exchange", "handler_routing_key");
那么调用两个接口请求:
http://localhost:8080/createDelayQueueConsumer
http://llocalhost:8888/createDelayQueue
结果如下:
死信队列原理如下:
我们首先声明了一个普通交换机 delay_exchange 并绑定了队列 delay_queue ,并设置了5秒钟的消息超时时间,由于delay_queue 没有定义消费者,因此我们往 delay_queue 发消息后,最终会超时并将消息转发到死信交换机 handler_exchange 上,并由死信交换机 handler_exchange 绑定的死信队列 handler_queue 消费。
使用延迟队列插件
在RabbitMQ 3.5.7及以后的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延时队列功能。同时插件依赖Erlang/OPT 18.0及以上。
插件下载地址:
https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange
进入插件目录后,启用插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
停用插件命令:
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
启动好之后,打开RabbitMQ 控制台进行确认:
在 Exchange 选项上多了一个 x-delayed-message 的交换机类型,证明已经将插件安装启动完毕。
接下来创建延迟队列:
@PostMapping("/createDelayQueueByPlugin")
public String createDelayQueueByPlugin() {
Connection connection = getConnection();
Channel channel = null;
try {
channel = connection.createChannel();
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("x-delayed-exchange", "x-delayed-message", true, false, args);
// 消息内容
for (int i = 0; i < 5; i++) {
Map<String, Object> headers = new HashMap<String, Object>();
long delayTime = 2000;
if (i % 2 == 0) {
delayTime = 5000;
}
headers.put("x-delay", delayTime);//消息延迟时间
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.headers(headers).build();
String message = "序号:" + i + ",时间:" + System.currentTimeMillis();
channel.basicPublish("x-delayed-exchange", "delay", props, message.getBytes());
System.out.println("Sent message:" + message);
}
} catch (Exception e) {
e.printStackTrace();
}
return "Success";
}
核心代码如下:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("x-delayed-exchange", "x-delayed-message", true, false, args);
首先声明交换机的类型,即我们刚刚安装的延迟消息的类型(x-delayed-message)。接着我们在发送消息时设置消息的延迟时间:
headers.put("x-delay", delayTime);//消息延迟时间
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.headers(headers).build();
最后把消息发送出去:
channel.basicPublish("x-delayed-exchange", "delay", props, message.getBytes());
看上去是不是比死信队列简单多了?
接下来我们定义消费者:
@PostMapping("/createDelayQueueConsumerByPlugin")
public String createDelayQueueConsumerByPlugin() {
Connection connection = getConnection();
try {
final Channel channel = connection.createChannel();
//声明x-delayed-type类型的exchange
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("x-delayed-exchange", "x-delayed-message", true, false, args);
// 声明队列
channel.queueDeclare("delay_plugin_queue", true, false, false, null);
// 绑定队列到交换机
channel.queueBind("delay_plugin_queue", "x-delayed-exchange", "delay");
// 指该消费者在接收到队列里的消息但没有返回确认结果之前,它不会将新的消息分发给它。
channel.basicQos(1);
// 定义队列的消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("收到消息:" + new String(body) + ",当前时间:" + System.currentTimeMillis());
// 消费者手动发送ack应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("delay_plugin_queue", consumer);
} catch (Exception e) {
e.printStackTrace();
}
// 监听队列
return "Success";
}
跟定义普通的队列没有区别,也是声明交换机、声明对类,然后将队列和交换机进行绑定,routingKey 是 delay :
//声明x-delayed-type类型的exchange
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("x-delayed-exchange", "x-delayed-message", true, false, args);
// 声明队列
channel.queueDeclare("delay_plugin_queue", true, false, false, null);
// 绑定队列到交换机
channel.queueBind("delay_plugin_queue", "x-delayed-exchange", "delay");
运行一下代码,分别依次请求一下以下地址:
http://localhost:8080/createDelayQueueConsumerByPlugin
http://localhost:8080/createDelayQueueByPlugin
首先看下第一个请求的控制台输出:
Sent message:序号:0,时间:1633697085740
Sent message:序号:1,时间:1633697085743
Sent message:序号:2,时间:1633697085743
Sent message:序号:3,时间:1633697085743
Sent message:序号:4,时间:1633697085743
可以看到消息是按顺序发送出去的。再看下第二个请求的控制台输出:
收到消息:序号:1,时间:1633697085743,当前时间:1633697087769
收到消息:序号:3,时间:1633697085743,当前时间:1633697087773
收到消息:序号:0,时间:1633697085740,当前时间:1633697090756
收到消息:序号:2,时间:1633697085743,当前时间:1633697090757
收到消息:序号:4,时间:1633697085743,当前时间:1633697090758
由于序号 1 和 3 设置的延迟时间是 2秒,0、2、4 设置的超时时间是5秒,因此 1 和 3 优先被消费,接着是 0、2、4。
因此延迟队列插件的实现也完成了。
总结:
使用延迟队列插件的方式比较贴近人的思维方式,使用也比较简单,因此个人比较推荐用插件的方式来实现延迟队列。
Tags:rabbitmq queue
猜你喜欢
- 2024-12-02 基于zabbix4.2 监控rabbitmq消息队列服务
- 2024-12-02 消息队列选型(RabbitMq、RocketMq、Kafaka)
- 2024-12-02 mq的那些破事儿,你不好奇吗?
- 2024-12-02 RabbitMQ镜像队列集群搭建、与SpringBoot整合
- 2024-12-02 你真的知道怎么实现一个延迟队列吗?|腾讯开发工程师干货分享
- 2024-12-02 一口气整理一波延时队列实现方案
- 2024-12-02 RabbitMQ 延迟队列
- 2024-12-02 zabbix4.0基于Rabbitmq网页API获取队列信息
- 2024-12-02 RabbitMQ 实现延迟队列的两种方式
- 2024-12-02 每日进步一点点:解读消息中间件—RabbitMQ(集群原理与搭建篇)