首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Postgresql中的意外死锁(同时使用psycopg2)

Postgresql中的意外死锁(同时使用psycopg2)
EN

Stack Overflow用户
提问于 2017-03-11 00:21:14
回答 1查看 8.6K关注 0票数 3

我正在处理PostgreSQL中一个我不明白的死锁问题。

我正在尝试使用Python、psycopg2模块和Postgres数据库实现一个类似圆形罗宾的算法。

我希望应用程序的几个实例执行以下操作:

  • 用任务列表锁定整个表,间隔时间很短
  • 选择要执行的任务(最近执行的任务最少,但有一些限制)
  • 标记任务,以便其他实例不选择它(只允许一个实例同时执行相同的任务)
  • 解锁表
  • 执行任务
  • 重复 其他会议也应能够更新本表的某些字段。 突然间,我陷入了无法解释的僵局。我已经尽可能地简化了我的Python脚本,我在每个语句之后执行一个提交(如果可能的话),但是仍然不时会出现死锁。 由于某种原因,每次我遇到死锁时,它都是事务中的第一个语句。怎么可能呢?我的表没有任何触发器,或者外键约束,或者任何会使事情变得复杂的东西。我能给出的唯一解释是,PostgreSQL不会在提交之后立即释放锁。或者是psycopg2没有按照我所期望的方式工作?我无法通过在不同的会话中手动运行语句来重现这个问题。 死锁很少见,但我至少每隔几个小时就会收到一次。

我正在运行PostgreSQL 9.6.1和Python2.7.12

下面是我运行的代码(这只是我为捕捉问题而做的一个简化示例):

代码语言:javascript
复制
import psycopg2
import sys
import datetime
import time
sys.path.append('/opt/workflow/lib')
import config
import ovs_lib


instance_type='scan_master'
instance_id=sys.argv[1]

dbh=psycopg2.connect(dbname=config.values['pgsql']['db'], host=config.values['pgsql']['host'], port=int(config.values['pgsql']['port']), user=config.values['pgsql']['user'], password=config.values['pgsql']['pass'])
dbh.set_session(isolation_level='READ COMMITTED', autocommit=False)
cursor = dbh.cursor()
cursor.execute("SET search_path TO "+config.values['pgsql']['schema'])

def sanitize(string):
  string=string.replace("'","''")
  return string

def get_task(instance_id):
  task_id=None
  out_struct={}
  instance_id=sanitize(instance_id)
  #Lock whole table
  dbh.commit() #Just in case
  cursor.execute("SELECT 1 FROM wf_task FOR UPDATE") #Lock the table
  cursor.execute("UPDATE wf_task SET scanner_instance_id=null WHERE scanner_instance_id='"+instance_id+"'") #release task from previous run
  #Now get the task
  sql ="SELECT t.task_id, st.scanner_function, t.parallel_runs\n"
  sql+="FROM wf_task t\n"
  sql+="JOIN wf_scanner_type st ON t.scanner_type_id=st.scanner_type_id\n"
  sql+="WHERE status='A'\n"
  sql+="AND t.scanner_instance_id is NULL\n"
  sql+="AND last_scan_ts<=now()-scan_interval*interval '1 second'\n"
  sql+="ORDER BY last_scan_ts\n"
  sql+="LIMIT 1\n"
  cursor.execute(sql)
  cnt=cursor.rowcount
  if cnt>0:
    row=cursor.fetchone()
    task_id=row[0]
    sql ="UPDATE wf_task SET scanner_instance_id='"+instance_id+"',last_scan_ts=current_timestamp(3) WHERE task_id="+str(task_id)
    cursor.execute(sql)
    scanner_function=row[1]
    parallel_runs=row[2]
    out_struct['task_id']=task_id
    out_struct['scanner_function']=scanner_function
    out_struct['parallel_runs']=parallel_runs
  dbh.commit()
  return out_struct

def process_task(task_id):
  sql="UPDATE wf_task SET submitted_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()"
  cursor.execute(sql)
  dbh.commit()
  sql="UPDATE wf_task SET executed_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()"
  cursor.execute(sql)
  dbh.commit()

while True:
  if not ovs_lib.check_control(instance_type, instance_id):
    now_time=datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S')
    print now_time+" Stop sygnal received"
    exit(0)
  task_struct=get_task(instance_id)
  if 'task_id' not in task_struct:
    time.sleep(1)
    continue
  process_task(task_struct['task_id'])

下面是我得到的错误的例子:

代码语言:javascript
复制
Traceback (most recent call last):
  File "/opt/workflow/bin/scan_simple.py", line 70, in <module>
process_task(task_struct['task_id'])
  File "/opt/workflow/bin/scan_simple.py", line 58, in process_task
cursor.execute(sql)
psycopg2.extensions.TransactionRollbackError: deadlock detected
DETAIL:  Process 21577 waits for ShareLock on transaction 39243027; blocked by process 21425.
Process 21425 waits for ShareLock on transaction 39243029; blocked by process 21102.
Process 21102 waits for AccessExclusiveLock on tuple (8,12) of relation 39933 of database 16390; blocked by process 21577.
HINT:  See server log for query details.
CONTEXT:  while updating tuple (8,12) in relation "wf_task"

Traceback (most recent call last):
  File "/opt/workflow/bin/scan_simple.py", line 66, in <module>
    task_struct=get_task(instance_id)
  File "/opt/workflow/bin/scan_simple.py", line 27, in get_task
    cursor.execute("SELECT 1 FROM wf_task FOR UPDATE")
psycopg2.extensions.TransactionRollbackError: deadlock detected
DETAIL:  Process 21776 waits for ShareLock on transaction 39488839; blocked by process 21931.
Process 21931 waits for ShareLock on transaction 39488844; blocked by process 21776.
HINT:  See server log for query details.
CONTEXT:  while locking tuple (17,9) in relation “wf_task"

当时,我有6个脚本实例同时运行,数据库中没有其他会话处于活动状态。

稍后更新

今天,我学到了一些关于Postgres的新知识,它与这个问题非常相关。

从版本9.5开始,PostgreSQL支持跳过锁定语句,它以非常优雅的方式解决了我试图设计我的应用程序的问题

如果您在尝试实现某种队列或循环解决方案时在PostgreSQL中遇到并发问题,则绝对必须阅读以下内容:

https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-03-11 05:27:18

问题可能是,第一个SELECT ... FOR UPDATE中的顺序扫描并不总是以相同的顺序返回行,因此并发执行该语句会以不同的顺序锁定表的行。这将导致您所经历的僵局。

在提高善度方面,有几种解决办法:

  • 我认为为该更新锁定整个表的技术在性能上很糟糕,但是如果您坚持保留代码,可以将synchronize_seqscans设置为off,以便所有顺序扫描都以相同的顺序返回行。但是你真的不应该像你那样把所有的行都锁在一个表里,因为
代码语言:javascript
复制
- It causes an unnecessary sequential scan.
- It is not safe. Somebody could `INSERT` new rows between the time where you lock the rows and the time where you run your `UPDATE`s.

  • 如果确实希望锁定整个表,请使用LOCK TABLE语句而不是锁定表中的所有行。这也将打破僵局。
  • 最好的解决方案可能是使用UPDATE本身锁定行。为了避免死锁,请检查PostgreSQL为UPDATE使用的执行计划。这将是索引扫描或顺序扫描。通过索引扫描,您是安全的,因为这将以一定的顺序返回行。对于顺序扫描,禁用上面提到的synchronize_seqscans特性,理想情况下只用于事务: 启动事务;设置本地synchronize_seqscans = off;/*您的更新在这里*/提交;
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42729849

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档