首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >avro.io.AvroTypeException: datum <avro data>不是架构{...}的示例

avro.io.AvroTypeException: datum <avro data>不是架构{...}的示例
EN

Stack Overflow用户
提问于 2016-03-01 08:39:13
回答 1查看 2.4K关注 0票数 0

我们正在努力将Apache Storm与Kafka的Confluent框架集成在一起。我们正在使用名为"Pyleus“的storm的python包装器

我们设置了一个Confluent-Kafka JDBC连接器来监视数据库表,每当DB中发生变化时,新记录就会以Avro格式作为Kafka消息发送。

在Pyleus bolt中,我们能够获得Kafka消息,但是,我们不能将其反序列化为JSON。

我们使用了两个名为"avro_json_serializer“和"avro”的python-Avro模块。当我尝试反序列化我放在一起的简单Avro文件时,它们会起作用。

Kafka消息中Avro数据的Avro模式是通过使用HTTP GET从Confluent的模式注册表中获得的。我将Kafka消息中的schema和Avro数据放入两个文件中,下面是我的测试程序:

代码语言:javascript
复制
import avro
import avro_json_serializer as ajs

import json

# Avro schema from Confluent's schema registry using HTTP GET
schema_string = open("realAvroSchemaFromKK.avsc").read()

schema_dict = json.loads(schema_string)
avro_schema = avro.schema.make_avsc_object(schema_dict, avro.schema.Names())

serializer = ajs.AvroJsonSerializer(avro_schema)

# Avro data with in Kafka message - I wrote it into this file
avrofile = open("realAvroFromKK.avro", "r")
avro = avrofile.read()

jsonData = serializer.to_json(avro) # where the code error out #

print jsonData

我解释错误消息的方式是我的avro模式与我的avro数据不匹配:

代码语言:javascript
复制
avro.io.AvroTypeException: The datum �bankbankHoward �����THoward �����T� is not an example of the schema {
  "namespace": "example.avro",
  "type": "record",
  "connect.name": "TABLE_NAME",
  "fields": [
    {
      "type": "int",
      "name": "Column_1"
    },
    ... (omitting the rest of the schema)

我从here上读到来自Confluent框架的Avro格式的Kafka消息在消息的开头有4个额外的字节来表示模式ID。我试着去掉Avro数据的前4个字节,然后把它发送给"serializer.to_json()“,但还是没有成功。

帮助!

EN

回答 1

Stack Overflow用户

发布于 2016-04-14 19:40:01

当我通过Storm kafka spout读取Kafka融合数据时,我遇到了完全类似的问题。下面是为我工作的等效Java代码。

代码语言:javascript
复制
    ByteBuffer input = ByteBuffer.wrap(data);
    int id = input.getInt();
    int start = input.position() + 1;
    MyAvroObject obj = null;
    try {
        obj  = datum_reader.read(null, DecoderFactory.get().binaryDecoder(input.array(), start, input.limit(), null));

    } catch (IOException e) {
        e.printStackTrace();
    }
    return obj;

ByteBuffer上的getInt()和position方法将指针移动到架构Id之后。希望这能有所帮助。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/35712601

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档