首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何从主题末尾开始阅读,而不考虑组的承诺偏移量

如何从主题末尾开始阅读,而不考虑组的承诺偏移量
EN

Stack Overflow用户
提问于 2019-05-15 22:04:01
回答 1查看 145关注 0票数 0

我正在使用以下包来消费kafka消息

代码语言:javascript
复制
compile 'org.springframework.boot:spring-boot-starter-webflux'
compile("org.springframework.boot:spring-boot-starter-web")
// tag::actuator[]
compile("org.springframework.boot:spring-boot-starter-actuator")
compile('org.springframework.kafka:spring-kafka:2.1.7.RELEASE')
compile 'io.projectreactor.kafka:reactor-kafka:1.0.0.RELEASE'

我希望从主题的末尾消费消息,而不管组的提交偏移量

当我搜索时,我发现我们可以使用以下代码

代码语言:javascript
复制
consumer = new KafkaConsumer<>(properties);
consumer.seekToEnd(Collections.emptySet());

但是我无法找到如何在spring boot webflux中使用上述代码。

代码语言:javascript
复制
@Component
public class EventConsumer
{
    private final EmitterProcessor<ServerSentEvent<String>> emitter = EmitterProcessor.create();

    public Flux<ServerSentEvent<String>> get()
    {
        return emitter;
    }

    @KafkaListener(topics = "${kafka.zone.status.topic.name}")
    public void receive(String data)
    {
        //System.out.println(data);
        emitter.onNext(ServerSentEvent.builder(data).id(UUID.randomUUID().toString()).build());
    }
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-05-15 23:37:49

参见the documentation

onPartitionsAssigned方法中实现ConsumerSeekAware并执行查找。

代码语言:javascript
复制
@Component
public class EventConsumer implements ConsumerSeekAware {

    private final EmitterProcessor<ServerSentEvent<String>> emitter = EmitterProcessor.create();

    public Flux<ServerSentEvent<String>> get() {
        return emitter;
    }

    @KafkaListener(topics = "${kafka.zone.status.topic.name}")
    public void receive(String data) {
        // System.out.println(data);
        emitter.onNext(ServerSentEvent.builder(data).id(UUID.randomUUID().toString()).build());
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {

    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.keySet().forEach(tp -> callback.seekToEnd(tp.topic(), tp.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {

    }

}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56151257

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档