我希望我的应用程序对来自数千个不同客户端的UDP事件做出反应。每个客户端每5-10秒发送1-10个UDP数据包。每个包都会并且应该很快地被处理(主要是在内存和小的计算中,帮助进行redis,只是偶尔的DB调用)。将不会将数据返回给来电者。
我在春天实现了反应堆,就像他们在维基上描述的那样。然后,我实现了UDP入站通道,就像中描述的那样。下面是配置:
<int-ip:udp-inbound-channel-adapter id="receiverChannel"
channel="stringConvert"
port="9000"
multicast="false"
check-length="false"
pool-size="10"
lookup-host="false"
/>
<int:transformer id="convertChannel"
input-channel="stringConvert"
output-channel="toProcess"
ref="transformer"
method="transform"
/>
<int:service-activator input-channel="toProcess"
ref="accumulator"
method="accumulate"/>
<bean id="accumulator" class="hello.UDPAccumulator" />
<bean id="transformer" class="hello.UDPTransformer" />然后在UDPAccumulator上,我向反应堆发布了这条信息:
@Service
public class UDPAccumulator {
@Autowired
ReactorProducer producer;
public void accumulate(String quote) {
producer.fireEvent(quote);
}}
这是“正确”的方式来做这件事,我想要高通量产出?int-ip:udp-入站通道适配器的内部工作原理是什么?在将消息传递给反应堆之前,它能成为瓶颈吗?我看到反应堆有一些TCP相关的类和支持,但没有UDP。任何关于如何做到这最好的方式的建议都会受到赞赏!
奖金问题。如果信息到达的速度比发送到反应堆的速度还快呢?redis消息存储 (文章的底部)会有帮助吗?如果我在反应堆里处理这些包的方法很慢呢?
发布于 2013-11-04 15:00:09
由于我们在反应堆中还没有直接的UDP支持,所以您将事件发布到反应堆中的抽象是非常明智的。但是你在你的“奖金问题”中确实注意到,在发行者/消费者吞吐量方面存在一些问题,必须以特定领域的方式来管理;那里没有灵丹妙药。
在您的用例中,我很想说Processor [1]可能更合适。它为数据处理提供了更高的总体吞吐量,因为它绕过了在普通Reactor中发生的基于动态选择器的调度。除非您是根据某些主题标准将传入事件分配给不同的处理程序,否则我建议您转而考虑这一点。有了更高的吞吐量,您将不得不少担心消费者保持进度(除非您的Consumer正在做一些非常慢的事情,这没有什么能够自动加快)。
但是,如果您真的非常需要管理待办事项,我建议通过一个Queue将您的生产者和消费者分离开来。反应器有一个PersistentQueue [2]抽象,您可以使用JavaChronicle [3]将对象发布到磁盘并持久化到磁盘,然后使用Poller将对象释放到Consumer (javadoc将在本周某个时候出现在Poller上,因为我们已经为1.0...it做好了以前称为Pipe [4]的准备)。
发布于 2013-11-04 14:22:34
我不能和反应堆交谈,但是UDP适配器有一个专门的线程,可以读取原始数据包并将它们交给TaskExecutor。它会尽快这样做,这样它就可以继续阅读下一个包了。
默认的TaskExecutor是一个固定的线程池。
反应堆有一个可以注入适配器的DispatcherTaskExecutor。
主读取器线程和切换使用相同的任务执行器。
https://stackoverflow.com/questions/19749382
复制相似问题