闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态,可以用来确保某些活动直到其他活动都完成后才继续执行。
适用场景如下
- 确保某个计算在其需要的所有资源都被初始化之后才继续执行。
- 确保某个服务在其依赖的所有其他服务都已经启动后才启动。
- 确保等待某个操作的所有参与者都就绪后再继续执行。
总之用于需要某个线程需要等待其他线程都做完某项工作才继续执行的场景。例如程序想让若干线程同时启动,如果在程序中对每个线程顺序调用start()方法,必然无法做到同时启动,此时可以利用闭锁实现,比如使用java.util.concurrent包下的CountDownLatch类。
CountDownLatch类 CountDownLatch的作用及其实现思想CountDownLatch是闭锁的一种实现,用于协调线程间同步。
CountDownLatch类用一个计数器表示闭锁状态,该计数器初始化时应赋予一个正整数,表示需要等待的事件数量。CountDownLatch类中有两个关键方法,分别为
- await():调用该方法的线程将进入阻塞状态,直到计数器值为0。
- countDown():递减计数器,表示有一个需要等待的事件已经执行完成。
整个闭锁过程基于AQS队列同步器实现,接下来一起来看一下CountDownLatch的源码
CountDownLatch源码简析首先关注一下CountDownLatch的静态内部类Sync以及构造方法(部分不必要源码省略)
package java.util.concurrent; import java.util.concurrent.locks.AbstractQueuedSynchronizer; public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; //AQS中的state是一个volatile变量 保证了可见性 Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; //注意CountDowLatch类只有此构造方法 相当于约束用户必须对计数器初始化赋值 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } }
接下来关注await()方法和countDown()方法的实现
//await()方法实现 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //此方法用于阻塞线程 与本文讨论相关性不大 不展开讨论 doAcquireSharedInterruptibly(arg); }
//countDown()方法实现 public void countDown() { //同上 传入参数1无实际意义 sync.releaseShared(1); } public final boolean releaseShared(int arg) { //调用了Sync中重写的tryReleaseShared方法进行判断 if (tryReleaseShared(arg)) { //释放共享锁操作 不展开讨论 doReleaseShared(); return true; } return false; }
此外CountDownLatch中还提供了一个阻塞超时处理方法await(long timeout, TimeUnit unit),在超过这个方法设定的时间后,便不会阻塞当前线程。
CountDownLatch使用示例假设有三项任务需要完成,分别为任务A、任务B、任务C,其中任务B在Main线程中执行,任务A和任务C在分散在其余若干线程中完成,同时任务A为任务B的前置条件,任务B为任务C的前置条件,使用CountDownLatch可以较好的满足此场景需求。(作者初学,例子的形式在实际工作中可能根本不会出现,仅作演示用)示例代码如下
public class CountDownLatchTest { //预设准备线程数量 public static final int ThreadNum = 10; public static final CountDownLatch mainGate = new CountDownLatch(ThreadNum); public static final CountDownLatch threadGate = new CountDownLatch(1); public static void method() { for (int i = 0; i < ThreadNum; i++) { int finalI = i; Thread thread = new Thread(){ @Override public void run(){ try { System.out.println("线程"+ finalI +"任务A执行完成"); mainGate.countDown(); threadGate.await(); System.out.println("线程"+ finalI + "阻塞结束,继续执行任务C,启动时间:" + System.currentTimeMillis()); } catch (InterruptedException e) { } } }; thread.start(); } } public static void main(String[] args) { new Thread(){ @Override public void run(){ try { //让匿名线程sleep一小段时间,好让主线程先于method()调用startGate的await()方法 Thread.sleep(100); method(); } catch (InterruptedException e) { } } }.start(); try { System.out.println("Main线程阻塞"); mainGate.await(); System.out.println("Main线程阻塞结束"); System.out.println("任务B执行完成"); threadGate.countDown(); } catch (InterruptedException e) { } } }
执行结果如下: Main线程阻塞 线程0任务A执行完成 线程1任务A执行完成 线程2任务A执行完成 线程3任务A执行完成 线程4任务A执行完成 线程5任务A执行完成 线程6任务A执行完成 线程7任务A执行完成 线程8任务A执行完成 线程9任务A执行完成 Main线程阻塞结束 任务B执行完成 线程0阻塞结束,继续执行任务C,启动时间:1635316374515 线程2阻塞结束,继续执行任务C,启动时间:1635316374515 线程6阻塞结束,继续执行任务C,启动时间:1635316374515 线程9阻塞结束,继续执行任务C,启动时间:1635316374515 线程1阻塞结束,继续执行任务C,启动时间:1635316374515 线程8阻塞结束,继续执行任务C,启动时间:1635316374515 线程7阻塞结束,继续执行任务C,启动时间:1635316374515 线程5阻塞结束,继续执行任务C,启动时间:1635316374515 线程4阻塞结束,继续执行任务C,启动时间:1635316374515 线程3阻塞结束,继续执行任务C,启动时间:1635316374515 Process finished with exit code 0CountDownLatch的缺陷
CountDownLatch只能使用一次,因为计数器的值在构造函数初始化后便不能被改变,无法循环使用。
作者才疏学浅,如文中出现纰漏,还望指正