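I'm running into a problem on AWS Glue. When I use Map.apply on a DynamicFrame to decrypt the values of a given column, the job fails with PicklingError: Could not serialize object: TypeError: can't pickle _ModuleWithDeprecations objects. I don't think the code itself is at fault, because it runs on my local machine with the same library versions. I suspect the cause is how Spark and Glue package the script, combined with how I import the cryptography library to do the decryption. Is there an obvious flaw in how I'm using this code to accomplish the task? Or is this simply a limitation of using Python in AWS Glue, meaning I need to switch to a JAR with pre-packaged Scala code to handle this use case?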
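One commonly suggested workaround for this kind of PicklingError, offered here as an untested sketch rather than the asker's code, is to keep cryptography objects out of the decryption function's module-level globals. PySpark serializes a function defined in the driver script (via cloudpickle) together with the globals it references; if one of those globals is a module object that cryptography wraps in _ModuleWithDeprecations, serialization can fail. The assumption here is that one of the imported names is such an object, which the error message suggests but does not prove. In the sketch, the salt/IV split, the PBKDF2 key derivation and the PKCS7 unpadding use only the standard library (hashlib.pbkdf2_hmac computes the same PBKDF2-HMAC-SHA512 as cryptography's PBKDF2HMAC), and the remaining cryptography import is deferred into the function body. The helper names split_payload, derive_key and pkcs7_unpad, and passing the key as a secret parameter, are hypothetical.

```python
import hashlib
from base64 import b64decode
from typing import Tuple


def split_payload(obj: str) -> Tuple[bytes, bytes, bytes]:
    """Split the base64 payload into (salt, iv, ciphertext): 16 + 16 + remaining bytes."""
    encrypted_obj = b64decode(obj)
    return encrypted_obj[0:16], encrypted_obj[16:32], encrypted_obj[32:]


def derive_key(secret: bytes, salt: bytes) -> bytes:
    """PBKDF2-HMAC-SHA512 with 1000 iterations and a 32-byte (AES-256) key, as in the question."""
    return hashlib.pbkdf2_hmac("sha512", secret, salt, 1000, dklen=32)


def pkcs7_unpad(data: bytes, block_size: int = 16) -> bytes:
    """Strip PKCS7 padding for 16-byte blocks (PKCS7(128)); raise ValueError if malformed."""
    if not data or len(data) % block_size:
        raise ValueError("padded data length is not a multiple of the block size")
    pad = data[-1]
    if not 1 <= pad <= block_size or data[-pad:] != bytes([pad]) * pad:
        raise ValueError("invalid PKCS7 padding")
    return data[:-pad]


def decrypt_pbe_with_hmac_sha512_aes_256(obj: str, secret: bytes) -> str:
    # Deferred import (assumption): cryptography names are locals here, not globals,
    # so they are not serialized with the function; the import runs wherever the
    # function is executed.
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

    salt, iv, cypher_text = split_payload(obj)
    key = derive_key(secret, salt)
    cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
    decryptor = cipher.decryptor()
    padded_text = decryptor.update(cypher_text) + decryptor.finalize()
    return pkcs7_unpad(padded_text).decode()
```

In the job, decryptDescription would then call decrypt_pbe_with_hmac_sha512_aes_256(rec["description"], KEY). Whether this alone avoids the error depends on which object was actually unpicklable, which the question does not show.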
Here is a slightly simplified version of the code:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from base64 import b64decode, b64encode
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.padding import PKCS7
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
KEY = b'our-secret-key-value'
def decrypt_pbe_with_hmac_sha512_aes_256(obj: str) -> str:
    # split the payload and re-generate the key from the salt
    encrypted_obj = b64decode(obj)
    salt = encrypted_obj[0:16]
    iv = encrypted_obj[16:32]
    cypher_text = encrypted_obj[32:]
    kdf = PBKDF2HMAC(hashes.SHA512(), 32, salt, 1000, backend=default_backend())
    key = kdf.derive(KEY)
    # decrypt
    cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
    decryptor = cipher.decryptor()
    padded_text = decryptor.update(cypher_text) + decryptor.finalize()
    # remove padding
    unpadder = PKCS7(128).unpadder()
    clear_text = unpadder.update(padded_text) + unpadder.finalize()
    return clear_text.decode()

def decryptDescription(rec):
    rec["updated_description"] = decrypt_pbe_with_hmac_sha512_aes_256(rec["description"])
    del rec["description"]
    return rec
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node
node1 = glueContext.create_dynamic_frame.from_catalog(...)
mapped_dyF = Map.apply(frame = node1, f = decryptDescription)
# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(...)
# Script generated for node
node3 = glueContext.write_dynamic_frame.from_catalog(...)
job.commit()

Posted on 2022-07-18 23:59:22
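OK, after searching for several days I managed to put together an answer that works for the scenario above. Since the problem came from passing the decryption module into the function given to Map.apply, I moved decryption into its own separate step that builds a dictionary mapping each encrypted value to its decrypted value. Then I wrote a function that looks up the replacement in that dictionary and passed it to Map.apply on the dynamic frame. The code is as follows: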
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from base64 import b64decode, b64encode
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.padding import PKCS7
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
KEY = b'our-secret-key-value'
my_dict = {}
def decrypt_pbe_with_hmac_sha512_aes_256(obj: str) -> str:
    # split the payload and re-generate the key from the salt
    encrypted_obj = b64decode(obj)
    salt = encrypted_obj[0:16]
    iv = encrypted_obj[16:32]
    cypher_text = encrypted_obj[32:]
    kdf = PBKDF2HMAC(hashes.SHA512(), 32, salt, 1000, backend=default_backend())
    key = kdf.derive(KEY)
    # decrypt
    cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
    decryptor = cipher.decryptor()
    padded_text = decryptor.update(cypher_text) + decryptor.finalize()
    # remove padding
    unpadder = PKCS7(128).unpadder()
    clear_text = unpadder.update(padded_text) + unpadder.finalize()
    return clear_text.decode()

def update_description(rec):
    # only a dictionary lookup runs on the executors; no cryptography objects are referenced
    rec["updated_description"] = my_dict[rec["description"]]
    del rec["description"]
    return rec
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node
node1 = glueContext.create_dynamic_frame.from_catalog(...)
description_list = list(node1.toDF().select("description").toPandas()["description"])
for x in description_list:
    my_dict[x] = decrypt_pbe_with_hmac_sha512_aes_256(x)
mapped_dyF = Map.apply(frame = node1, f = update_description)
# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(...)
# Script generated for node
node3 = glueContext.write_dynamic_frame.from_catalog(...)
job.commit()

https://stackoverflow.com/questions/72987729
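The answer's core idea, decrypting each value once on the driver and giving Map.apply a function that only does a dictionary lookup, can be sketched in plain Python. This is a sketch under assumptions: build_lookup and make_update_description are hypothetical helper names, records are modeled as plain dicts, and decrypt stands in for decrypt_pbe_with_hmac_sha512_aes_256. Deduplicating before decrypting, skipping null values, and closing over the dictionary instead of mutating the global my_dict are additions not in the answer. In the Glue job the distinct values could be gathered with node1.toDF().select("description").distinct().collect(); note that collect() and toPandas() both bring every value to the driver, so this approach suits columns whose distinct values fit in driver memory.

```python
from typing import Callable, Dict, Iterable, Optional


def build_lookup(values: Iterable[Optional[str]],
                 decrypt: Callable[[str], str]) -> Dict[str, str]:
    """Decrypt each distinct, non-null encrypted value exactly once (runs on the driver)."""
    return {v: decrypt(v) for v in set(values) if v is not None}


def make_update_description(lookup: Dict[str, str]) -> Callable[[dict], dict]:
    """Return a record mapper that only reads from `lookup`, a plain picklable dict."""
    def update_description(rec: dict) -> dict:
        # .get() yields None for values missing from the lookup instead of raising KeyError
        rec["updated_description"] = lookup.get(rec["description"])
        del rec["description"]
        return rec
    return update_description
```

Wired into the job, this would look roughly like mapped_dyF = Map.apply(frame=node1, f=make_update_description(lookup)), where lookup is built from the collected description values.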