我正在使用flask和flask-restx,尝试创建一个协议来从另一个服务中获取一个特定的字符串。我正在设法在不同的线程中运行服务器中的函数。下面是我的代码示例:
from flask_restx import Api,fields,Resource
from flask import Flask
app = Flask(__name__)
api = Api(app)
parent = api.model('Parent', {
'name': fields.String(get_answer(a,b)),
'class': fields.String(discriminator=True)
})
@api.route('/language')
class Language(Resource):
# @api.marshal_with(data_stream_request)
@api.marshal_with(parent)
@api.response(403, "Unauthorized")
def get(self):
return {"happy": "good"}我所期望的:
在客户端,首先服务器应该运行,也就是说,我们应该能够使curl -i localhost:8080工作。然后,当特定条件为真时,客户端应该收到带有我在服务器中的父JSON数据的GET请求。但是,如果该条件为真,则GET请求不应该能够返回正确的结果。
我所做的:
我使用的方法之一是将装饰器和Class Language(Resource)部件封装在不同的函数中,并在不同的线程中错误地处理该函数,并将该线程置于条件检查之下。我不确定这是否是正确的方法,我看到有人说celery可能是一个好的选择,但不确定这在flask-restx中是否有效。
发布于 2022-12-04 15:43:00
我有答案给你。若要使用烧瓶在后台运行进程,请将其调度为使用APScheduler使用另一个进程运行。一个非常简单的包,它帮助您安排任务,以便在某个时间间隔运行函数,在您的例子中,只需在utcnow()一次。
这是指向瓶-APScheduler的链接。
job = scheduler.add_job(myfunc, 'interval', minutes=2)在您的情况下,使用“日期”而不是“间隔”,并指定run_date
job = scheduler.add_job(myfunc, 'date', run_date=datetime.utcnow())以下是文档:用户指南
https://stackoverflow.com/questions/74672488
复制相似问题