首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spring集成- WIth AWS S3 (重试策略)

Spring集成- WIth AWS S3 (重试策略)
EN

Stack Overflow用户
提问于 2022-07-06 08:36:20
回答 1查看 140关注 0票数 0

我正在使用AWS S3创建一个简单的集成服务。当出现异常时,我正面临一些困难。

我的要求是定期轮询一个S3桶,并在新将文件放入S3桶中时应用一些转换。下面的代码片段可以正常工作,,但是当异常发生时,它会继续一次又一次地重试。我不想这样。能在这里帮助我吗?

IntegrationFlow的定义如下。

代码语言:javascript
复制
    @Configuration
    public class S3Routes {
    
        @Bean
        public IntegrationFlow downloadFlow(MessageSource<InputStream> s3InboundStreamingMessageSource) {
    
            return IntegrationFlows.from(s3InboundStreamingMessageSource)
                                   .channel("s3Channel")
                                   .handle("QueryServiceImpl", "processFile")
                

                   .get();
    }

}

配置文件如下所示。

代码语言:javascript
复制
@Service
public class S3AppConfiguration {

    @Bean
    @InboundChannelAdapter(value = "s3Channel")
    public MessageSource<InputStream> s3InboundStreamingMessageSource(S3RemoteFileTemplate template) {

        S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template);
        messageSource.setRemoteDirectory("my-bucket-name");
        messageSource.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),
                                                                         "streaming"));

        return messageSource;
    }

    @Bean
    public PollableChannel s3Channel() {
        return new QueueChannel();
    }

    @Bean
    public S3RemoteFileTemplate template(AmazonS3 amazonS3) {
        return new S3RemoteFileTemplate(new S3SessionFactory(amazonS3));
    }

    @Bean(name = "amazonS3")
    public AmazonS3 nonProdAmazonS3(BasicAWSCredentials basicAWSCredentials) {
        ClientConfiguration config = new ClientConfiguration();
        config.setProxyHost("localhost");
        config.setProxyPort(3128);

        return AmazonS3ClientBuilder.standard().withRegion(Regions.fromName("ap-southeast-1"))
                                    .withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials))
                                    .withClientConfiguration(config)
                                    .build();
    }

    @Bean
    public BasicAWSCredentials basicAWSCredentials() {
        return new BasicAWSCredentials("access_key", "secret_key");
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata nonProdPoller() {

        return Pollers.cron("* */2 * * * *")
                      .get();
    }
}

我在这里使用的AcceptOnceFileList过滤器帮助我防止处理同一个文件进行连续重试。但是,我不想使用AcceptOnceFileList过滤器,因为当第一次尝试没有处理一个文件时,我希望在下一个Poll上重试(通常每1小时在Prod区域发生一次)。当处理失败时,我尝试使用filter.remove()方法(在任何异常情况下),它再次导致连续重试。

我不知道如何在失败时禁用连续重试。我应该在哪里配置它?

我看了一下Spring Integration ( Retry Strategy)。相同的场景,但是不同的集成。我不知道如何为我的IntegrationFlow设置这个。有人能帮忙吗?提前感谢

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-07-06 15:12:28

  1. 这个故事是不同的:它谈论的是一个用于AMQP的侦听器容器。您使用的是源轮询通道适配器--方法可能有所不同。

  1. 您可以创建两个源轮询通道适配器:一个通过该@InboundChannelAdapter,另一个通过IntegrationFlows.from(s3InboundStreamingMessageSource)。它们都向同一个通道产生数据。不确定这是否真的是故意的。

  1. 不清楚在您的情况下重试是什么,除非您真正执行手动filter.remove()调用。在这种情况下,它确实要重新尝试。但这是一个单一的,没有控制的重试。只有当您再次调用该filter.remove()时,它才会再次重试。那么,如果你自己做每件事,为什么要问这个问题呢?

  1. 考虑使用为该handle()配置的RequestHandlerRetryAdvicehttps://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-handler-advice-chain。这样,您实际上只需要提取远程文件一次,重试将由Spring API.

管理。

更新

所以,在学习了一些Cron表情之后,我意识到你的观点是错误的:

* */2 * * * * -意思是指每隔一分钟的每一秒

一定是这样的:

0 */2 * * * * -在每一分钟的开头

也许类似的东西是与您的hourly cron表达式的触角..。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72880304

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档