我正在使用Apache Flink,并尝试使用Apache Kafka协议连接到Azure eventhub以接收来自它的消息。我设法连接到Azure eventhub并接收消息,但我不能使用flink功能"setStartFromTimestamp(...)“如下所述(https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration)。当我试图从timestamp获取一些消息时,Kafka说broker端的消息格式是0.10.0之前的版本。有没有人遇到过这种情况?Apache Kafka客户端版本为2.0.1 Apache Flink版本为1.7.2
更新:尝试在消费包中使用Azure-Event-Hub quickstart examples (https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/java)添加代码以获取带有时间戳的偏移量,如果消息版本低于0.10.0 kafka版本,则返回null。
List<PartitionInfo> partitionInfos = consumer.partitionsFor(TOPIC);
List<TopicPartition> topicPartitions = partitionInfos.stream().map(pi -> new TopicPartition(pi.topic(), pi.partition())).collect(Collectors.toList());
Map<TopicPartition, Long> topicPartitionToTimestampMap = topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> 0L));
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = consumer.offsetsForTimes(topicPartitionToTimestampMap);
System.out.println(offsetAndTimestamp);发布于 2019-10-15 07:44:11
很抱歉我们错过了这个。EH现在支持Kafka offsetsForTimes() (以前不支持)。
请随时在未来针对我们的Github提出问题。https://github.com/Azure/azure-event-hubs-for-kafka
https://stackoverflow.com/questions/55280227
复制相似问题