MySQL, Oracle, Linux, 软件架构及大数据技术知识分享平台

网站首页 > 精选文章 / 正文

RabbitMQ实现延迟队列的两种方式

2024-12-02 19:31 huorong 精选文章 10 ℃ 0 评论

RabbitMQ实现延迟对类有两种方式:

一、使用死信队列

二、使用延迟队列插件

使用死信队列

首先引入对应的三方依赖包:

implementation 'org.springframework.cloud:spring-cloud-starter-bus-amqp:2.1.0.RELEASE' //可根据自己喜好引入版本

然后创建连接:

private static final String HOST = "127.0.0.1";
private static final int PORT = 5672;
private static final String USERNAME = "guest";
private static final String PASSWORD = "guest";
private static final String VIRTUAL_HOST = "/";

private Connection getConnection() {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(HOST);
    connectionFactory.setPort(PORT);
    connectionFactory.setUsername(USERNAME);
    connectionFactory.setPassword(PASSWORD);
    connectionFactory.setVirtualHost(VIRTUAL_HOST);
    Connection connection = null;
    try {
        connection = connectionFactory.newConnection();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return connection;
}

创建死信消息完整过程如下:

@PostMapping("/createDelayQueue")
public String createDelayQueue() {
    Connection connection = getConnection();
    Channel channel = null;
    try {
        channel = connection.createChannel();
        channel.exchangeDeclare("delay_exchange", BuiltinExchangeType.DIRECT);
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "handler_exchange");//消息过期后转发到的交换机
        arguments.put("x-message-ttl", 5000);//消息过期时间
        arguments.put("x-dead-letter-routing-key", "handler_routing_key");//消息过期后要转发到的队列的routingKey

        channel.queueDeclare("delay_queue", true, false, false, arguments);
        channel.queueBind("delay_queue", "delay_exchange", "delay_routing_key");

        for (int i = 0; i < 200; i++) {
            AMQP.BasicProperties properties = new AMQP.BasicProperties();
            channel.basicPublish("delay_exchange", "delay_routing_key", properties, ("delay_queue" + i + ":").getBytes());
        }

    } catch (IOException e) {
        e.printStackTrace();
    }
    return "Success";
}

我们定义了一个交换机 delay_exchange

channel.exchangeDeclare("delay_exchange", BuiltinExchangeType.DIRECT);

并定义了 delay_queue 的死信交换机:

Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "handler_exchange");//消息过期后转发到的交换机
arguments.put("x-message-ttl", 5000);//消息过期时间
arguments.put("x-dead-letter-routing-key", "handler_routing_key");//消息过期后要转发到的队列的routingKey

声明 delay_queue 队列并与 delay_exchange 交换机进行绑定:

channel.queueDeclare("delay_queue", true, false, false, arguments);
channel.queueBind("delay_queue", "delay_exchange", "delay_routing_key");

那么死信消息的创建就完成了,接下来进行消息推送:

for (int i = 0; i < 200; i++) {
    AMQP.BasicProperties properties = new AMQP.BasicProperties();
    channel.basicPublish("delay_exchange", "delay_routing_key", properties, ("delay_queue" + i + ":").getBytes());
}

由于我们还未给死信交换机 handler_exchange 绑定死信队列,那么往 delay_queue 发送的200条消息在5秒钟超时后将会丢失。

接下来我们绑定一下死信队列:

@PostMapping("/createDelayQueueConsumer")
public String createDelayQueueConsumer() {
    Connection connection = getConnection();
    Channel channel = null;
    try {
        channel = connection.createChannel();
        channel.exchangeDeclare("handler_exchange", BuiltinExchangeType.DIRECT);
        channel.queueDeclare("handler_queue", true, false, false, null);
        channel.queueBind("handler_queue", "handler_exchange", "handler_routing_key");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleConsumeOk(String consumerTag) {
                System.out.println("handleConsumeOk:" + consumerTag);
                super.handleConsumeOk(consumerTag);
            }

            @Override
            public void handleCancelOk(String consumerTag) {
                System.out.println("handleCancelOk:" + consumerTag);
                super.handleCancelOk(consumerTag);
            }

            @Override
            public void handleCancel(String consumerTag) throws IOException {
                System.out.println("handleCancel:" + consumerTag);
                super.handleCancel(consumerTag);
            }

            @Override
            public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
                System.out.println("handleShutdownSignal:" + consumerTag);
                super.handleShutdownSignal(consumerTag, sig);
            }

            @Override
            public void handleRecoverOk(String consumerTag) {
                System.out.println("handleRecoverOk:" + consumerTag);
                super.handleRecoverOk(consumerTag);
            }

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("handleDelivery : consumerTag : " + consumerTag);
                System.out.println("handleDelivery:" + new String(body));
                getChannel().basicAck(envelope.getDeliveryTag(), false);
            }

            @Override
            public Channel getChannel() {
                return super.getChannel();
            }

            @Override
            public String getConsumerTag() {
                return super.getConsumerTag();
            }
        };
        channel.basicConsume("handler_queue", consumer);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return "Success";
}

核心代码如下:

channel.exchangeDeclare("handler_exchange", BuiltinExchangeType.DIRECT);
channel.queueDeclare("handler_queue", true, false, false, null);
channel.queueBind("handler_queue", "handler_exchange", "handler_routing_key");

那么调用两个接口请求:

http://localhost:8080/createDelayQueueConsumer

http://llocalhost:8888/createDelayQueue

结果如下:



死信队列原理如下:

我们首先声明了一个普通交换机 delay_exchange 并绑定了队列 delay_queue ,并设置了5秒钟的消息超时时间,由于delay_queue 没有定义消费者,因此我们往 delay_queue 发消息后,最终会超时并将消息转发到死信交换机 handler_exchange 上,并由死信交换机 handler_exchange 绑定的死信队列 handler_queue 消费。

使用延迟队列插件

在RabbitMQ 3.5.7及以后的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延时队列功能。同时插件依赖Erlang/OPT 18.0及以上。

插件下载地址:

https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange

进入插件目录后,启用插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

停用插件命令:

rabbitmq-plugins disable rabbitmq_delayed_message_exchange

启动好之后,打开RabbitMQ 控制台进行确认:


在 Exchange 选项上多了一个 x-delayed-message 的交换机类型,证明已经将插件安装启动完毕。

接下来创建延迟队列:

@PostMapping("/createDelayQueueByPlugin")
public String createDelayQueueByPlugin() {
    Connection connection = getConnection();
    Channel channel = null;
    try {
        channel = connection.createChannel();
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        channel.exchangeDeclare("x-delayed-exchange", "x-delayed-message", true, false, args);
        // 消息内容
        for (int i = 0; i < 5; i++) {
            Map<String, Object> headers = new HashMap<String, Object>();
            long delayTime = 2000;
            if (i % 2 == 0) {
                delayTime = 5000;
            }
            headers.put("x-delay", delayTime);//消息延迟时间
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .headers(headers).build();
            String message = "序号:" + i + ",时间:" + System.currentTimeMillis();
            channel.basicPublish("x-delayed-exchange", "delay", props, message.getBytes());
            System.out.println("Sent message:" + message);
        }

    } catch (Exception e) {
        e.printStackTrace();
    }
    return "Success";
}

核心代码如下:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("x-delayed-exchange", "x-delayed-message", true, false, args);

首先声明交换机的类型,即我们刚刚安装的延迟消息的类型(x-delayed-message)。接着我们在发送消息时设置消息的延迟时间:

headers.put("x-delay", delayTime);//消息延迟时间
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .headers(headers).build();

最后把消息发送出去:

channel.basicPublish("x-delayed-exchange", "delay", props, message.getBytes());

看上去是不是比死信队列简单多了?

接下来我们定义消费者:

@PostMapping("/createDelayQueueConsumerByPlugin")
public String createDelayQueueConsumerByPlugin() {
    Connection connection = getConnection();
    try {
        final Channel channel = connection.createChannel();

        //声明x-delayed-type类型的exchange
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        channel.exchangeDeclare("x-delayed-exchange", "x-delayed-message", true, false, args);
        // 声明队列
        channel.queueDeclare("delay_plugin_queue", true, false, false, null);

        // 绑定队列到交换机
        channel.queueBind("delay_plugin_queue", "x-delayed-exchange", "delay");

        // 指该消费者在接收到队列里的消息但没有返回确认结果之前,它不会将新的消息分发给它。
        channel.basicQos(1);

        // 定义队列的消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("收到消息:" + new String(body) + ",当前时间:" + System.currentTimeMillis());
                // 消费者手动发送ack应答
                channel.basicAck(envelope.getDeliveryTag(), false);
            }

        };
        channel.basicConsume("delay_plugin_queue", consumer);
    } catch (Exception e) {
        e.printStackTrace();
    }
    // 监听队列
    return "Success";
}

跟定义普通的队列没有区别,也是声明交换机、声明对类,然后将队列和交换机进行绑定,routingKey delay :

//声明x-delayed-type类型的exchange
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("x-delayed-exchange", "x-delayed-message", true, false, args);
// 声明队列
channel.queueDeclare("delay_plugin_queue", true, false, false, null);
// 绑定队列到交换机
channel.queueBind("delay_plugin_queue", "x-delayed-exchange", "delay");

运行一下代码,分别依次请求一下以下地址:

http://localhost:8080/createDelayQueueConsumerByPlugin

http://localhost:8080/createDelayQueueByPlugin

首先看下第一个请求的控制台输出:

Sent message:序号:0,时间:1633697085740
Sent message:序号:1,时间:1633697085743
Sent message:序号:2,时间:1633697085743
Sent message:序号:3,时间:1633697085743
Sent message:序号:4,时间:1633697085743

可以看到消息是按顺序发送出去的。再看下第二个请求的控制台输出:

收到消息:序号:1,时间:1633697085743,当前时间:1633697087769
收到消息:序号:3,时间:1633697085743,当前时间:1633697087773
收到消息:序号:0,时间:1633697085740,当前时间:1633697090756
收到消息:序号:2,时间:1633697085743,当前时间:1633697090757
收到消息:序号:4,时间:1633697085743,当前时间:1633697090758

由于序号 1 和 3 设置的延迟时间是 2秒,0、2、4 设置的超时时间是5秒,因此 1 和 3 优先被消费,接着是 0、2、4。

因此延迟队列插件的实现也完成了。

总结:

使用延迟队列插件的方式比较贴近人的思维方式,使用也比较简单,因此个人比较推荐用插件的方式来实现延迟队列。

Tags:rabbitmq queue

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言