首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >像场景这样的生产者消费者模型不起作用。

像场景这样的生产者消费者模型不起作用。
EN

Stack Overflow用户
提问于 2016-02-26 09:41:09
回答 1查看 31关注 0票数 0

情况就是这样。我必须在一个时间间隔内轮询一个ftp服务器并获取csv文件。然后,必须解析这些CSV文件,并将其作为输入发送到某些业务逻辑。我就是这样做的。

FTPClientPolling (制片人)

代码语言:javascript
复制
public class FTPClientPolling {

    private static FTPClientPolling instance = null;

    private FTPClientPolling() {
    }

    public synchronized static FTPClientPolling getInstance() {
        if (instance == null) {
            logger.info("Object created for Client Polling");
            instance = new FTPClientPolling();
            initializeFTPClient();
        }
        return instance;
    }

    public static void initializeFTPClient() {
        // initialize the values from properties file
    }

    public void startPolling() {

        FTPClient ftpClient = null;
        try {
            //connecting to ftp server

            //iterating the files in it
            FTPFile[] filesList = ftpClient.listFiles();
            for (FTPFile tmpFile : filesList) {
                //..
                File tempFile = File.createTempFile(tmpFile.getName(), null);
                FileOutputStream fileOut = new FileOutputStream(tempFile);
                ftpClient.retrieveFile(tmpFile.getName(), fileOut);

                //adding the file to the Queue of the file processor
                FileProcessor.getInstance().getFilesToBeProcessedQueue().add(tempFile);
            }

            if (ftpClient.isConnected())
                ftpClient.disconnect();
        } catch (Exception e) {
            //logging
        } finally {
            //closing ftpclient
        }

    }
}

FTPClientPollingTasker (生产者塔斯克)

代码语言:javascript
复制
public class FTPClientTasker extends TimerTask {
    private static Long timeInterval = 10000l;

    @Override
    public void run() {
        FTPClientPolling.getInstance().startPolling();
    }

    public static void start() {
        TimerTask timerTask = new FTPClientTasker();
        Timer timer = new Timer();
        timer.scheduleAtFixedRate(timerTask, timeInterval, timeInterval);
    }

    public static void main(String[] args) {
        start();
    }
}

FileProcessor (消费者)

代码语言:javascript
复制
public final class FileProcessor {

    private static FileProcessor instance = null;
    private Queue<File> filesToBeProcessedQueue = new ArrayBlockingQueue<File>(10);

    private FileProcessor() {
    }

    public synchronized static FileProcessor getInstance() {
        if (instance == null) {
            instance = new FileProcessor();
        }
        return instance;
    }

    public void run() {
        while (!filesToBeProcessedQueue.isEmpty()) {
            processSyncFiles(filesToBeProcessedQueue.poll());
        }
    }

    private void processSyncFiles(File inputFile) {
        try {
            HashMap<String, Boolean> outputConsolidation = new HashMap<String, Boolean>();

            FileReader fileReader = new FileReader(inputFile);
            List<InputBean> csvContentsList = CSVParser.readContentsFromCSV(fileReader, new InputBean());
            for (InputBean inputBean : csvContentsList) {
                boolean output = false;
                // some business logic
                outputConsolidation.put(inputBean.toString(), output);
            }
        } catch (Exception e) {
            //logging
        }
    }

    public synchronized Queue<File> getFilesToBeProcessedQueue() {
        return filesToBeProcessedQueue;
    }
}

FileProcessor Tasker (消费计划程序)这个类为FileProcessor创建一个Tasker,并在计划的时间间隔内运行它。

代码语言:javascript
复制
public final class FileProcessorTasker extends TimerTask {

    private static Long timeInterval = 5000l;

    @Override
    public void run() {
        FileProcessor.getInstance().run();
    }

    public static void start() {
        TimerTask timerTask = new FileProcessorTasker();
        Timer timer = new Timer();
        timer.schedule(timerTask, timeInterval, timeInterval);
    }

    public static void main(String[] args) {
        FileProcessorTasker.start();
    }
}

这两个程序各自运行得很好。但是,当通过filesToBeProcessedQueue连接在一起时,它似乎不起作用。问题是FTPClientPolling创建了一个FileProcessor对象并将文件添加到队列中。但是 FileProcessorTasker创建了FileProcessor的另一个对象,其队列大小为零。这两个不同的对象就是问题所在。当类是singleton时,如何创建两个对象。我在单例实现中遗漏了什么吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-02-26 09:55:11

首先,不要使用TimerTimerTask。使用ExecutorService进行多线程处理.

并在Singleton类中使用急切的初始化。或者对null进行双重检查锁定,以便使singleton真正独立。

FTPClientPolling.java

代码语言:javascript
复制
public class FTPClientPolling {

    private static FTPClientPolling instance = new FTPClientPolling();

    private FTPClientPolling() {
        logger.info("Object created for Client Polling");
        initializeFTPClient();
    }

    public static FTPClientPolling getInstance() {
        return instance;
    }

    public static void initializeFTPClient() {
        // initialize the values from properties file
    }

    public void startPolling() {

        FTPClient ftpClient = null;
        try {
            //connecting to ftp server

            //iterating the files in it
            FTPFile[] filesList = ftpClient.listFiles();
            for (FTPFile tmpFile : filesList) {
                //..
                File tempFile = File.createTempFile(tmpFile.getName(), null);
                FileOutputStream fileOut = new FileOutputStream(tempFile);
                ftpClient.retrieveFile(tmpFile.getName(), fileOut);

                //adding the file to the Queue of the file processor
                FileProcessor.getInstance().getFilesToBeProcessedQueue().add(tempFile);
            }

            if (ftpClient.isConnected())
                ftpClient.disconnect();
        } catch (Exception e) {
            //logging
        } finally {
            //closing ftpclient
        }

    }
}

FileProcessor.java

代码语言:javascript
复制
public final class FileProcessor {

    private static FileProcessor instance = new FileProcessor();
    private Queue<File> filesToBeProcessedQueue = new ArrayBlockingQueue<File>(10);

    private FileProcessor() {
    }

    public static FileProcessor getInstance() {
        return instance;
    }

    public void run() {
        while (!filesToBeProcessedQueue.isEmpty()) {
            processSyncFiles(filesToBeProcessedQueue.poll());
        }
    }

    private void processSyncFiles(File inputFile) {
        try {
            HashMap<String, Boolean> outputConsolidation = new HashMap<String, Boolean>();

            FileReader fileReader = new FileReader(inputFile);
            List<InputBean> csvContentsList = CSVParser.readContentsFromCSV(fileReader, new InputBean());
            for (InputBean inputBean : csvContentsList) {
                boolean output = false;
                // some business logic
                outputConsolidation.put(inputBean.toString(), output);
            }
        } catch (Exception e) {
            //logging
        }
    }

    public synchronized Queue<File> getFilesToBeProcessedQueue() {
        return filesToBeProcessedQueue;
    }
}

有关更多信息,请阅读此帖子

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/35648184

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档