我正在创建一个守护程序。我试图从本地文件夹中感测多个文件,但无法感测文件夹中的所有文件。正确的逻辑应该是什么?
sensing_task=FileSensor(task_id='senseFile',
filepath='dags/source/*.csv',
poke_interval=10,
fs_conn_id='fs_my_conn')在执行时,它实际上是搜索"*.csv“文件,而不是模式。
发布于 2022-03-22 05:38:13
看来你可能需要定义你自己的传感器。看看文件传感器的源代码,您可能会做这样的事情来实现全局化。
class GlobSensor(BaseSensorOperator):
template_fields = ('filepath',)
ui_color = '#91818a'
@apply_defaults
def __init__(self, filepath, fs_conn_id='fs_default', *args, **kwargs):
super().__init__(*args, **kwargs)
self.filepath = filepath
self.fs_conn_id = fs_conn_id
def poke(self, context):
hook = FSHook(self.fs_conn_id)
basepath = hook.get_path()
full_path = os.path.join(basepath, self.filepath)
self.log.info('Poking for glob %s', full_path)
return len(glob.glob(full_path)) > 0https://stackoverflow.com/questions/71567116
复制相似问题