我以前使用过Python,但只在Flask应用程序中使用过,但我以前从未使用过芹菜。在读取了文档并设置了所有内容(正如我对多个工作人员测试过的那样)之后,我尝试运行一个SQL查询,对于从查询中返回的每一行,将其发送出去,由芹菜工作者处理。
下面是一个非常基本的代码示例。
from celery import Celery
import MySQLdb
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def print_domain():
db = MySQLdb.connect(host="localhost", user="DB_USER", passwd="DB_PASS", db="DB_NAME")
cur = db.cursor()
cur.execute("SELECT * FROM myTable")
for row in cur.fetchall():
print_query_result(row[0])
db.close()
def print_query_result(result):
print result基本上,它选择'myTable‘表中的所有内容,并为返回的每一行打印出来。如果我只使用Python调用代码,它就可以正常工作,并打印MySQL表中的所有数据。当我调用它时,使用.delay()函数将其发送给一个工作人员来处理,它只将它发送给一个工作人员,并且只输出数据库中的上一行。
我一直在试着读一些子任务,但我不确定我是否会朝着正确的方向前进。
总之,我希望这一切发生,但我不知道从何说起。有人有什么想法吗?
提前谢谢。
编辑1:
我已经更新了代码以使用SQLAlchemy,但是结果仍然像我以前的查询一样返回,这很好。
from celery import Celery
from models import DBDomains
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def print_domain():
query = DBDomains.query.all()
for i in query:
print i.domain
print_query_result.s()
@app.task
def print_query_result():
print "Received job"
print_domain.delay()运行.py文件时的工作人员返回:
[2016-08-02 02:08:40,881: INFO/MainProcess] Received task: tasks.print_domain[65d7667a-fc70-41f7-8caa-b991f360a9de]
[2016-08-02 02:08:41,036: WARNING/Worker-3] result1
[2016-08-02 02:08:41,037: WARNING/Worker-3] result2
[2016-08-02 02:08:41,039: INFO/MainProcess] Task tasks.print_domain[65d7667a-fc70-41f7-8caa-b991f360a9de] succeeded in 0.154022816569s: None如您所见,工作人员从我正在查询的表中获取'result1‘和'result2’,但是它似乎没有在子任务中执行命令,即只打印"Job“。
更新:看起来子任务的末尾必须有一个.delay(),就像芹菜文档一样,所以我的代码看起来是这样的,并且现在已经成功地在工人之间分发了作业。
from celery import Celery
from models import DBDomains
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def print_domain():
query = DBDomains.query.all()
for i in query:
subtask = print_query_result.s(i.domain)
subtask.delay()
@app.task
def print_query_result(domain):
print domain
print_domain.delay()发布于 2016-08-02 00:54:50
无论何时从任务内部调用任务,都必须使用子任务。幸运的是,语法很简单。
from celery import Celery
app = Celery('tasks', broker='redis://127.0.0.1:6379/0')
@app.task
def print_domain():
for x in range(20):
print_query_result.s(x)
@app.task
def print_query_result(result):
print(result)(用查询结果替换范围(20)中的x。)如果你在观察芹菜的产量,你会看到任务的创建和分配给工人。
https://stackoverflow.com/questions/38708893
复制相似问题