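I have implemented a Spring Data Redis StreamReceiver to consume messages. This works fine with a standalone Redis, but not with clustered nodes when connectivity is configured through the Spring Redis properties shown below.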
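spring:
  redis:
    timeout: 3000
    lettuce:
      cluster:
        refresh:
          adaptive: true
          period: 5m
With this setup we observe that the Flux is sometimes terminated prematurely and no longer listens for new messages. The sample code we use is shown below.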
StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(connectionFactory);
return reactiveRedisTemplate.opsForStream().createGroup("stream001", "consumer001")
        .thenMany(receiver.receive(Consumer.from(consumerGroup, getConsumerName()),
                        StreamOffset.create(stream, ReadOffset.lastConsumed()))
                .onErrorContinue((err, obj) -> {
                    log.error("Continue on error in receiver.receive()", err);
                })
                .doOnCancel(() -> {
                    log.error("Stream receiver flux cancelled");
                }).doOnComplete(() -> {
                    log.error("Stream receiver flux completed");
                }).doOnError(err -> {
                    log.error("Stream receiver flux on error: ", err);
                }).doOnTerminate(() -> {
                    log.error("Stream receiver flux on Termination");
                })
        );
We would like to debug this further, because we prefer to keep using the reactive push model. We tried adding the adaptiveRefresh and refreshPeriod properties mentioned above, but that did not resolve the issue.
We are also evaluating re-instantiating the stream listener so that the connection is refreshed.
The exception we are facing:
2020-08-30 16:38:54.955 [lettuce-epollEventLoop-4-2] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CompletionException: io.lettuce.core.RedisConnectionException: Unable to connect to 10.123.34.38:6379
Caused by: java.util.concurrent.CompletionException: io.lettuce.core.RedisConnectionException: Unable to connect to 10.123.34.38:6379
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at io.lettuce.core.cluster.PooledClusterConnectionProvider.lambda$getConnectionAsync$6(PooledClusterConnectionProvider.java:384)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at io.lettuce.core.AbstractRedisClient.lambda$initializeChannelAsync0$4(AbstractRedisClient.java:329)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe$2.run(AbstractEpollChannel.java:577)
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.lettuce.core.RedisConnectionException: Unable to connect to /10.123.34.38:6379
at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:78)
at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:56)
... 23 common frames omitted
Thanks for your help.
Posted on 2020-09-14 11:01:32
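For anyone who may land here, this is the approach we used to solve it. We did not notice any message latency with it. We did observe multiple connection issues within a few microseconds, which is acceptable in our application.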
redisStreamMsgs.doOnNext(req -> {
    CopyRequest copyRequest = req.getCopyRequest();
    log.info("Picked from Stream the message for RequestId {}", req.getId());
    // Process our reactive business logic
    businessService.performAction(req);
}).onErrorContinue((throwable, obj) -> {
    log.error("Error while processing the Message ID {}", obj.getClass().getName(), throwable);
}).doOnSubscribe(msg -> log.error("Message flux Subscribed ")
).doOnError(fluxErr -> log.error("Message flux error: ", fluxErr)
).doOnComplete(() -> log.error("Messages flux completed for configured")
).doOnTerminate(() -> {
    log.info("CopyContext StreamMessages flux terminated ");
    if (!this.beingDestroyed) {
        // Termination outside of shutdown is unexpected: rebuild the listener beans
        log.error("Unexpected stream flux termination");
        beansReloaderService.reinitRedisStream().subscribe();
    } else {
        log.info("Normal receiver flux termination due to shutdown");
    }
}).doFinally(signal -> {
    // Mark the instance as unhealthy when the flux ended with an error
    if (ON_ERROR.equalsIgnoreCase(signal.toString())) {
        health.setStreamListenerStatus(false);
    }
}).subscribe();
My beansReloaderService.reinitRedisStream() is shown below. To be safe, we also mark the instance's health as down if the same issue still persists.
public Mono<Long> reinitRedisStream() {
    // Wait briefly, then destroy and recreate the listener beans
    return Mono.delay(Duration.ofMillis(200L)).doOnTerminate(() -> {
        try {
            ConfigurableListableBeanFactory bf = configContext.getBeanFactory();
            bf.destroyBean(bf.getBean(Constants.STREAM_LISTENER));
            bf.destroyBean(bf.getBean(Constants.LISTENER_FACTORY));
            bf.createBean(RedisListenFactory.class);
            bf.createBean(RedisStreamListener.class);
        } catch (Exception ex) {
            log.error("Exception occurred when reloading stream beans", ex);
        }
    });
}
https://stackoverflow.com/questions/63688444
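The restart-on-unexpected-termination idea above can also be sketched without Spring or Reactor. The following is a minimal, hypothetical illustration in plain Java, not the code used in the answer: the class name ListenerSupervisor, its constructor parameters, and the consecutive-failure threshold are all assumptions made for this example. It reruns a blocking listener after a short delay whenever it ends outside of shutdown, and flags the instance as unhealthy once it has failed several times in a row.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical supervisor (assumed name): restarts a listener that terminates unexpectedly. */
final class ListenerSupervisor {
    private final Runnable listener;           // blocks while consuming; returning or throwing means "terminated"
    private final long restartDelayMillis;     // pause before each restart (the answer uses 200 ms)
    private final int maxConsecutiveFailures;  // assumed threshold for marking health down
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean beingDestroyed = new AtomicBoolean(false);
    private final AtomicBoolean healthy = new AtomicBoolean(true);
    private final AtomicInteger consecutiveFailures = new AtomicInteger();

    ListenerSupervisor(Runnable listener, long restartDelayMillis, int maxConsecutiveFailures) {
        this.listener = listener;
        this.restartDelayMillis = restartDelayMillis;
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    void start() {
        scheduler.execute(this::runOnce);
    }

    private void runOnce() {
        try {
            listener.run();
            consecutiveFailures.set(0);        // a clean run resets the failure streak
        } catch (RuntimeException ex) {
            if (consecutiveFailures.incrementAndGet() >= maxConsecutiveFailures) {
                healthy.set(false);            // same problem keeps recurring: report unhealthy
            }
        }
        if (!beingDestroyed.get()) {
            try {
                // Termination outside of shutdown is unexpected, so schedule a restart
                scheduler.schedule(this::runOnce, restartDelayMillis, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException ignored) {
                // shutdown() raced with the restart; nothing left to do
            }
        }
    }

    void shutdown() {
        beingDestroyed.set(true);
        scheduler.shutdownNow();
    }

    boolean isHealthy() {
        return healthy.get();
    }
}
```

A caller would construct it with something like new ListenerSupervisor(task, 200, 3), call start() once, and call shutdown() from its destroy hook. Using a single-threaded scheduler keeps the runs strictly sequential, so a restart never overlaps a listener that is still running.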