ReentrantLock
Lock lock = new ReentrantLock();
for (int i = 0; i < 3; i++) {
new Thread(() -> {
lock.lock();
System.out.println(Thread.currentThread().getName() + " " + LocalDateTime.now());
sleep(TimeUnit.SECONDS, 2);
lock.unlock();
}).start();
}
---------------------------------执行结果------------------------------------
Thread-0 2020-01-01T17:12:24.678
Thread-1 2020-01-01T17:12:26.681
Thread-2 2020-01-01T17:12:28.681独占锁 公平锁在tryAcquire方法中会先判断队列中有没有阻塞节点,有就加入队列,没有就通过CAS尝试获取锁非公平锁在tryAcquire中不管队列中有没有阻塞节点,直接先通过CAS尝试获取锁,获取成功就返回,获取失败就加入阻塞队列CountDownLatch countDownLatch = new CountDownLatch(3);
new Thread(() -> {
sleep(TimeUnit.MILLISECONDS, 80);
System.out.println(Thread.currentThread().getName() + " Finished");
countDownLatch.countDown();
}).start();
new Thread(() -> {
sleep(TimeUnit.MILLISECONDS, 50);
System.out.println(Thread.currentThread().getName() + " Finished");
countDownLatch.countDown();
}).start();
new Thread(() -> {
sleep(TimeUnit.MILLISECONDS, 60);
System.out.println(Thread.currentThread().getName() + " Finished");
countDownLatch.countDown();
}).start();
new Thread(() -> {
sleep(TimeUnit.MILLISECONDS, 60);
System.out.println(Thread.currentThread().getName() + " Finished");
countDownLatch.countDown();
}).start();
countDownLatch.await();
System.out.println("All Finished");
---------------------------------执行结果------------------------------------
Thread-1 Finished
Thread-2 Finished
Thread-3 Finished
All Finished
Thread-0 Finished共享锁 state的值CountDownLatch#await方法的时候,会判断state的值是否等于0,不等于0就添加到阻塞队列,等于0就直接返回CountDownLatch#countDown方法的时候,state减1,判断state的值是否等于0,等于0就释放因调用CountDownLatch#await方法而阻塞的线程,不等于0就直接返回CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println("阻塞解除之后执行一些逻辑 " + LocalDateTime.now()));
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " doXX");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " doXX");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
sleep(TimeUnit.SECONDS, 2);
}
---------------------------------执行结果------------------------------------
Thread-0 doXX
Thread-1 doXX
阻塞解除之后执行一些逻辑 2020-01-01T14:19:15.735
Thread-2 doXX
Thread-3 doXX
阻塞解除之后执行一些逻辑 2020-01-01T14:19:17.706
Thread-4 doXX
Thread-5 doXX
阻塞解除之后执行一些逻辑 2020-01-01T14:19:19.706字面意思就是循环壁垒,使用上与CountDownLatch类似,不过实现上完全不一样,CyclicBarrier统计的的是调用了CyclicBarrier#await方法的线程数,当线程数达到了CyclicBarrier初始时规定的数目时,所有进入等待状态的线程将被唤醒然后进入下一轮,可以重复使用
ReentrantLock和Condition CyclicBarrier#await之前,需要先执行ReentrantLock#lock方法,完成之后执行ReentrantLock#unlock方法,即通过ReentrantLock保证执行CyclicBarrier#await方法是安全的CyclicBarrier#await方法的时候,不满足释放条件(调用CyclicBarrier#await方法的线程数不等于初始值)时,会调用Condition#await方法是当前线程阻塞,满足释放条件时会调用Condition#signalAll唤醒所有阻塞的线程然后进入下一轮CyclicBarrier中有一个内部类Generation,该内部类就表示一轮一轮循环的意思,当满足释放条件时,除了唤醒所有因调用CyclicBarrier#await方法而阻塞的线程,还会生成一个新的Generation对象,代表下一轮开始信号量,可以用来控制同时访问资源的线程个数,比如可以用在对线程数的限流,在初始化的时候需要用户传入许可的数量,通过Semaphore#acquire方法获取一个许可,如果Semaphore还有许可可获取就直接返回,否则阻塞当前线程
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 4; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "访问资源");
sleep(TimeUnit.MILLISECONDS, 10 + new Random().nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + "释放资源");
semaphore.release();
}
}).start();
}
---------------------------------执行结果------------------------------------
Thread-1访问资源
Thread-0访问资源
Thread-0释放资源
Thread-2访问资源
Thread-1释放资源
Thread-3访问资源
Thread-2释放资源
Thread-3释放资源共享锁 Sync实现了AbstractQueuedSynchronizer,Sync实现类有两个,对应公平锁和非公平锁Semaphore其实就是一个共享锁,根据我们对共享锁的了解,共享锁表示同一时刻最多有多少个线程持有锁,这和Semaphore的特性是一致的,所以Semaphore只需要再封装一层API,调用Semaphore#acquire方法时候获取锁或阻塞,调用,调用Semaphore#release`通过自旋释放锁就可以了ReentrantLock和Condition 优先级 无界 阻塞队列Delayed接口,Delayed#getDelay方法返回值<=0代表该元素延迟时间已到,可以出队了优先级 保证元素插入的顺序,基于Delayed#getDelay来控制元素延迟的时间,基于Condition控制在获取元素时是阻塞还是返回(队首元素)ReentrantLock和2个Condition,分别为ReentrantLock存取锁和Condition存取锁 有界 阻塞队列,如果不指定,容量默认为Integer.MAX_VALUE Node节点,每个Node节点有一个next指针尾进头出,添加元素的时候放到尾部,获取元素的时候放到头部涉及到的方法:LinkedBlockingQueue#put,LinkedBlockingQueue#offer,如果队列满了,前者会阻塞,后者不会阻塞
ReentrantLock存锁 Condition存锁阻塞当前线程,否则添加元素入队,将当前元素添加到队尾,将上次队位元素的next指针指向该元素,更新last指向容量==0,唤醒Condition写锁 涉及到的方法:LinkedBlockingQueue#take,LinkedBlockingQueue#poll,如果队列空了,前者会阻塞,后者不会阻塞
ReentrantLock写锁 Condition写锁阻塞当前线程,否则获取元素出队,从队首去取出一个元素,更新head指向容量满了,唤醒Condition读锁 ReentrantLock和2个Condition,两个Condition分别用于判断是否为空和是否已满有界 阻塞队列,基于final数组,自然是有界的final数组,,在初始化ArrayBlockingQueue的时候需要指定容量涉及到的方法:ArrayBlockingQueue#put,ArrayBlockingQueue#offer,如果队列满了,前者会阻塞,后者不会阻塞
ReentrantLock锁Condition阻塞当前线程putIndex用于记录下次添加元素时对应的数组下标,当takeIndex==队列.length的时候,重置该变量为0Condition 涉及到的方法:ArrayBlockingQueue#take,ArrayBlockingQueue#poll,如果队列空了,前者会阻塞,后者不会阻塞
ReentrantLock锁Condition阻塞当前线程takeIndex用于记录下次获取元素时对应的数组下标,当takeIndex==队列.length的时候,重置该变量为0Condition 进行线程间数据交换的利器 todo
ReentrantLock和Condition 优先级 无界 阻塞队列,优先级的实现基于二叉堆 null,且需要实现Comparable接口优先级的 无界阻塞队列,优先级可以基于自然排序,也可以基于Comparable接口,取决于你使用哪个构造函数
涉及到的方法: PriorityBlockingQueue#add PriorityBlockingQueue#offer
ReentrantLock获取锁size >= queue.length,如果条件成立就扩容
3.通过Comparable#和二叉堆便利找到合适的位置插入元素size加1PriorityBlockingQueue#take方法而阻塞的线程(即队列中没有元素的时候),这个通过Condition实现涉及到的方法: PriorityBlockingQueue#poll PriorityBlockingQueue#take
ReentrantLock获取锁PriorityBlockingQueue#poll方法,在队列中没有元素的时候直接返回null,不会阻塞当前线程PriorityBlockingQueue#take方法,在队列中没有元素的时候会阻塞当前线程,知道队列中有元素然后再被唤醒返回,基于Condition实现涉及到的方法: PriorityBlockingQueue#tryGrow
ReentrantLock#unlock方法,释放ReentrantLock锁,为什么要释放这个锁呢?我猜这里是为了提高性能,在扩容之前先释放锁,然后通过一个CAS变量来控制扩容的并发问题,这样在扩容期间就不会接阻塞其他调用线程,比如take操作,很妙volatile变量表示,0表示目前不在扩容,1表示正在扩容,每次扩容之前通过CAS将其设置为1,如果CAS失败说明目前有其他线程正在扩容,此时不做处理*2+2;如果不小于64,则对容量*1.5 Object[] Thread#yield让出当前CPUSystem#arraycopy对行队列赋值,在赋值之前需要先通过ReentrantLock#lock再次获取锁可以将它看成是一个线程安全的ArrayList,在涉及到修改操作时,通过ReentrantLock获取锁,然后复制一个新的数组去修改,基于volatile语义可以读数据时不会有问题,适用于读多写少的场景,如果写比较多的比较影响性能
ReentrantLock和volatile null 例如CopyOnWriteArrayList#get方法,直接返回数组下标对应的元素即可
例如CopyOnWriteArrayList#set CopyOnWriteArrayList#add`方法
ReentrantLock锁内部持有一个CopyOnWriteArrayList引用,也就是它的实现完全是基于CopyOnWriteArrayList,那它是如何保证元素不唯一呢?在CopyOnWriteArrayList中有一个addIfAbsent方法,该方法会通过遍历的方式去判断你要添加的元素是否存在.
适合读多写少的场景

StringBuilder data = new StringBuilder("data");
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
new Thread((() -> {
// sleep一会,让写锁先持有锁
sleep(TimeUnit.MILLISECONDS, 10);
readWriteLock.readLock().lock();
System.out.println(Thread.currentThread().getName() + "获得读锁 " + LocalDateTime.now());
System.out.println(Thread.currentThread().getName() + data);
readWriteLock.readLock().unlock();
}), "readLock ").start();
new Thread((() -> {
readWriteLock.writeLock().lock();
System.out.println(Thread.currentThread().getName() + "获得写锁 " + LocalDateTime.now());
sleep(TimeUnit.SECONDS, 3);
data.append("666");
// 锁降级 这里这样子使用降级感觉没有什么意思,那锁降级一般用到什么场景
readWriteLock.readLock().lock();
System.out.println(Thread.currentThread().getName() + data);
readWriteLock.writeLock().unlock();
}), "writeLock ").start();public ReentrantReadWriteLock() {
this(false);
}
// 可以在创建ReentrantReadWriteLock时选择公平模式还是非公平模式
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}ReentrantReadWriteLock时选择使用公平锁还是非公平锁Sync继承自AbstractQueuedSynchronizer,它负责实现同步器的模板方法,是实现同步器的关键ReadLock和WriteLock实现了Lock接口,可以将它们看作是API层,具体逻辑委托给Sync实现,面向用户static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 其实这里就涉及到锁降级,如果当前已经有写锁,返回-1,将它加入到阻塞队列, 否则继续尝试获取读锁
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
// readerShouldBlock主要是针对公平锁和非公平锁, `c + SHARED_UNIT`是因为共享锁用的是高16位
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
// firstReader是把读锁状态从0变成1的那个线程
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 锁重入
firstReaderHoldCount++;
} else {
// cachedHoldCounter是上一个获取锁成功的线程
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 上面的if失败了,即通过自旋获取锁
return fullTryAcquireShared(current);
}final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 如果 firstReader 是当前线程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
// 并且firstReaderHoldCount == 1,说明释放锁之后需要重置firstReader
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}state==0,需要唤醒阻塞的写锁protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
// state != 0 && w==0 说明有读锁
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire 锁重入
setState(c + acquires);
return true;
}
// writerShouldBlock用于确定公平和非公平模式
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}判断是又读锁,判断是否锁重入,通过CAS设置状态
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}和正常的独占锁释放一样