我有两个不同列数的拼板文件,并试图将它们与下面的代码片段合并
Dataset<Row> dataSetParquet1 = testSparkSession.read().option("mergeSchema",true).parquet("D:\\ABC\\abc.parquet");
Dataset<Row> dataSetParquet2 = testSparkSession.read().option("mergeSchema",true).parquet("D:\\EFG\\efg.parquet");
dataSetParquet1.unionByName(dataSetParquet2);
// dataSetParquet1.union(dataSetParquet2);对于unionByName(),我得到了错误:
Caused by: org.apache.spark.sql.AnalysisException: Cannot resolve column name对于union(),我得到了错误:
Caused by: org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the first table has 7 columns and the second table has 6 columns;;如何使用java中的spark合并这些文件?
更新:示例
数据集1:
epochMillis | one | two | three| four
--------------------------------------
1630670242000 | 1 | 2 | 3 | 4
1630670244000 | 1 | 2 | 3 | 4
1630670246000 | 1 | 2 | 3 | 4dataset2:
epochMillis | one | two | three|five
---------------------------------------
1630670242000 | 11 | 22 | 33 | 55
1630670244000 | 11 | 22 | 33 | 55
1630670248000 | 11 | 22 | 33 | 55合并后的最终数据集:
epochMillis | one | two | three|four |five
--------------------------------------------
1630670242000 | 11 | 22 | 33 |4 |55
1630670244000 | 11 | 22 | 33 |4 |55
1630670246000 | 1 | 2 | 3 |4 |null
1630670248000 | 11 | 22 | 33 |null |55如何获得合并两个数据集的结果
发布于 2021-09-03 19:02:13
要合并来自两种不同数据格式的两行,首先要加入这两个dataframes,然后根据合并的方式选择正确的列。
所以对于你的案子来说,这意味着:
below)
epochTime列上加入两个数据格式,使用full_outer连接,因为您希望将所有行保持在一个数据rows中,而不是在另一个
columnMerges合并的列(由epochTime实现)。
翻译成代码:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> dataframe1 = testSparkSession.read().parquet("D:\\ABC\\abc.parquet");
Dataset<Row> dataframe2 = testSparkSession.read().parquet("D:\\EFG\\efg.parquet");
dataframe1.join(dataframe2, dataframe1.col("epochTime").equalTo(dataframe2.col("epochTime")), "full_outer")
.select(Selector.columnMerges(dataframe2, dataframe1))
.orderBy("epochTime")注意:当我们读取parquets时,不需要使用mergeSchema选项,因为对于每个数据帧,我们只读取一个parquet文件,因此只读取一个模式。
对于合并函数Selector.columnMerges,对于每一行,我们要做的是:
如果列在两个数据中都存在,则在dataframe1
dataframe2
dataframe2中),在dataframe2
dataframe1中),在dataframe1中取值
因此,我们首先构建了一组dataframe1列、一组dataframe2列,以及来自这两种数据格式的列列表。然后,我们迭代这个列列表,为每个列应用以前的规则:
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.when;
public class Selector {
public static Column[] columnMerges(Dataset<Row> main, Dataset<Row> second) {
List<Column> columns = new ArrayList<>();
Set<String> columnsFromMain = new HashSet<>(Arrays.asList(main.columns()));
Set<String> columnsFromSecond = new HashSet<>(Arrays.asList(second.columns()));
List<String> columnNames = new ArrayList<>(Arrays.asList(main.columns()));
for (String column: second.columns()) {
if (!columnsFromMain.contains(column)) {
columnNames.add(column);
}
}
for (String column : columnNames) {
if (columnsFromMain.contains(column) && columnsFromSecond.contains(column)) {
columns.add(when(main.col(column).isNull(), second.col(column)).otherwise(main.col(column)).as(column));
} else if (columnsFromMain.contains(column)) {
columns.add(main.col(column).as(column));
} else {
columns.add(second.col(column).as(column));
}
}
return columns.toArray(new Column[0]);
}
}发布于 2021-08-26 13:43:26
您可以使用mergeSchema选项,同时在parquet方法中添加要合并的所有拼图文件路径,如下所示:
Dataset<Row> finalDataset = testSparkSession.read()
.option("mergeSchema", true)
.parquet("D:\\ABC\\abc.parquet", "D:\\EFG\\efg.parquet");第一个数据集中而不是第二个数据集中的所有列都将在第二个数据集中使用null值进行设置。
https://stackoverflow.com/questions/68939377
复制相似问题