首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >当服务器在中途重新启动时,RSocket重新连接无法工作。

当服务器在中途重新启动时,RSocket重新连接无法工作。
EN

Stack Overflow用户
提问于 2020-12-13 08:13:01
回答 1查看 830关注 0票数 1

我正在尝试测试RSocket的重连接特性,但是当服务器在中间关闭并重新启动时,它无法工作。

步骤1:在服务器关闭时启动客户端

代码语言:javascript
复制
16:02:35.351 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 55104 (auto-detected)
16:02:35.515 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 10:e7:c6:ff:fe:31:38:c0 (auto-detected)
16:02:35.560 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x7b01559f] Created a new pooled channel, now 1 active connections and 0 inactive connections
16:02:35.569 [reactor-tcp-nio-2] DEBUG reactor.netty.transport.TransportConfig - [id: 0x7b01559f] Initialized pipeline DefaultChannelPipeline{(reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
doAfterRetry ===>io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999
16:02:36.619 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x8a39ea36] Created a new pooled channel, now 1 active connections and 0 inactive connections
16:02:36.620 [reactor-tcp-nio-2] DEBUG reactor.netty.transport.TransportConfig - [id: 0x8a39ea36] Initialized pipeline DefaultChannelPipeline{(reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
doAfterRetry ===>io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999
16:02:37.625 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x9754e5dd] Created a new pooled channel, now 1 active connections and 0 inactive connections
16:02:37.625 [reactor-tcp-nio-2] DEBUG reactor.netty.transport.TransportConfig - [id: 0x9754e5dd] Initialized pipeline DefaultChannelPipeline{(reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
doAfterRetry ===>io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999

步骤2:服务器启动。重新连接工作

代码语言:javascript
复制
16:08:31.359 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1] Created a new pooled channel, now 1 active connections and 0 inactive connections
16:08:31.359 [reactor-tcp-nio-2] DEBUG reactor.netty.transport.TransportConfig - [id: 0x67b9f3a1] Initialized pipeline DefaultChannelPipeline{(reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
16:08:31.862 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999] Registering pool release on close event for channel
16:08:31.863 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999] Channel connected, now 1 active connections and 0 inactive connections
16:08:31.863 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999] onStateChange(PooledConnection{channel=[id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999]}, [connected])
16:08:31.865 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999] onStateChange(ChannelOperations{PooledConnection{channel=[id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999]}}, [configured])
16:08:31.891 [reactor-tcp-nio-2] DEBUG io.rsocket.FrameLogger - sending -> 
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75
Data:

16:08:31.957 [reactor-tcp-nio-2] DEBUG reactor.netty.channel.FluxReceive - [id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999] FluxReceive{pending=0, cancelled=false, inboundDone=false, inboundError=null}: subscribing inbound receiver
16:08:31.969 [reactor-tcp-nio-2] DEBUG io.rsocket.FrameLogger - sending -> 
Frame => Stream ID: 1 Type: REQUEST_STREAM Flags: 0b100000000 Length: 57 InitialRequestN: 9223372036854775807
Metadata:

步骤3:关闭服务器

代码语言:javascript
复制
16:10:10.993 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 ! R:localhost/127.0.0.1:7999] Channel closed, now 0 active connections and 0 inactive connections
null
16:10:10.999 [reactor-tcp-nio-2] DEBUG org.springframework.core.codec.CharSequenceEncoder - Writing "ClientName:1607847010998"
16:10:11.008 [reactor-tcp-nio-2] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.nio.channels.ClosedChannelException
Caused by: java.nio.channels.ClosedChannelException: null
16:10:11.008 [reactor-tcp-nio-2] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.nio.channels.ClosedChannelException
Caused by: java.nio.channels.ClosedChannelException: null
16:10:11.010 [reactor-tcp-nio-2] DEBUG reactor.netty.ReactorNetty - [id: 0x67b9f3a1, L:/127.0.0.1:57689 ! R:localhost/127.0.0.1:7999] Non Removed handler: RSocketLengthCodec, context: ChannelHandlerContext(RSocketLengthCodec, [id: 0x67b9f3a1, L:/127.0.0.1:57689 ! R:localhost/127.0.0.1:7999]), pipeline: DefaultChannelPipeline{(RSocketLengthCodec = io.rsocket.transport.netty.RSocketLengthCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
16:10:11.010 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 ! R:localhost/127.0.0.1:7999] onStateChange(ChannelOperations{PooledConnection{channel=[id: 0x67b9f3a1, L:/127.0.0.1:57689 ! R:localhost/127.0.0.1:7999]}}, [disconnecting])

在上述步骤之后,RSocket没有重新连接。下面是我的节目。能不能有人帮我复习一下,并建议一下这是怎么回事?

代码语言:javascript
复制
        RSocketStrategies strategies = RSocketStrategies.builder()
                .encoders(e -> e.add(new Jackson2JsonEncoder()))
                .decoders(e -> e.add(new Jackson2JsonDecoder()))
                .build();

        RSocketRequester r = RSocketRequester.builder()
                .rsocketConnector(connector ->
                        connector.reconnect(Retry.indefinitely().doAfterRetry(e->  System.out.println("doAfterRetry ===>"+e.failure())))
                ).dataMimeType(MediaType.APPLICATION_JSON)
                .rsocketStrategies(strategies)
                .tcp("localhost", 7999);

更新于12月15日

下面的代码在步骤2之后发送请求。断开连接后,它无法恢复流。我肯定我的代码里漏掉了什么。请帮帮忙

代码语言:javascript
复制
requester.route("route_name")
                .data("RequestData")
                .retrieveFlux(MyResponse.class)
                .doOnError(ex ->{
                    System.out.println("doOnError"+ex);
                }).doOnCancel(()->{
                    System.out.println("doOnCancel");
                }).doOnComplete(()-> {
                    System.out.println("doOnCancel");
                })
                .subscribe(result -> {
                    System.out.println("===>"+result);
                });
EN

回答 1

Stack Overflow用户

发布于 2020-12-15 16:24:51

重新连接方法具有广泛的Javadoc。这个特性的主要目的是建立一个单一的共享连接,无论同一时间有多少订阅者:

当启用

时,该类的连接方法返回一个特殊的Mono,它维护单个共享的RSocket

Retry在放弃之前确定要继续尝试连接的时间,但是一旦它停止尝试,或者一旦连接建立,然后丢失,它就不会自动尝试再次连接。

单个请求的

下游订阅者仍然需要自己的重试逻辑来确定是否或何时重新尝试失败的请求,从而触发共享重新连接。

因此,每个单独请求的需求决定是否再次尝试连接,而给Retryreconnect决定每个共享重新连接的重试逻辑。

也一定要检查Javadoc中相应的代码片段。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65273622

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档