首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >SimpleXMLRPCServer调用芹菜任务

SimpleXMLRPCServer调用芹菜任务
EN

Stack Overflow用户
提问于 2016-01-04 19:48:10
回答 1查看 215关注 0票数 0

我试图用SimpleXMLRPCServer和芹菜制作一个简单的RPC服务器。基本上,远程客户端(client.py)可以通过xmlrpc.client将任务调用到服务器(server.py),其中包括注册为芹菜任务(runnable.py)的功能。

问题是,当RPC函数通过register_function注册时,我可以直接调用它的名称,因此它将被正确地执行,但不使用芹菜。我想要实现的是通过name.delay()在client.py中调用它,这种方式将由芹菜执行,但不会锁定服务器线程。因此,server.py应该充当代理,并允许多个客户端调用完整的函数集,如:

代码语言:javascript
复制
for task in flow:
    job = globals()[task]
    job.delay("some arg")
    while True:
        if job.ready():
            break

我尝试过将register_instance与allow_dotted_names=True结合使用,但出现了一个错误:

代码语言:javascript
复制
xmlrpc.client.Fault: <Fault 1: "<class 'TypeError'>:cannot marshal <class '_thread.RLock'> objects">

这让我想到了一个问题

简化代码:

server.py

代码语言:javascript
复制
# ...runnable.py import
# ...rpc init
def register_tasks():
    for task in get_all_tasks():
        setattr(self, task, globals()[task])
        self.server.register_function(getattr(self, task), task)

runnable.py

代码语言:javascript
复制
app = Celery("tasks", backend="amqp", broker="amqp://")

@app.task()
def say_hello():
    return "hello there"

@app.task()
def say_goodbye():
    return "bye, bye"

def get_all_tasks():
    tasks = app.tasks
    runnable = []

    for t in tasks:
        if t.startswith("modules.runnable"):
            runnable.append(t.split(".")[-1])

    return runnable

最后,client.py

代码语言:javascript
复制
s = xmlrpc.client.ServerProxy("http://127.0.0.1:8000")
print(s.say_hello())
EN

回答 1

Stack Overflow用户

发布于 2016-01-05 14:30:22

我想出了一个主意,为芹菜的延迟功能创造一些额外的包装。这些注册方式与RPC客户机调用rpc.the_remote_task.delay(*args)的方式相同。这将返回芹菜作业ID,然后,客户端询问作业是否已通过rpc.ready(job_id)准备好,并使用rpc.get(job_id)获取结果。至于现在,有一个明显的安全漏洞,因为你可以得到结果,当你知道工作ID,但仍然-它工作良好。

注册任务(server.py)

代码语言:javascript
复制
def register_tasks():
    for task in get_all_tasks():
        exec("""def """ + task + """_runtime_task_delay(*args):
return celery_wrapper(""" + task + """, "delay", *args)
setattr(self, task + "_delay", """ + task + """_runtime_task_delay)
            """)

        f_delay = task + "_delay"
        self.server.register_function(getattr(self, f_delay), task + ".delay")

    def job_ready(jid):
        return celery_wrapper(None, "ready", jid)

    def job_get(jid):
        return celery_wrapper(None, "get", jid)

    setattr(self, "ready", job_ready)
    setattr(self, "get", job_get)

    self.server.register_function(job_ready, "ready")
    self.server.register_function(job_get, "get")

包装器(server.py)

代码语言:javascript
复制
def celery_wrapper(task, method, *args):
    if method == "delay":
        job = task.delay(*args)
        job_id = job.id

        return job_id
    elif method == "ready":
        res = app.AsyncResult(args[0])
        return res.ready()
    elif method == "get":
        res = app.AsyncResult(args[0])
        return res.get()
    else:
        return "0"

和RPC调用(client.py)

代码语言:javascript
复制
jid = s.the_remote_task.delay("arg1", "arg2")
is_running = True
while is_running:
        is_running = not s.ready(jid)

        if not is_running:
                print(s.get(jid))
        time.sleep(.01)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/34598680

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档