我正在使用ApacheBEAM2.5.0pythonSDK
附加代码片段,在一个管道中,我从公共主题解析它获取i/p,并希望对它执行一些操作,当我使用DataflowRunner运行它时,它运行得很好,但是看起来“数据处理fun1”、“数据处理fun2”“数据处理fun3”是连续运行的,我需要它并行运行。我对数据流很陌生。
有办法并行化吗?
def run():
parser = argparse.ArgumentParser()
args, pipeline_args = parser.parse_known_args()
options = PipelineOptions(pipeline_args)
with beam.Pipeline(options=options) as p:
data = (p | "Read Pubsub Messages" >>
beam.io.ReadFromPubSub(topic=config.pub_sub_topic)
| "Parse messages " >> beam.Map(parse_pub_sub_message_with_bq_data)
)
data | "data processing fun1 " >> beam.ParDo(Fun1())
data | "data processing fun2" >> beam.ParDo(Fun2())
data | "data processing fun3" >> beam.ParDo(Fun3())
if __name__ == '__main__':
run()发布于 2018-10-02 16:50:35
为什么需要同时运行这些函数?
Beam / Dataflow获取您的图形,并尝试优化可以在同一线程中运行的对象。这叫做融合优化,它在水槽爪哇纸中提到过。
关键是通常在同一个线程上一个一个地运行这些函数,而不是在多个处理线程或VM之间交换数据来并行处理。
如果函数必须或多或少并行运行,则可以在下游转换之前添加beam.Reshuffle转换:
data = (p
| beam.io.ReadFromPubSub(topic)
| beam.Map(parse_messages))
# After the data has been shuffled, it may be consumed by multiple workers
data | beam.Reshuffle() | beam.ParDo(Fun1())
data | beam.Reshuffle() | beam.ParDo(Fun2())
data | beam.Reshuffle() | beam.ParDo(Fun3())如果我能在这里面加上一些细节,请告诉我。
https://stackoverflow.com/questions/52608541
复制相似问题