五、同步计数器及源码

一、CountDownLatch

1、概述

1、CountDownLatch是一个同步辅助类,直译就是倒计数(CountDown)门闩(Latch)。倒计数不用说,门闩的意思就是阻止执行。在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。用给定的计数初始化CountDownLatch。由于调用了countDown()方法,所以在当前计数到达零之前,await()方法会一直受阻塞。之后会释放所有等待的线程,await()的所有后续调用都将立即返回。
2、主要方法:
  • CountDownLatch(int count):构造一个用给定计数初始化的CountDownLatch。
  • void await():当一个或多个线程调用此方法时,调用线程就会被阻塞,直到锁存器倒计时到零结束等待(或者线程被中断)。
  • void countDown():减少锁存器的计数,如果计数达到零,因调用await方法被阻塞的线程会被唤醒继续执行。如果当前计数大于零,则递减。调用此方法的线程不会被阻塞。
  • long getCount():返回当前计数。
3、应用场景
  • 在一些应用中,需要等待某个条件达到要求后才能做后面的事情;同时当所有线程都完成后,也会触发事件,以便进行后面的操作。

2、使用示例

1、不使用CountDownLatch的情况下,主线程会提前执行完,其他线程还在执行。
/*** @Date: 2022/5/26* 5个人同时约定去旅游,等人到齐之后,乘车出发*/
public class CountDownLatchTest {public static void main(String[] args) throws InterruptedException {for (int i = 1; i <= 5; i++) {final int name = i;new Thread(() -> {System.out.println(Thread.currentThread().getName() + "到达目的地,等待出发");}, "兄弟" + name).start();}System.out.println("人到齐,乘车出发");}
}
/*** 运行结果:* 兄弟3到达目的地,等待出发* 人到齐,乘车出发* 兄弟5到达目的地,等待出发* 兄弟2到达目的地,等待出发* 兄弟4到达目的地,等待出发*/
2、使用CountDownLatch
/*** @Date: 2022/5/26* 5个人同时约定去旅游,等人到齐之后,乘车出发*/
public class CountDownLatchTest {public static void main(String[] args) throws InterruptedException {//计数器CountDownLatch countDownLatch = new CountDownLatch(5);for (int i = 1; i <= 5; i++) {final int name = i;new Thread(() -> {System.out.println(Thread.currentThread().getName() + "到达目的地,等待出发");//计数减1countDownLatch.countDown();}, "兄弟" + name).start();}//等待所有线程执行完成。主线程才继续向下执行countDownLatch.await();System.out.println("人到齐,乘车出发");}
}
/*** 运行结果:* 兄弟3到达目的地,等待出发* 兄弟5到达目的地,等待出发* 兄弟2到达目的地,等待出发* 兄弟4到达目的地,等待出发* 人到齐,乘车出发*/

3、CountDownLatch原理(部分源码)

//CountDownLatch没有显示继承哪个父类或者实现哪个父接口, 它底层是AQS是通过内部类Sync来实现的
public class CountDownLatch {/*** CountDownLatch的核心实现机制利用AbstractQueuedSynchronizer,简称AQS的state状态来实现Count的阻塞机制*/private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;//构造器Sync(int count) {setState(count);}// 返回当前计数int getCount() {return getState();}/*** 试图在共享模式下获取对象状态* 该函数只是简单的判断AQS的state是否为0,为0则返回1,不为0则返回-1*/protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}/*** 试图设置状态来反映共享模式下的一个释放,重写AQS的释放状态方法,实现自己的逻辑,来削减count线程数*/protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {//获取状态int c = getState();//没有被线程占有if (c == 0)return false;//下一个状态int nextc = c-1;//比较并且设置成功if (compareAndSetState(c, nextc))return nextc == 0;}}}//同步队列private final Sync sync;/*** 构造一个用给定计数初始化的CountDownLatch,并且构造函数内完成了sync的初始化,并设置了状态数*/public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}/*** 此函数将会使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断*/public void await() throws InterruptedException {//转发到sync对象上//由源码可知,对CountDownLatch对象的await的调用会转发为对Sync的acquireSharedInterruptibly(从AQS继承的方法)方法的调用sync.acquireSharedInterruptibly(1);}/*** 此函数将递减锁存器的计数,如果计数到达零,则释放所有等待的线程*/public void countDown() {//对countDown的调用转换为对Sync对象的releaseShared(从AQS继承而来)方法的调用sync.releaseShared(1);}
}

二、CyclicBarrier

1、概述

1、CyclicBarrier是一个同步辅助类,翻译过来叫循环栅栏、循环(cyclic)屏障(barrier)。它允许一组线程到达某个公共屏障点(common barrier point)时被阻塞,直到最后一个线程到达屏障时,所有被屏障拦截的线程才会继续执行。
2、主要方法:
  • CyclicBarrier(int parties):创建一个新的CyclicBarrier ,它将在给定数量的参与方(线程)等待它时触发,并且在触发障碍时不执行预定义的操作。
  • CyclicBarrier(int parties, Runnable barrierAction):创建一个新的CyclicBarrier ,当给定数量的参与方(线程)正在等待它时,它将触发,并且当障碍被触发时,它将执行给定的屏障动作,由最后一个进入屏障的线程执行。
  • int await():在所有参与者(线程)都已经在此屏障上调用await方法之前,将一直等待。
  • int await(long timeout, TimeUnit unit):在所有参与者(线程)都已经在此屏障上调用await方法之前将一直等待,或者超出了指定的等待时间。
  • int getNumberWaiting():返回当前在屏障处等待的参与者(线程)数目。
  • int getParties():返回触发此障碍所需的参与方(线程)数目。
  • boolean isBroken():查询此屏障是否处于损坏状态。
  • void reset():将屏障重置为其初始状态。如果任何一方当前在屏障处等待,他们将返回BrokenBarrierException 。请注意,由于其他原因发生破损后的重置可能会很复杂;线程需要以其他方式重新同步,并选择一个执行重置。相反,最好为后续使用创建一个新的屏障。

2、使用示例

/*** @Date: 2022/6/8* 汇总用户数据*/
public class CyclicBarrierTest {//用于保存用户数据private static ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();private static AtomicInteger sum = new AtomicInteger(0);public static void main(String[] args) {CyclicBarrier barrier = new CyclicBarrier(3, new Thread(() -> {System.out.println("子任务全都执行完成,开始执行汇总结果...");sum.getAndAccumulate(map.values().stream().reduce(Integer::sum).get(), Integer::sum);System.out.println("总结果为:" + sum.get());}));WorkTask workTask1 = new WorkTask(barrier, 893);WorkTask workTask2 = new WorkTask(barrier, 1894);WorkTask workTask3 = new WorkTask(barrier, 3005);workTask1.start();workTask2.start();workTask3.start();}/*** 工作线程*/static class WorkTask extends Thread {//CyclicBarrierprivate CyclicBarrier barrier;//用户idprivate Integer userId;public WorkTask(CyclicBarrier barrier, Integer userId) {this.barrier = barrier;this.userId = userId;}@Overridepublic void run() {try {int num = userId + 1000;System.out.println("用户 " + userId + " 线程开始统计数据,结果为:" + num);//统计不同用户数据map.put(userId, num);//等待barrier.await();//结束等待之后继续执行System.out.println("用户 " + userId + " 线程等待结束之后继续执行其他操作!!!");} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}
}/*** 运行结果* 用户 1894 线程开始统计数据,结果为:2894* 用户 893 线程开始统计数据,结果为:1893* 用户 3005 线程开始统计数据,结果为:4005* 子任务全都执行完成,开始执行汇总结果...* 子任务汇总结果为:8792* 用户 3005 线程等待结束之后继续执行其他操作!!!* 用户 1894 线程等待结束之后继续执行其他操作!!!* 用户 893 线程等待结束之后继续执行其他操作!!!*/

3、与CountDonwLatch对比

1、CountDownLatch减计数,CyclicBarrier加计数。
2、CountDownLatch是一次性的,CyclicBarrier可以重用。
3、对于CountDownLatch来说,重点是那个“一个线程”在等待,而另外那N个线程在执行完成之后可以继续等待,可以终止。
4、对于CyclicBarrier来说,重点是那N个线程,它们之间任何一个没有完成,所有线程必须等待。

4、CyclicBarrier原理(部分源码)

1、它的实现原理:利用ReentrantLock做线程安全锁,实现线程安全等待,部分源码如下:
public class CyclicBarrier {//可重入锁private final ReentrantLock lock = new ReentrantLock();//利用lock.newCondition()实现主线程的唤醒和等待private final Condition trip = lock.newCondition();//表示同时到达屏障的线程个数private final int parties;//parties个线程到达屏障时,要执行的线程private final Runnable barrierCommand;/*** 重点看下await方法,发现调用的是dowait方法*/public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}}/*** 主要屏障代码*/private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {//保存当前锁final ReentrantLock lock = this.lock;//加锁lock.lock();try {//保存当前代final Generation g = generation;//屏障被破坏,抛出异常if (g.broken)throw new BrokenBarrierException();//线程被打断if (Thread.interrupted()) {//结束当前屏障,并且唤醒所有等待的线程,只有拥有锁的时候才会调用breakBarrier();throw new InterruptedException();}//减少正在等待进入屏障的线程数量int index = --count;//正在等待进入屏障的线程数量为0,所有线程都已经进入,表示parties个线程已经到达屏障if (index == 0) {//运行的动作标识boolean ranAction = false;try {//都到达线程阈之后要执行的线程final Runnable command = barrierCommand;//线程不为空,就运行if (command != null)command.run();//设置ranAction状态ranAction = true;//进入下一代nextGeneration();return 0;} finally {//没有运行动作,就结束当前屏障if (!ranAction)breakBarrier();}}//无限循环,直到所有线程都到达设置的线程数,或者中断,超时才结束for (;;) {try {//没有设置超时时间,就等待if (!timed)trip.await();//设置了等待时间,并且等待时间大于0,就等待指定时长else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {//等于当前代并且屏障没有被损坏if (g == generation && ! g.broken) {//结束当前屏障breakBarrier();throw ie;} else {//不等于当前带后者是屏障被损坏,就中断当前线程// We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.Thread.currentThread().interrupt();}}//屏障被损坏,抛出异常if (g.broken)throw new BrokenBarrierException();//不等于当前代,返回索引if (g != generation)return index;//设置了等待时间,并且等待时间小于0if (timed && nanos <= 0L) {//结束屏障,并且唤醒所有等待的线程breakBarrier();throw new TimeoutException();}}} finally {//释放锁lock.unlock();}}//其他代码省略......
}

三、Semaphore

1、概述

1、Semaphore是一个计数信号量,利用它可以控制一定数量的请求,从而实现资源访问限制的目的,实际应用中,可以用来限制访问某种资源的数量,比如在Hystrix中就有基于Semaphore的资源隔离策略。
2、最简单的理解信号量就是,一个计数器、一个等待队列、两个方法(在Java实现的Semaphore中就是acquire和release)。
  • 调用一次acquire方法,计数器就减1,如果此时计数器小于0,则阻塞当前线程,否则当前线程可继续执行。
  • 调用一次release方法,计数器就加1,如果此时计数器小于等于0,则唤醒一个等待队列中的线程,并将其中等待队列中移除。
3、主要方法:
  • Semaphore(int permits):创建具有给定的许可数量和给定的非公平设置的Semaphore数量。
  • Semaphore(int permits, boolean fair):创建具有给定的许可数量和给定的公平设置的Semaphore数量。所谓的公平性就是是否先进来的先释放,默认为非公平的
  • void acquire()/void acquire(int permits):从信号量获取一个或多个许可,如果一个可用并立即返回,将可用许可的数量减少一个,如果没有可用的许可,则当前线程出于线程调度目的而被禁用并处于休眠状态。
  • void release()/void release(int permits):释放一个或多个许可证,将其返回给信号量。
  • int availablePermits():返回信号量中当前可用的许可数。
  • int drainPermits():获取并返回立即可用的所有许可。
  • int getQueueLength():返回正在等待获取的线程数的估计值。该值只是一个估计值,因为在此方法遍历内部数据结构时线程数可能会动态变化。此方法设计用于监控系统状态,而不是用于同步控制。
  • boolean hasQueuedThreads():查询是否有线程正在等待获取。请注意,因为取消可能随时发生,所以true的返回并不能保证任何其他线程将永远获取。此方法主要设计用于监控系统状态。
  • void reducePermits(int reduction):根据指定的缩减量减少可用许可证的数量。
  • boolean isFair():如果此信号量的公平性设置为true,则返回true 。
  • boolean tryAcquire():仅当调用时可用时,才从此信号量获取许可。获取一个许可,如果一个可用并立即返回,值为true ,将可用许可的数量减少一个。如果没有可用的许可,则此方法将立即返回值false。

2、使用示例

/*** @Date: 2022/6/11* 停车场占用车位,每走一辆才会停一辆*/
@Slf4j
public class SemaphoreTest {public static void main(String[] args) {//初始化一个信号量为3,默认是false非公平锁, 模拟3个停车位Semaphore semaphore = new Semaphore(3);//模拟多台车for (int i = 1; i <= 6; i++) {new Thread(() -> {try {//进入车位占用semaphore.acquire();log.info("车辆 " + Thread.currentThread().getName() + " 抢到车位");//停车3sTimeUnit.SECONDS.sleep(3);//释放semaphore.release();log.info("车辆 " + Thread.currentThread().getName() + " 离开车位");} catch (InterruptedException e) {e.printStackTrace();}}, i + "").start();}}
}
/*** 运行结果:* 00:50:24.311 [2] INFO com.itan.ut.SemaphoreTest - 车辆 2 抢到车位* 00:50:24.311 [1] INFO com.itan.ut.SemaphoreTest - 车辆 1 抢到车位* 00:50:24.311 [3] INFO com.itan.ut.SemaphoreTest - 车辆 3 抢到车位* 00:50:27.328 [1] INFO com.itan.ut.SemaphoreTest - 车辆 1 离开车位* 00:50:27.328 [3] INFO com.itan.ut.SemaphoreTest - 车辆 3 离开车位* 00:50:27.328 [2] INFO com.itan.ut.SemaphoreTest - 车辆 2 离开车位* 00:50:27.328 [4] INFO com.itan.ut.SemaphoreTest - 车辆 4 抢到车位* 00:50:27.328 [5] INFO com.itan.ut.SemaphoreTest - 车辆 5 抢到车位* 00:50:27.328 [6] INFO com.itan.ut.SemaphoreTest - 车辆 6 抢到车位* 00:50:30.339 [5] INFO com.itan.ut.SemaphoreTest - 车辆 5 离开车位* 00:50:30.339 [6] INFO com.itan.ut.SemaphoreTest - 车辆 6 离开车位* 00:50:30.339 [4] INFO com.itan.ut.SemaphoreTest - 车辆 4 离开车位*/

3、Semaphore实现互斥锁

1、如果Semaphore初始化时设置为1,则和synchronized效果类似
/*** @Date: 2022/6/11* Semaphore实现互斥锁,类似synchronized效果*/
@Slf4j
public class SemaphoreImplMutex {private static final Semaphore semaphore = new Semaphore(1);public static void main(String[] args) {SemaphoreImplMutex semaphoreMutex = new SemaphoreImplMutex();for (int i = 1; i < 4; i++) {new Thread(semaphoreMutex::method, "线程 " + i).start();}}public void method() {//同时只会有一个线程执行此方法!try {semaphore.acquire();log.info(Thread.currentThread().getName() + " 正在执行!");Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release();log.info(Thread.currentThread().getName() + " 执行结束!");}}
}
/*** 23:05:07.154 [线程 1] INFO com.itan.syncutil.SemaphoreImplMutex - 线程 1 正在执行!* 23:05:08.180 [线程 1] INFO com.itan.syncutil.SemaphoreImplMutex - 线程 1 执行结束!* 23:05:08.186 [线程 3] INFO com.itan.syncutil.SemaphoreImplMutex - 线程 3 正在执行!* 23:05:09.200 [线程 3] INFO com.itan.syncutil.SemaphoreImplMutex - 线程 3 执行结束!* 23:05:09.200 [线程 2] INFO com.itan.syncutil.SemaphoreImplMutex - 线程 2 正在执行!* 23:05:10.201 [线程 2] INFO com.itan.syncutil.SemaphoreImplMutex - 线程 2 执行结束!*/

4、Semaphore原理(部分源码)

public class Semaphore implements java.io.Serializable {//实现自己的内部同步类private final Sync sync;//内部类,继承自AQSabstract static class Sync extends AbstractQueuedSynchronizer {//版本号private static final long serialVersionUID = 1192457210091910933L;//构造函数Sync(int permits) {//设置状态数,通过AbstractQueuedSynchronizer的状态机制实现信号量的设置setState(permits);}//获取许可final int getPermits() {return getState();}//共享模式下非公平策略获取final int nonfairTryAcquireShared(int acquires) {for (;;) {//获取许可数int available = getState();//剩余许可数int remaining = available - acquires;//许可小于0或者比较并且设置状态成功if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}//共享模式下进行释放protected final boolean tryReleaseShared(int releases) {for (;;) {//获取许可int current = getState();//可用的许可int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");//比较并进行设置成功if (compareAndSetState(current, next))return true;}}//根据指定的缩减量减小可用许可的数目final void reducePermits(int reductions) {for (;;) {//获取许可int current = getState();//可用的许可int next = current - reductions;if (next > current) // underflowthrow new Error("Permit count underflow");//比较并进行设置成功if (compareAndSetState(current, next))return;}}//获取并返回立即可用的所有许可final int drainPermits() {for (;;) {//获取许可int current = getState();//许可为0或者比较并设置成功if (current == 0 || compareAndSetState(current, 0))return current;}}}/*** NonfairSync类继承了Sync类,表示采用非公平策略获取资源,其只有一个tryAcquireShared方法,重写了AQS的该方法* 说明: 从tryAcquireShared方法的源码可知,其会调用父类Sync的nonfairTryAcquireShared方法,表示按照非公平策略进行资源的获取*/static final class NonfairSync extends Sync {private static final long serialVersionUID = -2694183684443567898L;NonfairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {//共享模式下获取return nonfairTryAcquireShared(acquires);}}/*** FairSync类继承了Sync类,表示采用公平策略获取资源,其只有一个tryAcquireShared方法,重写了AQS的该方法* 说明: 从tryAcquireShared方法的源码可知,它使用公平策略来获取资源,它会判断同步队列中是否存在其他的等待节点*/static final class FairSync extends Sync {private static final long serialVersionUID = 2014338818796000944L;FairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {for (;;) {//同步队列中存在其他节点if (hasQueuedPredecessors())return -1;//获取许可int available = getState();//剩余的许可int remaining = available - acquires;//剩余的许可小于0或者比较设置成功if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}}//从此信号量中获取一个许可,在提供一个许可前一直将线程阻塞public void acquire() throws InterruptedException {//调用原生的AbstractQueuedSynchronizersync.acquireSharedInterruptibly(1);}//释放一个许可,将其返回给信号量public void release() {sync.releaseShared(1);}
}

Published by

风君子

独自遨游何稽首 揭天掀地慰生平