首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >条件语句Python Apache Beam管道

条件语句Python Apache Beam管道
EN

Stack Overflow用户
提问于 2018-11-13 16:44:00
回答 1查看 3.4K关注 0票数 2

现状

该管道的主要部分是使用地理数据从pub/sub读取有效载荷,然后对这些数据进行转换和分析,并在条件为真或假的情况下返回。

代码语言:javascript
复制
 with beam.Pipeline(options=pipeline_options) as p:
        raw_data = (p
                    | 'Read from PubSub' >> beam.io.ReadFromPubSub(
                    subscription='projects/XXX/subscriptions/YYY'))

        geo_data = (raw_data
                    | 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s)))
                    
                    

def GeoDataIngestion(string_input):
    <...>
    return True or False

理想情况1

如果GeoDataIngestion结果为真,则raw_data将存储在大查询中。

代码语言:javascript
复制
geo_data = (raw_data
                | 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s))
                | 'Evaluate condition' >> beam.Map(lambda s: Condition(s))
                )

def Condition(condition):
    if condition:
        <...WriteToBigQuery...>


#The class I used before to store raw_data without depending on evaluate condition:

class WriteToBigQuery(beam.PTransform):
    def expand(self, pcoll):
        return (
                pcoll
                | 'Format' >> beam.ParDo(FormatBigQueryFn())
                | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            'XXX',
            schema=TABLE_SCHEMA,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

理想情境2

与其将数据存储在BigQuery中,不如将数据发送到pub/sub

代码语言:javascript
复制
def Condition(condition):
    if condition:
        <...SendToPubSub(Topic1)...>
    else:
        <...SendToPubSub(Topic2)...>

在这里,问题是根据条件结果设置主题,因为我无法在管道中传递像参数一样的主题

代码语言:javascript
复制
 | beam.io.WriteStringsToPubSub(TOPIC)

在函数/类中都没有

问题

我怎么能这么做?

如果评估条件的结果为真,我应该如何/在哪里调用WriteToBigQuery来存储PCollection raw_data?

EN

回答 1

Stack Overflow用户

发布于 2018-11-17 21:54:42

我认为基于评估条件结果的分支集合可能会对您的场景有所帮助。请参阅文档这里

为了说明分支,假设下面有一个集合,您希望根据字符串的内容执行不同的操作。

代码语言:javascript
复制
'this line is for BigQuery',
'this line for pubsub topic1',
'this line for pubsub topic2'

下面的代码将创建标记集合,您可以根据该标记获得三个不同的PCollections。然后,您可以决定要对单个集合执行哪些进一步的操作。

代码语言:javascript
复制
import apache_beam as beam
from apache_beam import pvalue
import sys

class Split(beam.DoFn):

    # These tags will be used to tag the outputs of this DoFn.
    OUTPUT_TAG_BQ = 'BigQuery'
    OUTPUT_TAG_PS1 = 'pubsub topic1'
    OUTPUT_TAG_PS2 = 'pubsub topic2'

    def process(self, element):
        """
        tags the input as it processes the orignal PCollection
        """
        print element
        if "BigQuery" in element:
            yield pvalue.TaggedOutput(self.OUTPUT_TAG_BQ, element)
            print 'found bq'
        elif "pubsub topic1" in element:
            yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS1, element)
        elif "pubsub topic2" in element:
            yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS2, element)


if __name__ == '__main__':
    output_prefix = 'C:\\pythonVirtual\\Mycodes\\output'
    p = beam.Pipeline(argv=sys.argv)
    lines = (p
            | beam.Create([
               'this line is for BigQuery',
               'this line for pubsub topic1',
               'this line for pubsub topic2']))

    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
    tagged_lines_result = (lines
                          | beam.ParDo(Split()).with_outputs(
                              Split.OUTPUT_TAG_BQ,
                              Split.OUTPUT_TAG_PS1,
                              Split.OUTPUT_TAG_PS2))

    # tagged_lines_result is an object of type DoOutputsTuple. It supports
    # accessing result in alternative ways.
    bq_records = tagged_lines_result[Split.OUTPUT_TAG_BQ]| "write BQ" >> beam.io.WriteToText(output_prefix + 'bq')
    ps1_records = tagged_lines_result[Split.OUTPUT_TAG_PS1] | "write PS1" >> beam.io.WriteToText(output_prefix + 'ps1')
    ps2_records = tagged_lines_result[Split.OUTPUT_TAG_PS2] | "write PS2" >> beam.io.WriteToText(output_prefix + 'ps2')

    p.run().wait_until_finish()

如果有帮助请告诉我。

票数 8
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53285726

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档