首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将对象并发地序列化/从数据库序列化/反序列化

将对象并发地序列化/从数据库序列化/反序列化
EN

Code Review用户
提问于 2015-07-13 14:35:39
回答 1查看 1.6K关注 0票数 5

我有一个类,它在容器中作为spring运行,它的目的是数据持久性。在overriden方法public void executeDataSyncOperation(DataSyncOperation[] operations)中,我们可以选择将数据保存在operations数组中。但是输入的数据流非常大,而且速度很快,我们希望它的执行是并行的,这样当发生致命的异常时,我们就不会产生大量的数据积压,而这些数据没有被持久保存。我应该提到的一件事是,一旦调用了executeDataSyncOperation方法,任何其他线程都不会修改operations。所以我想出了以下的想法;

我实现了使用workManager的逻辑,这将在每次调用上面的方法时进行实例化,而这些工作管理器每个都实例化n个线程,然后通过从原始数组创建的列表创建的迭代器。下面是我的实现,你能不能告诉我这个方法的缺点,或者有没有更好的方法。

CustomDataSyncEndPoint.java (类作为spring运行,每当数据被传递用于持久性时都调用它的方法)

代码语言:javascript
复制
public class CustomDataSyncEndPoint extends SpaceSynchronizationEndpoint {

    private final Logger logger = Logger.getLogger(CustomDataSyncEndPoint.class.getName());

    private DataSource dataSource;

    private List<PersistenceWorkManager> workManagers;

    private Thread workManagerMonitor;

    private volatile boolean stopProcessing;

    private boolean isTraceLoggingEnabled;

    public CustomDataSyncEndPoint(DataSource dataSource) {
        this.dataSource = dataSource;
        workManagers = new ArrayList<PersistenceWorkManager>();
        isTraceLoggingEnabled = logger.isLoggable(Level.FINER);
    }

    @PostConstruct
    private void init() {
        stopProcessing = false;
        workManagerMonitor = new Thread(new WorkManagerMonitor(), "workManagerMonitor");
        workManagerMonitor.setDaemon(true);
        workManagerMonitor.start();
    }

    @PreDestroy
    private void destroy() {
        stopProcessing = true;
        try {
            Thread.sleep(5000);
            if (workManagerMonitor.isAlive()) {
                workManagerMonitor.interrupt();
            }
            for (PersistenceWorkManager workManager : workManagers) {
                workManager.stopProcessing();
            }
        } catch (InterruptedException e) {
            // Ignore this as we are cleaning the resources anyway
        }
    }

    @Override
    public void onTransactionSynchronization(TransactionData transactionData) {
        executeDataSyncOperation(transactionData.getTransactionParticipantDataItems());
    }

    @Override
    public void onOperationsBatchSynchronization(OperationsBatchData batchData) {
        executeDataSyncOperation(batchData.getBatchDataItems());
    }

    public void executeDataSyncOperation(DataSyncOperation[] operations) {
        if (operations == null || operations.length == 0)
            return;

        // if number of operations are greater than five then only start multiple runnables otherwise just start one runnable
        int numOfRunnables = (operations.length < 5) ? 1 : 5;

        if (isTraceLoggingEnabled)
            logger.finer("execution sync operation with " + operations.length + " dataitems");
        PersistenceWorkManager workManager = new PersistenceWorkManager(numOfRunnables, operations, dataSource);
        workManagers.add(workManager);
    }

    /**
     * WorkManagerMonitor runnable class. This class keeps polling the list of 
     * workManagers to check if they have finished processing. If so it will remove 
     * it from the list.
     * @author desai
     *
     */
    private class WorkManagerMonitor implements Runnable {

        @Override
        public void run() {
            if (isTraceLoggingEnabled)
                logger.finer("started workMangerMonitor");
            try {
                while (!stopProcessing) {
                    for (PersistenceWorkManager workManager : workManagers) {
                        if (workManager.hasFinishedPersisting()) {
                            workManagers.remove(workManager);
                        }
                    }
                    // TODO: The polling interval should be set by spring property
                    Thread.sleep(180 * 1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

PersistenceWorkManager.java

代码语言:javascript
复制
public class PersistenceWorkManager implements Serializable {

    private static final long serialVersionUID = 3503180330888692087L;

    private int numOfRunnables;

    private List<DataSyncOperation> datasyncOperation;

    private List<Thread> persitenceRunnableThread;

    private Iterator<DataSyncOperation> iterator;

    private DataSource dataSource;

    private volatile boolean finishedPersisting = false;

    private boolean isTraceLoggingEnabled;

    private static final Logger logger = Logger.getLogger(PersistenceWorkManager.class.getName());

    public PersistenceWorkManager(int numOfRunnables, DataSyncOperation[] datasyncOperation, DataSource dataSource) {
        this.numOfRunnables = numOfRunnables;
        this.datasyncOperation = Arrays.asList(datasyncOperation);
        iterator = this.datasyncOperation.iterator();
        this.dataSource = dataSource;
        persitenceRunnableThread = new ArrayList<Thread>();
        isTraceLoggingEnabled = logger.isLoggable(Level.FINER);
        init();
    }

    public void setFinishedPersisting(boolean finishedPersisting) {
        this.finishedPersisting = finishedPersisting;
    }

    public boolean hasFinishedPersisting() {
        return finishedPersisting;
    }

    // @PostConstruct
    public void init() {
        if (isTraceLoggingEnabled)
            logger.info("Initialising workmanager with " + numOfRunnables + " runnables");
        for (int i = 0; i < numOfRunnables; i++) {
            PersistenceRunnable persistenceRunnable = new PersistenceRunnable(this, "runnable-" + i, dataSource);
            Thread t = new Thread(persistenceRunnable, "persistenceRunnable-" + i);
            persitenceRunnableThread.add(t);
            t.start();
        }
    }

    void stopProcessing() {
        for (Thread thread : persitenceRunnableThread) {
            if (thread.isAlive() || !thread.isInterrupted()) {
                thread.interrupt();
            }
        }
    }

    public synchronized boolean hasNext() {
        return iterator.hasNext();
    }

    public synchronized DataSyncOperation next() {
        return iterator.next();
    }
}

PersistenceRunnable.java

代码语言:javascript
复制
public class PersistenceRunnable implements Runnable {

    private JdbcTemplate jdbcTemplate;

    private static final Logger logger = Logger.getLogger(PersistenceRunnable.class.getName());

    private String threadName;

    private PersistenceWorkManager workManager;

    private boolean isTraceLoggingEnabled;

    public PersistenceRunnable(PersistenceWorkManager workManager, String threadName, DataSource dataSource) {
        this.workManager = workManager;
        this.threadName = threadName;
        jdbcTemplate = new JdbcTemplate(dataSource);
        isTraceLoggingEnabled = logger.isLoggable(Level.FINER);
    }

    public JdbcTemplate getJdbcTemplate() {
        return jdbcTemplate;
    }

    @Override
    synchronized public void run() {
        if (isTraceLoggingEnabled)
            logger.finer("running runnable " + threadName);
        while (workManager.hasNext()) {
            DataSyncOperation dataSyncOperation = workManager.next();
            Object obj = dataSyncOperation.getDataAsObject();
            if (obj instanceof Data) {
                Data data = (Data) obj;
                switch (dataSyncOperation.getDataSyncOperationType()) {
                case WRITE:
                    executeWrite(data);
                    break;
                case UPDATE:
                case PARTIAL_UPDATE:
                    executeUpdateIfExists(data);
                    break;
                case REMOVE:
                    executeRemoveIfExists(data);
                    break;
                case CHANGE:
                    executeUpdateIfExists(data);
                    break;
                default:
                    logger.warning("Unsupported opeartion " + dataSyncOperation.getDataSyncOperationType().toString()
                            + " sent to mirror pu!!!");
                    break;
                }
            } else {
                logger.warning("Unsupported class sent to mirror service");
                logger.warning("class type was not Data");
                logger.warning("Class type: " + obj.getClass().getName() + ", " + obj.getClass().getSimpleName());
            }
        }
        workManager.setFinishedPersisting(true);
        workManager.stopProcessing();
        if (isTraceLoggingEnabled)
            logger.info("Finished processing from runnable " + threadName);
    }

    private void executeWrite(Data data) {
        if (isTraceLoggingEnabled)
            logger.finer(threadName + " executing write on " + data.toString());
        if (!existsInDatabase(data)) {
            String sql = "INSERT INTO data (Id, obj_name, Object) VALUES (?, ?, ?)";
            getJdbcTemplate().update(sql, new Object[] { data.getMessageUID(), data.getClass().getName(), data });
        }
    }

    private void executeRemoveIfExists(Data data) {
        if (existsInDatabase(data)) {
            if (isTraceLoggingEnabled)
                logger.finer("executing remove on " + data.toString());
            String sql = "DELETE FROM data WHERE Id = ?";
            getJdbcTemplate().update(sql, new Object[] { data.getMessageUID() });
        }
    }

    private void executeUpdateIfExists(Data data) {

        if (existsInDatabase(data)) {
            if (isTraceLoggingEnabled)
                logger.finer("executing update on " + data.toString());
            String sql = "UPDATE data SET Object = ? where Id = ?";
            getJdbcTemplate().update(sql, new Object[] { data, data.getMessageUID() });
        }
    }

    private boolean existsInDatabase(Data data) {
        String sql = "select count(*) from data where Id = ?";
        int count = getJdbcTemplate().queryForObject(sql, new Object[] { data.getMessageUID() }, Integer.class);
        if (count == 1)
            return true;
        return false;
    }
}

以这种方式持久化数据的原因是因为类类型Data是泛型的,而且所有子类都有不同和复杂的数据类型,我们不希望为每个子类创建单独的表。

注意:这需要在Java 7上运行,因此无法使用Java 8的任何高级特性。

EN

回答 1

Code Review用户

回答已采纳

发布于 2015-07-14 14:15:22

还有一个更复杂的建议--您应该真正尝试使用java.util.concurrent API。这会让你的生活更轻松。一个小小的重构会导致:

代码语言:javascript
复制
public class DataPersistor implements Runnable {

    private static final Logger LOGGER = Logger.getLogger(DataPersistor.class.getName());

    private final boolean isTraceLoggingEnabled = LOGGER.isLoggable(Level.FINER);

    private final DataSyncOperation dataSyncOperation;

    private final JdbcTemplate jdbcTemplate;

    public DataPersistor(final DataSyncOperation dataSyncOperation, final DataSource dataSource) {
        this.dataSyncOperation = dataSyncOperation;
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Override
    public void run() {
        if (this.isTraceLoggingEnabled) {
            LOGGER.finer("Executing " + this.dataSyncOperation);
        }

        final Object obj = this.dataSyncOperation.getDataAsObject();
        if (obj instanceof Data) {
            final Data data = (Data) obj;
            switch (this.dataSyncOperation.getDataSyncOperationType()) {
                case WRITE:
                    executeWrite(data);
                    break;
                case UPDATE:
                case PARTIAL_UPDATE:
                    executeUpdateIfExists(data);
                    break;
                case REMOVE:
                    executeRemoveIfExists(data);
                    break;
                case CHANGE:
                    executeUpdateIfExists(data);
                    break;
                default:
                    LOGGER.warning("Unsupported operation " + this.dataSyncOperation.getDataSyncOperationType().toString() + " sent to mirror service!!!");
                    break;
            }
        } else {
            LOGGER.warning("Unsupported class sent to mirror service");
            LOGGER.warning("class type was not Data");
            LOGGER.warning("Class type: " + obj.getClass().getName() + ", " + obj.getClass().getSimpleName());
        }

        if (this.isTraceLoggingEnabled) {
            LOGGER.info("Finished executing " + this.dataSyncOperation);
        }
    }

    private void executeWrite(final Data data) {
        if (!existsInDatabase(data)) {
            String sql = "INSERT INTO data (Id, obj_name, Object) VALUES (?, ?, ?)";
            this.jdbcTemplate.update(sql, new Object[] { data.getMessageUID(), data.getClass().getName(), data });
        }
    }

    private void executeRemoveIfExists(final Data data) {
        if (existsInDatabase(data)) {
            String sql = "DELETE FROM data WHERE Id = ?";
            this.jdbcTemplate.update(sql, new Object[] { data.getMessageUID() });
        }
    }

    private void executeUpdateIfExists(final Data data) {
        if (existsInDatabase(data)) {
            String sql = "UPDATE data SET Object = ? where Id = ?";
            this.jdbcTemplate.update(sql, new Object[] { data, data.getMessageUID() });
        }
    }

    private boolean existsInDatabase(final Data data) {
        final String sql = "select count(*) from data where Id = ?";
        final int count = this.jdbcTemplate.queryForObject(sql, new Object[] { data.getMessageUID() }, Integer.class);
        return count == 1;
    }
}

代码语言:javascript
复制
public final class CustomDataSyncEndPoint extends SpaceSynchronizationEndpoint {

    private final Logger logger = Logger.getLogger(CustomDataSyncEndPoint.class.getName());

    private final boolean isTraceLoggingEnabled = this.logger.isLoggable(Level.FINER);

    private final Set<ExecutorService> executorServices = new HashSet<>();

    private final DataSource dataSource;

    private Thread workManagerMonitor;

    public CustomDataSyncEndPoint(final DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @PostConstruct
    private void init() {
        this.workManagerMonitor = new Thread(new ExecutorServiceMonitor(180, TimeUnit.SECONDS), "workManagerMonitor");
        this.workManagerMonitor.setDaemon(true);
        this.workManagerMonitor.start();
    }

    @PreDestroy
    private void destroy() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            Thread.interrupted();
        }

        if (this.workManagerMonitor.isAlive()) {
            this.workManagerMonitor.interrupt();
        }

        for (final ExecutorService executorService : this.executorServices) {
            executorService.shutdownNow();
        }
    }

    @Override
    public void onTransactionSynchronization(final TransactionData transactionData) {
        executeDataSyncOperation(transactionData.getTransactionParticipantDataItems());
    }

    @Override
    public void onOperationsBatchSynchronization(final OperationsBatchData batchData) {
        executeDataSyncOperation(batchData.getBatchDataItems());
    }

    public void executeDataSyncOperation(final DataSyncOperation[] operations) {
        if ((operations == null) || (operations.length == 0)) {
            return;
        }

        if (this.isTraceLoggingEnabled) {
            this.logger.finer("executing sync operation with " + operations.length + " data items");
        }

        final int numberOfThreads = (operations.length < 5) ? 1 : 5;
        final ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads, new CountingThreadFactory());
        for (final DataSyncOperation dataSyncOperation : operations) {
            executorService.submit(new DataPersistor(dataSyncOperation, this.dataSource));
        }
        executorService.shutdown(); // free up resources after all tasks are completed.
        this.executorServices.add(executorService);
    }

    /**
     * Polls the set of <code>ExecutorServices</code> and removes any that have terminated.
     * 
     * @author desai
     */
    private class ExecutorServiceMonitor implements Runnable {

        private final long sleepMillis;

        public ExecutorServiceMonitor(final long sleepTime, final TimeUnit timeUnit) {
            this.sleepMillis = timeUnit.toMillis(sleepTime);
        }

        @Override
        public void run() {
            try {
                Thread.sleep(this.sleepMillis);
            } catch (final InterruptedException e) {
                return;
            }

            for (final Iterator<ExecutorService> iterator = executorServices.iterator(); iterator.hasNext();) {
                final ExecutorService executorService = iterator.next();
                if (executorService.isTerminated()) {
                    iterator.remove();
                }
            }
        }
    }

    protected static final class CountingThreadFactory implements ThreadFactory {

        private static int threadNumber = 0;

        @Override
        public Thread newThread(final Runnable r) {
            final Thread thread = new Thread("Persistor-" + threadNumber);
            threadNumber++;
            return thread;
        }

    }
}

如果你愿意把所有的DataPersistors放在同一个Executor中,你也可以去掉ExecutorServiceMonitor类和它的所有装束。另一个更重的选项是使用ThreadPoolExecutor的自定义扩展。您可以扩展像beforeExecute()afterExecute()这样的方法来进行日志记录,并且(如果使用多个执行器)扩展terminated(),让执行器从集合中删除自己。

我也同意@h.j.k。该SLF4J将是一个更好的日志记录框架,但您的手可能被绑在那里。

票数 2
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/96779

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档