给定以下代码;
我有一个假的“热源”,我想在上面每2秒打印一次每个city的最后一个值。我看到log点A和B的行为正如我所期望的那样。然而,代码在groupBy上阻塞,并且只在log点C发出最后的值。我怎么能让"C“每2秒发出一次。
public class Weather {
String city;
Integer temperature;
public Weather(String city, Integer temperature) {
super();
this.city = city;
this.temperature = temperature;
}
@Override
public String toString() {
return "Weather [city=" + city + ", temperature=" + temperature + "]";
}
public static void main(String[] args) {
BlockingQueue<Weather> queue = new LinkedBlockingQueue<>();
new Thread(() -> {
for (int d = 1; d < 100; d += 1) {
for (String s: new String[] {"LDN", "NYC", "PAR", "ZUR"}) {
queue.add(new Weather(s, d));
try { Thread.sleep(250); } catch (InterruptedException e) {}
}
}
}).start();
Flux<Weather> outgoing = Flux.create(
sink -> {
for (int i = 0; i < 100; i++) {
try {
sink.next(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
sink.complete();
}
);
ConnectableFlux<Weather> subscriber = outgoing.publish();
subscriber
.buffer(Duration.ofSeconds(2))
.log("A")
.flatMap(Flux::fromIterable)
.log("B")
.groupBy(c -> c.city)
.flatMap(Flux::last)
.log("C")
.subscribe(s -> System.out.println(">>>>>" + s));
subscriber.connect();
System.exit(0);
}}
发布于 2019-03-17 03:44:11
这似乎起作用了;
subscriber
.groupBy(c -> c.city)
.flatMap(g -> g
.take(Duration.ofSeconds(5))
.takeLast(1)
)
.subscribe(s -> System.out.println(">>>>>" + s));https://stackoverflow.com/questions/55162934
复制相似问题