首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink KafkaSource读取来自主题的所有消息

Flink KafkaSource读取来自主题的所有消息
EN

Stack Overflow用户
提问于 2022-05-24 20:01:47
回答 1查看 756关注 0票数 0

我的目标是使用Flink KafkaSource阅读卡夫卡主题的所有信息。我试着用批处理和流模式执行。问题是:当我将env.setParallelism设置为大于2时时,必须使用包含bug的Sink。

我想使用的Kafka主题包含3个分区。这是我拥有的代码片段:

代码语言:javascript
复制
KafkaSourceBuilder<Request> builder = KafkaSource.builder();
    builder.setBootstrapServers(kafkaBrokers);
    builder.setProperty("partition.discovery.interval.ms", "10000");
    builder.setTopics(topic);
    builder.setGroupId(groupId);
    builder.setBounded(OffsetsInitializer.latest());
    builder.setStartingOffsets(OffsetsInitializer.earliest());
    builder.setDeserializer(KafkaRecordDeserializationSchema.of(deserializer));

DataStreamSource<Request> streamSource = streamExecutionEnvironment.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
    streamSource.map(new MyMapper())
            .addSink(new Sink(props)).setParallelism(3) //by this setting I expected to have exactly 3 consumers - per partition/split, but it doesn't work when I do not set anything as well
            .name("Flink " + context.getJobDetail().getKey());

这段代码应该在将要被文件化的Spring应用程序中运行,我配置了一个定期执行的石英作业,streamExecutionEnvironment是本地环境:StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

在这一点上,主题中已经有超过1000万条消息。执行作业时,我可以在日志中看到:

代码语言:javascript
复制
    [ -> Map (1/1)#0] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=rrequest_consumer_test-0, groupId=request_consumer_test] Seeking to EARLIEST offset of partition request-2
INFO 7748 --- [ -> Map (1/1)#0] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=request_consumer_test-0, groupId=request_consumer_test] Seeking to EARLIEST offset of partition request-0
INFO 7748 --- [ -> Map (1/1)#0] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=request_consumer_test-0, groupId=request_consumer_test] Seeking to EARLIEST offset of partition request-1

然后,他们总共消耗了大约100万条消息,停止了消费,我看到了所有3条消息:

代码语言:javascript
复制
[ -> Map (1/1)#0] o.a.f.c.b.s.reader.fetcher.SplitFetcher  : Finished reading from splits [request-1]

因此,他们并没有完全使用这个主题,只是其中的一部分。当石英作业被重新触发时,它再次从OffsetsInitializer.earliest()开始读取,它们使用重复的消息,但也使用新的消息,不仅是新添加到主题中,而且还有一些在以前的执行过程中没有使用的消息。

我也尝试重命名消费者群体,以消除与补偿的问题,以防消费者在上次消费后提交。

我的问题是-我如何配置数据流,以充分阅读主题。我的问题与setParallelism(1)一般的设置或并行性、使用者组配置或其他任何东西有什么关系?请给我任何关于疑难解答的建议。

EN

回答 1

Stack Overflow用户

发布于 2022-05-26 09:30:42

这个问题与

代码语言:javascript
复制
builder.setBounded(OffsetsInitializer.latest());

这一行告诉卡夫卡将消息读入作业开始时看到的最后一个偏移量。然后,它将停止消耗更多的消息。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72368847

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档