我有一个类DatabaseQueue,它异步运行从LinkedBlockingQueue获取的SQLite更新。
我为这个类实现了一个暂停机制,理想情况下,它应该尽快停止队列的执行,然后在执行更多查询之前等待它被取消暂停。
我对并发编程非常陌生,但我用多个ReentrantLock's和Condition's成功地完成了这个工作,虽然这看起来非常冗长和笨拙,尤其是所有的锁()、‘1996和解锁()’。
此外,我也不喜欢为了让executor线程继续其循环而不得不执行额外的虚拟查询。有什么更好的方式来实现这一点呢?
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class DatabaseQueue {
private static final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
private static final QueueExecutor executor = new QueueExecutor();
private static boolean locked = false;
private static ReentrantLock lockAttempted = new ReentrantLock();
private static ReentrantLock lockAcquired = new ReentrantLock();
private static Condition attempted = lockAttempted.newCondition();
private static Condition acquired = lockAcquired.newCondition();
public static void start() {
if (locked) {
locked = false;
lockAttempted.lock();
attempted.signal(); //Signal to resume queue execution
lockAttempted.unlock();
} else {
executor.start();
}
}
public static void pause() {
locked = true;
if (queue.isEmpty()) {
queue.add("SELECT 1");
}
lockAcquired.lock();
acquired.awaitUninterruptibly(); //Wait for the lock to be acquired
lockAcquired.unlock();
}
public static void queue(String update) {
queue.add(update);
}
public static class QueueExecutor extends Thread {
public QueueExecutor() {
setName("Database Queue Executor");
setDaemon(true);
}
@Override
public void run() {
Connection connection = Database.getConnection();
while (true) {
try {
if (locked) {
lockAcquired.lock();
acquired.signal(); //Signal that the lock has been acquired
lockAcquired.unlock();
lockAttempted.lock();
attempted.awaitUninterruptibly(); //Wait for the lock to be released
lockAttempted.unlock();
}
String update = queue.take();
connection.createStatement().executeUpdate(update);
Database.update(update);
} catch (InterruptedException | SQLException e) {
e.printStackTrace();
}
}
}
}
}发布于 2017-08-14 19:03:43
AtomicBoolean (locked)。"SELECT 1"是空的,我不明白为什么要添加queue。那会浪费一些宝贵的时间。QueueExecutor,您就不需要使queue线程安全。PreparedStatements,但是即使你信任用户,你也应该使用它,因为它会给你一个速度(也就是查询执行)。Statements传给queue而不是String不是更好吗?它将允许您轻松地允许用户使用PreparedStatements。实例实现(未测试):
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicBoolean;
public class DatabaseQueue {
private final Deque<PreparedStatement> queue = new ArrayDeque<>();
private final QueueExecutor executor = new QueueExecutor();
private final AtomicBoolean paused = new AtomicBoolean(false);
private final Object object = new Object();
public DatabaseQueue() {
executor.start();
}
public void start() {
if (!paused.compareAndSet(true, false)) {
throw new IllegalStateException("DatabaseQueue wasn't paused");
}
executor.notify();
}
public void pause() {
if (!paused.compareAndSet(false, true)) {
throw new IllegalStateException("DatabaseQueue is already paused");
}
}
public void close() {
executor.shutdown();
paused.set(false); // we want to execute all the statements before closing
executor.notify();
}
public void queue(PreparedStatement statement) {
queue.addLast(statement);
if (!paused.get())
executor.notify();
}
public PreparedStatement createStatement(String sql) throws SQLException {
return Database.getConnection().prepareStatement(sql);
}
class QueueExecutor extends Thread {
private boolean shutdown = false;
QueueExecutor() {
setName("Database Queue Executor");
setDaemon(true);
}
void shutdown() {
shutdown = true;
}
@Override
public void run() {
// even if we should shutdown we first should execute all the statements
do {
if (paused.get() || queue.isEmpty()) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
PreparedStatement statement = queue.removeFirst();
if (!paused.get())
statement.execute();
else
queue.addFirst(statement);
} catch (SQLException e) {
e.printStackTrace();
}
} while (!shutdown || !queue.isEmpty());
}
}
}发布于 2017-08-15 02:23:33
@Mibac的所有点都是有效的。
命名也非常重要。您有locked变量,但是控制它的函数是pause(),这使您的代码混淆。如果变量也被命名为paused,情况会更好。@Mibac在他的代码中做了这个重命名,但是没有明确提到它。
locked没有被声明为volatile,所以它不是线程安全的。或者,正如@Mibac所建议的那样,使用AtomicBoolean会更加清晰。
但是,与其使用布尔值,我认为更好的方法是使用阻塞暂停条件的东西。这样,写入线程中的连续循环就可以简单地阻塞--等待1)队列中的一个新元素,以及2)按顺序排列的暂停条件。然后将元素写入DB。然后返回到while循环的开始,再次等待1)和2)。
例如,您可以使用带有一个许可证的信号量作为2的阻塞条件。例如:
while (...) {
element = queue.take(); // blocking
semaphore.acquire(); // blocking
semaphore.release();
writeToDB(element);
}信号量允许在获得后立即释放,因为它只是用作一个阻塞机制,以便在另一个线程请求暂停时等待。也许有一种更干净的方法可以用CyclicBarrier、CountDownLatch或Phaser来实现这一点。我认为使用那些块(包括信号量)的构造比只使用布尔标志更干净。锁可能无法工作,因为它可能要求从同一个线程调用.lock()和.unlock()。
以及为了暂停和恢复其他线程:
public void pause() {
if (semaphore.availablePermits() == 1) {
semaphore.acquire(); // blocking, but should never happen
}
}
public void resume() {
semaphore.release();
}请注意,只有在没有其他线程请求暂停的情况下,我才获得许可。这基本上意味着,即使有来自多个线程的请求,也只能有一个暂停。在没有发布许可的情况下调用semaphore.release()并没有什么坏处。semaphore.acquire()可以是阻塞调用,但是在这种设置中,它应该只阻塞极小的时间--在很少的情况下,写入线程将同时调用but (),但后面紧跟release()。
https://codereview.stackexchange.com/questions/173005
复制相似问题