首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Google Dataflow和Pubsub -无法实现精确的一次交付。

Google Dataflow和Pubsub -无法实现精确的一次交付。
EN

Stack Overflow用户
提问于 2018-09-20 10:42:34
回答 3查看 2.3K关注 0票数 2

我正在尝试使用和使用ApacheBeamSDK2.6.0实现精确的一次交付。

用例非常简单:

“生成器”数据流作业向PubSub主题发送100万条消息。

代码语言:javascript
复制
GenerateSequence
          .from(0)
          .to(1000000)
          .withRate(100000, Duration.standardSeconds(1L));

“归档”数据流作业从PubSub订阅中读取消息,并保存到。

代码语言:javascript
复制
pipeline
        .apply("Read events",
            PubsubIO.readMessagesWithAttributes()
                // this is to achieve exactly-once delivery
                .withIdAttribute(ATTRIBUTE_ID)
                .fromSubscription('subscription')
                .withTimestampAttribute(TIMESTAMP_ATTRIBUTE))
        .apply("Window events",
            Window.<Dto>into(FixedWindows.of(Duration.millis(options.getWindowDuration())))
                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                .withAllowedLateness(Duration.standardMinutes(15))
                .discardingFiredPanes())
        .apply("Events count metric", ParDo.of(new CountMessagesMetric()))
        .apply("Write files to archive",
            FileIO.<String, Dto>writeDynamic()
                .by(Dto::getDataSource).withDestinationCoder(StringUtf8Coder.of())
                .via(Contextful.of((msg, ctx) -> msg.getData(), Requirements.empty()), TextIO.sink())
                .to(archiveDir)
                .withTempDirectory(archiveDir)
                .withNumShards(options.getNumShards())
                .withNaming(dataSource ->
                    new SyslogWindowedDataSourceFilenaming(dataSource, archiveDir, filenamePrefix, filenameSuffix)
                ));

我将'withIdAttribute‘添加到Pubsub.IO.Write (’生成器‘作业)和PubsubIO.Read ('Archive’作业)中,并期望它能保证精确的语义。

我想测试一下“负面”的情况:

  1. “生成器”数据流作业向PubSub主题发送100万条消息。
  2. “归档”数据流作业开始工作,但我在处理过程中停止单击“停止作业”-> 'Drain‘。部分消息已经被处理并保存到Cloud,比如400K消息。
  3. 我再次启动“Archive”作业,并期望它将接受未经处理的消息(600 K),最终我将看到正好有100万条消息保存到存储中。

事实上,我得到的是所有的信息(至少实现了一次),但最重要的是,还有很多重复信息--大约在每100万条消息中有30到50K的副本。

是否有任何解决方案来实现准确的一次交付?

EN

回答 3

Stack Overflow用户

发布于 2019-04-24 21:55:33

数据流不能使您在运行期间持久化状态。如果您使用更新正在运行的管道,您可以以一种不会导致它失去现有状态的方式进行,从而允许您跨管道版本去复制。

如果这对您不起作用,您可能希望以ATTRIBUTE_ID键控消息的方式存档消息,例如,。扳手或GCS使用此文件名。

票数 2
EN

Stack Overflow用户

发布于 2018-09-20 12:15:00

所以,我自己从来没有这样做过,但是对你的问题的推理--这就是我如何处理它.

我的解决方案有点复杂,但我没有找到其他方法来实现这一点,而不涉及其他外部服务。所以,这里什么都没有。

您可以让您的管道从pubsub和GCS中读取,然后将它们合并到去复制数据。这里的棘手之处在于,一个是有界的pCollection (GCS),另一个是无界的(pubsub)。您可以添加时间戳到有界集合,然后窗口数据。在此阶段,您可能会删除GCS数据,时间可能超过15分钟(在您的先例实现中窗口的持续时间)。这两个步骤(即正确添加时间戳和删除可能已经足够老到不能创建重复项的数据)是目前为止最棘手的部分。

一旦解决了这个问题,就追加两个pCollections,然后在两个数据集中常见的Id上使用一个GroupByKey。这将产生一个PCollection<KV<Long, Iterable<YOUR_DATUM_TYPE>>。然后,您可以使用一个额外的DoFn,它删除结果Iterable中除第一个元素之外的所有元素,还可以删除KV<>装箱。从那里开始,您可以像往常一样继续处理数据。

最后,当重新启动管道时,这个额外的工作应该只对第一个pubsub窗口是必要的。在此之后,您应该将GCS pCollection重新分配给空的pCollection,这样组按键就不会做太多额外的工作。

告诉我你是怎么想的,如果能成功的话。另外,如果您决定采用此策略,请张贴您的里程:)。

票数 0
EN

Stack Overflow用户

发布于 2022-03-21 12:31:44

同时,Pub/Sub支持一次交货

它目前正处于GA发射前的状态,因此不幸的是还没有准备好投入生产使用。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52423275

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档