首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用新的消费api将kafka升级到.9

使用新的消费api将kafka升级到.9
EN

Stack Overflow用户
提问于 2016-03-09 15:46:02
回答 2查看 4.6K关注 0票数 6

我们正在将kafka实现升级到.9,并使用新的使用者java来创建使用者。我正在为消费者使用下面的代码,我们使用对消费者设置主题,如A行和B行是对我们的服务的调用,它处理我们接收到的消息。现在的问题是,如果我们的消息处理时间超过30秒,就会出现异常。

代码语言:javascript
复制
    Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "test-group");
            props.put("auto.offset.reset", "earliest");
            props.put("heartbeat.interval.ms", "1000");
            props.put("receive.buffer.bytes", 10485760);
            props.put("fetch.message.max.bytes", 5242880);
            props.put("enable.auto.commit", false);
    //with partition assigned to consumer


            KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props);
           // TopicPartition partition0 = new TopicPartition("TEST-TOPIC", 0);
            //consumer.assign(Arrays.asList(partition0));
            //assign topic to consumer without partition
//LINE A
            consumer.subscribe(Arrays.asList("TEST-TOPIC"), new ConsumerRebalanceListenerImp());
            List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
            while (true) {
                try {
                    ConsumerRecords<Object, Object> records = consumer.poll(1000);
                    consumeFromQueue(records);//LINE B
                    consumer.commitSync();

                } catch (CommitFailedException e) {
                    e.printStackTrace();
                    System.out.println("CommitFailedException");
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.println("Exception in while consuming messages");
                }

例外是

2016-03-03 10:47:35.095 INFO 6448 --询问调度程序-3 o.a.k.c.c.internals.AbstractCoordinator :标记协调器2147483647死亡。2016-03-03 10:47:35.096错误6448( org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:665) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:644) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) at org.apache.kafka.clients ).consumer.internals.RequestFuture.complete(RequestFuture.java:107) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)

以上异常是在提交偏移时出现的。任何建议都会有助于感谢你。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-03-09 18:43:24

之所以会发生这种情况,是因为新的使用者是单线程的,与使用者组保持心跳的唯一方法是轮询或提交偏移量,30秒后,组协调器将您的使用者标记为死了,并要求组重新平衡。对于这种情况,您可以增加request.timeout.ms,也可以将消耗和处理的工作拆分到两个线程之间。

票数 6
EN

Stack Overflow用户

发布于 2017-02-03 14:37:26

您可以通过以下方式限制轮询()返回的消息数量

代码语言:javascript
复制
max.partition.fetch.bytes

到一个合适的阈值,这个阈值比您最大的消息要大,但是太低,您每次投票得到的消息都会更少。

Kafka 0.10.x支持显式限制返回客户端的消息数量,方法是设置

代码语言:javascript
复制
max.poll.records
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/35896194

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档