首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >带有Kafka源代码和数据流运行器的Beam java SDK2.10.0: windowed Count.perElement从不发出数据

带有Kafka源代码和数据流运行器的Beam java SDK2.10.0: windowed Count.perElement从不发出数据
EN

Stack Overflow用户
提问于 2019-02-26 23:50:37
回答 1查看 97关注 0票数 1

我在谷歌DataFlow上运行Beam到2.10.0作业时遇到问题

流程很简单:我使用Kafka作为源,然后应用固定的窗口,然后逐个键计数元素。但看起来,在作业耗尽之前,数据永远不会离开计数阶段。Count.PerElement/Combine.perKey(Count)/Combine.GroupedValues.out0的输出集合始终为零。只有在排出数据流作业之后才会发出元素。

代码如下:

代码语言:javascript
复制
public KafkaProcessingJob(BaseOptions options) {

    PCollection<GenericRecord> genericRecordPCollection = Pipeline.create(options)
                     .apply("Read binary Kafka messages", KafkaIO.<String, byte[]>read()
                           .withBootstrapServers(options.getBootstrapServers())
                           .updateConsumerProperties(configureConsumerProperties())
                           .withCreateTime(Duration.standardMinutes(1L))
                           .withTopics(inputTopics)
                           .withReadCommitted()
                           .commitOffsetsInFinalize()
                           .withKeyDeserializer(StringDeserializer.class)
                           .withValueDeserializer(ByteArrayDeserializer.class))

                    .apply("Map binary message to Avro GenericRecord", new DecodeBinaryKafkaMessage());

                    .apply("Apply windowing to records", Window.into(FixedWindows.of(Duration.standardMinutes(5)))
                                       .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                                       .discardingFiredPanes()
                                       .withAllowedLateness(Duration.standardMinutes(5)))

                    .apply("Write aggregated data to BigQuery", MapElements.into(TypeDescriptors.strings()).via(rec -> getKey(rec)))
                            .apply(Count.<String>perElement())
                            .apply(
                                new WriteWindowedToBigQuery<>(
                                    project,
                                    dataset,
                                    table,
                                    configureWindowedTableWrite()));   
}

private Map<String, Object> configureConsumerProperties() {
    Map<String, Object> configUpdates = Maps.newHashMap();
    configUpdates.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return configUpdates;
}

private static String getKey(GenericRecord record) {
    //extract key
}

看起来flow从未离开过.apply(Count.<String>perElement())的舞台

有人能帮帮忙吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-03-01 20:06:13

我已经找到原因了。

它与这里使用的TimestampPolicy (.withCreateTime(Duration.standardMinutes(1L)))相关。

由于我们的Kafka主题中存在空分区,主题水印从未使用默认TimestampPolicy提前。我需要实现自定义策略来解决这个问题。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54889350

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档