首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Pyspark使用udf处理数组列并返回另一个数组

Pyspark使用udf处理数组列并返回另一个数组
EN

Stack Overflow用户
提问于 2019-02-07 01:37:56
回答 1查看 4.4K关注 0票数 0

使用udf处理数组列并返回另一个数组

下面是我的输入:

docID Shingles D1 23,25,39,59 D2 34,45,65

我想通过处理shingles数组列来生成一个名为hashes的新列:例如,我想提取min和max (这只是一个示例,以表明我想要一个固定长度的数组列,实际上我并不想找到min或max)

docID Shingles散列D1 23、25、39、59 D2 34、45、65

我创建了udf,如下所示:

代码语言:javascript
复制
def generate_minhash_signatures(shingles, coeffA, coeffB):
    signature = []
    minHashCode = nextPrime + 1
    maxHashCode = 0
    for shingleID in shingles:
        if shingleID < minHashCode:
            minHashCode = shingleID
        if shingleID > maxHashCode:
            maxHashCode = shingleID
    return [minHashCode, maxHashCode]

minhash_udf = udf(generate_minhash_signatures, ArrayType(IntegerType()))
df_with_minhash = df.withColumn('min_max_hash', minhash_udf("shingles", coeffA, coeffB))
df_with_minhash.show()

但它会给出以下错误:

代码语言:javascript
复制
TypeError: Invalid argument, not a string or column: [2856022824, 2966132496, 947839218, 1658426276, 1862779421, 3729685802, 1710806966, 2696513050, 3630333076, 2555745391] of type <class 'list'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

实际的udf:

代码语言:javascript
复制
def generate_minhash_signatures(shingles, coeffA, coeffB, numHashes):
    signature = []
    for i in range(0, numHashes):
        minHashCode = nextPrime + 1
        for shingleID in shingles:
            hashCode = (coeffA[i] * shingleID + coeffB[i]) % nextPrime

            if hashCode < minHashCode:
                minHashCode = hashCode

        signature.append(minHashCode)
    return signature
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-02-07 01:44:57

您的udf要求所有这三个参数都是列。coeffAcoeffB很可能不仅仅是数值,您需要使用lit将它们转换为列对象

代码语言:javascript
复制
import pyspark.sql.functions as f
df.withColumn('min_max_hash', minhash_udf(f.col("shingles"), f.lit(coeffA), f.lit(coeffB)))

如果coeffAcoeffB是列表,请使用f.array创建文字,如下所示:

代码语言:javascript
复制
df.withColumn('min_max_hash', 
  minhash_udf(f.col("shingles"), 
  f.array(*map(f.lit, coeffA)),
  f.array(*map(f.lit, coeffB))
)

或者将列参数和非列参数分开,如下所示:

代码语言:javascript
复制
def generate_minhash_signatures(coeffA, coeffB, numHashes)
    def generate_minhash_signatures_inner(shingles):
        signature = []
        for i in range(0, numHashes):
            minHashCode = nextPrime + 1
            for shingleID in shingles:
                hashCode = (coeffA[i] * shingleID + coeffB[i]) % nextPrime

                if hashCode < minHashCode:
                    minHashCode = hashCode

            signature.append(minHashCode)
        return signature
    return f.udf(generate_minhash_signatures_inner, ArrayType(IntegerType()))

然后,您可以像这样调用函数:

代码语言:javascript
复制
df.withColumn('min_max_hash', generate_minhash_signatures(coeffA, coeffB, numHashes)("shingles"))
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54559509

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档