首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在Spring中通过Kafka消耗批处理

在Spring中通过Kafka消耗批处理
EN

Stack Overflow用户
提问于 2020-07-08 12:25:44
回答 1查看 1.5K关注 0票数 1

我是卡夫卡的新手,想要通过消费者批量加工。

阅读文档并发现,从3.0版本开始,我们可以启用批处理。

目前,我们正在为kafka使用Spring Boot 2.1.3.RELEASE和下面的依赖项:

代码语言:javascript
复制
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Greenwich.SR3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

在启动属性和代码更改之前,我需要在pom.xml中做哪些更改?我需要更改Springboot版本吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-07-08 20:25:21

您可以使用@StreamListener作为批处理使用它。你只需要给一个反序列化器。例子:

你只需要给一个反序列化器。

代码语言:javascript
复制
public class Person {

    private String name;
    private String surname;
    .........
}


   @StreamListener(value = PersonStream.INPUT)
    private void personBulkReceiver(List<Person> person) {
        System.out.println("personBulkReceiver : " + person.size());
    }


spring:
  cloud:
    stream:
      kafka:
      binders:
        bulkKafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
                      configuration:
                        max.poll.records: 1500
                        fetch.min.bytes: 1000000
                        fetch.max.wait.ms: 10000
                        value.deserializer: tr.cloud.stream.examples.PersonDeserializer
      bindings:
        person-topic-in:
          binder: bulkKafka
          destination: person-topic
          contentType: application/person
          group : omercelik
          consumer:
            batch-mode: true

public class PersonDeserializer extends JsonDeserializer<Person> {
}
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62794772

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档