首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何更新python中火花数据的值?

如何更新python中火花数据的值?
EN

Stack Overflow用户
提问于 2018-11-19 01:49:42
回答 1查看 2.9K关注 0票数 1

我有火花数据

代码语言:javascript
复制
        df = spark.createDataFrame([('Andy', 'NY'), ('Bob', 'PA'), ('Cindy', 'DC')], ("FName","City "))

在这里,我试图创建一个新的dataframe,其中包含加密的Fname列,下面的PGP加密函数将字符串作为输入,并将已封装的字符串作为输出。

代码语言:javascript
复制
df.createOrReplaceTempView("Customer")

for line in spark.table("Customer").collect():
    gpg = gnupg.GPG()
    gpg.import_keys('/home/keys/key.txt')
    encry_str=gpg.encrypt(line.FName, 'recipientid', passphrase='passphrase', always_trust=True)
    print(encry_str)

加密的字符串如下所示:

代码语言:javascript
复制
-----BEGIN PGP MESSAGE-----
Version: GnuPG v2

hQEMAyBWi2alDMW+AQf5AQn4VdbVNEHMWKzXUFRUyW+m1vepxbN//ENHw0F3dzvd
NAldsgZzpCv5pPq4QLYFw4Sq0eWqOK6Ezg4VxvBIB1l5J4cGsx7kMX9sfCU8T7Be
xqF1ZtWoTOqHp/cwt5NQFi+D302kRfUjUooszOl1zHOp9uOP12WEa/eInoCGRza1
z+73TQ1/0lxieuVVsJu4CsQhEDG9atk+rD21sRsfsOEIAzgIaXyBekZJ0zaiLJCe
LSqum0HebBrl5VJ5yozoAlDAIt0+oXsG2JwqsWpoQFKkuQFsqYGJ61k1+nX/st7i
WWKUvWtjb1ABp3XhC+nT8LpZYCNGIkx0wxQCqcsjjNI/AVjRHvbZsrCfZpua+vdJ
Vv/i1ZKfq0r/FPKgspHdCtMx2/ZAEmVZ3paHM/RGuFm82ihQhXkT78Ik//EiZD5D
=mRs6
-----END PGP MESSAGE-----

预期产出

代码语言:javascript
复制
+-----+-----+--------------------+
|FName|City |           Encrypted|
+-----+-----+--------------------+
| Andy|   NY|-----BEGIN PGP ME...|
|  Bob|   PA|-----BEGIN PGP ME...|
|Cindy|   DC|-----BEGIN PGP ME...| 
+-----+-----+--------------------+

我正在尝试更新FName列,但是得到了异常

代码语言:javascript
复制
line.FName=gpg.encrypt(line.FName, 'recipientid', passphrase='passphrase', always_trust=True)

例外:行是只读的。

在上面的框架中,我如何向相应的Dataframe列添加/更新加密的字符串值?

EN

回答 1

Stack Overflow用户

发布于 2018-11-19 03:31:13

您应该将和RDDs视为底层数据的引用/配方。因此,如果您确实希望更改数据,则需要首先进行转换,然后更新/覆盖现有数据。

转变:

代码语言:javascript
复制
from pyspark.sql import Row

def mapper(row):
    # if row doesn't need updating, return original
    if row['my_test_column'] != 'some_test_value':
        return row

    row = row.asDict()
    row['updated_column'] = some_function(row['some_column'], ...)

    return Row(**row)

拯救:

代码语言:javascript
复制
df_updated.write.saveAsTable('my_schema.my_new_table')

更新/覆盖:

代码语言:javascript
复制
df_updated.write.mode('overwrite').saveAsTable('my_schema.my_table')
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53367301

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档