等待唤醒机制的规范实现
GuardedObject
obj持有的受保护对象(回复消息)
调用者get() 获取受保护对象(回复消息),检查条件等待
消费者fireEvent()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class GuardedObject<T>{
//持有受保护的对象
T obj;
final Lock lock = new ReentrantLock();
final Condition done =lock.newCondition();
final int timeout=2;
//保存所有GuardedObject
final static Map<Object, GuardedObject> gos=new ConcurrentHashMap<>();
//静态方法创建GuardedObject
static <K> GuardedObject create(K key){
GuardedObject go=new GuardedObject();
gos.put(key, go);
return go;
}


static <K, T> void fireEvent(K key, T obj){
GuardedObject go=gos.remove(key);
if (go != null){
go.onChanged(obj);
}
}
//事件通知方法
void onChanged(T obj) {
lock.lock();
try {
this.obj = obj;
done.signalAll();
} finally {
lock.unlock();
}
}

//获取受保护对象
T get(Predicate<T> p) {
lock.lock();
try {
//MESA管程推荐写法
while(!p.test(obj)){
done.await(timeout, TimeUnit.SECONDS);
}
}catch(InterruptedException e){
throw new RuntimeException(e);
}finally{
lock.unlock();
}
//返回非空的受保护对象
return obj;
}
}


//处理浏览器发来的请求
Respond handleWebReq(){
int id=序号生成器.get();
//创建一消息
Message msg1 = new
Message(id,"{...}");
//创建GuardedObject实例
GuardedObject<Message> go=
GuardedObject.create(id);
//发送消息
send(msg1);
//等待MQ消息
Message r = go.get(
t->t != null);
}
void onMessage(Message msg){
//唤醒等待的线程
GuardedObject.fireEvent(
msg.id, msg);
}

没有共享,就没有伤害

1
2
3
4
5
6
7
8
9
10
11
static class SafeDateFormat {
//定义ThreadLocal变量
static final ThreadLocal<DateFormat>
tl=ThreadLocal.withInitial(()-> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));

static DateFormat get(){
return tl.get();
}
}
//不同线程执行下面代码 返回的df是不同的
DateFormat df = SafeDateFormat.get();

ThreadLocal的工作原理

ThreadLocalMap,key:WeakReference,value:Value

ThreadLocal与内存泄露

线程池中线程的存活时间长,往往和程序同生共死的,则Thread持有的ThreadLocalMap一直都不会被回收。
ThreadLocalMap中的Entry对ThreadLocal是弱引用(WeakReference),所以只要ThreadLocal结束了自己的生命周期是可以被回收掉的。但是Entry中的Value却是被Entry强引用的,所以即便Value的生命周期结束了,Value也是无法被回收的,从而导致内存泄露。

解决办法:手动释放

1
2
3
4
5
6
7
8
9
10
11
12
ExecutorService es;
ThreadLocal tl;
es.execute(()->{
//ThreadLocal增加变量
tl.set(obj);
try {
// 省略业务逻辑代码
}finally {
//手动清理ThreadLocal
tl.remove();
}
});

InheritableThreadLocal与继承性

通过ThreadLocal创建的线程变量,其子线程是无法继承的(无法访问)

不建议使用:线程池中线程的创建是动态的,很容易导致继承关系错乱,若业务逻辑依赖InheritableThreadLocal,那么很可能导致业务逻辑计算错误。

问题

实际工作中,有很多平台型的技术方案都是采用ThreadLocal来传递一些上下文信息,例如Spring使用ThreadLocal来传递事务信息。我们曾经说过,异步编程已经很成熟了,那你觉得在异步场景中,是否可以使用Spring的事务管理器呢?
???

不可变对象的写操作往往都是使用Copy-on-Write方法解决的
COW 写时复制

CopyOnWriteArrayList和CopyOnWriteArraySet这两个Copy-on-Write容器在修改时复制整个数组,所以如果容器经常被修改或者这个数组本身就非常大的时候,不建议使用的。修改非常少、数组数量也不大,并且对读性能要求苛刻的场景,使用Copy-on-Write容器效果就非常好了。

Copy-on-Write模式的应用领域

CopyOnWriteArrayList和CopyOnWriteArraySet这两个Copy-on-Write容器,读操无锁,性能极好
创建进程
类Unix的操作系统中创建进程的API是fork(),传统的fork()函数会创建父进程的一个完整副本,例如父进程的地址空间现在用到了1G的内存,那么fork()子进程的时候要复制父进程整个进程的地址空间(占有1G内存)给子进程,这个过程是很耗时的。而Linux中的fork()函数就聪明得多了,fork()子进程的时候,并不复制整个进程的地址空间,而是让父子进程共享同一个地址空间;只用在父进程或者子进程需要写入的时候才会复制地址空间,从而使父子进程拥有各自的地址空间。
文件系统
Btrfs (B-Tree File System)、aufs(advanced multi-layered unification filesystem)等

设计路由表

对读的性能要求很高,读多写少,弱一致性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
//路由信息
public final class Router{
private final String ip;
private final Integer port;
private final String iface;
//构造函数
public Router(String ip,
Integer port, String iface){
this.ip = ip;
this.port = port;
this.iface = iface;
}
//重写equals方法
public boolean equals(Object obj){
if (obj instanceof Router) {
Router r = (Router)obj;
return iface.equals(r.iface) &&
ip.equals(r.ip) &&
port.equals(r.port);
}
return false;
}
public int hashCode() {
//省略hashCode相关代码
}
}
//路由表信息
public class RouterTable {
//Key:接口名
//Value:路由集合
ConcurrentHashMap<String, CopyOnWriteArraySet<Router>>
rt = new ConcurrentHashMap<>();
//根据接口名获取路由表
public Set<Router> get(String iface){
return rt.get(iface);
}
//删除路由
public void remove(Router router) {
Set<Router> set=rt.get(router.iface);
if (set != null) {
set.remove(router);
}
}
//增加路由
public void add(Router router) {
Set<Router> set = rt.computeIfAbsent(
route.iface, r ->
new CopyOnWriteArraySet<>());
set.add(router);
}
}

设计思考

服务提供方上线、下线都会更新路由信息,这时有两种选择。
一种通过更新Router的状态位来标识,这样做所有访问该状态位的地方都需要同步访问,很影响性能。
一种采用Immutability模式,每次上线、下线创建新的Router对象或者删除对应的Router对象。由于上线、下线的频率很低,所以后者是最好的选择。

解决并发问题最简单方法:只读
上升到解决并发问题的设计模式:不变性(Immutability)模式
不变性,简单讲,就是对象创建之后,状态不再发生变化

实现不变模式

类、方法、属性设置成final

String和Long、Integer、Double等基础类型的包装类都具备不可变性,这些对象的线程安全性都靠不可变性来保证

不可变性的类,提供修改功能

具备不可变性的类,需要提供类似修改的功能:创建一个新的不可变对象
如String.replace方法,String字符替换操作,没有修改源字符串,而是组合新字符串返回

避免创建重复对象:享元模式

Long、Integer、Short、Byte等基本数据类型的包装类用到了享元模式,减少创建对象的数量,减少内存占用

享元模式本质:对象池
Long内部维护了一个静态的对象池,仅缓存了[-128,127],对象池在JVM启动的时候就创建好了
Long对象的状态有264种,太多不宜全部缓存,[-128,127]利用率最高

基本所有的基础类型应用享元模式,不适合作为锁对象,因为看上去私有的锁,其实是共有的

Immutability模式注意事项

  • 属性声明为final并不一定保证不可变
  • 不可变对象需要正确发布

不可变对象线程安全,不意味着引用不可变对象的对象就是线程安全的

1
2
3
4
5
6
7
8
9
10
11
12
//Foo线程安全
final class Foo{
final int age=0;
final int name="abc";
}
//Bar线程不安全
class Bar {
Foo foo;
void setFoo(Foo f){
this.foo=f;
}
}

总结

具备不变性的对象,只有一种状态,状态由对象内部所有的不变属性共同决定
无状态对象内部没有属性,只有方法;还有无状态的服务、无状态的协议等。
多线程领域,无状态没有线程安全问题,不需处理线程同步,性能自然好
分布式领域,无状态可无限水平扩展,性能问题不会出现在无状态节点

Fork/Join并行计算框架,支持分治任务模型
解决复杂问题的思维方法和模式
分治:复杂问题分解成相似子问题,直到子问题简单到可直接求解

分治任务模型

两阶段:Fork任务分解、Join结果合并
两部分:分治任务线程池ForkJoinPool,分治任务ForkJoinTask
类似ThreadPoolExecutor和Runnable的关系,都可以理解为提交任务到线程池,不过分治任务有自己独特类型ForkJoinTask。

Fork/Join使用

ForkJoinTask是一个抽象类包含两个方法:

  • fork()方法会异步地执行一个子任务
  • join()方法则会阻塞当前线程等待子任务执行结果
    ForkJoinTask两个子抽象类,定义了compute方法
  • RecursiveAction
    compute方法没有返回
  • RecursiveTask
    compute方法有返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//Fork/Join这个并行计算框架计算斐波那契数列
static void main(String[] args){
//创建分治任务线程池
ForkJoinPool fjp =
new ForkJoinPool(4);
//创建分治任务
Fibonacci fib =
new Fibonacci(30);
//启动分治任务
Integer result =
fjp.invoke(fib);
//输出结果
System.out.println(result);
}
//递归任务
static class Fibonacci extends
RecursiveTask<Integer>{
final int n;
Fibonacci(int n){this.n = n;}
protected Integer compute(){
if (n <= 1)
return n;
Fibonacci f1 =
new Fibonacci(n - 1);
//创建子任务
f1.fork();
Fibonacci f2 =
new Fibonacci(n - 2);
//等待子任务结果,并合并结果
return f2.compute() + f1.join();
}
}

ForkJoinPool工作原理

  • ThreadPoolExecutor
    生产者-消费者模式,内部一个任务队列是生产者和消费者通信的媒介
    多个工作线程共享一个任务队列(双端队列)。
  • ThreadPoolExecutor

内部多个任务队列,通过ForkJoinPool的invoke()或者submit()方法提交任务时,ForkJoinPool根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中会创建出子任务,子任务会提交到工作线程对应的任务队列中。

“任务窃取”机制让所有线程的工作量基本均衡,不会出现忙线程和闲线程,性能很好。
工作线程正常获取任务和“窃取任务”是从任务队列不同的端消费,这样能避免很多不必要的数据竞争。

Stream API应用ForkJoinPool

Stream API并行流以ForkJoinPool为基础,默认所有并行流计算共享一个ForkJoinPool,默认的线程数是CPU的核数。
并行流计算都是CPU密集型计算完全没有问题,但存在I/O密集型的并行流计算,可能会因为慢的I/O计算而拖慢系统性能。建议用不同的ForkJoinPool执行不同类型的计算任务。

批量执行异步任务

CompletionService将线程池和阻塞队列组合使用,让批量管理异步任务更简单,先执行完先进入阻塞队列。

构造方法

1
2
ExecutorCompletionService(Executor executor);
ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)。

自己创建线程池可提供线程池隔离特性,避免耗时任务拖垮系统。

接口

1
2
3
4
5
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;//空队列阻塞
Future<V> poll();//空队列返回null
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;//等待unit时间仍空队列返回null

实现Dubbo中的Forking Cluster

Dubbo中有一种叫做Forking的集群模式,并行调用服务,一个返回结果,整个服务就返回
如地址转坐标服务,为保证服务高可用和性能,可以并行调用3个地图服务商API,只要有1个正确返回,那么地址转坐标这个服务就直接返回。这种集群模式可以容忍2个地图服务商服务异常,缺点是消耗的资源偏多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class _25CompletionService {
@Test
void do1() throws InterruptedException, ExecutionException {
ExecutorService pool = Executors.newFixedThreadPool(9);
ExecutorCompletionService service = new ExecutorCompletionService(pool, new ArrayBlockingQueue<>(10));
List<Future> list = new ArrayList<>();
Future<String> f1 = service.submit(() -> {
return getPrice1Sleep10();
});
list.add(f1);
Future<String> f2 = service.submit(() -> {
return getPrice2Sleep2();
});
list.add(f2);
Future<String> f3 = service.submit(() -> {
return getPrice3Sleep5();
});
list.add(f3);

BlockingQueue<String> bq =
new LinkedBlockingQueue<>(10);

try {
for (int i = 0; i < 3; i++) {
System.out.printf("take %s\r\n", i);
Future take = service.take();
String r = take.get().toString();
System.out.printf("take %s ok,r:%s\r\n", i, r);
if (r != null) {
break;
}
}
}finally {
for (Future f : list) {
f.cancel(true);
}
}
}

private String getPrice1Sleep10() {
System.out.println("1-start will sleep 10s");
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("1 sleep end");
return "1-result";
}

private String getPrice2Sleep2() {
System.out.println("2-start will sleep 2s");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("2 sleep end");
return "2-result";
}

private String getPrice3Sleep5() {
System.out.println("3-start will sleep 5s");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("3 sleep end");
return "3-result";
}
}

CompletableFuture异步编程(java1.8)

创建CompletableFuture对象

  • 两个基础接口:
    runAsync(Runnable runnable) 与 supplyAsync(Supplier<U> supplier)区别:是否有返回值
  • 两个重载接口-可自定义线程池:
    runAsync(Runnable runnable, Executor executor)、supplyAsync(Supplier<U> supplier, Executor executor)
    CompletableFuture 实现 Future和CompletionStage,Futrue接口可获取线程运行状态和结果

如何理解CompletionStage接口

描述任务间的时序关系

串行

  • thenApply
    有参数和返回值
  • thenAccept
    有参数无返回值
  • thenRun
    无参数无返回值
  • thenCompose

以上接口区别:fn、consumer、action这三个核心参数不同

*Async版本接口在ForkjoinPool线程池中获取一个线程继续执行

1
2
3
4
5
6
7
8
9
CompletableFuture<String> f0 = 
CompletableFuture.supplyAsync(
() -> "Hello World") //①
.thenApply(s -> s + " QQ") //②
.thenApply(String::toUpperCase);//③

System.out.println(f0.join());
//输出结果
HELLO WORLD QQ

supplyAsync启动一个异步流程,之后是两个串行操作,任务①②③串行执行。

AND汇聚关系

  • thenCombine
    有参数有返回值
  • thenAcceptBoth
    有参数无返回值
  • runAfterBoth
    无参数无返回值

OR汇聚关系

  • applyToEither
    有参数有返回值
  • acceptEither
    有参数无返回值
  • runAfterEither
    无参数无返回值

异常

注意:外部无法自动捕获异常,需使用以下方法

  1. catch
    • exceptionally(ex)
  2. finally
    • handle(result,ex)
      有返回值
    • whenComplete(result,ex)
      无返回值

思考题

以下代码是否有问题?

1
2
3
4
5
6
7
8
9
10
11
//采购订单
PurchersOrder po;
CompletableFuture<Boolean> cf =
CompletableFuture.supplyAsync(()->{
//在数据库中查询规则
return findRuleByJdbc();
}).thenApply(r -> {
//规则校验
return check(po, r);
});
Boolean isOk = cf.join();

答:findRuleByJdbc方法隐藏着阻塞式I/O,这意味着会阻塞调用线程。
默认情况下所有的CompletableFuture共享一个ForkJoinPool,当有阻塞式I/O时,可能导致所有的ForkJoinPool线程都阻塞,进而影响整个系统的性能。
利用共享往往能让我们快速实现功能,有福同享,代价就是有难要同当。强调高可用的今天,大多数人更倾向于隔离方案。

void execute(Runnable command)方法无法获取线程执行结果

获取任务执行结果

submit方法

Java通过ThreadPoolExecutor的3个submit()方法和1个FutureTask工具类支持获得任务执行结果

1
2
3
4
5
6
7
//提交Runnable任务,Future结果对象为空,可通过此对象判断任务是否结束
Future<?> submit(Runnable task);
//提交Runnable任务及结果引用,
<T> Future<T> submit(Runnable task, T result);

//提交Callable任务
<T> Future<T> submit(Callable<T> task);

submit(Runnable,result)

<T> Future<T> submit(Runnable task, T result);
通过future.get获取的结果和submit参数result是一个对象。
经典用法:Runnable接口的实现类Task声明了一个有参构造函数Task(Result r)
创建Task对象的时传入了result对象,即可在类Task的run()方法中对result进行各种操作(即在子线程中操作)。
result相当于主线程和子线程之间的桥梁,通过它主子线程可以共享数据。

Future

Future接口有5个方法
取消任务的方法 cancel()
判断任务是否已取消的方法 isCancelled()
判断任务是否已结束的方法 isDone()
获得任务执行结果2个 get()和get(timeout, unit)

1
2
3
4
5
6
7
8
9
10
//取消任务
boolean cancel(boolean mayInterruptIfRunning);
//判断是否已取消
boolean isCancelled();
//判断是否已结束
boolean isDone();
//获得执行结果
get();
//获得执行结果,支持超时
get(long timeout, TimeUnit unit);

FutureTask

FutureTask实现了Runnable和Future接口
实现Runnable接口,所以可以将FutureTask提交给ThreadPoolExecutor执行,也可被Thread执行
实现Future接口,所以也能获得任务结果

示例1.提交给ThreadPoolExecutor去执行

1
2
3
4
5
6
7
8
9
// 创建FutureTask
FutureTask<Integer> futureTask
= new FutureTask<>(()-> 1+2);
// 创建线程池
ExecutorService es = Executors.newCachedThreadPool();
// 提交FutureTask
es.submit(futureTask);
// 获取计算结果
Integer result = futureTask.get();

示例2:Thread执行

1
2
3
4
5
6
7
8
// 创建FutureTask
FutureTask<Integer> futureTask
= new FutureTask<>(()-> 1+2);
// 创建并启动线程
Thread T1 = new Thread(futureTask);
T1.start();
// 获取计算结果
Integer result = futureTask.get();

总结

Future可以很容易获得异步(线程池或新建线程)任务的执行结果
任务之间依赖关系,基本上可用Future来解决,可用有向图描述任务间的依赖关系,同时做好线程分工
多线程可将串行的任务并行化提高性能

创建线程不仅在堆区分配内存(普通对象),还需要操作系统调用系统内核API为线程分配资源,成本很高。
作为重量级对象,应避免频繁创建销毁。

生产-消费模式
ThreadPoolExecutor不像通常的池资源那样pool.aquaire,pool.release,而是强调executor的生产消费模式,调用者是生产者,线程池是消费者。

1
2
3
4
5
6
7
8
ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
  • corePoolSize 核心线程数
  • maximumPoolSize 最大线程数
  • keepAliveTime & unit 超过指定时间回收非核心线程
  • workQueue 工作队列(任务队列)
  • threadFactory 自定义如何创建线程,例如你可以给线程指定一个有意义的名字
  • handler 自定义任务的拒绝策略(前提有界队列)
    1. CallerRunsPolicy:提交任务的线程自己去执行该任
    2. AbortPolicy:默认的拒绝策略,会throws RejectedExecutionException。
    3. DiscardPolicy:直接丢弃任务,没有任何异常抛出。
    4. DiscardOldestPolicy:丢弃最老的任务,新任务加入到工作队列

Java1.6增加了allowCoreThreadTimeOut(boolean value)方法允许核心线程超时

不建议使用无界队列
不建议使用Executors的最重要的原因是:Executors提供的很多方法默认使用的都是无界的LinkedBlockingQueue,高负载情境下,无界队列很容易导致OOM,而OOM会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列。

拒绝策略
有界队列,任务过多触发拒绝策略,线程池默认拒绝策略会throw RejectedExecutionException运行时异常,编译器不会强制处理它,避免容易忽略,默认拒绝策略要慎用。
重要的任务要自定义拒绝策略,与降级策略配合使用。
异常
ThreadPoolExecutor.execute提交任务,执行出现异常线程终止,但获取不到通知,需自己捕获异常按需处理。

其他

SpringBoot线程池 ThreadPoolTaskExecutor 可设置线程名前缀

无锁工具类的典范

实现

硬件支持CAS指令,作为一条CPU指令本身能保证原子性

原子类与锁比较

加解锁本身消耗性能,获取不到锁的线程会阻塞出发线程切换,线程切换也消耗性能。
原子类无锁

ABA问题

解决:版本号,每次改变版本号自增,同时修改值时检查版本号
AtomicStampedReference
AtomicMarkableReference (boolean类型版本号)

原子类分类

  • 原子化基础数据类型
  • 原子化的对象引用类型
  • 原子化数组
  • 原子化对象属性更新器
  • 原子化的累加器

基础数据类型

AtomicBoolean、AtomicInteger、AtomicLong
相关方法类似

1
2
3
4
5
6
7
8
9
10
11
12
getAndIncrement()	//	原⼦化	i++
getAndDecrement() // 原⼦化的 i--
incrementAndGet() // 原⼦化的 ++i
decrementAndGet() // 原⼦化的 --i
getAndAdd(delta) // 当前值 +=delta,返回 += 前的值
addAndGet(delta) // 当前值 +=delta,返回 += 后的值
compareAndSet(expect,update) //CAS 操作,返回是否成功
//以下四个⽅法 新值可以通过传⼊func函数来计算
getAndUpdate(func)
updateAndGet(func)
getAndAccumulate(x,func)
accumulateAndGet(x,func)

对象引用类型

AtomicReference、AtomicStampedReference、AtomicMarkableReference

AtomicStampedReference实现的CAS方法就增加了版本号参数

1
2
3
4
5
boolean compareAndSet(
V expectedReference,
V newReference,
int expectedStamp,
int newStamp)

AtomicMarkableReference将版本号简化成了一个Boolean值

1
2
3
4
5
boolean compareAndSet(
V expectedReference,
V newReference,
boolean expectedMark,
boolean newMark)

数组

AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray

对象属性更新器

AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater
可以原子化地更新对象的属性,对象属性必须是volatile类型,只有这样才能保证可见性;反
射机制实现

累加器

DoubleAccumulator、DoubleAdder、LongAccumulator、LongAdder
仅执行累加操作,相比原子化的基本数据类型,速度更快,不支持compareAndSet方法
若只需要累加操作,使用原子化的累加器性能会更好

总结

相对于互斥锁方案性能好,不会死锁(但会饥饿、活锁,自旋重试)
只针对一个共享变量,对于多共享变量使用互斥锁

0%