我是响应式编程的新手,有很多问题。我认为这并不是缺乏示例或文档,而是我的理解是错误的。
我在试着模仿慢速用户;
以下是代码示例
Flux.create(sink -> {
int i = 0;
while (true) {
try {
System.out.println("Sleep for " + MILLIS);
Thread.sleep(MILLIS);
int it = i++;
System.out.println("Back to work, iterator " + it);
sink.next(it);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.elastic())
.subscribe(x -> {
try {
System.out.println("Value: " + x + ", Thread: " + Thread.currentThread().toString());
Thread.sleep(MILLIS + 4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});系统输出为
Sleep for 1000
Back to work, iterator 0
Value: 0, Thread: Thread[elastic-2,5,main]
Sleep for 1000
Back to work, iterator 1
Value: 1, Thread: Thread[elastic-2,5,main]
Sleep for 1000
Back to work, iterator 2
Value: 2, Thread: Thread[elastic-2,5,main]我想如果订阅者很慢,我应该会看到更多的线程,因为Schedulers.elastic()
我也试着让publishOn()看起来像是异步的,但是仍然不能处理几个线程的结果。
感谢您的评论和回答。
发布于 2019-02-21 18:36:02
如果你想让它在不同的线程中运行,你需要像这样使用.parallel(),它将会在不同的线程中运行
Flux.create(sink -> {
int i = 0;
while (true) {
try {
System.out.println("Sleep for " + MILLIS);
Thread.sleep(100);
int it = i++;
System.out.println("Back to work, iterator " + it);
sink.next("a");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
})
.parallel()
.runOn(Schedulers.elastic())
.subscribe(x -> {
try {
System.out.println("Value: " + x + ", Thread: " + Thread.currentThread().toString());
Thread.sleep(100 + 4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
})
;
}https://stackoverflow.com/questions/54802472
复制相似问题