使用spring-cloud-stream的kafka绑定器,如何配置并发消息消费者(在单个消费者jvm中)?如果我理解正确的话,使用kafka时使用并发消息消耗需要分区,但s-c-s docs指出,要使用分区,您需要通过partitionKeyExpression或partitionKeyExtractorClass在生产者中指定分区选择。Kafka文档提到了循环分区。
s-c-s文档根本没有提到spring.cloud.stream.bindings.*.concurrency,尽管这在我上面描述的用例中似乎很重要。使用生产者配置
spring:
cloud:
stream:
bindings:
customer-save:
destination: customer-save
group: customer-save
content-type: application/json
partitionCount: 3和消费者配置
spring:
cloud:
stream:
bindings:
customer-save:
destination: customer-save
group: customer-save
content-type: application/x-java-object;type=foo.Customer
partitioned: true
concurrency: 3我似乎得到了我想要的行为(至少在某种程度上)。我可以看到有时有3个消费者线程处于活动状态,尽管看起来确实存在一些分区,而不是循环调度,因为一些消息似乎在等待繁忙的消费者线程,并在该线程完成后被消耗。我假设这是因为消息被发送到相同的分区。
当我没有指定partitionKeyExpression或partitionKeyExtractorClass时,是否有一些在生产者上使用的缺省键提取和分区策略?这是使用kafka设置s-c-s消费者的合适方式吗?您希望多个线程使用消息以增加消费者吞吐量吗?
发布于 2016-03-08 06:36:35
由于您的生产者没有分区(没有partitionKeyExpression集),生产者端将轮询3个分区(如果这不是观察到的行为,请在Git Hub中打开工单)。如果您配置了partitionKeyExpression,那么生产者将根据配置的逻辑有效地对数据进行分区。
在消费者端,我们确保线程/分区亲和性,因为这是一个广受推崇的Kafka约定-我们确保给定分区上的消息按顺序处理-这可能解释了您正在观察的行为。如果将消息A,B,C,D发送到分区0,1,2,0 -D将不得不等待,直到A被处理,即使还有另外两个线程可用。
增加吞吐量的一种选择是过度分区(这是Kafka中相当典型的策略)。这将进一步分散消息,并增加消息被发送到不同线程的机会。
如果您不关心排序,那么增加吞吐量的另一个选择是在下游异步处理消息:例如,通过将输入通道桥接到ExecutorChannel。
一般来说,partitioned指的是客户端接收分区数据的能力(Kafka客户端始终是分区的,但这个设置也适用于Rabbit和/或Redis)。它与属性instanceIndex和instanceCount一起使用,以确保在多个应用程序实例之间正确划分主题的分区(请参阅http://docs.spring.io/spring-cloud-stream/docs/1.0.0.M4/reference/htmlsingle/index.html#_instance_index_and_instance_count)
https://stackoverflow.com/questions/35854603
复制相似问题