信息发布→ 登录 注册 退出

Java多线程之同步工具类CountDownLatch

发布时间:2026-01-11

点击量:
目录
  • 1 CountDownLatch主要方法
  • 2 CountDownLatch使用例子
  • 3 CountDownLatch源码分析
    • 构造函数
    • countDown方法
    • countDown方法的内部实现
    • await方法
      • (1)不带参数
      • (2)带参数
    • await()方法的内部实现
    • 4 CountDownLatch和CyclicBarrier区别

      前言:

      CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行。

      1 CountDownLatch主要方法

      void await():如果当前count大于0,当前线程将会wait,直到count等于0或者中断。 PS:当count等于0的时候,再去调用await()
      线程将不会阻塞,而是立即运行。后面可以通过源码分析得到。
      boolean await(long timeout, TimeUnit unit):使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。
      void countDown(): 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
      long getCount() :获得计数的数量

      2 CountDownLatch使用例子

      public class CountDownLatchTest {
          private static final  int N = 4;
          public static void main(String[] args) {
      
              final CountDownLatch latch = new CountDownLatch(4);
      
              for(int i=0;i<N;i++)
              {
                  new Thread(){
                      public void run() {
                          try {
                              System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
                              Thread.sleep(3000);
                              System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
                              latch.countDown();
                              System.out.println("剩余计数"+latch.getCount());
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      };
                  }.start();
              }
      
      
              try {
                  System.out.println("等待"+N+"个子线程执行完毕...");
                  latch.await();
                  System.out.println(N+"个子线程已经执行完毕");
                  System.out.println("继续执行主线程");
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
      }
      
      

      子线程Thread-1正在执行
      子线程Thread-3正在执行
      子线程Thread-2正在执行
      等待4个子线程执行完毕...
      子线程Thread-0正在执行
      子线程Thread-3执行完毕
      子线程Thread-2执行完毕
      剩余计数2
      子线程Thread-1执行完毕
      剩余计数1
      子线程Thread-0执行完毕
      剩余计数3
      剩余计数0
      4个子线程已经执行完毕
      继续执行主线程

      3 CountDownLatch源码分析

      CountDownLatch是通过计数器的方式来实现,计数器的初始值为线程的数量。每当一个线程完成了自己的任务之后,就会对计数器减1,当计数器的值为0时,表示所有线程完成了任务,此时等待在闭锁上的线程才继续执行,从而达到等待其他线程完成任务之后才继续执行的目的。

      构造函数

      public CountDownLatch(int count) {
          if (count < 0) throw new IllegalArgumentException("count < 0");
          this.sync = new Sync(count);
      }
      
      

      通过传入一个数值来创建一个CountDownLatch,数值表示线程可以从等待状态恢复,countDown方法必须被调用的次数

      countDown方法

      public void countDown() {
              sync.releaseShared(1);
          }
      
      

      线程调用此方法对count进行减1。当count本来就为0,此方法不做任何操作,当count比0大,调用此方法进行减1,当new count为0,释放所有等待当线程。

      countDown方法的内部实现

         /**
           * Decrements the count of the latch, releasing all waiting threads if
           * the count reaches zero.
           *
           * <p>If the current count is greater than zero then it is decremented.
           * If the new count is zero then all waiting threads are re-enabled for
           * thread scheduling purposes.
           *
           * <p>If the current count equals zero then nothing happens.
           */
          public void countDown() {
              sync.releaseShared(1);
          }
      
      
          public final boolean releaseShared(int arg) {
              if (tryReleaseShared(arg)) {
                  doReleaseShared();//释放所有正在等待的线程节点
                  return true;
              }
              return false;
          }
      
              protected boolean tryReleaseShared(int releases) {
                  // Decrement count; signal when transition to zero
                  for (;;) {
                      int c = getState();
                      if (c == 0)
                          return false;
                      int nextc = c-1;
                      if (compareAndSetState(c, nextc))
                          return nextc == 0;
                  }
              }
          private void doReleaseShared() {
              for (;;) {
                  Node h = head;
                  if (h != null && h != tail) {
                      int ws = h.waitStatus;
                      if (ws == Node.SIGNAL) {
                          if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                              continue;            // loop to recheck cases
                          unparkSuccessor(h);
                      }
                      else if (ws == 0 &&
                               !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                          continue;                // loop on failed CAS
                  }
                  if (h == head)                   // loop if head changed
                      break;
              }
          }
      
      

      await方法

      (1)不带参数

      public void await() throws InterruptedException {
          sync.acquireSharedInterruptibly(1);
      }
      
      

      调用此方法时,当count为0,直接返回true,当count比0大,线程会一直等待,直到count的值变为0,或者线程被中断(interepted,此时会抛出中断异常)。

      (2)带参数

      public boolean await(long timeout, TimeUnit unit)
          throws InterruptedException {
          return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
      }
      
      

      调用此方法时,当count为0,直接返回true,当count比0大,线程会等待一段时间,等待时间内如果count的值变为0,返回true;当超出等待时间,返回false;或者等待时间内线程被中断,此时会抛出中断异常。

      await()方法的内部实现

      public void await() throws InterruptedException {
              sync.acquireSharedInterruptibly(1);
          }
      
      
      

      具体如下:

      • 1、检测中断标志位
      • 2、调用tryAcquireShared方法来检查AQS标志位state是否等于0,如果state等于0,则说明不需要等待,立即返回,否则进行3
      • 3、调用doAcquireSharedInterruptibly方法进入AQS同步队列进行等待,并不断的自旋检测是否需要唤醒
          public final void acquireSharedInterruptibly(int arg)
                  throws InterruptedException {
              if (Thread.interrupted())
                  throw new InterruptedException();
      
              if (tryAcquireShared(arg) < 0)
                  doAcquireSharedInterruptibly(arg);
          }
          /*
              函数功能:根据AQS的状态位state来返回值,
              如果为state=0,返回 1
              如果state=1,则返回-1
          */
          protected int tryAcquireShared(int acquires) {
              return (getState() == 0) ? 1 : -1;
          }
      
          /**
           * Acquires in shared interruptible mode.
           * @param arg the acquire argument
           */
          private void doAcquireSharedInterruptibly(int arg)
              throws InterruptedException {
              final Node node = addWaiter(Node.SHARED);
              boolean failed = true;
              try {
                  for (;;) {
                      final Node p = node.predecessor();
                      if (p == head) {
                          int r = tryAcquireShared(arg);
                          if (r >= 0) {//如果大于零,则说明需要唤醒
                              setHeadAndPropagate(node, r);
                              p.next = null; // help GC
                              failed = false;
                              return;
                          }
                      }
                      if (shouldParkAfterFailedAcquire(p, node) &&
                          parkAndCheckInterrupt())
                          throw new InterruptedException();
                  }
              } finally {
                  if (failed)
                      cancelAcquire(node);
              }
          }
      
      

      4 CountDownLatch和CyclicBarrier区别

      CountDownLatchCyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:

      • CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;
      • CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;

      CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。

      在线客服
      服务热线

      服务热线

      4008888355

      微信咨询
      二维码
      返回顶部
      ×二维码

      截屏,微信识别二维码

      打开微信

      微信号已复制,请打开微信添加咨询详情!