我想使用spring-integration-aws向AWS SNS发送消息,并从AWS SQS接收消息。我在理解如何将我的应用程序从spring-coud-aws-messaging迁移到使用它时遇到了一些麻烦。我的代码基本上是一个SQS配置类:
@Configuration
@EnableConfigurationProperties(SqsProperties.class)
@Profile("!test")
public class SqsConfiguration {
private final SqsProperties sqsProperties;
@Autowired
public SqsConfiguration(SqsProperties sqsProperties) {
this.sqsProperties = sqsProperties;
}
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer msgListenerContainer =
simpleMessageListenerContainerFactory().createSimpleMessageListenerContainer();
msgListenerContainer.setMessageHandler(queueMessageHandler());
return msgListenerContainer;
}
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory() {
SimpleMessageListenerContainerFactory msgListenerContainerFactory = new SimpleMessageListenerContainerFactory();
msgListenerContainerFactory.setAmazonSqs(amazonSQSClient());
msgListenerContainerFactory.setDestinationResolver(customDestinationResolver(sqsProperties));
return msgListenerContainerFactory;
}
@Bean
public QueueMessageHandler queueMessageHandler() {
QueueMessageHandlerFactory queueMsgHandlerFactory = new QueueMessageHandlerFactory();
queueMsgHandlerFactory.setAmazonSqs(amazonSQSClient());
return queueMsgHandlerFactory.createQueueMessageHandler();
}
@Bean(name = "amazonSQS", destroyMethod = "shutdown")
public AmazonSQSAsync amazonSQSClient() {
return AmazonSQSAsyncClientBuilder.defaultClient();
}
@Bean
public SqsQueuesDestinationResolver customDestinationResolver(SqsProperties sqsProperties) {
return new SqsQueuesDestinationResolver(sqsProperties);
}
}它使用如下定义的customDestinationResolver (基本上是因为队列名称只能根据不同的部署而是动态的):
public class SqsQueuesDestinationResolver implements DestinationResolver<String> {
private static final Logger LOG = LoggerFactory.getLogger(SqsQueuesDestinationResolver.class);
private SqsProperties sqsProperties;
@Autowired
private AmazonSQSAsync amazonSQSAsync;
public SqsQueuesDestinationResolver(SqsProperties sqsProperties) {
this.sqsProperties = sqsProperties;
}
@Override
public String resolveDestination(String queueName) throws DestinationResolutionException {
String finalQueueName = getFinalQueueName(queueName);
try {
return amazonSQSAsync.getQueueUrl(finalQueueName).getQueueUrl();
}
catch (QueueDoesNotExistException queueDoesNotExistException) {
LOG.error(String.format("The '%s' queue was not found.", finalQueueName));
}
return null;
}
private String getFinalQueueName(String queueName) {
String finalQueueName;
switch (queueName) {
case "queue-foo":
finalQueueName = sqsProperties.getFooQueue();
break; case "queue-bar":
finalQueueName = sqsProperties.getBarQueue();
break;
default:
finalQueueName = null;
}
return finalQueueName;
}
}这就是全部内容:基本上,在这个配置中,我只需要在一个适当的使用者方法中使用注释@SqsListener("foo-queue")或@SqsListener("bar-queue")
@SqsListener("foo-queue")
public void listen(String message){
processMessage(message);
}我一直在尝试遵循https://github.com/spring-projects/spring-integration-aws#spring-integrations-extensions-to-aws的文档,在“入站通道适配器”一章中,我一直在努力理解的是,既然我使用了一个自定义的destinationResolver,我应该给SqsMessageDrivenChannelAdapter()的构造函数提供什么队列作为参数,以及我到底应该如何使用这些消息,或者它是否应该像以前一样使用来自spring-cloud-aws-messaging的@SqsListener注解。
谢谢你的帮助,如果这不是合适的地方,或者如果这是一个非常愚蠢的问题,我很抱歉,我只是第一次尝试这个:)
发布于 2020-04-09 04:11:43
我不确定您遗漏了什么,但您仍然可以在SqsMessageDrivenChannelAdapter(AmazonSQSAsync amazonSqs, String... queues) ctor中使用您的foo-queue和bar-queue。
您的自定义DestinationResolver确实也可以在那里使用:
public void setDestinationResolver(DestinationResolver<String> destinationResolver) {
this.simpleMessageListenerContainerFactory.setDestinationResolver(destinationResolver);
}事实上,这个SqsMessageDrivenChannelAdapter完全基于相同的SimpleMessageListenerContainerFactory。
要使用从SQS接收的消息,只需使用MessageChannel配置此通道适配器即可生成消息。然后你向下游订阅该频道。
有关通道和消息的更多信息,请参阅Spring Integration Reference Manual:https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/index.html
https://stackoverflow.com/questions/61108842
复制相似问题