我有一个类,它在容器中作为spring运行,它的目的是数据持久性。在overriden方法public void executeDataSyncOperation(DataSyncOperation[] operations)中,我们可以选择将数据保存在operations数组中。但是输入的数据流非常大,而且速度很快,我们希望它的执行是并行的,这样当发生致命的异常时,我们就不会产生大量的数据积压,而这些数据没有被持久保存。我应该提到的一件事是,一旦调用了executeDataSyncOperation方法,任何其他线程都不会修改operations。所以我想出了以下的想法;
我实现了使用workManager的逻辑,这将在每次调用上面的方法时进行实例化,而这些工作管理器每个都实例化n个线程,然后通过从原始数组创建的列表创建的迭代器。下面是我的实现,你能不能告诉我这个方法的缺点,或者有没有更好的方法。
CustomDataSyncEndPoint.java (类作为spring运行,每当数据被传递用于持久性时都调用它的方法)
public class CustomDataSyncEndPoint extends SpaceSynchronizationEndpoint {
private final Logger logger = Logger.getLogger(CustomDataSyncEndPoint.class.getName());
private DataSource dataSource;
private List<PersistenceWorkManager> workManagers;
private Thread workManagerMonitor;
private volatile boolean stopProcessing;
private boolean isTraceLoggingEnabled;
public CustomDataSyncEndPoint(DataSource dataSource) {
this.dataSource = dataSource;
workManagers = new ArrayList<PersistenceWorkManager>();
isTraceLoggingEnabled = logger.isLoggable(Level.FINER);
}
@PostConstruct
private void init() {
stopProcessing = false;
workManagerMonitor = new Thread(new WorkManagerMonitor(), "workManagerMonitor");
workManagerMonitor.setDaemon(true);
workManagerMonitor.start();
}
@PreDestroy
private void destroy() {
stopProcessing = true;
try {
Thread.sleep(5000);
if (workManagerMonitor.isAlive()) {
workManagerMonitor.interrupt();
}
for (PersistenceWorkManager workManager : workManagers) {
workManager.stopProcessing();
}
} catch (InterruptedException e) {
// Ignore this as we are cleaning the resources anyway
}
}
@Override
public void onTransactionSynchronization(TransactionData transactionData) {
executeDataSyncOperation(transactionData.getTransactionParticipantDataItems());
}
@Override
public void onOperationsBatchSynchronization(OperationsBatchData batchData) {
executeDataSyncOperation(batchData.getBatchDataItems());
}
public void executeDataSyncOperation(DataSyncOperation[] operations) {
if (operations == null || operations.length == 0)
return;
// if number of operations are greater than five then only start multiple runnables otherwise just start one runnable
int numOfRunnables = (operations.length < 5) ? 1 : 5;
if (isTraceLoggingEnabled)
logger.finer("execution sync operation with " + operations.length + " dataitems");
PersistenceWorkManager workManager = new PersistenceWorkManager(numOfRunnables, operations, dataSource);
workManagers.add(workManager);
}
/**
* WorkManagerMonitor runnable class. This class keeps polling the list of
* workManagers to check if they have finished processing. If so it will remove
* it from the list.
* @author desai
*
*/
private class WorkManagerMonitor implements Runnable {
@Override
public void run() {
if (isTraceLoggingEnabled)
logger.finer("started workMangerMonitor");
try {
while (!stopProcessing) {
for (PersistenceWorkManager workManager : workManagers) {
if (workManager.hasFinishedPersisting()) {
workManagers.remove(workManager);
}
}
// TODO: The polling interval should be set by spring property
Thread.sleep(180 * 1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}public class PersistenceWorkManager implements Serializable {
private static final long serialVersionUID = 3503180330888692087L;
private int numOfRunnables;
private List<DataSyncOperation> datasyncOperation;
private List<Thread> persitenceRunnableThread;
private Iterator<DataSyncOperation> iterator;
private DataSource dataSource;
private volatile boolean finishedPersisting = false;
private boolean isTraceLoggingEnabled;
private static final Logger logger = Logger.getLogger(PersistenceWorkManager.class.getName());
public PersistenceWorkManager(int numOfRunnables, DataSyncOperation[] datasyncOperation, DataSource dataSource) {
this.numOfRunnables = numOfRunnables;
this.datasyncOperation = Arrays.asList(datasyncOperation);
iterator = this.datasyncOperation.iterator();
this.dataSource = dataSource;
persitenceRunnableThread = new ArrayList<Thread>();
isTraceLoggingEnabled = logger.isLoggable(Level.FINER);
init();
}
public void setFinishedPersisting(boolean finishedPersisting) {
this.finishedPersisting = finishedPersisting;
}
public boolean hasFinishedPersisting() {
return finishedPersisting;
}
// @PostConstruct
public void init() {
if (isTraceLoggingEnabled)
logger.info("Initialising workmanager with " + numOfRunnables + " runnables");
for (int i = 0; i < numOfRunnables; i++) {
PersistenceRunnable persistenceRunnable = new PersistenceRunnable(this, "runnable-" + i, dataSource);
Thread t = new Thread(persistenceRunnable, "persistenceRunnable-" + i);
persitenceRunnableThread.add(t);
t.start();
}
}
void stopProcessing() {
for (Thread thread : persitenceRunnableThread) {
if (thread.isAlive() || !thread.isInterrupted()) {
thread.interrupt();
}
}
}
public synchronized boolean hasNext() {
return iterator.hasNext();
}
public synchronized DataSyncOperation next() {
return iterator.next();
}
}public class PersistenceRunnable implements Runnable {
private JdbcTemplate jdbcTemplate;
private static final Logger logger = Logger.getLogger(PersistenceRunnable.class.getName());
private String threadName;
private PersistenceWorkManager workManager;
private boolean isTraceLoggingEnabled;
public PersistenceRunnable(PersistenceWorkManager workManager, String threadName, DataSource dataSource) {
this.workManager = workManager;
this.threadName = threadName;
jdbcTemplate = new JdbcTemplate(dataSource);
isTraceLoggingEnabled = logger.isLoggable(Level.FINER);
}
public JdbcTemplate getJdbcTemplate() {
return jdbcTemplate;
}
@Override
synchronized public void run() {
if (isTraceLoggingEnabled)
logger.finer("running runnable " + threadName);
while (workManager.hasNext()) {
DataSyncOperation dataSyncOperation = workManager.next();
Object obj = dataSyncOperation.getDataAsObject();
if (obj instanceof Data) {
Data data = (Data) obj;
switch (dataSyncOperation.getDataSyncOperationType()) {
case WRITE:
executeWrite(data);
break;
case UPDATE:
case PARTIAL_UPDATE:
executeUpdateIfExists(data);
break;
case REMOVE:
executeRemoveIfExists(data);
break;
case CHANGE:
executeUpdateIfExists(data);
break;
default:
logger.warning("Unsupported opeartion " + dataSyncOperation.getDataSyncOperationType().toString()
+ " sent to mirror pu!!!");
break;
}
} else {
logger.warning("Unsupported class sent to mirror service");
logger.warning("class type was not Data");
logger.warning("Class type: " + obj.getClass().getName() + ", " + obj.getClass().getSimpleName());
}
}
workManager.setFinishedPersisting(true);
workManager.stopProcessing();
if (isTraceLoggingEnabled)
logger.info("Finished processing from runnable " + threadName);
}
private void executeWrite(Data data) {
if (isTraceLoggingEnabled)
logger.finer(threadName + " executing write on " + data.toString());
if (!existsInDatabase(data)) {
String sql = "INSERT INTO data (Id, obj_name, Object) VALUES (?, ?, ?)";
getJdbcTemplate().update(sql, new Object[] { data.getMessageUID(), data.getClass().getName(), data });
}
}
private void executeRemoveIfExists(Data data) {
if (existsInDatabase(data)) {
if (isTraceLoggingEnabled)
logger.finer("executing remove on " + data.toString());
String sql = "DELETE FROM data WHERE Id = ?";
getJdbcTemplate().update(sql, new Object[] { data.getMessageUID() });
}
}
private void executeUpdateIfExists(Data data) {
if (existsInDatabase(data)) {
if (isTraceLoggingEnabled)
logger.finer("executing update on " + data.toString());
String sql = "UPDATE data SET Object = ? where Id = ?";
getJdbcTemplate().update(sql, new Object[] { data, data.getMessageUID() });
}
}
private boolean existsInDatabase(Data data) {
String sql = "select count(*) from data where Id = ?";
int count = getJdbcTemplate().queryForObject(sql, new Object[] { data.getMessageUID() }, Integer.class);
if (count == 1)
return true;
return false;
}
}以这种方式持久化数据的原因是因为类类型Data是泛型的,而且所有子类都有不同和复杂的数据类型,我们不希望为每个子类创建单独的表。
注意:这需要在Java 7上运行,因此无法使用Java 8的任何高级特性。
发布于 2015-07-14 14:15:22
还有一个更复杂的建议--您应该真正尝试使用java.util.concurrent API。这会让你的生活更轻松。一个小小的重构会导致:
public class DataPersistor implements Runnable {
private static final Logger LOGGER = Logger.getLogger(DataPersistor.class.getName());
private final boolean isTraceLoggingEnabled = LOGGER.isLoggable(Level.FINER);
private final DataSyncOperation dataSyncOperation;
private final JdbcTemplate jdbcTemplate;
public DataPersistor(final DataSyncOperation dataSyncOperation, final DataSource dataSource) {
this.dataSyncOperation = dataSyncOperation;
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public void run() {
if (this.isTraceLoggingEnabled) {
LOGGER.finer("Executing " + this.dataSyncOperation);
}
final Object obj = this.dataSyncOperation.getDataAsObject();
if (obj instanceof Data) {
final Data data = (Data) obj;
switch (this.dataSyncOperation.getDataSyncOperationType()) {
case WRITE:
executeWrite(data);
break;
case UPDATE:
case PARTIAL_UPDATE:
executeUpdateIfExists(data);
break;
case REMOVE:
executeRemoveIfExists(data);
break;
case CHANGE:
executeUpdateIfExists(data);
break;
default:
LOGGER.warning("Unsupported operation " + this.dataSyncOperation.getDataSyncOperationType().toString() + " sent to mirror service!!!");
break;
}
} else {
LOGGER.warning("Unsupported class sent to mirror service");
LOGGER.warning("class type was not Data");
LOGGER.warning("Class type: " + obj.getClass().getName() + ", " + obj.getClass().getSimpleName());
}
if (this.isTraceLoggingEnabled) {
LOGGER.info("Finished executing " + this.dataSyncOperation);
}
}
private void executeWrite(final Data data) {
if (!existsInDatabase(data)) {
String sql = "INSERT INTO data (Id, obj_name, Object) VALUES (?, ?, ?)";
this.jdbcTemplate.update(sql, new Object[] { data.getMessageUID(), data.getClass().getName(), data });
}
}
private void executeRemoveIfExists(final Data data) {
if (existsInDatabase(data)) {
String sql = "DELETE FROM data WHERE Id = ?";
this.jdbcTemplate.update(sql, new Object[] { data.getMessageUID() });
}
}
private void executeUpdateIfExists(final Data data) {
if (existsInDatabase(data)) {
String sql = "UPDATE data SET Object = ? where Id = ?";
this.jdbcTemplate.update(sql, new Object[] { data, data.getMessageUID() });
}
}
private boolean existsInDatabase(final Data data) {
final String sql = "select count(*) from data where Id = ?";
final int count = this.jdbcTemplate.queryForObject(sql, new Object[] { data.getMessageUID() }, Integer.class);
return count == 1;
}
}和
public final class CustomDataSyncEndPoint extends SpaceSynchronizationEndpoint {
private final Logger logger = Logger.getLogger(CustomDataSyncEndPoint.class.getName());
private final boolean isTraceLoggingEnabled = this.logger.isLoggable(Level.FINER);
private final Set<ExecutorService> executorServices = new HashSet<>();
private final DataSource dataSource;
private Thread workManagerMonitor;
public CustomDataSyncEndPoint(final DataSource dataSource) {
this.dataSource = dataSource;
}
@PostConstruct
private void init() {
this.workManagerMonitor = new Thread(new ExecutorServiceMonitor(180, TimeUnit.SECONDS), "workManagerMonitor");
this.workManagerMonitor.setDaemon(true);
this.workManagerMonitor.start();
}
@PreDestroy
private void destroy() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.interrupted();
}
if (this.workManagerMonitor.isAlive()) {
this.workManagerMonitor.interrupt();
}
for (final ExecutorService executorService : this.executorServices) {
executorService.shutdownNow();
}
}
@Override
public void onTransactionSynchronization(final TransactionData transactionData) {
executeDataSyncOperation(transactionData.getTransactionParticipantDataItems());
}
@Override
public void onOperationsBatchSynchronization(final OperationsBatchData batchData) {
executeDataSyncOperation(batchData.getBatchDataItems());
}
public void executeDataSyncOperation(final DataSyncOperation[] operations) {
if ((operations == null) || (operations.length == 0)) {
return;
}
if (this.isTraceLoggingEnabled) {
this.logger.finer("executing sync operation with " + operations.length + " data items");
}
final int numberOfThreads = (operations.length < 5) ? 1 : 5;
final ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads, new CountingThreadFactory());
for (final DataSyncOperation dataSyncOperation : operations) {
executorService.submit(new DataPersistor(dataSyncOperation, this.dataSource));
}
executorService.shutdown(); // free up resources after all tasks are completed.
this.executorServices.add(executorService);
}
/**
* Polls the set of <code>ExecutorServices</code> and removes any that have terminated.
*
* @author desai
*/
private class ExecutorServiceMonitor implements Runnable {
private final long sleepMillis;
public ExecutorServiceMonitor(final long sleepTime, final TimeUnit timeUnit) {
this.sleepMillis = timeUnit.toMillis(sleepTime);
}
@Override
public void run() {
try {
Thread.sleep(this.sleepMillis);
} catch (final InterruptedException e) {
return;
}
for (final Iterator<ExecutorService> iterator = executorServices.iterator(); iterator.hasNext();) {
final ExecutorService executorService = iterator.next();
if (executorService.isTerminated()) {
iterator.remove();
}
}
}
}
protected static final class CountingThreadFactory implements ThreadFactory {
private static int threadNumber = 0;
@Override
public Thread newThread(final Runnable r) {
final Thread thread = new Thread("Persistor-" + threadNumber);
threadNumber++;
return thread;
}
}
}如果你愿意把所有的DataPersistors放在同一个Executor中,你也可以去掉ExecutorServiceMonitor类和它的所有装束。另一个更重的选项是使用ThreadPoolExecutor的自定义扩展。您可以扩展像beforeExecute()和afterExecute()这样的方法来进行日志记录,并且(如果使用多个执行器)扩展terminated(),让执行器从集合中删除自己。
我也同意@h.j.k。该SLF4J将是一个更好的日志记录框架,但您的手可能被绑在那里。
https://codereview.stackexchange.com/questions/96779
复制相似问题