首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在Kafka中实现延迟主题体系结构

如何在Kafka中实现延迟主题体系结构
EN

Stack Overflow用户
提问于 2020-05-28 13:02:10
回答 1查看 696关注 0票数 2

如果我的消费者暂时无法处理信息,我希望它被推送到5分钟延迟的主题,如果它不能从那里处理,我希望它推到30分钟延迟的主题。如果它在这里也失败了,想把它推到死信队列中。

5分钟延迟主题:用户应在第一次处理5分钟后收听。

30分钟延迟话题:消费者应在前一次失败30分钟后收听。

如何设计延迟队列?这是很容易推动卡夫卡主题后,失败,但我的消费者/听众应该如何听5分钟或30分钟的延迟?

我正在为消费者使用SpringKafka来收听以下主题-

代码语言:javascript
复制
@KafkaListener(topics = "${kafka.topic}")
public void receive1(String payload) {
    logger.info("Getting message on receiver-1");
    submitPayloadToExecutor(payload);
}

我对我自己的项目有以下的实现,有人能指出退路和任何新的建议吗?

代码语言:javascript
复制
    @KafkaListener(topics = "${kafka.topic}")
    public void receive3(String payload) {

        logger.info("Getting message on receiver-3");
        submitPayloadToExecutor(payload);
    }

    private void submitPayloadToExecutor(String payload) {

        StartupService startupService = StartupServiceSingleton.INSTANCE.getStartupServiceInstance();

        ObjectMapper mapper = startupService.getConverter().getObjectMapper();

        PublishPostProcessorEntity publishPostProcessorEntity = null;
        try {
            publishPostProcessorEntity = mapper.readValue(payload, PublishPostProcessorEntity.class);
            
            sleepForDelayedPublishedEntity(publishPostProcessorEntity);

            // ... do some work

            topicExecutorService.submit(publishPostProcessorEntity);
        } catch (Exception e) {
            // Work on exception
        }
    }

    private void sleepForDelayedPublishedEntity(PublishPostProcessorEntity publishPostProcessorEntity) {

        if (publishPostProcessorEntity instanceof DelayedPublishPostProcessorEntity) {

            DelayedPublishPostProcessorEntity delayedPublishPostProcessorEntity = (DelayedPublishPostProcessorEntity) publishPostProcessorEntity;

            // Fetch the topicName and sleep based on the configuration
            long pushedTimeStamp = delayedPublishPostProcessorEntity.getPushedTimeStamp();
            delayedPublishPostProcessorEntity.setComingTopicName(delayedPublishPostProcessorEntity.getNextTopicName());

            long currentTimeStamp = System.currentTimeMillis();

            if (CMSKafkaConstants.FIVE_MINUTES_DELAYED_TOPIC
                    .equalsIgnoreCase(delayedPublishPostProcessorEntity.getNextTopicName())) {

                long timeElapsed = currentTimeStamp - pushedTimeStamp;
                if ((Long.parseLong(firstDelay)-timeElapsed) > 0) {
                    // wait for timeToWait
                    try {
                        Thread.sleep(Long.parseLong(firstDelay)-timeElapsed);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            } else if (CMSKafkaConstants.THRITY_MINUTES_DELAYED_TOPIC
                    .equalsIgnoreCase(delayedPublishPostProcessorEntity.getNextTopicName())) {

                long timeElapsed = currentTimeStamp - pushedTimeStamp;
                if ((Long.parseLong(secondDelay)-timeElapsed) > 0) {
                    try {
                        Thread.sleep(Long.parseLong(secondDelay)-timeElapsed);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }

        }

    }
EN

回答 1

Stack Overflow用户

发布于 2020-05-28 14:18:28

您可以根据需要通过KafkaListenerEndpointRegistry (使用getListenerContainer(id).start())停止并启动消费者。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62065405

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档