
Google DataFlow

Data Science user
Asked on 2020-04-30 06:10:31
1 answer · 318 views · 0 followers · 0 votes

I am trying to build a Google Dataflow pipeline by following this Medium post:

https://levelup.gitconnected.com/scaling-scikit-learn-with-apache-beam-251eb6fcf75b

However, I seem to be missing a project parameter, which raises the following error. I would appreciate any help getting past this.

Error:

ERROR:apache_beam.runners.direct.executor:Giving up after 4 attempts.
WARNING:apache_beam.runners.direct.executor:A task failed with exception: Missing executing project information. Please use the --project command line option to specify it.

Code:

import apache_beam as beam
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
import json

query = """
    SELECT year, plurality, apgar_5min,
        mother_age, father_age,
        gestation_weeks, ever_born,
        CASE WHEN mother_married = true THEN 1 ELSE 0 END AS mother_married,
        weight_pounds AS weight,
        current_timestamp AS time,
        GENERATE_UUID() AS guid
    FROM `bigquery-public-data.samples.natality`
    LIMIT 100
"""

class ApplyDoFn(beam.DoFn):

    def __init__(self):
        self._model = None
        from google.cloud import storage
        import pandas as pd
        import pickle as pkl
        self._storage = storage
        self._pkl = pkl
        self._pd = pd

    def process(self, element):
        if self._model is None:
            bucket = self._storage.Client().get_bucket('dsp_model_store')
            blob = bucket.get_blob('natality/sklearn-linear')
            self._model = self._pkl.loads(blob.download_as_string())

        new_x = self._pd.DataFrame.from_dict(element, orient = "index").transpose().fillna(0)   
        weight = self._model.predict(new_x.iloc[:,1:8])[0]
        return [ { 'guid': element['guid'], 'weight': weight, 'time': str(element['time']) } ]

schema = parse_table_schema_from_json(json.dumps({'fields':
            [ { 'name': 'guid', 'type': 'STRING'},
              { 'name': 'weight', 'type': 'FLOAT64'},
              { 'name': 'time', 'type': 'STRING'} ]}))

class PublishDoFn(beam.DoFn):

    def __init__(self):
        from google.cloud import datastore       
        self._ds = datastore

    def process(self, element):
        client = self._ds.Client()
        key = client.key('natality-guid', element['guid'])
        entity = self._ds.Entity(key)
        entity['weight'] = element['weight']         
        entity['time'] = element['time']
        client.put(entity)

parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(None)
pipeline_options = PipelineOptions(pipeline_args)

# define the pipeline steps
p = beam.Pipeline(options=pipeline_options)
data = p | 'Read from BigQuery' >> beam.io.Read(
       beam.io.BigQuerySource(query=query, use_standard_sql=True))
scored = data | 'Apply Model' >> beam.ParDo(ApplyDoFn())
scored | 'Save to BigQuery' >> beam.io.Write(beam.io.BigQuerySink(
                'weight_preds', 'dsp_demo', schema = schema,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

scored | 'Create entities' >> beam.ParDo(PublishDoFn())

# run the pipeline
result = p.run()
result.wait_until_finish()

1 Answer

Data Science user

Accepted answer, posted 2020-04-30 06:28:41

This error is telling you to specify the project ID of the GCP project in which the Dataflow job should run.
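As the error message suggests, one fix is to pass `--project` on the command line: the script's `parse_known_args` call forwards any flags it does not recognize to Beam's `PipelineOptions`. A minimal stdlib-only sketch of that hand-off (the project ID is a placeholder, not a real project):

```python
import argparse

# Mirror the question's argument handling: parse_known_args keeps
# unrecognized flags (such as --project) in pipeline_args, which the
# script then passes to Beam's PipelineOptions.
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(
    ['--project=my-gcp-project']  # placeholder; normally comes from sys.argv
)
print(pipeline_args)  # ['--project=my-gcp-project']
```

So running the script as `python pipeline.py --project=<your project ID>` would route the project ID through to the pipeline options without any code changes.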

In Python, you can set this and other Google Cloud options programmatically as follows:

from apache_beam.options.pipeline_options import GoogleCloudOptions

gcloud_options = pipeline_options.view_as(GoogleCloudOptions)
gcloud_options.project = '<insert project ID here>'
(...)
p = beam.Pipeline(options=pipeline_options)

For a complete snippet, see the Dataflow documentation.

0 votes
Original link:

https://datascience.stackexchange.com/questions/73271
