
Apache Beam dimension-table loading: is there an example?

Stack Overflow user
Asked on 2017-08-05 02:15:23
2 answers · 1.3K views · 0 followers · 1 vote

I'm looking at loading a file into a dimension table. My planned solution is:

  1. Read the file with Beam.
  2. Create a side input of the existing data from the DB.
  3. In a ParDo, filter out records that are already present in the side input.
  4. Write the remaining records to the DB with a BigQuerySink.

Has anyone implemented this? Could you give me an example? Thanks.

Can you give me an example of using CoGroupByKey? I understand it might look like the below. Sorry, I'm a newbie to Dataflow, and reading code is the best way for me to learn.

step 1: sourcedata = beam.ReadFromText(...)
step 2: existing_table = beam.pvalue.AsDict(p
                                    | beam.Read(beam.io.BigQuerySource(my_query))
                                    | beam.Map(format_rows))

I assume sourcedata and the existing data share the same structure: <k, v>

step 3: source_existing_data = ((sourcedata, existing_table)
                                | 'coGroupBy' >> beam.CoGroupByKey())

step 4: new_data = source_existing_data | beam.Filter(lambda (name, (existing, source)): source is None)

step 5: bigQuerySink(new_data)

2 Answers

Stack Overflow user

Accepted answer

Posted on 2017-08-22 03:07:12

The rows coming from the text file and the rows coming from BigQuery both need to be converted to (key, value) tuples, which is done with these helper functions:

    from GCPUtil import BuildTupleRowFn as BuildTupleRowFn
    from GCPUtil import BuildDictTupleRowFn as BuildDictTupleRowFn

The new data also needs converting after CoGroupByKey and Filter, since what comes out of CoGroupByKey is a tuple, so it must be converted back from a dict or list.
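`GCPUtil` is the answerer's own module and isn't shown in the post. A plausible sketch of what the two helpers do, written as plain functions (the names, field order, and assumption that records are keyed by `CAND_ID` are guesses, not the actual module):

```python
# Hypothetical stand-ins for the unshown GCPUtil helpers. Both turn a
# record into a (key, value) tuple so CoGroupByKey can join on CAND_ID.

def build_tuple_row(fields):
    """fields: a list like ['101', 'Alice'] from a comma-split text line."""
    return (fields[0], fields[1])  # (CAND_ID, CAND_NAME)

def build_dict_tuple_row(row):
    """row: a BigQuery result dict like {'CAND_ID': '101', 'CAND_NAME': 'Alice'}."""
    return (row['CAND_ID'], row['CAND_NAME'])
```

Inside a pipeline these would be wrapped in a `beam.DoFn` (or used with `beam.Map`) so that both collections share the same `(key, value)` shape before the join.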

Below is the detailed code:

#####################################################################
#   Developed by Emma 2017/08/19
#####################################################################

import argparse
import logging
import sys

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions

sys.path.append("..")
from GCPUtil import BuildTupleRowFn as BuildTupleRowFn
from GCPUtil import BuildDictTupleRowFn as BuildDictTupleRowFn


def configure_bigquery_write():
    return [
        ('CAND_ID', 'STRING'),
        ('CAND_NAME', 'STRING'),
    ]


class BuildRowFn(beam.DoFn):
    def process(self, element):
        # element is a list of (CAND_ID, CAND_NAME) tuples for one new key;
        # emit one BigQuery row dict per tuple.
        for entry in element:
            yield {'CAND_ID': entry[0], 'CAND_NAME': entry[1]}


def run(argv=None):
    """Run the workflow."""

    schema = 'CAND_ID:STRING,CAND_NAME:STRING'
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', default=r'd:/resource/test*')
    parser.add_argument('--output', default=r'd:/output/test/new_emma')

    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(StandardOptions).runner = 'DirectRunner'
    pipeline_options.view_as(GoogleCloudOptions).project = 'chinarose_project'
    query = 'select CAND_ID, CAND_NAME from emma_test.campaign'
    p = beam.Pipeline(options=pipeline_options)

    # Read the text file and turn each line into a (key, value) tuple.
    source_data = (p | beam.io.ReadFromText(known_args.input)
                   | beam.Map(lambda a: a.split(","))
                   | beam.ParDo(BuildTupleRowFn())
                   )
    # source_data | 'write' >> WriteToText(known_args.output)

    print("connect to BQ")
    existing_data = (p | beam.io.Read(beam.io.BigQuerySource(query=query, project='chinarose_project'))
                     | beam.ParDo(BuildDictTupleRowFn())
                     )
    # existing_data | WriteToText(known_args.output)

    # CoGroupByKey yields (key, (source_rows, existing_rows)) per key.
    source_existing_data = ((source_data, existing_data)
                            | 'CoGroupBy' >> beam.CoGroupByKey())

    # Keep only keys with no existing rows, flatten back to (key, value)
    # tuples, then build BigQuery row dicts and write them out.
    # (The original used Python 2 tuple-unpacking lambdas; rewritten here
    # in Python 3 style.)
    new_data = (source_existing_data
                | beam.Filter(lambda kv: len(list(kv[1][1])) == 0)
                | beam.Map(lambda kv: [(kv[0], s) for s in kv[1][0]])
                | beam.ParDo(BuildRowFn())
                | beam.io.Write(beam.io.BigQuerySink(table='campaign_emma_v2', dataset='emma_test',
                                                     project='chinarose_project', schema=schema))
                )
    # new_data | 'write to text' >> WriteToText(known_args.output)

    p.run().wait_until_finish()


if __name__ == '__main__':
    # logging.getLogger().setLevel(logging.INFO)
    print('begin')
    run()
    print('end')
1 vote

Stack Overflow user

Posted on 2017-08-08 22:28:02

A side input is a good option, but note that if the DB table is quite large, you may later find that CoGroupByKey is a better option. To implement this with a side input, you can do the following:

p = beam.Pipeline(..)
existing_table = beam.pvalue.AsDict(p
                                    | beam.Read(beam.io.BigQuerySource(my_query))
                                    | beam.Map(format_rows))

class FilterRowsDoFn(beam.DoFn):
  def process(self, elem, table_dict):
    k = elem[0]
    if k not in table_dict:
      yield elem

result = (p
          | beam.ReadFromText(...)
          | beam.ParDo(FilterRowsDoFn(), table_dict=existing_table))

Then you can write the result to BigQuery. But if your table already contains many elements, you may want to consider using CoGroupByKey.
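The filtering logic inside `FilterRowsDoFn` can be checked outside Beam; a plain-Python simulation of the same keyed lookup (the sample data here is made up for illustration):

```python
# Simulate the side-input filter: keep only source elements whose key
# is absent from the existing-table dict (the AsDict side input).
existing = {'101': 'Alice', '102': 'Bob'}       # side input as a dict
source = [('101', 'Alice'), ('103', 'Carol')]   # (key, value) tuples from the file

new_rows = [elem for elem in source if elem[0] not in existing]
# new_rows holds only the records not yet in the table
```

This is exactly the per-element membership test the DoFn performs, which is why the approach scales with the size of the side-input dict held in memory.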

With CoGroupByKey, the code to accomplish this would look something like:

sourcedata = (p
              | beam.ReadFromText(...)
              | beam.Map(format_text))
existing_table = (p
                  | beam.Read(beam.io.BigQuerySource(my_query))
                  | beam.Map(format_rows))

source_existing_data = ((sourcedata, existing_table)
                        | 'coGroupBy' >> beam.CoGroupByKey())

# Keep keys that have no rows in the existing table, then flatten back
# to (key, value) tuples. (Python 3 style; the original used Python 2
# tuple-unpacking lambdas and filtered on the wrong side of the join.)
new_data = (source_existing_data
            | beam.Filter(lambda kv: not list(kv[1][1]))
            | beam.FlatMap(lambda kv: [(kv[0], s) for s in kv[1][0]]))

result = new_data | bigQuerySink(new_data)

If you have any trouble using either of the snippets, let me know and I'll fix them.
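What CoGroupByKey produces, and why filtering on an empty `existing` list keeps only new keys, can be simulated in plain Python (sample data made up for illustration):

```python
from collections import defaultdict

source = [('101', 'Alice'), ('103', 'Carol')]    # rows from the text file
existing = [('101', 'Alice'), ('102', 'Bob')]    # rows already in the table

# CoGroupByKey groups both inputs per key into (key, (source_vals, existing_vals)).
grouped = defaultdict(lambda: ([], []))
for k, v in source:
    grouped[k][0].append(v)
for k, v in existing:
    grouped[k][1].append(v)

# Keep keys that appear in the source but not in the existing table,
# then flatten back to (key, value) rows, mirroring the Filter + FlatMap above.
new_rows = [(k, s) for k, (src, exist) in grouped.items()
            if src and not exist
            for s in src]
```

Here only `'103'` survives: `'101'` has a match in the existing table, and `'102'` exists only in the table so it contributes no source rows.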

1 vote
Original page content provided by Stack Overflow; translation supported by Tencent Cloud Xiaowei's IT-domain engine.
Original link:

https://stackoverflow.com/questions/45517597
