网站首页 > 精选文章 / 正文
Apache Flink 自身提供了 Kafka、RabbitMQ、ActiveMQ、Elasticsearch 等丰富的 connectors ,它们有些同时包含了 source 和 sink,有些只包含了 source 或 sink,这篇文章就来介绍一下 Flink 自带的 RabbitMQ connectors 的使用,我们把 RabbitMQ 同时当作 source 和 sink,把 RabbitMQ 队列里的数据通过 Flink 处理后再写入到 RabbitMQ 队列里。
在开始之前,首先要安装好 RabbitMQ,具体安装方法这里就跳过了,可以参照 RabbitMQ 官网文档来安装,然后手动创建 rabbitmq_connectors 和 rabbitmq_connectors_sink 这两个队列,队列也可以通过代码创建。
我们首先使用IDEA开发工具创建好项目,项目结构如图所示。
接下来添加 RabbitMQ connectors 的依赖,如下所示,Flink 和 Scala 的版本可以根据实际情况来指定,我这边使用的 Flink 和 Scala 的版本分别是 1.10.0 和 2.12。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
接着开始编写主类,也就是程序的入口,具体实现如下图所示,这里涉及到的 RabbitMQ 的配置信息都写在了 application.properties 配置文件里,并通过自定义的 PropertyUtil 工具类来读取配置。
到这里,这个简单的 RabbitMQ connectors 的 demo 程序就已经开发好了,是不是非常的简单吖。接下来我们运行一下看看效果吧,然而,To my surprise,刚启动就失败了,提示“Job failed”,由于报错的 trace 有点多,这里就把关键报错信息展示一下吧,如下图所示。
很明显,提示 sink 队列的 durable 值不对,用过 RabbitMQ 的应该都知道 durable 这个配置,它指定了这个队列是否持久化,false 为不持久化,一般情况下我们在创建队列的时候都会指定 durable=true,也即持久化队列。可是刚刚在创建队列的时候,source 和 sink 队列的 durable 都是设置了 true,为什么运行的时候提示 sink 队列的 durable 是 false 呢?带着这个疑问去查看了一下 RMQSource 和 RMQSink 这两个类的源码,果然发现了问题所在,RMQSource 和 RMQSink 这两个类在声明队列的时候都给 durable 赋值了默认值,但是一个给了 true 一个给了 false,源码分别如下,queueDeclare() 方法的第二个参数指定了 durable 的值。
// RMQSource类
protected void setupQueue() throws IOException {
this.channel.queueDeclare(this.queueName, true, false, false, (Map)null);
}
// RMQSink类
protected void setupQueue() throws IOException {
if (this.queueName != null) {
this.channel.queueDeclare(this.queueName, false, false, false, (Map)null);
}
}
解决这个问题,我们只需要重写 RMQSink 类的 setupQueue() 方法即可,具体代码实现如下,然后把 main 方法里的 sink 对象改用成自定义的 RabbitMQSink 就可以了。
public class RabbitMQSink<IN> extends RMQSink<IN> {
public RabbitMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema schema) {
super(rmqConnectionConfig, queueName, schema);
}
protected void setupQueue() throws IOException {
if (this.queueName != null) {
this.channel.queueDeclare(this.queueName, true, false, false, (Map) null);
}
}
}
接下来就运行程序看看效果吧,我们在 source 队列 rabbitmq_connectors 里随便 publish 一些数据,看看能否 sink 到队列 rabbitmq_connectors_sink 里,操作结果如下图所示。
到这里,这个 demo 案例就已经开发好了,我们实现了使用 Flink 读取 RabbitMQ 的队列消息并将该消息写入到另一个 RabbitMQ 的队列,完整的代码(包含 Java 和 Scala 两种语言开发)已经同步到了 GitHub。另外,在这个例子中也提到了一个解决问题的思路,开发过程中遇到问题的时候可以考虑从源码角度去寻找问题的本质并解决问题。
Tags:rabbitmq queue
猜你喜欢
- 2024-12-02 基于zabbix4.2 监控rabbitmq消息队列服务
- 2024-12-02 RabbitMQ实现延迟队列的两种方式
- 2024-12-02 消息队列选型(RabbitMq、RocketMq、Kafaka)
- 2024-12-02 mq的那些破事儿,你不好奇吗?
- 2024-12-02 RabbitMQ镜像队列集群搭建、与SpringBoot整合
- 2024-12-02 你真的知道怎么实现一个延迟队列吗?|腾讯开发工程师干货分享
- 2024-12-02 一口气整理一波延时队列实现方案
- 2024-12-02 RabbitMQ 延迟队列
- 2024-12-02 zabbix4.0基于Rabbitmq网页API获取队列信息
- 2024-12-02 RabbitMQ 实现延迟队列的两种方式