首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >ModuleNotFoundError:没有名为“tink”的模块

ModuleNotFoundError:没有名为“tink”的模块
EN

Stack Overflow用户
提问于 2022-07-22 18:27:33
回答 1查看 101关注 0票数 0

我正试图在数据流中启动一个简单的ETL过程,以从公共主题中获取消息并将其推入bigquery。当从pubSub读取消息时,我希望使用TINK模块对它们进行加密。但是,当我导入模块时,我在数据流作业中得到一个错误,如下所示:

ModuleNotFoundError: No module named 'tink'

代码语言:javascript
复制
import json
import apache_beam as beam
import time
from datetime import datetime, timedelta
from google.cloud import bigquery

client = bigquery.Client()


def format_message_element(message, timestamp=beam.DoFn.TimestampParam):
    import tink
    from tink import aead, cleartext_keyset_handle
    import json
    import time
    import logging
    import io
    import base64
    import os
    from datetime import datetime, timedelta
    date_time_obj = datetime.utcnow()
    processing_timestamp = date_time_obj.strftime("%Y-%m-%d %H:%M:%S.%f")
    try:
        decoded_message = message.data.decode('UTF-8')
        # Id = json.loads(decoded_message).get('id')
    except AttributeError:
        # Id = ''
        decoded_message = ''
    except json.JSONDecodeError:
        # Id = ''
        decoded_message = message.data.decode('UTF-8')
    Id = str(message.attributes.get('entityId'))

    # Setting up Cryptography library call
    aead.register()
    keyset_handle = tink.new_keyset_handle(aead.aead_key_templates.AES256_GCM)
    aead_primitive = keyset_handle.primitive(aead.Aead)
    # Encoding the message body into byte for TINK encryption
    encoded_message_body = bytes(decoded_message, 'UTF-8')
    encoded_globalId = bytes(Id, 'UTF-8')
    ciphertext = aead_primitive.encrypt(encoded_message_body, encoded_Id)
    out = io.BytesIO()
    writer = tink.BinaryKeysetWriter(out)
    cleartext_keyset_handle.write(writer, keyset_handle)
    out.seek(0)
    key = base64.b64encode(out.read())
    data = {
        'Id': Id,
        'message_body': ciphertext,
        'message_attributes': str(message.attributes),
        'insert_timestamp': processing_timestamp
    }

    return data

def run():
    o = beam.options.pipeline_options.PipelineOptions()
    # Replace this by --stream execution param
    standard_options = o.view_as(beam.options.pipeline_options.StandardOptions)
    standard_options.streaming = True
    p = beam.Pipeline(options=o)
query = """
       SELECT 
        subscriptionName, destinationTable
       FROM `myProject.myDataset.myTable`
   """
    query_job = client.query(query) 
    results = query_job.result()

    for row in results:
        subs_name = row.subscriptionName
        destination_table_name = row.destinationTable
        subs_variable = subs_name.split('/')[-1]
        Talent_records1 = (
                p
                | 'read from  pubsub subscription:' + subs_variable >> beam.io.ReadFromPubSub(subscription=subs_name,
                                                                                              with_attributes=True)
        )

        Talent_record_format = (Talent_records1
                                | 'Format Talent API' + subs_variable + 'message' >> beam.Map(format_message_element))

        Talent_record_format | 'send Talent API' + subs_variable + 'record to bigquery' >> beam.io.gcp.bigquery.WriteToBigQuery(
            table=destination_table_name, write_disposition='WRITE_APPEND')

    result = p.run()
    result.wait_until_finish()

 
run()

在数据流中运行模板时,我得到错误:format_message_element ModuleNotFoundError: No module named 'tink' [while running 'Format Talent APITest_DIP_Crypto-submessage-ptransform-57']

我尝试在format_message_element外部使用导入函数;在顶部得到以下错误format_message_element NameError: name 'aead' is not defined [while running 'Format Talent APITest_DIP_Crypto-submessage-ptransform-57']

EN

回答 1

Stack Overflow用户

发布于 2022-07-22 18:34:49

请尝试

代码语言:javascript
复制
pip list 
pip3 list 

在您的终端,并检查是否存在tink模块。如果没有,则使用https://pypi.org/project/tink/安装

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73084636

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档