由于Apache的1.15版本,您可以使用压缩功能将多个文件合并到一个文件中。https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#compaction
如何使用批量Parquet格式的压缩?RecordWiseFileCompactor.Reader的现有实现(DecoderBasedReader和ImputFormatBasedReader)似乎不适合Parquet。
此外,我们找不到任何的例子,压缩Parquet或其他散装格式。
发布于 2022-09-28 02:53:30
flink的文档中提到了两种类型的文件压缩器。
OutputStreamBasedFileCompactor :用户可以将压缩后的结果写入输出流。当用户不希望或不能从输入文件中读取记录时,这是非常有用的。 RecordWiseFileCompactor :压缩程序可以从输入文件逐个读取记录,并将类似于FileWriter的结果文件写入结果文件。
如果我没记错,Parquet会在文件末尾保存元信息。因此,显然我们需要使用RecordWiseFileCompactor。因为我们需要读取整个Parquet文件,这样我们就可以在文件的末尾获得元信息。然后,我们可以使用元信息(行组数、模式)来解析文件。
在java中,要构造一个RecordWiseFileCompactor,我们需要一个RecordWiseFileCompactor.Reader.Factory实例。
接口RecordWiseFileCompactor.Reader.Factory、DecoderBasedReader.Factory和InputFormatBasedReader.Factory分别有两种实现。
DecoderBasedReader.Factory创建一个DecoderBasedReader实例,该实例从InputStream读取整个文件内容。我们可以将字节加载到缓冲区中,并从字节缓冲区解析文件,这显然是痛苦的。所以我们不使用这个实现。
InputFormatBasedReader.Factory创建一个InputFormatBasedReader,它使用我们传递给InputFormatBasedReader.Factory构造函数的FileInputFormat供应商读取整个文件内容。
InputFormatBasedReader实例使用FileInputFormat到逐条读取记录,并将记录传递给我们传递给forBulkFormat调用的写入器,直到文件结束。
作者收到所有的记录和将记录压缩到一个文件中。
因此,问题变成了什么是FileInputFormat,以及如何实现它。
虽然FileInputFormat类的方法和字段很多,但是我们知道只有四个方法是从InputFormatBasedReader中从上面提到的InputFormatBasedReader源代码调用的。
幸运的是,我们可以利用包AvroParquetReader中的org.apache.parquet.avro。它已经实现了open/read/close。因此,我们可以将读取器封装在FileInputFormat中,并使用AvroParquetReader完成所有的脏工作。
下面是一个示例代码片段
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;
import java.io.IOException;
public class ExampleFileInputFormat extends FileInputFormat<GenericRecord> {
private ParquetReader<GenericRecord> parquetReader;
private GenericRecord readRecord;
@Override
public void open(FileInputSplit split) throws IOException {
Configuration config = new Configuration();
// set hadoop config here
// for example, if you are using gcs, set fs.gs.impl here
// i haven't tried to use core-site.xml but i believe this is feasible
InputFile inputFile = HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(split.getPath().toUri()), config);
parquetReader = AvroParquetReader.<GenericRecord>builder(inputFile).build();
readRecord = parquetReader.read();
}
@Override
public void close() throws IOException {
parquetReader.close();
}
@Override
public boolean reachedEnd() throws IOException {
return readRecord == null;
}
@Override
public GenericRecord nextRecord(GenericRecord genericRecord) throws IOException {
GenericRecord r = readRecord;
readRecord = parquetReader.read();
return r;
}
}然后,您可以像下面这样使用ExampleFileInputFormat
FileSink<GenericRecord> sink = FileSink.forBulkFormat(
new Path(path),
AvroParquetWriters.forGenericRecord(schema))
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.enableCompact(
FileCompactStrategy.Builder.newBuilder()
.enableCompactionOnCheckpoint(10)
.build(),
new RecordWiseFileCompactor<>(
new InputFormatBasedReader.Factory<>(new SerializableSupplierWithException<FileInputFormat<GenericRecord>, IOException>() {
@Override
public FileInputFormat<GenericRecord> get() throws IOException {
FileInputFormat<GenericRecord> format = new ExampleFileInputFormat();
return format;
}
})
))
.build();我已经成功地将它部署到k8s上的flink和gcs上的压缩文件中。有一些关于部署的注意事项。
经过所有这些步骤,你可以开始你的工作,并好走。
https://stackoverflow.com/questions/72716885
复制相似问题