我正在处理PostgreSQL中一个我不明白的死锁问题。
我正在尝试使用Python、psycopg2模块和Postgres数据库实现一个类似圆形罗宾的算法。
我希望应用程序的几个实例执行以下操作:
我正在运行PostgreSQL 9.6.1和Python2.7.12
下面是我运行的代码(这只是我为捕捉问题而做的一个简化示例):
import psycopg2
import sys
import datetime
import time
sys.path.append('/opt/workflow/lib')
import config
import ovs_lib
instance_type='scan_master'
instance_id=sys.argv[1]
dbh=psycopg2.connect(dbname=config.values['pgsql']['db'], host=config.values['pgsql']['host'], port=int(config.values['pgsql']['port']), user=config.values['pgsql']['user'], password=config.values['pgsql']['pass'])
dbh.set_session(isolation_level='READ COMMITTED', autocommit=False)
cursor = dbh.cursor()
cursor.execute("SET search_path TO "+config.values['pgsql']['schema'])
def sanitize(string):
string=string.replace("'","''")
return string
def get_task(instance_id):
task_id=None
out_struct={}
instance_id=sanitize(instance_id)
#Lock whole table
dbh.commit() #Just in case
cursor.execute("SELECT 1 FROM wf_task FOR UPDATE") #Lock the table
cursor.execute("UPDATE wf_task SET scanner_instance_id=null WHERE scanner_instance_id='"+instance_id+"'") #release task from previous run
#Now get the task
sql ="SELECT t.task_id, st.scanner_function, t.parallel_runs\n"
sql+="FROM wf_task t\n"
sql+="JOIN wf_scanner_type st ON t.scanner_type_id=st.scanner_type_id\n"
sql+="WHERE status='A'\n"
sql+="AND t.scanner_instance_id is NULL\n"
sql+="AND last_scan_ts<=now()-scan_interval*interval '1 second'\n"
sql+="ORDER BY last_scan_ts\n"
sql+="LIMIT 1\n"
cursor.execute(sql)
cnt=cursor.rowcount
if cnt>0:
row=cursor.fetchone()
task_id=row[0]
sql ="UPDATE wf_task SET scanner_instance_id='"+instance_id+"',last_scan_ts=current_timestamp(3) WHERE task_id="+str(task_id)
cursor.execute(sql)
scanner_function=row[1]
parallel_runs=row[2]
out_struct['task_id']=task_id
out_struct['scanner_function']=scanner_function
out_struct['parallel_runs']=parallel_runs
dbh.commit()
return out_struct
def process_task(task_id):
sql="UPDATE wf_task SET submitted_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()"
cursor.execute(sql)
dbh.commit()
sql="UPDATE wf_task SET executed_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()"
cursor.execute(sql)
dbh.commit()
while True:
if not ovs_lib.check_control(instance_type, instance_id):
now_time=datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S')
print now_time+" Stop sygnal received"
exit(0)
task_struct=get_task(instance_id)
if 'task_id' not in task_struct:
time.sleep(1)
continue
process_task(task_struct['task_id'])下面是我得到的错误的例子:
Traceback (most recent call last):
File "/opt/workflow/bin/scan_simple.py", line 70, in <module>
process_task(task_struct['task_id'])
File "/opt/workflow/bin/scan_simple.py", line 58, in process_task
cursor.execute(sql)
psycopg2.extensions.TransactionRollbackError: deadlock detected
DETAIL: Process 21577 waits for ShareLock on transaction 39243027; blocked by process 21425.
Process 21425 waits for ShareLock on transaction 39243029; blocked by process 21102.
Process 21102 waits for AccessExclusiveLock on tuple (8,12) of relation 39933 of database 16390; blocked by process 21577.
HINT: See server log for query details.
CONTEXT: while updating tuple (8,12) in relation "wf_task"
Traceback (most recent call last):
File "/opt/workflow/bin/scan_simple.py", line 66, in <module>
task_struct=get_task(instance_id)
File "/opt/workflow/bin/scan_simple.py", line 27, in get_task
cursor.execute("SELECT 1 FROM wf_task FOR UPDATE")
psycopg2.extensions.TransactionRollbackError: deadlock detected
DETAIL: Process 21776 waits for ShareLock on transaction 39488839; blocked by process 21931.
Process 21931 waits for ShareLock on transaction 39488844; blocked by process 21776.
HINT: See server log for query details.
CONTEXT: while locking tuple (17,9) in relation “wf_task"当时,我有6个脚本实例同时运行,数据库中没有其他会话处于活动状态。
稍后更新
今天,我学到了一些关于Postgres的新知识,它与这个问题非常相关。
从版本9.5开始,PostgreSQL支持跳过锁定语句,它以非常优雅的方式解决了我试图设计我的应用程序的问题
如果您在尝试实现某种队列或循环解决方案时在PostgreSQL中遇到并发问题,则绝对必须阅读以下内容:
https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/
发布于 2017-03-11 05:27:18
问题可能是,第一个SELECT ... FOR UPDATE中的顺序扫描并不总是以相同的顺序返回行,因此并发执行该语句会以不同的顺序锁定表的行。这将导致您所经历的僵局。
在提高善度方面,有几种解决办法:
synchronize_seqscans设置为off,以便所有顺序扫描都以相同的顺序返回行。但是你真的不应该像你那样把所有的行都锁在一个表里,因为- It causes an unnecessary sequential scan.
- It is not safe. Somebody could `INSERT` new rows between the time where you lock the rows and the time where you run your `UPDATE`s.
LOCK TABLE语句而不是锁定表中的所有行。这也将打破僵局。UPDATE本身锁定行。为了避免死锁,请检查PostgreSQL为UPDATE使用的执行计划。这将是索引扫描或顺序扫描。通过索引扫描,您是安全的,因为这将以一定的顺序返回行。对于顺序扫描,禁用上面提到的synchronize_seqscans特性,理想情况下只用于事务:
启动事务;设置本地synchronize_seqscans = off;/*您的更新在这里*/提交;https://stackoverflow.com/questions/42729849
复制相似问题