网站首页 > 精选文章 / 正文
1: 如何从RabbitMQ消费消息
首先导入所需的库。
from amqpstorm import Connection
在消费消息时,我们首先需要定义一个函数来处理接收到的消息。这可以是任何可调用的函数,并且需要接收一个消息对象,或者一个消息元组(具体取决于在start_consuming中定义的to_tuple参数)。
除了处理接收到的消息数据之外,我们还需要确认或拒绝消息。这是非常重要的,因为我们需要让RabbitMQ知道我们已经正确接收并处理了消息。
def on_message(message):
"""当接收到消息时调用此函数。
:param message: 接收到的消息。
:return:
"""
print("Message:", message.body)
# 确认我们已成功处理消息。
message.ack()
# 拒绝消息。
# message.reject()
# 拒绝消息,并将其重新放回队列。
# message.reject(requeue=True)
接下来,我们需要建立与RabbitMQ服务器的连接。
connection = Connection('127.0.0.1', 'guest', 'guest')
之后,我们需要设置一个通道。每个连接可以有多个通道,一般来说,在执行多线程任务时,建议(但不是必须)为每个线程分配一个通道。
channel = connection.channel()
设置好通道之后,我们需要告知RabbitMQ我们希望开始消费消息。在这个例子中,我们将使用之前定义的on_message函数来处理所有消费的消息。
我们将要监听的RabbitMQ服务器上的队列名称为simple_queue,并且我们还告知RabbitMQ我们将在处理完所有接收到的消息后确认它们。
channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)
最后,我们需要启动IO循环以开始处理RabbitMQ服务器传递的消息。
channel.start_consuming(to_tuple=False)
2: 如何向RabbitMQ发布消息
首先导入所需的库。
from amqpstorm import Connection
from amqpstorm import Message
接下来,我们需要建立与RabbitMQ服务器的连接。
connection = Connection('127.0.0.1', 'guest', 'guest')
之后,我们需要设置一个通道。每个连接可以有多个通道,一般来说,在执行多线程任务时,建议(但不是必须)为每个线程分配一个通道。
channel = connection.channel()
设置好通道之后,我们可以开始准备我们的消息。
# 消息属性。
properties = {
'content_type': 'text/plain',
'headers': {'key': 'value'}
}
# 创建消息。
message = Message.create(channel=channel, body='Hello World!', properties=properties)
现在,我们可以通过调用publish并提供一个routing_key来发布消息。在这个例子中,我们将消息发送到名为simple_queue的队列。
message.publish(routing_key='simple_queue')
3: 如何在RabbitMQ中创建延迟队列
首先,我们需要设置两个基本的通道,一个用于主队列,另一个用于延迟队列。在我的示例中,我包含了一些额外的标志(虽然不是必需的,但可以使代码更加可靠),例如确认投递、投递模式和持久性。您可以在RabbitMQ手册中找到更多关于这些内容的信息。
设置好通道之后,我们向主通道添加一个绑定,以便我们可以从延迟通道发送消息到主队列。
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
接下来,我们需要配置延迟通道,以便在消息过期后将其转发到主队列。
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl': 5000,
'x-dead-letter-exchange': 'amq.direct',
'x-dead-letter-routing-key': 'hello'
})
- x-message-ttl(消息存活时间)
- 通常用于在队列中自动删除过期的消息,但通过添加两个可选参数,我们可以改变这种行为,而是让此参数决定消息在延迟队列中停留的时间(以毫秒为单位)。
- x-dead-letter-routing-key
- 此变量允许我们在消息过期后将其转移到另一个队列,而不是默认的行为——完全删除消息。
- x-dead-letter-exchange
- 此变量决定了用于将消息从hello_delay队列转移到hello队列的交换机。
向延迟队列发布消息
设置好所有基本的Pika参数后,您只需使用basic.publish将消息发送到延迟队列即可。
delay_channel.basic.publish(exchange='',
routing_key='hello_delay',
body='test',
properties={'delivery_mod': 2})
执行脚本后,您应该在RabbitMQ管理模块中看到以下队列被创建。
示例代码。
from amqpstorm import Connection
connection = Connection('127.0.0.1', 'guest', 'guest')
# 创建普通的“Hello World”类型的通道。
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)
# 我们需要将此通道绑定到一个交换机,该交换机将用于从延迟队列传输消息。
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
# 创建延迟通道。
delay_channel = connection.channel()
delay_channel.confirm_deliveries()
# 在这里我们声明延迟时间和延迟通道的路由。
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl': 5000, # 消息传输前的延迟时间(以毫秒为单位)。
'x-dead-letter-exchange': 'amq.direct', # 用于从A到B传输消息的交换机。
'x-dead-letter-routing-key': 'hello' # 我们希望消息被传输到的队列名称。
})
delay_channel.basic.publish(exchange='',
routing_key='hello_delay',
body='test',
properties={'delivery_mode': 2})
print("[x] 已发送")
Tags:rabbitmq创建用户
猜你喜欢
- 2025-03-13 RabbitMQ面试题总结二
- 2025-03-13 RabbitMQ的安装
- 2025-03-13 RabbitMQ原理与架构
- 2025-03-13 详细介绍一下RabbitMQ的消息接收过程?
- 2025-03-13 超详细!!!Windows下安装RabbitMQ的步骤详解
- 2025-03-13 既然简历上写了RabbitMQ,就用这6道面试题来应付面试官
- 2025-03-13 RabbitMQ:死信队列 | 延迟插件 | 二合一用法+踩坑手记+最佳使用心得
- 2025-03-13 Springboot中使用RabbitMQ的实例(发布订阅模式、延时队列)