14并发

1
2
3
4
5
6
7
8
public interface Runnable
{
void run();
}

Runnable r =()->{ task code };
Thread t=new Thread(r);
t.start();//直接调用run方法不会启动新线程

中断线程

线程中断是为了引起线程的注意,不一定要结束线程。重要线程可处理异常后,继续执行。

interrupt()调用后,线程中断状态为true。
当线程被阻塞(这里阻塞是指调用sleep或wait)时调用interrupt方法,阻塞会被Interrupted Exception中断。

islnterrupted() 检查是否被中断
islnterrupted() 检测当前的线程是否被中断
static boolean interrupted() 检测当前的线程是否被中断,且中断状态重置为false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//错误做法
void mySubTask()
{
try{sleep(delay);}
catch(InterruptedException e){ }//Don'tignore!
}
//改进1

void mySubTask()
{
try{sleep(delay);}
catch(InterruptedException e)
{Thread.currentThread().interrupt();}//设置中断,调用者可以对其进行检测
}
//改进2
void mySubTask () throws InterruptedException//抛出异常,调用者可以捕获中断异常
{
...
sleep(delay);
...
}

线程状态

  • new 新创建
  • Runable 可运行
  • Blocked 被阻塞
  • Waiting 等待
  • Timed waiting 计时等待
  • Terminated 被终止

新创建线程

new操作符创建了线程,未开始运行。

可运行线程

调用start方法,线程处于runnable状态
可运行的线桿可能正在运行也可能没有运行,取决于操作系统给线程提供运行的时间。
抢占式调度,一个时间片用完,操作系统剥夺线程运行权,选择优先级高的线程执行。

被阻塞线程和等待线程

  • Blocking 线程1试图获取一个锁(非javiutiUoncurrent库中的锁),此锁被其他线程占有,此时线程1进入阻塞状态。
  • Waiting 当线程等待另一个线程通知调度器一个条件时,它自己进入等待状态。(调用Object.wait方法或Thread.join方法,或者是等待java,util.concurrent库中的Lock或Condition时)
  • Time waiting 带有超时参数的方法调用时,Thread.sleep和Object.wait、Thread.join、Lock,tryLock以及Condition.await的计时版

被终止的线程

  • run方法结束,正常退出
  • 没有捕获的异常终止了run方法,意外死亡

线程属性

线程优先级、守护线程、线程组以及处理未捕获异常的处理器

线程优先级

setPriority()
static void yield()当前执行线程处于让步状态。如果有其他同优先级或跟高优先级的可运行线程,那么这些线程接下来会被调度。
MIN_PRIORITY 1 , MAX_PRIORITY 10 , NORM_PRIORITY 5
如果有几个高优先级的线程没有进入非活动状态,低优先级的线程可能永远也不能执行。

守护线程

唯一用途:为其他线程提供服务,只剩下守护线程时
守护线程任何时候甚至在一个操作的中间发生中断,不应访问固有资源(文件、数据库)。
调用t.setDaemon(true);将线程转换为守护线程(线程启动之前调用)

未捕获异常处理器

线程的run方法不能抛出受查异常,非受査异常会导致线程终止。
不需要任何catch子句来处理可以被传播的异常,线程死亡之前,异常被传递到一个用于未捕获异常的处理器。

异常处理器必须实现Thread.UncaughtExceptionHandler接口
interface UncaughtExceptionHandler{
void uncaughtException(Threadt,Throwable e)
}

安装异常处理器

  1. setUncaughtExceptionHandler
  2. Thread类的静态方法setDefaultUncaughtExceptionHandler为所有线程安装一个默认的处理器

如果不为独立的线程安装处理器,此时的处理器就是该线程的ThreadGroup对象,ThreadGroup类实现Thread.UncaughtExceptionHandler接口。它的uncaughtException方法做如下操作:

  1. 如有父线程组,那么父线程组的uncaughtException方法被调用。
  2. 否则,调用Thread.getDefaultExceptionHandler 获取默认处理器,非空则调用此默认处理器
  3. 否则,如果Throwable是ThreadDeath的一个实例,什么都不做。
  4. 否则,线程的名字以及Throwable的栈轨迹被输出到System.err上。

同步

锁对象

两种机制

  • synchronized关键字
  • ReentrantLock类
    1
    2
    3
    4
    5
    6
    7
    ReentrantLock lock = new ReentrantLock();
    lock.lock();
    try {
    //do something
    }finally {
    lock.unlock();
    }
1
2
3
4
5
6
7
java.util.concurrent.locks.Lock5.0
void lock()
void unlock()

java,util.concurrent.locks.ReentrantLock5.0
ReentrantLock()
ReentrantLock(boo1ean fair)构建一个带有公平策略的锁。一个公平锁偏爱等待时间最长的线程。但这一公平的保证将大大降低性能。所以,默认情况下,锁没有被强制为公平的。

注:
听起来公平锁更合理一些,但是使用公平锁比使用常规锁要慢很多。只有当你确实了解自己要做什么并且对于你要解决的问题有一个特定的理由必须使用公平锁的时候,才可以使用公平锁。即使使用公平锁,也无法确保线程调度器是公平的。如果线程调度器选择忽略一个线程,而该线程为了这个锁已经等待了很长时间,那么就没有机会公平地处理这个锁了。

条件对象

使用一个条件对象来管理那些已经获得了一个锁但是却不能做有用工作的线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Bank
{
private Condition sufficientFunds;
publicBank()
{
sufficientFunds = bankLock.newCondition();//条件对象
}

publicvoidtransfer(intfrom,intto,intamount)
{
bankLock.lock();
try {
while(accounts[from]<amount)
sufficientFunds.await();//余额不足,阻塞等待 并放弃锁,进入等待队列,等待其他线程调用signalALl或signal
//transferfunds
sufficientFunds.signalAll();//
}
finally{
bankLock.unlock();
}
}
}

signalAll() 重新激活因为这一条件而等待的所有线程,线程从等待集当中移出(解除阻塞),再次成为可运行的,获取到锁后继续执行。
signal()随机解除等待集中某个线程的阻塞状态

synchronized

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public synchronized void method()
{
//methodbody
}

//等价于

public void method()
{
this.intrinsidock.1ock();
try
{
//methodbody
}
finally{this.intrinsicLock.unlock();}
}

条件阻塞举例

1
2
3
4
5
6
7
8
9
class Bank
{
public synchronized void func1() throws InterruptedException{
if(condition){
wait()
}
notifyAll();
}
}

静态synchronized方法将锁住整个类

同步阻塞

synchronized(obj)//this is the syntax for a synchronizedb lock
{

//critical section

}

Volatile域

volatile关键字为实例域的同步访问提供了一种免锁机制
声明一个域为volatile,编译器和虚拟机就知道该域可能被另一个线程并发更新

  • 内存可见
  • 禁止指令重排 (volatile变量的写操作,保证是在读操作之前完成)
  • 赋值原子性

应用:

  • 多线程标志位
  • CAS

final变量

final Map<String,Double> accounts = new HashKap<>();
其他线程在构造函数完成构造后才看到accounts变量。
如果不使用final,不能保证其他线程看到的是accounts更新后的值,它们可能看到null,而不是新构造的HashMap

原子性

java.util.concurrent.atomic包中有很多类使用了很高效的机器级指令来保证操作的原子性。
如:Atomiclnteger.incrementAndGetAtomiclnteger.decrementAndGet 自增自减
incrementAndGet 获得值、增1并设置然后生成新值的操作不会中断。

如果有大量线程要访问相同的原子值,性能会大幅下降,因为乐观更新需要太多次重试。
JavaSE8提供了LongAdderLongAccumulator类来解决这个问题。
LongAdder包括多个变量(加数),其总和为当前值。可以有多个线程更新不同的加数,线程个数增加时会自动提供新的加数。通常情况下,只有当所有工作都完成之后才需要总和的值,对于这种情况,这种方法会很高效。性能会有显著的提升。

1
2
3
4
5
6
7
8
final LongAdder adder=new LongAdder();
for(...)
pool.submit(()->{
while(...){
if(...) adder.increment();//increment 自增1 add(l) 增加任意值
}
});
long total=adder.sum();

LongAccumulator将这种思想推广到任意的累加操作

1
2
3
LongAccumulator adder=new LongAccumulator(Long::sum,0); //可选择不同的操作,且满足结合律和交换律。
//Insomethread...
adder.accumulate(value);

死锁

线程局部变量

为每个线程构造一个实例:
public static final ThreadLocal<SimpleDateFormat> dateFormat =ThreadLocal.withInitial(()->new SimpleDateFormat("yyyy-MM-dd"));

java.util.Rand0m类是线程安全的,但如果多线程等待一个随机数生成器,很低效。
可以使用ThreadLocal辅助类为各个线程提供一个单独的生成器,还可以使用Java提供的一个便利类:
ThreadLocalRandom,ThreadLocalRandom.current()调用会返回特定于当前线程的Random类实例

锁测试与超时

1
2
3
4
5
6
7
8
if(myLock.tryLock())
{
//now the thread owns the lock
try{...}
finally{myLock.unlock();}
}
else
//do something else

读/写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private ReentrantReadWriteLock rwl=new ReentrantReadWriteLock();
private Lock readLock=rwl.readLock();
private Lock writeLock=rwl.writeLock();

public double getTotalBalance()
{
readLock.lock();//多线程可冲入读锁,但排斥写锁
try{...}
finally{readLock.unlock();}
}

public void transfer(...)
{
writeLock.lock();//排斥其他读锁和写锁
try{...}
finally{writeLock.unlock();}
}

为什么弃用stop和suspend方法

stop:当线程要终止另一个线程时,无法知道什么时候调用stop方法是安全的,什么时候导致对象被破坏。因此,该方法被弃用了。在希望停止线程的时候应该中断线程,被中断的线程会在安全的时候停止。
suspend:容易引起死锁,被挂起的线程等着被恢复,而将其挂起的线程等待获得锁。

阻塞队列

当试图向队列添加元素而队列已满,或是想从队列移出元素而队列为空的时候,阻塞队列(blockingqueue)导致线程阻塞。

put,take 满或空时阻塞
add,remove,element 空时异常
offer,poll(移除返回),peek(只返回) 空时 返回false,null,null

java.util.concurrent包提供了阻塞队列的几个变种:

  • LinkedBlockingQueue 容量无上界,也可选择最大容量 LinkedBlockingDeque 双端队列版本
  • ArrayBlockingQueue 构造时制定容量,可设置公平性
  • PriorityBlockingQueue 带优先级的队列,而不是先进先出队列。按照它们的优先级顺序被移出,容量无上限
  • DelayQueue
    1
    2
    3
    4
    interface Delayed extends Comparable<Delayed>
    {
    long getDelay(TimeUnitunit); //返回对象的残留延迟,负值表示延迟结束,可移除
    }
  • LinkedTransferQue implements TranSferQueue 允许生产者线程等待,直到消费者准备就绪可以接收一个元素。 q.transfer(item); 阻塞直到另一个线程将元素(item)删除。

线程安全的集合

高效的映射、集和队列

java.util.concurrent包提供了映射、有序集和队列的高效实现:

  • ConcurrentHashMap
  • ConcurrentSkipListMap key有序,跳表实现,非并发使用TreeMap,低并发可使用包装TreeMapCollections.synchronizedSortedMap,高并发使用ConcurrentSkipListMap
  • ConcurrentSkipListSet 有序,基于SkipList的集合
  • ConcurrentLinkedQueue 一个基于链接节点的无界线程安全队列

JavaSE8引入了一个mappingCount方法可以把大小作为long返回(元素过多,int范围小)。

返回弱一致性(weaklyconsistent)的迭代器。迭代器不一定能反映出它们被构造之后的所有的修改,但是,它们不会将同一个值返回两次,也不会拋出ConcurrentModificationException异常。
ConcurrentHashMap 默认支持16个写线程并发执行,多余的将阻塞。

在JavaSE8中,并发散列映射将桶组织为树,防止因大量相同散列码的值而降低性能

映射条目的原子更新

1
2
3
4
5
6
7
map.putlfAbsent(word,newLongAdder());// 返回oldValue
map.get(word).increment();


map.compute(word,(k,v)->v = null ? 1 : v+1);//调用compute方法时可以提供一个键和一个计算新值的函数。

map.merge(word, 1L ,(existingValue,newValue) -> existingValue + newValue); 这个方法有一个参数表示键不存在时使用的初始值。否则,就会调用你提供的函数来结合原值与初始值。

传入compute或merge的函数返回null,将从映射中删除现有的条目
注:使用compute或merge时,提供的函数不能做太多工作。因为函数运行时,可能会阻塞对映射的其他更新。且这个函数也不能更新映射的其他部分。

对并发散列映射的批操作

三种操作:

  • 搜索
  • 规约
  • foreach
    每个操作四个版本
  • operationKeys:处理键。
  • operatioriValues:处理值。
  • operation:处理键和值。
  • operatioriEntries:处理Map.Entry对象
    阈值参数:
    期望单线程处理,阈值设置Long.MAX_VALUE
    期望多线程处理,阈值设置1

搜索

1
2
3
4
UsearchKeys(long threshold,BiFunction<?super K,?extends U>f)
UsearchVaiues(long threshold,BiFunction<?super V,?extends U>f)
Usearch(long threshold,BiFunction<?superK,?super V,?extends U>f)
UsearchEntries(long threshold,BiFunction<Map.Entry <K,V>,?extends U>f)

第一个val大于1000的值
String result=map.search(threshold,(k,v)->v>1000?k:null);

forEach

1
2
3
4
5
6
7
8
map.forEach(threshold,(k,v)->System.out.println(k+"->"+v));
map.forEach(threshold,
(k,v) -> k + "->" + v, //Transformer
System.out::println); //Consume

map.forEach(threshold,
(k,v) -> v > 1000 ? k + "->" + v : null, //Filterandtransformer
System.out::println); //The nulls are not passed to the consumer

reduce

1
2
3
4
5
6
7
8
9
10

Long sum=map.reduceValues(threshold,Long::sum);

Integer maxlength=map.reduceKeys(threshold,
String::length, //Transformer
Integer::max); //Accumulator

Longcount=map.reduceValues(threshold,
v -> v > 1000 ? 1L : null,//
Long::sum);// v大于1000的个数
1
2
3
4
5
对于int、long和double输出还有相应的特殊化操作,分别有后缀Tolnt、ToLong和ToDouble。需要把输入转换为一个基本类型值,并指定一个默认值和一个累加器函数。映射为空时返回默认值。
long sum=map.reduceValuesToLong(threshold,
Long::longValue,//Transformer to primitive type
0,//Default value for empty map
Long::sum);//Primitive type accumulator

并发集视图

1
2
3
4
5
6
7
8
9
ConcurrentHashMap<String, String> hashMap = new ConcurrentHashMap<String, String>();
hashMap.put("11", "1");
hashMap.put("12", "1");
hashMap.put("13", "1");
System.out.println(hashMap);// {11=1, 12=1, 13=1}
ConcurrentHashMap.KeySetView<String, String> keySetView = hashMap.keySet("ss");
keySetView.add("21");
keySetView.remove("12");
System.out.println(hashMap);//{11=1, 13=1, 21=ss}

写数组的拷贝

{todo}
CopyOnWriteArrayList
CopyOnWriteArraySet

并行数组算法

parallelSort

1
2
3
4
String[] arr = new String[]{"1", "333", "22", "12345"};
Arrays.parallelSort(arr, 0, 2, Comparator.comparing(String::length, Comparator.reverseOrder()));
Arrays.stream(arr).forEach(System.out::println);
//333 1 22 12345

parallelSetAll

1
2
3
4
Arrays.parallelSetAll(arr, x -> {//x 为索引
System.out.println(x);
return arr[x];
});

parallelPrefix

1
2
3
4
5
String[] arr = new String[]{"1", "333", "22", "12345"};
Arrays.parallelPrefix(arr, (x, y) -> {
return x + y;
});
Arrays.stream(arr).forEach(System.out::println); // 1 1333 133322 13332212345

较早的线程安全集合

已弃用的线程安全的动态数组和散列表 Vector Hashtable
取而代之的是非线程安全的AnayList和HashMap类,可通过同步包装器变成线程安全的

1
2
List<E>synchArrayList=Collections,synchronizedList(newArrayList<E>());
Map<K,V>synchHashMap=Col1ections.synchronizedMap(newHashMap<K,V>0);

Callable与Future

1
2
3
4
5
6
7
Callable<String> callable = () -> "callable is running";
Runnable runnable = () -> System.out.println("runable is running");
FutureTask futureTask1 = new FutureTask(callable);
FutureTask futureTask2 = new FutureTask(runnable, "result");
Thread thread = new Thread(futureTask1);
thread.start();
System.out.println(futureTask1.get());

执行器

使用线程池的理由

  • 需要大量且生命期短的线程应使用线程池
  • 控制并发线程数数,大量线程导致性能降低
    构建线程池执行器类的静态工厂方法
  • newCachedThreadPool 必要时创建新线程;空闲线程会被保留60秒
  • newFixedThreadPool 该池包含固定数量的线程;空闲线程会一直被保留
  • newSingleThreadExecutor 只有一个线程的“池”,该线程顺序执行每一个提交的任务(类似于Swing事件分配线程)
  • newScheduledThreadPool 用于预定执行而构建的固定线程池,替代java.util.Timer
  • newSingleThreadScheduledExecutor 用于预定执行而构建的单线程“池”

线程池

newCachedThreadPoolnewFixedThreadPoolnewSingleThreadExecutor返回实现了ExecutorService接口的ThreadPoolExecute对象。可使用以下方法提交任务:

  • Future<?>submit(Runnabletask) 返回一个奇怪样子的Future<?>,调用get返回null
  • Future<T>submit(Runnabletask,Tresult) get返回指定的result
  • Future<T>submit(Callable<T>task) get返回计算结构

关闭线程池

  • shutdow 启动该池的关闭序列。被关闭的执行器不再接受新的任务。当所有任务都完成以后,线程池中的线程死亡。
  • shutdownNow 该池取消尚未开始的所有任务并试图中断正在运行的线程

预定执行

ScheduledExecutorService接口具有为预定执行(ScheduledExecution)或重复执行任务而设计的方法。
Executors类的newScheduledThreadPoolnewSingleThreadScheduledExecutor方法将返回实现了Scheduled-ExecutorService接口的对象。

可以预定Runnable或Callable在初始的延迟之后只运行一次。也可以预定一个Runnable对象周期性地运行。

控制任务组

使用执行器控制一组任务

  • invokeAny 提交所有对象到一个Callable对象的集合中,并返回某个已经完成了的任务的结果(无法确定是那个任务的结果,可能是最先完成任务的结果)
  • invokeAll 方法提交所有对象到一个Callable对象的集合中,并返回一个Future对象的列表,代表所有任务的解决方案。
    1
    2
    3
    4
    List<Callab1e<T>> tasks=...;
    List<Future<T>> results = executor.invokeAll(tasks):
    for(Future<T> result : results)
    processFurther(result.get());
    invokeAll缺点:若第一个任务耗时时很多,需要等待
    ExecutorCompletionService来排序改进
    1
    2
    3
    4
    ExecutorCompletionService<T> service = newExecutorCompletionServiceo(executor);
    for(Callable<T> task : tasks) service.submit(task);
    for(int i = 0 ; i < tasks.size();i++)
    processFurther(service.take().get());

ExecutorCompletionService.take 移除下一个已完成的结果,如果没有任何已完成的结果可用则阻塞。
ExecutorCompletionService.poll 移除下一个已完成的结果,如果没有任何已完成结果可用则返回null。

Fork-Join

针对计算密集型任务,有足够多的处理器可并行处理的任务

要采用框架可用的一种方式完成这种递归计算,需要提供一个扩展RecursiveTask的类(如果计算会生成一个类型为T的结果)或者提供一个扩展RecursiveAction的类(如果不生成任何结果)。再覆盖compute方法来生成并调用子任务,然后合并其结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class ForkJoinTest
{
publicstaticvoidmain(String口args)
{
final int SIZE=10000000;
double []numbers=new double[SIZE];
for(inti=0;i<SIZE;i++) numbers[i]=Math.random();
Counter counter=new Counter(numbers,0,numbers.length,x-> x>0.5);
ForkJoinPool pool=new ForkJoinPool();
pool.invoke(counter);
System.out.println(counter.join());
}
}

class Counter extends RecursiveTask<Integer>
{
public static final int THRESHOLD=1000;
private double[]values;
private int from;
private int to;
private DoublePredicate filter;
public Counter(double[]values,int from,int to,DoublePredicate filter)
{
this,values=values;
this,from=from;
this.to=to;
this.filter=filter;
}

protected Integer compute()
{
if(to-from<THRESHOLD)
{
int count=0;
for(int i = from; i < to ; i++)
{
if(fi1ter.test(values[i])) count++;
}
return count;
}
else
{
int mid=(from+to)/2;
Counter first=new Counter(values,from,mid,filter);
Counter second=new Counter(values,mid,to,filter);
invokeAll(first,second);
return first.join()+second.join();
}
}
}

fork-join框架平衡可用线程工作负载的方法:(工作密取work stealing)
每个工作线程有一个双端队列(deque)来完成任务。一个工作线程将子任务压人队列的队头。(只有一个线程可以访问队头,所以不需要加锁。)一个工作线程空闲时,它会从另一个队列的队尾“密取”一个任务。由于大的子任务都在队尾,这种密取很少出现。

可完成Future

CompletableFuture实现了CompletionStage接口和Future接口,处理多任务协同工作

创建异步任务

supplyAsync

supplyAsync(Supplier<U>)
supplyAsync(Supplier<U>,Executor)

runAsync

创建没有返回值的异步任务

获取结果

// 如果完成则返回结果,否则就抛出具体的异常
public T get() throws InterruptedException, ExecutionException

// 最大时间等待返回结果,否则就抛出具体异常
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException

// 完成时返回结果值,否则抛出unchecked异常。为了更好地符合通用函数形式的使用,如果完成此 CompletableFuture所涉及的计算引发异常,则此方法将引发unchecked异常并将底层异常作为其原因
public T join()

// 如果完成则返回结果值(或抛出任何遇到的异常),否则返回给定的 valueIfAbsent。
public T getNow(T valueIfAbsent)

// 如果任务没有完成,返回的值设置为给定值
public boolean complete(T value)

// 如果任务没有完成,就抛出给定异常
public boolean completeExceptionally(Throwable ex)

异步回调处理

thenApply和thenApplyAsync

thenAccept和thenAcceptAsync

thenRun和thenRunAsync

whenComplete和whenCompleteAsync

whenComplete是当某个任务执行完成后执行的回调方法

handle和handleAsync

跟whenComplete基本一致,区别在于handle的回调方法有返回值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("supply");
return "1";
}).thenComposeAsync(x -> {
System.out.println("thenCompose x:" + x);
return CompletableFuture.supplyAsync(() -> {
System.out.println("compose.supply");
return "2";
});
}).thenApply(x -> {
System.out.println("thenApply x:" + x);
return "3";
}).thenAccept(System.out::println).thenRun(() -> {
System.out.println("thenRun");
}).thenApply(x -> "4").whenComplete((x, y) -> {
System.out.println("whenComplete x:" + x + " y:" + y);
}).handle((x, y) -> {
System.out.println("handle x:" + x);
return "5";
});
System.out.println(completableFuture.get());

多任务组合处理

thenCombine、thenAcceptBoth 和runAfterBoth

applyToEither、acceptEither和runAfterEither

allOf / anyOf

同步器

  • CyclicBarrier
    允许线程集等待直至其中预定数目的线程到达一个公共障栅(barrier),然后
    可以选择执行一个处理障栅的动作
    当大量的线程需要在它们的结果可用之前完
    成时
  • Phaser
    类似于循环障栅,不过有一个可变的计数
    JavaSE7中引人
  • CountDownLatch
    允许线程集等待直到计数器减为0
    当一个或多个线程需要等待直到指定数目的事件发生
  • Exchanger
    允许两个线程在要交换的对象准备好时交换对象
    当两个线程工作在同一数据结构的两个实例上的时候,一个向实例添加数据而另一个从实例清除数据
  • Semaphore
    允许线程集等待直到被允许继续运行为止
    限制访问资源的线程总数。如果许可数是1,常常阻塞线程直到另一个线程给出许可为止
  • SynchronousQueue
    允许一个线程把对象交给另一个线程
    在没有显式同步的情况下,当两个线程准备好将一个对象从一个线程传递到另一个时

信号量

倒计时门栓

CountDownLatch 等待计数变为0可继续执行
应用场景例如:多线程准备数据,每个线程完成自己的工作计数器-1,所有线程准备工作完成,计数器变为0,此时处理数据线程开始工作。

栅栏

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
System.out.println("CyclicBarrier");
});
Runnable runnable = () -> {
try {
System.out.println("await start");
cyclicBarrier.await();// barrier.await(100,TineUnit.MILLISECONDS);
System.out.println("await end");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
};

for (int i = 0; i < 3; i++) {
Thread thread = new Thread(runnable);
thread.start();
Thread.sleep(1000);
}

CyclicBarrier 在所有等待线程被释放后被重用
CountDownLatch 只能被使用一次

{todo} Phaser???

交换器

Exchanger 两个线程间交换数据,当多个线程交换时,按顺序先到的两个先交换

同步队列

当一个线程调用SynchronousQueueput方法时,它会阻塞直到另一个线程调用take方法为止,反之亦然

其他

LongAdder AtomicLong

AtomicLong 采用CAS,在高并发时性能不如LongAdder