首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用RabbitMQ作为Flink DataStream源而不自动创建RabbitMQ队列

使用RabbitMQ作为Flink DataStream源而不自动创建RabbitMQ队列
EN

Stack Overflow用户
提问于 2018-05-13 14:10:23
回答 1查看 991关注 0票数 3

当我使用RabbitMQ作为Flink DataStream源代码时,正如Flink文档所说的那样。

代码语言:javascript
复制
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpointing is required for exactly-once or at-least-once guarantees
env.enableCheckpointing(...);

final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build();

final DataStream<String> stream = env
    .addSource(new RMQSource<String>(
    connectionConfig,            // config for the RabbitMQ connection
    "queueName",                 // name of the RabbitMQ queue to consume
    true,                        // use correlation ids; can be false if only at-least-once is required
    new SimpleStringSchema()))   // deserialization schema to turn messages into Java objects
.setParallelism(1);              // non-parallel source is only required for exactly-once

这段代码将连接到RabbitMQ并自动创建队列"queueName".So --我遇到了一个问题。RabbitMQ队列已经存在,我在前面创建了它。我不想让Flink再创造一次。问题是Flink在没有参数的情况下创建队列,即与我之前创建的队列发生冲突。这是一个例外:

代码语言:javascript
复制
    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-message-ttl' for queue 'queueName' in vhost '/': received none but current is the value '604800000' of type 'long', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:443)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:263)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:136)
... 10 more

如何使Flink只订阅一个RabbitMQ队列而不尝试创建一个新队列?谢谢你们所有人。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-05-14 11:27:20

您可以编写自己的类,扩展RMQSource并重写setupQueue方法,以避免创建队列。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50316992

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档