我们正在将kafka实现升级到.9,并使用新的使用者java来创建使用者。我正在为消费者使用下面的代码,我们使用对消费者设置主题,如A行和B行是对我们的服务的调用,它处理我们接收到的消息。现在的问题是,如果我们的消息处理时间超过30秒,就会出现异常。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("auto.offset.reset", "earliest");
props.put("heartbeat.interval.ms", "1000");
props.put("receive.buffer.bytes", 10485760);
props.put("fetch.message.max.bytes", 5242880);
props.put("enable.auto.commit", false);
//with partition assigned to consumer
KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props);
// TopicPartition partition0 = new TopicPartition("TEST-TOPIC", 0);
//consumer.assign(Arrays.asList(partition0));
//assign topic to consumer without partition
//LINE A
consumer.subscribe(Arrays.asList("TEST-TOPIC"), new ConsumerRebalanceListenerImp());
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
try {
ConsumerRecords<Object, Object> records = consumer.poll(1000);
consumeFromQueue(records);//LINE B
consumer.commitSync();
} catch (CommitFailedException e) {
e.printStackTrace();
System.out.println("CommitFailedException");
} catch (Exception e) {
e.printStackTrace();
System.out.println("Exception in while consuming messages");
}例外是
2016-03-03 10:47:35.095 INFO 6448 --询问调度程序-3 o.a.k.c.c.internals.AbstractCoordinator :标记协调器2147483647死亡。2016-03-03 10:47:35.096错误6448( org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:665) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:644) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) at org.apache.kafka.clients ).consumer.internals.RequestFuture.complete(RequestFuture.java:107) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
以上异常是在提交偏移时出现的。任何建议都会有助于感谢你。
发布于 2016-03-09 18:43:24
之所以会发生这种情况,是因为新的使用者是单线程的,与使用者组保持心跳的唯一方法是轮询或提交偏移量,30秒后,组协调器将您的使用者标记为死了,并要求组重新平衡。对于这种情况,您可以增加request.timeout.ms,也可以将消耗和处理的工作拆分到两个线程之间。
发布于 2017-02-03 14:37:26
您可以通过以下方式限制轮询()返回的消息数量
max.partition.fetch.bytes到一个合适的阈值,这个阈值比您最大的消息要大,但是太低,您每次投票得到的消息都会更少。
Kafka 0.10.x支持显式限制返回客户端的消息数量,方法是设置
max.poll.recordshttps://stackoverflow.com/questions/35896194
复制相似问题