首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >具有事务、多线程的Spring-Kafka

具有事务、多线程的Spring-Kafka
EN

Stack Overflow用户
提问于 2018-12-16 14:22:46
回答 1查看 2K关注 0票数 2

我遇到了一种情况,即应用程序使用消息并生成消息作为对使用的消息的响应。这是使用kafka事务完成的,但是应用程序也有一个定期发送Kafka消息的任务(也使用事务,因为它发送到两个主题)。

当计划的作业开始发送时,我得到以下异常:

代码语言:javascript
复制
org.apache.kafka.common.KafkaException: TransactionalId aura-transaction-1: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION

有人知道原因是什么吗?我正在考虑尝试与不同的kafkaTemplates (+生产者工厂),看看这是否解决了问题。从那时起,我可以为计划的作业分配一个新的事务id前缀。目前,他们有同样的。

使用者使用一个basic @KafkaListener,它已经在来自KafkaMessageListenerContainer的事务中注册。然后它使用KafkaTemplate.send(Object)生成一条消息。

计划作业使用KafkaTemplate.executeInTransaction功能并发送到两个主题。

版本: Spring 2.1.1 Spring : 2.2.2

StackTrace:

代码语言:javascript
复制
org.apache.kafka.common.KafkaException: TransactionalId person-identhendelse-lager-1.privat-person-fregIdenthendelse-v1.0: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
    at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:758)
    at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:751)
    at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:216)
    at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:606)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.beginTransaction(DefaultKafkaProducerFactory.java:459)
    at org.springframework.kafka.core.KafkaTemplate.executeInTransaction(KafkaTemplate.java:278)
    at no.nav.person.identhendelse.lager.app.aggregat.AggregatIdenthendelsePublisher.sendForPerson(AggregatIdenthendelsePublisher.java:52)
    at no.nav.person.identhendelse.lager.app.aggregat.AggregatScheduledTask.aggregate(AggregatScheduledTask.java:54)
    at no.nav.person.identhendelse.lager.app.aggregat.AggregatScheduledTask$$FastClassBySpringCGLIB$$7f682c33.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:88)
    at io.micrometer.core.aop.TimedAspect.timedMethod(TimedAspect.java:77)
    at sun.reflect.GeneratedMethodAccessor58.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:644)
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:633)
    at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:294)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:93)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at no.nav.person.utils.precondition.feature.annotation.PreconditionMethodInterceptor.invoke(PreconditionMethodInterceptor.java:22)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
    at no.nav.person.identhendelse.lager.app.aggregat.AggregatScheduledTask$$EnhancerBySpringCGLIB$$e0b597f7.aggregate(<generated>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

添加了示例代码:https://github.com/Lg87/kafka-transaction-example查看readme.md并查找KafkaException以查看发生的异常。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-12-16 15:19:18

  1. 当问这样的问题时,一定要提供版本信息。
  2. 显示您的代码和完整的堆栈跟踪。
  3. 您提到了transactionTemplate --不要使用模板 executeInTransaction --它们都是多余的,因为它们都启动了事务。我们最近修复了这样的“嵌套”事务被破坏的问题。

编辑

我发现了这个问题;当使用producerPerConsumerPartition (默认为真)时,容器使用的生产者不应该添加到缓存中,以供任意的KafkaTemplate操作使用。

作为一种解决办法,使用不同的DefaultKafkaProducerFactory作为独立模板操作。

https://github.com/spring-projects/spring-kafka/issues/908

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53803052

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档