我正在使用AWS S3创建一个简单的集成服务。当出现异常时,我正面临一些困难。
我的要求是定期轮询一个S3桶,并在新将文件放入S3桶中时应用一些转换。下面的代码片段可以正常工作,,但是当异常发生时,它会继续一次又一次地重试。我不想这样。能在这里帮助我吗?
IntegrationFlow的定义如下。
@Configuration
public class S3Routes {
@Bean
public IntegrationFlow downloadFlow(MessageSource<InputStream> s3InboundStreamingMessageSource) {
return IntegrationFlows.from(s3InboundStreamingMessageSource)
.channel("s3Channel")
.handle("QueryServiceImpl", "processFile")
.get();
}
}配置文件如下所示。
@Service
public class S3AppConfiguration {
@Bean
@InboundChannelAdapter(value = "s3Channel")
public MessageSource<InputStream> s3InboundStreamingMessageSource(S3RemoteFileTemplate template) {
S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template);
messageSource.setRemoteDirectory("my-bucket-name");
messageSource.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),
"streaming"));
return messageSource;
}
@Bean
public PollableChannel s3Channel() {
return new QueueChannel();
}
@Bean
public S3RemoteFileTemplate template(AmazonS3 amazonS3) {
return new S3RemoteFileTemplate(new S3SessionFactory(amazonS3));
}
@Bean(name = "amazonS3")
public AmazonS3 nonProdAmazonS3(BasicAWSCredentials basicAWSCredentials) {
ClientConfiguration config = new ClientConfiguration();
config.setProxyHost("localhost");
config.setProxyPort(3128);
return AmazonS3ClientBuilder.standard().withRegion(Regions.fromName("ap-southeast-1"))
.withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials))
.withClientConfiguration(config)
.build();
}
@Bean
public BasicAWSCredentials basicAWSCredentials() {
return new BasicAWSCredentials("access_key", "secret_key");
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata nonProdPoller() {
return Pollers.cron("* */2 * * * *")
.get();
}
}我在这里使用的AcceptOnceFileList过滤器帮助我防止处理同一个文件进行连续重试。但是,我不想使用AcceptOnceFileList过滤器,因为当第一次尝试没有处理一个文件时,我希望在下一个Poll上重试(通常每1小时在Prod区域发生一次)。当处理失败时,我尝试使用filter.remove()方法(在任何异常情况下),它再次导致连续重试。
我不知道如何在失败时禁用连续重试。我应该在哪里配置它?
我看了一下Spring Integration ( Retry Strategy)。相同的场景,但是不同的集成。我不知道如何为我的IntegrationFlow设置这个。有人能帮忙吗?提前感谢
发布于 2022-07-06 15:12:28
@InboundChannelAdapter,另一个通过IntegrationFlows.from(s3InboundStreamingMessageSource)。它们都向同一个通道产生数据。不确定这是否真的是故意的。filter.remove()调用。在这种情况下,它确实要重新尝试。但这是一个单一的,没有控制的重试。只有当您再次调用该filter.remove()时,它才会再次重试。那么,如果你自己做每件事,为什么要问这个问题呢?handle()配置的RequestHandlerRetryAdvice:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-handler-advice-chain。这样,您实际上只需要提取远程文件一次,重试将由Spring API.管理。
更新
所以,在学习了一些Cron表情之后,我意识到你的观点是错误的:
* */2 * * * * -意思是指每隔一分钟的每一秒
一定是这样的:
0 */2 * * * * -在每一分钟的开头
也许类似的东西是与您的hourly cron表达式的触角..。
https://stackoverflow.com/questions/72880304
复制相似问题