首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >查看远程目录中添加的文件,并对其进行流以读取SFTP上的数据。

查看远程目录中添加的文件,并对其进行流以读取SFTP上的数据。
EN

Stack Overflow用户
提问于 2019-04-30 04:47:43
回答 1查看 2.1K关注 0票数 1

我想为新添加的CSV文件或未读的文件在远程机器上添加一个手表。一旦文件被识别,根据它们的时间戳读取它,时间戳将在文件名中。该文件将使用流读取,而不是处理本地计算机。当文件被读取时,将_reading附加到文件名,并在文件读取后追加_read。该文件将通过sftp协议读取,我计划使用。如果在读取文件或数据时出错,根据预期,我希望将该文件移到子目录中。

我尝试轮询远程目录并读取一次CSV文件。一旦读取,我将从目录中删除该文件。

代码语言:javascript
复制
 <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-sftp</artifactId>
            <version>5.1.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>5.0.6.RELEASE</version>
        </dependency>

Spring boot version 2.0.3.RELEASE   
代码语言:javascript
复制
 @Bean
    public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(hostname);
        factory.setPort(22);
        factory.setUser(username);
        factory.setPassword(password);
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<ChannelSftp.LsEntry>(factory);
    }

    @Bean
    public MessageSource<InputStream> sftpMessageSource() {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
        messageSource.setRemoteDirectory(path);
        messageSource.setFilter(compositeFilters());
        return messageSource;
    }

    public CompositeFileListFilter compositeFilters() {
        return new CompositeFileListFilter()
                .addFilter(new SftpRegexPatternFileListFilter(".*csv"));
    }

    @Bean
    public SftpRemoteFileTemplate template() {
        return new SftpRemoteFileTemplate(sftpSessionFactory());
    }

    @Bean
    public IntegrationFlow sftpOutboundListFlow() {
        return IntegrationFlows.from(this.sftpMessageSource(), e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
                .handle(Sftp.outboundGateway(template(), NLST, path).options(Option.RECURSIVE)))
                .filter(compositeFilters())
                .transform(sorter())
                .split()
                .handle(Sftp.outboundGateway(template(), GET, "headers['file_remoteDirectory'] + headers['file_remoteFile']").options(STREAM))
                .transform(csvToPojoTransformer())
                .handle(service())
                .handle(Sftp.outboundGateway(template(), MV, "headers['file_remoteDirectory'] + headers['file_remoteFile'] + _read"))
                .handle(after())
                .get();
    }

    @Bean
    public MessageHandler sorter() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                List<String> fileNames = (List<String>) message.getPayload();
                Collections.sort(fileNames);
            }
        };
    }

    @Bean
    public MessageHandler csvToPojoTransformer() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                InputStream streamData = (InputStream) message.getPayload();
                convertStreamtoObject(streamData, Class.class);
            }
        };
    }

    public List<?> convertStreamtoObject(InputStream inputStream, Class clazz) {
        HeaderColumnNameMappingStrategy ms = new HeaderColumnNameMappingStrategy();
        ms.setType(clazz);
        Reader reader = new InputStreamReader(inputStream);

        CsvToBean cb = new CsvToBeanBuilder(reader)
                .withType(clazz)
                .withMappingStrategy(ms)
                .withSkipLines(0)
                .withSeparator('|')
                .withThrowExceptions(true)
                .build();
        return cb.parse();
    }

    @Bean
    public MessageHandler service() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                List<Class> csvDataAsListOfPojo = List < Class > message.getPayload();
                // use this
            }
        };
    }
    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload + ' was successful'");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString("payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }
    @Bean
    public IntegrationFlow success() {
        return f -> f.handle(System.out::println);
    }

    @Bean
    public IntegrationFlow failure() {
        return f -> f.handle(System.out::println);
    }

更新代码

EN

回答 1

Stack Overflow用户

发布于 2019-04-30 12:49:10

对于复杂的场景(列表、移动、获取、删除等),您应该使用替代SFTP远程文件网关

SFTP出站网关提供了一组有限的命令,允许您与远程SFTP服务器交互: ls (列表文件) nlst (列表文件名) 获取(检索文件) mget (检索多个文件) rm (删除文件) mv (移动和重命名文件) 放(发文件) mput (发送多个文件)

或者直接从代码中使用SftpRemoteFileTemplate

编辑

作为对你的评论的回应,你需要这样的东西

  • 入站通道适配器(带poller) -返回目录名
  • LS网关
  • 筛选器(删除已获取的所有文件)
  • 变压器(排序列表)
  • 分离器
  • 获取网关(流选项)
  • 变压器(csv到POJO)
  • 服务(process )

如果你加上

  • RM网关

在您的服务之后(要删除远程文件),您不需要筛选步骤。

您可能会发现Java DSL更容易组装这个流..。

代码语言:javascript
复制
@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from(() -> "some/dir", e -> e.poller(...))
        .handle(...) // LS Gateway
        .filter(...)
        .transform(sorter())
        .split
        .handle(...) // GET Gateway
        .transform(csvToPojoTransformer())
        .handle(myService())
        .get()
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55914129

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档