首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >用spring-cloud-stream和函数风格暂停Kafka Consumer

用spring-cloud-stream和函数风格暂停Kafka Consumer
EN

Stack Overflow用户
提问于 2021-02-10 00:56:32
回答 1查看 448关注 0票数 0

我正在尝试为我的kafka流应用程序实现一个重试机制。其想法是,我将从输入主题中获取使用者和分区ID以及主题名称,然后在有效负载中存储的持续时间内暂停使用者。

我搜索过文档和示例,但我找到的都是基于spring-cloud-stream提供的经典绑定的示例。我正在尝试看看是否有一种方法可以用函数式风格访问这些信息。

例如,下面的代码可以让我使用经典绑定样式访问消费者。

代码语言:javascript
复制
@StreamListener(Sink.INPUT)
public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer consumer) {
    System.out.println(in);
    consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
}

如何获得与

函数式风格

我尝试了下面的代码,但我得到了异常,说没有找到这样的绑定。

代码语言:javascript
复制
@Bean
public Function, KStream> process() {
    message -> {
        Consumer consumer = message.getHeaders().get(KafkaHeaders.Consumer, Consumer.class);
        String topic = message.getHeaders().get(KafkaHeaders.Topic, String.class);
        Integer partitionId = message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
        CustomPayload payload = (CustomPayload) message.getPayload();
        if (payload.getRetryTime() < System.currentTimeMillis()) {
            consumer.pause(Collections.singleton(new TopicPartition(topic, partitionId)));
        }
    }
}

我得到的异常

代码语言:javascript
复制
Caused by: java.lang.IllegalStateException: No factory found for binding target type: org.springframework.messaging.Message among registered factories: channelFactory,messageSourceFactory,kStreamBoundElementFactory,kTableBoundElementFactory,globalKTableBoundElementFactory
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.getBindingTargetFactory(AbstractBindableProxyFactory.java:82)
    at org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsBindableProxyFactory.bindInput(KafkaStreamsBindableProxyFactory.java:191)
    at org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsBindableProxyFactory.afterPropertiesSet(KafkaStreamsBindableProxyFactory.java:111)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1853)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1790)
    ... 96 more
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-03-02 03:18:39

在您的功能bean示例中,您将两者混合在一起

..。这就是那个特定异常的原因。功能bean可以重写如下。

代码语言:javascript
复制
@Bean
public java.util.function.Consumer> process() {
    return message -> {
        Consumer consumer = message.getHeaders().get(KafkaHeaders.Consumer, Consumer.class);
        String topic = message.getHeaders().get(KafkaHeaders.Topic, String.class);
        Integer partitionId = message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
        CustomPayload payload = (CustomPayload) message.getPayload();
        if (payload.getRetryTime() < System.currentTimeMillis()) {
            consumer.pause(Collections.singleton(new TopicPartition(topic, partitionId)));
        }
    }
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66123464

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档