我有一个系统,它将一个大的任务分解成小任务,每次使用大约30个线程。当每个单独的线程完成时,它将其计算结果持久化到数据库中。我想要实现的是让每个线程将其结果传递给一个新的persisance类,该类将在其自己的线程中运行时执行一种双缓冲和数据持久化。
例如,在100个线程将它们的数据移动到缓冲区后,persistance类交换缓冲区并将所有100个条目持久化到数据库。这将允许使用准备好的语句,从而减少程序和数据库之间的I/O。
有没有这种类型的多线程双缓冲的模式或好的例子?
发布于 2010-05-11 23:42:20
我见过这种模式,称为异步数据库写入或后写模式。这是分布式缓存产品(Teracotta、Coherence、GigaSpaces等)支持的典型模式。因为您不希望缓存更新还包括将更改写入底层数据库。
此模式的复杂性取决于您对丢失的数据库更新的容忍度。由于完成工作和将结果写入数据库之间的延迟,您可能会由于错误、电源故障等原因而丢失更新。(你明白了吧)。
我建议使用某种队列将完成的结果写入数据库,然后以100个为一批(使用您的示例)或在一段时间后对它们进行处理。使用时间延迟的原因也是为了处理不能被100整除的结果集。
如果您对弹性/耐久性没有要求,那么您可以在同一过程中完成所有这些工作。但是,如果您不能容忍任何损失,那么您可以用持久JMS队列(较慢但更安全)替换vm中的队列。
发布于 2010-05-12 00:04:14
为了具有较低的同步开销,使用线程本地(对于每个计算线程)来构建批处理结果。达到一定数量的结果后,将批处理排入阻塞队列。使用ArrayBlockingQueue来支持您的持久性类,因为您可能不希望您的内存使用变得不受限制。您可以让多个数据库编写器线程获取结果组并将它们保存到数据库中。
class WriteBehindPersister {
ThreadLocal<List<Result>> internalBuffer;
static ArrayBlockingQueue<List<Result>> persistQueue;
static {
persistQueue = new ArrayBlockingQueue(10);
new WriteThread().start();
}
public WriteBehindPersister() {
internalBuffer = new ThreadLocal<List<Result>>();
}
public void persist(Result r) {
List<Result> localResult = internalBuffer.get();
localResult.add(r);
if (localResult.size() > max) {
persistQueue.put(new ArrayList(localResult));
localResult.clear();
}
}
class WriteThread extends Thread {
public void run() {
while (true) {
List<Result> batch = persistQueue.take();
beginTransaction();
for (Result r : batch) {
batchInsert(r);
}
endTransaction();
}
}
}
}此外,您可以使用executor服务(而不是单个写入线程)来同时将多个批处理持久化到DB,但要权衡使用多个DB连接。如果您的驱动程序支持JDBC批处理API,请确保使用它。
https://stackoverflow.com/questions/2811980
复制相似问题