我们正在努力将Apache Storm与Kafka的Confluent框架集成在一起。我们正在使用名为"Pyleus“的storm的python包装器
我们设置了一个Confluent-Kafka JDBC连接器来监视数据库表,每当DB中发生变化时,新记录就会以Avro格式作为Kafka消息发送。
在Pyleus bolt中,我们能够获得Kafka消息,但是,我们不能将其反序列化为JSON。
我们使用了两个名为"avro_json_serializer“和"avro”的python-Avro模块。当我尝试反序列化我放在一起的简单Avro文件时,它们会起作用。
Kafka消息中Avro数据的Avro模式是通过使用HTTP GET从Confluent的模式注册表中获得的。我将Kafka消息中的schema和Avro数据放入两个文件中,下面是我的测试程序:
import avro
import avro_json_serializer as ajs
import json
# Avro schema from Confluent's schema registry using HTTP GET
schema_string = open("realAvroSchemaFromKK.avsc").read()
schema_dict = json.loads(schema_string)
avro_schema = avro.schema.make_avsc_object(schema_dict, avro.schema.Names())
serializer = ajs.AvroJsonSerializer(avro_schema)
# Avro data with in Kafka message - I wrote it into this file
avrofile = open("realAvroFromKK.avro", "r")
avro = avrofile.read()
jsonData = serializer.to_json(avro) # where the code error out #
print jsonData我解释错误消息的方式是我的avro模式与我的avro数据不匹配:
avro.io.AvroTypeException: The datum �bankbankHoward �����THoward �����T� is not an example of the schema {
"namespace": "example.avro",
"type": "record",
"connect.name": "TABLE_NAME",
"fields": [
{
"type": "int",
"name": "Column_1"
},
... (omitting the rest of the schema)我从here上读到来自Confluent框架的Avro格式的Kafka消息在消息的开头有4个额外的字节来表示模式ID。我试着去掉Avro数据的前4个字节,然后把它发送给"serializer.to_json()“,但还是没有成功。
帮助!
发布于 2016-04-14 19:40:01
当我通过Storm kafka spout读取Kafka融合数据时,我遇到了完全类似的问题。下面是为我工作的等效Java代码。
ByteBuffer input = ByteBuffer.wrap(data);
int id = input.getInt();
int start = input.position() + 1;
MyAvroObject obj = null;
try {
obj = datum_reader.read(null, DecoderFactory.get().binaryDecoder(input.array(), start, input.limit(), null));
} catch (IOException e) {
e.printStackTrace();
}
return obj;ByteBuffer上的getInt()和position方法将指针移动到架构Id之后。希望这能有所帮助。
https://stackoverflow.com/questions/35712601
复制相似问题