
AWS Glue issue raising PicklingError

Stack Overflow user
Asked on 2022-07-14 23:48:51
1 answer · 91 views · 0 followers · 0 votes

I'm running into an issue with AWS Glue: when I run a Map.apply function over a DynamicFrame to decrypt the values in a given column, it throws an error. The error I get is PicklingError: Could not serialize object: TypeError: can't pickle _ModuleWithDeprecations objects, but I don't think it's caused by the code itself (it runs fine on my local machine with the same library versions). Rather, it seems related to how Spark and Glue package the script, and to how I'm importing the cryptography library to handle the decryption. I'd like to know whether there is some obvious flaw in how I'm using this code to accomplish what I want, or whether this is simply a limitation of AWS Glue with Python and I need to switch to a Jar with pre-packaged Scala code to handle this use case.
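The root cause can be reproduced without Glue at all: Spark must pickle the function handed to Map.apply together with everything it references, and module objects (which cryptography wraps in _ModuleWithDeprecations) are not picklable. A minimal illustration, using the stdlib sys module as a stand-in for the cryptography module wrapper:

```python
import pickle
import sys

# Pickling a module object directly fails with a TypeError. This is the same
# failure mode as the Glue error: when the mapped function drags a module
# wrapper into the pickle graph, serialization aborts.
try:
    pickle.dumps(sys)
except TypeError as e:
    print("pickle failed:", e)
```

This is why the error appears only on Glue/Spark and not when the decryption function is called locally: locally, nothing ever tries to serialize the function.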

Below is some slightly simplified code:

Code language: python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from base64 import b64decode, b64encode
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.padding import PKCS7
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

KEY = b'our-secret-key-value'

def decrypt_pbe_with_hmac_sha512_aes_256(obj: str) -> str:
    # re-generate the key from the salt embedded in the payload
    encrypted_obj = b64decode(obj)
    salt = encrypted_obj[0:16]
    iv = encrypted_obj[16:32]
    cypher_text = encrypted_obj[32:]
    kdf = PBKDF2HMAC(hashes.SHA512(), 32, salt, 1000, backend=default_backend())
    key = kdf.derive(KEY)

    # decrypt
    cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
    decryptor = cipher.decryptor()
    padded_text = decryptor.update(cypher_text) + decryptor.finalize()

    # remove padding
    unpadder = PKCS7(128).unpadder()
    clear_text = unpadder.update(padded_text) + unpadder.finalize()
    return clear_text.decode()


def decryptDescription(rec):
    rec["updated_description"] = decrypt_pbe_with_hmac_sha512_aes_256(rec["description"])
    del rec["description"]
    return rec


args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node
node1 = glueContext.create_dynamic_frame.from_catalog(...)

mapped_dyF = Map.apply(frame = node1, f = decryptDescription)

# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(...)

# Script generated for node
node3 = glueContext.write_dynamic_frame.from_catalog(...)

job.commit()
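For reference, the key-derivation step above (PBKDF2-HMAC-SHA512, 32-byte key, 1000 iterations) has a pure-stdlib equivalent in hashlib, which is handy for checking the payload layout (16-byte salt, then 16-byte IV, then ciphertext) without installing the cryptography package. The payload bytes below are hypothetical placeholders, not real ciphertext:

```python
import hashlib
from base64 import b64decode, b64encode

KEY = b'our-secret-key-value'  # placeholder, as in the question

# Hypothetical encrypted payload: 16-byte salt | 16-byte IV | ciphertext
payload = b64encode(b'S' * 16 + b'I' * 16 + b'C' * 32)

encrypted_obj = b64decode(payload)
salt = encrypted_obj[0:16]
iv = encrypted_obj[16:32]
cypher_text = encrypted_obj[32:]

# Same derivation as PBKDF2HMAC(hashes.SHA512(), 32, salt, 1000).derive(KEY)
key = hashlib.pbkdf2_hmac('sha512', KEY, salt, 1000, dklen=32)
print(len(salt), len(iv), len(key))  # 16 16 32
```

Because hashlib is importable everywhere, this form of the KDF would also pickle without issue, though the AES decryption itself still needs the cryptography package.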

1 Answer

Stack Overflow user

Answered on 2022-07-18 23:59:22

OK, after a few days of searching I managed to put together an answer that works for the scenario above. Since the problem was passing the decryption module into the Map.apply function, I moved decryption into its own separate step that builds a dictionary from encrypted values to decrypted values. I then created a function that does a lookup-and-replace against that dictionary, and Map.apply that function to the dynamic frame. Code below:
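The essence of this workaround can be sketched in plain Python (a hypothetical decrypt stand-in replaces the real routine): decrypt every value once on the driver, so the function handed to Map.apply only closes over an ordinary dict, which pickles cleanly:

```python
# Hypothetical stand-in for decrypt_pbe_with_hmac_sha512_aes_256
def fake_decrypt(value: str) -> str:
    return value[::-1]  # placeholder transformation

records = [{"description": "abc"}, {"description": "xyz"}]

# Step 1: decrypt on the driver, outside any function shipped to executors
my_dict = {r["description"]: fake_decrypt(r["description"]) for r in records}

# Step 2: the mapped function references only a plain dict of strings,
# which serializes without pulling in the cryptography module
def update_description(rec):
    rec["updated_description"] = my_dict[rec["description"]]
    del rec["description"]
    return rec

print([update_description(dict(r)) for r in records])
# [{'updated_description': 'cba'}, {'updated_description': 'zyx'}]
```

Note that this collects every distinct encrypted value to the driver, so it trades executor-side parallelism for picklability; it works best when the column's cardinality is modest.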

Code language: python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from base64 import b64decode, b64encode
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.padding import PKCS7
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

KEY = b'our-secret-key-value'
my_dict = {}

def decrypt_pbe_with_hmac_sha512_aes_256(obj: str) -> str:
    # re-generate the key from the salt embedded in the payload
    encrypted_obj = b64decode(obj)
    salt = encrypted_obj[0:16]
    iv = encrypted_obj[16:32]
    cypher_text = encrypted_obj[32:]
    kdf = PBKDF2HMAC(hashes.SHA512(), 32, salt, 1000, backend=default_backend())
    key = kdf.derive(KEY)

    # decrypt
    cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
    decryptor = cipher.decryptor()
    padded_text = decryptor.update(cypher_text) + decryptor.finalize()

    # remove padding
    unpadder = PKCS7(128).unpadder()
    clear_text = unpadder.update(padded_text) + unpadder.finalize()
    return clear_text.decode()


def update_description(rec):
    rec["updated_description"] = my_dict[rec["description"]]
    del rec["description"]
    return rec


args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node
node1 = glueContext.create_dynamic_frame.from_catalog(...)

description_list = list(node1.toDF().select("description").toPandas()["description"])

for x in description_list:
    my_dict[x] = decrypt_pbe_with_hmac_sha512_aes_256(x) 

mapped_dyF = Map.apply(frame = node1, f = update_description)

# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(...)

# Script generated for node
node3 = glueContext.write_dynamic_frame.from_catalog(...)

job.commit()
Votes: 0
Original page content provided by Stack Overflow; translation supported by Tencent Cloud's IT-domain engine.
Original link:

https://stackoverflow.com/questions/72987729
