首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >AutoCreate卡夫卡主题如何根据传入的主题模式通过kafkalistener服务在春季卡夫卡?

AutoCreate卡夫卡主题如何根据传入的主题模式通过kafkalistener服务在春季卡夫卡?
EN

Stack Overflow用户
提问于 2021-12-10 14:06:34
回答 1查看 347关注 0票数 0

我将订阅卡夫卡主题模式,如“主题。*”我的目标是为我所听的每一个卡夫卡主题创建死信队列。

例如,当我收听名为"topic.1“的主题时,我希望自动创建名为"topic.1_deadletter”的死信队列。

到目前为止,我试图做的事情如下:

我的消费者:

代码语言:javascript
复制
@Component
@Slf4j
public class LibraryEventsConsumer {

    @Autowired
    LibraryEventConsumerConfig libraryEventConsumerConfig;

    @KafkaListener(topicPattern = "kafka.*")
    public void onMessage(String consumerRecord, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) throws Exception{

        log.info("ConsumerRecord : {}", consumerRecord);

        String deadlettertopic = String.format("%s_deadletter",topic);
        System.out.println(deadlettertopic);
        System.out.println(KafkaHeaders.RECEIVED_TOPIC);

        libraryEventConsumerConfig.getTopic(topic);`

在这里,使用getTopic方法,我试图自动创建卡夫卡主题。下面可以看到libraryEventConsumer类:

代码语言:javascript
复制
@Configuration
@EnableKafka
public class LibraryEventConsumerConfig {

    @Bean
    public void getTopic(String topic){
        NewTopic deadlettertopic = TopicBuilder.name(String.format("%s_deadletter",topic))
                .partitions(1)
                .replicas(1)
                .build();
    }
}

不幸的是,这种方法不起作用,我得到了下面的错误消息:

代码语言:javascript
复制
Parameter 0 of method getTopic in com.kafkalibrary.Config.LibraryEventConsumerConfig required a bean of type 'java.lang.String' that could not be found.

知道该怎么做吗?

解决方案示例代码:

对于那些正在寻找相同目标的人,下面是我的示例代码:感谢Gary的启发。

代码语言:javascript
复制
   private static void createTopic(String topicName, int numPartitions) throws Exception {
    Properties config = new Properties();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:5052,localhost:5053,localhost:5054");
    AdminClient admin = AdminClient.create(config);

    //checking if topic already exists
    boolean alreadyExists = admin.listTopics().names().get().stream()
            .anyMatch(existingTopicName -> existingTopicName.equals(topicName));
    if (alreadyExists) {
        System.out.printf("topic already exits: %s%n", topicName);
    } else {
        //creating new topic
        System.out.printf("creating topic: %s%n", topicName);
        NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
        admin.createTopics(Collections.singleton(newTopic)).all().get();
    }
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-12-13 14:56:20

添加一个重新平衡的侦听器,或者扩展AbstractConsumerSeekAware (或者只是实现ConsumerSeekAware)。

public class LibraryEventsConsumer extends AbstractConsumerSeekAware {

然后,在onPartitionsAssigned()中使用AdminClient检查DLT主题是否存在,如果不存在,则创建它。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70305724

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档