
Using KafkaAvroDeserializer with Alpakka

Asked by a Stack Overflow user on 2019-05-31 16:06:03

I have a Schema Registry and a Kafka broker from which I pull data with Avro v1.8.1. For deserialization I have been using Confluent's KafkaAvroDeserializer. Now I want to refactor my code to use the Elasticsearch API provided by Alpakka, but unfortunately this breaks the deserialization, as it leads to NullPointerExceptions:

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 2
Caused by: java.lang.NullPointerException
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1030)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:110)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1250)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:1099)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:506)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1269)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
    at de.adesso.fds.connectors.dpa.news.NewsConsumer.main(MyConsumer.java:58)

I have been using Alpakka's ConsumerSettings API as described in this example:

val system = ActorSystem.create();

// necessary to convert timestamps correctly in Avro Version 1.8.1 to avoid ClassCastExceptions
SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());

val consumerSettings = ConsumerSettings.create(system, new StringDeserializer(), new KafkaAvroDeserializer())
    .withBootstrapServers(kafkaBootstrapServerUrl)
    .withClientId(InetAddress.getLocalHost().getHostName())
    .withGroupId("" + new Random().nextInt())
    .withProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)
    .withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    .withStopTimeout(Duration.ofSeconds(5));

These settings cause the NullPointerExceptions, while these plain Kafka consumer props work fine:

val props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName()); 
props.put(ConsumerConfig.GROUP_ID_CONFIG, "" + new Random().nextInt());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// necessary to convert timestamps correctly in newer Avro Versions and to avoid ClassCastExceptions
SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
val consumer = new KafkaConsumer<String, MyClass>(props);

In the working example, the values of the ConsumerRecords are successfully deserialized into the classes generated from the schema by the avro-maven-plugin.

Any hints are greatly appreciated!


1 Answer

Accepted answer, by a Stack Overflow user, posted 2019-06-04 02:08:17

I think you need to pull new KafkaAvroDeserializer() out into its own variable, then call the .configure() method on that instance to pass in a non-null registry URL.

Then pass the configured instance to ConsumerSettings.create.
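That suggestion can be sketched as follows. This is a minimal sketch, not the answerer's literal code: it reuses the question's variable names (schemaRegistryUrl, kafkaBootstrapServerUrl), fills them with placeholder values for illustration, and assumes the Alpakka Kafka and Confluent serializer dependencies are on the classpath. The configure() call before ConsumerSettings.create is the key difference from the failing snippet:

```java
import java.net.InetAddress;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import akka.actor.ActorSystem;
import akka.kafka.ConsumerSettings;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConfiguredAvroConsumerSettings {

    public static void main(String[] args) throws Exception {
        ActorSystem system = ActorSystem.create();
        String schemaRegistryUrl = "http://localhost:8081";  // placeholder
        String kafkaBootstrapServerUrl = "localhost:9092";   // placeholder

        // Configure the deserializer explicitly BEFORE handing it to Alpakka,
        // so that its schema-registry client is initialized with a non-null URL
        // (per the accepted answer, this is what was missing).
        Map<String, Object> avroConfig = new HashMap<>();
        avroConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        avroConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

        KafkaAvroDeserializer valueDeserializer = new KafkaAvroDeserializer();
        valueDeserializer.configure(avroConfig, false); // false = value (not key) deserializer

        // KafkaAvroDeserializer implements Deserializer<Object>, hence <String, Object>.
        ConsumerSettings<String, Object> consumerSettings =
            ConsumerSettings.create(system, new StringDeserializer(), valueDeserializer)
                .withBootstrapServers(kafkaBootstrapServerUrl)
                .withClientId(InetAddress.getLocalHost().getHostName())
                .withGroupId("" + new Random().nextInt())
                .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                .withStopTimeout(Duration.ofSeconds(5));
    }
}
```

Note that the schema-registry properties are now passed to the deserializer instance itself rather than via withProperty, which only affects the consumer config, not an already-constructed deserializer object.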

FWIW, depending on your needs, Kafka Connect does a fine job of loading data into Elasticsearch.
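For reference, that Kafka Connect route would use Confluent's Elasticsearch sink connector with a configuration along these lines (a hypothetical sketch; the connector name, topic, and URLs are placeholders, and the kafka-connect-elasticsearch plugin must be installed):

```properties
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=my-topic
connection.url=http://localhost:9200
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
```

The AvroConverter handles the schema-registry lookup itself, so no custom deserialization code is needed on the consuming side.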

Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/56398444