首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >未接收消息的SpringCloud AWS - SQSListener注释方法

未接收消息的SpringCloud AWS - SQSListener注释方法
EN

Stack Overflow用户
提问于 2021-10-19 00:34:44
回答 1查看 371关注 0票数 0

我正在使用SpringCloudAWS2.3.2编写一个SQS发布/消费者应用程序

代码语言:javascript
复制
<dependency>
      <groupId>io.awspring.cloud</groupId>
      <artifactId>spring-cloud-aws-messaging</artifactId>
      <version>2.3.2</version>
</dependency>

我已经可以成功地将msgs发布到我的SQS,但是我的@SqsListener注释方法不使用msgs。我在这里看了其他问题,但似乎没有提供任何适当的洞察力来解决这个问题。

我在这里跟踪API文档:https://docs.awspring.io/spring-cloud-aws/docs/current/reference/html/index.html#annotation-driven-listener-endpoints

我的配置定义如下:

代码语言:javascript
复制
@Configuration
public class SqsMessagingConfig {

    @Value("${cloud.aws.credentials.secret-key}")
    private String secretKey;
    @Value("${cloud.aws.credentials.access-key}")
    private String accessKey;

    private AWSCredentialsProvider awsCredentialsProvider() {
        return new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey,
                secretKey));
    }

    @Bean
    @Primary
    public AmazonSQSAsync amazonSQSAsync() {
        return AmazonSQSAsyncClientBuilder
                .standard()
                .withRegion("us-east-2")
                .withCredentials(awsCredentialsProvider())
                .build();
    }

    @Bean
    public QueueMessagingTemplate queueMessagingTemplate(AmazonSQSAsync amazonSQSAsync) {
        return new QueueMessagingTemplate(amazonSQSAsync);
    }

    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQSAsync) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSQSAsync);
        factory.setAutoStartup(true);
        factory.setMaxNumberOfMessages(10);

        return factory;
    }

 
    @Bean()
    public QueueMessageHandlerFactory queueMessageHandlerFactory(final ObjectMapper mapper, final AmazonSQSAsync amazonSQSAsync) {
        final QueueMessageHandlerFactory queueHandlerFactory = new QueueMessageHandlerFactory();
        queueHandlerFactory.setAmazonSqs(amazonSQSAsync);
        queueHandlerFactory.setArgumentResolvers(Collections.singletonList(new PayloadMethodArgumentResolver(jackson2MessageConverter(mapper))));
        return queueHandlerFactory;
    }

    private MessageConverter jackson2MessageConverter(final ObjectMapper mapper) {
        final MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setObjectMapper(mapper);
        return converter;
    }
}

然后我的SqsService看起来如下所示:

代码语言:javascript
复制
@Service
public class SqsQueueService {
    private static final Logger logger = LoggerFactory.getLogger(SqsQueueService.class);
    private final QueueMessagingTemplate queueMessagingTemplate;
    private final ObjectWriter objectWriter;
    private final String QUEUE_NAME = "SCHEDULES";

    public SqsQueueService(QueueMessagingTemplate queueMessagingTemplate, ObjectMapper mapper) {
        this.queueMessagingTemplate = queueMessagingTemplate;
        this.objectWriter = mapper.writer();
    }

    public void send(List<Schedule> schedules) {
        List<String> originatingIds = schedules.stream().map(Schedule::getOriginatingId).collect(Collectors.toList());
        try {
            Message<String> message = MessageBuilder.withPayload(objectWriter.writeValueAsString(schedules))
                    .build();

            this.queueMessagingTemplate.convertAndSend(QUEUE_NAME, message);
            logger.info("Successfully sent {} schedule(s) to SQS, with originatingId={}", schedules.size(),
                    originatingIds);
        } catch (Exception e) {
            logger.error("Failed to send the following schedule(s) to SQS=" + originatingIds, e);
        }

    }

    // NO_REDRIVE ensures we do not re-queue messages forever. They will be sent to a DLQ if they exceed maxReceiveCount
    @SqsListener(value = "SCHEDULES", deletionPolicy = SqsMessageDeletionPolicy.NO_REDRIVE)
    private void receiveMessage(List<Schedule> schedules) String partnerId) {
        List<String> originatingIds = schedules.stream().map(Schedule::getOriginatingId).collect(Collectors.toList());
        logger.info("Received request from SQS for originatingId={}", originatingIds);
        try {
            someService.createSchedules(schedules);
        } catch (Exception e) {
            throw new RuntimeException("An issue occurred during ingest for originatingId=" + originatingIds, e);
        }
    }

}

我还尝试了aws-自动配置的依赖,但这增加了大量额外的噪音,我仍然无法让它从SQS消费。希望有人能发现我在哪里搞砸了/错过了什么东西。我从spring直接看到的文档指向我做正确的事情,但很明显,事实并非如此。

在我将消息发送到队列后,我可以看到它正在等待被消耗,但什么也没有发生。任何帮助都是非常感谢的。

EN

回答 1

Stack Overflow用户

发布于 2022-01-28 10:15:25

添加spring自动配置依赖项:

代码语言:javascript
复制
 <dependency>
      <groupId>io.awspring.cloud</groupId>
      <artifactId>spring-cloud-aws-autoconfigure</artifactId>
 </dependency>

https://docs.awspring.io/spring-cloud-aws/docs/current/reference/html/index.html#maven-dependencies

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69623894

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档