1、RabbitMQ的定义与主要特性(极简)
RabbitMQ是一个开源的消息代理软件,也称为消息队列(MQ),支持多种消息协议,包括AMQP(高级消息队列协议)。RabbitMQ由Erlang语言编写,提供了可靠性、消息持久性、集群支持和灵活的路由功能。
- RabbitMQ是通过信道的方式进行传输数据,将消息发布至交换机上。
- RabbitMQ常用的交换机(Exchange)有三种:fanout、direct、topic。
- RabbitMQ在吞吐方面略有不足,提供了更多的消息队列功能。
2、RocketMQ的定义与主要特性(极简)
Apache RocketMQ是一个开源的分布式消息和流媒体平台,是阿里的开源产品。RocketMQ在Kafka的基础上进行了一些改进,被设计为一个低延迟、高性能、高可靠性和可扩展的系统,适用于大规模分布式系统。
- 在订单,交易,binglog分发、充值等场景有良好的表现。
- RocketMQ单机理论上支持1万以上持久化队列。
- RocketMQ并没有在核心实现中直接支持 JMS(Java 消息服务)接口。
3、Kafka的定义与主要特性(极简)
Apache Kafka是一个分布式流处理平台,其本质是一个开源的消息发布订阅系统。
- kafka的核心优势是高吞吐量(理论上每秒可以处理几十万条消息)与低延迟(毫秒级延迟)。
- kafka使用Pull(拉取)方式消费消息,适用于大量的数据收集或传递场景(如日志的收集)。
- Kafka提供了幂等生产者,确保消息在发送过程中不会因为网络或其他问题导致重复发送,从而避免消息重复。
4、RabbitMQ、RocketMQ、Kafka的横向对比
特性/中间件 | Kafka | RabbitMQ | RocketMQ |
1、设计目标 | 大规模实时数据流处理 | 企业级消息队列,支持多种消息传递模式 | 可靠消息传递,高性能,分布式队列 |
2、数据模型 | 主题-分区(Topic-Partition) | 消息队列(Queue)、交换机(Exchange)、路由键(Routing Key) | 主题(Topic)、队列(Queue) |
3、主要特点 | 高吞吐量、低延迟、分布式日志系统 | 可靠性、灵活性、多种消息模式、事务支持 | 高吞吐量、低延迟、分布式事务支持、消息过滤 |
4、性能 | 高吞吐量,适合大数据场景 | 适中吞吐量,适合中小规模消息传递 | 高吞吐量,适合大规模消息传递 |
5、可扩展性 | 水平扩展,易于扩展 | 水平和垂直扩展 | 水平扩展,易于扩展 |
6、数据一致性 | 通过副本机制保证数据一致性 | 通过事务和确认机制保证数据一致性 | 通过事务消息和副本机制保证数据一致性 |
7、消息传递保证 | 至少一次、最多一次、精确一次 | 持久化、确认机制、死信队列 | 持久化、事务消息、死信队列 |
8、容错性 | 分区和副本机制 | Master-Slave复制、镜像队列 | 主从复制、消息复制 |
5、RabbitMQ、RocketMQ、Kafka的应用场景对比
应用场景 | Kafka | RabbitMQ | RocketMQ |
日志处理 | 适合 | 不适合 | 适合 |
流处理 | 适合 | 不适合 | 适合 |
事件源 | 适合 | 适合 | 适合 |
消息队列 | 适合 | 适合 | 适合 |
系统解耦 | 适合 | 适合 | 适合 |
异步处理 | 适合 | 适合 | 适合 |
负载均衡 | 适合 | 适合 | 适合 |
优先级队列 | 不适合 | 适合 | 不适合 |
分布式事务 | 适合 | 不适合 | 适合 |
数据同步 | 适合 | 适合 | 适合 |
简要总结 | 以高吞吐量和低延迟著称,主要用于大数据处理、实时分析和事件驱动架构,如日志聚合和数据流平台。 | 适用于企业级应用,强调消息的可靠性和灵活性,常用于任务队列、消息传递和应用解耦。 | 专为大规模分布式系统设计,支持复杂场景如金融级分布式事务,适合日志收集、流处理和大规模消息传递。 |
6、三种消息队列代码实现思路(以RocketMQ为例)
RabbitMQ实现(伪)代码示例—生产者:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQProducer {
private final static String EXCHANGE_NAME = "test_exchange";
public static void main(String[] argv) throws IOException, TimeoutException {
// 创建连接和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost"); // 设置RabbitMQ服务的地址
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 消息内容
String message = "Hello, RabbitMQ!";
// 将消息发送到交换机
channel.basicPublish(EXCHANGE_NAME, "routingKey", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
RabbitMQ实现(伪)代码示例—消费者:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQConsumer {
private final static String EXCHANGE_NAME = "test_exchange";
public static void main(String[] argv) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost"); // 设置RabbitMQ服务的地址
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 声明队列并绑定到交换机
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");
System.out.println(" [*] 正在等待消息。要退出,请按CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
}
Tags:basicconsume