首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为什么Apache `DoFn.setup()‘在工人启动后被多次调用?

为什么Apache `DoFn.setup()‘在工人启动后被多次调用?
EN

Stack Overflow用户
提问于 2022-11-16 14:18:16
回答 1查看 54关注 0票数 0

我目前正在试验一个流数据流管道(在Python中)。我读取了一个数据流,我喜欢将这些数据流写入PG CloudSQL实例。为此,我正在寻找一个创建数据库连接的适当位置。当我使用ParDo函数编写数据时,我认为DoFn.setup()是个不错的地方。

根据多个资源,这应该是一个很好的位置,因为setup()只被调用一次(当工作人员启动时)。

我运行了一些测试,但它似乎更频繁地调用setup(),而不是只在工人初始化时调用。它的运行似乎与start_bundle()一样多(在这么多元素之后)。

我创建了一个简单的管道,它从PubSub读取一些消息,提取对象的文件名并输出文件名。除此之外,它还记录了调用setup()start_bundle()的时间:

代码语言:javascript
复制
import argparse
import logging
from datetime import datetime

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

setup_counter=0
bundle_counter=0

class GetFileName(beam.DoFn):
    """
    Generate file path from PubSub message attributes
    """
    def _now(self):
        return datetime.now().strftime("%Y/%m/%d %H:%M:%S")

    def setup(self):
        global setup_counter
                
        moment = self._now()
        logging.info("setup() called %s" % moment)
        
        setup_counter=setup_counter+1
        logging.info(f"""setup_counter = {setup_counter}""")

    def start_bundle(self):
        global bundle_counter
        
        moment = self._now()
        logging.info("Bundle started %s" % moment)
        
        bundle_counter=bundle_counter+1
        logging.info(f"""Bundle_counter = {bundle_counter}""")

    def process(self, element):
        attr = dict(element.attributes)

        objectid = attr["objectId"]

        # not sure if this is the prettiest way to create this uri, but works for the poc
        path = f'{objectid}'

        yield path


def run(input_subscription, pipeline_args=None):

    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True
    )

    with beam.Pipeline(options=pipeline_options) as pipeline:

        files = (pipeline
                 | "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=input_subscription,
                                                                with_attributes=True)
                 | "Get filepath" >> beam.ParDo(GetFileName())
                )

        files | "Print results" >> beam.Map(logging.info)


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_subscription",
        dest="input_subscription",
        required=True,
        help="The Cloud Pub/Sub subscription to read from."
    )

    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_subscription,
        pipeline_args
    )

基于此,我希望看到setup()只记录一次(在启动管道之后),在DirectRunner上运行此作业时,start_bundle()会被任意次数记录一次。

但是,setup()的调用似乎与start_bundle()一样多。

看看日志:

代码语言:javascript
复制
python main.py \
>     --runner DirectRunner \
>     --input_subscription <my_subscription> \
>     --direct_num_workers 1 \
>     --streaming true
...
INFO:root:setup() called 2022/11/16 15:11:13
INFO:root:setup_counter = 1
INFO:root:Bundle started 2022/11/16 15:11:13
INFO:root:Bundle_counter = 1
INFO:root:avro/20221116135543584-hlgeinp.avro
INFO:root:avro/20221116135543600-hlsusop.avro
INFO:root:avro/20221116135543592-hlmvtgp.avro
INFO:root:avro/20221116135543597-hlsuppp.avro
INFO:root:avro/20221116135553122-boevtdp.avro
INFO:root:avro/20221116135553126-bomipep.avro
INFO:root:avro/20221116135553127-hlsuppp.avro
INFO:root:avro/20221116135155024-boripep.avro
INFO:root:avro/20221116135155020-bolohdp.avro
INFO:root:avro/20221116135155029-hlmvaep.avro
...
INFO:root:setup() called 2022/11/16 15:11:16
INFO:root:setup_counter = 2
INFO:root:Bundle started 2022/11/16 15:11:16
INFO:root:Bundle_counter = 2
INFO:root:high-volume/20221112234700584-hlprenp.avro
INFO:root:high-volume/20221113011240903-hlprenp.avro
INFO:root:high-volume/20221113010654305-hlprenp.avro
INFO:root:high-volume/20221113010822785-hlprenp.avro
INFO:root:high-volume/20221113010927402-hlprenp.avro
INFO:root:high-volume/20221113011248805-hlprenp.avro
INFO:root:high-volume/20221112234730001-hlprenp.avro
INFO:root:high-volume/20221112234738994-hlprenp.avro
INFO:root:high-volume/20221113010956395-hlprenp.avro
INFO:root:high-volume/20221113011648293-hlprenp.avro
...
INFO:root:setup() called 2022/11/16 15:11:18
INFO:root:setup_counter = 3
INFO:root:Bundle started 2022/11/16 15:11:18
INFO:root:Bundle_counter = 3
INFO:root:high-volume/20221113012008604-hlprenp.avro
INFO:root:high-volume/20221113011337394-hlprenp.avro
INFO:root:high-volume/20221113011307598-hlprenp.avro
INFO:root:high-volume/20221113011345403-hlprenp.avro
INFO:root:high-volume/20221113012000982-hlprenp.avro
INFO:root:high-volume/20221113011712190-hlprenp.avro
INFO:root:high-volume/20221113011640005-hlprenp.avro
INFO:root:high-volume/20221113012751380-hlprenp.avro
INFO:root:high-volume/20221113011914286-hlprenp.avro
INFO:root:high-volume/20221113012439206-hlprenp.avro

有人能澄清这种行为吗?我想知道我对setup()功能的理解是否不正确,或者这是否可以用另一种方式解释,因为基于这个测试,setup()似乎不是建立DB连接的好地方。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-11-16 16:47:23

根据Beam 文档,可以多次调用setup方法:

代码语言:javascript
复制
DoFn.setup(): Called whenever the DoFn instance is deserialized on the worker. 
This means it can be called more than once per worker because multiple instances of a given DoFn subclass may be created 
(e.g., due to parallelization, or due to garbage collection 
after a period of disuse). 
This is a good place to connect to database instances, open network connections or other resources.

但是,它仍然是实例化和为数据库创建连接池的最佳位置。

teardown是关闭每个员工连接的最佳位置。

代码语言:javascript
复制
DoFn.teardown(): Called once (as a best effort) per DoFn instance when the DoFn instance is shutting down. 
This is a good place to close database instances, close network connections or other resources.

Note that teardown is called as a best effort and is not guaranteed. For example, 
if the worker crashes, teardown might not be called.
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74462039

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档