好的,我是RSocket的新手。我正在尝试创建一个简单的RSocket客户端和简单的RSocket服务器。从我所做的研究来看,RSocket支持恢复:
它特别有用,因为当发送包含有关最后接收到的帧的信息的恢复帧时,客户端能够恢复连接,并且仅请求它尚未接收到的数据,从而避免了服务器上不必要的负载和浪费时间来尝试检索已经检索到的数据。
它还说,客户端负责启用恢复。我的问题是如何启用此恢复,以及如何发送该恢复帧。我有功能正常的客户机和服务器,但如果我关闭服务器并重新启动它,什么都不会发生,稍后当客户机再次尝试与服务器通信时,它会抛出: java.nio.channels.ClosedChannelException。
这是我的客户端配置:
@Configuration
public class ClientConfiguration {
/**
* Defining the RSocket client to use tcp transport on port 7000
*/
@Bean
public RSocket rSocket() {
return RSocketFactory
.connect()
.resumeSessionDuration(Duration.ofDays(10))
.mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(TcpClientTransport.create(7000))
.start()
.block();
}
/**
* RSocketRequester bean which is a wrapper around RSocket
* and it is used to communicate with the RSocket server
*/
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
}}
这是一个RestController,我从它开始与rsocket服务器通信:
@RestController
public class UserDataRestController {
private final RSocketRequester rSocketRequester;
public UserDataRestController(RSocketRequester.Builder rSocketRequester) {
this.rSocketRequester = rSocketRequester.connectTcp("localhost", 7000).block();
}
@GetMapping(value = "/feed/{firstName}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<Person> feed(@PathVariable("firstName") String firstName) {
return rSocketRequester
.route("feedPersonData")
.data(new PersonDataRequest(firstName))
.retrieveFlux(Person.class);
}
}发布于 2020-01-29 15:49:27
由于会话存储在内存中,因此无法在服务器重新启动后恢复。参见io.rsocket.resume.SessionManager#sessions。
但是,如果您重新连接到同一服务器,您仍然可以保护自己免受网络问题的影响。而且你不需要发送简历帧,客户端会为你发送。
您应该配置服务器:
@Bean
ServerRSocketFactoryProcessor serverRSocketFactoryProcessor() {
return RSocketFactory.ServerRSocketFactory::resume;
}和客户端io.rsocket.RSocketFactory.ClientRSocketFactory#resume。
您可以找到几乎完整的here示例
发布于 2020-07-06 12:43:25
@Alexander Pankin提供的代码现在已被弃用。我使用下面的代码来配置服务器上的简历:
@Bean
RSocketServerCustomizer rSocketResume() {
Resume resume =
new Resume()
.sessionDuration(Duration.ofMinutes(15))
.retry(
Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(5))
.doBeforeRetry(s -> log.debug("Disconnected. Trying to resume...")));
return rSocketServer -> rSocketServer.resume(resume);
}https://stackoverflow.com/questions/59940849
复制相似问题