我尝试为下一个Spring Cloud Stream版本准备我们的应用程序。(当前使用3.0.0.RC1)。使用Kafka活页夹。
现在,我们收到一条消息,对其进行处理,然后将其重新发送到另一个主题。单独处理每条消息会导致对数据库的大量单个请求。
在3.0.0版本中,我们希望以批处理的方式处理消息,这样我们就可以在批处理更新中保存数据。
在当前版本中,我们使用@EnableBinding、@StreamListener
@StreamListener( ExchangeableItemProcessor.STOCK_INPUT )
public void processExchangeableStocks( final ExchangeableStock item ) {
publishItems( exchangeableItemProcessor.stocks(), articleService.updateStockInformation( Collections.singletonList( item ) ) );
}
void publishItems( final MessageChannel messageChannel, final List<? extends ExchangeableItem> item ) {
for ( final ExchangeableItem exchangeableItem : item ) {
final Message<ExchangeableItem> message = MessageBuilder.withPayload( exchangeableItem )
.setHeader( "partitionKey", exchangeableItem.getId() )
.build();
messageChannel.send( message )
}
}我已经将消费者属性设置为“批处理模式”,并将签名更改为List<>,但这样做会导致收到List<byte[]>而不是预期的List<ExchangeableStock>。Ofc可以在之后进行转换,但感觉像是"meh",我认为这应该是在调用侦听器之前发生的事情。
然后我尝试了(新的)函数版本,消费工作得很好。我也喜欢这个简单的处理版本
@Bean
public Function<List<ExchangeableStock>, List<ExchangeableStock>> stocks() {
return articleService::updateStockInformation;
}但是,输出主题现在将对象列表作为一条消息接收,并且以下使用者不能正常工作。
我想我错过了什么..。
我是否需要添加某种类型的MessageConverter (对于注释驱动版本),或者是否也有一种方法可以在函数版本中实现所需的行为?
发布于 2019-11-08 21:47:30
IIRC,只有函数支持批处理模式。
你能不能不像现在在你的StreamListener中那样使用Consumer<List< ExchangeableStock>>向通道发送消息?
发布于 2019-11-12 16:28:05
我已经做到了:
@Bean
@Measure
public Consumer<List<ExchangeableStock>> stocks() {
return items -> {
for ( final ExchangeableStock exchangeableItem : articleService. updateStockInformation( items ) ) {
final Message<?> message = MessageBuilder.withPayload( exchangeableItem )
.setHeader( "partitionKey", exchangeableItem.getId() )
.setHeader( KafkaHeaders.TOPIC, "stocks-stg" )
.build();
processor.onNext( message );
}
};
}
private final TopicProcessor<Message<?>> processor = TopicProcessor.create();
@Bean
@Measure
public Supplier<Flux<?>> source() {
return () -> processor;
}但是动态目的地解析对我不起作用。我尝试使用KafkaHeaders.TOPIC和spring.cloud.stream.sendto.destination作为头文件,并设置Kafka绑定生产者属性use-topic-header: true (用于绑定source-out-0)
如果我为source-out-0设置一个目的地,它是有效的,但这样做会导致大量的TopicProceessor和Supplier-我们大约有10种不同的消息类型。
也许我错过了一些小东西来获得动态目标解析工作……
https://stackoverflow.com/questions/58765961
复制相似问题