首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为从MySQL查询返回的每一行运行芹菜任务?

为从MySQL查询返回的每一行运行芹菜任务?
EN

Stack Overflow用户
提问于 2016-08-01 22:10:03
回答 1查看 2.5K关注 0票数 2

我以前使用过Python,但只在Flask应用程序中使用过,但我以前从未使用过芹菜。在读取了文档并设置了所有内容(正如我对多个工作人员测试过的那样)之后,我尝试运行一个SQL查询,对于从查询中返回的每一行,将其发送出去,由芹菜工作者处理。

下面是一个非常基本的代码示例。

代码语言:javascript
复制
from celery import Celery
import MySQLdb

app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def print_domain():
    db = MySQLdb.connect(host="localhost", user="DB_USER", passwd="DB_PASS", db="DB_NAME")
    cur = db.cursor()
    cur.execute("SELECT * FROM myTable")

    for row in cur.fetchall():
        print_query_result(row[0])

    db.close()

def print_query_result(result):
    print result

基本上,它选择'myTable‘表中的所有内容,并为返回的每一行打印出来。如果我只使用Python调用代码,它就可以正常工作,并打印MySQL表中的所有数据。当我调用它时,使用.delay()函数将其发送给一个工作人员来处理,它只将它发送给一个工作人员,并且只输出数据库中的上一行。

我一直在试着读一些子任务,但我不确定我是否会朝着正确的方向前进。

总之,我希望这一切发生,但我不知道从何说起。有人有什么想法吗?

  • SQL查询以选择表中的所有行
  • 将每一行/结果发送给工作人员以处理某些代码
  • 将代码结果返回数据库
  • 在队列中捡起下一项(如果有的话)

提前谢谢。

编辑1:

我已经更新了代码以使用SQLAlchemy,但是结果仍然像我以前的查询一样返回,这很好。

代码语言:javascript
复制
from celery import Celery
from models import DBDomains

app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def print_domain():
    query = DBDomains.query.all()
    for i in query:
        print i.domain
        print_query_result.s()

@app.task
def print_query_result():
    print "Received job"

print_domain.delay()

运行.py文件时的工作人员返回:

代码语言:javascript
复制
[2016-08-02 02:08:40,881: INFO/MainProcess] Received task: tasks.print_domain[65d7667a-fc70-41f7-8caa-b991f360a9de]
[2016-08-02 02:08:41,036: WARNING/Worker-3] result1
[2016-08-02 02:08:41,037: WARNING/Worker-3] result2
[2016-08-02 02:08:41,039: INFO/MainProcess] Task tasks.print_domain[65d7667a-fc70-41f7-8caa-b991f360a9de] succeeded in 0.154022816569s: None

如您所见,工作人员从我正在查询的表中获取'result1‘和'result2’,但是它似乎没有在子任务中执行命令,即只打印"Job“。

更新:看起来子任务的末尾必须有一个.delay(),就像芹菜文档一样,所以我的代码看起来是这样的,并且现在已经成功地在工人之间分发了作业。

代码语言:javascript
复制
from celery import Celery
from models import DBDomains

app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def print_domain():
    query = DBDomains.query.all()
    for i in query:
        subtask = print_query_result.s(i.domain)
        subtask.delay()


@app.task
def print_query_result(domain):
    print domain

 print_domain.delay()
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-08-02 00:54:50

无论何时从任务内部调用任务,都必须使用子任务。幸运的是,语法很简单。

代码语言:javascript
复制
from celery import Celery

app = Celery('tasks', broker='redis://127.0.0.1:6379/0')


@app.task
def print_domain():
    for x in range(20):
        print_query_result.s(x)


@app.task
def print_query_result(result):
    print(result)

(用查询结果替换范围(20)中的x。)如果你在观察芹菜的产量,你会看到任务的创建和分配给工人。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/38708893

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档