我有火花数据
df = spark.createDataFrame([('Andy', 'NY'), ('Bob', 'PA'), ('Cindy', 'DC')], ("FName","City "))在这里,我试图创建一个新的dataframe,其中包含加密的Fname列,下面的PGP加密函数将字符串作为输入,并将已封装的字符串作为输出。
df.createOrReplaceTempView("Customer")
for line in spark.table("Customer").collect():
gpg = gnupg.GPG()
gpg.import_keys('/home/keys/key.txt')
encry_str=gpg.encrypt(line.FName, 'recipientid', passphrase='passphrase', always_trust=True)
print(encry_str)加密的字符串如下所示:
-----BEGIN PGP MESSAGE-----
Version: GnuPG v2
hQEMAyBWi2alDMW+AQf5AQn4VdbVNEHMWKzXUFRUyW+m1vepxbN//ENHw0F3dzvd
NAldsgZzpCv5pPq4QLYFw4Sq0eWqOK6Ezg4VxvBIB1l5J4cGsx7kMX9sfCU8T7Be
xqF1ZtWoTOqHp/cwt5NQFi+D302kRfUjUooszOl1zHOp9uOP12WEa/eInoCGRza1
z+73TQ1/0lxieuVVsJu4CsQhEDG9atk+rD21sRsfsOEIAzgIaXyBekZJ0zaiLJCe
LSqum0HebBrl5VJ5yozoAlDAIt0+oXsG2JwqsWpoQFKkuQFsqYGJ61k1+nX/st7i
WWKUvWtjb1ABp3XhC+nT8LpZYCNGIkx0wxQCqcsjjNI/AVjRHvbZsrCfZpua+vdJ
Vv/i1ZKfq0r/FPKgspHdCtMx2/ZAEmVZ3paHM/RGuFm82ihQhXkT78Ik//EiZD5D
=mRs6
-----END PGP MESSAGE-----预期产出
+-----+-----+--------------------+
|FName|City | Encrypted|
+-----+-----+--------------------+
| Andy| NY|-----BEGIN PGP ME...|
| Bob| PA|-----BEGIN PGP ME...|
|Cindy| DC|-----BEGIN PGP ME...|
+-----+-----+--------------------+我正在尝试更新FName列,但是得到了异常
line.FName=gpg.encrypt(line.FName, 'recipientid', passphrase='passphrase', always_trust=True)例外:行是只读的。
在上面的框架中,我如何向相应的Dataframe列添加/更新加密的字符串值?
发布于 2018-11-19 03:31:13
您应该将和RDDs视为底层数据的引用/配方。因此,如果您确实希望更改数据,则需要首先进行转换,然后更新/覆盖现有数据。
转变:
from pyspark.sql import Row
def mapper(row):
# if row doesn't need updating, return original
if row['my_test_column'] != 'some_test_value':
return row
row = row.asDict()
row['updated_column'] = some_function(row['some_column'], ...)
return Row(**row)拯救:
df_updated.write.saveAsTable('my_schema.my_new_table')更新/覆盖:
df_updated.write.mode('overwrite').saveAsTable('my_schema.my_table')https://stackoverflow.com/questions/53367301
复制相似问题