I am trying to create a column in PySpark using a UDF.
The code I tried looks like this:
# The function checks the year and adds a multiplied value_column to the final column
def new_column(row, year):
    if year == "2020":
        return row * 0.856
    elif year == "2019":
        return row * 0.8566
    else:
        return row

final_udf = F.udf(lambda z: new_column(z), Double())  # How do I get a Double datatype here?
res = res.withColumn("final_value", final_udf(F.col('value_column'), F.col('year')))

How do I write Double() with final_udf? I have seen that for strings we can use StringType(). But how do I return a double type in the "final_value" column?
Posted on 2022-10-24 07:49:38
Input:

from pyspark.sql import functions as F, types as T
res = spark.createDataFrame([(1.0, '2020',), (1.0, '2019',), (1.0, '2018',)], ['value_column', 'year'])

A udf is very inefficient when processing big data.
You should first try to do this in native Spark:
res = res.withColumn(
    'final_value',
    F.when(F.col('year') == "2020", F.col('value_column') * 0.856)
     .when(F.col('year') == "2019", F.col('value_column') * 0.8566)
     .otherwise(F.col('value_column'))
)
res.show()
# +------------+----+-----------+
# |value_column|year|final_value|
# +------------+----+-----------+
# | 1.0|2020| 0.856|
# | 1.0|2019| 0.8566|
# | 1.0|2018| 1.0|
# +------------+----+-----------+

If it is not possible in native Spark, move on to pandas_udf:
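A pandas_udf receives each column as a whole pd.Series per batch, so per-row if/elif branching does not apply directly; the year-to-factor lookup has to be expressed vectorized. A sketch of that logic in plain pandas, runnable without a SparkSession (the function name is illustrative):

```python
import pandas as pd

def apply_factors(value: pd.Series, year: pd.Series) -> pd.Series:
    # Vectorized equivalent of the if/elif chain:
    # map each year to its factor, defaulting to 1.0 for any other year
    return value * year.map({"2020": 0.856, "2019": 0.8566}).fillna(1.0)

print(apply_factors(pd.Series([1.0, 1.0, 1.0]),
                    pd.Series(["2020", "2019", "2018"])).tolist())
# [0.856, 0.8566, 1.0]
```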
from pyspark.sql import functions as F, types as T
import pandas as pd

@F.pandas_udf(T.DoubleType())
def new_column(row: pd.Series, year: pd.Series) -> pd.Series:
    # The columns arrive as whole pd.Series batches, so branch vectorized
    # instead of with if/elif: map year -> factor, default 1.0
    return row * year.map({"2020": 0.856, "2019": 0.8566}).fillna(1.0)
res = res.withColumn("final_value", new_column('value_column', 'year'))
res.show()
# +------------+----+-----------+
# |value_column|year|final_value|
# +------------+----+-----------+
# | 1.0|2020| 0.856|
# | 1.0|2019| 0.8566|
# | 1.0|2018| 1.0|
# +------------+----+-----------+

As a last resort, go with a plain udf:
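One caveat with plain udfs: Spark hands SQL NULL to the Python function as None, and multiplying None raises a TypeError. A hedged sketch of a null-safe variant (the dict lookup is an equivalent rewrite of the if/elif chain; the function name is illustrative):

```python
def new_column_null_safe(row, year):
    # SQL NULL arrives as Python None; pass it through untouched
    if row is None:
        return None
    # Year-to-factor lookup, defaulting to 1.0 for any other year
    return row * {"2020": 0.856, "2019": 0.8566}.get(year, 1.0)

print(new_column_null_safe(None, "2020"))  # None
print(new_column_null_safe(1.0, "2019"))   # 0.8566
```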
@F.udf(T.DoubleType())
def new_column(row, year):
    if year == "2020":
        return row * 0.856
    elif year == "2019":
        return row * 0.8566
    else:
        return row
res = res.withColumn("final_value", new_column('value_column', 'year'))
res.show()
# +------------+----+-----------+
# |value_column|year|final_value|
# +------------+----+-----------+
# | 1.0|2020| 0.856|
# | 1.0|2019| 0.8566|
# | 1.0|2018| 1.0|
# +------------+----+-----------+

Posted on 2022-10-24 01:31:42
Use the plain string "double", or import pyspark's DoubleType:
# like this (the lambda must forward both columns, not just one)
final_udf = F.udf(lambda value, year: new_column(value, year), "double")
# or this
import pyspark.sql.types as T
final_udf = F.udf(lambda value, year: new_column(value, year), T.DoubleType())

https://stackoverflow.com/questions/74174377
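Since new_column already takes both the value and the year, it can also be passed to F.udf directly, with no lambda at all; either return-type spelling declares the same type. A sketch, with the Spark import deferred into a helper so the pure-Python part runs without Spark installed (the helper name is illustrative):

```python
def new_column(row, year):
    # Two explicit parameters, matching the two columns passed at call time
    if year == "2020":
        return row * 0.856
    if year == "2019":
        return row * 0.8566
    return row

def make_final_udf(use_string_type=True):
    # Deferred import: only needed when actually registering the udf
    from pyspark.sql import functions as F, types as T
    # "double" and T.DoubleType() are interchangeable return-type specs
    return F.udf(new_column, "double" if use_string_type else T.DoubleType())

print(new_column(1.0, "2019"))  # 0.8566
```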