我正在使用spring-kafka和spring-kafka-test版本1.0.2.RELEASE。
在我的一个测试中,我的应用程序使用KafkaTemplate和大多数默认配置设置将100条记录连续发送到EmbeddedKafka实例上的单个TopicPartion。
我使用KafkaTestUtils.getRecords(消费者)方法尝试从Kafka实例获取记录,并验证它们是否都已发送。
第一次调用getRecords时,我只收到一条记录。如果我再打一次我就能拿到剩下的99。
如果我显式地将消费者的位置设置为TopicPartition的开头,然后调用getRecords,我将得到全部100。
为什么getRecords第一次只会得到一条记录?有没有比显式地在消费者上调用seekToBeginning更好的一次获得100个的更好方法?
发布于 2016-08-13 19:31:00
这听起来像是时间问题。很可能在您第一次调用poll()时只有一条消息可用--该方法不能保证将获取多少条消息。当你写代码时,你不应该假设你会一下子收到X条记录。Kafka 0.10 max.poll.records中有一个消费者属性,出于测试目的,您可能希望将该属性设置为1,然后执行接收循环,直到您轮询完所有100个。
发布于 2016-08-12 01:41:27
很可能只是一个竞争条件--消费者坐在poll()中,代理在第一个消息到达时立即发送它。
请参见kafka docs中的属性fetch.min.bytes和fetch.max.wait.ms。
fetch.min.bytes默认为1。
编辑
您还可以在调用getRecords()之前尝试对KafkaTemplate执行flush()操作。
然而,您的测试不应该真的依赖于一次获取所有消息-太脆弱了。
https://stackoverflow.com/questions/38902377
复制相似问题