首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Celery+Django --原子事务与数据库相关的任务

Celery+Django --原子事务与数据库相关的任务
EN

Stack Overflow用户
提问于 2015-10-07 18:31:11
回答 2查看 2.1K关注 0票数 1

在我使用Django、Docker和芹菜的当前项目中,基本的上传文件函数insertIntoDatabase是从任务调用的,而在views.py中,任务是用delay调用的。

在databaseinserter.py中:

代码语言:javascript
复制
def insertIntoDatabase(datapoints, user, description): # datapoints is a list of dictionaries, user and description are just strings
    # convert data and upload to our database

在tasks.py中:

代码语言:javascript
复制
@app.task()
def db_ins_task(datapoints, user, description):
    from databaseinserter import insertIntoDatabase
    insertIntoDatabase(datapoints, user, description)

在views.py中:

代码语言:javascript
复制
with transaction.atomic():
    db_ins_task.delay(datapoints, user, description)

在将芹菜引入该项目之前,insertIntoDatabase刚刚在views.py中直接被调用,因此任何无效的数据点列表(即不正确的格式化)都不会被插入,整个上传将被取消和回滚。但是,既然上传处于异步芹菜任务中,无效的上载就不再正确回滚。既然上传是一项任务,我如何确保无效的上传仍然被取消并完全取消呢?似乎Django 1.9有一些我可能需要的新东西:transaction.on_commit。但是,目前切换到1.9的主要问题是,在我们的项目Django-Hstore中,它似乎不像是一个重要的依赖项,是兼容的。1.9也在alpha中,所以即使两者兼容,目前也不太适合使用。在Django 1.8中有办法做到这一点吗?

我还查看了django_transaction_barrier,并尝试使用它,但没有运气。在tasks.py中,我将任务更改为

代码语言:javascript
复制
@task(base=TransactionBarrierTask)
def db_ins_task(datapoints, user, description):
    from databaseinserter import insertIntoDatabase
    insertIntoDatabase(datapoints, user, description)

在views.py中,我更改了任务执行:

代码语言:javascript
复制
with transaction.atomic():
    db_ins_task.apply_async_with_barrier(args=(data, user, description,))

但是,这里我的主要问题是,一旦收到任务,芹菜就会抛出一个关于意外关键字参数的错误:

代码语言:javascript
复制
worker_1   | Traceback (most recent call last):
worker_1   |   File "/usr/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
worker_1   |     R = retval = fun(*args, **kwargs)
worker_1   |   File "/usr/local/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
worker_1   |     return self.run(*args, **kwargs)
worker_1   | TypeError: db_ins_task() got an unexpected keyword argument '__transaction_barrier'

那么,做这件事最好的方法是什么?我是否应该继续尝试使用django_transaction_barrier (如果我确实将它用于正确的事情)?如果是这样的话,我做错了什么/错过了什么会导致错误?如果不是,从我的数据库清除无效上传的更好方法是什么?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2015-10-07 21:39:23

芹菜是一个异步的任务跑步者,基本上一旦任务被交给芹菜,它的火就会被遗忘。您不能拥有跨流程边界的事务,因为芹菜将作为工作人员运行。

您可以始终运行另一个任务来查找无效的数据点并清理数据库。简而言之,您想要一个具有两阶段提交的分布式事务,因为它有自己的问题,并且不能在Python中使用,这是不容易完成的。

票数 1
EN

Stack Overflow用户

发布于 2015-10-09 07:22:05

您考虑过将transaction.atomic语句移到任务中吗?甚至插入函数本身?这两种方法都是可行的。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/32999857

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档