首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >无法打开气流的智能传感器功能(自定义传感器)

无法打开气流的智能传感器功能(自定义传感器)
EN

Stack Overflow用户
提问于 2021-09-28 22:40:55
回答 1查看 403关注 0票数 0

我试图在自定义传感器操作员(即BaseSensorOperator的子类)上使用气流的BaseSensorOperator。关于这个特性的文档现在非常稀少。

碎片作业(smart_sensor_group_shard_[x])正在运行,但我不认为它们会探测到我的传感器。日志上写着Loaded 0 sensor_works

我认为问题在于,即使我在我的配置BaseSensorOperator.is_smart_sensor_compatible()中打开了特性,仍然返回False。这是我的配置:

代码语言:javascript
复制
[smart_sensor]
sensors_enabled = MyCustomSensor
use_smart_sensor = True

但是这里有来自MySensorOperator的日志

代码语言:javascript
复制
INFO - self.sensor_service_enabled=False
INFO - self.sensors_support_sensor_service={'NamedHivePartitionSensor'}
INFO - Sensor is NOT Smart Sensor compatible

如您所见,操作员仍然会看到那些配置值的气流的默认设置。我不知道为什么会出现这种不一致,因为我可以在UI中正确地看到配置集。

剩下的代码

来自MyCustomSensor的相关代码:

代码语言:javascript
复制
class MyCustomSensor(BaseSensorOperator):
    poke_context_fields = ['some_arg', 'use_smart_sensor']

    def __init__(self, some_arg,
                 use_smart_sensor=False,
                 *args, **kwargs):
        self.some_arg = some_arg
        self.use_smart_sensor = use_smart_sensor
        super(MyCustomSensor, self).__init__(*args, **kwargs)
    
    def is_smart_sensor_compatible(self):

        # If we have turned it off.
        if not self.use_smart_sensor:
            is_compatible = False
        else:
            self.soft_fail = False

            # super() should be BaseSensorOperator
            is_compatible = super().is_smart_sensor_compatible()

        log.info(f'{self.sensor_service_enabled=}')
        log.info(f'{self.sensors_support_sensor_service=}')

        if is_compatible:
            log.info('Sensor IS Smart Sensor compatible')
        else:
            log.info('Sensor is NOT Smart Sensor compatible')
        return is_compatible

如何创建我的传感器任务:

代码语言:javascript
复制
# NOTE: I think that poke_interval may
#   be ignored when we are using Smart 
#   Sensors.
my_sensor = MyCustomSensor(
    task_id='some_name',
    prior_task='some_other_name',
    timeout=518400,
    mode='reschedule',
    poke_interval=30,
    use_smart_sensor=True,
    dag=dag
)

我正在使用Composer,特别是版本composer-1.17.1-airflow-2.1.2。我已经验证了这些并不是云编写器的阻塞配置。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-09-30 08:09:49

当我在现有Composer实例上使用命令sensors_enableduse_smart_sensor更新gcloud composer environments updateuse_smart_sensor值时,我能够使用代码再现您的错误。看起来Composer没有在运行时应用新的配置。

但我找到了解决办法。见以下步骤:

  1. 我创建了一个新的composer实例,在创建页面上,我已经定义了气流配置覆盖。

  1. 使用您的DAG,我进行了测试,气流配置是正确的应用。

我创建了一个公共问题跟踪器来向Composer工程团队报告这一点。

更新:

另一个解决方法是设置一个虚拟环境变量,以迫使工人重新启动并将更改应用于气流配置。

代码语言:javascript
复制
gcloud composer environments update <composer_env_name> \
--location <location> \
--update-env-variables=DUMMY=dummy

更新2:

从气流2.2开始,可递延算子是比智能传感器更好的解决方案。你应该先看看这个特性。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69369169

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档