MySQL, Oracle, Linux, 软件架构及大数据技术知识分享平台

网站首页 > 精选文章 / 正文

详解如何利用RabbitMQ生产一个简单的消息

2025-02-06 15:02 huorong 精选文章 3 ℃ 0 评论

最近业务中有有这样一个场景,就是用户在商城下单之后,如果30分钟没有付款,那么就需要将这个订单处理掉,要么直接删除,要么直接标识为失效状态,为什么要这么做?

  • 1、库存,用户在下单之后,会锁定一个库存,如果用户一直不支付,那么就会占用库存,影响别的用户购买,
  • 2、随着业务的发展,用户量的增加,我们的订单数据会越来越多,那么我们要及时的清理无效的订单,提升系统的性能;

曾经的纯洁无瑕

首先说下,我曾经那些纯洁无瑕的想法,第一次看到这种需求的时候,如果要清理失效的订单,那我直接写一个定时任务,5分钟或者10分钟跑一次,删除过期的订单,增加库存。

定时任务确实可以解决上面的问题,但是存在一个现实的问题,那就是数据库压力,甚至影响整个系统的性能,如果你是几百、几千个订单还好,要是十几万、甚至上百万,那么此方法肯定是行不通的。

进阶版本

既然发现了上面的方法不行,那么就重新想办法,有什么办法可以不用查询数据库,就可以知道哪些订单快要过期,再这样的思考下,我用Redis做了一个消息队列,当生成订单的时候,生成一个延迟10分钟消息,10分钟结束的时候,就会从Redis的队列中将这个订单取出来,然后这样我就可以将这个订单删除,增加库存。

Redis做消息队列存在的问题

之前用一个全新的Redis做消息队列,存储的数据也不多,感觉还好,当随着业务增加,Redis存储超过90%的时候,大量的消息没有被消费,就是消息丢失很严重。那么这样肯定是不行的。

删除订单,增加库存这是不能有太多误差的事情,所以Redis消息队列已经不能满足我的需求,那么就需要可靠性高的消息队列,也就是我们这次要介绍的RabbitMQ。

RabbitMQ安装与面板介绍

这里我就不跟大家介绍如何安装RabbitMQ了,网上其实有很多这种教程,所以大家自行搜索吧。重点要跟大家说下,RabbitMQ的面板,我们的消息队列,以及消息都是可以在面板上看到的。我是用的MQ的版本是3.8,各个版本之间的面板多多少少可能有点不太一样。

第一次接触的话,我们不要想着全部我一次性都看懂,都知道是干嘛的,我觉得没必要,先熟悉最基础的,我在上面圈了两个地方Queue、Admin和Add a new queue 这个三个是最基本的,我们学习必须要用的东西。

  • Queue:这个就是我们声明的消息队列;
  • Admin:用户管理,RabbitMQ默认有一个用户是guest,但是RabbitMQ神奇的就是每个库都必须创建一个用户角色;
  • Add a new queue:这个就是创建一个新的队列,但是我们一般不这么直接创建,而是在代码中创建;

再来补充下Admin吧,首先要告诉大家一个基本的东西,就是如果想要声明一个队列,那么你必须要有一个库(比喻手法),队列存在于库中,可以想象下mysql,是不是得先有库,再有表。MQ也是这样的。

右边的Virtual Hosts就是创建库,Name就是库名,写的时候前面必须加/

如果细心的朋友可以看到第一张图中两个队列前面就是库名,标识队列存在与xiaoshuo库中。

一个简单的消息队列

当生产者生产出消息之后,发送到队列中,消费者监听到队列中有消息进行消费,那么我们本篇就先实现一个简单的消息队列。

代码

我们不使用SpringBoot框架,我们就从基本的写,人生原生的API,这样以后才懂得什么是怎么回事。

1、引入需要的pom文件

    com.rabbitmq    amqp-client    4.0.2     org.slf4j    slf4j-api    1.7.10     org.slf4j    slf4j-log4j12    1.7.5     log4j    log4j    1.2.17     junit    junit    4.11

2、连接RabbitMQ

/** * @description: TODO MQ连接工厂 * @author: bingfeng * @create: 2020-05-07 08:55 */public class MQConnectUtil {     public static Connection getConnection() throws IOException, TimeoutException {         // 定义连接工厂        ConnectionFactory factory = new ConnectionFactory();         // 设置连接地址        factory.setHost("127.0.0.1");         // 设置端口        factory.setPort(5672);         // 选择 vhost        factory.setVirtualHost("/xiaoshuo");         // 设置用户名        factory.setUsername("bingfeng");         // 密码        factory.setPassword("123");         return factory.newConnection();    }}

3、发送消息

/** * @description: TODO 模拟发送消息 * @author: bingfeng * @create: 2020-05-07 09:01 */public class Producer {     /**     * 队列名     */    public static final String QUEUE_NAME = "simple_queue_test";     public static void main(String[] args) throws IOException, TimeoutException {         Connection connection = MQConnectUtil.getConnection();         // 从连接中获取一个通道        Channel channel = connection.createChannel();         // 创建队列声明        channel.queueDeclare(QUEUE_NAME, false, false, false, null);         // 消息内容        String msg = "你好,冰峰!";         channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());         System.out.println("发送消息:" + msg);         channel.close();        connection.close();    }}

4、消费消息

/** * @description: TODO 消费MQ * @author: bingfeng * @create: 2020-05-07 09:11 */public class Consumer {     public static final String QUEUE_NAME = "simple_queue_test";     public static void main(String[] args) throws Exception {         // 获取连接        Connection connection = MQConnectUtil.getConnection();         // 创建频道        Channel channel = connection.createChannel();         // 队列声明        channel.queueDeclare(QUEUE_NAME, false, false, false, null);         // 定义消费者        DefaultConsumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {                String msg = new String(body, StandardCharsets.UTF_8);                 System.out.println("消费消息:" + msg);            }        };         // 监听队列        channel.basicConsume(QUEUE_NAME, true, consumer);    }}

当我们发送一个消息之后,我们可以直接在面板看到队列中的消息数

点进来之后,拉到下面,Messages就是我们想要查看的消息数量,就是你想看几条消息填几就行了,填完之后点击下面的Get Messages,我们最近的消息就会显示在下面。

当我们消费了这个消息之后,队列中就没有这个消息了。

最后的话

今天就写到这,我打算把RabbitMQ从入门到项目中的实战用法全部一级一级分享给大家,一是防止自己以后忘记可以回来翻翻看,二是分享给大家有兴趣的朋友可以一起学习,后面我也会把我之前说的,订单过期删除的业务场景也会写一遍。

熟悉RabbitMQ的朋友,可能看下来解决写的东西很简单,但是毕竟也有很多人实际工作中并没有用过MQ,自己可能也没有了解过,对于没有了解过的朋友来说,我觉得入个门还是挺不错的。

有什么问题,欢迎大家下方solo。

Tags:basicconsume

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言