首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Avro架构演进:无法添加或删除字段

Avro架构演进:无法添加或删除字段
EN

Stack Overflow用户
提问于 2017-08-28 11:19:09
回答 1查看 4.6K关注 0票数 4

我目前正在尝试改进我的avro模式,根据文档,这应该没什么大不了的。但是,当添加或删除字段时,Avro无法反序列化字节。

我使用以下模式:

AvroSchemas.avsc:

代码语言:javascript
复制
[
  {
    "namespace": "stackoverflow.example.avro",
    "type": "record",
    "name": "Record_1_1",
    "fields": [
      {"name": "value0", "type": "string"}
    ]
  },
  {
    "namespace": "stackoverflow.example.avro",
    "type": "record",
    "name": "Record_1_2",
    "fields": [
      {"name": "value0", "type": "string"},
      {"name": "value1", "type": "string", "default": "Hello World"}
    ]
  },
  {
    "namespace": "stackoverflow.example.avro",
    "type": "record",
    "name": "Record_2_1",
    "fields": [
      {"name": "someList", "type": {"type": "array", "items": "int"}}
    ]
  },
  {
    "namespace": "stackoverflow.example.avro",
    "type": "record",
    "name": "Record_2_2",
    "fields": [
      {"name": "someBool", "type": "boolean", "default": "false"},
      {"name": "someList", "type": {"type": "array", "items": "int"}}
    ]
  }
]

类是使用以下Maven构建插件生成的:

代码语言:javascript
复制
  <plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.8.2</version>
    <executions>
      <execution>
        <phase>generate-sources</phase>
        <goals>
          <goal>schema</goal>
        </goals>
        <configuration>
          <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
          <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
          <stringType>String</stringType>
        </configuration>
      </execution>
    </executions>
  </plugin>

这是我用来测试我的进化的代码:

AvroTest.java:

代码语言:javascript
复制
package stackoverflow.example;

import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Objects;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import stackoverflow.example.avro.Record_1_1;
import stackoverflow.example.avro.Record_1_2;
import stackoverflow.example.avro.Record_2_1;
import stackoverflow.example.avro.Record_2_2;

public class AvroTest {

    public static void main(String[] args) throws Exception {
        executeTest0();
        executeTest1();
        executeTest2();
    }

    /**
     * Test if read and write methods work
     */
    private static void executeTest0() {
        Record_1_1 source1 = new Record_1_1("A");
        Record_1_1 dest1 = trySerializeDeserialize(source1, Record_1_1.class, Record_1_1.class);
        if (dest1 == null || !Objects.equals(source1.getValue0(), dest1.getValue0())) {
            throw new RuntimeException("Record_1_1 Test 0 failed");
        }

        Record_1_2 source2 = new Record_1_2("A", "B");
        Record_1_2 dest2 = trySerializeDeserialize(source2, Record_1_2.class, Record_1_2.class);
        if (dest2 == null || !Objects.equals(source2.getValue0(), dest2.getValue0()) || !Objects.equals(source2.getValue1(), dest2.getValue1())) {
            throw new RuntimeException("Record_1_2 Test 0 failed");
        }

        Record_2_1 source3 = new Record_2_1(new ArrayList<>());
        Record_2_1 dest3 = trySerializeDeserialize(source3, Record_2_1.class, Record_2_1.class);
        if (dest3 == null || !Objects.equals(source3.getSomeList(), dest3.getSomeList())) {
            throw new RuntimeException("Record_2_1 Test 0 failed");
        }

        Record_2_2 source4 = new Record_2_2(true, new ArrayList<>());
        Record_2_2 dest4 = trySerializeDeserialize(source4, Record_2_2.class, Record_2_2.class);
        if (dest4 == null || !Objects.equals(source4.getSomeBool(), dest4.getSomeBool()) || !Objects.equals(source4.getSomeList(), dest4.getSomeList())) {
            throw new RuntimeException("Record_2_2 Test 0 failed");
        }
    }

    private static void executeTest1() {
        Record_1_1 source1 = new Record_1_1("Test");
        Record_1_2 dest1 = trySerializeDeserialize(source1, Record_1_1.class, Record_1_2.class);
        if (dest1 == null || !Objects.equals(dest1.getValue1(), "Hello World")) {
            System.out.println("adding field with default value failed: " + dest1);
        }

        Record_1_2 source2 = new Record_1_2("Test0", "Test1");
        Record_1_1 dest2 = trySerializeDeserialize(source2, Record_1_2.class, Record_1_1.class);
        if (dest2 == null || !Objects.equals(source2.getValue0(), dest2.getValue0())) {
            System.out.println("removing field failed: " + dest2);
        }
    }

    private static void executeTest2() {
        Record_2_1 source1 = new Record_2_1(new ArrayList<>());
        Record_2_2 dest1 = trySerializeDeserialize(source1, Record_2_1.class, Record_2_2.class);
        if (dest1 == null || !Objects.equals(source1.getSomeList(), dest1.getSomeList())) {
            System.out.println("adding boolean field with default value failed: " + dest1);
        }

        Record_2_2 source2 = new Record_2_2(true, new ArrayList<>());
        Record_2_1 dest2 = trySerializeDeserialize(source2, Record_2_2.class, Record_2_1.class);
        if (dest2 == null || !Objects.equals(source2.getSomeList(), dest2.getSomeList())) {
            System.out.println("removing boolean field failed: " + dest2);
        }
    }

    private static <T, E> E trySerializeDeserialize(T source, Class<T> sourceClass, Class<E> destClass) {
        E result;

        try {
            byte[] bytes = write(source, sourceClass);
            result = read(bytes, destClass);
        } catch (Exception e) {
            result = null;
        }

        return result;
    }

    private static <T> byte[] write(T value, Class<T> clazz) throws Exception {
        byte[] bytes;

        try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
            Encoder encoder = EncoderFactory.get().binaryEncoder(bos, null);
            DatumWriter<T> writer = new SpecificDatumWriter<>(clazz);
            writer.write(value, encoder);

            encoder.flush();
            bytes = bos.toByteArray();
        }

        return bytes;
    }

    private static <T> T read(byte[] bytes, Class<T> clazz) throws Exception {
        Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        DatumReader<T> reader = new SpecificDatumReader<>(clazz);

        return reader.read(null, decoder);
    }
}

输出:

代码语言:javascript
复制
adding field with default value failed: null
adding boolean field with default value failed: null
removing boolean field failed: null

根据docs,我所有的测试都应该正常工作(添加一个带有默认值的字段,或者在接收端删除一个字段)。但我不认为这些文档是为了好玩而写的,所以我是不是错过了什么场景?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-09-03 00:46:05

问题在于您试图反序列化数据的方式。当使用SpecificDatumReader(Class<T>)构造函数时,读取器假定作者的模式和读者的模式是相同的。

您可以通过使用SpecificDatumReader(Schema writer, Schema reader)来修复这个问题。例如:

代码语言:javascript
复制
private static <T, E> E read(byte[] bytes, Class<T> sourceClass, Class<E> destClass) throws Exception {
    Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
    DatumReader<E> reader = new SpecificDatumReader<>(
            SpecificData.get().getSchema(sourceClass),
            SpecificData.get().getSchema(destClass));

    return reader.read(null, decoder);
}

注意,DatumWriter的输出不是Avro文件,它总是包含一个用于序列化其标头中的数据的模式,而是一个没有头的序列化对象。如果要测试Avro文件,则应该使用DataFileWriterDataFileReader

所有模式更改都是兼容的,应该按照Avro格式规范工作。模式中唯一的错误是someBool的默认值-它应该是布尔值(false)而不是字符串("false")。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45917760

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档