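I want to set up a watcher on a remote machine for newly added or unread CSV files. Once files are identified, they should be read in order of their timestamp, which is part of the file name. Each file should be read as a stream rather than being copied to the local machine and processed there. While a file is being read, _reading should be appended to its name, and once it has been read, _read should be appended. The files will be read over the SFTP protocol, and I am planning to use Spring Integration. If reading the file fails, or the data is not as expected, I want to move the file into a sub-directory.
I have tried polling the remote directory and reading the CSV file once. Once it has been read, I remove the file from the directory.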
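To make the ordering requirement concrete, here is a minimal sketch that picks unread CSV files out of a directory listing and sorts them oldest first by the timestamp in the name. The naming scheme (`<prefix>_<yyyyMMddHHmmss>.csv`) and the class and method names are assumptions made for illustration; the question does not state the actual format. Names that already carry a `_reading` or `_read` suffix do not match the pattern and are skipped.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class TimestampedFileNames {

    // Assumed (hypothetical) naming scheme: <prefix>_<yyyyMMddHHmmss>.csv
    private static final Pattern NAME = Pattern.compile("^.*_(\\d{14})\\.csv$");
    private static final DateTimeFormatter STAMP = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    /** Returns the timestamp embedded in the name, or null if the name does not fit the scheme. */
    static LocalDateTime timestampOf(String fileName) {
        Matcher m = NAME.matcher(fileName);
        if (!m.matches()) {
            return null;
        }
        try {
            return LocalDateTime.parse(m.group(1), STAMP);
        } catch (DateTimeParseException e) {
            return null; // 14 digits that do not form a valid date and time
        }
    }

    /** Keeps only names that fit the scheme and orders them oldest first. */
    static List<String> unreadOldestFirst(List<String> fileNames) {
        List<String> result = new ArrayList<>();
        for (String name : fileNames) {
            if (timestampOf(name) != null) {
                result.add(name);
            }
        }
        result.sort(Comparator.comparing(TimestampedFileNames::timestampOf));
        return result;
    }

    public static void main(String[] args) {
        List<String> listing = Arrays.asList(
                "orders_20190430124910.csv",
                "orders_20190101000000.csv",
                "orders_20190101000000.csv_read");
        System.out.println(unreadOldestFirst(listing));
    }
}
```

A plain `Collections.sort` on the names only gives the same order when every file shares the same prefix, which is why the sketch compares the parsed timestamps instead.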
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-sftp</artifactId>
    <version>5.1.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>5.0.6.RELEASE</version>
</dependency>
Spring Boot version 2.0.3.RELEASE
@Bean
public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
    factory.setHost(hostname);
    factory.setPort(22);
    factory.setUser(username);
    factory.setPassword(password);
    factory.setAllowUnknownKeys(true);
    return new CachingSessionFactory<ChannelSftp.LsEntry>(factory);
}
@Bean
public MessageSource<InputStream> sftpMessageSource() {
    SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
    messageSource.setRemoteDirectory(path);
    messageSource.setFilter(compositeFilters());
    return messageSource;
}
public CompositeFileListFilter compositeFilters() {
    return new CompositeFileListFilter()
            .addFilter(new SftpRegexPatternFileListFilter(".*csv"));
}
@Bean
public SftpRemoteFileTemplate template() {
    return new SftpRemoteFileTemplate(sftpSessionFactory());
}
@Bean
public IntegrationFlow sftpOutboundListFlow() {
    return IntegrationFlows.from(this.sftpMessageSource(), e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
            .handle(Sftp.outboundGateway(template(), NLST, path).options(Option.RECURSIVE))
            .filter(compositeFilters())
            .transform(sorter())
            .split()
            .handle(Sftp.outboundGateway(template(), GET, "headers['file_remoteDirectory'] + headers['file_remoteFile']").options(STREAM))
            .transform(csvToPojoTransformer())
            .handle(service())
            .handle(Sftp.outboundGateway(template(), MV, "headers['file_remoteDirectory'] + headers['file_remoteFile'] + _read"))
            .handle(after())
            .get();
}
@Bean
public MessageHandler sorter() {
    return new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            List<String> fileNames = (List<String>) message.getPayload();
            Collections.sort(fileNames);
        }
    };
}
@Bean
public MessageHandler csvToPojoTransformer() {
    return new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            InputStream streamData = (InputStream) message.getPayload();
            convertStreamtoObject(streamData, Class.class);
        }
    };
}
public List<?> convertStreamtoObject(InputStream inputStream, Class clazz) {
    HeaderColumnNameMappingStrategy ms = new HeaderColumnNameMappingStrategy();
    ms.setType(clazz);
    Reader reader = new InputStreamReader(inputStream);
    CsvToBean cb = new CsvToBeanBuilder(reader)
            .withType(clazz)
            .withMappingStrategy(ms)
            .withSkipLines(0)
            .withSeparator('|')
            .withThrowExceptions(true)
            .build();
    return cb.parse();
}
@Bean
public MessageHandler service() {
    return new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            List<Class> csvDataAsListOfPojo = (List<Class>) message.getPayload();
            // use this
        }
    };
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setSuccessChannelName("success.input");
    advice.setOnSuccessExpressionString("payload + ' was successful'");
    advice.setFailureChannelName("failure.input");
    advice.setOnFailureExpressionString("payload + ' was bad, with reason: ' + #exception.cause.message");
    advice.setTrapException(true);
    return advice;
}
@Bean
public IntegrationFlow success() {
    return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
    return f -> f.handle(System.out::println);
}
(Updated code)
Posted on 2019-04-30 12:49:10
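For complex scenarios (list, move, fetch, remove, etc.), you should use the SFTP outbound gateway instead.
The SFTP outbound gateway provides a limited set of commands that let you interact with a remote SFTP server: ls (list files), nlst (list file names), get (retrieve a file), mget (retrieve multiple files), rm (remove files), mv (move and rename files), put (send a file), mput (send multiple files).
Or use SftpRemoteFileTemplate directly from your code.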
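The rename, read, rename sequence from the question can be sketched independently of the framework, as below. `RemoteFiles`, `StreamConsumer`, `InMemoryRemote`, and the `error` sub-directory name are hypothetical and exist only so the example runs on its own; in real code the two operations would be backed by the template's `rename` and `get` methods (or by the `mv` and `get` gateway commands), and this is not meant as the definitive way to wire it.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

class RemoteFileLifecycle {

    /** Hypothetical minimal view of the remote operations this sketch needs. */
    interface RemoteFiles {
        void rename(String from, String to) throws IOException;
        InputStream open(String path) throws IOException;
    }

    /** Hypothetical callback that consumes the CSV stream. */
    interface StreamConsumer {
        void accept(InputStream in) throws IOException;
    }

    /** Marks the file as in progress, streams it, then marks it read or moves it aside. */
    static String process(RemoteFiles remote, String dir, String name, StreamConsumer consumer)
            throws IOException {
        String original = dir + "/" + name;
        String reading = original + "_reading";
        remote.rename(original, reading);           // claim the file
        try (InputStream in = remote.open(reading)) {
            consumer.accept(in);                    // parse while streaming
        } catch (IOException | RuntimeException e) {
            String failed = dir + "/error/" + name; // assumed sub-directory name
            remote.rename(reading, failed);
            return failed;
        }
        String done = original + "_read";
        remote.rename(reading, done);
        return done;
    }

    /** In-memory stand-in for an SFTP server, for demonstration only. */
    static class InMemoryRemote implements RemoteFiles {
        final Map<String, String> files = new TreeMap<>();

        @Override
        public void rename(String from, String to) throws IOException {
            String content = files.remove(from);
            if (content == null) {
                throw new IOException("No such file: " + from);
            }
            files.put(to, content);
        }

        @Override
        public InputStream open(String path) throws IOException {
            String content = files.get(path);
            if (content == null) {
                throw new IOException("No such file: " + path);
            }
            return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) throws IOException {
        InMemoryRemote remote = new InMemoryRemote();
        remote.files.put("in/a_20190430124910.csv", "id|name\n1|x\n");
        String end = process(remote, "in", "a_20190430124910.csv",
                in -> { while (in.read() != -1) { /* drain the stream */ } });
        System.out.println(end);
    }
}
```

Renaming before opening the stream means a second poller that lists only names ending in `csv` will not pick up a file that is already being read.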
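EDIT
In response to your comments: you need something like the following sequence: a polled source that supplies the directory name, an LS gateway, a filter, a transformer that sorts the list, a splitter, a GET gateway, a transformer from CSV to POJO, and your service.
If you add an RM gateway after your service (to remove the remote file), you don't need the filter step.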
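The reason the filter becomes optional is that its only job here is to drop names an earlier poll already handled; once processed files are deleted, they never show up in a later listing. A minimal sketch of what such a filter does, with hypothetical class names and no framework types (Spring Integration ships its own accept-once filters, such as `AcceptOnceFileListFilter`, for real use):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class AcceptOnceDemo {

    /** Remembers every name it has passed on and rejects it on later polls. */
    static class AcceptOnceFilter {
        private final Set<String> seen = new HashSet<>();

        List<String> filter(List<String> listing) {
            List<String> fresh = new ArrayList<>();
            for (String name : listing) {
                // Set.add returns false when the name was already recorded
                if (seen.add(name)) {
                    fresh.add(name);
                }
            }
            return fresh;
        }
    }

    public static void main(String[] args) {
        AcceptOnceFilter filter = new AcceptOnceFilter();
        // First poll: both files are new to the filter
        System.out.println(filter.filter(Arrays.asList("a.csv", "b.csv")));
        // Second poll: the files were not deleted, so only c.csv passes
        System.out.println(filter.filter(Arrays.asList("a.csv", "b.csv", "c.csv")));
    }
}
```

Note that an in-memory filter like this forgets everything on restart, which is one more reason to prefer removing or renaming the remote file after processing.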
You might find the Java DSL easier for assembling this flow...
@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from(() -> "some/dir", e -> e.poller(...))
            .handle(...) // LS Gateway
            .filter(...)
            .transform(sorter())
            .split()
            .handle(...) // GET Gateway
            .transform(csvToPojoTransformer())
            .handle(myService())
            .get();
}
https://stackoverflow.com/questions/55914129