我在中有一个工作流管道,它从pub/sub中获取数据,在数据流中执行充实并将其传递给大查询。
在流窗口中,我希望确保消息不会被复制(因为pub/sub保证至少只有一次传递)。
所以,我想我应该使用与beam不同的方法,但是一旦我使用它,我的管道中断(不能再继续,任何本地打印也是不可见的)。
这是我的管道代码:
with beam.Pipeline(options=options) as p:
message = (p | "ReadFromPubSub" >> beam.io.ReadFromPubSub(topic=known_args.topic).
with_output_types(bytes))
bq_data = (message | "Decode" >> beam.FlatMap(lambda x: [x.decode('utf-8')])
| "Deduplication" >> beam.Distinct()
| "JSONLoad" >> beam.ParDo(ReadAsJSON())
| "Windowing" >> beam.WindowInto(window.FixedWindows(10, 0))
| "KeepRelevantData" >> beam.ParDo(KeepRelevantData())
| "PreProcessing" >> beam.ParDo(PreProcessing())
| "SendLimitedKeys" >> beam.ParDo(SendLimitedKeys(), schema=schema)
)
if not known_args.local:
bq_data | "WriteToBigQuery" >> beam.io.WriteToBigQuery(table=known_args.bq_table, schema=schema)
else:
bq_data | "Display" >> beam.ParDo(Display())正如您在反复制标签中看到的那样,我正在调用beam.Distinct方法。
问题:
如有任何解决方案或建议,将不胜感激。谢谢。
发布于 2019-07-25 21:05:42
您可能会发现精确-一次处理上的这个博客很有帮助。首先,Dataflow已经在基于发布/子记录id执行去重复操作。然而,正如博客所述:“然而,在某些情况下,这还不够,用户的发布过程可能会重新尝试发布”。
因此,如果将消息发布到Pub/Sub的系统可能多次发布相同的消息,那么您可能希望添加自己的确定性记录id。然后将检测到这些信息。这是我推荐的方法,而不是在您自己的管道中尝试去重复。
您可以通过使用withIdAttribute on PubSubIO.Read来实现这一点。示例。
关于为什么我相信不同的原因的原因的一些解释。分异试图在窗口中去复制数据。我相信您正在尝试删除全局窗口,因此您的管道必须缓冲和比较所有元素,因为这是一个无界的PCollection。它将尝试永远缓冲。
我相信,如果您首先执行窗口化,并且具有确定性事件时间戳(它看起来不像在使用withTimestampAttribute),这将正常工作。然后,Distinct只适用于窗口中的元素(具有相同时间戳的相同元素将放置在同一个窗口中)。您可能想知道这是否适用于原型开发,但我建议在可能的情况下添加唯一的记录id,并允许Dataflow基于记录id处理复制,以获得最佳性能。
https://stackoverflow.com/questions/57208405
复制相似问题