Spark 1.6 / Java 7
Initial DataFrame with a new column:
// adding a new column for the UDF computation:
df = df.withColumn("TEMP_COLUMN", lit(null));

What is the correct way for a UDF function to create a new StructType and put it into the cell?
public static DataFrame compute(SQLContext sqlContext, DataFrame df) {
    sqlContext.udf().register("compute", new MyUdf(), new ArrayType(new StructType(), true));
    return df.withColumn("TEMP_COLUMN", functions.callUDF("compute"));
}
class MyUdf implements UDF0<List<StructType>> {
    @Override
    public List<StructType> call() {
        ...
        return ? // what must be returned here? List<StructType>, List<String>, or something else?
    }
}
+-------------------------+
|TEMP_COLUMN |
+-------------------------+
|[A[1, 2, 3], B[4, 5, 6]] |
+-------------------------+

I want a struct containing an array of elements, where each element has several fields.
I don't understand whether registering with the type new ArrayType(new StructType(), true) is correct, and whether List<StructType> is the correct return type for the UDF function.
How should the data be returned? Something like new StructType(new StructField[]{new StructField(...)})?
Posted on 2021-01-22 16:41:40
Answering my own question, since we were lucky enough to figure out how to do it:
Suppose we have a "complex" structure that fits our needs:
MapType CLIENTS_INFO_DATA_TYPE = DataTypes.createMapType(
    DataTypes.StringType,
    DataTypes.createStructType(
        new StructField[] {
            DataTypes.createStructField("NAME_1", DataTypes.DoubleType, false),
            DataTypes.createStructField("NAME_2", DataTypes.DoubleType, false),
            DataTypes.createStructField("NAME_3", DataTypes.DoubleType, false)
        }),
    true
);
StructType COMPLEX_DATA_TYPE = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("CLIENTS_INFO", CLIENTS_INFO_DATA_TYPE, true),
    DataTypes.createStructField("COMMENT", DataTypes.StringType, true)
});

Here is the schema:
dataFrame.printSchema()
|-- COMPLEX_DATA_TYPE: struct (nullable = true)
| |-- CLIENTS_INFO: map (nullable = true)
| | |-- key: string
| | |-- value: struct (valueContainsNull = true)
| | | |-- NAME_1: double (nullable = false)
| | | |-- NAME_2: double (nullable = false)
| | | |-- NAME_3: double (nullable = false)
| |-- COMMENT: string (nullable = true)

Next, we have to register a UDF function that operates on our structure:
DataFrame compute(SQLContext sqlContext, DataFrame df) {
    sqlContext.udf().register(
        "computeUDF",
        new MyUDF(),
        COMPLEX_DATA_TYPE);
    return df.withColumn("TEMP_FIELD_NAME", functions.callUDF("computeUDF", field_1.getColumn(), field_2.getColumn()));
}

The last step is the UDF function itself, which returns a Row object (that will be converted into our structure):
public final class MyUDF implements UDF2<Double, Double, Row> {
    @Override
    public Row call(Double value1, Double value2) {
        Map<String, Row> clientsInfoMap = new HashMap<>();
        ...
        for (Map.Entry<String, ClientInfo> clientInfoEntry : clientsInfo.entrySet()) {
            final String client = clientInfoEntry.getKey();
            final ClientInfo clientInfo = clientInfoEntry.getValue();
            final Double[] clientInfoValues = {10.0, 20.0, 30.0};
            Row clientInfoRow = new GenericRow(clientInfoValues);
            clientsInfoMap.put(client, clientInfoRow);
        }
        Object[] fullClientsInfo = new Object[] {clientsInfoMap, "string-as-a-comment"};
        return new GenericRow(fullClientsInfo);
    }
}

Now, since it is a struct, we can select from it using TEMP_FIELD_NAME.CLIENTS_INFO and any other field name.
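A side note on why the GenericRow approach works: a Row is positional, so the Object[] you hand to it must follow the field order of the registered StructType exactly ({NAME_1, NAME_2, NAME_3} for the inner struct, {CLIENTS_INFO, COMMENT} for the outer one). Below is a minimal, dependency-free sketch of assembling the payload in that order; the class name PayloadOrderSketch is hypothetical, and plain Object[] values stand in for the GenericRow instances you would use inside a real Spark UDF.

```java
import java.util.HashMap;
import java.util.Map;

public class PayloadOrderSketch {
    // Builds the Object[] payload in the same positional order as COMPLEX_DATA_TYPE:
    // index 0 -> CLIENTS_INFO (map of client name to {NAME_1, NAME_2, NAME_3} values),
    // index 1 -> COMMENT (a plain string).
    static Object[] buildPayload() {
        Map<String, Object[]> clientsInfoMap = new HashMap<>();
        clientsInfoMap.put("A", new Object[] {1.0, 2.0, 3.0});
        clientsInfoMap.put("B", new Object[] {4.0, 5.0, 6.0});
        return new Object[] {clientsInfoMap, "string-as-a-comment"};
    }

    public static void main(String[] args) {
        Object[] payload = buildPayload();
        System.out.println(payload.length); // two fields, matching the outer struct
        System.out.println(payload[1]);     // the COMMENT field
    }
}
```

If the array order does not match the schema, Spark will either fail at runtime or silently map values to the wrong fields, which is why keeping the schema definition and the row construction side by side is a good habit.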
https://stackoverflow.com/questions/65704232