首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >当消费者宕机时,Kafka消息丢失

当消费者宕机时,Kafka消息丢失
EN

Stack Overflow用户
提问于 2019-02-19 02:10:55
回答 1查看 870关注 0票数 1

你好,我正在写一个使用spring cloud stream的kafka消费者-生产者。在我的消费者内部,我将我的数据保存到数据库中,如果数据库关闭,我将手动退出应用程序.After重启应用程序,如果数据库仍然关闭,则应用程序再次停止。现在,如果我第三次重启应用程序,在中间时间间隔(两次失败)收到的消息丢失,kafka使用者将获取最新的消息,并跳过我退出代码的消息。

入站和出站通道绑定器接口

代码语言:javascript
复制
public interface EventChannel {

String inputEvent = "inputChannel";
String outputEvent = "outputChannel";

@Input(inputChannel)
SubscribableChannel consumeEvent();

@Output(outputEvent)
SubscribableChannel produceEvent();
}

服务类别-

1)生产者服务

代码语言:javascript
复制
@Service
@EnableBinding(EventChannel.class)

public class EventProducerService{

private final EventChannel eventChannel;

@Autowired  
public EventConsumerService(EventChannel eventChannel){
this.eventChannel = eventChannel;
}

public void postEvent(EventDTO event) {
    MessageChannel messageChannel = eventChannel.produceEvent();
    messageChannel.send(MessageBuilder
            .withPayload(event)
            .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
            .setHeader("partitionKey",event.getId().toString())
            .build());     
    }
}

2)消费者服务

代码语言:javascript
复制
@Component
@EnableBinding(EventChannel.class)
public class EventConsumerService{ 

private final ApplicationContext applicationContext;
private final EventChannel eventChannel;

@Autowired  
public EventConsumerService(ApplicationContext applicationContext,EventChannel eventChannel){
this.applicationContext = applicationContext;
this.eventChannel = eventChannel;
}

@StreamListener(EventChannel.inputEvent)
public void saveUpdateCassandra(EventDTO event){
  Event newEvent = new Event(event);
  try{
     eventRepository.save(newEvent)
    } catch(Exceptione e){
     e.printStackTrace();
     SpringApplication.exit(applicationContext,()-> 0); 
  }
}

应用程序属性文件

代码语言:javascript
复制
#Spring Cloud Streams Configuration
##Broker
spring.cloud.stream.kafka.binder.brokers=localhost:9092
##EventIngestion 
spring.cloud.stream.bindings.outputChannel.destination=Event
spring.cloud.stream.bindings.outputChannel.producer.partitionKeyExpression=headers.partitionKey
spring.cloud.stream.bindings.inputChannel.destination=Event
spring.cloud.stream.bindings.inputChannel.group=event-consumer-1
spring.cloud.stream.kafka.bindings.inputChannel.consumer.startOffset=earliest

这两个应用程序都是独立运行的,所以如果我的数据库宕机,使用者会停止,如果连续失败,消息就会丢失

EN

回答 1

Stack Overflow用户

发布于 2019-02-19 03:21:25

首先,我不确定您对SpringApplication.exit(applicationContext,()-> 0);的期望是什么,但您实际上是在拖垮整个应用程序,其中可能运行的所有东西都在那里运行。其次,你的消息丢失是因为Kafka绑定器完全不知道发生了异常,它必须将消息放回主题上。实际上,从绑定器的角度来看,由于代码的原因,每条消息都会被成功处理。所以。。。

请从您的StreamListener方法中删除try/catch,并让异常传播,从而让绑定器知道存在错误。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54753093

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档