首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Parquet大容量格式的压缩使用

Parquet大容量格式的压缩使用
EN

Stack Overflow用户
提问于 2022-06-22 13:57:10
回答 1查看 278关注 0票数 2

由于Apache的1.15版本,您可以使用压缩功能将多个文件合并到一个文件中。https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#compaction

如何使用批量Parquet格式的压缩?RecordWiseFileCompactor.Reader的现有实现(DecoderBasedReader和ImputFormatBasedReader)似乎不适合Parquet。

此外,我们找不到任何的例子,压缩Parquet或其他散装格式。

EN

回答 1

Stack Overflow用户

发布于 2022-09-28 02:53:30

flink的文档中提到了两种类型的文件压缩器。

OutputStreamBasedFileCompactor :用户可以将压缩后的结果写入输出流。当用户不希望或不能从输入文件中读取记录时,这是非常有用的。 RecordWiseFileCompactor :压缩程序可以从输入文件逐个读取记录,并将类似于FileWriter的结果文件写入结果文件。

如果我没记错,Parquet会在文件末尾保存元信息。因此,显然我们需要使用RecordWiseFileCompactor。因为我们需要读取整个Parquet文件,这样我们就可以在文件的末尾获得元信息。然后,我们可以使用元信息(行组数、模式)来解析文件。

java中,要构造一个RecordWiseFileCompactor,我们需要一个RecordWiseFileCompactor.Reader.Factory实例。

接口RecordWiseFileCompactor.Reader.Factory、DecoderBasedReader.Factory和InputFormatBasedReader.Factory分别有两种实现。

DecoderBasedReader.Factory创建一个DecoderBasedReader实例,该实例从InputStream读取整个文件内容。我们可以将字节加载到缓冲区中,并从字节缓冲区解析文件,这显然是痛苦的。所以我们不使用这个实现。

InputFormatBasedReader.Factory创建一个InputFormatBasedReader,它使用我们传递给InputFormatBasedReader.Factory构造函数的FileInputFormat供应商读取整个文件内容。

InputFormatBasedReader实例使用FileInputFormat到逐条读取记录,并将记录传递给我们传递给forBulkFormat调用的写入器,直到文件结束。

作者收到所有的记录和将记录压缩到一个文件中

因此,问题变成了什么是FileInputFormat,以及如何实现它。

虽然FileInputFormat类的方法和字段很多,但是我们知道只有四个方法是从InputFormatBasedReader中从上面提到的InputFormatBasedReader源代码调用的。

  • 打开(FileInputSplit fileSplit),它打开文件
  • reachedEnd(),它检查我们是否到达文件末尾
  • nextRecord(),它从打开的文件中读取下一条记录
  • close(),它清理站点

幸运的是,我们可以利用包AvroParquetReader中的org.apache.parquet.avro。它已经实现了open/read/close。因此,我们可以将读取器封装在FileInputFormat中,并使用AvroParquetReader完成所有的脏工作。

下面是一个示例代码片段

代码语言:javascript
复制
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;

import java.io.IOException;

public class ExampleFileInputFormat extends FileInputFormat<GenericRecord> {

    private ParquetReader<GenericRecord> parquetReader;
    private GenericRecord readRecord;


    @Override
    public void open(FileInputSplit split) throws IOException {
        Configuration config = new Configuration();
        // set hadoop config here
        // for example, if you are using gcs, set fs.gs.impl here
        // i haven't tried to use core-site.xml but i believe this is feasible
        InputFile inputFile = HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(split.getPath().toUri()), config);
        parquetReader = AvroParquetReader.<GenericRecord>builder(inputFile).build();
        readRecord = parquetReader.read();
    }

    @Override
    public void close() throws IOException {
        parquetReader.close();
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return readRecord == null;
    }

    @Override
    public GenericRecord nextRecord(GenericRecord genericRecord) throws IOException {
        GenericRecord r = readRecord;
        readRecord = parquetReader.read();
        return r;
    }
}

然后,您可以像下面这样使用ExampleFileInputFormat

代码语言:javascript
复制
FileSink<GenericRecord> sink = FileSink.forBulkFormat(
                new Path(path),
                AvroParquetWriters.forGenericRecord(schema))
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .enableCompact(
                FileCompactStrategy.Builder.newBuilder()
                        .enableCompactionOnCheckpoint(10)
                        .build(),
                new RecordWiseFileCompactor<>(
                        new InputFormatBasedReader.Factory<>(new SerializableSupplierWithException<FileInputFormat<GenericRecord>, IOException>() {
                            @Override
                            public FileInputFormat<GenericRecord> get() throws IOException {
                                FileInputFormat<GenericRecord> format = new ExampleFileInputFormat();
                                return format;
                            }
                        })
                ))
        .build();

我已经成功地将它部署到k8s上的flink和gcs上的压缩文件中。有一些关于部署的注意事项。

  • 您需要从https://flink.apache.org/downloads.html (在网页中搜索预捆绑hadoop )下载flink阴影Hadoop,并将jar下载到$FLINK_HOME/lib/
  • 如果您正在向某个对象存储(例如gcs )写入文件,则需要遵循插件指令。请记住将插件jar放入插件文件夹,而不是lib文件夹。
  • 如果要将文件写入某个对象存储,则需要从云服务供应商下载连接器jar。例如,我正在使用gcs,并在GCP指令之后下载gcs连接器jar。将jar放入一些文件夹,而不是$FLINK_HOME/lib或$FLINK_HOME/plugins。我将连接器jar放入一个新创建的文件夹$FLINK_HOME/hadoop中。
  • Set环境HADOOP_CLASSPATH=$FLINK_HOME/lib/YOUR_SHADED_HADOOP_JAR:$FLINK_HOME/hadoop-lib/YOUR_CONNECTOR_JAR

经过所有这些步骤,你可以开始你的工作,并好走。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72716885

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档