SmallRye Reactive Messaging stops receiving incoming messages after an exception

Stack Overflow user
Asked on 2022-09-30 18:02:00
1 answer, 216 views, 0 votes

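According to the documentation at https://quarkus.io/guides/kafka, failure-strategy=ignore behaves as follows: "ignore: the failure is logged, but the processing continues. The offset of the record that has not been processed correctly is committed."

I have configured: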

mp.messaging.incoming.words-in.topic=words
mp.messaging.outgoing.words-out.topic=words
mp.messaging.incoming.words-in.auto.offset.reset=earliest
mp.messaging.incoming.words-in.commit-strategy=throttled
mp.messaging.incoming.words-in.failure-strategy=ignore

I have the test code below.

    @Incoming("words-in")
    @NonBlocking
    public Uni<Void> storeToDB(Message<String> message) {
        Log.info("Received publish direction");
        return test(message).onFailure().invoke(throwable -> {
            System.out.println("NACK!!!");
            System.out.println("NACK!!!");
            System.out.println("NACK!!!");
            System.out.println("NACK!!!---");
            message.nack(throwable);
        });
    }

    public Uni<Void> test(Message<String> message) {
        return Uni.createFrom().failure(new IllegalAccessError("TEST"));
    }

This produces:

2022-09-30 13:50:22,268 INFO  [com.sim.mes.PublishModuleProcessor] (vert.x-eventloop-thread-6) Received publish direction
NACK!!!
NACK!!!
NACK!!!
NACK!!!---
2022-09-30 13:50:22,270 WARN  [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-6) SRMSG18204: A message sent to channel `words-in` has been nacked, ignored failure is: TEST.
2022-09-30 13:50:22,271 ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-6) SRMSG00201: Error caught while processing a message: java.lang.IllegalAccessError: TEST
    at com.acme.messaging.PublishModuleProcessor.test(PublishModuleProcessor.java:75)
    at com.acme.messaging.PublishModuleProcessor_Subclass.test$$superforward1(Unknown Source)
    at com.acme.messaging.PublishModuleProcessor_Subclass$$function$$75.apply(Unknown Source)
    at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:53)
    at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.proceed(InvocationInterceptor.java:62)
    at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.monitor(InvocationInterceptor.java:51)
    at io.quarkus.arc.runtime.devconsole.InvocationInterceptor_Bean.intercept(Unknown Source)
    at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
    at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:40)
    at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
    at com.acme.messaging.PublishModuleProcessor_Subclass.test(Unknown Source)
    at com.acme.messaging.PublishModuleProcessor.storeToDB(PublishModuleProcessor.java:65)
    at com.acme.messaging.PublishModuleProcessor_Subclass.storeToDB$$superforward1(Unknown Source)
    at com.acme.messaging.PublishModuleProcessor_Subclass$$function$$74.apply(Unknown Source)
    at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:53)
    at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.proceed(InvocationInterceptor.java:62)
    at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.monitor(InvocationInterceptor.java:51)
    at io.quarkus.arc.runtime.devconsole.InvocationInterceptor_Bean.intercept(Unknown Source)
    at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
    at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:40)
    at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
    at com.acme.messaging.PublishModuleProcessor_Subclass.storeToDB(Unknown Source)
    at com.acme.messaging.PublishModuleProcessor_ClientProxy.storeToDB(Unknown Source)
    at com.acme.messaging.PublishModuleProcessor_SmallRyeMessagingInvoker_storeToDB_308e8bd532ece24c38471c9208b94b5b4fa0f2f7.invoke(Unknown Source)
    at io.smallrye.reactive.messaging.providers.AbstractMediator.invoke(AbstractMediator.java:95)
    at io.smallrye.reactive.messaging.providers.AbstractMediator.lambda$invokeOnMessageContext$1(AbstractMediator.java:103)
    at io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata.lambda$invokeOnMessageContext$0(LocalContextMetadata.java:34)
    at io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata.lambda$invokeOnMessageContext$2(LocalContextMetadata.java:55)
    at io.smallrye.context.impl.wrappers.SlowContextualConsumer.accept(SlowContextualConsumer.java:21)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateWithEmitter.subscribe(UniCreateWithEmitter.java:22)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni.subscribe(UniOnItemTransformToUni.java:25)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap.subscribe(UniOnItemOrFailureFlatMap.java:27)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.request(UniToMultiPublisher.java:74)
    at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapInner.onSubscribe(MultiFlatMapOp.java:601)
    at io.smallrye.mutiny.converters.uni.UniToMultiPublisher.subscribe(UniToMultiPublisher.java:26)
    at io.smallrye.mutiny.groups.MultiCreate$1.subscribe(MultiCreate.java:163)
    at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.onItem(MultiFlatMapOp.java:193)
    at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
    at io.smallrye.mutiny.subscription.SafeSubscriber.onNext(SafeSubscriber.java:99)
    at io.smallrye.mutiny.helpers.HalfSerializer.onNext(HalfSerializer.java:31)
    at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onItem(StrictMultiSubscriber.java:85)
    at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.onItem(MultiOperatorProcessor.java:100)
    at io.smallrye.reactive.messaging.providers.locals.ContextDecorator$ContextMulti$ContextProcessor.lambda$onItem$1(ContextDecorator.java:78)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:246)
    at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:43)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)

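The problem is that once this happens, subsequent messages sent to the topic are no longer processed until I restart the application. This differs from what I expected based on the failure-strategy configuration.

If my understanding is wrong, can anyone point me in the right direction?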


1 Answer

Stack Overflow user

Accepted answer

Answered on 2022-10-13 15:01:26

I solved this with the following:

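    return test(message).onItem().invoke(() -> {
        Log.info("Successful");
        // Acknowledge the message once processing has succeeded.
        message.ack();
    // On failure, substitute an empty item so the returned Uni completes normally.
    }).onFailure().recoverWithUni(a -> Uni.createFrom().voidItem());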
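
A note on why the recovery step may matter: in the question's code, `onFailure().invoke(...)` only observes the failure, so the `Uni` returned from `storeToDB` still fails after the nack. The snippet above ends with `onFailure().recoverWithUni(...)`, so the returned `Uni` completes normally instead. The sketch below illustrates that difference using the JDK's `CompletableFuture` as a stand-in for Mutiny's `Uni`. It is an analogy only, not SmallRye code, and the names `FailureHandlingDemo`, `failingOperation`, `observeOnly`, and `recover` are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

public class FailureHandlingDemo {

    // Stand-in for the question's test(message): an operation that always fails.
    static CompletableFuture<Void> failingOperation() {
        return CompletableFuture.failedFuture(new IllegalStateException("TEST"));
    }

    // Roughly analogous to onFailure().invoke(...): the callback sees the failure,
    // but the returned stage still completes exceptionally.
    static CompletableFuture<Void> observeOnly() {
        return failingOperation().whenComplete((ignored, failure) -> {
            if (failure != null) {
                System.out.println("observed: " + failure.getMessage());
            }
        });
    }

    // Roughly analogous to onFailure().recoverWithUni(...): the failure is replaced
    // by a normal (null) result, so the returned stage completes successfully.
    static CompletableFuture<Void> recover() {
        return failingOperation().exceptionally(failure -> null);
    }

    public static void main(String[] args) {
        System.out.println("observeOnly failed: " + observeOnly().isCompletedExceptionally());
        System.out.println("recover failed: " + recover().isCompletedExceptionally());
    }
}
```

Note that the snippet in this answer swallows the failure rather than nacking the message, so the configured `failure-strategy` is presumably not involved on that path. If you need a trace of what went wrong, log the exception inside the recovery function.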
Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/73912425
