CountDownLatch 闭锁

我们在 java 手写并发框架(一)异步查询转同步的7种实现方式从零手写并发框架(二)异步转同步实现4种锁策略 都是用过这个类,感兴趣的小伙伴可以看一下。

说明

CountDownLatch 是一种同步工具类,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成为止。

可以让一个线程等待一组事件发生后(不一定要线程结束)继续执行;

闭锁是一种同步工具类,可以延迟线程的进度直到其达到终止状态。

闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。

闭锁可以用来确保某些活动直到其它活动都完成后才继续执行,例如:

1、确保某个计算在其需要的所有资源都被初始化之后才继续执行。二元闭锁(包括两个状态)可以用来表示“资源R已经被初始化”,而所有需要R的操作都必须先在这个闭锁上等待。

2、确保某个服务在其依赖的所有其它服务都已经启动之后才启动。每个服务都有一个相关的二元闭锁。当启动服务S时,将首先在S依赖的其它服务的闭锁上等待,在所有依赖的服务都启动后会释放闭锁S,这样其他依赖S的服务才能继续执行。

3、等待直到某个操作的所有参与者(例如,在多玩家游戏中的所有玩家)都就绪再继续执行。在这种情况中,当所有玩家都准备就绪时,闭锁将到达结束状态。

使用例子

一个简单的使用例子如下:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class TestHarness { public long timeTakes(int nThreads, final Runnable task) throws InterruptedException { final CountDownLatch startGate = new CountDownLatch(1); final CountDownLatch endGate = new CountDownLatch(nThreads); for (int i = 0; i < nThreads; i++) { Thread t = new Thread() { public void run() { try { startGate.await(); try { task.run(); } finally { endGate.countDown(); } } catch (InterruptedException e) { e.printStackTrace(); } } }; t.start(); } long start = System.nanoTime(); startGate.countDown(); endGate.await(); long end = System.nanoTime(); return end - start; } public static void main(String[] args) throws InterruptedException { TestHarness testHarness = new TestHarness(); long nanoTime = testHarness.timeTakes(10, new Runnable() { public void run() { try { Thread.sleep(1000); System.out.println("Task is over!!!"); } catch (InterruptedException e) { e.printStackTrace(); } } }); System.out.println(nanoTime); } }

说明

创建一个 CountDownLatch

  [java]
1
CountDownLatch startGate = new CountDownLatch(1);

执行等待:

  [java]
1
startGate.await();

表示资源已经完成,可以使用下面的方法通知:

  [java]
1
startGate.countDown();

当降低到 0 的时候,这个门就被打开了,就可以通行了。

实际上这个实现原理和其他锁是一样的,我们一起来看一下实现源码。

CountDownLatch 源码

类声明

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class CountDownLatch { private final Sync sync; /** * Constructs a {@code CountDownLatch} initialized with the given count. * * @param count the number of times {@link #countDown} must be invoked * before threads can pass through {@link #await} * @throws IllegalArgumentException if {@code count} is negative */ public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } }

这个类的内部变量非常简单,只有一个 Sync 对象,这个类的实现实现主要继承自 AQS。

Sync 实现

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/** * Synchronization control For CountDownLatch. * Uses AQS state to represent count. */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } }

其实实现还算简单,这已经是 CountDownLatch 中最复杂的一个实现了。

尝试获取锁

  [java]
1
2
3
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }

这个是尝试获取共享锁。

可见只有当 count == 0 的时候,才能获取成功。

尝试释放锁

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero // 这里就是一个 while(true) 循环 for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }

还是会判断,如果 c == 0,说明锁已经释放过了,直接返回 false。

nextc 就是 c-1,听过 CAS 进行设置。

如果 c-1 == 0,则返回释放锁成功。

基本方法

看完了上面的 Sync 实现,其他的方法就变得非常简单了。

await 等待

  [java]
1
2
3
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

这里实际是调用的 Sync 父类 AQS 中的方法:

  [java]
1
2
3
4
5
6
7
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }

tryAcquireShared 是 Sync 中实现的方法,尝试获取共享锁。

如果获取失败,则会调用 doAcquireSharedInterruptibly 通过共享可中断的模式获取锁。

await 指定超时时间的等待

有时候业务不允许我们一直等待下去,可以通过指定超时时间:

  [java]
1
2
3
4
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }

tryAcquireSharedNanos 也是 AQS 中的基本方法:

  [java]
1
2
3
4
5
6
7
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }

这里实现也是类似的,首先调用 tryAcquireShared 共享模式获取锁,然后调用 doAcquireSharedNanos 方法。这个方法就是指定了对应的超时时间。

countDown 减少闭锁的次数

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
/** * Decrements the count of the latch, releasing all waiting threads if * the count reaches zero. * * <p>If the current count is greater than zero then it is decremented. * If the new count is zero then all waiting threads are re-enabled for * thread scheduling purposes. * * <p>If the current count equals zero then nothing happens. */ public void countDown() { sync.releaseShared(1); }

这里还是有点差异的:

(1)如果 count 大于 0,则只是减少 1

(2)如果 count 等于 0,则可以唤醒所有等待的线程。

这里的 releaseShared 调用的也是 AQS 中的方法。

getCount 获取当前的次数

  [java]
1
2
3
4
5
6
7
8
9
10
/** * Returns the current count. * * <p>This method is typically used for debugging and testing purposes. * * @return the current count */ public long getCount() { return sync.getCount(); }

这个注释也说了,一般用于 debug 或者测试。

实际使用中很少用到。

小结

CountDownLatch 作为一个并发的控制工具,使用起来非常的方便,使用起来也并发不麻烦。

希望本文对你有帮助,如果有其他想法的话,也可以评论区和大家分享哦。

各位极客的点赞收藏转发,是老马持续写作的最大动力!

参考资料

linux 锁实现