我希望使用Java实现各种发布者/订阅者模式,并且目前已经没有任何想法了。
有一个发布者和N个订阅者,发布者发布对象然后每个订阅者需要以正确的顺序处理每一个对象一次和一次。发布服务器和每个订阅服务器在各自的线程中运行。
在我最初的实现中,每个订阅服务器都有自己的阻塞队列,发布者将对象放入每个订阅服务器队列中。此操作正常,但如果订阅服务器的任何队列已满,则发布服务器将被阻塞。这导致性能下降,因为每个订阅者在处理对象时花费不同的时间。
然后,在另一个实现中,发布服务器将对象保存在自己的队列中。与对象一起,AtomicInteger计数器与外部订阅者的数量相关联。然后,每个订阅者查看队列并减少计数器,并在计数器达到零时将其从队列中移除。
这样,发布服务器就没有阻塞,但现在订阅服务器需要等待对方处理对象,将对象从队列中移除,然后才能查看下一个对象。
有什么更好的方法吗?我想这应该是一个很常见的模式。
发布于 2013-05-31 16:33:29
您的“多队列”实现是要走的路。我不认为您需要关注一个阻塞生产者的完整队列,因为完成的整个时间不会受到影响。假设有三个消费者,两个以每秒1的速度消费,第三个以每5秒1的速度消费,同时生产者以每2秒1的速度生产。最后,第三个队列将被填满,因此生产者将阻塞它,并将停止在第一个和第二个队列中放置项目。解决这个问题的方法是有办法的,但它们不会改变这样一个事实:第三个消费者将永远是的瓶颈。如果您正在生产/消费100项产品,那么由于第三个用户(5秒乘以100项),这至少要花费500秒,即使第一个和第二个消费者在200秒后完成(因为您已经做了一些聪明的事情,允许生产者在第三个队列满后也继续填充他们的队列),或者如果它们在500秒后完成(因为生产者阻塞了第三个队列),情况也是如此。
发布于 2013-05-31 16:46:43
确定
每个订阅服务器都有自己的阻塞队列,而发布服务器将对象放入每个订阅服务器的队列中。
这就是该走的路。您可以使用线程方法将其放入队列中..。因此,如果一个队列已满,发布者将不会等待。
例如。
s1 s2 s3是用户,addToQueue是每个用户中的方法,这增加了腐蚀队列。addQueue方法是等待队列为非空队列的方法。所以呼叫addQueue将是一个阻塞呼叫ideally synchronised code..。
然后,在publisher中,您可以执行类似于以下代码的操作
注意:代码可能不处于工作状态。但应该给你个主意。
List<subscriber> slist;// Assume its initialised
public void publish(final String message){
for (final subscriber s: slist){
Thread t=new Thread(new Runnable(){
public void run(){
s.addToQueue(message);
}
});
t.start();
}
}发布于 2013-05-31 17:42:38
有一个发布者和N个订阅者,发布者发布对象然后每个订阅者需要以正确的顺序处理每一个对象一次和一次。发布服务器和每个订阅服务器在各自的线程中运行。
我会改变这个架构。我最初考虑了每个订阅者的队列,但我不喜欢这种机制。例如,如果第一个订阅服务器运行时间更长,那么所有的作业都将在该队列中结束,而您只需要执行一个线程的工作。
由于您必须按顺序运行订阅服务器,所以我将有一个线程池,它通过所有订阅服务器运行每条消息。对订阅者的调用将需要重入,这可能是不可能的。
因此,您将有一个由10个线程组成的池(比方说),并且每个线程都从发布服务器队列中排出队列,并执行如下操作:
public void run() {
while (!shutdown && !Thread.currentThread().isInterrupted()) {
Article article = publisherQueue.take();
for (Subscriber subscriber : subscriberList) {
subscriber.process(article);
}
}
}https://stackoverflow.com/questions/16861729
复制相似问题