在命令执行期间,我需要以这样的方式对命令进行排队,以避免冲突。命令是按顺序接收的。命令对特定实体进行操作,因此我希望根据它们要操作的实体对命令进行排队。例如,命令c1和c3对实体id 5进行操作,因此它们需要按照接收到q5的顺序进行操作。同时,命令c2和c4对实体id 6进行操作,因此它们需要进入特定于该实体的队列(q-6)。我不确定spring集成是否是这个用例的最佳选择,但以下是我到目前为止所拥有的:
@Bean
public IntegrationFlow commands() {
return f -> f .<Command>split()
.enrichHeaders(h -> {
h.headerExpression("objectId", "payload.objectId.toString() ");
})
.route("@channelResolver.resolve(headers['objectId'])") // how to create channels dynamically
.handle(mySingletonBeanHandler); // how to use singleton bean here
}我面临的问题是使用动态路由器。由于我事先不知道实体I,所以我希望动态地创建这些队列。我得到的例外是:
'currentComponent‘currentComponent是一个单向的'MessageHandler’,配置'outputChannel‘是不合适的。这是集成流的结束。
所以,我的问题是:
如何为每个entity/object-Id.
我是春季集成的新手,希望在可能的情况下继续使用java以避免混淆。
发布于 2022-09-06 19:35:41
实际上,不建议在运行时创建通道:更详细的是,您可能会询问最终如何删除它们。你也不知道他们中会有多少人在那里。
我建议你换一种可能适合你的模式。
ExecutorChannel,用于并行执行每个实体组。还可以使用resequencer,因为在上一次发布之前,它不允许释放下一个命令,但是如果我们使用ExecutorChannel进行并行处理,那么它的用途就会被忽略。
聚合器的问题在于,必须等待所有命令,实体才能释放它们进行处理。
或者..。对于某些并行性和相应的路由器,您只需使用几个QueueChannel将相同密钥的消息发送到同一个队列通道。所有这些队列都可以具有相同的使用者。我想这就是你提到你的mySingletonBeanHandler的地方。您确实不能在route()之后继续这个流,但是您可以在这些QueueChannel bean上使用一个@BridgeTo("myHandlerChannel")将它们的消息发送给您的处理程序,从这个通道开始,然后是handle()。
在CPU允许的情况下,您无法达到更多的并行性,因此,无论哪种方式都没有“每个实体队列”的意义。
https://stackoverflow.com/questions/73625914
复制相似问题