首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何将去复制添加到流管道[apache-beam]

如何将去复制添加到流管道[apache-beam]
EN

Stack Overflow用户
提问于 2019-07-25 18:45:18
回答 1查看 3.1K关注 0票数 1

我在中有一个工作流管道,它从pub/sub中获取数据,在数据流中执行充实并将其传递给大查询。

在流窗口中,我希望确保消息不会被复制(因为pub/sub保证至少只有一次传递)。

所以,我想我应该使用与beam不同的方法,但是一旦我使用它,我的管道中断(不能再继续,任何本地打印也是不可见的)。

这是我的管道代码:

代码语言:javascript
复制
    with beam.Pipeline(options=options) as p:
        message = (p | "ReadFromPubSub" >> beam.io.ReadFromPubSub(topic=known_args.topic).
                   with_output_types(bytes))

        bq_data = (message | "Decode" >> beam.FlatMap(lambda x: [x.decode('utf-8')])
                           | "Deduplication" >> beam.Distinct()
                           | "JSONLoad" >> beam.ParDo(ReadAsJSON())
                           | "Windowing" >> beam.WindowInto(window.FixedWindows(10, 0))
                           | "KeepRelevantData" >> beam.ParDo(KeepRelevantData())
                           | "PreProcessing" >> beam.ParDo(PreProcessing())
                           | "SendLimitedKeys" >> beam.ParDo(SendLimitedKeys(), schema=schema)
                   )

        if not known_args.local:
            bq_data | "WriteToBigQuery" >> beam.io.WriteToBigQuery(table=known_args.bq_table, schema=schema)
        else:
            bq_data | "Display" >> beam.ParDo(Display())

正如您在反复制标签中看到的那样,我正在调用beam.Distinct方法。

问题:

  1. 去重复应该在哪里发生在管道中?
  2. 这是一种正确/理智的做法吗?
  3. 否则,我怎样才能去复制流缓冲区数据?
  4. 是否需要去复制,还是我只是在浪费时间?

如有任何解决方案或建议,将不胜感激。谢谢。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-07-25 21:05:42

您可能会发现精确-一次处理上的这个博客很有帮助。首先,Dataflow已经在基于发布/子记录id执行去重复操作。然而,正如博客所述:“然而,在某些情况下,这还不够,用户的发布过程可能会重新尝试发布”。

因此,如果将消息发布到Pub/Sub的系统可能多次发布相同的消息,那么您可能希望添加自己的确定性记录id。然后将检测到这些信息。这是我推荐的方法,而不是在您自己的管道中尝试去重复。

您可以通过使用withIdAttribute on PubSubIO.Read来实现这一点。示例

关于为什么我相信不同的原因的原因的一些解释。分异试图在窗口中去复制数据。我相信您正在尝试删除全局窗口,因此您的管道必须缓冲和比较所有元素,因为这是一个无界的PCollection。它将尝试永远缓冲。

我相信,如果您首先执行窗口化,并且具有确定性事件时间戳(它看起来不像在使用withTimestampAttribute),这将正常工作。然后,Distinct只适用于窗口中的元素(具有相同时间戳的相同元素将放置在同一个窗口中)。您可能想知道这是否适用于原型开发,但我建议在可能的情况下添加唯一的记录id,并允许Dataflow基于记录id处理复制,以获得最佳性能。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57208405

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档