我使用的是一个Gateway,在网关内部,我正在循环通过sqs-outbound adapter将消息发送到两个队列。
我想实现这样的目标:
jsonArray.forEach(data -> {
Future<AggregatedAckorErrorFrom2Queues> result = dataPublishGateway.publishToService(data);
futures.add(result);
});
futures.forEach(f -> {
System.out.println("Future Result -> "+ f.get())
});网关与SQS适配器Config
<!-- Gateway to Publish to SQS -->
<task:executor id="dataExecutor" pool-size="5"/>
<int:publish-subscribe-channel id="dataChannel" task-executor="dataExecutor" />
<int:gateway service-interface="com.integration.gateway.PublishGateway" id="dataPublishGateway">
<int:method name="publishToDataService" payload-expression="#args[0]" request-channel="dataChannel" />
</int:gateway>
<!-- Send to 2 Queues -->
<int:channel id="successChannel"/>
<int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
queue="queue-1"
channel="dataChannel"
success-channel="successChannel"
failure-channel="errorChannel"/>
<int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
queue="queue-2"
channel="dataChannel"
success-channel="successChannel"
failure-channel="errorChannel"/>我看的是apply-sequence上的dataChannel和aggregator,但是aggregator必须能够同时处理ack和error。
问题:如何将聚合响应(ack+)从2个队列返回到网关?
发布于 2020-09-03 17:12:56
你在正确的方向上与那些success-channel和failure-channel。聚合器可以订阅该successChannel,并使用默认策略进行关联和发布。对于failure-channel,您可能需要一个自定义错误通道,而不是一个全局errorChannel。从ErrorMessage发送到该信道的SqsMessageHandler有一个类似于这个AwsRequestFailureException的有效负载,其中它的failedMessage实际上包含一个包含上述相关细节的请求消息。因此,在进入聚合器之前,您需要添加一些转换步骤来从异常中提取该信息。
https://stackoverflow.com/questions/63720635
复制相似问题