我有一个场景,我希望处理csv文件并加载到其他数据库:
病例
CSV文件从远程服务器到文件夹中的一个气流服务器。
我们必须通过python脚本选择这些csv文件并进行处理。
假设我选择了一个csv文件,那么我需要以依赖关系的方式将这个csv文件传递给操作符的其余部分
filename : abc.csv
task1 >> task2 >> task3 >>task4因此,abc.csv应该可以用于所有任务。
请告诉我该怎么做。
发布于 2018-03-02 05:45:53
您的场景与实时没有任何关系。这是按时间表[间隔]进行的。或者您可以使用SensorTask操作符来检测数据可用性。
将每个需求实现为函数,并从操作符实例中调用它们。将操作符添加到DAG中,并为您的传入提要制定适当的时间表。
在扩展运算符-kwargs -jinja模板时,在execute方法中输入运算符上下文‘param_key’时,传递和访问params的方式是-jinja
相关的..。airflow pass parameter from cli date in airflow: need to access as a variable
发布于 2018-12-06 14:26:29
任务在气流中通信的方式是使用XCOM,但它只用于小值,而不是文件内容。
如果希望任务处理同一个csv文件,则应该将其保存在某个位置,然后在XCOM中传递到该位置的路径。
我们使用的是LocalExecutor,因此本地文件系统对我们来说很好。
我们决定为每个进程创建一个文件夹,其名称为dag。在该文件夹中,我们为每个执行日期生成一个文件夹(我们在第一个任务中这样做,我们总是调用start_task)。然后通过Xcom将该文件夹的路径传递给后续任务。
Start_task的示例代码:
def start(share_path, **context):
execution_date_as_string = context['execution_date'].strftime(DATE_FORMAT)
execution_folder_path = os.path.join(share_path, 'my_dag_name', execution_date_as_string)
_create_folder_delete_if_exists(execution_folder_path)
task_instance = context['task_instance']
task_instance.xcom_push(key="execution_folder_path", value=execution_folder_path)
start_task = PythonOperator(
task_id='start_task',
provide_context=True,
python_callable=start,
op_args=[share_path],
dag=dag
)share_path是所有dags的基本目录,我们将它保存在气流变量中。
后续任务可以通过以下方式获取执行文件夹:
execution_folder_path = task_instance.xcom_pull(task_ids='start_task', key='execution_folder_path')https://stackoverflow.com/questions/48982540
复制相似问题