5.1 Topic模式简介
RabbitMQ的Topic模式的交换机类型就是topic,这里的routing key 类似于一个表达式可以进行模糊匹配对应的routing key。这里的*号表示一个单词,#号表示一个或者多个单词。也就是说想要队列里面能接收到消息路由Key必须要是 .orange.前后各一个单词或者是单词.单词.rabbit或者是lazy.单词(多个)这种形式的。
5.2 Topic模式实现
Topic模式的生产者实现,从代码中可以看出本次使用的交换机的类型为Topic,与Routing模式差不多,不同的是路由key变成了表达式的形式,可以进行模糊的匹配。
public class TopicProducer {
@SneakyThrows
public static void main(String[] args) {
// 1、建立连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置连接参数
connectionFactory.setHost("192.168.133.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/"); // 默认就是/
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 3、创建连接
Connection connection = connectionFactory.newConnection();
// 4、创建channel
Channel channel = connection.createChannel();
// 5、创建交换机
/**
* 参数介绍
* String exchange 交换机名称
* BuiltinExchangeType type 交换机类型 direct:定向方式
* fanout:扇形(广播) ,发送消息给每一个与之绑定的队列
* topic: 通配符方式 headers:参数匹配方式
* boolean durable 是否持久化
* boolean autoDelete 是否自动删除
* boolean internal 内部使用
* Map arguments 参数列表
*/
String exchangeName = "exchangeName_Topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
// 6、创建队列
String queueName1 = "topic_queue_name_1";
String queueName2 = "topic_queue_name_2";
channel.queueDeclare(queueName1, true, false, false, null);
channel.queueDeclare(queueName2, true, false, false, null);
// 7、绑定交换机和队列
/**
* 参数介绍
* String queue 队列名称
* String exchange 交换机名称
* String routingKey 路由名称,交换机类型为fanout,routingKey默认为空字符串
* Map arguments 参数列表
*/
// 队列1绑定error级别的日志
channel.queueBind(queueName1, exchangeName, "#.error", null);
// 队列2绑定info和debug级别的日志
channel.queueBind(queueName2, exchangeName, "*.info", null);
channel.queueBind(queueName2, exchangeName, "debug.*", null);
// 8、发送消息
String body_error = "hello com.dream house ===> error";
String body_info = "hello com.dream house ===> info";
channel.basicPublish(exchangeName, "order.error", null, body_error.getBytes());
channel.basicPublish(exchangeName, "error.info", null, body_error.getBytes());
channel.basicPublish(exchangeName, "fight.info", null, body_info.getBytes());
// 9、关闭资源
channel.close();
connection.close();
}
}
Topic模式的消费者实现,消费者实际是差不多的,还是只需要监听对应需要自己的消费的队列,就能够进行消息的消费。
public class TopicConsumer1 {
@SneakyThrows
public static void main(String[] args) {
// 1、建立连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置连接参数
connectionFactory.setHost("192.168.133.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/"); // 默认就是/
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 3、创建连接
Connection connection = connectionFactory.newConnection();
// 4、创建channel
Channel channel = connection.createChannel();
// 5、创建队列
/**
* 参数介绍
* queue 队列名称
* durable 是否持久化,当MQ重启后还在
* exclusive 是否独占(只能有一个消费者监听这个队列),当Connection关闭时是否删除队列,
* autoDelete 是否自动删除,当没有消费者的时候会自动删除掉
* arguments 参数信息
*/
// 如果没有一个叫dream_house的队列就会自动创建一个队列
// channel.queueDeclare("dream_house", true, false, false, null);
// 6、接收消息
/**
* 参数介绍
* String queue 队列名称
* boolean autoAck 是否自动确认
* Consumer callback 回调函数
*/
Consumer consumer = new DefaultConsumer(channel) {
// 回调方法当收到消息后自动执行方法,匿名内部类
/**
* 参数介绍
* @param consumerTag 消息标识
* @param envelope 获取一些信息,比如交换机,路由
* @param properties 配置信息
* @param body 真实的报文信息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("body:" + new String(body));
}
};
channel.basicConsume("topic_queue_name_1", true, consumer);
}
}
5.3 Topic模式总结
Topic模式实际上跟Routing模式很是相似,然后只是交换机的类型不一致还有就是Topic模式的路由Key可以进行表达式匹配。
Tags:basicconsume