MySQL, Oracle, Linux, 软件架构及大数据技术知识分享平台

网站首页 > 精选文章 / 正文

rabbitmq消费者ACK机制message acknowledgment

2024-12-23 10:37 huorong 精选文章 6 ℃ 0 评论

一、队列关于ACK的设置

消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。

二、消费端关于ACK的设置

RabbitMQ不会为未ack的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。

这个在channe.basicConsume方法中设置自动ACK还是手动ACK

1.自动 acknowledgement 模式

发后即忘

只要rabbitmq投递了消息给消费者,就认为投递成功,不管消费者是否真的收到了

这样可能会丢消息

channel.basicConsume(queueName,true,consumerB);

2.手动 acknowledgement 模式

rabbitmq投递了消息给消费者,需要消费者返回ACK,没有超时限制。

如果消费者的 TCP 连接 或者 channel 关闭了,导致这条消息没有被 acked,RabbitMQ 会自动把当前消息重新入队,再次投递。

重点!:如果消费者没发送ACK,没有超时限制,rabbitmq也不会重复投递,只有消费者的 TCP 连接 或者 channel 关闭了,rabbitmq才会重复投递!

这样可能出现重复投递消息,消费端需要做好幂等性。

//设置一个监听器监听消费消息

Consumer consumerB = new DefaultConsumer(channel){

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte[] body)

throws IOException {

String message = new String(body,"UTF-8");

System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);

//消费者自行确认

this.getChannel().basicAck(envelope.getDeliveryTag(),false);

}

};


//消费者自行确认:autoAck参数为false

channel.basicConsume(queueName,false,consumerB);

其中可能用到的方法和入参的解释

方法:

channel.basicAck(long deliveryTag, boolean multiple) throws IOException;

代表消费者确认收到当前消息,语义上表示消费者成功处理了当前消息。

如果消费者没发送ACK,没有超时限制,rabbitmq也不会重复投递,只有消费者的 TCP 连接 或者 channel 关闭了,rabbitmq才会重复投递!

channel.basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException

代表消费者拒绝一条或者多条消息。basicNack 算是 basicReject 的一个扩展,因为 basicReject 不能一次拒绝多条消息。

channel.basicReject(long deliveryTag, boolean requeue) throws IOException;

代表消费者拒绝这条消息,语义上表示消费者没有处理当前消息。

对于 basicNack 和 basicReject ,如果参数 boolean requeue 传入 false,消息还是会从队列里面删除。

入参:

deliveryTag

当消费者向RabbitMQ注册后,RabbitMQ使用basic.deliver向消费者投递消息时,消息体上会带上delivery tag,这个值会唯一标识本次投递,在同一通道上,此值是唯一的。delivery tag值有64位长度,值从1开始,每发送一次消息值递增1,最大值为9223372036854775807。消费者端在应答消息时,带上此参数,告诉RabbitMQ某次投递已经正确应答。

multiple

是否使用批量确认,如果此属性为true,则消息标签小于当前deliveryTag的所有消息都会被主动积极确认,不了解此属性最好使用false。

requeue

是否重新入队,如果此属性为true,消息会被重新放置回去对应队列(如果可能的话,会放回到原来的位置),如果此属性为false,消息直接被丢弃。

属性requeue如果设置为true,需要谨慎设计程序的逻辑,否则很有可能导致消息一直重复消费失败并且重复重新入队,表现为消费者线程出现死循环逻辑,耗尽服务器CPU资源。

Tags:rabbitmq查看消费情况

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言