
Google Dataflow and Cloud Functions error - ModuleNotFoundError

Stack Overflow user
Asked on 2021-02-04 01:59:19
1 answer · 520 views · 0 followers · 0 votes

I am triggering a Dataflow job from a Cloud Function in GCP.

Code embedded in the Cloud Function:

import apache_beam as beam
import argparse

PROJECT = 'projectName'
BUCKET='bucketName'
SCHEMA = 'sr:INTEGER,abv:FLOAT,id:INTEGER,name:STRING,style:STRING,ounces:FLOAT'
DATAFLOW_JOB_NAME = 'jobName'

def discard_incomplete(data):
    """Filters out records that are missing required fields."""
    return len(data['abv']) > 0 and len(data['id']) > 0 and len(data['name']) > 0 and len(data['style']) > 0


def convert_types(data):
    """Converts string values to their appropriate type."""
    data['abv'] = float(data['abv']) if 'abv' in data else None
    data['id'] = int(data['id']) if 'id' in data else None
    data['name'] = str(data['name']) if 'name' in data else None
    data['style'] = str(data['style']) if 'style' in data else None
    data['ounces'] = float(data['ounces']) if 'ounces' in data else None
    return data

def del_unwanted_cols(data):
    """Delete the unwanted columns"""
    del data['ibu']
    del data['brewery_id']
    return data

def execute(event, context):
    argv = [
      '--project={0}'.format(PROJECT),
      '--job_name={0}'.format(DATAFLOW_JOB_NAME),
      '--staging_location=gs://{0}/staging/'.format(BUCKET),
      '--temp_location=gs://{0}/staging/'.format(BUCKET),
      '--region=us-central1',
      '--runner=DataflowRunner'
    ]

    p = beam.Pipeline(argv=argv)
    input_path = 'gs://{0}/beers.csv'.format(BUCKET)  # renamed to avoid shadowing the built-in input

    (p | 'ReadData' >> beam.io.ReadFromText(input_path, skip_header_lines=1)
       | 'SplitData' >> beam.Map(lambda x: x.split(','))
       | 'FormatToDict' >> beam.Map(lambda x: {"sr": x[0], "abv": x[1], "ibu": x[2], "id": x[3], "name": x[4], "style": x[5], "brewery_id": x[6], "ounces": x[7]}) 
       | 'DeleteIncompleteData' >> beam.Filter(discard_incomplete)
       | 'ChangeDataType' >> beam.Map(convert_types)
       | 'DeleteUnwantedData' >> beam.Map(del_unwanted_cols)
       | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           '{0}:sandeep_beer_test.beer_data'.format(PROJECT),
           schema=SCHEMA,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
    p.run()

When the Cloud Function executes, the Dataflow job does get triggered, but the job always fails. When I check the job logs, I see the following error message: ModuleNotFoundError: No module named 'google.cloud.functions'
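For context (my reading of the error, not stated in the question): Python pickles module-level functions by reference, recording the defining module and name rather than the code itself, so unpickling on a worker requires that module to be importable there. When main-session state from a Cloud Function is shipped to Dataflow workers, anything referencing the google.cloud.functions runtime package fails to unpickle because that package is not installed on the workers. A small sketch of the by-reference behavior:

```python
import pickle

def handler(event, context):
    """Stand-in for a Cloud Function entry point (hypothetical name)."""
    return 'ok'

# Module-level functions are pickled by reference: the payload stores
# the defining module and the function's name, not its bytecode.
# Unpickling on another machine re-imports that module, and raises
# ModuleNotFoundError if the module is absent there.
payload = pickle.dumps(handler)
print(b'handler' in payload)  # → True
```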

requirements.txt

apache-beam[gcp]

The Python code embedded in the Cloud Function works fine if I run it directly from Cloud Shell after installing apache-beam.

Please share your input on how to get past the missing-module error on Dataflow.

Thanks,

Sandeep


1 Answer

Stack Overflow user

Answered on 2021-02-04 05:51:52

This is most likely because you are passing --save_main_session.
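If that flag is reaching the job through argv (the answer's diagnosis; the helper below is my own illustrative sketch, not from the thread), one way to make sure it never gets to Dataflow is to strip it before constructing the pipeline:

```python
def drop_save_main_session(argv):
    """Return argv without any --save_main_session flag
    (covers both the bare and the =value forms)."""
    return [a for a in argv if not a.startswith('--save_main_session')]

argv = [
    '--project=projectName',
    '--runner=DataflowRunner',
    '--save_main_session=True',
]
print(drop_save_main_session(argv))
# → ['--project=projectName', '--runner=DataflowRunner']
```

Alternatively, leave save_main_session unset (it defaults to False) and put any imports the transforms need inside the functions themselves, so workers import them at run time instead of relying on a pickled main session.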

Votes: 0
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/66038332