我正在尝试使用Spring WebFlux构建一个简单的聊天服务器。这是一件容易的事情,而且像往常一样工作。我现在尝试实现的是在服务器端终止Flux流。想像一下,有一个无限的流量,就像这样:
@GetMapping(produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Object> join(@PathVariable String user)我有10个客户端/订阅者连接到该事件流。现在,我想终止一个特定客户端的连接,因为例如,用户在聊天时被诅咒。管他呢。是否可以管理/识别此类端点的订阅者?
发布于 2019-03-24 15:29:32
您可以使用.takeUntilOther(Publisher)操作符构建一些内容,并让给定的Publisher在用户应该断开连接时发出...
@GetMapping(produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Object> join(@PathVariable String user) {
return Flux.from(/* your logic to build your flux */)
.takeUntilOther(disconnector.onDisconnectUser(user));
}onDisconnectUser(String user)的一种可能的实现是过滤发出用户名的“全局”热Flux<String>,使其按给定的用户名断开连接。可能是这样的:
public class UserDisconnecter {
private final FluxProcessor<String, String> processor;
private final FluxSink<String> sink;
public UserDisconnecter() {
this.processor = DirectProcessor.create();
this.sink = this.processor.sink();
}
/**
* Signals that all existing streams for this user should be disconnected.
*/
public void disconnectUser(String user) {
this.sink.next(user);
}
/**
* Returns a Mono that emits when the given user should be disconnected.
*/
public Mono<String> onDisconnectUser(String user) {
return processor
.filter(user::equals)
.next();
}
}这是一个简单的实现,但应该是您开始使用的东西。
https://stackoverflow.com/questions/47933765
复制相似问题