首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >AVRO原语类型的Serde类

AVRO原语类型的Serde类
EN

Stack Overflow用户
提问于 2018-08-21 20:03:18
回答 3查看 3.1K关注 0票数 4

我正在用Java编写一个Kafka流应用程序,它接受连接器创建的输入主题,该连接器使用模式注册表和avro作为键和值转换器。连接器产生以下模式:

代码语言:javascript
复制
key-schema: "int"
value-schema:{
"type": "record",
"name": "User",
"fields": [
    {"name": "firstname", "type": "string"},
    {"name": "lastname",  "type": "string"}
]}

实际上,有几个主题,键模式总是"int“,值模式总是某种类型(用户、产品等)的记录。我的代码包含以下定义

代码语言:javascript
复制
Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);

Serde<User> userSerde = new SpecificAvroSerde<>();
userSerde.configure(serdeConfig, false);

起初,我尝试使用类似于Consumed.with(Serdes.Integer(), userSerde);的内容来处理这个主题,但这不起作用,因为Serdes.Integer()希望整数使用4个字节进行编码,但是avro使用可变长度的编码。使用Consumed.with(Serdes.Bytes(), userSerde);可以工作,但我真的想要int而不是字节,所以我将代码更改为

代码语言:javascript
复制
KafkaAvroDeserializer keyDeserializer = new KafkaAvroDeserializer()
KafkaAvroSerializer keySerializer = new KafkaAvroSerializer();
keyDeserializer.configure(serdeConfig, true); 
keySerializer.configure(serdeConfig, true);
Serde<Integer> keySerde = (Serde<Integer>)(Serde)Serdes.serdeFrom(keySerializer, keyDeserializer);

这使得编译器产生了一个警告(它不喜欢(Serde<Integer>)(Serde)转换),但它允许我使用

Consumed.with(keySerde, userSerde);,并获得一个整数作为键。这是很好的工作,我的应用程序表现如预期(太棒了!)但是现在我想为键/值定义默认的serde,但我无法让它工作。

设置默认值serde很简单:

代码语言:javascript
复制
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);

但是,我不知道如何定义默认的键serde。

我试过了

  1. org.apache.kafka.common.serialization.Serdes$WrapperSerde生成运行时错误:无法为streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde.getClass().getName());找到公共的无参数构造函数
  2. streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);生成运行时错误:不能将java.lang.Integer转换为org.apache.avro.specific.SpecificRecord

我遗漏了什么?谢谢。

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2018-08-21 23:01:18

更新(版本5.5及更新)

合流版本5.5通过PrimitiveAvroSerde (cf )添加了对原始Avro类型的本机支持。https://github.com/confluentinc/schema-registry/blob/5.5.x/avro-serde/src/main/java/io/confluent/kafka/streams/serdes/avro/PrimitiveAvroSerde.java)

原始答案(版本5.4及更高版本)

这是个众所周知的问题。原始的Avro类型不能很好地工作在Confluent的AvroSerdes中,因为Serdes只适用于GenericAvroRecordSpecificAvroRecord

比较https://github.com/confluentinc/schema-registry/tree/master/avro-serde/src/main/java/io/confluent/kafka/streams/serdes/avro

因此,基于KafkaAvroSerializerKafkaAvroDeserializer构建您自己的Serde是正确的方法。为了能够将它作为默认的Serde传递到配置中,您不能使用Serdes.serdeFrom,因为类型信息由于genrics类型擦除而丢失。

但是,您可以实现自己的类来扩展Serde接口,并将自定义类传递到配置中:

代码语言:javascript
复制
public class MySerde extends Serde<Integer> {
    // use KafkaAvroSerializer and KafkaAvroDeserializer and cast `Object` to `Integer`
}

config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MySerde.class);
票数 6
EN

Stack Overflow用户

发布于 2019-02-13 09:44:29

谢谢@MatthiasJ.Sax的提示,我想在solution.please周围免费发布工作来增强它。

代码语言:javascript
复制
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class GenericPrimitiveAvroSerDe<T> implements Serde<T> {

    private final Serde<Object> inner;

    /**
     * Constructor used by Kafka Streams.
     */
    public GenericPrimitiveAvroSerDe() {
        inner = Serdes.serdeFrom(new KafkaAvroSerializer(), new KafkaAvroDeserializer());
    }

    public GenericPrimitiveAvroSerDe(SchemaRegistryClient client) {
        this(client, Collections.emptyMap());
    }

    public GenericPrimitiveAvroSerDe(SchemaRegistryClient client, Map<String, ?> props) {
        inner = Serdes.serdeFrom(new KafkaAvroSerializer(client), new KafkaAvroDeserializer(client, props));
    }

    @Override
    public void configure(final Map<String, ?> serdeConfig, final boolean isSerdeForRecordKeys) {
        inner.serializer().configure(serdeConfig, isSerdeForRecordKeys);
        inner.deserializer().configure(serdeConfig, isSerdeForRecordKeys);
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub
        inner.serializer().close();
        inner.deserializer().close();

    }

    @SuppressWarnings("unchecked")
    @Override
    public Serializer<T> serializer() {
        // TODO Auto-generated method stub
        Object obj = inner.serializer();
        return (Serializer<T>) obj;

    }

    @SuppressWarnings("unchecked")
    @Override
    public Deserializer<T> deserializer() {
        // TODO Auto-generated method stub
        Object obj = inner.deserializer();
        return (Deserializer<T>) obj;

    }

}

作为默认流配置的用法:

代码语言:javascript
复制
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,GenericPrimitiveAvroSerDe.class);
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,GenericPrimitiveAvroSerDe.class);

覆盖默认值:

代码语言:javascript
复制
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
                                                                        "http://localhost:8081");
       final GenericPrimitiveAvroSerDe<String> keyGenericAvroSerde = new GenericPrimitiveAvroSerDe<String>();
       keyGenericAvroSerde.configure(serdeConfig, true); // `true` for record keys
       final GenericPrimitiveAvroSerDe<Long> valueGenericAvroSerde = new GenericPrimitiveAvroSerDe<Long>();
       valueGenericAvroSerde.configure(serdeConfig, false); // `false` for record values
票数 5
EN

Stack Overflow用户

发布于 2019-08-16 13:10:35

另外,如果你想用@Thiyaga Rajan这个伟大的解决方案来解决卡夫卡的消费者和生产者

代码语言:javascript
复制
consumerConfig.put(KEY_DESERIALIZER_CLASS_CONFIG,GenericPrimitiveAvroSerDe.class);
producerConfig.put(KEY_SERIALIZER_CLASS_CONFIG, GenericPrimitiveAvroSerDe.class);

将此添加到类中

代码语言:javascript
复制
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

    public class GenericPrimitiveAvroSerDe<T> implements Serde<T>, Serializer<T>, Deserializer<T> {

      @Override
      public T deserialize(String topic, byte[] data) {
        return this.deserializer().deserialize(topic, data);
      }

      @Override
      public byte[] serialize(String topic, T data) {
        return this.serializer().serialize(topic, data);
      }

    ...

    }
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51955921

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档