如果我的消费者暂时无法处理信息,我希望它被推送到5分钟延迟的主题,如果它不能从那里处理,我希望它推到30分钟延迟的主题。如果它在这里也失败了,想把它推到死信队列中。
5分钟延迟主题:用户应在第一次处理5分钟后收听。
30分钟延迟话题:消费者应在前一次失败30分钟后收听。
如何设计延迟队列?这是很容易推动卡夫卡主题后,失败,但我的消费者/听众应该如何听5分钟或30分钟的延迟?
我正在为消费者使用SpringKafka来收听以下主题-
@KafkaListener(topics = "${kafka.topic}")
public void receive1(String payload) {
logger.info("Getting message on receiver-1");
submitPayloadToExecutor(payload);
}我对我自己的项目有以下的实现,有人能指出退路和任何新的建议吗?
@KafkaListener(topics = "${kafka.topic}")
public void receive3(String payload) {
logger.info("Getting message on receiver-3");
submitPayloadToExecutor(payload);
}
private void submitPayloadToExecutor(String payload) {
StartupService startupService = StartupServiceSingleton.INSTANCE.getStartupServiceInstance();
ObjectMapper mapper = startupService.getConverter().getObjectMapper();
PublishPostProcessorEntity publishPostProcessorEntity = null;
try {
publishPostProcessorEntity = mapper.readValue(payload, PublishPostProcessorEntity.class);
sleepForDelayedPublishedEntity(publishPostProcessorEntity);
// ... do some work
topicExecutorService.submit(publishPostProcessorEntity);
} catch (Exception e) {
// Work on exception
}
}
private void sleepForDelayedPublishedEntity(PublishPostProcessorEntity publishPostProcessorEntity) {
if (publishPostProcessorEntity instanceof DelayedPublishPostProcessorEntity) {
DelayedPublishPostProcessorEntity delayedPublishPostProcessorEntity = (DelayedPublishPostProcessorEntity) publishPostProcessorEntity;
// Fetch the topicName and sleep based on the configuration
long pushedTimeStamp = delayedPublishPostProcessorEntity.getPushedTimeStamp();
delayedPublishPostProcessorEntity.setComingTopicName(delayedPublishPostProcessorEntity.getNextTopicName());
long currentTimeStamp = System.currentTimeMillis();
if (CMSKafkaConstants.FIVE_MINUTES_DELAYED_TOPIC
.equalsIgnoreCase(delayedPublishPostProcessorEntity.getNextTopicName())) {
long timeElapsed = currentTimeStamp - pushedTimeStamp;
if ((Long.parseLong(firstDelay)-timeElapsed) > 0) {
// wait for timeToWait
try {
Thread.sleep(Long.parseLong(firstDelay)-timeElapsed);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} else if (CMSKafkaConstants.THRITY_MINUTES_DELAYED_TOPIC
.equalsIgnoreCase(delayedPublishPostProcessorEntity.getNextTopicName())) {
long timeElapsed = currentTimeStamp - pushedTimeStamp;
if ((Long.parseLong(secondDelay)-timeElapsed) > 0) {
try {
Thread.sleep(Long.parseLong(secondDelay)-timeElapsed);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}发布于 2020-05-28 14:18:28
您可以根据需要通过KafkaListenerEndpointRegistry (使用getListenerContainer(id).start())停止并启动消费者。
https://stackoverflow.com/questions/62065405
复制相似问题