首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Sping Cloud Stream Kafka -批量消费消息,单条处理消息发送

Sping Cloud Stream Kafka -批量消费消息,单条处理消息发送
EN

Stack Overflow用户
提问于 2019-11-08 19:45:13
回答 2查看 1.1K关注 0票数 0

我尝试为下一个Spring Cloud Stream版本准备我们的应用程序。(当前使用3.0.0.RC1)。使用Kafka活页夹。

现在,我们收到一条消息,对其进行处理,然后将其重新发送到另一个主题。单独处理每条消息会导致对数据库的大量单个请求。

在3.0.0版本中,我们希望以批处理的方式处理消息,这样我们就可以在批处理更新中保存数据。

在当前版本中,我们使用@EnableBinding、@StreamListener

代码语言:javascript
复制
@StreamListener( ExchangeableItemProcessor.STOCK_INPUT )
public void processExchangeableStocks( final ExchangeableStock item ) {
    publishItems( exchangeableItemProcessor.stocks(), articleService.updateStockInformation( Collections.singletonList( item ) ) );
}

void publishItems( final MessageChannel messageChannel, final List<? extends ExchangeableItem> item ) {
    for ( final ExchangeableItem exchangeableItem : item ) {
        final Message<ExchangeableItem> message = MessageBuilder.withPayload( exchangeableItem )
                            .setHeader( "partitionKey", exchangeableItem.getId() )
                            .build();
        messageChannel.send( message )
    }
}

我已经将消费者属性设置为“批处理模式”,并将签名更改为List<>,但这样做会导致收到List<byte[]>而不是预期的List<ExchangeableStock>。Ofc可以在之后进行转换,但感觉像是"meh",我认为这应该是在调用侦听器之前发生的事情。

然后我尝试了(新的)函数版本,消费工作得很好。我也喜欢这个简单的处理版本

代码语言:javascript
复制
@Bean
public Function<List<ExchangeableStock>, List<ExchangeableStock>> stocks() {
    return articleService::updateStockInformation;
}

但是,输出主题现在将对象列表作为一条消息接收,并且以下使用者不能正常工作。

我想我错过了什么..。

我是否需要添加某种类型的MessageConverter (对于注释驱动版本),或者是否也有一种方法可以在函数版本中实现所需的行为?

EN

回答 2

Stack Overflow用户

发布于 2019-11-08 21:47:30

IIRC,只有函数支持批处理模式。

你能不能不像现在在你的StreamListener中那样使用Consumer<List< ExchangeableStock>>向通道发送消息?

票数 0
EN

Stack Overflow用户

发布于 2019-11-12 16:28:05

我已经做到了:

代码语言:javascript
复制
@Bean
@Measure
public Consumer<List<ExchangeableStock>> stocks() {
    return items -> {
        for ( final ExchangeableStock exchangeableItem : articleService. updateStockInformation( items ) ) {
            final Message<?> message = MessageBuilder.withPayload( exchangeableItem )
                            .setHeader( "partitionKey", exchangeableItem.getId() )
                            .setHeader( KafkaHeaders.TOPIC, "stocks-stg" )
                            .build();

            processor.onNext( message );
        }
    };
}

private final TopicProcessor<Message<?>> processor = TopicProcessor.create();

@Bean
@Measure
public Supplier<Flux<?>> source() {
    return () -> processor;
}

但是动态目的地解析对我不起作用。我尝试使用KafkaHeaders.TOPICspring.cloud.stream.sendto.destination作为头文件,并设置Kafka绑定生产者属性use-topic-header: true (用于绑定source-out-0)

如果我为source-out-0设置一个目的地,它是有效的,但这样做会导致大量的TopicProceessorSupplier-我们大约有10种不同的消息类型。

也许我错过了一些小东西来获得动态目标解析工作……

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58765961

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档