首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为什么追踪信息不传播在卡夫卡消息,当春天侦察是在类路径?

为什么追踪信息不传播在卡夫卡消息,当春天侦察是在类路径?
EN

Stack Overflow用户
提问于 2021-03-23 16:07:37
回答 2查看 3.5K关注 0票数 0

由于不触发SleuthKafkaAspect.wrapProducerFactory()方法,跟踪信息不会在kafka消息上传播。在生产者端,消息被正确发送,跟踪信息被正确记录。在消费者方面,反而创建了一个新的traceId和spanId。

以下两条日志记录行显示了traceId、spanId (和parentId)的不同值:

代码语言:javascript
复制
2021-03-23 11:42:30.158 [http-nio-9185-exec-2] INFO  my.company.Producer - /4afe07273872918b/4afe07273872918b// - Sending event='MyEvent'
2021-03-23 11:42:54.374 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO my.company.Consumer /1fec3bf6a3c91773/ff4bd26b2e509ed8/1fec3bf6a3c91773/ - Received new event='MyEvent'

在第一次使用Krafdrop和调试时,我验证了消息头不包含任何跟踪信息。

在此之后,我发现方法SleuthKafkaAspect.wrapProducerFactory()从未被触发,相反,在消费者端,方法SleuthKafkaAspect.anyConsumerFactory()是。

所使用的库版本如下:

  • 弹簧引导:2.3.7
  • 春云bom: Hoxton.SR10
  • 春季云:2.2.7(和2.2.5)
  • 春季卡夫卡:2.5.10
  • kakfa客户: 2.4.1
  • 弹簧云起动机:2.2.7 starter
  • 弹簧-云-雪橇-zipkin:2.2.7

kakfa客户端库版本为2.4.1,这是由于在2.5.1版本的kafka客户端上出现了与生产缺陷相关的版本降级,从而增加了cpu的使用率。我还尝试使用以下库版本组合,但没有成功:

  • 弹簧引导:2.3.7
  • 春云bom: Hoxton.SR10 (和Hoxton.SR8)
  • 春季云:2.2.7(和2.2.5)
  • 春季卡夫卡:2.5.10
  • kakfa客户: 2.5.1
  • 弹簧云起动机:2.2.7 starter(和2.2.5 starter)
  • 弹簧-云-雪橇-zipkin:2.2.7(和2.2.5)
  • 弹簧引导:2.3.7
  • 春云bom: Hoxton.SR10 (和Hoxton.SR8)
  • 春季云:2.2.7(和2.2.5)
  • 春季卡夫卡:2.5.10
  • kakfa客户: 2.6.0
  • 弹簧云起动机:2.2.7 starter(和2.2.5 starter)
  • 弹簧-云-雪橇-zipkin:2.2.7(和2.2.5)
  • 弹簧引导:2.3.7
  • 春云bom: Hoxton.SR10 (和Hoxton.SR8)
  • 春季云:2.2.7(和2.2.5)
  • 春季卡夫卡: 2.6.x
  • kakfa客户: 2.6.0
  • 弹簧云起动机:2.2.7 starter(和2.2.5 starter)
  • 弹簧-云-雪橇-zipkin:2.2.7(和2.2.5)

我们将我们的项目迁移到一个不同的spring引导版本,从2.3.0.RELEASE迁移到2.3.7。以下是旧库版本:

  • 弹簧-启动:2.3.0
  • 春季-卡夫卡:2.5.0
  • kafka-客户: 2.4.1
  • 春云:2.2.5
  • 弹簧云起动机:2.2.5 starter
  • 弹簧-云-雪橇-zipkin:2.2.5

我们还引入了log42 42/log4j(在它是带有logback的slf4j之前)。

以下是相关图书馆:

代码语言:javascript
复制
- org.springframework.boot:spring-boot-starter-log4j2:jar:2.3.7.RELEASE:compile
- org.slf4j:jul-to-slf4j:jar:1.7.30:compile
- io.projectreactor:reactor-test:jar:3.3.12.RELEASE:test
- io.projectreactor:reactor-core:jar:3.3.12.RELEASE:test
- org.reactivestreams:reactive-streams:jar:1.0.3:test

配置的属性如下:

代码语言:javascript
复制
spring.sleuth.messaging.enabled=true
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.client-id=myClientIdentifier
spring.kafka.consumer.group-id=MyConsumerGroup
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

ProducerFactory创建的配置类如下:

代码语言:javascript
复制
@Configuration
@EnableTransactionManagement
public class KafkaProducerConfig {

    KafkaProperties kafkaProperties;

    @Autowired
    public KafkaProducerConfig(
            KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        return kafkaTemplate;
    }


    private ProducerFactory<String, Object> producerFactory() {
        DefaultKafkaProducerFactory<String, Object> defaultKafkaProducerFactory =
                new DefaultKafkaProducerFactory<>(producerConfigs());
        //defaultKafkaProducerFactory.transactionCapable();
        //defaultKafkaProducerFactory.setTransactionIdPrefix("tx-");
        return defaultKafkaProducerFactory;
    }

    private Map<String, Object> producerConfigs() {

        Map<String, Object> configs = kafkaProperties.buildProducerProperties();
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return configs;
    }

}

我的spring引导应用程序类:

代码语言:javascript
复制
@Profile("DEV")
@SpringBootApplication(
        scanBasePackages = {"my.company"},
        exclude = {
                DataSourceAutoConfiguration.class,
                DataSourceTransactionManagerAutoConfiguration.class,
                HibernateJpaAutoConfiguration.class
        }
)
@EnableSwagger2
@EnableFeignClients(basePackages = {"my.company.common", "my.company.integration"})
@EnableTransactionManagement
@EnableMongoRepositories(basePackages = {
        "my.company.repository"})
@EnableMBeanExport(registration = RegistrationPolicy.IGNORE_EXISTING)
@ServletComponentScan
public class DevAppStartup extends SpringBootServletInitializer {

    public static void main(String[] args) {
        SpringApplication.run(DevAppStartup.class, args);
    }

}

在这里,您可以找到命令"mvn依赖项:tree“tree.txt的输出。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-03-31 23:45:08

如文件所示,如果您想使用自己的KafkaTemplate,则需要创建一个KafkaTemplate bean。

代码语言:javascript
复制
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object>producerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object>producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
票数 0
EN

Stack Overflow用户

发布于 2021-10-02 02:00:20

基于Spring:https://docs.spring.io/spring-cloud-sleuth/docs/current-SNAPSHOT/reference/html/integrations.html#sleuth-kafka-integration的文档

我们装饰卡夫卡客户端(KafkaProducer和KafkaConsumer),为生产或消费的每个事件创建一个跨度。可以通过将spring.sleuth.kafka.enabled的值设置为false来禁用此功能。 你必须注册生产者或消费者为豆类,以使斯洛思的自动配置,以装饰它们。然后注入bean时,预期的类型必须是生产者或消费者(而不是KafkaProducer)。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66766936

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档