我正在寻找一个例子来做一个类似于GenericRecordBase的Avro SpecificRecordBase对象的双射,或者是否有一个更简单的方法来使用AvroSerializer类作为Kafka键和值序列化器。
Injection<GenericRecord, byte[]> genericRecordInjection =
GenericAvroCodecs.toBinary(schema);
byte[] bytes = genericRecordInjection.apply(type);发布于 2016-08-06 01:29:59
https://github.com/miguno/kafka-storm-starter提供了这样的示例代码。
例如,请参见AvroDecoderBolt。在它的javadoc中:
此螺栓期望传入的数据为Avro编码的二进制格式,并根据
T的Avro模式进行序列化。它将把传入的数据反序列化为一个Tpojo,并将此pojo发送给下游消费者。因此,这个螺栓可以被认为是风暴相当于推特Bijection的Injection.invert[T, Array[Byte]](bytes)的Avro数据。
哪里
Avro T:基于所使用的基础Avro模式的Avro记录的类型(例如Tweet)。必须是Avro的SpecificRecordBase的子类。
代码的关键部分是(我将代码折叠成以下代码片段):
// With T <: SpecificRecordBase
implicit val specificAvroBinaryInjection: Injection[T, Array[Byte]] =
SpecificAvroCodecs.toBinary[T]
val bytes: Array[Byte] = ...; // the Avro-encoded data
val decodeTry: Try[T] = Injection.invert(bytes)
decodeTry match {
case Success(pojo) =>
System.out.println("Binary data decoded into pojo: " + pojo)
case Failure(e) => log.error("Could not decode binary data: " + Throwables.getStackTraceAsString(e))
}发布于 2017-05-05 11:44:48
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(new File("/Users/.../schema.avsc"));
Injection<Command, byte[]> objectInjection = SpecificAvroCodecs.toBinary(schema);
byte[] bytes = objectInjection.apply(c);https://stackoverflow.com/questions/38787596
复制相似问题