I'm trying to apply a UDF after creating a column, but I'm getting this error:

Cannot resolve column name "previous_status" among

which means the column doesn't exist.
I could rewrite the UDF so that it is no longer a UDF and just uses F.when & otherwise. The problem is that I need a global dictionary, as you can see, to determine whether I have already seen a given id.
alreadyAuthorized = {}

def previously_authorized_spark(id, failed, alreadyAuthorized=alreadyAuthorized):
    if id in alreadyAuthorized:
        previously_authorized = 1
    else:
        previously_authorized = 0
    if not failed:
        alreadyAuthorized[id] = True
    return previously_authorized

previously_authorized_udf = udf(lambda x, y: previously_authorized_spark(x, y), IntegerType())

def get_previous_status(data):
    partition = Window.partitionBy("id").orderBy("date")
    data = data.withColumn("previous_status", F.lag(F.col("failed")).over(partition))\
        .withColumn("previously_authorized", previously_authorized_udf(data["id"], data["previous_status"]))
    return data
data = get_previous_status(data)

Posted on 2019-10-15 12:00:04
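The dictionary logic in the helper can be exercised outside Spark. This is a standalone sketch of the same function, with the dictionary passed in explicitly rather than taken from module scope:

```python
# Standalone version of the helper above: returns 1 if this id has already
# been recorded as authorized, 0 otherwise; records the id on success.
def previously_authorized_spark(id, failed, seen):
    previously_authorized = 1 if id in seen else 0
    if not failed:
        seen[id] = True  # only non-failed events mark an id as authorized
    return previously_authorized

seen = {}
print(previously_authorized_spark("a", False, seen))  # 0: first time "a" appears
print(previously_authorized_spark("a", True, seen))   # 1: "a" was recorded earlier
print(previously_authorized_spark("b", True, seen))   # 0: failed, so "b" is not recorded
print(previously_authorized_spark("b", False, seen))  # 0: "b" had never been recorded
```

Note that on a real cluster this global-dict approach is fragile: each executor process gets its own copy of the dictionary, so the state is neither shared nor updated in a guaranteed order across workers.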
Try getting the columns with the col function, since, as @LaSul pointed out, you are referencing data before it is reassigned:
from pyspark.sql.functions import col
...
data = data.withColumn("previous_status", F.lag(F.col("failed")).over(partition))\
    .withColumn("previously_authorized", previously_authorized_udf(col("id"), col("previous_status")))
...
https://stackoverflow.com/questions/58393834