首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >冰山表没有看到生成的Parquet文件

冰山表没有看到生成的Parquet文件
EN

Stack Overflow用户
提问于 2022-10-22 18:23:50
回答 1查看 24关注 0票数 1

在我的用例中,创建了Iceberg格式的表。它只接收附加操作,因为它是关于在时间序列流中记录事件。为了评估Iceberg格式在这个用例中的使用情况,我创建了一个简单的Java程序,它创建了一组27600行代码。元数据和拼图文件都是创建的,但我无法通过Java (https://iceberg.apache.org/docs/latest/java-api-quickstart/)访问它们。我正在使用HadoopCatalogFileAppender<GenericRecord>。重要的是,我可以通过Python3脚本读取使用pyarrowdatafusion模块创建的Parquet文件,它是正确的!

我认为,我的程序中某些将生成的Parquet文件链接到目录中创建的表的方法必须丢失。

注意:我只在1.0.0版中使用的Java

API中有一个接受org.apache.iceberg.Transaction的org.apache.iceberg.DataFile对象,但是我还没有看到如何使用它的例子,我也不知道解决这个问题是否有用。

请参阅下面的程序:

代码语言:javascript
复制
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;

import java.io.File;
import java.io.IOException;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.List;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

public class IcebergTableAppend {
    public static void main(String[] args) {
        System.out.println("Appending records ");
        Configuration conf = new Configuration();
        String lakehouse = "/tmp/iceberg-test";
        conf.set(CatalogProperties.WAREHOUSE_LOCATION, lakehouse);
        Schema schema = new Schema(
                required(1, "hotel_id", Types.LongType.get()),
                optional(2, "hotel_name", Types.StringType.get()),
                required(3, "customer_id", Types.LongType.get()),
                required(4, "arrival_date", Types.DateType.get()),
                required(5, "departure_date", Types.DateType.get()),
                required(6, "value", Types.DoubleType.get())
        );
        PartitionSpec spec = PartitionSpec.builderFor(schema)
                .month("arrival_date")
                .build();
        TableIdentifier id = TableIdentifier.parse("bookings.rome_hotels");
        String warehousePath = "file://" + lakehouse;
        Catalog catalog = new HadoopCatalog(conf, warehousePath);
        // rm -rf  /tmp/iceberg-test/bookings
        Table table = catalog.createTable(id, schema, spec);
        List<GenericRecord> records = Lists.newArrayList();
        // generating a bunch of records
        for (int j = 1; j <= 12; j++) {
            int NUM_ROWS_PER_MONTH = 2300;
            for (int i = 0; i < NUM_ROWS_PER_MONTH; i++) {
                GenericRecord rec = GenericRecord.create(schema);
                rec.setField("hotel_id", (long) (i * 2) + 10000);
                rec.setField("hotel_name", "hotel_name-" + i + 1000);
                rec.setField("customer_id", (long) (i * 2) + 20000);
                rec.setField("arrival_date",
                        LocalDate.of(2022, j, (i % 23) + 1)
                                .plus(1, ChronoUnit.DAYS));
                rec.setField("departure_date",
                        LocalDate.of(2022, j, (i % 23) + 5));
                rec.setField("value", (double) i * 4.13);
                records.add(rec);
            }
        }
        File parquetFile = new File(
                lakehouse + "/bookings/rome_hotels/arq_001.parquet");
        FileAppender<GenericRecord> appender = null;
        try {
            appender = Parquet.write(Files.localOutput(parquetFile))
                    .schema(table.schema())
                    .createWriterFunc(GenericParquetWriter::buildWriter)
                    .build();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        try {
            appender.addAll(records);
        } finally {
            try {
                appender.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
EN

回答 1

Stack Overflow用户

发布于 2022-10-22 21:01:03

我找到了如何修复Java程序。

只需将下面的行添加到main方法的末尾

代码语言:javascript
复制
PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
DataFile dataFile = DataFiles.builder(table.spec())
        .withPartition(partitionKey)
        .withInputFile(localInput(parquetFile))
        .withMetrics(appender.metrics())
        .withFormat(FileFormat.PARQUET)
        .build();
 Transaction t = table.newTransaction();
 t.newAppend().appendFile(dataFile).commit();
 // commit all changes to the table
 t.commitTransaction();

还将下面的依赖项添加到POM文件中

代码语言:javascript
复制
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>3.3.4</version>
</dependency>

这避免了如下所示的运行时错误:

java.lang.ClassNotFoundException: org.apache.hadoop.mapreduce.lib.input.FileInputFormat

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74166090

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档