首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >PySpark RDD:列数不匹配

PySpark RDD:列数不匹配
EN

Stack Overflow用户
提问于 2020-08-20 12:06:53
回答 1查看 931关注 0票数 0

我希望使用pyspark与其中一个列一起构建一个数据文件,该列是数据集的另外两个列的SipHash的结果。为此,我创建了一个在rdd.map()函数中调用的函数,如下所示:

代码语言:javascript
复制
import siphash
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sqlContext = SQLContext( spark )

# Hashing function
def hash_two_columns( row ):
    # Transform row to a dict
    row_dict = row.asDict()
    # Concat col1 and col2
    concat_str = 'E'.join( [str(row_dict['col1']), str(row_dict['col2'])] )
    # Fill string with 0 to get 16 bytes (otherwise error is raised)
    sixteenBytes_str = concat_str.zfill(16)
    # Preserve concatenated value for testing (this can be removed later)
    row_dict["hashcols_str"] = sixteenBytes_str
    # Calculate siphash
    row_dict["hashcols_id"] = siphash.SipHash_2_4( sixteenBytes_str.encode('utf-8') ).hash()
    return Row( **row_dict )

# Create test dataframe
test_df = spark.createDataFrame([
         (1,"text1",58965,11111),
         (3,"text2",78652,888888),
         (4,"text3",78652,888888),              
    ], ("id","item","col1","col2"))

# Build the schema 
# Using this to avoid "ValueError: Some of types cannot be determined by the first 100 rows" when pyspark tries to deduct schema by itself
test_df_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("item", StringType(), True),
    StructField("col1", IntegerType(), True),
    StructField("col2", IntegerType(), True),
    StructField("hashcols_str", StringType(), True),
    StructField("hashcols_id", LongType(), True)
])

# Create the final Dataframe
final_test_df = sqlContext \
     .createDataFrame(
          test_df.rdd.map(hash_two_columns).collect(), 
          test_df_schema) \
     .toDF()

final_test_df.show(truncate=False)

虽然架构定义与最终的dataframe结构匹配,但运行此代码时会出现以下错误:

IllegalArgumentException:需求失败:列数不匹配。旧列名(6):id、item、col1、col2、hashcols_str、hashcols_id新列名(0):(java.lang.RuntimeException)

有谁知道如何正确地实现这一点吗?非常感谢您的支持。

EN

回答 1

Stack Overflow用户

发布于 2020-08-20 12:57:00

我找到了一个基于这个职位的解决方案

以下列方式更新该功能:

代码语言:javascript
复制
def hash_two_columns( col1, col2 ):
    # Concat col1 and col2
    concat_str = 'E'.join( [col1, col2] )
    # Fill string with 0 to get 16 bytes (otherwise error is raised)
    sixteenBytes_str = concat_str.zfill(16)
    # Calculate siphash
    hashcols_id = siphash.SipHash_2_4( sixteenBytes_str.encode('utf-8') ).hash()
    return hashcols_id

然后,使用UDF (用户定义函数)使用withColumn功能将新列添加到dataframe。

代码语言:javascript
复制
from pyspark.sql.functions import udf

example_udf = udf( hash_two_columns, LongType() )

test_df = test_df \
    .withColumn( "hashcols_id", example_udf( test_df.col1, test_df.col2 ) )

test_df.show()
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63504730

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档