我希望能够在我的celery任务中使用异步代码。它适用于异步或龙卷风。我发现,我可以做这样的事情
from tornado.ioloop import IOLoop
from celery._state import _task_stack
from . import celery
class AsyncTask(celery.Task):
def __call__(self, *args, **kwargs):
_task_stack.push(self)
self.push_request(args=args, kwargs=kwargs)
try:
IOLoop.current().run_sync(lambda: self.run(*args, **kwargs))
finally:
self.pop_request()
_task_stack.pop()然后像这样使用它
from .celery import celery
from tornado.httpclient import AsyncHTTPClient
@celery.task(base=AsyncTask)
async def test_async_celery_task(x, y):
result = await AsyncHTTPClient().fetch(request='https://google.com.ua')
print('Async IS OKAY: {}'.format(result))或者,我可以在任务中直接使用run_sync,这并不可取
我想知道是否可以这样做,或者我应该在worker中启动事件循环,并通过add_future启动我的任务。有没有其他人做过类似的事情?我能希望得到一些性能上的增强吗?
我需要这样做,因为我需要能够使用项目其他部分的异步代码,比如数据库调用ets
发布于 2019-09-08 17:23:40
并不是ioloop的正确使用使这个过程变得复杂,而是让Celery知道它可以并且应该使用异步任务。Celery使用池进行操作,在那里它可以调度您的作业。池可以是线程化的,可以是多进程的,Celery知道(通过配置)它有X个进程,Y个线程,Z个工作进程,其中有多少有工作或空闲。但是自动柜员机,芹菜从技术上能够接收和运行协程,但开箱即可无法计算它们,以跟踪它们中有多少有工作或空闲。如果你想看看它是什么样子的--那就是a development code of future Celery's Asyncpool。
https://stackoverflow.com/questions/57773149
复制相似问题