首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何合并火花(java)中具有不同模式的两个拼花文件

如何合并火花(java)中具有不同模式的两个拼花文件
EN

Stack Overflow用户
提问于 2021-08-26 13:24:27
回答 2查看 2.5K关注 0票数 2

我有两个不同列数的拼板文件,并试图将它们与下面的代码片段合并

代码语言:javascript
复制
Dataset<Row> dataSetParquet1 = testSparkSession.read().option("mergeSchema",true).parquet("D:\\ABC\\abc.parquet");
              
Dataset<Row> dataSetParquet2 = testSparkSession.read().option("mergeSchema",true).parquet("D:\\EFG\\efg.parquet");                    
  
dataSetParquet1.unionByName(dataSetParquet2);
// dataSetParquet1.union(dataSetParquet2);

对于unionByName(),我得到了错误:

代码语言:javascript
复制
Caused by: org.apache.spark.sql.AnalysisException: Cannot resolve column name

对于union(),我得到了错误:

代码语言:javascript
复制
Caused by: org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the first table has 7 columns and the second table has 6 columns;;

如何使用java中的spark合并这些文件?

更新:示例

数据集1:

代码语言:javascript
复制
epochMillis   | one | two | three| four
--------------------------------------
1630670242000 | 1   | 2   | 3    | 4
1630670244000 | 1   | 2   | 3    | 4
1630670246000 | 1   | 2   | 3    | 4

dataset2:

代码语言:javascript
复制
epochMillis   | one | two | three|five
---------------------------------------
1630670242000 | 11  | 22  | 33   | 55
1630670244000 | 11  | 22  | 33   | 55
1630670248000 | 11  | 22  | 33   | 55

合并后的最终数据集:

代码语言:javascript
复制
epochMillis   | one | two | three|four |five
--------------------------------------------
1630670242000 | 11  | 22  | 33   |4    |55
1630670244000 | 11  | 22  | 33   |4    |55
1630670246000 | 1   | 2   | 3    |4    |null
1630670248000 | 11  | 22  | 33   |null |55

如何获得合并两个数据集的结果

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-09-03 19:02:13

要合并来自两种不同数据格式的两行,首先要加入这两个dataframes,然后根据合并的方式选择正确的列。

所以对于你的案子来说,这意味着:

below)

  • Optional

  • 分别从它们的parquet location

  • 中读取两个数据格式,在epochTime列上加入两个数据格式,使用full_outer连接,因为您希望将所有行保持在一个数据rows中,而不是在另一个

  • 中,以重复两个数据格式的所有列,选择使用函数columnMerges合并的列(由epochTime

实现)。

翻译成代码:

代码语言:javascript
复制
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataframe1 = testSparkSession.read().parquet("D:\\ABC\\abc.parquet");
Dataset<Row> dataframe2 = testSparkSession.read().parquet("D:\\EFG\\efg.parquet");

dataframe1.join(dataframe2, dataframe1.col("epochTime").equalTo(dataframe2.col("epochTime")), "full_outer")
  .select(Selector.columnMerges(dataframe2, dataframe1))
  .orderBy("epochTime")

注意:当我们读取parquets时,不需要使用mergeSchema选项,因为对于每个数据帧,我们只读取一个parquet文件,因此只读取一个模式。

对于合并函数Selector.columnMerges,对于每一行,我们要做的是:

如果列在两个数据中都存在,则在dataframe1

  • if中取值(如果不为空),否则在dataframe2

  • if中只取值(在dataframe2中),在dataframe2

  • if中取值(在dataframe1中),在dataframe1

中取值

因此,我们首先构建了一组dataframe1列、一组dataframe2列,以及来自这两种数据格式的列列表。然后,我们迭代这个列列表,为每个列应用以前的规则:

代码语言:javascript
复制
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import static org.apache.spark.sql.functions.when;

public class Selector {

  public static Column[] columnMerges(Dataset<Row> main, Dataset<Row> second) {
    List<Column> columns = new ArrayList<>();

    Set<String> columnsFromMain = new HashSet<>(Arrays.asList(main.columns()));
    Set<String> columnsFromSecond = new HashSet<>(Arrays.asList(second.columns()));

    List<String> columnNames = new ArrayList<>(Arrays.asList(main.columns()));
    for (String column: second.columns()) {
      if (!columnsFromMain.contains(column)) {
        columnNames.add(column);
      }
    }

    for (String column : columnNames) {
      if (columnsFromMain.contains(column) && columnsFromSecond.contains(column)) {
        columns.add(when(main.col(column).isNull(), second.col(column)).otherwise(main.col(column)).as(column));
      } else if (columnsFromMain.contains(column)) {
        columns.add(main.col(column).as(column));
      } else {
        columns.add(second.col(column).as(column));
      }
    }

    return columns.toArray(new Column[0]);
  }
}
票数 1
EN

Stack Overflow用户

发布于 2021-08-26 13:43:26

您可以使用mergeSchema选项,同时在parquet方法中添加要合并的所有拼图文件路径,如下所示:

代码语言:javascript
复制
Dataset<Row> finalDataset = testSparkSession.read()
  .option("mergeSchema", true)
  .parquet("D:\\ABC\\abc.parquet", "D:\\EFG\\efg.parquet");

第一个数据集中而不是第二个数据集中的所有列都将在第二个数据集中使用null值进行设置。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68939377

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档