MySQL, Oracle, Linux, 软件架构及大数据技术知识分享平台

网站首页 > 精选文章 / 正文

RabbitMQ基础(Topic广播模式)

2025-02-06 15:03 huorong 精选文章 2 ℃ 0 评论

5.1 Topic模式简介

RabbitMQ的Topic模式的交换机类型就是topic,这里的routing key 类似于一个表达式可以进行模糊匹配对应的routing key。这里的*号表示一个单词,#号表示一个或者多个单词。也就是说想要队列里面能接收到消息路由Key必须要是 .orange.前后各一个单词或者是单词.单词.rabbit或者是lazy.单词(多个)这种形式的。

5.2 Topic模式实现

Topic模式的生产者实现,从代码中可以看出本次使用的交换机的类型为Topic,与Routing模式差不多,不同的是路由key变成了表达式的形式,可以进行模糊的匹配。

public class TopicProducer {
    @SneakyThrows
    public static void main(String[] args) {
        // 1、建立连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // 2、设置连接参数
        connectionFactory.setHost("192.168.133.128");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/"); // 默认就是/
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        // 3、创建连接
        Connection connection = connectionFactory.newConnection();

        // 4、创建channel
        Channel channel = connection.createChannel();

        // 5、创建交换机
        /**
         * 参数介绍
         * String exchange 交换机名称
         * BuiltinExchangeType type 交换机类型  direct:定向方式
         * fanout:扇形(广播) ,发送消息给每一个与之绑定的队列
         * topic: 通配符方式   headers:参数匹配方式
         * boolean durable 是否持久化
         * boolean autoDelete 是否自动删除
         * boolean internal 内部使用
         * Map arguments 参数列表
         */
        String exchangeName = "exchangeName_Topic";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);

        // 6、创建队列
        String queueName1 = "topic_queue_name_1";
        String queueName2 = "topic_queue_name_2";
        channel.queueDeclare(queueName1, true, false, false, null);
        channel.queueDeclare(queueName2, true, false, false, null);

        // 7、绑定交换机和队列
        /**
         * 参数介绍
         * String queue 队列名称
         * String exchange 交换机名称
         * String routingKey 路由名称,交换机类型为fanout,routingKey默认为空字符串
         * Map arguments 参数列表
         */
        // 队列1绑定error级别的日志
        channel.queueBind(queueName1, exchangeName, "#.error", null);

        // 队列2绑定info和debug级别的日志
        channel.queueBind(queueName2, exchangeName, "*.info", null);
        channel.queueBind(queueName2, exchangeName, "debug.*", null);

        // 8、发送消息
        String body_error = "hello com.dream house ===> error";
        String body_info = "hello com.dream house ===> info";
        channel.basicPublish(exchangeName, "order.error", null, body_error.getBytes());
        channel.basicPublish(exchangeName, "error.info", null, body_error.getBytes());
        channel.basicPublish(exchangeName, "fight.info", null, body_info.getBytes());

        // 9、关闭资源
        channel.close();
        connection.close();
    }
}

Topic模式的消费者实现,消费者实际是差不多的,还是只需要监听对应需要自己的消费的队列,就能够进行消息的消费。

public class TopicConsumer1 {
    @SneakyThrows
    public static void main(String[] args) {
        // 1、建立连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // 2、设置连接参数
        connectionFactory.setHost("192.168.133.128");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/"); // 默认就是/
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        // 3、创建连接
        Connection connection = connectionFactory.newConnection();

        // 4、创建channel
        Channel channel = connection.createChannel();

        // 5、创建队列
        /**
         * 参数介绍
         * queue 队列名称
         * durable 是否持久化,当MQ重启后还在
         * exclusive 是否独占(只能有一个消费者监听这个队列),当Connection关闭时是否删除队列,
         * autoDelete 是否自动删除,当没有消费者的时候会自动删除掉
         * arguments 参数信息
         */
        // 如果没有一个叫dream_house的队列就会自动创建一个队列
        // channel.queueDeclare("dream_house", true, false, false, null);

        // 6、接收消息
        /**
         * 参数介绍
         * String queue  队列名称
         * boolean autoAck 是否自动确认
         * Consumer callback 回调函数
         */
        Consumer consumer = new DefaultConsumer(channel) {
            // 回调方法当收到消息后自动执行方法,匿名内部类

            /**
             * 参数介绍
             * @param consumerTag 消息标识
             * @param envelope 获取一些信息,比如交换机,路由
             * @param properties 配置信息
             * @param body 真实的报文信息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                super.handleDelivery(consumerTag, envelope, properties, body);
                System.out.println("body:" + new String(body));
            }
        };
        channel.basicConsume("topic_queue_name_1", true, consumer);
    }
}

5.3 Topic模式总结

Topic模式实际上跟Routing模式很是相似,然后只是交换机的类型不一致还有就是Topic模式的路由Key可以进行表达式匹配。

Tags:basicconsume

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言