我目前使用的是来自spring integration aws的S3StreamingMessageSource。我将流传递给一个集成流。
public MessageSource<InputStream> s3InboundStreamingMessageSource() {
S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
messageSource.setRemoteDirectory(bucketName);
messageSource.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),
"streaming"));
return messageSource;
}
@Bean
public IntegrationFlow s3IntegrationFlow() {
return IntegrationFlows.from(s3InboundStreamingMessageSource(), spec -> spec.poller(Pollers.fixedDelay(10, TimeUnit.SECONDS)))
.transform(new S3ObjectInputStreamToStringTransformer())
.transform(Transformers.toJson())
.handle(Http.outboundGateway("http://localhost:8099/create").httpMethod(HttpMethod.POST).extractPayload(true))
.channel("nullChannel")
.get();
}如何从S3中删除检索到的远程文件?
在S3InboundFileSynchronizer中有一种方法可以做到这一点。
如下所示:
@Bean
public S3InboundFileSynchronizer s3InboundFileSynchronizer() {
S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(factory);
synchronizer.setDeleteRemoteFiles(true);
synchronizer.setPreserveTimestamp(true);
synchronizer.setRemoteDirectory(bucketName);
return synchronizer;
}有没有人能帮我或者告诉我一个好的解决办法?
发布于 2020-07-10 22:02:06
对于流通道适配器,没有远程文件的本地副本,因此我们无法猜测您将如何处理远程文件的InputStream。这就是为什么S3StreamingMessageSource上没有setDeleteRemoteFiles()的原因。
我看到你这样做了,S3ObjectInputStreamToStringTransformer。请告诉我,这个定制变压器的原因是什么?已经有一个StreamTransformer,并且使用它的charset选项,远程文件的InputStream将被转换为字符串:
/**
* Construct an instance with the charset to convert the stream to a
* String; if null a {@code byte[]} will be produced instead.
* @param charset the charset.
*/
public StreamTransformer(String charset) {另外:需要记住,在读取InputStream之后必须关闭StaticMessageHeaderAccessor.getCloseableResource(message),以避免资源泄漏。
您可能应该考虑使用handle()来调用AmazonS3.deleteObject(String bucketName, String key) .channel("nullChannel"),而不是使用API。bucketName分别存储在FileHeaders.REMOTE_DIRECTORY和key的FileHeaders.REMOTE_FILE头中。
https://stackoverflow.com/questions/62833264
复制相似问题