什么是消息中间件?在高并发的应用场景中,由于来不及同步处理请求,接收到的请求往往会发生阻塞。例如,大量的新增,更新请求同时到达数据库,这会导致行或表被锁住,最后会因为请求堆积过多,造成连接数过多(too many connections)的异常或者time out异常。
因此,在高并发的应用场景中,需要一个缓冲机制,而消息队列则可以很好地充当这个角色,通过异步处理请求来削峰填谷,缓解系统的压力。
消息队列可以简单理解为:把要传输的数据放在队列中,拥有先进先出(FIFO)的特性。它主要用于不同进程或线程之间的通信,用来处理一系列的输入请求。
消息队列采用异步通信机制。消息的发送者和接受者无须同时与消息队列进行数据交换,消息会一直保存在队列中,直至被接收者读取。
消息(Message)是指应用间传递的数据。消息可以非常简单,比如只包含文本字符串、JSON等,也可以很复杂,比如内嵌对象。
消息队列中间件(Message Queue Middleware,简称MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。
目前开源的消息中间件有很多,比如主流的有RabbitMQ、Kafka、ActiveMQ、RocketMQ等。面向消息的中间件(简称MOM,Message Oriented Middleware)提供了以松散耦合的灵活方式集成应用程序的一种机制。
它们提供了基于存储和转发的应用程序之间的异步数据发送,即应用程序彼此不直接通信,而是与作为中介的消息中间通信。消息中间件提供了有保证的消息发送,应用程序开发人员无须了解远程过程调用(RPC)和网络通信协议的细节。
消息中间件适用于需要可靠的数据传送的分布式环境。采用消息中间件的系统中,不同的对象之间通过传递消息来激活对方的事件,以完成相应的操作。发送者将消息发送给消息服务器,消息服务器将消息存放在若干队列中,在适合的时候再将消息转发给接收者。消息中间件能在不同平台之间通信,它常被用来屏蔽各种平台及协议之间的特性,实现应用程序之间的协同,其优点在于能够在客户和服务器之间提供同步和异步的连接,并且在任何时刻都可以将消息进行传递或者存储转发,这也是它比远程过程调用更进步的原因。
消息中间件有哪些主要作用?消息中间凭借其独有的特性,在不同的应用场景下可以展现不同的作用。总的来说,消息中间件的作用可以概括如下:
解耦:在项目启动之初来预测将来会碰到什么需求是极其困难的。消息中间件在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,这允许你独立扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束即可。
冗余(存储):有些情况下,处理数据的过程会失败。消息中间件可以把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在把一个消息从消息中间件中删除之前,需要你的处理系统明确指出该消息已经被处理完成,从而确保你的数据被安全地保存直到你使用完毕。
扩展性:因为消息中间件解耦了应用的处理过程,所以提高消息入队和处理的效率是很容易的,只要另外增加处理过程即可,不需要改变代码,也不需要调节参数。
削峰:在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果以能处理这类峰值为标准而投入资源,无疑是巨大的浪费。使用消息中间件能够使关键组织支撑突发访问压力,不会因为突发的超负荷请求而完全崩溃。
可恢复性:当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入消息中间件中的消息仍然可以在系统恢复后进行处理。
顺序保证:在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持一定程度上的顺序性。
缓冲:在任何重要的系统中,都会存在需要不同处理时间的元素。消息中间件通过一个缓冲层来帮助任务最高效率地执行,写入消息中间件的处理会尽可能快速。该缓冲层有助于控制和优化数据流经过系统的速度。
异步通信:在很多时候应用不想也不需要立即处理消息。消息中间件提供了异步处理机制,允许应用把一些消息放入消息中间件中,但并不立即处理它,在之后需要的时候再慢慢处理。
什么是AMQP协议?AMQP(Advanced Message Queuing Protocol)是高级消息队列协议,2004年JPMorgan Chase(摩根大通集团)联合其他公司共同设计。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信。
消息代理(message brokers)从发布者(publishers)亦称生产者(producers)那儿接收消息,并根据既定的路由规则把接收到的消息发送给处理消息的消费者(consumers)。
由于AMQP是一个网络协议,所以这个过程中的发布者,消费者,消息代理可以存在于不同的设备上。
特性:事物支持、持久化支持,出身金融行业,在可靠性消息处理上具备天然的优势。
通过合理使用消息队列(Messaging Queue)可大幅降低网络系统架构的耦合度和复杂度,让各集成部件拥有更灵活的自主弹性。同时异步处理机制在高性能和高可靠性上也有极佳的表现,是一种比较理想的集成解决方案。
AMQP当中有四个概念非常重要:虚拟主机(virtual host),交换机(exchange),队列(queue)和绑定(binding)。
虚拟机: 通常是应用的外在边界,我们可以为不同的虚拟机分配访问权限。虚拟机可持有多个交换机、队列和绑定。
交换机: 从连接通道(Channel)接收消息,并按照特定的路由规则发送给队列。
队列: 消息最终的存储容器,直到消费客户端(Consumer)将其取走。
绑定: 也就是所谓的路由规则,告诉交换机将何种类型的消息发送到某个队列中。
AMQP 0.9.1是一种可编程的协议,也就说由应用程序本身定义AMQP 实体和路由方案,而不是Broker管理员来定义。因此,应用程序可以定义协议具体操作,如声明队列(queue)和交换机(exchange)的操作,定义它们之间的绑定(binding),订阅队列(queue)等。
交换机(exchange)是被发送消息的AMQP实体,然后交换机发送和路由(route)消息到一个或多个(或没有)队列(queue),使用的路由算法取决于交换类型(exchange type)和规则称为绑定(binding)。
AMQP 0.9.1 Broker通常提供 4 交换类型(exchange types):
Direct exchange (通常用于一对一的通信或单播)
Fanout exchange (一对多的通信或广播)
Topic exchange (一对多或多对多的通信,多播)
Headers exchange (基于元数据的消息路由)
但它是可以扩展 自定义交换类型的AMQP 0.9.1 Broker,如:
? x-random exchange (随机选择路由接收到的消息队列)
? x-recent-history (一种Fanout exchange,在内存中保留N条最近的消息)
? 基于Headers exchange各类正则表达式的变化
AMQP连接(connections)通常使用长活动连接,使用可靠传输的TCP应用层协议。并且可以使用身份认证和TSL(SSL)来保护连接。当应用程序不再需要连接到AMQP代理(broker)时,就可以关闭AMQP连接(connections),而不用突然关闭基础的TCP连接。
AMQP 0.9.1部署等级,AMQP 范围涵盖不同级别的等级,大致如下:
开发人员/临时使用:1台服务器(broker),1个用户,10个消息队列(queue),每秒1条消息。
生产应用:2台服务器(broker),10到100个用户,10到50个消息队列(queue),每秒10条消息(每小时3万6千条消息)。
部门级任务关键型应用:4台服务器(broker),100到500个用户,50到100个消息队列(queue),每秒100条消息(每小时36万条消息)。
区域级的任务关键性应用:16台服务器(broker),500到2000个用户,100到500个消息队列(queue)和主题(topic),每秒1千条消息(每小时三百六十万条消息)。
全球性的任务关键性应用: 64台服务器(broker),2千到1万个用户,500到1000个消息队列(queue)和主题(topic),每秒1万条消息(每小时三千六百万条消息)。
市场数据(交易)应用:200台服务器(broker),5千个用户,1万条主题(topic),每秒10万条消息(每小时三亿六千万条消息)。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端。用于在分布式系统总存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
1.可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
2.灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange。
3.消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
4.高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
5.多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
6.多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
7.管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
8.跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
9.插件机制(Plugin System)
RabbitMQ 提供了许多插件,可以从多方面进行扩展,也可以编写自己的插件。
RabbitMQ目前已支持了AMQP 0.8/0.9/0.9.1协议,最新的AMQP 1.0协议支持正在研发中,RabbitMQ默认使用AMQP 0.9.1协议。
基于RabbitMQ的消息常用应用场景:
1.Point-to-point communication 点对点通信
2.One-to-many broadcasting (including multicast) 广播,包括多播
3.High-speed transient message flows 高速消息传输流
4.Reliable persistent message delivery 可靠的持久消息交付
5.Store-and-forward 存储和发送
6.Wide area messaging 广域网信息通讯
7.File streaming 文件流
RabbitMQ的应用场景:
1. 异步处理
1.1 适用于对业务逻辑没有影响的异常处理,比如发送邮件,通知等
1.2 适用于无需即时返回结果的场景,比如EAP Client 与EAP Server间的通信
2. 同步处理
适用于需即时返回结果的场景,比如移动端请求业务,车间移动看板
3. 多消费端
Queues 竞争执行可适用于多个client 处理一项共同业务的场景
4. 应用解耦
降低系统间的高耦合,适用于系统间的数据同步等场景
5. 流量削峰
适用于短时间内高流量的应用场景。利用消息队列的大小控制请求消息上限,避免处理应用被压垮。
RabbitMQ的7种使用方式:
1.发送/接收
2. Queues(竞争执行)
3.Publish/Subscribe (发布/订阅)
4.Routing(按密钥路由,选择消息)
5. Topics
6. RPC(同步模式)
7.Publisher Confirms
RabbitMQ的工作模型:
过程一:从生产者发送消息到RabbitMQ服务器的过程
过程二:确保消息从交换机路由到队列
过程三:确保消息在队列中正确的存储
过程四:确保消息从队列正确地投递到消费者
消息本身也要做持久化操作。
延迟消息的两种实现方式:
老办法:过期消息+死信队列
新办法:rabbitmq_delayed_message_exchange插件
插件安装
1.下载插件:https://www.rabbitmq.com/community-plugins.html
2.解压到 rabbitmq的插件目录 plugins/
3.启用插件: rabbitmq-plugins enable rabbitmq_delayed_message_exchange
RabbitMQ持久化机制:
Rabbitmq的持久化分为队列持久化、消息持久化和交换器持久化。
不管是持久化的消息还是非持久化的消息都可以被写入到磁盘里。
RabbitMQ如何保证消息的可靠性投递?目前来说,现在有两种方案实施:数据库持久化方案、消息延迟投递方案。
1、数据库持久化方案
流程:
1.将业务订单数据和生成的Message进行持久化操作(一般情况下插入数据库,这里如果分库的话可能涉及到分布式事务)
2.将Message发送到Broker服务器中
3.通过RabbitMQ的Confirm机制,在producer端,监听服务器是否ACK
4.如果ACK了,就将Message这条数据状态更新为已发送。如果失败,修改为失败状态。
5.分布式定时任务查询数据库3分钟(这个具体时间应该根据时效性来定)之前的发送失败的消息
6.重新发送消息,记录发送次数
7.如果发送次数过多仍然失败,那么就需要人工排查之类的操作。
优点:能够保证消息百分百不丢失
缺点:第一步中涉及到分布式事务问题,分布式事务一点会降低时效性
2、消息延迟投递方案
流程:
流程图中,颜色不同的代表不同的message
1.将业务订单持久化
2.发送一条Message到broker(称之为主Message),再发送相同的一条到不同的队列或者交换机(这条称为确认Message)中
3.主Message由实际业务处理端消费后,生成一条响应Message。之前的确认Message由Message Service应用处理入库。
4.4~6.实际业务处理发送的确认Message由Message Service接收后,将原Message状态修改。
7.如果该条Message没有被确认,则通过RPC调用重新由producer进行全过程。
优点:相对于数据库持久化方案来说响应速度有所提升
缺点:
1.系统复杂性有点高
2.万一两条消息有失败了,消息存在丢失情况,仍需Confirm机制做补偿
RabbitMQ有一个基本的吞吐量测试工具PerfTest(文档,源代码和版本),它基于Java客户端,可以配置为模拟基本工作负载。
本文就是利用PerfTest进行RabbitMQ性能测试,测试环境为Linux下的两台集群的RabbitMQ服务器。
RabbitMQ性能测试用例:
1、以下命令运行 PerfTest,其中有一个没有发布者确认的发布者、两个使用自动确认模式的消费者(每个接收每条消息的副本)和一个名为“throughput-test-x1-y2”的队列。出版商将尽快发布,没有任何速率限制。结果将以“test1”为前缀,以便于识别和比较:
runjava com.rabbitmq.perf.PerfTest -h amqp://username:password@IP:5670 -x 1 -y 2 -u "throughput-test-1" -a --id "test 1“
2、此修改将使用 2 个发布者和 4 个消费者,如果机器和 RabbitMQ 节点上有足够的 CPU 内核,通常会产生更高的吞吐量:
runjava com.rabbitmq.perf.PerfTest -h amqp://username:password@IP:5670 -x 2 -y 4 -u "throughput-test-2" -a --id "test 2"
3、此修改将使用 2 个发布者和 4 个消费者,消息大小从默认值(12 字节)更改为 4 kB:
runjava com.rabbitmq.perf.PerfTest -h amqp://username:password@IP:5670 -x 10 -y 20 -u "throughput-test-4" -a --id "test 4" -s 4000
4、此修改将使用 2 个发布者和 4 个消费者,消息大小从默认值(12 字节)更改为 40 kB:
runjava com.rabbitmq.perf.PerfTest -h amqp://username:password@IP:5670 -x 10 -y 20 -u "throughput-test-4" -a --id "test 4" -s 40000
5、此修改将使用 2 个发布者和 4 个消费者,消息大小从默认值(12 字节)更改为 400 kB:
runjava com.rabbitmq.perf.PerfTest -h amqp://username:password@IP:5670 -x 10 -y 20 -u "throughput-test-4" -a --id "test 4" -s 400000
6、此修改将使用 2 个发布者和 4 个消费者,消息大小从默认值(12 字节)更改为 1000 kB:
runjava com.rabbitmq.perf.PerfTest -h amqp://username:password@IP:5670 -x 10 -y 20 -u "throughput-test-4" -a --id "test 4" -s 1000000
7、此修改将使用 2 个发布者和 4 个消费者,消息大小从默认值(12 字节)更改为 4000 kB:
runjava com.rabbitmq.perf.PerfTest -h amqp://username:password@IP:5670 -x 10 -y 20 -u "throughput-test-4" -a --id "test 4" -s 4000000
8、以下命令行启动第一个 PerfTest 进程,该进程创建 500 个队列(从perf-test-1到perf-test-500)。每个队列将有 3 个消费者和 1 个生产者向其发送消息:
runjava com.rabbitmq.perf.PerfTest -h amqp://username:password@IP:5670 --queue-pattern 'perf-test-%d' --queue-pattern-from 1 --queue-pattern-to 500 --producers 500 --consumers 1500
9、为了防止发布者大致同时发布并更均匀地分配速率,请使用--producer-random-start-delay选项在第一条发布消息之前添加随机延迟:
runjava com.rabbitmq.perf.PerfTest -h amqp://username:password@IP:5670 --queue-pattern 'perf-test-%d' --queue-pattern-from 1 --queue-pattern-to 1000 --producers 1000 --consumers 1000 --heartbeat-sender-threads 10 --publishing-interval 5 --producer-random-start-delay 120
使用上面的命令,每个发布者都会以 1 到 120 秒之间的随机延迟开始。
RabbitMQ性能测试结果如下:
从性能测试结果来看,平均读取率200/S,两台集群性能较好。
如果您喜欢这篇文章,请关注我,收藏、点赞、评论和转发,会让更多需要帮助的人看到这篇文章。举手之劳,善莫大焉!
Tags:rabbitmq管理页面地址