MySQL, Oracle, Linux, 软件架构及大数据技术知识分享平台

网站首页 > 精选文章 / 正文

如何理解RabbitMQ 的六种工作模式

2025-02-06 15:03 huorong 精选文章 4 ℃ 0 评论

整体的架构图,并介绍一些基本概念:

  • channel: 信道是生产者,消费者和 RabbitMQ 通信的渠道,是建立在 TCP 连接上的虚拟连接。一个 TCP 连接上可以建立成百上千个信道,通过这种方式,可以减少系统开销,提高性能。
  • Broker: 接收客户端连接,实现 AMQP 协议的消息队列和路由功能的进程。
  • Virtual Host: 虚拟主机的概念,类似权限控制组,一个 Virtual Host 里可以有多个 Exchange 和 Queue,权限控制的最小粒度是 Virtual Host。
  • Exchange: 交换机,接收生产者发送的消息,并根据 Routing Key 将消息路由到服务器中的队列 Queue。
  • ExchangeType: 交换机类型决定了路由消息的行为,RabbitMQ 中有三种 Exchange 类型,分别是 direct、fanout、topic。
  • Message Queue: 消息队列,用于存储还未被消费者消费的消息,由 Header 和 body 组成。Header 是由生产者添加的各种属性的集合,包括 Message 是否被持久化、优先级是多少、由哪个 Message Queue 接收等,body 是真正需要发送的数据内容。
  • BindingKey: 绑定关键字,将一个特定的 Exchange 和一个特定的 Queue 绑定起来。

简单模式


说明: 最简单的一对一模式,一个生产者,一个消费者,这个没什么可多说的。

生产者代码SimpleRabbitMQProducer.py:

# -*- coding: utf-8 -*-
# !/usr/bin/python3
# author by : AlgorithmSecret
# wechat  by: 雨夜的博客

import pika

# 生产消费者模型,简单模式,一对一,一个消费者、一个生产者
credentials = pika.PlainCredentials('dvs', 'dvs')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='127.0.0.1', credentials=credentials, virtual_host="dvs",
                              port=5672))  # 链接rabbit服务器(localhost是本机,如果是其他服务器请修改为ip地址)

channel = connection.channel()  # 创建频道

# 创建队列,使用durable方法
channel.queue_declare(queue='hello')  # 简单模式,一对一,一个消费者、一个生产者

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')

print(" [x] Sent 'Hello World!'")

connection.close()

执行完代码之后,通过管理控制台可以看到,已经有一个叫 hello 的队列了,而且里面有一条消息,就是我们刚才发送过去的。

消费者SimpleRabbitMQConsumer.py代码:

# -*- coding: utf-8 -*-
# !/usr/bin/python3
# author by : AlgorithmSecret
# wechat  by: 雨夜的博客

import pika

# 消费者模型, 简单模式,一对一,一个消费者、一个生产者
credentials = pika.PlainCredentials('dvs', 'dvs')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='127.0.0.1', credentials=credentials, virtual_host="dvs",
                              port=5672))
# 创建频道
channel = connection.channel()
channel.queue_declare(queue='hello')  # 简单模式,一对一,一个消费者、一个生产者


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)


channel.basic_consume("hello", callback, True)  # 消息接收

print(' [*] Waiting for messages. To exit press CTRL+C')

channel.start_consuming()  # 保持一直监听的状态

工作队列模式 Work Queues


说明:一对多模式,一个生产者,多个消费者,一个队列,每个消费者从队列中获取唯一的消息。

有两种消息分发机制,轮询分发和公平分发:

轮询分发的特点是将消息轮流发送给每个消费者,在实际情况中,多个消费者,难免有的处理得快,有的处理得慢,如果都要等到一个消费者处理完,才把消息发送给下一个消费者,效率就大大降低了。

而公平分发的特点是,只要有消费者处理完,就会把消息发送给目前空闲的消费者,这样就提高消费效率了。

生产者WorkRabbitMQProducer.py代码:

# -*- coding: utf-8 -*-
# !/usr/bin/python3
# author by : AlgorithmSecret
# wechat  by: 雨夜的博客

import pika
import sys

# 生产消费者模型,一对多模式,一个生产者,多个消费者,一个队列,每个消费者从队列中获取唯一的消息。
credentials = pika.PlainCredentials('dvs', 'dvs')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='127.0.0.1', credentials=credentials, virtual_host="dvs",
                              port=5672))  # 链接rabbit服务器(localhost是本机,如果是其他服务器请修改为ip地址)

channel = connection.channel()  # 创建频道

# 创建队列,使用durable方法
channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(exchange='', routing_key='task_queue', body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2  # make message persistent
                      ))

print(" [x] Sent %r" % message)

connection.close()

消费者WorkRabbitMQConsumer.py代码:

# -*- coding: utf-8 -*-
# !/usr/bin/python3
# author by : AlgorithmSecret
# wechat  by: 雨夜的博客

import pika
import time

# 消费者模型, 一对多模式,一个生产者,多个消费者,一个队列,每个消费者从队列中获取唯一的消息。
credentials = pika.PlainCredentials('dvs', 'dvs')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='127.0.0.1', credentials=credentials, virtual_host="dvs",
                              port=5672))
# 创建频道
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


# 公平分发
channel.basic_qos(prefetch_count=1)
channel.basic_consume("task_queue", callback)  # 消息接收

channel.start_consuming()  # 保持一直监听的状态

发布/订阅模式 Publish/Subscribe


说明:生产者将消息发送给 broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。消费者监听自己的队列并进行消费。

生产者RabbitMQPublish.py代码:

# -*- coding: utf-8 -*-
# !/usr/bin/python3
# author by : AlgorithmSecret
# wechat  by: 雨夜的博客

import pika
import sys

# 生产消费者模型,发布/订阅模式

credentials = pika.PlainCredentials('dvs', 'dvs')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='49.234.192.212', credentials=credentials, virtual_host="dvs",
                              port=5672))  # 链接rabbit服务器(localhost是本机,如果是其他服务器请修改为ip地址)

channel = connection.channel()  # 创建频道

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(exchange='logs', routing_key='', body=message)

print(" [x] Sent %r" % message)

connection.close()

消费者RabbitMQSubscribe.py代码:

# -*- coding: utf-8 -*-
# !/usr/bin/python3
# author by : AlgorithmSecret
# wechat  by: 雨夜的博客

import pika

# 消费者模型, 发布/订阅模式
credentials = pika.PlainCredentials('dvs', 'dvs')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='49.234.192.212', credentials=credentials, virtual_host="dvs",
                              port=5672))
# 创建频道
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)


channel.basic_consume(queue_name, callback, True)  # 消息接收

channel.start_consuming()  # 保持一直监听的状态

路由模式 Routing


说明:生产者将消息发送给 broker,由交换机根据 routing_key 分发到不同的消息队列,然后消费者同样根据 routing_key 来消费对应队列上的消息。

生产者
RoutingRabbitMQProducer.py代码:

# -*- coding: utf-8 -*-
# !/usr/bin/python3
# author by : AlgorithmSecret
# wechat  by: 雨夜的博客

import pika
import sys

# 生产消费者模型,路由模式 Routing
credentials = pika.PlainCredentials('dvs', 'dvs')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='49.234.192.212', credentials=credentials, virtual_host="dvs",
                              port=5672))  # 链接rabbit服务器(localhost是本机,如果是其他服务器请修改为ip地址)

channel = connection.channel()  # 创建频道

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(
    exchange='direct_logs', routing_key=severity, body=message)

print(" [x] Sent %r" % message)

connection.close()

消费者
RoutingRabbitMQConsumer.py代码:

# -*- coding: utf-8 -*-
# !/usr/bin/python3
# author by : AlgorithmSecret
# wechat  by: 雨夜的博客

import pika
import sys

# 消费者模型,路由模式 Routing
credentials = pika.PlainCredentials('dvs', 'dvs')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='49.234.192.212', credentials=credentials, virtual_host="dvs",
                              port=5672))
# 创建频道
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(
        exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(queue_name, callback, True)  # 消息接收

channel.start_consuming()  # 保持一直监听的状态

主题模式 Topics


说明:其实,主题模式应该算是路由模式的一种,也是通过 routing_key 来分发,只不过是 routing_key 支持了正则表达式,更加灵活。

生产者TopicsRabbitMQProducer.py代码:

# -*- coding: utf-8 -*-
# !/usr/bin/python3
# author by : AlgorithmSecret
# wechat  by: 雨夜的博客

import pika
import sys

# 生产消费者模型,主题模式 Topics
credentials = pika.PlainCredentials('dvs', 'dvs')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='49.234.192.212', credentials=credentials, virtual_host="dvs",
                              port=5672))  # 链接rabbit服务器(localhost是本机,如果是其他服务器请修改为ip地址)

channel = connection.channel()  # 创建频道

channel.exchange_declare(exchange='topic_logs',exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(
    exchange='topic_logs', routing_key=routing_key, body=message)

print(" [x] Sent %r:%r" % (routing_key, message))

connection.close()

消费者TopicsRabbitMQConsumer.py代码:

# -*- coding: utf-8 -*-
# !/usr/bin/python3
# author by : AlgorithmSecret
# wechat  by: 雨夜的博客

import pika
import sys

# 消费者模型,主题模式 Topics
credentials = pika.PlainCredentials('dvs', 'dvs')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='49.234.192.212', credentials=credentials, virtual_host="dvs",
                              port=5672))
# 创建频道
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in binding_keys:
    channel.queue_bind(
        exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.binding_keys, body))


channel.basic_consume(queue_name, callback, True)  # 消息接收

channel.start_consuming()  # 保持一直监听的状态

RPC 模式 RPC


说明:通过消息队列来实现 RPC 功能,客户端发送消息到消费队列,消息内容其实就是服务端执行需要的参数,服务端消费消息内容,执行程序,然后将结果返回给客户端。

生产者RPCRabbitMQProducer.py代码:

# -*- coding: utf-8 -*-
# !/usr/bin/python3
# author by : AlgorithmSecret
# wechat  by: 雨夜的博客

import pika
import sys
import uuid


class FibonacciRpcClient(object):
    def __init__(self):
        self.credentials = pika.PlainCredentials('dvs', 'dvs')
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='49.234.192.212', credentials=self.credentials, virtual_host="dvs",
                                      port=5672))  # 链接rabbit服务器(localhost是本机,如果是其他服务器请修改为ip地址)

        self.channel = self.connection.channel()  # 创建频道

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

消费者RPCRabbitMQConsumer.py代码:

# -*- coding: utf-8 -*-
# !/usr/bin/python3
# author by : AlgorithmSecret
# wechat  by: 雨夜的博客

import pika
import sys

# 消费者模型,主题模式 Topics
credentials = pika.PlainCredentials('dvs', 'dvs')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='49.234.192.212', credentials=credentials, virtual_host="dvs",
                              port=5672))
# 创建频道
channel = connection.channel()

result = channel.queue_declare(queue='rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

总结了常用的RabbitMQ六种工作模式、如何像了解很多、可以去官网阅读很多文档。

Tags:basicconsume

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言