我正在运行一个简单的测试,将来自4个线程的消息发布到TopicProcessor,而在订阅服务器中,我只需将它们添加到集合中即可。守则如下:
@Test
public void testProcessingMessages() throws Exception {
int numberOfMessages = 1000;
TopicProcessor<Integer> processor = TopicProcessor.create();
ExecutorService executorService = Executors.newFixedThreadPool(4);
Queue<Integer> messages = new ConcurrentLinkedQueue<>();
processor.subscribe(messages::add);
AtomicInteger counter = new AtomicInteger(0);
for (int i = 0; i < numberOfMessages; i++) {
executorService.submit(() -> {
processor.onNext(counter.incrementAndGet());
});
}
Thread.sleep(10000);
assertEquals(numberOfMessages, messages.size());
}但是最终断言失败了,通常大约在980-990条实际消息,而不是预期的1000条消息。我是不是遗漏了什么?
发布于 2017-04-13 13:04:22
问题是,TopicProcessor.create创建了一个处理器,该处理器期望从单个线程发布。从多个线程生成时应使用TopicProcessor.share。
https://stackoverflow.com/questions/43392047
复制相似问题