我试图在自定义传感器操作员(即BaseSensorOperator的子类)上使用气流的BaseSensorOperator。关于这个特性的文档现在非常稀少。
碎片作业(smart_sensor_group_shard_[x])正在运行,但我不认为它们会探测到我的传感器。日志上写着Loaded 0 sensor_works。
我认为问题在于,即使我在我的配置BaseSensorOperator.is_smart_sensor_compatible()中打开了特性,仍然返回False。这是我的配置:
[smart_sensor]
sensors_enabled = MyCustomSensor
use_smart_sensor = True但是这里有来自MySensorOperator的日志
INFO - self.sensor_service_enabled=False
INFO - self.sensors_support_sensor_service={'NamedHivePartitionSensor'}
INFO - Sensor is NOT Smart Sensor compatible如您所见,操作员仍然会看到那些配置值的气流的默认设置。我不知道为什么会出现这种不一致,因为我可以在UI中正确地看到配置集。
剩下的代码
来自MyCustomSensor的相关代码:
class MyCustomSensor(BaseSensorOperator):
poke_context_fields = ['some_arg', 'use_smart_sensor']
def __init__(self, some_arg,
use_smart_sensor=False,
*args, **kwargs):
self.some_arg = some_arg
self.use_smart_sensor = use_smart_sensor
super(MyCustomSensor, self).__init__(*args, **kwargs)
def is_smart_sensor_compatible(self):
# If we have turned it off.
if not self.use_smart_sensor:
is_compatible = False
else:
self.soft_fail = False
# super() should be BaseSensorOperator
is_compatible = super().is_smart_sensor_compatible()
log.info(f'{self.sensor_service_enabled=}')
log.info(f'{self.sensors_support_sensor_service=}')
if is_compatible:
log.info('Sensor IS Smart Sensor compatible')
else:
log.info('Sensor is NOT Smart Sensor compatible')
return is_compatible如何创建我的传感器任务:
# NOTE: I think that poke_interval may
# be ignored when we are using Smart
# Sensors.
my_sensor = MyCustomSensor(
task_id='some_name',
prior_task='some_other_name',
timeout=518400,
mode='reschedule',
poke_interval=30,
use_smart_sensor=True,
dag=dag
)我正在使用Composer,特别是版本composer-1.17.1-airflow-2.1.2。我已经验证了这些并不是云编写器的阻塞配置。
发布于 2021-09-30 08:09:49
当我在现有Composer实例上使用命令sensors_enabled和use_smart_sensor更新gcloud composer environments update和use_smart_sensor值时,我能够使用代码再现您的错误。看起来Composer没有在运行时应用新的配置。
但我找到了解决办法。见以下步骤:


我创建了一个公共问题跟踪器来向Composer工程团队报告这一点。
更新:
另一个解决方法是设置一个虚拟环境变量,以迫使工人重新启动并将更改应用于气流配置。
gcloud composer environments update <composer_env_name> \
--location <location> \
--update-env-variables=DUMMY=dummy更新2:
从气流2.2开始,可递延算子是比智能传感器更好的解决方案。你应该先看看这个特性。
https://stackoverflow.com/questions/69369169
复制相似问题