在我的项目中,我希望使用Google Dataflow中的流水线来处理发布/订阅消息。在清理输入数据时,我也希望有一个来自BigQuery的侧输入。这就出现了一个问题,它将导致两个输入中的一个无法工作。
我在我的管道选项中设置了streaming=True,它允许正确处理发布/订阅输入。但是BigQuery与流水线不兼容(参见下面的链接):
我收到了这个错误:"ValueError: Cloud发布/订阅当前仅可用于流式管道。“基于这些限制,这是可以理解的。
但我只是希望使用BigQuery作为侧输入,以便将数据映射到传入的发布/订阅数据流。它在本地运行得很好,但是一旦我尝试在数据流上运行它,它就会返回错误。
有没有人找到解决这个问题的好办法?
编辑:在下面添加我的流水线的框架以供参考:
# Set all options needed to properly run the pipeline
options = PipelineOptions(streaming=True,
runner='DataflowRunner',
project=project_id)
p = beam.Pipeline(options = options)
n_tbl_src = (p
| 'Nickname Table Read' >> beam.io.Read(beam.io.BigQuerySource(
table = nickname_spec
)))
# This is the main Dataflow pipeline. This will clean the incoming dataset for importing into BQ.
clean_vote = (p
| beam.io.gcp.pubsub.ReadFromPubSub(topic = None,
subscription = 'projects/{0}/subscriptions/{1}'
.format(project_id, subscription_name),
with_attributes = True)
| 'Isolate Attributes' >> beam.ParDo(IsolateAttrFn())
| 'Fix Value Types' >> beam.ParDo(FixTypesFn())
| 'Scrub First Name' >> beam.ParDo(ScrubFnameFn())
| 'Fix Nicknames' >> beam.ParDo(FixNicknameFn(), n_tbl=AsList(n_tbl_src))
| 'Scrub Last Name' >> beam.ParDo(ScrubLnameFn()))
# The final dictionary will then be written to BigQuery for storage
(clean_vote | 'Write to BQ' >> beam.io.WriteToBigQuery(
table = bq_spec,
write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
))
# Run the pipeline
p.run()发布于 2019-01-11 21:14:01
@Pablo上面的评论是正确的答案。对于任何经历过同样情况的人来说,下面是我的脚本中有效的更改。
# This opens the Beam pipeline to run Dataflow
p = beam.Pipeline(options = options)
logging.info('Created Dataflow pipeline.')
# This will pull in all of the recorded nicknames to compare to the incoming PubSubMessages.
client = bigquery.Client()
query_job = client.query("""
select * from `{0}.{1}.{2}`""".format(project_id, dataset_id, nickname_table_id))
nickname_tbl = query_job.result()
nickname_tbl = [dict(row.items()) for row in nickname_tbl]
# This is the main Dataflow pipeline. This will clean the incoming dataset for importing into BQ.
clean_vote = (p
| beam.io.gcp.pubsub.ReadFromPubSub(topic = None,
subscription = 'projects/{0}/subscriptions/{1}'
.format(project_id, subscription_name),
with_attributes = True)
| 'Isolate Attributes' >> beam.ParDo(IsolateAttrFn())
| 'Fix Value Types' >> beam.ParDo(FixTypesFn())
| 'Scrub First Name' >> beam.ParDo(ScrubFnameFn())
| 'Fix Nicknames' >> beam.ParDo(FixNicknameFn(), n_tbl=nickname_tbl)
| 'Scrub Last Name' >> beam.ParDo(ScrubLnameFn()))
# The final dictionary will then be written to BigQuery for storage
(clean_vote | 'Write to BQ' >> beam.io.WriteToBigQuery(
table = bq_spec,
write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
))
# Run the pipeline
p.run()https://stackoverflow.com/questions/54130474
复制相似问题