由于不触发SleuthKafkaAspect.wrapProducerFactory()方法,跟踪信息不会在kafka消息上传播。在生产者端,消息被正确发送,跟踪信息被正确记录。在消费者方面,反而创建了一个新的traceId和spanId。
以下两条日志记录行显示了traceId、spanId (和parentId)的不同值:
2021-03-23 11:42:30.158 [http-nio-9185-exec-2] INFO my.company.Producer - /4afe07273872918b/4afe07273872918b// - Sending event='MyEvent'
2021-03-23 11:42:54.374 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO my.company.Consumer /1fec3bf6a3c91773/ff4bd26b2e509ed8/1fec3bf6a3c91773/ - Received new event='MyEvent'在第一次使用Krafdrop和调试时,我验证了消息头不包含任何跟踪信息。
在此之后,我发现方法SleuthKafkaAspect.wrapProducerFactory()从未被触发,相反,在消费者端,方法SleuthKafkaAspect.anyConsumerFactory()是。
所使用的库版本如下:
kakfa客户端库版本为2.4.1,这是由于在2.5.1版本的kafka客户端上出现了与生产缺陷相关的版本降级,从而增加了cpu的使用率。我还尝试使用以下库版本组合,但没有成功:
我们将我们的项目迁移到一个不同的spring引导版本,从2.3.0.RELEASE迁移到2.3.7。以下是旧库版本:
我们还引入了log42 42/log4j(在它是带有logback的slf4j之前)。
以下是相关图书馆:
- org.springframework.boot:spring-boot-starter-log4j2:jar:2.3.7.RELEASE:compile
- org.slf4j:jul-to-slf4j:jar:1.7.30:compile
- io.projectreactor:reactor-test:jar:3.3.12.RELEASE:test
- io.projectreactor:reactor-core:jar:3.3.12.RELEASE:test
- org.reactivestreams:reactive-streams:jar:1.0.3:test配置的属性如下:
spring.sleuth.messaging.enabled=true
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.client-id=myClientIdentifier
spring.kafka.consumer.group-id=MyConsumerGroup
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializerProducerFactory创建的配置类如下:
@Configuration
@EnableTransactionManagement
public class KafkaProducerConfig {
KafkaProperties kafkaProperties;
@Autowired
public KafkaProducerConfig(
KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
return kafkaTemplate;
}
private ProducerFactory<String, Object> producerFactory() {
DefaultKafkaProducerFactory<String, Object> defaultKafkaProducerFactory =
new DefaultKafkaProducerFactory<>(producerConfigs());
//defaultKafkaProducerFactory.transactionCapable();
//defaultKafkaProducerFactory.setTransactionIdPrefix("tx-");
return defaultKafkaProducerFactory;
}
private Map<String, Object> producerConfigs() {
Map<String, Object> configs = kafkaProperties.buildProducerProperties();
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return configs;
}
}我的spring引导应用程序类:
@Profile("DEV")
@SpringBootApplication(
scanBasePackages = {"my.company"},
exclude = {
DataSourceAutoConfiguration.class,
DataSourceTransactionManagerAutoConfiguration.class,
HibernateJpaAutoConfiguration.class
}
)
@EnableSwagger2
@EnableFeignClients(basePackages = {"my.company.common", "my.company.integration"})
@EnableTransactionManagement
@EnableMongoRepositories(basePackages = {
"my.company.repository"})
@EnableMBeanExport(registration = RegistrationPolicy.IGNORE_EXISTING)
@ServletComponentScan
public class DevAppStartup extends SpringBootServletInitializer {
public static void main(String[] args) {
SpringApplication.run(DevAppStartup.class, args);
}
}在这里,您可以找到命令"mvn依赖项:tree“tree.txt的输出。
发布于 2021-03-31 23:45:08
如文件所示,如果您想使用自己的KafkaTemplate,则需要创建一个KafkaTemplate bean。
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Object>producerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object>producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
}发布于 2021-10-02 02:00:20
我们装饰卡夫卡客户端(KafkaProducer和KafkaConsumer),为生产或消费的每个事件创建一个跨度。可以通过将spring.sleuth.kafka.enabled的值设置为false来禁用此功能。 你必须注册生产者或消费者为豆类,以使斯洛思的自动配置,以装饰它们。然后注入bean时,预期的类型必须是生产者或消费者(而不是KafkaProducer)。
https://stackoverflow.com/questions/66766936
复制相似问题