我正在尝试实现一个spring引导aws动态使用者,它能够自动缩放,以便与原始实例共享负载(拆分处理碎片)。
我所能做的是:使用定义良好的read和这里可用的示例(运动活页夹文档),我已经能够启动多个使用者,通过提供这些属性来划分处理的碎片。
在生产者上,我通过一个应用程序属性提供partitionCount: 2。在消费者方面,我提供instanceIndex和instanceCount。
在消费者1上有instanceIndex=0和instantCount=2,在消费者2上有instanceIndex=1和instantCount=2
这很好,我有两个spring引导应用程序处理它们的特定碎片。但在这种情况下,我必须为每个引导应用程序配置一个预配置的属性文件,这些属性文件需要在加载时可用,这样才能将加载分开。如果我只启动第一个使用者(非自动缩放),我只处理特定于索引0的碎片,而其他碎片仍未处理。
我想要做的(但不确定是否可能)是部署一个单一的使用者(处理所有的碎片)。如果我部署了另一个实例,我希望这个实例能够重新体验某些负载的第一个使用者,换句话说,如果我有2个碎片,一个用户,它将处理这两个部分,如果我然后部署另一个应用程序,我希望第一个使用者现在只处理从一个碎片到第二个用户的进程。
我试图这样做,没有在使用者上指定instanceIndex或instanceCount,只提供组名,但这使得第二个使用者处于空闲状态,直到第一个使用者被关闭。FYI --我还创建了自己的元数据和锁定表,阻止了绑定程序创建默认的元数据和锁定表。
配置:生产者?
originator: KinesisProducer
server:
port: 8090
spring:
cloud:
stream:
bindings:
output:
destination: <stream-name>
content-type: application/json
producer:
headerMode: none
partitionKeyExpression: headers.typeoriginator: KinesisSink
server:
port: 8091
spring:
cloud:
stream:
kinesis:
bindings:
input:
consumer:
listenerMode: batch
recordsLimit: 10
shardIteratorType: TRIM_HORIZON
binder:
checkpoint:
table: <checkpoint-table>
locks:
table: <locking-table
bindings:
input:
destination: <stream-name>
content-type: application/json
consumer:
concurrency: 1
listenerMode: batch
useNativeDecoding: true
recordsLimit: 10
idleBetweenPolls: 250
partitioned: true
group: mygroup发布于 2018-09-19 13:37:14
这是正确的。这就是它现在的工作方式:如果一个消费者在那里,那么它就需要所有的碎片来处理。第二个将采取行动,只有当第一个是以某种方式被打破至少一个碎片。
正确的卡夫卡式再平衡就在我们的路线图上。我们还没有坚实的愿景,所以问题和后续的贡献是欢迎的!
https://stackoverflow.com/questions/52378232
复制相似问题