我希望使用Spring从Unix位置读取.txt文件,而不将其复制到本地,这应该在连续模式下进行,即当一个新文件出现时,应该检测和读取该文件。
代码:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("ip");
factory.setPort(port);
factory.setUser("user");
factory.setPassword("pwd");
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<LsEntry>(factory);
}
@Bean
@Transformer(inputChannel = "stream",outputChannel="data")
public org.springframework.integration.transformer.Transformer transformer () {
return new org.springframework.integration.transformer.StreamTransformer("UTF-8");
}
@Bean
@InboundChannelAdapter(value = "stream", poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource<InputStream> ftpMessageSource() {
SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template(), null);
messageSource.setRemoteDirectory("/test1/test2/test3");
messageSource.setFilter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),
"streaming"));
return messageSource;
}
@Bean
public SftpRemoteFileTemplate template() {
return new SftpRemoteFileTemplate(sftpSessionFactory());
}
@Bean
@ServiceActivator(inputChannel = "data" )
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(">>>>>>>>>>>>>"+message.getPayload()); //this prints the data in the file
}
};
}}
属地:
compile("org.springframework.cloud:spring-cloud-spring-service-connector:1.2.1.RELEASE")
compile("org.springframework.cloud:spring-cloud-cloudfoundry-connector:1.2.1.RELEASE")
compile("org.springframework.boot:spring-boot-starter-integration")
compile group: 'com.jcraft', name: 'jsch', version: '0.1.44-1'
compile group: 'org.springframework.integration', name: 'spring-integration-sftp', version: '4.3.1.RELEASE'
compile group: 'org.springframework.integration', name: 'spring-integration-file', version: '4.3.1.RELEASE'
compile('org.kie.modules:org-apache-commons-lang3:6.3.0.Final')
compile("com.h2database:h2:1.4.192")编译组:‘org.Spring integration work.Integration’,名称:'spring-integration-core',版本:'4.3.1.RELEASE‘
堆栈跟踪:
Caused by: org.springframework.core.NestedIOException: Failed to list files; nested exception is 2: No such file
at org.springframework.integration.sftp.session.SftpSession.list(SftpSession.java:104)
at org.springframework.integration.sftp.session.SftpSession.list(SftpSession.java:50)
at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.list(CachingSessionFactory.java:218)
at org.springframework.integration.file.remote.RemoteFileTemplate$6.doInSession(RemoteFileTemplate.java:417)
at org.springframework.integration.file.remote.RemoteFileTemplate$6.doInSession(RemoteFileTemplate.java:413)
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:435)
... 24 more
Caused by: 2: No such file
at com.jcraft.jsch.ChannelSftp.throwStatusError(ChannelSftp.java:2297)
at com.jcraft.jsch.ChannelSftp._stat(ChannelSftp.java:1750)
at com.jcraft.jsch.ChannelSftp._stat(ChannelSftp.java:1767)
at com.jcraft.jsch.ChannelSftp.ls(ChannelSftp.java:1205)
at org.springframework.integration.sftp.session.SftpSession.list(SftpSession.java:92)
... 29 more发布于 2016-08-31 13:20:41
从Spring 4.3开始,远程文件支持(FTP/SFTP)提供了streaming适配器:
<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
remote-file-separator="/"
comparator="comparator"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>适配器
但!我们不能“在连续模式下”这样做,因为(S)FTP不是事件驱动的协议。因此,我们仍然应该定期轮询远程目录。
如果您真的知道如何让它侦听远程目录中的某些事件,我们将非常高兴在中有这样一个组件。
编辑
当您升级Spring的依赖项时,请确保所有模块都在同一个版本中,您的版本控制不是很好。
spring-integration-file也应该是4.3.1.RELEASE。
你根本不需要它。它是spring-integration-sftp的一个传递依赖项。同样,您也不需要spring-integration-core,因为它是所有它们的传递依赖关系。
EDIT2
我只想知道过滤器.It中使用的“流”一词的重要性,因为它并没有阻止文件在我的末尾读取。但为了知识的目的,我想知道
Streaming适配器的目的是让目标应用程序不创建本地文件副本。只需在内存中直接从远程源读取数据。因此,您仍然可以读取文件,但这是一个远程文件。不是吗?
关于PCF的错误。
很高兴在这件事上与我们分享StackTrace。从另一边,请尝试使用allowUnknownKeys = true作为DefaultSftpSessionFactory。并在参考手册中阅读更多内容。
https://stackoverflow.com/questions/39244760
复制相似问题