一、CountDownLatch
1、概述
1、CountDownLatch是一个同步辅助类,直译就是倒计数(CountDown)门闩(Latch)。倒计数不用说,门闩的意思就是阻止执行。在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
用给定的计数初始化CountDownLatch。由于调用了countDown()方法,所以在当前计数到达零之前,await()方法会一直受阻塞。之后会释放所有等待的线程,await()的所有后续调用都将立即返回。
2、主要方法:
-
CountDownLatch(int count)
:构造一个用给定计数初始化的CountDownLatch。
-
void await()
:当一个或多个线程调用此方法时,调用线程就会被阻塞,直到锁存器倒计时到零结束等待(或者线程被中断)。
-
void countDown()
:减少锁存器的计数,如果计数达到零,因调用await方法被阻塞的线程会被唤醒继续执行。如果当前计数大于零,则递减。调用此方法的线程不会被阻塞。
-
long getCount()
:返回当前计数。
3、应用场景
-
在一些应用中,需要等待某个条件达到要求后才能做后面的事情;同时当所有线程都完成后,也会触发事件,以便进行后面的操作。
2、使用示例
1、不使用CountDownLatch的情况下,主线程会提前执行完,其他线程还在执行。
public class CountDownLatchTest {public static void main(String[] args) throws InterruptedException {for (int i = 1; i <= 5; i++) {final int name = i;new Thread(() -> {System.out.println(Thread.currentThread().getName() + "到达目的地,等待出发");}, "兄弟" + name).start();}System.out.println("人到齐,乘车出发");}
}
2、使用CountDownLatch
public class CountDownLatchTest {public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(5);for (int i = 1; i <= 5; i++) {final int name = i;new Thread(() -> {System.out.println(Thread.currentThread().getName() + "到达目的地,等待出发");countDownLatch.countDown();}, "兄弟" + name).start();}countDownLatch.await();System.out.println("人到齐,乘车出发");}
}
3、CountDownLatch原理(部分源码)
public class CountDownLatch {private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;Sync(int count) {setState(count);}int getCount() {return getState();}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {for (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}}private final Sync sync;public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public void countDown() {sync.releaseShared(1);}
}
二、CyclicBarrier
1、概述
1、CyclicBarrier是一个同步辅助类,翻译过来叫循环栅栏、循环(cyclic)屏障(barrier)。它允许一组线程到达某个公共屏障点(common barrier point)时被阻塞,直到最后一个线程到达屏障时,所有被屏障拦截的线程才会继续执行。
2、主要方法:
-
CyclicBarrier(int parties)
:创建一个新的CyclicBarrier ,它将在给定数量的参与方(线程)等待它时触发,并且在触发障碍时不执行预定义的操作。
-
CyclicBarrier(int parties, Runnable barrierAction)
:创建一个新的CyclicBarrier ,当给定数量的参与方(线程)正在等待它时,它将触发,并且当障碍被触发时,它将执行给定的屏障动作,由最后一个进入屏障的线程执行。
-
int await()
:在所有参与者(线程)都已经在此屏障上调用await方法之前,将一直等待。
-
int await(long timeout, TimeUnit unit)
:在所有参与者(线程)都已经在此屏障上调用await方法之前将一直等待,或者超出了指定的等待时间。
-
int getNumberWaiting()
:返回当前在屏障处等待的参与者(线程)数目。
-
int getParties()
:返回触发此障碍所需的参与方(线程)数目。
-
boolean isBroken()
:查询此屏障是否处于损坏状态。
-
void reset()
:将屏障重置为其初始状态。如果任何一方当前在屏障处等待,他们将返回BrokenBarrierException 。请注意,由于其他原因发生破损后的重置可能会很复杂;线程需要以其他方式重新同步,并选择一个执行重置。相反,最好为后续使用创建一个新的屏障。
2、使用示例
public class CyclicBarrierTest {private static ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();private static AtomicInteger sum = new AtomicInteger(0);public static void main(String[] args) {CyclicBarrier barrier = new CyclicBarrier(3, new Thread(() -> {System.out.println("子任务全都执行完成,开始执行汇总结果...");sum.getAndAccumulate(map.values().stream().reduce(Integer::sum).get(), Integer::sum);System.out.println("总结果为:" + sum.get());}));WorkTask workTask1 = new WorkTask(barrier, 893);WorkTask workTask2 = new WorkTask(barrier, 1894);WorkTask workTask3 = new WorkTask(barrier, 3005);workTask1.start();workTask2.start();workTask3.start();}static class WorkTask extends Thread {private CyclicBarrier barrier;private Integer userId;public WorkTask(CyclicBarrier barrier, Integer userId) {this.barrier = barrier;this.userId = userId;}@Overridepublic void run() {try {int num = userId + 1000;System.out.println("用户 " + userId + " 线程开始统计数据,结果为:" + num);map.put(userId, num);barrier.await();System.out.println("用户 " + userId + " 线程等待结束之后继续执行其他操作!!!");} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}
}
3、与CountDonwLatch对比
1、CountDownLatch减计数,CyclicBarrier加计数。
2、CountDownLatch是一次性的,CyclicBarrier可以重用。
3、对于CountDownLatch来说,重点是那个“一个线程”在等待,而另外那N个线程在执行完成之后可以继续等待,可以终止。
4、对于CyclicBarrier来说,重点是那N个线程,它们之间任何一个没有完成,所有线程必须等待。
4、CyclicBarrier原理(部分源码)
1、它的实现原理:利用ReentrantLock做线程安全锁,实现线程安全等待,部分源码如下:
public class CyclicBarrier {private final ReentrantLock lock = new ReentrantLock();private final Condition trip = lock.newCondition();private final int parties;private final Runnable barrierCommand;public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); }}private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {final ReentrantLock lock = this.lock;lock.lock();try {final Generation g = generation;if (g.broken)throw new BrokenBarrierException();if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}int index = --count;if (index == 0) {boolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null)command.run();ranAction = true;nextGeneration();return 0;} finally {if (!ranAction)breakBarrier();}}for (;;) {try {if (!timed)trip.await();else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g != generation)return index;if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}}
}
三、Semaphore
1、概述
1、Semaphore是一个计数信号量,利用它可以控制一定数量的请求,从而实现资源访问限制的目的,实际应用中,可以用来限制访问某种资源的数量,比如在Hystrix中就有基于Semaphore的资源隔离策略。
2、最简单的理解信号量就是,一个计数器、一个等待队列、两个方法(在Java实现的Semaphore中就是acquire和release)。
-
调用一次acquire方法,计数器就减1,如果此时计数器小于0,则阻塞当前线程,否则当前线程可继续执行。
-
调用一次release方法,计数器就加1,如果此时计数器小于等于0,则唤醒一个等待队列中的线程,并将其中等待队列中移除。
3、主要方法:
-
Semaphore(int permits)
:创建具有给定的许可数量和给定的非公平设置的Semaphore数量。
-
Semaphore(int permits, boolean fair)
:创建具有给定的许可数量和给定的公平设置的Semaphore数量。所谓的公平性就是是否先进来的先释放,默认为非公平的
。
-
void acquire()/void acquire(int permits)
:从信号量获取一个或多个许可,如果一个可用并立即返回,将可用许可的数量减少一个,如果没有可用的许可,则当前线程出于线程调度目的而被禁用并处于休眠状态。
-
void release()/void release(int permits)
:释放一个或多个许可证,将其返回给信号量。
-
int availablePermits()
:返回信号量中当前可用的许可数。
-
int drainPermits()
:获取并返回立即可用的所有许可。
-
int getQueueLength()
:返回正在等待获取的线程数的估计值。该值只是一个估计值,因为在此方法遍历内部数据结构时线程数可能会动态变化。此方法设计用于监控系统状态,而不是用于同步控制。
-
boolean hasQueuedThreads()
:查询是否有线程正在等待获取。请注意,因为取消可能随时发生,所以true的返回并不能保证任何其他线程将永远获取。此方法主要设计用于监控系统状态。
-
void reducePermits(int reduction)
:根据指定的缩减量减少可用许可证的数量。
-
boolean isFair()
:如果此信号量的公平性设置为true,则返回true 。
-
boolean tryAcquire()
:仅当调用时可用时,才从此信号量获取许可。获取一个许可,如果一个可用并立即返回,值为true ,将可用许可的数量减少一个。如果没有可用的许可,则此方法将立即返回值false。
2、使用示例
@Slf4j
public class SemaphoreTest {public static void main(String[] args) {Semaphore semaphore = new Semaphore(3);for (int i = 1; i <= 6; i++) {new Thread(() -> {try {semaphore.acquire();log.info("车辆 " + Thread.currentThread().getName() + " 抢到车位");TimeUnit.SECONDS.sleep(3);semaphore.release();log.info("车辆 " + Thread.currentThread().getName() + " 离开车位");} catch (InterruptedException e) {e.printStackTrace();}}, i + "").start();}}
}
3、Semaphore实现互斥锁
1、如果Semaphore初始化时设置为1,则和synchronized效果类似
@Slf4j
public class SemaphoreImplMutex {private static final Semaphore semaphore = new Semaphore(1);public static void main(String[] args) {SemaphoreImplMutex semaphoreMutex = new SemaphoreImplMutex();for (int i = 1; i < 4; i++) {new Thread(semaphoreMutex::method, "线程 " + i).start();}}public void method() {try {semaphore.acquire();log.info(Thread.currentThread().getName() + " 正在执行!");Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release();log.info(Thread.currentThread().getName() + " 执行结束!");}}
}
4、Semaphore原理(部分源码)
public class Semaphore implements java.io.Serializable {private final Sync sync;abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 1192457210091910933L;Sync(int permits) {setState(permits);}final int getPermits() {return getState();}final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) throw new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}final void reducePermits(int reductions) {for (;;) {int current = getState();int next = current - reductions;if (next > current) throw new Error("Permit count underflow");if (compareAndSetState(current, next))return;}}final int drainPermits() {for (;;) {int current = getState();if (current == 0 || compareAndSetState(current, 0))return current;}}}static final class NonfairSync extends Sync {private static final long serialVersionUID = -2694183684443567898L;NonfairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}}static final class FairSync extends Sync {private static final long serialVersionUID = 2014338818796000944L;FairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}}public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public void release() {sync.releaseShared(1);}
}