首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在列创建后立即执行Pyspark UDF函数

在列创建后立即执行Pyspark UDF函数
EN

Stack Overflow用户
提问于 2019-10-15 11:40:04
回答 1查看 59关注 0票数 0

我正在尝试在创建列之后应用UDF函数。

但我有个问题:

Cannot resolve column name "previous_status" among

这意味着列不存在。

我可能会修改UDF函数,使其不再是UDF,而只是使用F.when & otherwise的普通函数。问题是,我需要一个全局的字典,你可以看到,以确定我是否已经看到了这个id。

代码语言:javascript
复制
alreadyAuthorized = {}

def previously_authorized_spark(id, failed, alreadyAuthorized = alreadyAuthorized):
    if id in alreadyAuthorized:
        previously_authorized = 1
    else:
        previously_authorized = 0

    if not failed:
        alreadyAuthorized[id] = True

    return previously_authorized

previously_authorized_udf = udf(lambda x, y : previously_authorized_spark(x, y), IntegerType())

def get_previous_status(data):
    partition = Window.partitionBy("id").orderBy("date")

    data = data.withColumn("previous_status", F.lag(F.col("failed")).over(partition))\
                .withColumn("previously_authorized", previously_authorized_udf(data["id"], data["previous_status"]))

data = get_previous_status(data)
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-10-15 12:00:04

尝试使用col函数获取列,因为正如@LaSul所指出的,在分配data之前使用data

代码语言:javascript
复制
from pyspark.sql.function import col

...
    data = data.withColumn("previous_status", F.lag(F.col("failed")).over(partition))\
                .withColumn("previously_authorized", previously_authorized_udf(col("id"), col("previous_status")))

...
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58393834

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档