首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >束流/数据流作业中的加工单元数限制

束流/数据流作业中的加工单元数限制
EN

Stack Overflow用户
提问于 2021-03-29 07:30:59
回答 2查看 1.2K关注 0票数 1

我有一个梁流作业运行在数据流流道上。它加载来自PubSub的请求(使用Python的apache_beam.io.ReadFromPubSub),然后从BigTable获取数据,对数据进行大量计算,然后再次写入PubSub。

代码语言:javascript
复制
with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | "Receive" >> beam.io.ReadFromPubSub(topic=TOPIC_READ)
            | "Parse" >> beam.ParDo(Parse())
            | "Fetch" >> beam.ParDo(FetchFromBigtable(project, args.bt_instance, args.bt_par, args.bt_batch))
            | "Process" >> beam.ParDo(Process())
            | "Publish" >> beam.io.WriteToPubSub(topic=TOPIC_WRITE)
        )

基本上我不需要任何窗口,我只想限制在一台机器上并行处理的元素的数量(即通过工人的数量来控制并行性)。,否则,它会在繁重的计算过程中导致内存不足,我还需要限制BigTable请求的速率。

我使用的是标准的2 CPU机器,所以我希望它将并行地处理2 elemets我还设置了--number_of_worker_harness_threads=2--sdk_worker_parallelism=1。不过,出于某种原因,我看到许多元素被多个线程并行处理,这会导致内存和速率限制问题。我猜这些包是基于日志(例如work: "process_bundle-105")并行处理的。

我试图通过在processElement中使用一个信号量来破解它(只处理每个DoFN实例中的一个元素),而且它可以工作,但是自动标度并没有开始,它看起来像一个纯粹的黑客,可能会产生其他后果。

你有什么推荐的?如何限制要处理的并行包的数量?理想情况下,每个工作人员只能使用一个线束吗?beam/数据流是否适合这样的用例,还是用普通的kubernetes自动标度来实现它更好?

编辑:

在BeamSDK2.28.0上运行

我想限制并行性,但我没有很好地描述导致我得出这个结论的症状。

  1. 有时我会在Fetch阶段暂停
代码语言:javascript
复制
Deadline of 60.0s exceeded while calling functools.partial(<bound method PartialRowsData._read_next of <google.cloud.bigtable.row_data.PartialRowsData object at 0x7f16b405ba50>>)
  1. Process阶段中,一个元素的处理速度明显减慢(分为几分钟而不是秒),有时甚至会被卡住(可能是因为内存问题)。

下面是一个工作人员在处理Process阶段的一个元素(单线程)之前和之后记录的日志,由jsonPayload.workerjsonPayload.portability_worker_id过滤(我希望这些日志应该是来自一个容器的日志)。我可以看到超过12个元素在一个时刻被处理。

EN

回答 2

Stack Overflow用户

发布于 2021-03-30 17:46:19

数据流为每个核心启动一个SDK工人容器,因此在您的示例中,每台机器将有2个工作容器(进程)。每个工作进程都有一个用于处理包的无界线程池,但我认为由于python,每次只有一个包被处理。

您可以通过--experiments no_use_multiple_sdk_containers将sdk容器数量限制为一个(因为您的用例似乎不太关心吞吐量)。

票数 2
EN

Stack Overflow用户

发布于 2021-03-30 17:35:53

通过使用有状态处理,我成功地解决了Dataflow和Elasticsearch的这类问题。如果接收器跟不上管道其他部分的速度,可以使用GroupIntoBatches来减少并行性。

据我所知,州是由跑步者在每个窗口的基础上进行维护的.要使用有状态处理,您的数据需要有键。这些键可以是任意的,并且可以被用于使用元素的DoFn忽略。

您提到您不需要窗口,如果您目前没有使用任何窗口,这将意味着您使用的是默认的单数全局窗口。在这种情况下,任意分配给数据的任意数量的不同键将是维护的最大并行状态数。请注意,此解决方案不会移植到所有运行程序,因为状态处理并不是所有运行程序都支持的。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66850234

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档