我需要一个计时器,它是基于用户在‘s上的变量。下面是最小的例子:
Flux.<Integer> create(e -> {
log.info("create"); // Never gets triggered
e.next(2); // Sample user input: change to 2 second interval
})
.switchMap(v -> Flux.interval(Duration.ofSeconds(v)))
.startWith(Flux.interval(Duration.ofSeconds(1)))
.subscribe(e -> log.info("subscribe: {}", e)); // This works在上述情况中:
Flux<Integer>,它应该根据用户的输入(在上面只发出2)发出东西,switchMap的新间隔。上面的内容在switchMap部件下面工作,也就是说,我看到它每秒钟记录一次“订阅: N”,但是"create“不会被记录,e.next(2)也不会被调用。
为什么这不管用?这个用例有更好的解决方案吗?
发布于 2019-03-21 08:57:28
正如在JavaDoc中所描述的那样,Flux#startWith将在前面加上给定的序列。
由于您将Flux.interval(Duration.ofSeconds(1))作为参数传递,它将每秒钟无限地发出多头,并且您的Flux.create-based发布服务器永远不会被订阅。
但是,如果将其更改为:
.startWith(Mono.delay(Duration.ofSeconds(1)))还可以考虑将代码更改为:
Flux.<Integer> create(e -> {
log.info("create");
e.next(2);
})
.startWith(1)
.switchMap(v -> Flux.interval(Duration.ofSeconds(v)))
.subscribe(e -> log.info("subscribe: {}", e));在这里,我们在startWith块之后使用Flux.create,并让switchMap处理它作为任何其他信号。
另外,请注意,switchMap(v -> Flux.interval(Duration.ofSeconds(v)))读为:
“开始每N秒发出一次,其中N是最新发出的值”
如果您只需要“延迟”一次,也可以考虑在这里使用Mono.delay。
https://stackoverflow.com/questions/55273419
复制相似问题