首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何利用气流进行实时数据处理

如何利用气流进行实时数据处理
EN

Stack Overflow用户
提问于 2018-02-26 06:20:42
回答 2查看 2K关注 0票数 0

我有一个场景,我希望处理csv文件并加载到其他数据库:

病例

  1. pic csv文件并以与csv同名的名称加载到mysql。
  2. 然后使用python任务文件对已加载的行进行一些修改。
  3. 之后,从mysql中提取数据并加载到其他数据库。

CSV文件从远程服务器到文件夹中的一个气流服务器。

我们必须通过python脚本选择这些csv文件并进行处理。

假设我选择了一个csv文件,那么我需要以依赖关系的方式将这个csv文件传递给操作符的其余部分

代码语言:javascript
复制
filename : abc.csv

task1 >> task2 >> task3 >>task4

因此,abc.csv应该可以用于所有任务。

请告诉我该怎么做。

EN

回答 2

Stack Overflow用户

发布于 2018-03-02 05:45:53

您的场景与实时没有任何关系。这是按时间表[间隔]进行的。或者您可以使用SensorTask操作符来检测数据可用性。

将每个需求实现为函数,并从操作符实例中调用它们。将操作符添加到DAG中,并为您的传入提要制定适当的时间表。

在扩展运算符-kwargs -jinja模板时,在execute方法中输入运算符上下文‘param_key’时,传递和访问params的方式是-jinja

相关的..。airflow pass parameter from cli date in airflow: need to access as a variable

票数 1
EN

Stack Overflow用户

发布于 2018-12-06 14:26:29

任务在气流中通信的方式是使用XCOM,但它只用于小值,而不是文件内容。

如果希望任务处理同一个csv文件,则应该将其保存在某个位置,然后在XCOM中传递到该位置的路径。

我们使用的是LocalExecutor,因此本地文件系统对我们来说很好。

我们决定为每个进程创建一个文件夹,其名称为dag。在该文件夹中,我们为每个执行日期生成一个文件夹(我们在第一个任务中这样做,我们总是调用start_task)。然后通过Xcom将该文件夹的路径传递给后续任务。

Start_task的示例代码:

代码语言:javascript
复制
def start(share_path, **context):
    execution_date_as_string = context['execution_date'].strftime(DATE_FORMAT)    
    execution_folder_path = os.path.join(share_path, 'my_dag_name', execution_date_as_string)
    _create_folder_delete_if_exists(execution_folder_path)
    task_instance = context['task_instance']
    task_instance.xcom_push(key="execution_folder_path", value=execution_folder_path)

start_task = PythonOperator(
    task_id='start_task',
    provide_context=True,
    python_callable=start,
    op_args=[share_path],
    dag=dag
)

share_path是所有dags的基本目录,我们将它保存在气流变量中。

后续任务可以通过以下方式获取执行文件夹:

代码语言:javascript
复制
execution_folder_path = task_instance.xcom_pull(task_ids='start_task', key='execution_folder_path')
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48982540

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档