Java同步组件之CountDownLatch,Semaphore

Java同步组件概况

  • CountDownLatch : 是闭锁,通过一个计数来保证线程是否一直阻塞
  • Semaphore: 控制同一时间,并发线程数量
  • CyclicBarrier:字面意思是回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
  • ReentrantLock:是一个重入锁,一个线程获得了锁之后仍然可以反复加锁,不会出现自己阻塞自己的情况。
  • Condition:配合ReentrantLock,实现等待/通知模型
  • FutureTask:FutureTask实现了接口Future,同Future一样,代表异步计算的结果。

CountDownLatch 同步辅助类

CountDownLatch类位于java.util.concurrent包,利用它可以实现类似计数器的功能,比如有一个任务A,它要等待其它4个任务执行完毕后才能执行,此时就可以使用CountDownLatch来实现这种功能。

image-20210115093334548

假设计数器的值是3,线程A调用await()方法后,当前线程就进入了等待状态,之后其它线程中执行CountDownLatch,计数器就会减1,当计数器从3变成0,线程A继续执行,CountDownLatch这个类可以阻塞当前线程,保证线程在某种条件下,继续执行。

构造器中的计数值(count)实际上就是闭锁需要等待的线程数量,这个值只能被设置一次,而且CountDownLatch没有提供任何机会修改这个计数值。

CountDownLatch代码案例

package com.rumenz.task;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class CountDownLatchTest {public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();CountDownLatch countDownLatch=new CountDownLatch(2);executorService.execute(()->{try{Thread.sleep(3000);System.out.println("任务一完成");}catch (Exception e){e.printStackTrace();}countDownLatch.countDown();});executorService.execute(()->{try{Thread.sleep(5000);System.out.println("任务二完成");}catch (Exception e){e.printStackTrace();}countDownLatch.countDown();});countDownLatch.await();//所有子任务执行完后才会执行System.out.println("主线程开始工作.....");executorService.shutdown();}
}任务一完成
任务二完成
主线程开始工作.....

CountDownlatch指定时间完成任务,如果在规定时间内完成,则等待之前的等待线程(countDownLatch.await())继续执行

countDownLatch.await(int timeout,TimeUnit timeUnit);设置,第一个参数没超时时间,第二个参数为时间单位。

package com.rumenz.task;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class CountDownLatchTest {public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();CountDownLatch countDownLatch=new CountDownLatch(2);executorService.execute(()->{try{Thread.sleep(3000);System.out.println("任务一完成");}catch (Exception e){e.printStackTrace();}countDownLatch.countDown();});executorService.execute(()->{try{Thread.sleep(5000);System.out.println("任务二完成");}catch (Exception e){e.printStackTrace();}countDownLatch.countDown();});//这里只等3秒countDownLatch.await(3, TimeUnit.SECONDS);//所有子任务执行完后才会执行System.out.println("主线程开始工作.....");executorService.shutdown();}
}
//任务一完成
//主线程开始工作.....
//任务二完成

Semaphore控制线程数量

Semaphore经常用于限制获取某种资源的线程数量,其内部是基于AQS的共享模式,AQS的状态可以表示许可证的数量,许可证数量不够线程被挂起;而一旦有一个线程释放资源,那么可唤醒等待队列中的线程继续执行。

5b4741030001ee2d19201080.jpg (1920×1080)

Semaphore翻译过来就是信号量,Semaphore可以阻塞进程并控制同时访问的线程数,通过acquire()获取一个许可,如果没有就等待,而release()释放一个许可,Semaphore有点类似锁。

CountDownLatchSemaphore在使用时,通过和线程池配合使用。
Semaphore适合控制并发,CountDownLatch比较适合保证线程执行完后再执行其它处理,因此模拟并发两者结合最好。

Semaohore应用场景

Semaphore适合做流量控制,特别是共享的有限资源,比如数据库连接,假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发的读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,我们就可以使用Semaphore来做流控。

package com.rumenz.task;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;public class SemaphoreExample1 {private static Integer clientTotal=30;private static Integer threadTotal=3;public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();Semaphore semaphore=new Semaphore(threadTotal);for (int i = 0; i < clientTotal; i++) {final Integer j=i;executorService.execute(()->{try{semaphore.acquire(); // 获取一个许可update(j);semaphore.release(); // 释放一个许可}catch (Exception e) {e.printStackTrace();}});}executorService.shutdown();}private static void update(Integer j) throws Exception {System.out.println(j);Thread.sleep(2000);}
}

每2秒打印3个数字。

package com.rumenz.task;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;public class SemaphoreExample1 {private static Integer clientTotal=30;private static Integer threadTotal=3;public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();Semaphore semaphore=new Semaphore(threadTotal);for (int i = 0; i < clientTotal; i++) {final Integer j=i;executorService.execute(()->{try{semaphore.acquire(3); // 获取多个许可update(j);semaphore.release(3); // 释放多个许可}catch (Exception e) {e.printStackTrace();}});}executorService.shutdown();}private static void update(Integer j) throws Exception {System.out.println(j);Thread.sleep(2000);}
}

每2秒打印一个数字。

tryAcquire

尝试获取许可,如果获取不成功,则放弃操作,tryAcquire方法提供几个重载

  • tryAcquire() : boolean
  • tryAcquire(int permits) : boolean 尝试获取指定数量的许可
  • tryAcquire(int permits,long timeout,TimeUnit timeUnit) : boolean
  • tryAcquire(long timeout,TimeUnit timeUnit) : boolean 尝试获取许可的时候可以等待一段时间,在指定时间内未获取到许可则放弃

Semaphore源码分析

Semaphore有两种模式,公平模式和非公平模式。公平模式就是调用acquire的顺序就是获取许可证的顺序,遵循FIFO;而非公平模式是抢占式的,也就是有可能一个新的获取线程恰好在一个许可证释放时得到了这个许可证,而前面还有等待的线程。

// 非公平模式
public Semaphore(int permits) {sync = new NonfairSync(permits);
}
// fair=true为公平模式,false=非公平模式
public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

public class Semaphore implements java.io.Serializable {/** 只指定许可量,构造不公平模式*/public Semaphore(int permits) {sync = new NonfairSync(permits);}/** 指定许可量,并指定模式*/public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}//Semaphore内部基于AQS的共享模式,所以实现都委托给了Sync类。 abstract static class Sync extends AbstractQueuedSynchronizer {}/*** NonFair version*/static final class NonfairSync extends Sync {private static final long serialVersionUID = -2694183684443567898L;NonfairSync(int permits) {// 可以看到调用了setState方法,也就是说AQS中的资源就是许可证的数量。super(permits);}protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}}/*** Fair version*/static final class FairSync extends Sync {private static final long serialVersionUID = 2014338818796000944L;FairSync(int permits) {// 可以看到调用了setState方法,也就是说AQS中的资源就是许可证的数量。super(permits);}protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}}}

关注微信公众号:【入门小站】,解锁更多知识点

Published by

风君子

独自遨游何稽首 揭天掀地慰生平

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注