这是我的数据集:
Dataset<Row> myResult = pot.select(col("number")
, col("document")
, explode(col("mask")).as("mask"));现在我需要从现有的myResult创建一个新的数据集。如下所示:
Dataset<Row> myResultNew = myResult.select(col("number")
, col("name")
, col("age")
, col("class")
, col("mask");名称、年龄和类是从Dataset myResult的列文档中创建的。我想我可以调用列文档上的函数,然后对它执行任何操作。
myResult.select(extract(col("document")));
private String extract(final Column document) {
//TODO ADD A NEW COLUMN nam, age, class TO THE NEW DATASET.
// PARSE DOCUMENT AND GET THEM.
XMLParser doc= (XMLParser) document // this doesnt work???????
} 我的问题是:文档是类型列,我需要将它转换为不同的对象类型,并解析它以提取名称、年龄和类。我怎么能这么做。文档是一个xml,我需要进行解析以获得其他3列,因此无法避免将其转换为XML。
发布于 2020-05-23 20:23:52
将extract方法转换为UDF将是一个尽可能接近您所要求的解决方案。UDF可以接受一个或多个列的值,并使用此输入执行任何逻辑。
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.udf;
[...]
UserDefinedFunction extract = udf(
(String document) -> {
List<String> result = new ArrayList<>();
XMLParser doc = XMLParser.parse(document);
String name = ... //read name from xml document
String age = ... //read age from xml document
String clazz = ... //read class from xml document
result.add(name);
result.add(age);
result.add(clazz);
return result;
}, DataTypes.createArrayType(DataTypes.StringType)
);UDF的一个限制是它们只能返回一个列。因此,函数返回一个字符串数组,随后必须解压缩。
Dataset<Row> myResultNew = myResult
.withColumn("extract", extract.apply(col("document"))) //1
.withColumn("name", col("extract").getItem(0)) //2
.withColumn("age", col("extract").getItem(1)) //2
.withColumn("class", col("extract").getItem(2)) //2
.drop("document", "extract"); //3apply函数的参数。注意: udf在dataset中每一行执行一次。如果创建xml解析器的开销很大,这可能会减慢火花作业的执行速度,因为每个行都会实例化一个解析器。由于Spark的并行特性,无法为下一行重用解析器。如果这是一个问题,另一个选项(至少在Java世界中稍微复杂一些)是使用mapPartitions。在这里,每个数据集的每个分区不需要一个解析器,而只需要一个解析器。
另一种完全不同的方法是使用星星之火。
https://stackoverflow.com/questions/61755477
复制相似问题