首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >创建死信队列

创建死信队列
EN

Stack Overflow用户
提问于 2021-08-02 01:01:18
回答 1查看 309关注 0票数 1

如何创建一个死信队列并进行测试?我有一个制片人和消费者。我也有制片人的吐露,像杰克,重试。

Producer.java

代码语言:javascript
复制
package org.timothy.producer;

import org.apache.kafka.clients.producer.*;
import org.timothy.producer.common.AppConfigs;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.timothy.producer.common.PropConfigs;

import java.util.concurrent.ExecutionException;

public class Producer{

    private static final Logger logger = LogManager.getLogger(Producer.class);
    public static void main(String[] args) {
        logger.info("Creating Kafka Producer...");
        KafkaProducer<Integer, String> producer = PropConfigs.prodProps();

        logger.info("Start sending messages...");

        for (int i = 1; i <= AppConfigs.numEvents; i++) {
            ProducerRecord<Integer, String> record = new ProducerRecord<>(AppConfigs.topicName, "This is Message: " + i);
            try {
                RecordMetadata metadata = producer.send(record).get();
                System.out.println("Record sent with key " + i + " to partition " + metadata.partition()
                        + " with offset " + metadata.offset());
            }
            catch (ExecutionException e) {
                System.out.println("Error in sending record");
                //System.out.println(e);
                e.printStackTrace();
            }
            catch (InterruptedException e) {
                System.out.println("Error in sending record");
                //System.out.println(e);
            }
        }
        producer.flush();
        producer.close();
        logger.info("Finished - Closing Kafka Producer.");
    }
}

Consumer.java

代码语言:javascript
复制
package org.timothy.producer;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.timothy.producer.common.AppConfigs;
import org.timothy.producer.common.PropConfigs;
import java.time.Duration;
import java.util.Collections;


public class Consumer{

    private static final Logger logger = LogManager.getLogger(Consumer.class);
    public static void main(String[] args) {
        KafkaConsumer<Integer, String> consumer = PropConfigs.consProps();

        consumer.subscribe(Collections.singleton(AppConfigs.topicName));
        int noMessageFound = 0;

        while(true){
            ConsumerRecords<Integer,String> records = consumer.poll(Duration.ofMillis(1000));
            
            if (records.count() == 0) {
                noMessageFound++;
                if (noMessageFound > AppConfigs.MAX_NO_MESSAGE_FOUND_COUNT)
                    break;
                else
                    continue;
            }

            records.forEach(record -> logger.info("Received new record: " +
                    " Key: " + record.key() +
                    ", Value: " + record.value() +
                    ", Topic: " + record.topic() +
                    ", Partition: " + record.partition() +
                    ", Offset: " + record.offset() + "\n"
            ));
            consumer.commitAsync();

        }

    }
}

如何在代码中实现DLQ?如何测试重试,当我运行我的程序时,我不会遇到错误,这就是为什么我不知道程序是否重试。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-08-02 15:00:20

当我运行我的程序时,我不会遇到错误

那就介绍他们。创建一个计数器,当它命中某个值时,抛出一个RuntimeException。在catch块中,使用生产者实例将事件发送到新主题。

如果要测试重试,请关闭代理或引入其他网络异常。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68615443

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档