首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink Kafka连接器连接到eventhub

Flink Kafka连接器连接到eventhub
EN

Stack Overflow用户
提问于 2019-03-21 20:14:29
回答 1查看 677关注 0票数 0

我正在使用Apache Flink,并尝试使用Apache Kafka协议连接到Azure eventhub以接收来自它的消息。我设法连接到Azure eventhub并接收消息,但我不能使用flink功能"setStartFromTimestamp(...)“如下所述(https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration)。当我试图从timestamp获取一些消息时,Kafka说broker端的消息格式是0.10.0之前的版本。有没有人遇到过这种情况?Apache Kafka客户端版本为2.0.1 Apache Flink版本为1.7.2

更新:尝试在消费包中使用Azure-Event-Hub quickstart examples (https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/java)添加代码以获取带有时间戳的偏移量,如果消息版本低于0.10.0 kafka版本,则返回null。

代码语言:javascript
复制
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(TOPIC);
        List<TopicPartition> topicPartitions = partitionInfos.stream().map(pi -> new TopicPartition(pi.topic(), pi.partition())).collect(Collectors.toList());
        Map<TopicPartition, Long> topicPartitionToTimestampMap = topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> 0L));
        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = consumer.offsetsForTimes(topicPartitionToTimestampMap);
        System.out.println(offsetAndTimestamp);
EN

回答 1

Stack Overflow用户

发布于 2019-10-15 07:44:11

很抱歉我们错过了这个。EH现在支持Kafka offsetsForTimes() (以前不支持)。

请随时在未来针对我们的Github提出问题。https://github.com/Azure/azure-event-hubs-for-kafka

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55280227

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档