
Spring-Integration-SFTP下载和处理文件
Asked by a Stack Overflow user on 2014-10-07 11:01:45 · 1 answer · 3K views · 1 vote

I used the suggestion below and was able to download files from the remote server. How do I process those files once the download is complete? Could someone please guide me on how/where I get control once the files have been downloaded to my local system?

spring integration + cron + quartz in cluster?

ApplicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/sftp http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd">    

    <context:component-scan base-package="com.reports"/>    

    <context:property-placeholder location="classpath:reports.properties"/> 

    <bean class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory"
        id="sftpSessionFactory">
        <property name="host" value="${sftp.host}"/>
        <property name="privateKey" value="${sftp.key.location}"/>
        <property name="port" value="${sftp.port}"/>
        <property name="user" value="${sftp.user}"/>
    </bean> 

    <int-sftp:inbound-channel-adapter 
        id="sftpInboundAdapter"
        auto-startup="false"
        auto-create-local-directory="false"
        channel="receiveChannel"
        delete-remote-files="false"
        filter = "customfilter"
        remote-file-separator="/"
        local-directory="${local.directory}"
        remote-directory="${remote.directory}"
        session-factory="sftpSessionFactory"
        preserve-timestamp="true"
        local-filter="acceptAll">

        <int:poller trigger="sftp-trigger"/>
    </int-sftp:inbound-channel-adapter>

    <int:channel id="receiveChannel">
        <int:queue/>
    </int:channel>

    <bean id="customfilter" class="com.reports.filters.ReportsFileListFilter"/> 
    <bean id="acceptAll" class="org.springframework.integration.file.filters.AcceptAllFileListFilter" />        
    <bean id="sftp-trigger" class="com.reports.jobs.ReportsTrigger"/>

    <!-- Quartz job --> 
    <!--  (1) Trigger - This is used in the Scheduler below --> 
    <bean id="ReportCronTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
        <property name="jobDetail" ref="ReportsJob"/>
        <property name="cronExpression" value="${cron.expression}"/>
    </bean>

    <!-- (2) Job -->
    <bean name="ReportsJob" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
        <property name="jobClass" value="com.reports.jobs.ReportsJob"/>
        <property name="name" value="ReportsJob" />
        <property name="durability" value="true" />
    </bean>

    <!-- (3) Scheduler -->
    <bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
        <property name="schedulerContextAsMap">
            <map>               
                <entry key="inputEndpoint"><ref bean="sftpInboundAdapter" /></entry>
                <entry key="inputEndpointTrigger"><ref bean="sftp-trigger" /></entry>
            </map>
        </property>
        <property name="dataSource" ref="dataSource" />
        <property name="overwriteExistingJobs" value="true" />      
        <property name="quartzProperties">
            <props>
                <prop key="org.quartz.scheduler.instanceName">ReportsBatchScheduler</prop>
                <prop key="org.quartz.scheduler.instanceId">AUTO</prop>
                <prop key="org.quartz.jobStore.misfireThreshold">60000</prop>
                <prop key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreTX</prop>
                <prop key="org.quartz.jobStore.driverDelegateClass">org.quartz.impl.jdbcjobstore.StdJDBCDelegate</prop>
                <prop key="org.quartz.jobStore.tablePrefix">dbo.QRTZ_</prop>
                <prop key="org.quartz.jobStore.isClustered">true</prop>
                <prop key="org.quartz.threadPool.class">org.quartz.simpl.SimpleThreadPool</prop>
                <prop key="org.quartz.threadPool.threadCount">1</prop>
                <prop key="org.quartz.threadPool.threadPriority">5</prop>
                <prop key="org.quartz.jobStore.selectWithLockSQL">SELECT * FROM {0}LOCKS UPDLOCK WHERE LOCK_NAME = ?</prop>
            </props>
        </property>
        <property name="triggers">
            <list>
                <ref bean="ReportCronTrigger" />
            </list>
        </property>
    </bean> 

    <bean id="dataSource" class="org.springframework.jndi.JndiObjectFactoryBean">
        <property name="jndiName"><value>java:/MSSQLDS_APP</value></property>
    </bean>

</beans>

Custom filter
import java.text.SimpleDateFormat;
import java.util.Date;

import org.springframework.integration.file.filters.AbstractFileListFilter;

import com.jcraft.jsch.ChannelSftp.LsEntry;

public class ReportsFileListFilter extends AbstractFileListFilter<LsEntry> {

    private final SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");

    @Override
    protected boolean accept(LsEntry file) {
        if (file == null || ".".equals(file.getFilename()) || "..".equals(file.getFilename())) {
            return false;
        }
        // Accept only files whose name contains today's date (yyyyMMdd)
        String today = sdf.format(new Date());
        if (file.getFilename().contains(today)) {
            Log.logDebug(ReportsFileListFilter.class, "Downloading File :: " + file.getFilename());
            return true;
        }
        return false;
    }
}

Trigger
import java.util.Date;

import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;

public class ReportsTrigger implements Trigger {

    private volatile boolean done;

    // One-shot trigger: fire immediately on the first call after a reset(),
    // then return null so the poller stops until the Quartz job resets it again.
    @Override
    public Date nextExecutionTime(TriggerContext triggerContext) {
        if (done) {
            return null;
        }
        done = true;
        Log.logDebug(ReportsTrigger.class, "Job started for date :: " + new Date());
        return new Date();
    }

    public void reset() {
        Log.logDebug(ReportsTrigger.class, "Reset is called");
        done = false;
    }
}

Quartz job
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.scheduling.quartz.QuartzJobBean;

public class ReportsJob extends QuartzJobBean {

    private AbstractEndpoint inputEndpoint;
    private ReportsTrigger inputEndpointTrigger;

    public void setInputEndpoint(final AbstractEndpoint pInputEndpoint) {
        this.inputEndpoint = pInputEndpoint;
    }

    public void setInputEndpointTrigger(final ReportsTrigger pInputEndpointTrigger) {
        this.inputEndpointTrigger = pInputEndpointTrigger;
    }

    // Stop the adapter, reset the one-shot trigger, then restart the adapter
    // so it performs exactly one poll per Quartz firing.
    @Override
    protected void executeInternal(final JobExecutionContext pJobExecutionContext)
            throws JobExecutionException {
        inputEndpoint.stop();
        inputEndpointTrigger.reset();
        inputEndpoint.start();
        Log.logDebug(ReportsJob.class, "Job running");
    }
}

1 Answer

Answered by a Stack Overflow user on 2014-10-07 11:50:38

"Actually, I want to parse the downloaded files and insert them into database tables."

Well, since the SFTP inbound adapter polls the remote directory, stores the files in the local directory, and sends each of them as a Message whose payload is the File object, nothing stops you from doing custom processing of each file in the downstream flow.

For example, you can use <int-file:file-to-string-transformer> and then supply some further custom parser as a <transformer ref="">.
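As a sketch of such a custom parser (the class name ReportParser, the CSV layout, and the record keys are illustrative assumptions, not part of the original post), a plain POJO referenced from <int:transformer ref="..." method="parse"/> could look like this:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical parser bean. It receives the file contents produced upstream
// (e.g. by <int-file:file-to-string-transformer>) and maps each non-empty
// CSV line to a name/value record ready for a JDBC insert.
public class ReportParser {

    public List<Map<String, Object>> parse(String contents) {
        List<Map<String, Object>> records = new ArrayList<>();
        for (String line : contents.split("\\R")) {
            if (line.trim().isEmpty()) {
                continue;                       // skip blank lines
            }
            String[] cols = line.split(",");
            Map<String, Object> record = new HashMap<>();
            record.put("reportName", cols[0].trim());
            record.put("reportValue", cols.length > 1 ? cols[1].trim() : null);
            records.add(record);
        }
        return records;
    }
}
```

The returned List would typically be run through a splitter so each Map becomes its own message for the JDBC adapter.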

Finally, just wire in an <int-jdbc:outbound-channel-adapter> to store the data from the payload into the DB.
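Tying these suggestions together, the downstream flow could be wired roughly like this (the channel names, the reportParser bean, the table, and the SQL are assumptions for illustration; the int-file and int-jdbc namespaces would also need to be added to the <beans> declaration):

```xml
<!-- Poll the File payloads queued on receiveChannel and read each file into a String -->
<int-file:file-to-string-transformer input-channel="receiveChannel"
                                     output-channel="parsedChannel">
    <int:poller fixed-delay="5000"/>
</int-file:file-to-string-transformer>

<!-- Hypothetical custom parser turning the file contents into a list of records -->
<int:transformer input-channel="parsedChannel" output-channel="recordsChannel"
                 ref="reportParser" method="parse"/>
<bean id="reportParser" class="com.reports.parsers.ReportParser"/>

<!-- Split the list so each record is inserted individually -->
<int:splitter input-channel="recordsChannel" output-channel="jdbcChannel"/>

<!-- Store each record (assumed here to be a Map) into the DB -->
<int-jdbc:outbound-channel-adapter channel="jdbcChannel" data-source="dataSource"
    query="insert into REPORTS (NAME, VALUE)
           values (:payload[reportName], :payload[reportValue])"/>
```

The splitter is needed only because the hypothetical parser emits a List; if it emitted one record per message, the JDBC adapter could consume parsedChannel directly.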

0 votes

Original content provided by Stack Overflow. Link to the original question:

https://stackoverflow.com/questions/26234503