首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka EOS重试标志

Kafka EOS重试标志
EN

Stack Overflow用户
提问于 2022-04-27 10:30:29
回答 1查看 126关注 0票数 0

我有一个Kafka集群和一个配置了EOS（精确一次语义）的Spring Boot应用程序。应用程序从主题A消费消息，执行一些业务逻辑，然后生产到主题B。如果EOS在向主题B写入失败，它会重试，并再次执行我的所有业务逻辑。这对我来说是个问题，因为会产生重复的API调用。重试发生时是否有某种标志，让我可以跳过业务逻辑，直接进行生产？

KafkaConsumerConfig

代码语言:javascript
复制
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.server}")
    String server;

    @Value("${kafka.consumer.groupid}")
    String groupid;

    @Autowired
    Tracer tracer;

    /**
     * Builds the Avro consumer factory, wrapped with OpenTracing instrumentation.
     * Reads committed records only (read_committed) so it cooperates with the
     * transactional producer side of the pipeline.
     */
    @Bean
    public ConsumerFactory<String, TransactionAvroEntity> consumerFactory() {

        Map<String, Object> props = new HashMap<>();

        // Connection and consumer-group identity.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);

        // Deserializer classes (instances are also passed explicitly to the factory below).
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);

        // Start from the earliest offset when no committed offset exists;
        // only read records from committed transactions.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        // Polling and liveness tuning.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15000);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);

        DefaultKafkaConsumerFactory<String, TransactionAvroEntity> delegate =
                new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                        new AvroDeserializer<>(TransactionAvroEntity.class));
        return new TracingConsumerFactory<>(delegate, tracer);
    }

    /**
     * Listener container factory: batch listener, 5 concurrent containers,
     * transactional (EOS ALPHA mode), started programmatically (autoStartup=false).
     *
     * @param transactionManager the Kafka transaction manager driving the
     *                           consume-process-produce transaction
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, TransactionAvroEntity> kafkaListenerContainerFactory(
            KafkaAwareTransactionManager<Object, Object> transactionManager) {

        ConcurrentKafkaListenerContainerFactory<String, TransactionAvroEntity> factory =
                new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());
        factory.setAutoStartup(false);  // containers are started elsewhere, not on context refresh
        factory.setBatchListener(true); // listener methods receive List<ConsumerRecord>
        factory.setConcurrency(5);

        factory.getContainerProperties().setAckMode(AckMode.BATCH);
        factory.getContainerProperties().setEosMode(EOSMode.ALPHA);
        factory.getContainerProperties().setTransactionManager(transactionManager);

        return factory;
    }

}

KafkaProducerConfig

代码语言:javascript
复制
@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.server}")
    String server;

    @Autowired
    public Tracer tracer;

    /**
     * Transactional, idempotent Avro producer factory, wrapped with
     * OpenTracing instrumentation.
     */
    @Bean
    public ProducerFactory<String, TransactionAvroEntity> producerFactory() {

        Map<String, Object> props = new HashMap<>();

        // Connection and serialization.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class.getName());

        // Exactly-once prerequisites: idempotence, acks=all, transactional id.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranDec-1");
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 120000);

        // Throughput / batching tuning.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        props.put(ProducerConfig.LINGER_MS_CONFIG, "200");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(256 * 1024));
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.toString(32768 * 1024));
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

        // Custom partitioner (referenced by class name).
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.EXAMPLE.config.KafkaCustomPatitioner");

        return new TracingProducerFactory<>(new DefaultKafkaProducerFactory<>(props), tracer);
    }

    /** Template backed by the transactional producer factory above. */
    @Bean
    public KafkaTemplate<String, TransactionAvroEntity> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /** Transaction manager used by the listener container for EOS processing. */
    @Bean
    public KafkaAwareTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<String, TransactionAvroEntity> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

}

KafkaTopicProducer

代码语言:javascript
复制
@Service
public class KafkaTopicProducer {

    @Autowired
    private KafkaTemplate<String, TransactionAvroEntity> kafkaTemplate;

    /**
     * Publishes {@code payload} to {@code topic}, keyed by the entity's number and
     * carrying the supplied headers. The partition argument is {@code null}, so
     * partition selection is left to the configured partitioner.
     */
    public void topicProducer(TransactionAvroEntity payload, String topic, Headers headers) {
        kafkaTemplate.send(
                new ProducerRecord<>(topic, null, payload.getNumber(), payload, headers));
    }
}

KafkaConsumer

代码语言:javascript
复制
/**
 * Batch Kafka listener forming the consume -> process -> produce leg of the
 * EOS pipeline. Runs under the transaction manager wired into the listener
 * container factory, so a failed downstream publish rolls back the batch and
 * the records are redelivered (and the business logic re-executed).
 */
@Service
public class KafkaConsumerTransaction {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerTransaction.class);

    // Enrichment/validation step — the "business logic" that gets re-run on redelivery.
    @Autowired
    TransactionEnrichmentService enrichmentService;

    // Downstream producer used to republish processed records (topic B).
    @Autowired
    KafkaTopicProducer kafkaTopicProducer;

    // Target topic for records that fail processing.
    @Value("${kafka.producer.topic.transactionDecouplingException}")
    String exceptionTopic;

    @Autowired
    JaegerTrace tracer;

    @Autowired
    ObjectMapper objectMapper;



    /**
     * Consumes a batch, enriches/validates each record, then republishes it.
     * Exceptions are caught per record (redacted handling below), so the loop
     * continues to the next record rather than failing the whole batch here.
     */
    @KafkaListener(topics = "${kafka.consumer.topic.transaction}", groupId = "${kafka.consumer.groupid}", id = "${kafka.consumer.listenerid}")
    public boolean consume(List<ConsumerRecord<String, TransactionAvroEntity>> records, Consumer<?, ?> consumer) {
        // loop through batch read
        for (ConsumerRecord<String, TransactionAvroEntity> record : records) {

            Integer partition = record.partition();
            Long offset = record.offset();
            TransactionAvroEntity te = record.value();

            try {

                

if (enrichmentService.enrichAndValidateTransactions(te, partition, offset, record)) {
    
//Do some logic
                    } else {
//Do some logic
                    }
    
                } catch (Exception e) {
    //Do some logic

                }
// NOTE(review): `t` and `topic` are not declared anywhere in this snippet —
// presumably assigned inside the redacted "//Do some logic" sections; as posted
// this line does not compile. Verify against the full source.
kafkaTopicProducer.topicProducer(t, topic, record.headers());
            }
            return true;
        }
    
    }
EN

回答 1

Stack Overflow用户

发布于 2022-05-03 16:44:51

EOS（精确一次语义）只作用于整个 consume -> process -> publish 流程的整体。

其中 consume -> process 这一部分的语义是"至少一次"（at-least-once）。

你需要让 process 这一部分具备幂等性。

一种常见的技术是在 process 阶段把主题/分区/偏移量与其他数据一起存储起来，这样就可以检查某条记录是否已经被处理过。

你可以启用"投递尝试次数"（delivery attempts）标头，但它只能告诉你这是一次重新投递，无法告诉你失败发生在 process 的开始还是结束时（即发送期间）。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72027226

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档