我正在尝试为我的kafka流应用程序实现一个重试机制。其想法是,我将从输入主题中获取使用者和分区ID以及主题名称,然后在有效负载中存储的持续时间内暂停使用者。
我搜索过文档和示例,但我找到的都是基于spring-cloud-stream提供的经典绑定的示例。我正在尝试看看是否有一种方法可以用函数式风格访问这些信息。
例如,下面的代码可以让我使用经典绑定样式访问消费者。
@StreamListener(Sink.INPUT)
public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer consumer) {
System.out.println(in);
consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
}如何获得与
函数式风格
我尝试了下面的代码,但我得到了异常,说没有找到这样的绑定。
@Bean
public Function, KStream> process() {
message -> {
Consumer consumer = message.getHeaders().get(KafkaHeaders.Consumer, Consumer.class);
String topic = message.getHeaders().get(KafkaHeaders.Topic, String.class);
Integer partitionId = message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
CustomPayload payload = (CustomPayload) message.getPayload();
if (payload.getRetryTime() < System.currentTimeMillis()) {
consumer.pause(Collections.singleton(new TopicPartition(topic, partitionId)));
}
}
}我得到的异常
Caused by: java.lang.IllegalStateException: No factory found for binding target type: org.springframework.messaging.Message among registered factories: channelFactory,messageSourceFactory,kStreamBoundElementFactory,kTableBoundElementFactory,globalKTableBoundElementFactory
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.getBindingTargetFactory(AbstractBindableProxyFactory.java:82)
at org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsBindableProxyFactory.bindInput(KafkaStreamsBindableProxyFactory.java:191)
at org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsBindableProxyFactory.afterPropertiesSet(KafkaStreamsBindableProxyFactory.java:111)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1853)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1790)
... 96 more发布于 2021-03-02 03:18:39
在您的功能bean示例中,您将两者混合在一起
和
..。这就是那个特定异常的原因。功能bean可以重写如下。
@Bean
public java.util.function.Consumer> process() {
return message -> {
Consumer consumer = message.getHeaders().get(KafkaHeaders.Consumer, Consumer.class);
String topic = message.getHeaders().get(KafkaHeaders.Topic, String.class);
Integer partitionId = message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
CustomPayload payload = (CustomPayload) message.getPayload();
if (payload.getRetryTime() < System.currentTimeMillis()) {
consumer.pause(Collections.singleton(new TopicPartition(topic, partitionId)));
}
}
}https://stackoverflow.com/questions/66123464
复制相似问题