我有一个UDF,如下所示,它是一个普通的标量Pyspark:
@udf()
def redact(colVal: column, offset: int = 0):
if not colVal or not offset:
return 'X'*8
else:
charList=list(colVal)
charList[:-offset]='X'*(len(colVal)-offset)
return "".join(charList)当我试着把它转换成pandas_udf的时候,当我读到用向量化的UDF代替标量UDF的时候,我得到了很多与熊猫相关的问题,而我的经验却比较少。
请帮助我将此UDF转换为矢量化的Pandas UDF。
发布于 2021-12-23 19:44:34
redact函数可以包装在对pd.Series的每一项应用redact的函数中。
由于要传递标量
值,因此需要应用
offset运行。
from pyspark.sql import functions as F
import pandas as pd
def pandas_wrapper(values: pd.Series, offset: int) -> pd.Series:
def redact(colVal: str, offset: int = 0):
if not colVal or not offset:
return 'X'*8
else:
charList=list(colVal)
charList[:-offset]='X'*(len(colVal)-offset)
return "".join(charList)
return values.apply(lambda value: redact(value, offset))
def curried_wrapper(offset: int):
return F.pandas_udf(lambda x: pandas_wrapper(x, offset), "string")
df = spark.createDataFrame([("abcdef", ), ("12yz", ), (None,)], ("data_col", ))
df.withColumn("redacted", curried_wrapper(2)(F.col("data_col"))).show()输出
+--------+--------+
|data_col|redacted|
+--------+--------+
| abcdef| XXXXef|
| 12yz| XXyz|
| null|XXXXXXXX|
+--------+--------+https://stackoverflow.com/questions/69711698
复制相似问题