首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Spring reactive redis StreamReceiver的随机RedisConnectionException

使用Spring reactive redis StreamReceiver的随机RedisConnectionException
EN

Stack Overflow用户
提问于 2020-09-01 21:15:01
回答 1查看 242关注 0票数 0

我已经实现了Spring data Redis StreamReceiver来消费消息。这对于独立的Redis工作得很好,但是对于集群节点和通过Spring Redis属性配置的连接性,如下所示。

代码语言:javascript
复制
spring:
  redis:
    timeout: 3000
    lettuce:
      cluster:
        refresh:
          adaptive: true
          period: 5m

但我们观察到,Flux流有时会过早关闭,不再侦听新消息,并且flux过早终止。我们使用的示例代码如下所示。

代码语言:javascript
复制
        StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(connectionFactory);
        return reactiveRedisTemplate.opsForStream().createGroup("stream001", "consumer001")
                .thenMany(receiver.receive(Consumer.from(consumerGroup, getConsumerName()),
                                                StreamOffset.create(stream, ReadOffset.lastConsumed()))
                        .onErrorContinue((err, obj) -> {
                            log.error("Continue on error in receiver.receive()", err);
                        })
                        .doOnCancel(() -> {
                            log.error("Stream receiver flux cancelled");
                        }).doOnComplete(() -> {
                            log.error("Stream receiver flux completed");
                        }).doOnError(err -> {
                            log.error("Stream receiver flux on error: ", err);
                        }).doOnTerminate(() -> {
                            log.error("Stream receiver flux on Termination");
                        })
                );

我们想要进一步调试,因为我们碰巧更好地使用了响应式推送标准。我们尝试添加上面提到的adaptiveRefresh和refreshPeriod道具,但没有解决问题。

我们还在评估重新实例化Stream Listener,以便刷新连接。

我们面对的生态环境

代码语言:javascript
复制
 2020-08-30 16:38:54.955 [lettuce-epollEventLoop-4-2] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
  reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CompletionException: io.lettuce.core.RedisConnectionExceptio     n: Unable to connect to 10.123.34.38:6379
  Caused by: java.util.concurrent.CompletionException: io.lettuce.core.RedisConnectionException: Unable to connect to 10.123.34.38:6379
          at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
          at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
          at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
          at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
          at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
          at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
          at io.lettuce.core.cluster.PooledClusterConnectionProvider.lambda$getConnectionAsync$6(PooledClusterConnectionProvider.java:38     4)
          at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
          at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
          at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
          at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
          at io.lettuce.core.AbstractRedisClient.lambda$initializeChannelAsync0$4(AbstractRedisClient.java:329)
          at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
          at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
          at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
          at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
          at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
          at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
          at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
          at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe$2.run(AbstractEpollChannel.java:577)
          at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
          at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
          at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
          at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
          at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
          at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
          at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
          at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
          at java.lang.Thread.run(Thread.java:748)
  Caused by: io.lettuce.core.RedisConnectionException: Unable to connect to /10.123.34.38:6379
          at io.lettuce.core.RedisConnectionException.create(RedisConnectionxception.java:78)
          at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:56)
          ... 23 common frames omitted

谢谢你的帮助

EN

回答 1

Stack Overflow用户

发布于 2020-09-14 11:01:32

给任何可能来到这里的人,张贴用于解决它的方法。这里也没有注意到消息延迟。但是在几个微秒内观察到多个连接问题,这在我们的应用程序中是可以接受的。

代码语言:javascript
复制
        redisStreamMsgs.doOnNext(req -> {
            CopyRequest copyRequest = req.getCopyRequest();
            log.info("Picked from Stream the message for RequestId {}", req.getId());

            // Process our business reactive logiccs
            businessService.performAction(req);

        }).onErrorContinue((throwable, obj) -> {
            log.error("Error while processing the Message ID {}", obj.getClass().getName(), throwable);
        }).doOnSubscribe(msg -> log.error("Message flux Subscribed ")
        ).doOnError(fluxErr -> log.error("Message flux error: ", fluxErr)
        ).doOnComplete(() -> log.error("Messages flux completed for configured")
        ).doOnTerminate(() -> {
            log.info("CopyContext StreamMessages flux terminated ");
            if (!this.beingDestroyed) {
                log.error("Unexpected stream flux termination");
                beansReloaderService.reinitRedisStream().subscribe();
            } else {
                log.info("Normal receiver flux termination due to shutdown");
            }
        }).doFinally(signal -> {
            if(ON_ERROR.equalsIgnoreCase(signal.toString()))
                health.setStreamListenerStatus(false);
        }).subscribe();

我碰巧遇到了beansReloaderService.reinitRedisStream(),如下所示。为了安全起见,如果同样的问题仍然存在,我们会在实例上降低健康

代码语言:javascript
复制
    public Mono<Long> reinitRedisStream() {
        return Mono.delay(Duration.ofMillis(200L)).doOnTerminate(() -> {
            try {
                ConfigurableListableBeanFactory bf = configContext.getBeanFactory();

            bf.destroyBean(bf.getBean(Constants.STREAM_LISTENER));
                bf.destroyBean(bf.getBean(Constants.LISTENER_FACTORY));

                bf.createBean(RedisListenFactory.class);
                bf.createBean(RedisStreamListener.class);
            } catch (Exception ex) {
                log.error("Exception occurred when reloading stream beans", ex);
            }
        });
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63688444

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档