I am new to Kafka and want to process records in batches on the consumer side.
Reading the documentation, I found that batch processing can be enabled starting with version 3.0.
Currently we are using Spring Boot 2.1.3.RELEASE with the following Kafka dependencies:
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Greenwich.SR3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Before changing any properties or code, what do I need to change in pom.xml? Do I need to upgrade the Spring Boot version?
Posted on 2020-07-08 20:25:21
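Note that the `batch-mode` consumer property only exists from Spring Cloud Stream 3.0 onward, so the Greenwich release train (which pairs with Boot 2.1.x) is too old for it. A minimal sketch of the pom.xml change, assuming an upgrade to the Hoxton release train (which brings in spring-cloud-stream 3.0.x) together with a matching Spring Boot 2.3.x parent; the exact service-release version is illustrative:

```xml
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <!-- Hoxton manages spring-cloud-stream 3.0.x, which supports batch-mode -->
            <version>Hoxton.SR8</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
```

The individual spring-cloud-stream and binder dependencies can stay unversioned, since the imported BOM manages them.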
You can consume it in batches with @StreamListener. You just need to provide a deserializer. Example:
public class Person {
    private String name;
    private String surname;
    // getters and setters omitted
}
@StreamListener(value = PersonStream.INPUT)
private void personBulkReceiver(List<Person> person) {
    System.out.println("personBulkReceiver : " + person.size());
}
spring:
  cloud:
    stream:
      binders:
        bulkKafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
                      configuration:
                        max.poll.records: 1500
                        fetch.min.bytes: 1000000
                        fetch.max.wait.ms: 10000
                        value.deserializer: tr.cloud.stream.examples.PersonDeserializer
      bindings:
        person-topic-in:
          binder: bulkKafka
          destination: person-topic
          contentType: application/person
          group: omercelik
          consumer:
            batch-mode: true
import org.springframework.kafka.support.serializer.JsonDeserializer;

// Custom deserializer referenced by value.deserializer in the config above.
public class PersonDeserializer extends JsonDeserializer<Person> {
}

https://stackoverflow.com/questions/62794772
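Since @StreamListener is deprecated in later Spring Cloud Stream releases, the same batch signature can also be written as a `java.util.function.Consumer<List<Person>>` bean. The sketch below is plain Java with no Spring on the classpath, just to illustrate the batch shape; the bean registration comment and the `spring.cloud.function.definition` binding name are assumptions, not part of the original answer:

```java
import java.util.List;
import java.util.function.Consumer;

public class BatchConsumerSketch {

    // Minimal stand-in for the Person POJO from the answer.
    static class Person {
        final String name;
        final String surname;
        Person(String name, String surname) {
            this.name = name;
            this.surname = surname;
        }
    }

    // The same logic as personBulkReceiver, factored out so it is testable.
    static String describeBatch(List<Person> batch) {
        return "personBulkReceiver : " + batch.size();
    }

    public static void main(String[] args) {
        // In Spring Cloud Stream 3.x this Consumer would be exposed as a @Bean
        // and selected via spring.cloud.function.definition (names assumed);
        // here it is invoked directly to show the List<Person> batch signature.
        Consumer<List<Person>> personBulkReceiver =
                batch -> System.out.println(describeBatch(batch));

        personBulkReceiver.accept(List.of(
                new Person("Ada", "Lovelace"),
                new Person("Alan", "Turing")));
        // prints: personBulkReceiver : 2
    }
}
```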