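Current situation
The main part of this pipeline reads payloads containing geo data from Pub/Sub, transforms and analyzes that data, and finally returns whether a condition is True or False.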
with beam.Pipeline(options=pipeline_options) as p:
    raw_data = (p
        | 'Read from PubSub' >> beam.io.ReadFromPubSub(
            subscription='projects/XXX/subscriptions/YYY'))
    geo_data = (raw_data
        | 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s)))

def GeoDataIngestion(string_input):
    <...>
    return True or False  # pseudocode: returns either True or False

Ideal scenario 1
If GeoDataIngestion returns True, raw_data should be stored in BigQuery.
geo_data = (raw_data
    | 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s))
    | 'Evaluate condition' >> beam.Map(lambda s: Condition(s))
    )

def Condition(condition):
    if condition:
        <...WriteToBigQuery...>

# The class I used before to store raw_data without depending on the evaluated condition:
class WriteToBigQuery(beam.PTransform):
    def expand(self, pcoll):
        return (
            pcoll
            | 'Format' >> beam.ParDo(FormatBigQueryFn())
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                'XXX',
                schema=TABLE_SCHEMA,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

Ideal scenario 2
Instead of storing the data in BigQuery, send it to Pub/Sub.
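def Condition(condition):
    if condition:
        <...SendToPubSub(Topic1)...>
    else:
        <...SendToPubSub(Topic2)...>

The problem here is choosing the topic based on the condition result, because I cannot pass the topic as a parameter, either in the pipeline step

| beam.io.WriteStringsToPubSub(TOPIC)

or inside a function or class.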
Question
How can I do this?
How and where should I call WriteToBigQuery to store the PCollection raw_data when the evaluated condition is True?
Posted on 2018-11-17 21:54:42
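I think branching the collection based on the result of the evaluated condition may help in your scenario. See the Apache Beam documentation on branching PCollections.
To illustrate branching, suppose you have the collection below and want to perform a different action depending on the content of each string.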
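'this line is for BigQuery',
'this line for pubsub topic1',
'this line for pubsub topic2'

The code below tags each element, which gives you three separate PCollections, one per tag. You can then decide which further operations to apply to each collection.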
import sys

import apache_beam as beam
from apache_beam import pvalue


class Split(beam.DoFn):
    # These tags will be used to tag the outputs of this DoFn.
    OUTPUT_TAG_BQ = 'BigQuery'
    OUTPUT_TAG_PS1 = 'pubsub topic1'
    OUTPUT_TAG_PS2 = 'pubsub topic2'

    def process(self, element):
        """Tags each element of the original PCollection by its content."""
        print(element)
        if "BigQuery" in element:
            yield pvalue.TaggedOutput(self.OUTPUT_TAG_BQ, element)
            print('found bq')
        elif "pubsub topic1" in element:
            yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS1, element)
        elif "pubsub topic2" in element:
            yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS2, element)


if __name__ == '__main__':
    output_prefix = 'C:\\pythonVirtual\\Mycodes\\output'
    p = beam.Pipeline(argv=sys.argv)
    lines = (p
        | beam.Create([
            'this line is for BigQuery',
            'this line for pubsub topic1',
            'this line for pubsub topic2']))

    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
    tagged_lines_result = (lines
        | beam.ParDo(Split()).with_outputs(
            Split.OUTPUT_TAG_BQ,
            Split.OUTPUT_TAG_PS1,
            Split.OUTPUT_TAG_PS2))

    # tagged_lines_result is an object of type DoOutputsTuple. It supports
    # accessing result in alternative ways.
    # WriteToText stands in here for the real BigQuery and Pub/Sub sinks.
    bq_records = tagged_lines_result[Split.OUTPUT_TAG_BQ] | "write BQ" >> beam.io.WriteToText(output_prefix + 'bq')
    ps1_records = tagged_lines_result[Split.OUTPUT_TAG_PS1] | "write PS1" >> beam.io.WriteToText(output_prefix + 'ps1')
    ps2_records = tagged_lines_result[Split.OUTPUT_TAG_PS2] | "write PS2" >> beam.io.WriteToText(output_prefix + 'ps2')

    p.run().wait_until_finish()

Let me know if this helps.
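For the True/False case in the question, the same branching idea might be sketched with beam.Partition, which splits one PCollection into a fixed number of PCollections according to a function. This is only a rough sketch under assumptions: geo_data_ingestion is a hypothetical stand-in for the question's GeoDataIngestion (its real body is not shown), build_pipeline and its arguments are placeholder names, and beam.io.WriteToPubSub is used on the assumption that your Beam release provides it and that the payloads are bytes, as ReadFromPubSub emits by default.

```python
def geo_data_ingestion(payload):
    """Hypothetical stand-in for the question's GeoDataIngestion.

    Assumption: the real function analyzes the payload and returns True or
    False. Here we only look for a 'lat' key so the sketch is runnable.
    """
    return b'lat' in payload


def by_condition(payload, num_partitions):
    """Partition function: index 0 when the condition holds, 1 otherwise."""
    return 0 if geo_data_ingestion(payload) else 1


def build_pipeline(p, subscription, topic_true, topic_false):
    """Attach the branching steps to pipeline p (arguments are placeholders)."""
    # Imported here so the helpers above can be tested without Beam installed.
    import apache_beam as beam

    raw_data = p | 'Read from PubSub' >> beam.io.ReadFromPubSub(
        subscription=subscription)

    # Partition calls by_condition once per element and returns one
    # PCollection per index; each branch still holds the raw payload.
    matched, unmatched = (
        raw_data | 'Split by condition' >> beam.Partition(by_condition, 2))

    # Each branch gets its own sink, so each topic is fixed when the
    # pipeline is built and never has to be passed per element.
    matched | 'To topic 1' >> beam.io.WriteToPubSub(topic_true)
    unmatched | 'To topic 2' >> beam.io.WriteToPubSub(topic_false)
    return matched, unmatched
```

For scenario 1, the matched branch could instead be piped into the WriteToBigQuery PTransform from the question (matched | WriteToBigQuery()), since that branch contains only the raw_data elements for which the condition was True.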
https://stackoverflow.com/questions/53285726