我们运行的是Airflow v1.9.0,其中一个DAG出现了问题。这个SSHOperator操作符DAG (称为匹配)从06***开始,通常在一小时内完成。每个月有一次,我们有大量的数据摄取,导致这个任务占用7个小时。不幸的是,当这种情况发生时,DAG会阻止我们的其他DAG启动,直到它完成。它是这7小时内唯一运行的DAG。这不是正常行为,也不是我们的其他DAGS (它们继续运行,其他DAGS将启动)。我们找不到任何可能导致此问题的表锁(PostgreSQL)。
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from datetime import datetime, timedelta
from os import path
PATH = '/path/to/code/'
default_args = {
'owner': 'airflow',
'depends_on_past': True,
'start_date': datetime(2018, 1, 24),
'email': ['myemail@emails.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
}
with open(path.join(PATH, 'matching', 'run_nightly_matching.sh')) as f:
nightly_matching_command = f.read()
dag = DAG('matching',
template_searchpath = PATH,
schedule_interval = '0 6 * * *', #10:00PM
default_args = default_args)
nightly_matching = SSHOperator(task_id = 'nightly_matching',
ssh_conn_id = 'external_server_name',
command = nightly_matching_command,
do_xcom_push = True,
dag = dag)以下是我查看airflow数据库时的查询结果。
airflow=# select task_id, dag_id, start_date, execution_date,end_date, duration, state, pool, queue from task_instance where date_trunc('day', start_date)::DATE = '2018-05-09'::DATE order by start_date;
task_id: nightly_matching dag_id: matching start_date: 2018-05-09 06:00:04.486709 execution_date: 2018-05-08 06:00:00 end_date: 2018-05-09 12:50:52.509942 duration: 24648.023233 state: success pool: queue: default
紧随问题DAG之后的此DAG计划于10:40启动。
task_id: Task1 dag_id: dag2 start_date: 2018-05-09 12:51:04.963004 execution_date: 2018-05-08 10:40:00 end_date: 2018-05-09 12:51:07.060369 duration: 2.097365 state: success pool: queue: default
run_nightly_matching.sh运行一个Python脚本,该脚本使用psycopg2连接到我们的数据库并匹配成对的表。
*** Reading local log.
[2018-05-09 06:00:04,352] {cli.py:374} INFO - Running on host airflow
[2018-05-09 06:00:04,486] {models.py:1197} INFO - Dependencies all met for <TaskInstance: matching.nightly_matching 2018-05-08 06:00:00 [queued]>
[2018-05-09 06:00:04,587] {models.py:1197} INFO - Dependencies all met for <TaskInstance: matching.nightly_matching 2018-05-08 06:00:00 [queued]>
[2018-05-09 06:00:04,588] {models.py:1407} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 2
--------------------------------------------------------------------------------
[2018-05-09 06:00:04,617] {models.py:1428} INFO - Executing <Task(SSHOperator): nightly_matching> on 2018-05-08 06:00:00
[2018-05-09 06:00:04,617] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run matching nightly_matching 2018-05-08T06:00:00 --job_id 274416 --raw -sd DAGS_FOLDER/matching.py']
[2018-05-09 06:00:04,981] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:04,980] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2018-05-09 06:00:05,023] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,022] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2018-05-09 06:00:05,140] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,139] {__init__.py:45} INFO - Using executor LocalExecutor
[2018-05-09 06:00:05,190] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,190] {models.py:189} INFO - Filling up the DagBag from /home/airflow/airflow/dags/matching.py
[2018-05-09 06:00:05,445] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,444] {base_hook.py:80} INFO - Using connection to: external_server_name
[2018-05-09 06:00:05,511] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,509] {transport.py:1687} INFO - Connected (version 2.0, client OpenSSH_6.6.1p1)
[2018-05-09 06:00:05,621] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,620] {transport.py:1687} INFO - Authentication (publickey) successful!
[2018-05-09 12:50:52,431] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 12:50:52,430] {ssh_operator.py:113} INFO - Matching: ('table_Z', 'table_Y')
[2018-05-09 12:50:52,432] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-08 23:00:58.172398
[2018-05-09 12:50:52,433] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-08 23:17:19.273990
[2018-05-09 12:50:52,433] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_Y', 'table_R')
[2018-05-09 12:50:52,433] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-08 23:17:19.274092
[2018-05-09 12:50:52,433] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 03:16:46.339119
[2018-05-09 12:50:52,434] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_X', 'table_Y')
[2018-05-09 12:50:52,434] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 03:16:46.339228
[2018-05-09 12:50:52,434] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 04:49:16.901285
[2018-05-09 12:50:52,435] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_X', 'clients')
[2018-05-09 12:50:52,435] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 04:49:16.901410
[2018-05-09 12:50:52,435] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:02:30.502418
[2018-05-09 12:50:52,435] {base_task_runner.py:98} INFO - Subtask: Matching: ('clients', 'table_R')
[2018-05-09 12:50:52,436] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 05:02:30.502494
[2018-05-09 12:50:52,436] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:02:47.035880
[2018-05-09 12:50:52,436] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_Y', 'table_B')
[2018-05-09 12:50:52,436] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 05:02:47.035974
[2018-05-09 12:50:52,437] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:03:17.464931
[2018-05-09 12:50:52,437] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_E', 'table_Y')
[2018-05-09 12:50:52,437] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 05:03:17.465061
[2018-05-09 12:50:52,437] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:43:05.543177
[2018-05-09 12:50:52,438] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_P', 'table_Y')
[2018-05-09 12:50:52,438] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 05:43:05.543298
[2018-05-09 12:50:52,438] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:51:42.683216发布于 2018-05-11 14:59:21
我认为这可能与执行任务的数量和这一位有关:
with open(path.join(PATH, 'matching', 'run_nightly_matching.sh')) as f:
nightly_matching_command = f.read()如果我没记错的话,这个代码位的执行频率可能比计划的要高得多,因为它不是任务的一部分,也是不可触发的。这将在每次执行Python文件本身时执行。这可能永远不会发生,也不会经常发生,这取决于您的设置。由于打开了一个文件句柄,这可能与问题有关。这可能没什么,但可能值得一查。
此外,我无法查看cfg文件,因为它存储在Dropbox上,我必须登录。
在这里,并发允许线程的数量可能会变得有趣。听起来像是一个阻塞问题。也可能不是DAG本身停止了其他任务,而是负载增加到足以消耗太多负载的地步。如果没有具体细节,很难说。
https://stackoverflow.com/questions/50281587
复制相似问题