I am trying to create a UDF in PySpark that rounds one column to the precision specified, row by row, by another column. For example, given the following DataFrame:
+--------+--------+
| Data|Rounding|
+--------+--------+
|3.141592| 3|
|0.577215| 1|
+--------+--------+

when passed to the UDF, it should produce the following result:
+--------+--------+--------------+
| Data|Rounding|Rounded Column|
+--------+--------+--------------+
|3.141592| 3| 3.142|
|0.577215| 1| 0.6|
+--------+--------+--------------+

In particular, I tried the following code:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType, LongType, IntegerType

pdDF = pd.DataFrame(columns=["Data", "Rounding"], data=[[3.141592, 3], [0.577215, 1]])
mySchema = StructType([StructField("Data", FloatType(), True),
                       StructField("Rounding", IntegerType(), True)])

spark = SparkSession.builder.master("local").appName("column rounding").getOrCreate()
df = spark.createDataFrame(pdDF, schema=mySchema)
df.show()

def round_column(Data, Rounding):
    return (lambda (Data, Rounding): round(Data, Rounding), FloatType())

spark.udf.register("column rounded to the precision specified by another",
                   round_column, FloatType())

df_rounded = df.withColumn('Rounded Column', round_column(df["Data"], df["Rounding"]))
df_rounded.show()

But I get the following error:
Traceback (most recent call last):
  File "whatever.py", line 21, in <module>
    df_redondeado = df.withColumn('columna_redondeada', round_column(df["Data"], df["Rounding"]))
  File "whomever\spark\spark-2.3.1-bin-hadoop2.7\python\pyspark\sql\dataframe.py", line 1848, in withColumn
    assert isinstance(col, Column), "col should be Column"
AssertionError: col should be Column

Any help would be greatly appreciated :)
Posted on 2018-10-10 18:39:50
As mentioned in the other answer, your udf is invalid. You can use an inline udf, as follows:
from pyspark.sql.functions import struct, udf
from pyspark.sql.types import FloatType

udf_round_column = udf(lambda row: round(row['data'], row['rounding']), FloatType())
df_rounded = df.withColumn('rounded_col', udf_round_column(struct('data', 'rounding')))

or as a separate function:

def round_column(data, rounding):
    return round(data, rounding)

udf_round_column = udf(round_column, FloatType())
df_rounded = df.withColumn('rounded_col', udf_round_column('data', 'rounding'))

Both return the following:
+---+---------+--------+-----------+
| id| data|rounding|rounded_col|
+---+---------+--------+-----------+
| 1|3.1415926| 3| 3.142|
| 2| 0.12345| 6| 0.12345|
| 3| 2.3456| 1| 2.3|
+---+---------+--------+-----------+

Posted on 2018-10-08 10:27:27
Your code fails because round_column is not a valid udf. You should use:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

@udf(FloatType())
def round_column(data, rounding):
    return round(data, rounding)

spark.udf.register is for registering functions called from SQL queries, so it does not apply here.
However, you don't need a udf at all. Just use:
from pyspark.sql.functions import expr

df_rounded = df.withColumn('Rounded Column', expr("round(Data, Rounding)"))

Posted on 2018-10-11 08:47:28
If you want to apply a UDF to a DataFrame, just import it as follows:

from pyspark.sql.functions import udf

and then use it as:
round_column_udf = udf(round_column, FloatType())
df_rounded = df.withColumn('Rounded_Column', round_column_udf(df['Data'], df['Rounding']))
Registering a udf is for use with Spark SQL queries, as in:
spark.udf.register("round_column_udf", round_column, FloatType())
df.registerTempTable("df")
spark.sql("select Data, Rounding, round_column_udf(Data, Rounding) as Rounded_Column from df").show()
Both should work.
https://stackoverflow.com/questions/52700047