试图将简单的Parquet文件读入我的Google DataFlow管道
使用以下代码
Read.Bounded<KV<Void, GenericData>> results = HadoopFileSource.readFrom("/home/avi/tmp/db_demo/simple.parquet", AvroParquetInputFormat.class, Void.class, GenericData.class);在运行管道时始终触发以下异常
org.apache.avro.generic.GenericData类的IllegalStateException:无法找到编码器
似乎HadoopFileSource内部的这个方法不能处理这种类型的类,就像编码器一样
private <T> Coder<T> getDefaultCoder(Class<T> c) {
if (Writable.class.isAssignableFrom(c)) {
Class<? extends Writable> writableClass = (Class<? extends Writable>) c;
return (Coder<T>) WritableCoder.of(writableClass);
} else if (Void.class.equals(c)) {
return (Coder<T>) VoidCoder.of();
}
// TODO: how to use registered coders here?
throw new IllegalStateException("Cannot find coder for " + c);}
如有任何帮助,将不胜感激。
Avi
发布于 2017-02-13 19:25:56
这是HadoopFileSource设计中的一个问题。我建议移到apache-beam或(scio),这是dataflow sdk的apache“版本”(以及“未来”)。一旦你在光束上,你就可以:
这将是scala (但您可以轻松地转换为java):
HDFSFileSource.from(
input,
classOf[AvroParquetInputFormat[AvroSchemaClass]],
AvroCoder.of(classOf[AvroSchemaClass]),
new SerializableFunction[KV[Void, AvroSchemaClass], AvroSchemaClass]() {
override def apply(e: KV[Void, AvroSchemaClass]): AvroSchemaClass =
CoderUtils.clone(AvroCoder.of(classOf[AvroSchemaClass]), e.getValue)
}
)它是from的另一个版本,它接受coder。
https://stackoverflow.com/questions/41912890
复制相似问题