情况是这样的:我需要按固定的时间间隔轮询一个FTP服务器并获取CSV文件,然后解析这些CSV文件,并将解析结果作为输入交给某些业务逻辑处理。我的做法如下。
FTPClientPolling (生产者)
/**
 * Producer side of the pipeline: a lazily created singleton whose
 * {@link #startPolling()} lists the files on an FTP server, downloads each one
 * into a local temp file and adds that temp file to the queue owned by
 * {@code FileProcessor}.
 *
 * <p>NOTE(review): a static singleton is unique only within one JVM. Producer and
 * consumer can only share the queue if they run in the same process.
 */
public class FTPClientPolling {
    private static FTPClientPolling instance = null;

    private FTPClientPolling() {
    }

    /**
     * Returns the single instance, creating it and running
     * {@link #initializeFTPClient()} on the first call.
     * The method is synchronized, so concurrent first calls create only one instance.
     *
     * @return the shared {@code FTPClientPolling} instance
     */
    public synchronized static FTPClientPolling getInstance() {
        if (instance == null) {
            logger.info("Object created for Client Polling");
            instance = new FTPClientPolling();
            initializeFTPClient();
        }
        return instance;
    }

    /** Placeholder for loading the FTP settings; the body is elided in this listing. */
    public static void initializeFTPClient() {
        // initialize the values from properties file
    }

    /**
     * Performs one polling pass: every file listed by the FTP client is copied into
     * a fresh temp file, which is then added to the {@code FileProcessor} queue.
     *
     * <p>Any exception aborts the pass and is swallowed by the catch block, where the
     * listing only shows a logging placeholder.
     */
    public void startPolling() {
        // NOTE(review): the connection setup is elided in this listing; as written
        // ftpClient is still null when listFiles() is called.
        FTPClient ftpClient = null;
        try {
            //connecting to ftp server
            //iterating the files in it
            FTPFile[] filesList = ftpClient.listFiles();
            for (FTPFile tmpFile : filesList) {
                //..
                File tempFile = File.createTempFile(tmpFile.getName(), null);
                FileOutputStream fileOut = new FileOutputStream(tempFile);
                try {
                    ftpClient.retrieveFile(tmpFile.getName(), fileOut);
                } finally {
                    // Close (and thereby flush) the stream before the consumer can see
                    // the file; the original never closed it and leaked one handle per file.
                    fileOut.close();
                }
                //adding the file to the Queue of the file processor
                // NOTE(review): add() throws IllegalStateException on a full bounded
                // queue, which would abort the rest of this pass - confirm that is intended.
                FileProcessor.getInstance().getFilesToBeProcessedQueue().add(tempFile);
            }
        } catch (Exception e) {
            //logging
        } finally {
            //closing ftpclient
            // Disconnect on the failure path too, not only after a successful pass.
            if (ftpClient != null && ftpClient.isConnected()) {
                try {
                    ftpClient.disconnect();
                } catch (Exception e) {
                    //logging
                }
            }
        }
    }
}
FTPClientPollingTasker (生产者塔斯克)
/**
 * Timer task for the producer: each time it fires it runs one polling pass on the
 * shared {@code FTPClientPolling} instance.
 */
public class FTPClientTasker extends TimerTask {
    /** Used as both the initial delay and the period, in milliseconds. */
    private static final long TIME_INTERVAL_MS = 10000L;

    /** Entry point: starts the polling timer. */
    public static void main(String[] args) {
        start();
    }

    /** Creates a new task and a new timer and schedules the task at a fixed rate. */
    public static void start() {
        TimerTask pollingTask = new FTPClientTasker();
        Timer pollingTimer = new Timer();
        pollingTimer.scheduleAtFixedRate(pollingTask, TIME_INTERVAL_MS, TIME_INTERVAL_MS);
    }

    /** Runs one polling pass. */
    @Override
    public void run() {
        FTPClientPolling poller = FTPClientPolling.getInstance();
        poller.startPolling();
    }
}
FileProcessor (消费者)
/**
 * Consumer side of the pipeline: a lazily created singleton that owns a bounded
 * queue (capacity 10) of files and, when {@link #run()} is called, parses every
 * queued file as CSV.
 *
 * <p>NOTE(review): a static singleton is unique only within one JVM. A producer
 * started from a different {@code main} in another process gets its own instance
 * and its own queue.
 */
public final class FileProcessor {
    private static FileProcessor instance = null;
    private Queue<File> filesToBeProcessedQueue = new ArrayBlockingQueue<File>(10);

    private FileProcessor() {
    }

    /**
     * Returns the single instance, creating it on the first call.
     * The method is synchronized, so concurrent first calls create only one instance.
     *
     * @return the shared {@code FileProcessor} instance
     */
    public synchronized static FileProcessor getInstance() {
        if (instance == null) {
            instance = new FileProcessor();
        }
        return instance;
    }

    /**
     * Drains the queue, processing each file in turn, and returns once the queue
     * is empty.
     */
    public void run() {
        // Loop on poll() itself: a separate isEmpty() check followed by poll() can
        // hand null to processSyncFiles if the queue changes in between.
        File nextFile;
        while ((nextFile = filesToBeProcessedQueue.poll()) != null) {
            processSyncFiles(nextFile);
        }
    }

    /**
     * Parses one CSV file into {@code InputBean}s and records a boolean result per
     * bean, keyed by the bean's {@code toString()}. Any exception is swallowed by the
     * catch block, where the listing only shows a logging placeholder.
     *
     * @param inputFile the CSV file to parse
     */
    private void processSyncFiles(File inputFile) {
        FileReader fileReader = null;
        try {
            HashMap<String, Boolean> outputConsolidation = new HashMap<String, Boolean>();
            fileReader = new FileReader(inputFile);
            List<InputBean> csvContentsList = CSVParser.readContentsFromCSV(fileReader, new InputBean());
            for (InputBean inputBean : csvContentsList) {
                boolean output = false;
                // some business logic
                outputConsolidation.put(inputBean.toString(), output);
            }
        } catch (Exception e) {
            //logging
        } finally {
            // The original never closed the reader, leaking one file handle per file.
            // Closing an already closed Reader has no effect, so this is safe even if
            // the parser closes it as well.
            if (fileReader != null) {
                try {
                    fileReader.close();
                } catch (Exception e) {
                    //logging
                }
            }
        }
    }

    /**
     * Exposes the queue so that producers can add files to it.
     *
     * @return the live queue instance, not a copy
     */
    public synchronized Queue<File> getFilesToBeProcessedQueue() {
        return filesToBeProcessedQueue;
    }
}
FileProcessor Tasker (消费计划程序)这个类为FileProcessor创建一个Tasker,并在计划的时间间隔内运行它。
/**
 * Timer task for the consumer: each time it fires it asks the shared
 * {@code FileProcessor} instance to drain its queue.
 */
public final class FileProcessorTasker extends TimerTask {
    /** Used as both the initial delay and the period, in milliseconds. */
    private static final long TIME_INTERVAL_MS = 5000L;

    /** Entry point: starts the consumer timer. */
    public static void main(String[] args) {
        FileProcessorTasker.start();
    }

    /** Creates a new task and a new timer and schedules the task with a fixed delay. */
    public static void start() {
        TimerTask processingTask = new FileProcessorTasker();
        Timer processingTimer = new Timer();
        processingTimer.schedule(processingTask, TIME_INTERVAL_MS, TIME_INTERVAL_MS);
    }

    /** Runs one drain of the file queue. */
    @Override
    public void run() {
        FileProcessor processor = FileProcessor.getInstance();
        processor.run();
    }
}
这两个程序各自运行得很好。但是,当通过filesToBeProcessedQueue连接在一起时,它似乎不起作用。问题是FTPClientPolling创建了一个FileProcessor对象并将文件添加到队列中。但是 FileProcessorTasker创建了FileProcessor的另一个对象,其队列大小为零。这两个不同的对象就是问题所在。当类是singleton时,如何创建两个对象。我在单例实现中遗漏了什么吗?
发布于 2016-02-26 09:55:11
首先,不要使用Timer和TimerTask,请使用ExecutorService进行多线程处理。
其次,在Singleton类中使用饿汉式(提前)初始化;或者对null进行双重检查锁定,以确保单例真正唯一。
FTPClientPolling.java
/**
 * Producer side of the pipeline, as an eagerly initialized singleton: the instance
 * is created in the static field initializer. {@link #startPolling()} lists the
 * files on an FTP server, downloads each one into a local temp file and adds that
 * temp file to the queue owned by {@code FileProcessor}.
 *
 * <p>NOTE(review): a static singleton is unique only within one JVM. Producer and
 * consumer can only share the queue if they run in the same process.
 */
public class FTPClientPolling {
    // NOTE(review): logger is not declared in this listing. If it is a static field
    // of this class it must be declared above this line, otherwise it is still null
    // when the constructor runs during static initialization.
    private static FTPClientPolling instance = new FTPClientPolling();

    private FTPClientPolling() {
        logger.info("Object created for Client Polling");
        initializeFTPClient();
    }

    /**
     * Returns the instance created when the class was initialized.
     *
     * @return the shared {@code FTPClientPolling} instance
     */
    public static FTPClientPolling getInstance() {
        return instance;
    }

    /** Placeholder for loading the FTP settings; the body is elided in this listing. */
    public static void initializeFTPClient() {
        // initialize the values from properties file
    }

    /**
     * Performs one polling pass: every file listed by the FTP client is copied into
     * a fresh temp file, which is then added to the {@code FileProcessor} queue.
     *
     * <p>Any exception aborts the pass and is swallowed by the catch block, where the
     * listing only shows a logging placeholder.
     */
    public void startPolling() {
        // NOTE(review): the connection setup is elided in this listing; as written
        // ftpClient is still null when listFiles() is called.
        FTPClient ftpClient = null;
        try {
            //connecting to ftp server
            //iterating the files in it
            FTPFile[] filesList = ftpClient.listFiles();
            for (FTPFile tmpFile : filesList) {
                //..
                File tempFile = File.createTempFile(tmpFile.getName(), null);
                FileOutputStream fileOut = new FileOutputStream(tempFile);
                try {
                    ftpClient.retrieveFile(tmpFile.getName(), fileOut);
                } finally {
                    // Close (and thereby flush) the stream before the consumer can see
                    // the file; the original never closed it and leaked one handle per file.
                    fileOut.close();
                }
                //adding the file to the Queue of the file processor
                // NOTE(review): add() throws IllegalStateException on a full bounded
                // queue, which would abort the rest of this pass - confirm that is intended.
                FileProcessor.getInstance().getFilesToBeProcessedQueue().add(tempFile);
            }
        } catch (Exception e) {
            //logging
        } finally {
            //closing ftpclient
            // Disconnect on the failure path too, not only after a successful pass.
            if (ftpClient != null && ftpClient.isConnected()) {
                try {
                    ftpClient.disconnect();
                } catch (Exception e) {
                    //logging
                }
            }
        }
    }
}
FileProcessor.java
/**
 * Consumer side of the pipeline, as an eagerly initialized singleton: the instance
 * is created in the static field initializer. It owns a bounded queue (capacity 10)
 * of files and, when {@link #run()} is called, parses every queued file as CSV.
 *
 * <p>NOTE(review): a static singleton is unique only within one JVM. A producer
 * running in another process gets its own instance and its own queue.
 */
public final class FileProcessor {
    private static FileProcessor instance = new FileProcessor();
    private Queue<File> filesToBeProcessedQueue = new ArrayBlockingQueue<File>(10);

    private FileProcessor() {
    }

    /**
     * Returns the instance created when the class was initialized.
     *
     * @return the shared {@code FileProcessor} instance
     */
    public static FileProcessor getInstance() {
        return instance;
    }

    /**
     * Drains the queue, processing each file in turn, and returns once the queue
     * is empty.
     */
    public void run() {
        // Loop on poll() itself: a separate isEmpty() check followed by poll() can
        // hand null to processSyncFiles if the queue changes in between.
        File nextFile;
        while ((nextFile = filesToBeProcessedQueue.poll()) != null) {
            processSyncFiles(nextFile);
        }
    }

    /**
     * Parses one CSV file into {@code InputBean}s and records a boolean result per
     * bean, keyed by the bean's {@code toString()}. Any exception is swallowed by the
     * catch block, where the listing only shows a logging placeholder.
     *
     * @param inputFile the CSV file to parse
     */
    private void processSyncFiles(File inputFile) {
        FileReader fileReader = null;
        try {
            HashMap<String, Boolean> outputConsolidation = new HashMap<String, Boolean>();
            fileReader = new FileReader(inputFile);
            List<InputBean> csvContentsList = CSVParser.readContentsFromCSV(fileReader, new InputBean());
            for (InputBean inputBean : csvContentsList) {
                boolean output = false;
                // some business logic
                outputConsolidation.put(inputBean.toString(), output);
            }
        } catch (Exception e) {
            //logging
        } finally {
            // The original never closed the reader, leaking one file handle per file.
            // Closing an already closed Reader has no effect, so this is safe even if
            // the parser closes it as well.
            if (fileReader != null) {
                try {
                    fileReader.close();
                } catch (Exception e) {
                    //logging
                }
            }
        }
    }

    /**
     * Exposes the queue so that producers can add files to it.
     *
     * @return the live queue instance, not a copy
     */
    public synchronized Queue<File> getFilesToBeProcessedQueue() {
        return filesToBeProcessedQueue;
    }
}
有关更多信息,请阅读此帖子。
https://stackoverflow.com/questions/35648184
复制相似问题