MySQL, Oracle, Linux, 软件架构及大数据技术知识分享平台

网站首页 > 精选文章 / 正文

python散装笔记——127: 使用AMQPStorm入门RabbitMQ

2025-03-13 15:57 huorong 精选文章 2 ℃ 0 评论

1: 如何从RabbitMQ消费消息

首先导入所需的库。

 from amqpstorm import Connection

在消费消息时,我们首先需要定义一个函数来处理接收到的消息。这可以是任何可调用的函数,并且需要接收一个消息对象,或者一个消息元组(具体取决于在start_consuming中定义的to_tuple参数)。

除了处理接收到的消息数据之外,我们还需要确认或拒绝消息。这是非常重要的,因为我们需要让RabbitMQ知道我们已经正确接收并处理了消息。

 def on_message(message):
   """当接收到消息时调用此函数。
     
   :param message: 接收到的消息。
   :return:
   """
   print("Message:", message.body)
   
   # 确认我们已成功处理消息。
   message.ack()
   
   # 拒绝消息。
   # message.reject()
   
   # 拒绝消息,并将其重新放回队列。
   # message.reject(requeue=True)

接下来,我们需要建立与RabbitMQ服务器的连接。

 connection = Connection('127.0.0.1', 'guest', 'guest')

之后,我们需要设置一个通道。每个连接可以有多个通道,一般来说,在执行多线程任务时,建议(但不是必须)为每个线程分配一个通道。

 channel = connection.channel()

设置好通道之后,我们需要告知RabbitMQ我们希望开始消费消息。在这个例子中,我们将使用之前定义的on_message函数来处理所有消费的消息。

我们将要监听的RabbitMQ服务器上的队列名称为simple_queue,并且我们还告知RabbitMQ我们将在处理完所有接收到的消息后确认它们。

 channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)

最后,我们需要启动IO循环以开始处理RabbitMQ服务器传递的消息。

 channel.start_consuming(to_tuple=False)

2: 如何向RabbitMQ发布消息

首先导入所需的库。

 from amqpstorm import Connection
 from amqpstorm import Message

接下来,我们需要建立与RabbitMQ服务器的连接。

 connection = Connection('127.0.0.1', 'guest', 'guest')

之后,我们需要设置一个通道。每个连接可以有多个通道,一般来说,在执行多线程任务时,建议(但不是必须)为每个线程分配一个通道。

 channel = connection.channel()

设置好通道之后,我们可以开始准备我们的消息。

 # 消息属性。
 properties = {
   'content_type': 'text/plain',
   'headers': {'key': 'value'}
 }
 
 # 创建消息。
 message = Message.create(channel=channel, body='Hello World!', properties=properties)

现在,我们可以通过调用publish并提供一个routing_key来发布消息。在这个例子中,我们将消息发送到名为simple_queue的队列。

 message.publish(routing_key='simple_queue')

3: 如何在RabbitMQ中创建延迟队列

首先,我们需要设置两个基本的通道,一个用于主队列,另一个用于延迟队列。在我的示例中,我包含了一些额外的标志(虽然不是必需的,但可以使代码更加可靠),例如确认投递、投递模式和持久性。您可以在RabbitMQ手册中找到更多关于这些内容的信息。

设置好通道之后,我们向主通道添加一个绑定,以便我们可以从延迟通道发送消息到主队列。

 channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

接下来,我们需要配置延迟通道,以便在消息过期后将其转发到主队列。

 delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
   'x-message-ttl': 5000,
   'x-dead-letter-exchange': 'amq.direct',
   'x-dead-letter-routing-key': 'hello'
 })
  • x-message-ttl(消息存活时间)
  • 通常用于在队列中自动删除过期的消息,但通过添加两个可选参数,我们可以改变这种行为,而是让此参数决定消息在延迟队列中停留的时间(以毫秒为单位)。
  • x-dead-letter-routing-key
  • 此变量允许我们在消息过期后将其转移到另一个队列,而不是默认的行为——完全删除消息。
  • x-dead-letter-exchange
  • 此变量决定了用于将消息从hello_delay队列转移到hello队列的交换机。

向延迟队列发布消息

设置好所有基本的Pika参数后,您只需使用basic.publish将消息发送到延迟队列即可。

 delay_channel.basic.publish(exchange='',
                             routing_key='hello_delay',
                             body='test',
                             properties={'delivery_mod': 2})

执行脚本后,您应该在RabbitMQ管理模块中看到以下队列被创建。

示例代码。

 from amqpstorm import Connection
 
 connection = Connection('127.0.0.1', 'guest', 'guest')
 
 # 创建普通的“Hello World”类型的通道。
 channel = connection.channel()
 channel.confirm_deliveries()
 channel.queue.declare(queue='hello', durable=True)
 
 # 我们需要将此通道绑定到一个交换机,该交换机将用于从延迟队列传输消息。
 channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
 
 # 创建延迟通道。
 delay_channel = connection.channel()
 delay_channel.confirm_deliveries()
 
 # 在这里我们声明延迟时间和延迟通道的路由。
 delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
   'x-message-ttl': 5000, # 消息传输前的延迟时间(以毫秒为单位)。
   'x-dead-letter-exchange': 'amq.direct', # 用于从A到B传输消息的交换机。
   'x-dead-letter-routing-key': 'hello' # 我们希望消息被传输到的队列名称。
 })
 
 delay_channel.basic.publish(exchange='',
                             routing_key='hello_delay',
                             body='test',
                             properties={'delivery_mode': 2})
 
 print("[x] 已发送")

Tags:rabbitmq创建用户

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言