根据http://ask.github.com/celery/userguide/routing.html#manual-routing中的文档,我可以将queue参数传递给apply_async,以便将任务路由到特定队列。但是,在使用TaskSet时,我会得到
TypeError at /some/path/
apply_async() got an unexpected keyword argument 'queue'在给定TaskSet类https://github.com/ask/celery/blob/master/celery/task/sets.py#L122中的以下代码的情况下,这是不可接受的
def apply_async(self, connection=None, connect_timeout=None,
publisher=None, taskset_id=None):
"""Apply taskset."""
app = self.app
if app.conf.CELERY_ALWAYS_EAGER:
return self.apply(taskset_id=taskset_id)
with app.default_connection(connection, connect_timeout) as conn:
setid = taskset_id or uuid()
pub = publisher or self.Publisher(connection=conn)
try:
results = self._async_results(setid, pub)
finally:
if not publisher: # created by us.
pub.close()
return app.TaskSetResult(setid, results)在某些情况下,我有许多不确定的任务需要应用特殊路由,我应该如何处理?不使用TaskSet?
发布于 2012-03-02 02:59:47
可以将subtasks与选项参数一起使用
>>> from celery.task.sets import TaskSet
>>> from tasks import add
>>>
>>> job = TaskSet(tasks=[add.subtask(args=(i, i),options={'queue':'celery'}) for i in range(10)])
>>> result = job.apply_async()
>>> result.ready()
True
>>> job
[tasks.add(0, 0), tasks.add(1, 1), tasks.add(2, 2), tasks.add(3, 3), tasks.add(4, 4), tasks.add(5, 5), tasks.add(6, 6), tasks.add(7, 7), tasks.add(8, 8), tasks.add(9, 9)]
>>> https://stackoverflow.com/questions/9518568
复制相似问题