Unable to configure Camel-Kafka transactions with a Spring Boot application

Stack Overflow user
Asked 2021-05-17 12:38:14
Answers: 1 · Views: 264 · Followers: 0 · Votes: 1

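I am building a Spring Boot application that consumes messages from a Kafka topic, processes them, and writes the results back to another topic. I am using Apache Camel for the integration.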

My route looks like this:

    // Redeliver up to 4 times when an IllegalArgumentException is thrown
    onException(IllegalArgumentException.class).maximumRedeliveries(4);

    from("kafka:CDC-GEXPUAT-CUSTOMER")
        .id("CamelRouteCustomer_1")
        .transacted()
        .choice()
            .when(simple("${body} contains 'GEXPUAT.CUSTOMER'"))
                .unmarshal().json(JsonLibrary.Jackson, CustomerWrapper.class)
                .process(customerProcessor)
            .otherwise()
                .log("${body}")
        .end()
        .to("seda:aggregate_1");

When I use .transacted() in my route, I get the following error: org.apache.camel.NoSuchBeanException: No bean could be found in the registry of type: PlatformTransactionManager
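The error message says the registry was searched for a bean of type PlatformTransactionManager and none was found. As a rough illustration of what a lookup by type does, here is a toy registry in plain Java. `ToyRegistry`, its methods, and the nested `TxManager` interface are hypothetical names made up for this sketch; it is not Camel's registry code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;

public class ToyRegistry {
    /** Hypothetical stand-in for a transaction manager interface. */
    public interface TxManager {}

    private final Map<String, Object> beans = new LinkedHashMap<>();

    /** Registers a bean under a name. */
    public void bind(String name, Object bean) {
        beans.put(name, bean);
    }

    /** Returns the first bean assignable to the requested type, or throws if none is bound. */
    public <T> T findByType(Class<T> type) {
        for (Object bean : beans.values()) {
            if (type.isInstance(bean)) {
                return type.cast(bean);
            }
        }
        throw new NoSuchElementException(
                "No bean could be found in the registry of type: " + type.getSimpleName());
    }

    public static void main(String[] args) {
        ToyRegistry registry = new ToyRegistry();
        registry.bind("someString", "not a transaction manager");
        try {
            // Fails: nothing bound so far implements TxManager
            registry.findByType(TxManager.class);
        } catch (NoSuchElementException e) {
            System.out.println(e.getMessage());
        }
        // Once a bean of the right type is bound, the same lookup succeeds
        registry.bind("txManager", new TxManager() {});
        System.out.println(registry.findByType(TxManager.class) != null);
    }
}
```

The point of the sketch is only that the lookup depends on some bean of the requested type being registered, whatever its name.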

So I then tried to create a configuration class that defines the TransactionManager:

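import org.apache.camel.spring.spi.SpringTransactionPolicy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.transaction.ChainedKafkaTransactionManager;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.AbstractPlatformTransactionManager;

@Configuration
public class CommonBean {

    // kafkaConfigs() and transactionIdPrefix are used below but are not shown in the post

    // Camel transaction policy backed by the primary transaction manager
    @Bean
    SpringTransactionPolicy springTransactionPolicy() throws Exception {
        SpringTransactionPolicy txRequired = new SpringTransactionPolicy();
        txRequired.setTransactionManager(transactionManager());
        txRequired.setPropagationBehaviorName("PROPAGATION_REQUIRED");
        return txRequired;
    }

    @Bean
    public DefaultKafkaProducerFactory<byte[], byte[]> producerFactory() {
        DefaultKafkaProducerFactory<byte[], byte[]> defaultKafkaProducerFactory =
                new DefaultKafkaProducerFactory<>(kafkaConfigs());
        // Setting a transaction id prefix makes the producer factory transactional
        defaultKafkaProducerFactory.setTransactionIdPrefix(transactionIdPrefix);
        return defaultKafkaProducerFactory;
    }

    @Bean
    @Primary
    public ChainedKafkaTransactionManager<byte[], byte[]> transactionManager() throws Exception {
        return new ChainedKafkaTransactionManager<>(kafkaTransactionManager());
    }

    @Bean
    public PlatformTransactionManager kafkaTransactionManager() {
        // Call the producerFactory() bean method; the posted code referenced an undeclared variable
        KafkaTransactionManager<byte[], byte[]> kafkaTransactionManager =
                new KafkaTransactionManager<>(producerFactory());
        kafkaTransactionManager.setTransactionSynchronization(
                AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        kafkaTransactionManager.setRollbackOnCommitFailure(true);
        return kafkaTransactionManager;
    }

}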

But now I get compilation errors because the following classes cannot be found:

  1. SpringTransactionPolicy
  2. DefaultKafkaProducerFactory
  3. ChainedKafkaTransactionManager
  4. KafkaTransactionManager

I am not sure which dependencies I need to add to pom.xml in order to configure a KafkaTransactionManager in my Camel Spring Boot project.
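One way to narrow this down is to check which of the classes can be loaded from the current classpath. The sketch below is a plain-Java diagnostic; the `ClasspathCheck` class and its methods are hypothetical helpers, and the fully qualified names are the packages I believe these classes live in (the three Kafka classes in spring-kafka, SpringTransactionPolicy in camel-spring), so treat them as assumptions to verify against your versions.

```java
import java.util.ArrayList;
import java.util.List;

public class ClasspathCheck {
    /** Returns true if the named class can be loaded, without initializing it. */
    public static boolean isPresent(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Returns the subset of the given class names that cannot be loaded. */
    public static List<String> missing(String... classNames) {
        List<String> result = new ArrayList<>();
        for (String name : classNames) {
            if (!isPresent(name)) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed fully qualified names; adjust if your library versions differ
        List<String> missing = missing(
                "org.apache.camel.spring.spi.SpringTransactionPolicy",
                "org.springframework.kafka.core.DefaultKafkaProducerFactory",
                "org.springframework.kafka.transaction.ChainedKafkaTransactionManager",
                "org.springframework.kafka.transaction.KafkaTransactionManager");
        System.out.println("Missing classes: " + missing);
    }
}
```

Running this from inside the application (for example from a test) lists the classes whose artifact is not yet on the classpath.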

My pom.xml currently looks like this (I have commented out some parts of the file):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.5</version>
        <!-- <version>2.3.3.RELEASE</version> -->
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.surajit.camel</groupId>
    <artifactId>camel-microservice-a</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>CamelProject</name>
    <description>Camel project for Spring Boot</description>
    <properties>
        <java.version>8</java.version>
        <camel.version>3.7.0</camel.version>
        <spring-boot.version>2.3.3.RELEASE</spring-boot.version>
        <spring-cloud.version>Hoxton.SR8</spring-cloud.version>
    </properties>
    <dependencies>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-spring-boot-starter</artifactId>
            <version>${camel.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-activemq-starter</artifactId>
            <version>${camel.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-kafka-starter</artifactId>
            <version>${camel.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-jackson-starter</artifactId>
            <version>${camel.version}</version>
        </dependency>
        
    <!--    <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-core</artifactId>
            <version>3.0.2</version>
        </dependency>
        
        <dependency>
            <groupId>com.datastax.oss</groupId>
            <artifactId>java-driver-core</artifactId>
            <version>4.2.1</version>
        </dependency> -->

    <!--    <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-dependencies</artifactId>
          <version>${spring-boot.version}</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency> -->
        
    <!--     <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency> -->
        

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

1 Answer

Stack Overflow user

Answered 2021-05-17 16:15:17

I was able to solve this by adding the following to pom.xml:

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <!-- <version>2.2.14.RELEASE</version> -->
            <version>2.7.0</version>
        </dependency>

Then I removed the custom CommonBean transaction configuration class

and added the following to application.properties:

spring.kafka.producer.transaction-id-prefix=producer
spring.kafka.producer.bootstrap-servers=xxx.xxx.xxx.xxx:9092
spring.kafka.jaas.enabled=false
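A note on the transaction id prefix: values in a .properties file are taken literally, so quotes written around a value become part of the value. The sketch below shows this with java.util.Properties as a stand-in; Spring Boot reads application.properties with its own loader, which I am assuming treats quotes the same way. `PropertiesQuoting` and `read` are hypothetical names for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class PropertiesQuoting {
    /** Parses a single properties line and returns the value stored under the given key. */
    public static String read(String line, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(line));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        String key = "spring.kafka.producer.transaction-id-prefix";
        // Quoted form: the quotes stay in the value
        System.out.println(read(key + "=\"producer\"", key));
        // Unquoted form: the value is just the word itself
        System.out.println(read(key + "=producer", key));
    }
}
```

Either form gives a non-empty prefix, but the unquoted one avoids quote characters ending up in the transactional ids.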

With these changes, my route now starts.

Votes: 1

Original content provided by Stack Overflow. Original link: https://stackoverflow.com/questions/67569821