14并发
1 | public interface Runnable |
中断线程
线程中断是为了引起线程的注意,不一定要结束线程。重要线程可处理异常后,继续执行。
interrupt()
调用后,线程中断状态为true。
当线程被阻塞(这里阻塞是指调用sleep或wait)时调用interrupt方法,阻塞会被Interrupted Exception中断。
islnterrupted()
检查是否被中断islnterrupted()
检测当前的线程是否被中断static boolean interrupted()
检测当前的线程是否被中断,且中断状态重置为false
1 | //错误做法 |
线程状态
- new 新创建
- Runable 可运行
- Blocked 被阻塞
- Waiting 等待
- Timed waiting 计时等待
- Terminated 被终止
新创建线程
new操作符创建了线程,未开始运行。
可运行线程
调用start方法,线程处于runnable状态
可运行的线桿可能正在运行也可能没有运行,取决于操作系统给线程提供运行的时间。
抢占式调度,一个时间片用完,操作系统剥夺线程运行权,选择优先级高的线程执行。
被阻塞线程和等待线程
- Blocking 线程1试图获取一个锁(非javiutiUoncurrent库中的锁),此锁被其他线程占有,此时线程1进入阻塞状态。
- Waiting 当线程等待另一个线程通知调度器一个条件时,它自己进入等待状态。(调用Object.wait方法或Thread.join方法,或者是等待java,util.concurrent库中的Lock或Condition时)
- Time waiting 带有超时参数的方法调用时,Thread.sleep和Object.wait、Thread.join、Lock,tryLock以及Condition.await的计时版
被终止的线程
- run方法结束,正常退出
- 没有捕获的异常终止了run方法,意外死亡
线程属性
线程优先级、守护线程、线程组以及处理未捕获异常的处理器
线程优先级
setPriority()
static void yield()
当前执行线程处于让步状态。如果有其他同优先级或跟高优先级的可运行线程,那么这些线程接下来会被调度。MIN_PRIORITY
1 , MAX_PRIORITY
10 , NORM_PRIORITY
5
如果有几个高优先级的线程没有进入非活动状态,低优先级的线程可能永远也不能执行。
守护线程
唯一用途:为其他线程提供服务,只剩下守护线程时
守护线程任何时候甚至在一个操作的中间发生中断,不应访问固有资源(文件、数据库)。
调用t.setDaemon(true);
将线程转换为守护线程(线程启动之前调用)
未捕获异常处理器
线程的run方法不能抛出受查异常,非受査异常会导致线程终止。
不需要任何catch子句来处理可以被传播的异常,线程死亡之前,异常被传递到一个用于未捕获异常的处理器。
异常处理器必须实现Thread.UncaughtExceptionHandler接口
interface UncaughtExceptionHandler{
void uncaughtException(Threadt,Throwable e)
}
安装异常处理器
- setUncaughtExceptionHandler
- Thread类的静态方法
setDefaultUncaughtExceptionHandler
为所有线程安装一个默认的处理器
如果不为独立的线程安装处理器,此时的处理器就是该线程的ThreadGroup对象,ThreadGroup类实现Thread.UncaughtExceptionHandler
接口。它的uncaughtException方法做如下操作:
- 如有父线程组,那么父线程组的uncaughtException方法被调用。
- 否则,调用
Thread.getDefaultExceptionHandler
获取默认处理器,非空则调用此默认处理器 - 否则,如果Throwable是ThreadDeath的一个实例,什么都不做。
- 否则,线程的名字以及Throwable的栈轨迹被输出到System.err上。
同步
锁对象
两种机制
- synchronized关键字
- ReentrantLock类
1
2
3
4
5
6
7ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
//do something
}finally {
lock.unlock();
}
1 | java.util.concurrent.locks.Lock5.0 |
注:
听起来公平锁更合理一些,但是使用公平锁比使用常规锁要慢很多。只有当你确实了解自己要做什么并且对于你要解决的问题有一个特定的理由必须使用公平锁的时候,才可以使用公平锁。即使使用公平锁,也无法确保线程调度器是公平的。如果线程调度器选择忽略一个线程,而该线程为了这个锁已经等待了很长时间,那么就没有机会公平地处理这个锁了。
条件对象
使用一个条件对象来管理那些已经获得了一个锁但是却不能做有用工作的线程
1 | class Bank |
signalAll()
重新激活因为这一条件而等待的所有线程,线程从等待集当中移出(解除阻塞),再次成为可运行的,获取到锁后继续执行。signal()
随机解除等待集中某个线程的阻塞状态
synchronized
1 | public synchronized void method() |
条件阻塞举例
1 | class Bank |
静态synchronized方法将锁住整个类
同步阻塞
synchronized(obj)//this is the syntax for a synchronizedb lock
{
//critical section
}
Volatile域
volatile关键字为实例域的同步访问提供了一种免锁机制
声明一个域为volatile,编译器和虚拟机就知道该域可能被另一个线程并发更新
- 内存可见
- 禁止指令重排 (volatile变量的写操作,保证是在读操作之前完成)
- 赋值原子性
应用:
- 多线程标志位
- CAS
final变量
final Map<String,Double> accounts = new HashKap<>();
其他线程在构造函数完成构造后才看到accounts变量。
如果不使用final,不能保证其他线程看到的是accounts更新后的值,它们可能看到null,而不是新构造的HashMap
原子性
java.util.concurrent.atomic
包中有很多类使用了很高效的机器级指令来保证操作的原子性。
如:Atomiclnteger.incrementAndGet
、Atomiclnteger.decrementAndGet
自增自减incrementAndGet
获得值、增1并设置然后生成新值的操作不会中断。
如果有大量线程要访问相同的原子值,性能会大幅下降,因为乐观更新需要太多次重试。
JavaSE8提供了LongAdder
和LongAccumulator
类来解决这个问题。LongAdder
包括多个变量(加数),其总和为当前值。可以有多个线程更新不同的加数,线程个数增加时会自动提供新的加数。通常情况下,只有当所有工作都完成之后才需要总和的值,对于这种情况,这种方法会很高效。性能会有显著的提升。
1 | final LongAdder adder=new LongAdder(); |
LongAccumulator
将这种思想推广到任意的累加操作
1 | LongAccumulator adder=new LongAccumulator(Long::sum,0); //可选择不同的操作,且满足结合律和交换律。 |
死锁
线程局部变量
为每个线程构造一个实例:public static final ThreadLocal<SimpleDateFormat> dateFormat =ThreadLocal.withInitial(()->new SimpleDateFormat("yyyy-MM-dd"));
java.util.Rand0m类是线程安全的,但如果多线程等待一个随机数生成器,很低效。
可以使用ThreadLocal辅助类为各个线程提供一个单独的生成器,还可以使用Java提供的一个便利类:ThreadLocalRandom
,ThreadLocalRandom.current()
调用会返回特定于当前线程的Random类实例
锁测试与超时
1 | if(myLock.tryLock()) |
读/写锁
1 | private ReentrantReadWriteLock rwl=new ReentrantReadWriteLock(); |
为什么弃用stop和suspend方法
stop:当线程要终止另一个线程时,无法知道什么时候调用stop方法是安全的,什么时候导致对象被破坏。因此,该方法被弃用了。在希望停止线程的时候应该中断线程,被中断的线程会在安全的时候停止。
suspend:容易引起死锁,被挂起的线程等着被恢复,而将其挂起的线程等待获得锁。
阻塞队列
当试图向队列添加元素而队列已满,或是想从队列移出元素而队列为空的时候,阻塞队列(blockingqueue)导致线程阻塞。
put,take 满或空时阻塞
add,remove,element 空时异常
offer,poll(移除返回),peek(只返回) 空时 返回false,null,null
java.util.concurrent包提供了阻塞队列的几个变种:
- LinkedBlockingQueue 容量无上界,也可选择最大容量 LinkedBlockingDeque 双端队列版本
- ArrayBlockingQueue 构造时制定容量,可设置公平性
- PriorityBlockingQueue 带优先级的队列,而不是先进先出队列。按照它们的优先级顺序被移出,容量无上限
- DelayQueue
1
2
3
4interface Delayed extends Comparable<Delayed>
{
long getDelay(TimeUnitunit); //返回对象的残留延迟,负值表示延迟结束,可移除
} - LinkedTransferQue implements TranSferQueue 允许生产者线程等待,直到消费者准备就绪可以接收一个元素。 q.transfer(item); 阻塞直到另一个线程将元素(item)删除。
线程安全的集合
高效的映射、集和队列
java.util.concurrent包提供了映射、有序集和队列的高效实现:
ConcurrentHashMap
ConcurrentSkipListMap
key有序,跳表实现,非并发使用TreeMap
,低并发可使用包装TreeMap
的Collections.synchronizedSortedMap
,高并发使用ConcurrentSkipListMap
ConcurrentSkipListSet
有序,基于SkipList的集合ConcurrentLinkedQueue
一个基于链接节点的无界线程安全队列
JavaSE8引入了一个mappingCount
方法可以把大小作为long返回(元素过多,int范围小)。
返回弱一致性(weaklyconsistent)的迭代器。迭代器不一定能反映出它们被构造之后的所有的修改,但是,它们不会将同一个值返回两次,也不会拋出ConcurrentModificationException
异常。ConcurrentHashMap
默认支持16个写线程并发执行,多余的将阻塞。
在JavaSE8中,并发散列映射将桶组织为树,防止因大量相同散列码的值而降低性能
映射条目的原子更新
1 | map.putlfAbsent(word,newLongAdder());// 返回oldValue |
传入compute或merge的函数返回null,将从映射中删除现有的条目
注:使用compute或merge时,提供的函数不能做太多工作。因为函数运行时,可能会阻塞对映射的其他更新。且这个函数也不能更新映射的其他部分。
对并发散列映射的批操作
三种操作:
- 搜索
- 规约
- foreach
每个操作四个版本 - operationKeys:处理键。
- operatioriValues:处理值。
- operation:处理键和值。
- operatioriEntries:处理Map.Entry对象
阈值参数:
期望单线程处理,阈值设置Long.MAX_VALUE
期望多线程处理,阈值设置1
搜索
1 | UsearchKeys(long threshold,BiFunction<?super K,?extends U>f) |
search
第一个val大于1000的值String result=map.search(threshold,(k,v)->v>1000?k:null);
forEach
1 | map.forEach(threshold,(k,v)->System.out.println(k+"->"+v)); |
reduce
1 |
|
1 | 对于int、long和double输出还有相应的特殊化操作,分别有后缀Tolnt、ToLong和ToDouble。需要把输入转换为一个基本类型值,并指定一个默认值和一个累加器函数。映射为空时返回默认值。 |
并发集视图
1 | ConcurrentHashMap<String, String> hashMap = new ConcurrentHashMap<String, String>(); |
写数组的拷贝
{todo}
CopyOnWriteArrayList
CopyOnWriteArraySet
并行数组算法
parallelSort
1 | String[] arr = new String[]{"1", "333", "22", "12345"}; |
parallelSetAll
1 | Arrays.parallelSetAll(arr, x -> {//x 为索引 |
parallelPrefix
1 | String[] arr = new String[]{"1", "333", "22", "12345"}; |
较早的线程安全集合
已弃用的线程安全的动态数组和散列表 Vector Hashtable
取而代之的是非线程安全的AnayList和HashMap类,可通过同步包装器变成线程安全的
1 | List<E>synchArrayList=Collections,synchronizedList(newArrayList<E>()); |
Callable与Future
1 | Callable<String> callable = () -> "callable is running"; |
执行器
使用线程池的理由
- 需要大量且生命期短的线程应使用线程池
- 控制并发线程数数,大量线程导致性能降低
构建线程池执行器类的静态工厂方法 newCachedThreadPool
必要时创建新线程;空闲线程会被保留60秒newFixedThreadPool
该池包含固定数量的线程;空闲线程会一直被保留newSingleThreadExecutor
只有一个线程的“池”,该线程顺序执行每一个提交的任务(类似于Swing事件分配线程)newScheduledThreadPool
用于预定执行而构建的固定线程池,替代java.util.TimernewSingleThreadScheduledExecutor
用于预定执行而构建的单线程“池”
线程池
newCachedThreadPool
、newFixedThreadPool
、newSingleThreadExecutor
返回实现了ExecutorService
接口的ThreadPoolExecute
对象。可使用以下方法提交任务:
Future<?>submit(Runnabletask)
返回一个奇怪样子的Future<?>,调用get返回nullFuture<T>submit(Runnabletask,Tresult)
get返回指定的resultFuture<T>submit(Callable<T>task)
get返回计算结构
关闭线程池
- shutdow 启动该池的关闭序列。被关闭的执行器不再接受新的任务。当所有任务都完成以后,线程池中的线程死亡。
- shutdownNow 该池取消尚未开始的所有任务并试图中断正在运行的线程
预定执行
ScheduledExecutorService
接口具有为预定执行(ScheduledExecution)或重复执行任务而设计的方法。
Executors类的newScheduledThreadPool
和newSingleThreadScheduledExecutor
方法将返回实现了Scheduled-ExecutorService接口的对象。
可以预定Runnable或Callable在初始的延迟之后只运行一次。也可以预定一个Runnable对象周期性地运行。
控制任务组
使用执行器控制一组任务
- invokeAny 提交所有对象到一个Callable对象的集合中,并返回某个已经完成了的任务的结果(无法确定是那个任务的结果,可能是最先完成任务的结果)
- invokeAll 方法提交所有对象到一个Callable对象的集合中,并返回一个Future对象的列表,代表所有任务的解决方案。invokeAll缺点:若第一个任务耗时时很多,需要等待
1
2
3
4List<Callab1e<T>> tasks=...;
List<Future<T>> results = executor.invokeAll(tasks):
for(Future<T> result : results)
processFurther(result.get());
ExecutorCompletionService来排序改进1
2
3
4ExecutorCompletionService<T> service = newExecutorCompletionServiceo(executor);
for(Callable<T> task : tasks) service.submit(task);
for(int i = 0 ; i < tasks.size();i++)
processFurther(service.take().get());
ExecutorCompletionService.take
移除下一个已完成的结果,如果没有任何已完成的结果可用则阻塞。ExecutorCompletionService.poll
移除下一个已完成的结果,如果没有任何已完成结果可用则返回null。
Fork-Join
针对计算密集型任务,有足够多的处理器可并行处理的任务
要采用框架可用的一种方式完成这种递归计算,需要提供一个扩展RecursiveTask
1 | public class ForkJoinTest |
fork-join框架平衡可用线程工作负载的方法:(工作密取work stealing)
每个工作线程有一个双端队列(deque)来完成任务。一个工作线程将子任务压人队列的队头。(只有一个线程可以访问队头,所以不需要加锁。)一个工作线程空闲时,它会从另一个队列的队尾“密取”一个任务。由于大的子任务都在队尾,这种密取很少出现。
可完成Future
CompletableFuture
实现了CompletionStage
接口和Future
接口,处理多任务协同工作
创建异步任务
supplyAsync
supplyAsync(Supplier<U>)
supplyAsync(Supplier<U>,Executor)
runAsync
创建没有返回值的异步任务
获取结果
// 如果完成则返回结果,否则就抛出具体的异常public T get() throws InterruptedException, ExecutionException
// 最大时间等待返回结果,否则就抛出具体异常public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
// 完成时返回结果值,否则抛出unchecked异常。为了更好地符合通用函数形式的使用,如果完成此 CompletableFuture
所涉及的计算引发异常,则此方法将引发unchecked异常并将底层异常作为其原因public T join()
// 如果完成则返回结果值(或抛出任何遇到的异常),否则返回给定的 valueIfAbsent。public T getNow(T valueIfAbsent)
// 如果任务没有完成,返回的值设置为给定值public boolean complete(T value)
// 如果任务没有完成,就抛出给定异常public boolean completeExceptionally(Throwable ex)
异步回调处理
thenApply和thenApplyAsync
thenAccept和thenAcceptAsync
thenRun和thenRunAsync
whenComplete和whenCompleteAsync
whenComplete是当某个任务执行完成后执行的回调方法
handle和handleAsync
跟whenComplete基本一致,区别在于handle的回调方法有返回值。
1 | CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { |
多任务组合处理
thenCombine、thenAcceptBoth 和runAfterBoth
applyToEither、acceptEither和runAfterEither
allOf / anyOf
同步器
CyclicBarrier
允许线程集等待直至其中预定数目的线程到达一个公共障栅(barrier),然后
可以选择执行一个处理障栅的动作
当大量的线程需要在它们的结果可用之前完
成时Phaser
类似于循环障栅,不过有一个可变的计数
JavaSE7中引人CountDownLatch
允许线程集等待直到计数器减为0
当一个或多个线程需要等待直到指定数目的事件发生Exchanger
允许两个线程在要交换的对象准备好时交换对象
当两个线程工作在同一数据结构的两个实例上的时候,一个向实例添加数据而另一个从实例清除数据Semaphore
允许线程集等待直到被允许继续运行为止
限制访问资源的线程总数。如果许可数是1,常常阻塞线程直到另一个线程给出许可为止SynchronousQueue
允许一个线程把对象交给另一个线程
在没有显式同步的情况下,当两个线程准备好将一个对象从一个线程传递到另一个时
信号量
倒计时门栓
CountDownLatch
等待计数变为0可继续执行
应用场景例如:多线程准备数据,每个线程完成自己的工作计数器-1,所有线程准备工作完成,计数器变为0,此时处理数据线程开始工作。
栅栏
1 | CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> { |
CyclicBarrier
在所有等待线程被释放后被重用CountDownLatch
只能被使用一次
{todo} Phaser???
交换器
Exchanger
两个线程间交换数据,当多个线程交换时,按顺序先到的两个先交换
同步队列
当一个线程调用SynchronousQueue
的put
方法时,它会阻塞直到另一个线程调用take
方法为止,反之亦然
其他
LongAdder AtomicLong
AtomicLong 采用CAS,在高并发时性能不如LongAdder