首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >暂停并恢复并发数据库更新队列

暂停并恢复并发数据库更新队列
EN

Code Review用户
提问于 2017-08-14 18:05:52
回答 2查看 686关注 0票数 4

我有一个类DatabaseQueue,它异步运行从LinkedBlockingQueue获取的SQLite更新。

我为这个类实现了一个暂停机制,理想情况下,它应该尽快停止队列的执行,然后在执行更多查询之前等待它被取消暂停。

我对并发编程非常陌生,但我用多个ReentrantLock's和Condition's成功地完成了这个工作,虽然这看起来非常冗长和笨拙,尤其是所有的锁()、‘1996和解锁()’。

此外,我也不喜欢为了让executor线程继续其循环而不得不执行额外的虚拟查询。有什么更好的方式来实现这一点呢?

代码语言:javascript
复制
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class DatabaseQueue {

    private static final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private static final QueueExecutor executor = new QueueExecutor();

    private static boolean locked = false;
    private static ReentrantLock lockAttempted = new ReentrantLock();
    private static ReentrantLock lockAcquired = new ReentrantLock();
    private static Condition attempted = lockAttempted.newCondition();
    private static Condition acquired = lockAcquired.newCondition();

    public static void start() {
        if (locked) {
            locked = false;
            lockAttempted.lock();
            attempted.signal(); //Signal to resume queue execution
            lockAttempted.unlock();
        } else {
            executor.start();
        }
    }

    public static void pause() {
        locked = true;
        if (queue.isEmpty()) {
            queue.add("SELECT 1");
        }
        lockAcquired.lock();
        acquired.awaitUninterruptibly(); //Wait for the lock to be acquired
        lockAcquired.unlock();
    }

    public static void queue(String update) {
        queue.add(update);
    }

    public static class QueueExecutor extends Thread {

        public QueueExecutor() {
            setName("Database Queue Executor");
            setDaemon(true);
        }

        @Override
        public void run() {
            Connection connection = Database.getConnection();
            while (true) {
                try {
                    if (locked) {
                        lockAcquired.lock();
                        acquired.signal(); //Signal that the lock has been acquired
                        lockAcquired.unlock();
                        lockAttempted.lock();
                        attempted.awaitUninterruptibly(); //Wait for the lock to be released
                        lockAttempted.unlock();
                    }
                    String update = queue.take();
                    connection.createStatement().executeUpdate(update);
                    Database.update(update);
                } catch (InterruptedException | SQLException e) {
                    e.printStackTrace();
                }
            }
        }

    }

}
EN

回答 2

Code Review用户

回答已采纳

发布于 2017-08-14 19:03:43

  1. 为什么所有变量都是静态的?你不应该使用它们。
  2. 你不需要那么多锁。实际上,您只需要1和一个AtomicBoolean (locked)。
  3. 如果"SELECT 1"是空的,我不明白为什么要添加queue。那会浪费一些宝贵的时间。
  4. 我认为只要只有一个QueueExecutor,您就不需要使queue线程安全。
  5. 你可以分批执行你的陈述,但我不确定它的利弊。
  6. 除非你100%信任用户,否则你应该使用PreparedStatements,但是即使你信任用户,你也应该使用它,因为它会给你一个速度(也就是查询执行)。
  7. Statements传给queue而不是String不是更好吗?它将允许您轻松地允许用户使用PreparedStatements。

实例实现(未测试):

代码语言:javascript
复制
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicBoolean;

public class DatabaseQueue {

    private final Deque<PreparedStatement> queue = new ArrayDeque<>();
    private final QueueExecutor executor = new QueueExecutor();
    private final AtomicBoolean paused = new AtomicBoolean(false);
    private final Object object = new Object();

    public DatabaseQueue() {
        executor.start();
    }

    public void start() {
        if (!paused.compareAndSet(true, false)) {
            throw new IllegalStateException("DatabaseQueue wasn't paused");
        }

        executor.notify();
    }

    public void pause() {
        if (!paused.compareAndSet(false, true)) {
            throw new IllegalStateException("DatabaseQueue is already paused");
        }
    }

    public void close() {
        executor.shutdown();

        paused.set(false); // we want to execute all the statements before closing
        executor.notify();
    }

    public void queue(PreparedStatement statement) {
        queue.addLast(statement);

        if (!paused.get())
            executor.notify();
    }

    public PreparedStatement createStatement(String sql) throws SQLException {
        return Database.getConnection().prepareStatement(sql);
    }

    class QueueExecutor extends Thread {
        private boolean shutdown = false;

        QueueExecutor() {
            setName("Database Queue Executor");
            setDaemon(true);
        }

        void shutdown() {
            shutdown = true;
        }

        @Override
        public void run() {
            // even if we should shutdown we first should execute all the statements
            do {
                if (paused.get() || queue.isEmpty()) {

                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                try {
                    PreparedStatement statement = queue.removeFirst();

                    if (!paused.get())
                        statement.execute();
                    else
                        queue.addFirst(statement);
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            } while (!shutdown || !queue.isEmpty());
        }
    }
}
票数 3
EN

Code Review用户

发布于 2017-08-15 02:23:33

@Mibac的所有点都是有效的。

命名也非常重要。您有locked变量,但是控制它的函数是pause(),这使您的代码混淆。如果变量也被命名为paused,情况会更好。@Mibac在他的代码中做了这个重命名,但是没有明确提到它。

locked没有被声明为volatile,所以它不是线程安全的。或者,正如@Mibac所建议的那样,使用AtomicBoolean会更加清晰。

但是,与其使用布尔值,我认为更好的方法是使用阻塞暂停条件的东西。这样,写入线程中的连续循环就可以简单地阻塞--等待1)队列中的一个新元素,以及2)按顺序排列的暂停条件。然后将元素写入DB。然后返回到while循环的开始,再次等待1)和2)。

例如,您可以使用带有一个许可证的信号量作为2的阻塞条件。例如:

代码语言:javascript
复制
while (...) {
    element = queue.take(); // blocking

    semaphore.acquire();  // blocking
    semaphore.release(); 

    writeToDB(element);
}

信号量允许在获得后立即释放,因为它只是用作一个阻塞机制,以便在另一个线程请求暂停时等待。也许有一种更干净的方法可以用CyclicBarrier、CountDownLatch或Phaser来实现这一点。我认为使用那些块(包括信号量)的构造比只使用布尔标志更干净。锁可能无法工作,因为它可能要求从同一个线程调用.lock().unlock()

以及为了暂停和恢复其他线程:

代码语言:javascript
复制
public void pause() {
    if (semaphore.availablePermits() == 1) {
        semaphore.acquire(); // blocking, but should never happen
    }
}

public void resume() {
    semaphore.release();
}

请注意,只有在没有其他线程请求暂停的情况下,我才获得许可。这基本上意味着,即使有来自多个线程的请求,也只能有一个暂停。在没有发布许可的情况下调用semaphore.release()并没有什么坏处。semaphore.acquire()可以是阻塞调用,但是在这种设置中,它应该只阻塞极小的时间--在很少的情况下,写入线程将同时调用but (),但后面紧跟release()。

票数 2
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/173005

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档