我正试图在 Dataflow 中启动一个简单的 ETL 流程,从 Pub/Sub 主题获取消息并将其写入 BigQuery。从 Pub/Sub 读取消息时,我希望使用 Tink 模块对它们进行加密。但是,当我导入该模块时,Dataflow 作业报出如下错误:
ModuleNotFoundError: No module named 'tink'
import json
import apache_beam as beam
import time
from datetime import datetime, timedelta
from google.cloud import bigquery
# Module-level BigQuery client, created at import time; run() uses it to query
# the subscription-to-destination-table mapping.
client = bigquery.Client()
def format_message_element(message, timestamp=beam.DoFn.TimestampParam):
    """Encrypt a Pub/Sub message body with Tink and shape it as an output row.

    Args:
        message: Object exposing ``data`` (bytes, decoded as UTF-8) and
            ``attributes`` (mapping with an optional ``entityId`` entry).
            If ``data`` is missing, an empty body is encrypted instead.
        timestamp: Defaults to ``beam.DoFn.TimestampParam``; not used by the
            function body.

    Returns:
        dict with keys ``Id``, ``message_body`` (the AEAD ciphertext),
        ``message_attributes`` (stringified attributes) and
        ``insert_timestamp`` (UTC processing time, microsecond precision).
    """
    # Kept function-local so the names are bound when the function executes
    # on a worker. The tink package must still be installed in the worker
    # environment (e.g. via the pipeline's requirements/setup file).
    import base64
    import io
    from datetime import datetime

    import tink
    from tink import aead, cleartext_keyset_handle

    processing_timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")

    try:
        decoded_message = message.data.decode('UTF-8')
    except AttributeError:
        # No usable payload on the message: fall back to an empty body.
        decoded_message = ''

    entity_id = str(message.attributes.get('entityId'))

    # Set up the Tink AEAD primitive with a freshly generated AES256-GCM key.
    aead.register()
    keyset_handle = tink.new_keyset_handle(aead.aead_key_templates.AES256_GCM)
    aead_primitive = keyset_handle.primitive(aead.Aead)

    # Tink works on bytes: the body is the plaintext, the entity id is the
    # associated data. (The original referenced an undefined name here, which
    # raised NameError for every element.)
    plaintext = decoded_message.encode('UTF-8')
    associated_data = entity_id.encode('UTF-8')
    ciphertext = aead_primitive.encrypt(plaintext, associated_data)

    # Serialize the keyset to base64.
    # NOTE(review): `key` is not part of the returned row, so the per-message
    # key is discarded and the ciphertext cannot be decrypted later. Confirm
    # where the key is meant to be stored before relying on this output.
    out = io.BytesIO()
    cleartext_keyset_handle.write(tink.BinaryKeysetWriter(out), keyset_handle)
    key = base64.b64encode(out.getvalue())

    return {
        'Id': entity_id,
        'message_body': ciphertext,
        'message_attributes': str(message.attributes),
        'insert_timestamp': processing_timestamp,
    }
def run():
    """Build and run the streaming pipeline.

    Queries BigQuery for (subscriptionName, destinationTable) rows and, for
    each row, adds a branch that reads the Pub/Sub subscription with
    attributes, maps every message through ``format_message_element`` and
    appends the result to the destination table. Blocks until the pipeline
    finishes.
    """
    pipeline_options = beam.options.pipeline_options.PipelineOptions()
    # Replace this by --stream execution param
    pipeline_options.view_as(
        beam.options.pipeline_options.StandardOptions
    ).streaming = True
    pipeline = beam.Pipeline(options=pipeline_options)

    mapping_query = """
SELECT
subscriptionName, destinationTable
FROM `myProject.myDataset.myTable`
"""
    for mapping in client.query(mapping_query).result():
        subscription = mapping.subscriptionName
        table = mapping.destinationTable
        # The last path segment of the subscription keeps step labels unique.
        suffix = subscription.split('/')[-1]

        formatted = (
            pipeline
            | f'read from pubsub subscription:{suffix}' >> beam.io.ReadFromPubSub(
                subscription=subscription,
                with_attributes=True,
            )
            | f'Format Talent API{suffix}message' >> beam.Map(format_message_element)
        )
        formatted | f'send Talent API{suffix}record to bigquery' >> beam.io.gcp.bigquery.WriteToBigQuery(
            table=table,
            write_disposition='WRITE_APPEND',
        )

    pipeline.run().wait_until_finish()
run()
在 Dataflow 中运行模板时,我得到错误:format_message_element ModuleNotFoundError: No module named 'tink' [while running 'Format Talent APITest_DIP_Crypto-submessage-ptransform-57']
我也尝试过把导入语句移到 format_message_element 外部(放在文件顶部),但得到以下错误:format_message_element NameError: name 'aead' is not defined [while running 'Format Talent APITest_DIP_Crypto-submessage-ptransform-57']
发布于 2022-07-22 18:34:49
https://stackoverflow.com/questions/73084636
复制相似问题