MySQL, Oracle, Linux, 软件架构及大数据技术知识分享平台

网站首页 > 精选文章 / 正文

消息队列黄金三剑客RabbitMQ、RocketMQ、Kafka的区别与应用场景

2025-02-06 15:03 huorong 精选文章 2 ℃ 0 评论

1、RabbitMQ的定义与主要特性(极简)

RabbitMQ是一个开源的消息代理软件,也称为消息队列(MQ),支持多种消息协议,包括AMQP(高级消息队列协议)。RabbitMQ由Erlang语言编写,提供了可靠性、消息持久性、集群支持和灵活的路由功能。

  • RabbitMQ是通过信道的方式进行传输数据,将消息发布至交换机上。
  • RabbitMQ常用的交换机(Exchange)有三种:fanout、direct、topic
  • RabbitMQ在吞吐方面略有不足,提供了更多的消息队列功能

2、RocketMQ的定义与主要特性(极简)

Apache RocketMQ是一个开源的分布式消息和流媒体平台,是阿里的开源产品。RocketMQ在Kafka的基础上进行了一些改进,被设计为一个低延迟、高性能、高可靠性和可扩展的系统,适用于大规模分布式系统

  • 订单,交易,binglog分发、充值等场景有良好的表现。
  • RocketMQ单机理论上支持1万以上持久化队列
  • RocketMQ并没有在核心实现中直接支持 JMS(Java 消息服务)接口。

3、Kafka的定义与主要特性(极简)

Apache Kafka是一个分布式流处理平台,其本质是一个开源的消息发布订阅系统

  • kafka的核心优势是高吞吐量(理论上每秒可以处理几十万条消息)与低延迟(毫秒级延迟)。
  • kafka使用Pull(拉取)方式消费消息,适用于大量的数据收集或传递场景(如日志的收集)。
  • Kafka提供了幂等生产者,确保消息在发送过程中不会因为网络或其他问题导致重复发送,从而避免消息重复。

4、RabbitMQ、RocketMQ、Kafka的横向对比

特性/中间件

Kafka

RabbitMQ

RocketMQ

1、设计目标

大规模实时数据流处理

企业级消息队列,支持多种消息传递模式

可靠消息传递,高性能,分布式队列

2、数据模型

主题-分区(Topic-Partition)

消息队列(Queue)、交换机(Exchange)、路由键(Routing Key)

主题(Topic)、队列(Queue)

3、主要特点

高吞吐量、低延迟、分布式日志系统

可靠性、灵活性、多种消息模式、事务支持

高吞吐量、低延迟、分布式事务支持、消息过滤

4、性能

高吞吐量,适合大数据场景

适中吞吐量,适合中小规模消息传递

高吞吐量,适合大规模消息传递

5、可扩展性

水平扩展,易于扩展

水平和垂直扩展

水平扩展,易于扩展

6、数据一致性

通过副本机制保证数据一致性

通过事务和确认机制保证数据一致性

通过事务消息和副本机制保证数据一致性

7、消息传递保证

至少一次、最多一次、精确一次

持久化、确认机制、死信队列

持久化、事务消息、死信队列

8、容错性

分区和副本机制

Master-Slave复制、镜像队列

主从复制、消息复制

5、RabbitMQ、RocketMQ、Kafka的应用场景对比

应用场景

Kafka

RabbitMQ

RocketMQ

日志处理

适合

不适合

适合

流处理

适合

不适合

适合

事件源

适合

适合

适合

消息队列

适合

适合

适合

系统解耦

适合

适合

适合

异步处理

适合

适合

适合

负载均衡

适合

适合

适合

优先级队列

不适合

适合

不适合

分布式事务

适合

不适合

适合

数据同步

适合

适合

适合

简要总结

以高吞吐量和低延迟著称,主要用于大数据处理、实时分析和事件驱动架构,如日志聚合和数据流平台。

适用于企业级应用,强调消息的可靠性和灵活性,常用于任务队列、消息传递和应用解耦。

专为大规模分布式系统设计,支持复杂场景如金融级分布式事务,适合日志收集、流处理和大规模消息传递。

6、三种消息队列代码实现思路(以RocketMQ为例)

RabbitMQ实现(伪)代码示例—生产者:

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQProducer {
private final static String EXCHANGE_NAME = "test_exchange";
public static void main(String[] argv) throws IOException, TimeoutException {
// 创建连接和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost"); // 设置RabbitMQ服务的地址
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 消息内容
String message = "Hello, RabbitMQ!";
// 将消息发送到交换机
channel.basicPublish(EXCHANGE_NAME, "routingKey", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}

RabbitMQ实现(伪)代码示例—消费者:

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQConsumer {
private final static String EXCHANGE_NAME = "test_exchange";
public static void main(String[] argv) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost"); // 设置RabbitMQ服务的地址
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 声明队列并绑定到交换机
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");
System.out.println(" [*] 正在等待消息。要退出,请按CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
}

Tags:basicconsume

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言