
阻塞队列是一种特殊的队列,它在数据结构的基础上附加了两个额外的操作特性:
简单来说,阻塞队列是一个线程安全的、支持阻塞等待的生产者-消费者模型的核心容器。
阻塞队列的实现原理主要依赖于 锁(Lock) 和 条件变量(Condition)。在Java中,这通常通过 ReentrantLock 和 Condition 来实现。
我们以一个简单的有界数组阻塞队列为例,剖析其核心原理:
ReentrantLock,用于保证所有操作的线程安全性。notEmpty:一个与锁绑定的条件,用于表示“队列非空”。当消费者因队列为空而等待时,会挂在这个条件上。当生产者放入一个新元素后,会唤醒挂在这个条件上的线程。notFull:一个与锁绑定的条件,用于表示“队列未满”。当生产者因队列已满而等待时,会挂在这个条件上。当消费者取走一个元素后,会唤醒挂在这个条件上的线程。put(E e) 方法(阻塞插入)
notFull.await() 释放锁并进入等待状态。e 入队。notEmpty.signal() 或 notEmpty.signalAll(),唤醒一个或所有正在 notEmpty 上等待的消费者线程。take() 方法(阻塞移除)
notEmpty.await() 释放锁并进入等待状态。notFull.signal() 或 notFull.signalAll(),唤醒一个或所有正在 notFull 上等待的生产者线程。关键点总结:
Condition 的 await() 和 signal() 代替传统的 Object.wait() 和 Object.notify(),可以更精确地控制等待和唤醒的线程类型(生产者或消费者),避免了“惊群效应”。await() 返回后,必须重新检查条件(队列是否满/空),这是应对“虚假唤醒”的标准做法。生产者-消费者模型是一种经典的多线程协作模式,它通过一个共享的缓冲区(即阻塞队列) 来解耦生产者和消费者,使他们不必直接通信,而是各自以不同的速率对缓冲区进行操作。
阻塞队列天生就是为这个模型设计的,使用它来实现非常简单优雅。
ArrayBlockingQueue。queue.put(data) 将数据放入队列。如果队列满,put 方法会自动阻塞,直到有空间。queue.take() 从队列中获取数据。如果队列空,take 方法会自动阻塞,直到有数据可用。import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerExample {
public static void main(String[] args) {
// 1. 创建一个容量为10的阻塞队列
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
// 2. 创建生产者线程
Thread producerThread = new Thread(() -> {
try {
int value = 0;
while (true) {
// 生产数据
queue.put(value);
System.out.println("Produced: " + value);
value++;
// 模拟生产耗时
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 3. 创建消费者线程
Thread consumerThread = new Thread(() -> {
try {
while (true) {
// 消费数据
Integer value = queue.take();
System.out.println("Consumed: " + value);
// 模拟消费耗时
Thread.sleep(2000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 4. 启动线程
producerThread.start();
consumerThread.start();
}
}put 方法处阻塞,等待消费者消费。take 方法处阻塞,等待生产者生产。Java 的 java.util.concurrent 包提供了多种现成的阻塞队列实现,可以直接使用:
ArrayBlockingQueue:基于数组的有界阻塞队列。LinkedBlockingQueue:基于链表的阻塞队列,可选有界或无界。PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等待另一个线程的移除操作,反之亦然。它实现了数据的直接传递。DelayQueue:一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素。put,消费者调用 take,无需开发者手动处理线程同步和通信问题,大大简化了并发编程的难度。原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。