目前,我正在使用ZeroRPC,我有“工作人员”连接到“服务器”,并做服务器发送给他们的工作。
目前,只要有调用,就通过ZeroRPC进行调用,据我所知,它使用的是先进先出队列。
我想使用我自己的队列,这样我就可以限制/优先处理呼叫。
我希望ZeroRPC公开一个gevent Event,当它的内部队列为空时触发。
发布于 2012-12-06 03:36:19
你想做的是,在你的服务器上创建你自己的工作队列。在你想要的优先级中调度你自己的电话。
由于很少的几行代码在3卷中表达了比任何吸血故事更多的内容,让我们在伪代码中看看服务器可能是什么样子:
myqueue = MySuperBadAssQueue()
def myqueueprocessor():
for request in myqueue: # blocks until next request
gevent.spawn(request.processme) # do the job asynchronously
gevent.spawn(myqueueprocessor) # do that at startup
class Server:
def dosomething(args...blabla...): # what users are calling
request = Request(args...blabla...)
myqueue.put(request) # something to do buddy!
return request.future.get() # return when request is completed
# (can also raise an exception)
# An example of what a request could look like:
class Request:
def __init__(self, ....blablabla...):
self.future = gevent.AsyncResult()
def process():
try:
result = someworker(self.args*) # call some worker
self.future.set(result) # complete the initial request
except Exception as e:
self.future.set_exception(e)这是由MySuperBadAssQueue来做所有的智能工作,如果你想要油门,取消一个请求,如果必要的异常,等等……
ZeroRPC不会公开任何事件,让你知道它的“内部”队列是否为空:
实际上,ZeroRPC中没有显式队列。发生的事情很简单,就是先来先服务,确切的顺序取决于ZeroMQ和Gevent IOLoop (libevent或libev取决于版本)。碰巧在实践中,这就像一个先进先出队列。
发布于 2013-09-15 10:00:40
我自己还没有尝试过,但我已经通读了源码。我之所以有动力,是因为我想自己做这件事。
看起来您要做的是继承zerorpc.Server并覆盖_acceptor方法。根据source的说法,_acceptor接收消息,然后产生线程来运行它们。因此,如果您更改逻辑/循环以合并您的队列,则可以使用它来限制。
https://stackoverflow.com/questions/13724387
复制相似问题