List

CopyOnWriteArrayList
写的时候将共享变量复制一份,好处是读操作完全无锁
写操作:新复制数组上增加元素后再将array指向这个新的数组

Map

ConcurrentHashMap key无序 k/v都不可为空 线程安全
ConcurrentSkipListMap key有序 k/v都不可为空 线程安全

HashMap k/v 都允许为空 线程不安全
TreeMap k/v k不可空,v可空 线程不安全
HashTable k/v 都不可空 线程安全

SkipList 插入、删除、查询操作平均的时间复杂度是 O(logn)
跳表理论上和并发线程数没关系,在并发度非常高时,若对ConcurrentHashMap性能不满意,可尝试ConcurrentSkipListMap,并发线程数越高跳表结构的优势越明显.

Set

CopyOnWriteArraySet
ConcurrentSkipListSet
参考CopyOnWriteArrayList、ConcurrentSkipListMap原理一致

Queue

最复杂的并发容器类
分维度分类:

  • 维度1-阻塞/非阻塞
    队列满时入队是否阻塞,空时出队是否阻塞;阻塞队列都用Blocking关键字标识
  • 维度2-单端/双端
    单端指的是只能队尾入队,队首出队;Queue标识
    双端指的是队首队尾皆可入队出队;Deque标识
    是否有界内部队列容量是否有限,无界队列易导致OOM不建议使用

单端阻塞队列

数组实现 ArrayBlockingQueue 有锁结构
链表实现 LinkedBlockingQueue 有锁结构
无队列 SynchronousQueue
LinkedBlockingQueue + SynchronousQueue = LinkedTransferQueue ,CAS无锁结构
优先级队列 PriorityBlockingQUeue
延迟队列 DelayQueue

SynchronousQueue

take线程阻塞直到有put的线程放入元素为止,反之亦然

//⽤于存储所有的数据库连接
CopyOnWriteArrayList sharedList;
//线程本地存储中的数据库连接
ThreadLocal<List> threadList;
//等待数据库连接的线程数
AtomicInteger waiters;
//分配数据库连接的⼯具
SynchronousQueue handoffQueue;

双端阻塞队列

LinkedBlockingDeque

单端非阻塞队列

ConcurrentLinkedQueue

双端非阻塞队列

ConcurrentLinkedDeque

总结

理清楚每种容器的特性
ArrayBlockingQueue 和 LinkedBlockingQueue支持有界,其他无界,注意无界队列导致OOM

问题

容器的Fail-Fast(快速失败)机制什么是

concurrentskiplistmap比concurrenthashmap性能好

concurrentskiplistmap没有key冲突问题,HashMap key冲突通过链表或者tree解决所以O(1)是理想状态。增删改操作也影响hashmap性能,要看冲突情况。所以hashmap的稳定性差,如果正好遇到稳定性问题无法接受可尝试ConcurrentSkipListMap

让多线程步调一致

业务介绍:
在线商城下单,生成电子订单,保存在订单库;
物流生成派送单发货,派送单保存在派送单库。
为防止漏派送或重复派送,对账系统每天还会校验是否存在异常订单。

串行方式

1
2
3
4
5
6
7
8
9
10
while(存在未对账订单){
// 查询未对账订单
pos = getPOrders();
// 查询派送单
dos = getDOrders();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}

并行优化对账系统

并行查询订单和派送单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
while(存在未对账订单){
// 查询未对账订单
Thread T1 = new Thread(()->{
pos = getPOrders();
});
T1.start();
// 查询派送单
Thread T2 = new Thread(()->{
dos = getDOrders();
});
T2.start();
// 等待 T1、T2 结束
T1.join();
T2.join();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}

线程池优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 创建 2 个线程的线程池
Executor executor =
Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 查询未对账订单
executor.execute(()-> {
pos = getPOrders();
});
// 查询派送单
executor.execute(()-> {
dos = getDOrders();
});

/* ??如何实现等待??*/

// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}

如下,使用线程池如何知道查询线程完成了呢

  1. 可通过管程实现计数器,计数器初始2,查询完成–,计数器为0则重置为2并执行对对账入库
  2. 使用CountDownLatch

CountDownLatch优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 创建 2 个线程的线程池
Executor executor =
Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 计数器初始化为 2
CountDownLatch latch =
new CountDownLatch(2);
// 查询未对账订单
executor.execute(()-> {
pos = getPOrders();
latch.countDown();
});
// 查询派送单
executor.execute(()-> {
dos = getDOrders();
latch.countDown();
});

// 等待两个查询操作结束
latch.await();

// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}

进一步性能优化

优化点:两个查询操作和对账操作check、save仍是串行

需要引入订单队列和派送单队列,T1查询订单,T2查询派送单,T1T2各自生产1条数据后通知T3执行对账。(T1T2要互相等待)

实现计数器优化

可以利用一个计数器来解决这两个难点,计数器初始化为 2,线程 T1 和 T2 生产完一条数据都将计数器减 1,如果计数器大于 0 则线程 T1 或者 T2 等待。如果计数器等于 0,则通知线程 T3,并唤醒等待的线程 T1 或者 T2,与此同时,将计数器重置为 2,这样线程 T1 和线程 T2 生产下一条数据的时候就可以继续使用这个计数器了。

但建议使用Java并发包工具类:CyclicBarrier。

CyclicBarrier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 订单队列
Vector<P> pos;
// 派送单队列
Vector<D> dos;
// 执行回调的线程池
Executor executor =
Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
new CyclicBarrier(2, ()->{
executor.execute(()->check());
});

void check(){
P p = pos.remove(0);
D d = dos.remove(0);
// 执行对账操作
diff = check(p, d);
// 差异写入差异库
save(diff);
}

void checkAll(){
// 循环查询订单库
Thread T1 = new Thread(()->{
while(存在未对账订单){
// 查询订单库
pos.add(getPOrders());
// 等待
barrier.await();
}
}
T1.start();
// 循环查询运单库
Thread T2 = new Thread(()->{
while(存在未对账订单){
// 查询运单库
dos.add(getDOrders());
// 等待
barrier.await();
}
}
T2.start();
}

总结

CountDownLatch 主要用来解决一个线程等待多个线程的场景,计数器是不能循环利用的,计数器为0时await直接通过

CyclicBarrier 是一组线程之间互相等待,计数器是可以循环利用的,可设置回调函数

还可使用线程池Future优化

1
2
3
CompletableFuture<List> pOrderFuture = CompletableFuture.supplyAsync(this::getPOrders);
CompletableFuture<List> dOrderFuture = CompletableFuture.supplyAsync(this::getDOrders);
pOrderFuture.thenCombine(dOrderFuture, this::check).thenAccept(this::save);

问题

CyclicBarrier的回调函数我们使用了一个固定大小的线程池,有必要吗?

使用

三种模式锁:写锁、悲观读锁、乐观读
乐观读允许其他线程获取写锁,它是无锁的,性能比ReadWriteLock更好

注意事项:

  • 不支持重入
  • 不支持条件变量
  • 中断使用readLockInterruptibly、writeLockInterruptibly
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    final StampedLock sl = new StampedLock();  
    // 获取 / 释放悲观读锁示意代码
    long stamp = sl.readLock();
    try {
    // 省略业务相关代码
    } finally {
    sl.unlockRead(stamp);
    }

    // 获取 / 释放写锁示意代码
    long stamp = sl.writeLock();
    try {
    // 省略业务相关代码
    } finally {
    sl.unlockWrite(stamp);
    }

    class Point {
    private int x, y;
    final StampedLock sl =
    new StampedLock();
    // 计算到原点的距离
    int distanceFromOrigin() {
    // 乐观读
    long stamp = sl.tryOptimisticRead();
    // 读入局部变量,
    // 读的过程数据可能被修改
    int curX = x, curY = y;
    // (判断版本号)判断执行读操作期间,是否存在写操作,如果存在,则 sl.validate 返回 false;
    if (!sl.validate(stamp)){ //
    // 升级为悲观读锁
    stamp = sl.readLock();
    try {
    curX = x;
    curY = y;
    } finally {
    // 释放悲观读锁
    sl.unlockRead(stamp);
    }
    }
    return Math.sqrt(
    curX * curX + curY * curY);
    }
    }
    上例中升级为悲观读锁是合理的,否则需要循环执行乐观读直到乐观读期间没有写操作,避免循环读浪费大量CPU。

数据库乐观锁

增加版本号列,每次更新时验证版本号是否是最新的
优点:利用数据库行锁进行并发控制,降低锁粒度。相对业务悲观锁降低复杂度,不需要处理死锁及超时问题。
应用场景:短事务,不侵入业务处理,集成简单

管程和信号量可以解决所有并发问题,java SDK提供其他工具类有什么用呢?分场景性能优化,提升易用性

ReadWriteLock 读多写少场景

读写锁的原则

  1. 允许多线程读共享变量
  2. 只允许一个线程写共享变量
  3. 一个线程写时,其他线程禁止读写

读多写少的情况下优于互斥锁

快速实现一个缓存

不支持懒加载,需一次性初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Cache<K,V> {
final Map<K, V> m =
new HashMap<>();
final ReadWriteLock rwl =
new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
// 写锁
final Lock w = rwl.writeLock();
// 读缓存
V get(K key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
// 写缓存
V put(String key, Data v) {
w.lock();
try { return m.put(key, v); }
finally { w.unlock(); }
}
}

支持懒加载版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class Cache<K,V> {
final Map<K, V> m =
new HashMap<>();
final ReadWriteLock rwl =
new ReentrantReadWriteLock();
final Lock r = rwl.readLock();
final Lock w = rwl.writeLock();

V get(K key) {
V v = null;
// 读缓存
r.lock(); ①
try {
v = m.get(key); ②
} finally{
r.unlock(); ③
}
// 缓存中存在,返回
if(v != null) { ④
return v;
}
// 缓存中不存在,查询数据库
w.lock(); ⑤
try {
// 再次验证
// 其他线程可能已经查询过数据库
v = m.get(key); ⑥
if(v == null){ ⑦
// 查询数据库
v= 省略代码无数
m.put(key, v);
}
} finally{
w.unlock();
}
return v;
}
}

问题????
get方法读为什么加锁,只给写加锁不行吗?如下
V get(K key) {

v = m.get(key); ②

// 缓存中存在,返回
if(v != null) {   ④
  return v;
}
// 缓存中不存在,查询数据库
w.lock();         ⑤
try {
  // 再次验证
  // 其他线程可能已经查询过数据库
  v = m.get(key); ⑥
  if(v == null){  ⑦
    // 查询数据库
    v= 省略代码无数
    m.put(key, v);
  }
} finally{
  w.unlock();
}
return v; 

}

锁升级降级

锁升级:先是获取读锁,再升级为写锁
ReadWriteLock不支持锁升级,支持降级
不支持升级原因:读锁未释放时获取写锁,导致写锁永久等待,线程阻塞无法被唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
lass CachedData {
Object data;
volatile boolean cacheValid;
final ReadWriteLock rwl =
new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
// 写锁
final Lock w = rwl.writeLock();

void processCachedData() {
// 获取读锁
r.lock();
if (!cacheValid) {
// 释放读锁,因为不允许读锁的升级
r.unlock();
// 获取写锁
w.lock();
try {
// 再次检查状态 并发很低可不检查
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 释放写锁前,降级为读锁
// 降级是可以的
r.lock(); ①
} finally {
// 释放写锁
w.unlock();
}
}
// 此处仍然持有读锁
try {use(data);}
finally {r.unlock();}
}
}

总结

读写锁的读锁不支持条件变量,写锁支持

问题

如何保证缓存与数据库数据一致?

  1. 延迟双删策略
    先删除缓存,再更新数据库,更新数据库后,延迟一段时间再次删除缓存
    减少高并发场景下的数据不一致问题,但仍然存在一些挑战,如延迟时间的选择和第二次删除缓存失败的处理
  2. 加锁串行化
    强一致实现方案,会导致性能下降
  3. 订阅binlog
    需要额外的系统支持,减少业务代码入侵

系统停止响应了,CPU 利用率很低,你怀疑有同学一不小心写出了读锁升级写锁的方案,那你该如何验证自己的怀疑呢?

尝试获取读/写锁看是否能获取到

1965迪杰斯特拉提出,之后15年是并发编程领域的终结者,直到 1980年管程提出来,才有了第二选择

信号量模型

一个计数器,一个等待队列,三个方法(init、down、up)
计数器和等待队列对外是透明的,只能通过信号量模型提供的三个方法来访问它们

  • init
    设置计数器的初始值。
  • down
    计数器值减1,如果计数器的值小于0,则当前线程将被阻塞,否则继续执行。
  • up
    计数器值加1,如果计数器值小于或者等于0,则唤醒等待队列中的一个线程,并将其移除。

快速实现一个限流器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class ObjPool<T, R> {
final List<T> pool;
// 用信号量实现限流器
final Semaphore sem;
// 构造函数
ObjPool(int size, T t){
pool = new Vector<T>(){};
for(int i=0; i<size; i++){
pool.add(t);
}
sem = new Semaphore(size);
}
// 利用对象池的对象,调用 func
R exec(Function<T,R> func) {
T t = null;
sem.acquire();
try {
t = pool.remove(0);
return func.apply(t);
} finally {
pool.add(t);
sem.release();
}
}
}
// 创建对象池
ObjPool<Long, String> pool =
new ObjPool<Long, String>(10, 2);
// 通过对象池获取 t,之后执行
pool.exec(t -> {
System.out.println(t);
return t.toString();
});

注意:Semaphore 可以允许多个线程访问一个临界区,所以在acquire后的操作要保证线程安全

思考题

  • 对象保存在了 Vector 中,Vector 是 Java 提供的线程安全的容器,如果我们把 Vector 换成 ArrayList,是否可以呢?
    不可,semophore允许多线程访问临界区,即多线程同时调用pool.remove方法时应保证线程安全

Java SDK并发包实现管程:Lock、Condition接口来
Lock解决互斥问题,Condition解决同步问题

为什么再造管程

java语言层面synchronized实现管程为什么还提供SDK并发包里的实现呢?性能不是原因那是什么?
重新设计一把互斥锁解决不可抢占的问题

  1. 能够响应中断
    void lockInterruptibly()
  2. 支持超时
    boolean tryLock(long time, TimeUnit unit)
  3. 非阻塞地获取锁
    boolean tryLock();

synchronized阻塞状态调用线程的interrupt方法不会中断

ReentrantLock

volatile/CAS/AQS

AQS 同步队列 FIFO 双向链表
实现条件变量的条件队列 单向链表 用于支持await/signal
调用await时,释放锁进入条件队列,调用signal时,从条件队列迁移至同步队列,重新竞争锁。

同步队列为什么使用双向链表
为了支持高效的线程管理和同步操作,

  • 高效删除任意节点,单链表需遍历
    如超时,取消等待,迁移到条件队列

ReentrantLock 如何保证可见性
关键点:volatile state
获取锁时先读写state,释放锁时再次读写state,happens-before传递性保证可见

1
2
3
4
5
6
7
8
9
10
11
12
13
class SampleLock {
volatile int state;
// 加锁
lock() {
// 省略代码无数
state = 1;
}
// 解锁
unlock() {
// 省略代码无数
state = 0;
}
}

可重入
可重入锁
可重入函数,指的是多个线程可以同时调用该函数,每个线程都能得到正确结果:线程安全

公平/非公平

Lock、Condition

利用两个条件变量快速实现阻塞队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class BlockedQueue<T>{
final Lock lock =
new ReentrantLock();
// 条件变量:队列不满
final Condition notFull =
lock.newCondition();
// 条件变量:队列不空
final Condition notEmpty =
lock.newCondition();

// 入队
void enq(T x) {
lock.lock();
try {
while (队列已满){
// 等待队列不满
notFull.await();
}
// 省略入队操作...
// 入队后, 通知可出队
notEmpty.signal();
}finally {
lock.unlock();
}
}
// 出队
void deq(){
lock.lock();
try {
while (队列已空){
// 等待队列不空
notEmpty.await();
}
// 省略出队操作...
// 出队后,通知可入队
notFull.signal();
}finally {
lock.unlock();
}
}
}

同步与异步

如何实现异步

  • 异步调用
    调用方创建一个子线程,在子线程中执行方法调用
  • 异步方法
    方法实现的时候,创建一个新的线程执行主要逻辑,主线程直接 return

Dubbo同步转异步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class DubboInvoker{
Result doInvoke(Invocation inv){
// 下面这行就是源码中 108 行
// 为了便于展示,做了修改
return currentClient
.request(inv, timeout) //DefaultFuture
.get();
}
}
// 创建锁与条件变量
private final Lock lock
= new ReentrantLock();
private final Condition done
= lock.newCondition();

// 调用方通过该方法等待结果
Object get(int timeout){
long start = System.nanoTime();
lock.lock();
try {
while (!isDone()) {
done.await(timeout);
long cur=System.nanoTime();
if (isDone() ||
cur-start > timeout){
break;
}
}
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException();
}
return returnFromResponse();
}
// RPC 结果是否已经返回
boolean isDone() {
return response != null;
}
// RPC 结果返回时调用该方法
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
}

获取结果时await,等待doReceived调用signal唤醒

总结

Lock&Condition实现的管程相对于synchronized更灵活、功能更丰富

线程声明周期

生命周期中各个节点的状态转移机制是怎样的

通用线程生命周期

  1. 初始状态
    编程语言层面的线程被创建,操作系统层面真正的线程未创建,不可分配CPU
  2. 可运行状态
    操作系统层面线程创建完成,可分配CPU
  3. 运行状态
    有空闲CPU时,分配到CPU的线程变为运行态
  4. 休眠状态
    运行态线程调用阻塞API(如阻塞方式读取文件)或等待某个事件(如条件变量),此时线程进入休眠态,释放CPU。等待事件出现后,从休眠太转变为可运行态(休眠态不会获得CPU使用权)
  5. 终止状态
    执行完毕或异常才进入终止,生命周期结束,不可切换为其他状态。

Java中线程的生命周期

  • NEW(初始化状态)
    调用线程对象的start方法即可从NEW状态转换到RUNNABLE状态
  • RUNNABLE(可运行/运行状态)
  • BLOCKED(阻塞状态)
  • WAITING(无时限等待)
  • TIMED_WAITING(有时限等待)
  • TERMINATED(终止状态)

Java线程中的BLOCKED、WAITING、TIMED_WAITING都是操作系统线程的休眠状态,没有CPU的使用权

NEW 到 RUNNABLE

调用线程对象的start方法即可从NEW状态转换到RUNNABLE状态

创建 Thread 对象主要有两种方法

  1. 继承Thread重写run方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    // 自定义线程对象
    class MyThread extends Thread {
    public void run() {
    // 线程需要执行的代码
    ......
    }
    }
    // 创建线程对象
    MyThread myThread = new MyThread();
  2. 实现Runable接口,重写run方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // 实现 Runnable 接口
    class Runner implements Runnable {
    @Override
    public void run() {
    // 线程需要执行的代码
    ......
    }
    }
    // 创建线程对象
    Thread thread = new Thread(new Runner());

RUNNABLE 与 BLOCKED

只有一种场景会触发,线程等待synchronized的隐式锁就会从RUNNABLE转换到BLOCKED状态

线程调用阻塞式API(如阻塞方式读取文件、网络通信)
操作系统层面线程是会转换到休眠状态的,系统线程释放CPU挂起直到I/O完成
Java线程的状态会依然保持RUNNABLE状态。
JVM层面不关心操作系统调度相关的状态,JVM看来等待CPU使用权(操作系统层面此时处于可执行状态,JVM层合并了可运行、运行状态)与等待 I/O(操作系统层面此时处于休眠状态)没有区别。

我们平时所谓的Java在调用阻塞式API时,线程会阻塞,指的是操作系统线程的状态,并不是Java线程的状态。

阻塞状态不会响应中断

RUNNABLE 与 WAITING

  1. 获得synchronized隐式锁的线程,调用无参数的Object.wait()方法
  2. 调用无参数的Thread.join()方法
  3. 调用LockSupport.park()方法
    unpark线程的状态又会从WAITING状态转换到RUNNABLE

RUNNABLE 与 TIMED_WAITING

  1. 带超时参数的Thread.sleep(long millis)方法
  2. 获得synchronized隐式锁的线程,调用带超时参数的Object.wait(long timeout)方法
  3. 调用带超时参数的Thread.join(long millis)方法\
  4. 调用带超时参数的LockSupport.parkNanos(Object blocker, long deadline)方法
  5. 调用带超时参数的LockSupport.parkUntil(long deadline)方法

TIMED_WAITING和WAITING只是超时参数

RUNNABLE 到 TERMINATED

  1. 线程执行完成自动
  2. 执行run方法时抛出异常
  3. 调用interrupt方法
  4. 调用stop方法(已弃用)
1
2
3
4
5
6
7
8
9
10
11
12
Thread th = Thread.currentThread();
while(true) {
if(th.isInterrupted()) {
break;
}
// 省略业务代码无数
try {
Thread.sleep(100);
}catch (InterruptedException e){
e.printStackTrace();//捕获到中断异常会重置中断状态,可能会无限循环
}
}

stop与interrupt区别

stop杀死线程,不会释放锁,导致其他线程无法获取到锁。类似的方法还有suspend,resume都不建议使用

interrupt通知线程,线程有机会做一些操作,也可忽略此通知。
通知方式:异常,主动监测

异常:在线程A wait,join,sleep时线程B调用A.interrupt,以上三个方法会抛出异常
主动监测:线程调用isInterrupted方法自己检查

诊断多线程BUG

jstack命令或者Java VisualVM可视化工具导出JVM线程栈信息,包括线程的当前状态、调用栈,还包括了锁的信息

示例

  1. 查询进程id
    jsp -l 命令查看本机所有java进程pid

  2. top查看目前正在运行的进程使用系统资源情况

  3. 导出指定进程pid所有线程信息
    jstack [-F] [-l] pid > xxx.log
    -F强制导出

  4. 分析

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    "consumer_redirectUrl_topic_jmq206_1546013217302" daemon prio=10 tid=0x00007f1bf03f6800 nid=0x693e waiting on condition [0x00007f1b38388000]
    java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00000000f76e21a0> (a java.util.concurrent.CountDownLatch$Sync)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1033)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:282)
    at com.jd.jmq.common.network.netty.ResponseFuture.await(ResponseFuture.java:133)
    at com.jd.jmq.common.network.netty.NettyTransport.sync(NettyTransport.java:241)
    at com.jd.jmq.common.network.netty.failover.FailoverNettyClient.sync(FailoverNettyClient.java:94)
    at com.jd.jmq.client.consumer.GroupConsumer.pull(GroupConsumer.java:246)
    at com.jd.jmq.client.consumer.GroupConsumer$QueueConsumer.run(GroupConsumer.java:445)
    at java.lang.Thread.run(Thread.java:745)

    Locked ownable synchronizers:
    - None

    线程名:consumer_redirectUrl_topic_jmq206_1546013217302
    线程优先级:prio=10
    java线程的identifier:tid=0x00007f1bf03f6800
    native线程的identifier:nid=0x693e
    线程的状态:waiting on condition [0x00007f1b38388000]
    java.lang.Thread.State: TIMED_WAITING (parking)
    线程栈起始地址:[0x00007f1b38388000]

根据进程id获取线程
top -H -p
将线程ID转换为16进制,在线程dump文件中搜索相关信息
例如:27840 ==> 6cc0

管程

解决并发问题的核心技术:管程
管程:管理共享变量以及线程对共享变量的操作过程,让他们支持并发
翻译为Java领域的语言:管理类的变量和方法,让类线程安全

Java在1.5之前,提供的唯一的并发原语就是管程(Java采用的是管程技术synchronized关键字及wait、notify、notifyAll三个方法都是管程的组成部分)

管程(Monitor)和信号量等价,等价指管程、实现信号量可相互实现
(操作系统原理课程告诉我,信号量能解决所有并发问题)

并发编程领域两大核心问题:

  • 互斥,同一时刻只允许一个线程访问共享资源
  • 同步,线程之间如何通信、协作

三种管程模型
Hasen模型、Hoare模型、MESA模型
核心区别:当条件满足后,如何通知相关线程。管程要求同一时刻只允许一个线程执行,那当线程T2的操作使线程T1等待的条件满足时,T1和T2 究竟谁可以执行呢?

  • Hasen 模型
    要求 notify() 放在代码的最后,这样 T2 通知完 T1 后,T2 就结束了,然后 T1 再执行,这样就能保证同一时刻只有一个线程执行。
  • Hoare 模型
    T2 通知完 T1 后,T2 阻塞,T1 马上执行;等 T1 执行完,再唤醒 T2,也能保证同一时刻只有一个线程执行。但是相比 Hasen 模型,T2 多了一次阻塞唤醒操作。
  • MESA 模型
    T2通知完T1后,T2继续执行,T1从条件变量等待队列进到入口等待队列并不立即执行。好处是notify不用放到代码的最后,T2也没有多余的阻塞唤醒操作。副作用是当T1再次执行时,曾经满足的条件,现在可能已不满足,所以需要以循环方式检验条件变量。

MESA 模型

解决互斥问题

将共享变量及其对共享变量的操作统一封装起来
只能通过调用管程提供的 enq()、deq()方法访问共享变量queue,enq()、deq()互斥,只允许一个线程进入管程

解决同步

条件变量和等待队列解决线程同步问题
多个线程同时试图进入管程时,只允许一个进入,其他则在入口等待队列等待
管程里的条件变量都对应有一个等待队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class BlockedQueue<T>{
final Lock lock =
new ReentrantLock();
// 条件变量:队列不满
final Condition notFull =
lock.newCondition();
// 条件变量:队列不空
final Condition notEmpty =
lock.newCondition();

// 入队
void enq(T x) {
lock.lock();
try {
while (队列已满){
// 等待队列不满
notFull.await();
}
// 省略入队操作...
// 入队后, 通知可出队
notEmpty.signal();
}finally {
lock.unlock();
}
}
// 出队
void deq(){
lock.lock();
try {
while (队列已空){
// 等待队列不空
notEmpty.await();
}
// 省略出队操作...
// 出队后,通知可入队
notFull.signal();
}finally {
lock.unlock();
}
}
}

await和wait语义一样,signal和notify语义一样
wait需要在一个while循环里面调用,这个是MESA管程特有的。

notify何时可以使用

满足以下三个条件:

  • 所有等待线程拥有相同的等待条件;在同一个条件下等待。
  • 所有等待线程被唤醒后,执行相同的操作;
  • 只需要唤醒一个线程。

总结

Java内置的管程方案是对MESA模型进行了精简。
synchronized关键字修饰的代码块,编译期自动生成加锁和解锁的代码,仅支持一个条件变量;而Java SDK并发包实现的管程支持多个条件变量,不过并发包里的锁,需要开发人员自己进行加锁和解锁操作。

并发编程需要注意的问题,总结主要包括三个问题:安全性,活跃性,性能问题

并发编程是一个复杂的技术领域,
微观上涉及到原子性问题、可见性问题和有序性问题
宏观则表现为安全性、活跃性以及性能问题。

安全性

是否线程安全:本质上就是正确性,正确性即程序按照期望执行
共享数据(多线程同时读写)时才会出现并发问题
如果数据不共享或者状态不变化就能保证线程安全性。

数据竞争:多线程访问,且一个线程会写
竞态条件:程序的执行结果依赖线程执行的顺序。两个线程同时++则结果是1,顺序执行++结果为2。

存在数据竞争、竞态条件如何保证线程安全性:互斥方案(使用锁)

活跃性

活跃性问题,指的是某个操作无法执行下去,如死锁、活锁、饥饿

死锁

线程相互等待资源(不可抢占,不让出)

活锁

同时放弃,然后又重试竞争,最后死循环
解决:等待随机事件释放资源

饥饿

指的是线程因无法访问所需资源而无法执行下去的情况。

导致饥饿的情况:

  1. 持有锁的线程长时间执行
  2. 线程繁忙时,线程优先级低很难执行

解决饥饿

  • 保证资源充足
  • 公平分配
  • 避免长期持有锁

资源的稀缺性是没办法解决的,持有锁的线程执行的时间也很难缩短。所以2的适用场景相对更多。主要使用公平锁,先到先得,按顺序获取资源。

性能

阿姆达尔(Amdahl)定律,代表了处理器并行运算之后效率提升的能力,具体公式如下:S = 1 / ( (1-p) + (p/n) )

n:CPU 的核数,p:并行百分比,(1-p):串行百分比
假设串行百分比5%, CPU的核数n无穷大,那加速比 S 的极限就是20。即,如果串行率是5%,无论采用什么技术,最高也就只能提高 20 倍的性能。
所以使用锁需关注性能影响。SDK并发包里之所以有那么多东西,有很大一部分原因就是要提升在某个特定领域的性能。

方案层面解决性能问题

  1. 使用无锁算法和结构
    如线程本地存储(TLS Thread Local Storage)、写入时复制 (Copy-on-write)、乐观锁等;Java并发包里面的原子类也是一种无锁的数据结构;Disruptor则是一个无锁的内存队列,性能都非常好…
  2. 减少锁持有的时间
    如使用细粒度锁,ConcurrentHashMap分段锁技术、读写锁(读时无锁,写时互斥)

遇到具体问题,还是要具体分析,根据特定的场景选择合适的数据结构和算法。

性能指标

我认为的重要指标:并发量,吞吐量,延迟
吞吐量:指的是单位时间内能处理的请求数量。吞吐量越高,说明性能越好。
延迟:指的是从发出请求到收到响应的时间。延迟越小,说明性能越好。
并发量:指的是能同时处理的请求数量,一般来说随着并发量的增加、延迟也会增加。所以延迟这个指标,一般都会是基于并发量来说的。例如并发量是 1000 的时候,延迟是 50 毫秒

问题

1

Java Vector是一个线程安全的容器,如下代码是否存在并发问题呢?

1
2
3
4
5
void addIfNotExist(Vector v, Object o){
if(!v.contains(o)) {
v.add(o);
}
}

vector是线程安全,指的是它方法单独执行时候线程安全,组合在一起不安全,组合操作要小心。

2

服务器上存了2000万个电话号码相关的数据,要做的是把这批号码从服务器上请求下来写入到本地的文件中,为了将数据打散到多个文件中,这里通过 电话号码%1024 得到的余数来确定这个号码需要存入到哪个文件中取,比如13888888888 % 1024 =56,那么这个号码会被存入到 56.txt的文件中,写入时是一行一个号码。
为了效率这里使用了多线程来请求数据并将请求下来的数据写入到文件,也就是每个线程包含向服务器请求数据,然后在将数据写入到电话号码对1024取余的那个文件中去,如果这么做目前会有一个隐患,多线程时如果 电话号码%1024 后定位的是同一个文件,那么就会出现多线程同时写这个文件的操作,一定程度上会造成最终结果错误。

写一个文件只需要一个线程就够了。
你可以用生产者-消费者模式试一下。
可以创建64个线程,每个线程负责16个文件,
同时创建64个阻塞队列,64个线程消费这64个阻塞队列,
电话号码%1024 % 64 进入目标阻塞队列。

原子性

原子性问题的源头是线程切换
原子性外在表现为不可分割,本质是多个资源间有一致性的要求,操作的中间状态对外不可见。

如何解决原子性问题?
禁用线程切换不就解决了吗?操作系统线程切换依赖CPU中断,所以禁用CPU发生中断就能禁用线程切换。

禁用中断不能保证多线程原子性
单CPU禁止中断禁止切换线程单线程连续执行,可保证原子性。
多CPU只能保证CPU上线程连续执行,不能保证同一时刻只有一个线程执行,如果两个线程同时写long型变量高32位,就可能出现诡异BUG。

互斥可保证共享变量修改的原子性
同一时刻只有一个线程执行成为互斥。

锁模型

  1. 创建保护资源R的锁:LR
  2. 加锁操作:lock(LR)
  3. 临界区:一段代码
    • 受保护资源:R
  4. 解锁操作 :unlock(LR)

sychronized
修饰静态方法锁住this.class
修饰实例方法锁住当前对象this

Happens-before规则:对一个锁的解锁Happens-Before于后续对此锁的加锁,指的是前一个线程的解锁操作对后一个线程的加锁操作可见,传递性得出前一个线程对共享变量的修改对后一个线程可见。所以可以解决并发Count++问题。

细粒度锁
细粒度的锁可提高并行度,是重要优化手段
细粒度锁导致死锁(竞争资源的线程因互相等待,导致永久阻塞的现象)

死锁

发生死锁的四个条件

  1. 互斥,共享资源X和Y只能被一个线程占用;
  2. 占有且等待,线程T1已经取得共享资源 X,在等待共享资源Y的时候,不释放共享资源 X;
  3. 不可抢占,其他线程不能强行抢占线程T1占有的资源;
  4. 循环等待,线程T1等待线程T2占有的资源,线程T2等待线程T1占有的资源,就是循环等待。

避免死锁

破坏其中一个即可避免死锁
锁本身互斥无法破坏,其他三个条件如何做呢?

  1. 占用切等待,一次性申请所有资源,则不再存在等待
  2. 不可抢占,申请其他资源时失败可主动释放
  3. 循环等待,按序申请资源

破坏占用切等待

一次性申请资源,需声明一个角色管理自愿申请(单例),如下转账操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//转账前申请到两个账户的锁
class Allocator {
private List<Object> als =
new ArrayList<>();
// 一次性申请所有资源
synchronized boolean apply(Object from, Object to){
if(als.contains(from)||als.contains(to)){
return false;
} else {
als.add(from);
als.add(to);
}
return true;
}
// 归还资源
synchronized void free(
Object from, Object to){
als.remove(from);
als.remove(to);
}
}


class Account {
// actr 应该为单例
private Allocator actr;
private int balance;
// 转账
void transfer(Account target, int amt){
// 一次性申请转出账户和转入账户,直到成功
while(!actr.apply(this, target))

try{
// 锁定转出账户
synchronized(this){
// 锁定转入账户
synchronized(target){
if (this.balance > amt){
this.balance -= amt;
target.balance += amt;
}
}
}
} finally {
actr.free(this, target)
}
}
}

存在问题
那在高并发下synchronized(Account.class)会使得所有转账串行化,使用apply方法能提高转账的吞吐量。
但apply方法也有问题,在同一个账户转账操作并发量高的场景下,apply方法频繁失败,转账的线程会不断的阻塞唤醒阻塞唤醒,开销大。
也许应该改进一下由Allocator负责在有资源的情况下唤醒调用apply的线程?

等待通知机制

wait/notify/notifyAll,持有锁才可使用,且是同一把锁。
notify代表条件曾经满足,只能保证在通知时间点条件满足。而被通知线程的执行时间点和通知的时间点基本上不会重合,所以当线程执行的时候,很可能条件已经不满足了(保不齐有其他线程插队)。
notify有风险,可能导致某些线程永远不会被通知,尽量使用 notifyAll
如:
资源 A、B、C、D,线程1申请到AB,线程2申请到CD,此时线程3申请AB,线程4申请CD,线程3,4进入等待队列。
之后线程1归还AB,使用notify通知线程等待队列的线程,可能唤醒线程4,此时4继续等待,而3再也没有机会被唤醒了

1
2
3
while(条件不满足) {
wait();
}

改进后

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Allocator {
private List<Object> als;
// 一次性申请所有资源
synchronized void apply(Object from, Object to){
// 经典写法
while(als.contains(from)||als.contains(to)){
try{
wait();
}catch(Exception e){}
}

als.add(from);
als.add(to);
}
// 归还资源
synchronized void free(
Object from, Object to){
als.remove(from);
als.remove(to);
notifyAll();
}
}

sleep,wait区别?
不同点

  • wait 释放锁,sleep不释放
  • wait只能在持有锁时可用
  • wait无参调用进入waiting状态,需唤醒
  • sleep(1000L),wait(1000L)进入time_waiting状态,不需唤醒

相同点:都会让出CPU

破坏不可抢占

破坏循环等待

需要对资源进行排序,然后按序申请资源
如id 可以作为排序字段,申请的时候,我们可以按照从小到大的顺序来申请。
①~⑥处的代码对转出账户(this)和转入账户(target)排序,然后按照序号从小到大的顺序锁定账户

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Account {
private int id;
private int balance;
// 转账
void transfer(Account target, int amt){
Account left = this ①
Account right = target; ②
if (this.id > target.id) { ③
left = target; ④
right = this; ⑤
} ⑥
// 锁定序号小的账户
synchronized(left){
// 锁定序号大的账户
synchronized(right){
if (this.balance > amt){
this.balance -= amt;
target.balance += amt;
}
}
}
}
}

预防死锁成本需要评估,选择成本最低的方案。
破坏占用且等待条件,锁了所有的账户,且用了死循环,不过好在 apply() 这个方法基本不耗时。

问题

如何判断多线程的阻塞导致的问题呢?有什么工具吗?
可以用top命令查看Java线程的cpu利用率,用jstack来dump线程。
开发环境可以用java visualvm查看线程执行情况

0%