我想在我的服务器之间建立一个基于事件的系统。例如,当包装我的数据库逻辑的服务器更改状态时,我希望它通知我的其他服务器。发布/订阅设计似乎是这方面的理想选择,我听说过关于ZeroRPC的好消息。
有些人提到过使用zerorpc流来实现发布/订阅,但是我并不清楚如何使用流来触发事件。
发布于 2012-08-21 07:02:25
在dotCloud,我们通过zerorpc流使用了大量的发布/订阅。让我来描述一下我们这样做的方式。
总而言之
我们公开了一个用@zerorpc.stream修饰的流方法。调用此方法时,会将一个gevent.queue添加到set中。然后,该方法将永远循环,生成到达队列的每个消息。当此方法终止时(因为客户端断开连接),队列将从集合中删除。
要发布,只需在集合中注册的每个队列上发布要发布的消息。
zerorpc-python实现示例:
订阅部分
class MyService(object):
def __init__(self):
self._subscribers = set()
@zerorpc.stream
def subscribe(self):
try:
queue = gevent.queue.Queue()
self._subscribers.add(queue)
for msg in queue:
yield msg
finally:
self._subscribers.remove(queue)subscribe方法只是将事件队列添加到集合中。
在这两种情况下,都会执行finally语句,并从订阅服务器列表中删除队列。
请注意,此时可以限制队列的大小:...Queue(maxsize=42)。
发布部分
class MyService(object):
[...]
def _publish(self, msg):
for queue in self._subscribers:
if queue.size < 42:
queue.put(msg)调用此方法以发布消息。它将遍历所有订阅者队列,以便将消息放入其中。在我的示例中,如果队列到达达到特定大小,我将丢弃该消息。但是您想要在那里应用哪种模式是没有限制的。
您可以将订阅者的greenlet实例存储在set中,然后在队列已满时终止它,有效地断开慢客户端的连接(您甚至可以尝试发送一条消息,通知客户端太慢)。你也可以等待所有的消费者在从_publish返回之前并行处理消息,等等。天是有限制的,我的朋友!
希望这能有所帮助!
发布于 2012-12-13 18:19:43
现在ZeroRPC中有一套完全不同的发布者/订阅者功能,它可以在网络上完美地工作!
您可能会有兴趣阅读ZeroRPC测试,了解有关如何使用它的更多提示,在本例中是Publisher和Subscriber类。Here are the tests.
此外,关于发布者/订阅者模式和更多的内容,ÖMQ文档中也有很多很好的信息。You can find it here.
https://stackoverflow.com/questions/11979319
复制相似问题