markword:不同虚拟机实现不同,hospot是对象头的两位(不同组合对应不同类型的锁)
可重入:同一个线程可多次获取锁 sychronized

锁升级

锁特性

synchronize(this)
synchronize(T.Class) (静态方法)

线程的基本方法

sleep 线程暂停执行,到时自动唤醒,进入就绪态
yield 将线程由执行态设置为就绪态,cpu会从众多的就绪态(可执行态)里选择
join 等待线程执行结束

创建线程对象

  • 继承Runnable
  • 继承Thread

启动线程的方式

  • runnable
  • thread
  • 线程池(也是使用前两种启动线程)

线程状态


ready:在cpu等待队列中等待
等待锁:Blocked阻塞 sychronized时会进入Blocked状态; ReentranLock.lock不会,他是自旋锁,忙等待,进入waiting状态

CAS无锁优化 自旋

相关类所在包:package java.util.concurrent.atomic

Compare and swap 说明:

1
2
3
cas(v,expected,newValue)
if v==expected set v=newValue
else fail

ABA问题

数值类型不会引发ABA问题,引用类型才有
可能引用类型对象内部已发生了改变
解决:版本号,每次改变版本号自增,同时修改值时检查版本号
AtomicStampedReference

synchronized 底层实现(hospot)

  • 早期jdk实现是重量级的(向操作系统申请锁)
  • 优化后:(锁升级)
    第一次获取锁时,只是markword 记录线程id(偏向锁)
    如果有第二个线程争用锁,升级为自旋锁(线程二空转,消耗CPU)
    自旋锁获取多次(10次)仍然无法获取锁,升级为重量级锁(此时第二个线程进入等待队列,不再消耗CPU)

自旋锁,重量级锁应用场景

自旋锁:占用CPU,不访问操作系统内核(线程数少,锁代码块执行时间短)
重量级锁:线程进入等待队列,不占用CPU,但需访问操作系统内核(线程数多,锁代码块执行时间长)

synchronized异常锁

默认,异常后自动释放锁。若数据一致性处理不好,会导致其他线程获取到中间数据(脏数据)。

synchronized优化

  • 粒度
    1. 细粒度锁,尽量不锁住不需要锁的代码
    2. 粗粒度锁,一段代码加多个细粒度锁时效率也不高(例如数据库行锁,表锁)
  • 锁对象不应发生改变

unsafe类

volatile

  • 保证线程可见性
    线程读取值默认读取线程缓存,而不是堆内存,从而导致一个线程修改了内存数据,而其他线程无法感知。此关键字强制线程读取内存中的数据
    • 多核CPU多级缓存MESI缓存一致性
  • 禁止指令重排(CPU)
  • 不保证原子性

线程可见性代码测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class TestVolatile {
boolean flag = true;

AtomicInteger atomicInteger = new AtomicInteger(0);

public void test1() throws InterruptedException {
Thread t1 = new Thread(() -> {
System.out.println("t1 start");

while (flag) {
//不能使用println
//不能使用sleep
}

System.out.println("t1 end");
});

Thread t2 = new Thread(() -> {
try {
System.out.println("t2 start sleep 3s");
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("t2 wakeup and set flag false");
flag = false;
});

t1.start();
t2.start();
}
}

问题

测试代码中test1方法的循环体内不可使用println和sleep

  • 不能使用println,println是线程安全代码,synchronized加锁会重新读取内存中的值
    1. 线程解锁前,必须把共享变量的最新值刷新到主内存中;
    2. 线程加锁时,先清空工作内存中共享变量的值,从而使用共享变量是需要从主内存中重新读取最新的值(加锁与解锁需要统一把锁)
  • 不能使用sleep(暂不清楚原因??????)

禁止指令重排DCL应用说明

赋值操作分为三个步骤 {1}堆申请空间 {2}初始化字段值 {3}返回地址给变量
若指令重排后顺序为132,则其他线程外层判空则为true,会获取到未初始化完成的对象
超高并发时可能出现此情况

CountDownLatch

门栓,实例化对象时声明门栓数
线程调用CountDownLatch.countDown();减少门栓
CountDownLatch.await等待门栓为0

比join更灵活

CyclicBarrier

满员发车

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class CustomCyclicBarrier {
public static void main(String[] args) throws InterruptedException {
CustomCyclicBarrier customCyclicBarrier = new CustomCyclicBarrier();
CyclicBarrier barrier = new CyclicBarrier(10, customCyclicBarrier::showSomething);

for (int i = 0; i < 100; i++) {
int finalI = i;
new Thread(() -> {
System.out.println(finalI + " running");
try {
barrier.await();
System.out.println(finalI + "free");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
}).start();
Thread.sleep(1000);
System.out.println("thread started:" + i);
}
}

public void showSomething() {
System.out.println("ok");
}
}

LongAdder

解决问题:解决atomic* 并发量越大时,cas失败率越高,cpu空转,性能差
解决方案:时间换空间,维护一个值base,和一个cell数组,当线程写base有冲突时,将其写入数组的一个cell中。将base和所有cell中的值求和就得到最终LongAdder的值了。

phaser

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118

public static void main(String[] args) {
Phaser phaser = new CustomPhaser();
Random r = new Random();
phaser.bulkRegister(7);
for (int i = 0; i < 5; i++) {
new Thread(new Person("person" + i, phaser, r)).start();
}

new Thread(new Person("新郎", phaser, r)).start();
new Thread(new Person("新娘", phaser, r)).start();
}
public class Person implements Runnable {
String name;
Phaser phaser;

Random r;

int sleepSec = 3;

public Person(String name, Phaser phaser, Random random) {
this.name = name;
this.phaser = phaser;
random = r;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}


@Override
public void run() {
arrive();

eat();

leave();

hug();
}

public void arrive() {
this.sleep(sleepSec);
System.out.printf("%s 到达\n", this.name);
this.phaser.arriveAndAwaitAdvance();
System.out.println("开始吃饭+" + this.name);
}

public void eat() {
this.sleep(sleepSec);
System.out.printf("%s 吃完了\n", this.name);
this.phaser.arriveAndAwaitAdvance();
}

public void leave() {
if (this.name != "新郎" && this.name != "新娘") {
this.sleep(sleepSec);
System.out.printf("%s 离开\r\n", this.name);
this.phaser.arriveAndDeregister();
} else {
this.phaser.arriveAndAwaitAdvance();
}
}

public void hug() {
if (this.name == "新郎" || this.name == "新娘") {
this.sleep(sleepSec);
System.out.printf("%s 抱抱\n", this.name);
this.phaser.arriveAndAwaitAdvance();
} else {
this.phaser.arriveAndDeregister();
}
}


public void sleep(int millSeconds) {
try {
Thread.sleep(millSeconds);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public void sleepRandom(int seconds) {
sleep(r.nextInt(1000) * seconds);
}
}

public class CustomPhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("------都到齐了" + registeredParties);
System.out.println();
return false;
case 1:
System.out.println("------都吃完了" + registeredParties);
System.out.println();
return false;
case 2:
System.out.println("------都离开了" + registeredParties);
System.out.println();
return false;
case 3:
System.out.println("------婚礼结束" + registeredParties);
return true;
default:
return true;
}
}
}

ReadWriteLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class CustomReadAndWriteLock {
public static void main(String[] args) {
ReadWriteLock lock = new ReentrantReadWriteLock();

Runnable read = () -> {
try {
lock.readLock().lock();
System.out.println("reading");
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.readLock().unlock();
System.out.println("read end");
}
};

Runnable write = () -> {
try {
lock.writeLock().lock();
System.out.println("writing");
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.writeLock().unlock();
System.out.println("write end");
}
};

for (int i = 0; i < 10; i++) {
new Thread(read).start();
}

for (int i = 0; i < 2; i++) {
new Thread(write).start();
}

}
}

ReentranLock

实现是一种自旋锁CAS

condition,本质时创建多个等待队列,可唤醒指定等待队列中的线程(notifyAll唤醒所有等待队列中的线程)

trylock

1
2
3
4
5
6
7
8
ReentrantLock lock = new ReentrantLock();
try {
if (lock.tryLock(1000, TimeUnit.MILLISECONDS)) {
//do something
}
}finally {
lock.unlock();
}

lockInterruptibly

响应打断

公平锁

ReentrantLock lock = new ReentrantLock(true);
先来先执行,先检查等待队列,有其他线程等待时,进入等待队列

Semaphore

限流,最多允许多少个线程同时运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class TestSemaphore {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(5);
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(() -> {
try {
semaphore.acquire();
System.out.println("acquire:" + finalI);
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
semaphore.release();
}
}).start();
}
}
}

Exchanger

两个线程交换数据,第一个调用exchange方法时阻塞,第二个线程调用exchange方法时交换数据后继续执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

public class TestExchange {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
for (int i = 0; i < 3; i++) {
try {
TimeUnit.SECONDS.sleep(3);
String s = String.valueOf(i);
System.out.println("thread-1-org:" + s);
s = exchanger.exchange(s);
System.out.println("thread-1-exchanged:" + s);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();

new Thread(() -> {
for (int i = 100; i > 97; i--) {
try {
TimeUnit.SECONDS.sleep(5);
String s = String.valueOf(i);
System.out.println("thread-2-org:" + s);
s = exchanger.exchange(s);
System.out.println("thread-2-exchanged:" + s);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
}
}

LockSuport

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class TestLockSupport {
public static void main(String[] args) throws InterruptedException {


Thread t = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(i);

if (i == 4) {
System.out.println("t park1");
LockSupport.park();
} else if (i == 8) {
System.out.println("t park2");
LockSupport.park();
}

try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});

t.start();

TimeUnit.SECONDS.sleep(6);
System.out.println("unpark");
LockSupport.unpark(t);

TimeUnit.SECONDS.sleep(1);// park之前就执行unpark也是可以的
System.out.println("unpark");
LockSupport.unpark(t);
}
}

线程interrupt打断

优雅终止线程方法之一

三个方法

interrupt 设置标志位,线程根据标志位自己决定怎么做
isInterrupt 查询标志位
static interrupted (当前线程)查询并重置标志位
可用于结束线程

interrupt与sleep、wait、join

线程在sleep 、wait、join时设置其打断标志位线程会中断并抛出异常InterruptedException

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class TestInterrupt {
public static void main(String[] args) throws InterruptedException {
new TestInterrupt().testWaitInterrupted();
}

public void testSleepInterrupted() throws InterruptedException {
Thread t = new Thread(() -> {
while (true) {
try {
Thread.sleep(500);
System.out.println("running");
} catch (InterruptedException e) {
System.out.println("InterruptedException");
System.out.println(Thread.currentThread().isInterrupted()); //输出false 异常后自动重置标志位
break;
}

if (Thread.currentThread().isInterrupted()) {
break;
}
}
},"t");
t.start();
Thread.sleep(2000);
t.interrupt();
}

public void testWaitInterrupted() throws InterruptedException {
Thread t = new Thread(() -> {
synchronized (this) {
try {
this.wait();
} catch (InterruptedException e) {
System.out.println("InterruptedException");
System.out.println(Thread.currentThread().isInterrupted());
}
}
});

t.start();

Thread.sleep(2000);

t.interrupt();
}

}

interrupt与synchronized

线程在等待锁时设置其标志位不会抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void testSync() throws InterruptedException {
Thread t = new Thread(() -> {
synchronized (this) {
System.out.println("t1 locked");
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("t1 end");
});

t.start();

Thread t2 = new Thread(() -> {
synchronized (this) {
System.out.println("t2 locked");
}
System.out.println("t2 end");
});

t2.start();

t2.interrupt();
}

interrupt与ReentranLock

使用ReentranLock.lock阻塞等待获取锁时也不会被打断 ,使用lock.lookInterruptibly()获取锁可以被打断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

public void testReentrantLock() throws InterruptedException {
ReentrantLock lock = new ReentrantLock();

Thread t = new Thread(() -> {
try {
lock.lock();
System.out.println("t1 locked");
try {
Thread.sleep(6 * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} finally {
lock.unlock();
}

System.out.println("t1 end");
});

t.start();

Thread t2 = new Thread(() -> {
try {
lock.lockInterruptibly();
System.out.println("t2 locked");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
System.out.println("t2 end");
});

t2.start();

Thread.sleep(2000);
t2.interrupt();
}

线程结束

stop(不建议)

不建议用stop,粗暴结束,容易产生数据不一致(释放锁,不做善后处理)
suspend/resume 暂停/继续 暂停不会释放锁,易产生死锁

interrupt

volatile flag :特定场景优雅(不是很精确)

  • 遇到sleep、wait时等待不能执行循环,不能判断标志位,不能立即结束
  • 打断时间不精确,如阻塞容器,容量为5时结束,但由于volatile同步线程标志位时间控制不是很精确,有时会延迟一会儿

interrupt :

  • sleep、wait场景下可以结束
  • 精确结束:业务线程和触发结束的线程配合

AQS(CLH)

基础框架,广泛应用于实现锁和其他同步器(如ReentrantLock、CountDownLatch、Semaphore等)

两个要点:state记录锁状态,CAS操作线程链表(等待队列)

  • state
  • 线程链表(保存要获取锁的线程链表)

state

volatile修饰 保证线程可见
作用:根据子类的实现确定其意义 如ReentrantLock用来记录是否已锁住,线程重入次数;CountdownLatch 记录 CountDown的count

线程链表

链表中的线程争用state(取锁)
向队列添加时使用CAS,
为什么是双向链表?需要考虑前一个节点的状态,若前一个节点持有锁,则等待,若已释放锁,则获取锁。
为什么添加尾节点使用CAS而不使用锁?AQS核心,CAS操作tail 、head ;替代锁整个链表
加入队列时,如果前一个结点是头结点,才尝试获得锁。若获取失败则阻塞,等待唤醒。

公平:先线程进入等待队列;
非公平:新线程尝试抢锁,抢不到进入队列

ThreadLocal

每个线程都有自己独有map
spring声明式事务,保证多个方法使用同一个链接(将连接存储在ThreadLocal中)
ThreadLocal 的key不使用后需要删除,不然会内存泄露(查看弱引用说明)

强软弱虚四种引用

软引用

SoftReference 垃圾回收时不会立刻回收,内存不足时回收 ;用于缓存

弱引用

WeakReference 垃圾回收就会回收 ; 另外一个强引用引用它时,强引用消失,弱引用就被回收。(WeakHashmap)

虚引用

JVM开发用来管理堆外内存。
JVM无法处理堆外内存,由操作系统管理。
PhantomReference<myM> saf = new PhantomReference<>(new myM(), queue1)
垃圾回收时,虚引用被加入到队列中;我们可以检测队列中是否存在值,由则手动调用对外内存回收,从而做到自动回收堆外内存。

源码

阅读原则

  • 了解骨架
  • 跑不起来不读(很困难)
  • 有目的性,理解别人的思路
  • 一条线索到底
  • 略过无关细节
  • 一般不读静态

容器

  • Collections
    • List
    • set
    • queue
  • Map

Vector/Hashtable

自带锁
Hashtable -> Hashmap -> SynchronizedMap[Collections.synchronizedMap()] -> ConcurrentHashMap

Hashtable发展历程

  • Hashtable 全部接口自带锁
  • Hashmap 无锁(现成不安全)
  • SynchronizedMap 满足Hashmap某些场景需要加锁
  • ConcurrentHashMap
    高并发写入 ConcurrentHashMap性能略低于Hashtable、SynchronizedMap
    高并发读取 ConcurrentHashMap性能远高于Hashtable、SynchronizedMap

Vector发展历程

高并发问题


虽然size、remove是线程安全的但是他们两个不是原子操作,在两个操作中间可能多个线程判断size==1,造成多个线程remove最后一个。

解决1-sychronize

同步块包围size和remove操作

解决2-ConcurrentLinkedQueue


ConcurrentLinkedQueue使用CAS

总结

Vector使用snychronized、ConcurrentLinkedQueue使用CAS
性能应考虑并发量和并发操作耗时,不同场景有自己的优势,根据实际压测决定。

ConcurrentHashmap/ConcurrentSkipListMap

map中有hashmap无序,treemap有序的区别但线程安全的类却使用ConcurrentSkipListMap而没有实现ConcurrentTreeMap,因为ConcurrentHashmap使用CAS操作,用在树结构时实现复杂,故而使用跳表代替

CopyOnWriteList

适用于读很多写少的情况,对比SynchronizedList
读操作加锁,写操作不加锁
写操作:加锁、复制一份数组数据到新的数组空间(数组长度已+1)、新数组的引用、释放锁

Queue

LinkedQueue

ConcurrentLinkedQueue

peek 取但不删除
pool 取且删除

BlockingQueue

LinkedBlockingQueue

put,take 如果队列已满则阻塞等待,直到可以添加或获取

ArrayBlockingQueue

有上限,LinkedBlockingQueue无上限
add达到上限再添加异常
offer不会异常,返回false;可以指定等待时间
put阻塞,等待

DelayQueue

按紧迫程度排序
按时间任务调度
实现

SynchronousQueue

容量为0,add操作会抛出异常
put take互相阻塞,执行put后阻塞等待其他线程take。执行take后阻塞等待其他线程put。(手递手)

LinkedTransferQueue

可以add元素

在队列中已有元素的情况下,调用 transfer 方法,可以确保队列中被传递元素之前的所有元素都能被处理。
transfer操作阻塞直到任务被take

PriorityQueue

二叉树,堆排序

List Queue比较

BlockingQueue提供了很多线程友好的api,如
offer(返回false)
peek、
poll
put、take(阻塞)而不是直接异常;(blockingQueue提供)

并发编程三大特性

线程池

自定义线程池

  • Executor
  • ForkJoinPool

Executor

runnable

相比Runnable,有返回值

Future

获取结果

FutureTask

1
2
3
4
5
6
7
8
9
10
public static void testFutureTask() throws ExecutionException, InterruptedException {
FutureTask<String> ft = new FutureTask(() -> {
System.out.println("ft");
TimeUnit.SECONDS.sleep(5);
return "1";
});

new Thread(ft).start();
System.out.println(ft.get());
}

CompletableFuture

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

CompletableFuture<String> c1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("sleep 1");
return "1";
});
CompletableFuture<String> c3 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("sleep 3");
return "3";
});

Supplier<String> stringSupplier = () -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("sleep 2");
return "2";
};
CompletableFuture<String> c2 = CompletableFuture.supplyAsync(stringSupplier);

CompletableFuture.allOf(c1, c2, c3).join();

CompletableFuture.supplyAsync(stringSupplier)
.thenApply(String::valueOf)
.thenApply(s -> "str" + s)
.thenAccept(System.out::println);
System.in.read();

ThreadPoolExecutor参数说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

ThreadPoolExecutor executor = new ThreadPoolExecutor(
2//核心线程(不释放)
, 4//最大线程数
,60//空闲时间
,TimeUnit.SECONDS//空闲时间(单位)
,new ArrayBlockingQueue<>(4)//任务队列,使用不同的BlockingQueue会产生不同的线程池,linkedBlockingQueue 最多Integer.Max个任务,
,Executors.defaultThreadFactory()//指定了name group等
,new ThreadPoolExecutor.CallerRunsPolicy()//拒绝策略,2个核心线程在忙,后续线程进入任务队列,任务队列满,创建新线程执行任务,线程数达到最大仍然忙不过来,执行拒绝策略
//1. Abort 异常
//2.Discard 扔掉,不抛异常
//3.DiscardOldest 扔掉排队时间最久的任务,应用场景:旧数据相对来说没有意义了
//4.CallerRunsPolicy 在调用executor线程中执行,若此线程已终止则Discard
//一般会自定义处理策略,需要保存消息,尤其是对于订单等请求需要记录请求日志;大量任务不能被消费处理时,需要机器扩容
);

ThreadPoolExecutor线程池

newSingleThreadExecutor

Executors.newSingleThreadExecutor()
适用场景:顺序执行任务

newCachedThreadPool

线程池中的线程数有弹性
适用场景:流量不确定,存在高峰低谷

newFixedThreadPool

适用场景

流量平稳,不会出现高峰(不回因为线程不足任务堆积)
并行计算(parallel)

newScheduledThreadPool

适用场景:定时任务
scheduleAtFixedRate以固定的频率执行,period(周期)指的是两次成功执行之间的时间。上一个任务开始的时间计时,一个period后,检测上一个任务是否执行完毕,如果上一个任务执行完毕,则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行。
scheduleWithFixedDelay以固定的延时执行,delay(延时)指的是一次执行终止和下一次执行开始之间的延迟。

ThreadPoolExecutor源码

常用变量解释

ctl

ctl int型 前三位表示线程池状态 后29位表示线程数

线程池5种状态

RUNNINg 运行
SHUTDOWN 调用shutdown方法进入shutdown状态
STOP 调用shutdownnow 马上停止
TIDYING 调用shutdown后,线程执行完成了,还在整理数据的状态
TERMINATED 终止了

其他方法

提交任务的方法Execute

核心线程处理 -> 核心线程队列 -> 非核心线程处理 -> 拒绝策略

  1. 获取线程池状态
  2. 有空闲的core线程则交予其处理
  3. 无空闲core线程放入core线程任务队列
  4. core线程任务队列已满,添加新非core线程执行此任务
  5. 若无法添加非core线程执行任务,执行拒绝策略

AddWorker

1.线程数+1
2.线程池中加入worker线程

Worker类(线程池任务单元)

1
2
3
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable

继承自AQS、Runnable,本身是锁且可执行
多线程执行任务时,只能有一个成功。先获取到锁的成功。

ForkJoinPool

大任务切分成小任务

ForkJoinTask

RecursiveAction

RecursiveAction extends ForkJoinTask

WorkStealingPool

Executors.newWorkStealingPool()返回的是ForkJoinPool
每个线程都有任务队列,自己的任务执行完后,取其他线程的任务队列中的任务

JMH

官方示例 https://hg.openjdk.org/code-tools/jmh/file/2be2df7dbaf8/jmh-samples/src/main/java/org/openjdk/jmh/samples/

什么是JMH

Java Microbenchmark Harness
Java微基准测试工具

创建JMH测试

引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
<!--jmh 基准测试 -->
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.23</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.23</version>
<scope>provided</scope>
</dependency>

安装插件

配置

JMH中的基本概念

warmup

预热 @Warmup(iterations = 3,time = 5,timeUnit = TimeUnit.SECONDS)
预热3轮 ,每轮5s

Measurement

测试
@Measurement(iterations = 3,time = 5)
测试3论 每轮5s(时间单位默认s)

Fork

创建多进程测试

Threads

创建多线程测试

BenchmarkMode

测试模式

  • Throughput吞吐量 点位时间调用次数;
  • AverageTime:平均耗时,指的是每次执行的平均时间。如果这个值很小不好辨认,可以把统计的单位时间调小一点;
  • SampleTime: 随机 取样 ;
  • SingleShotTime:执行一次,测试启动;
  • All:所有的指标,都算一遍,

Disruptor

用于替代并发线程间数据交换的环形队列的、基本无锁(使用cas)的(只有部分等待策略存在)、高性能的线程间通讯框架

特点

  • 环形数组:覆盖旧的数据,降低GC频率,且数组对于处理器缓存机制更友好
  • 无锁(使用CAS),高性能,单机高并发
  • 位运算确定index(比取模快)

数组实现队列,ConcurrentLinkedQueue是链表实现,且
实现了基于事件的生产者消费者模式(观察者模式)

RingBuffer

只记录下一个有效元素位置(sequence),数组实现,没有首尾指针(ConcurrentLinkedQueue添加删除时要加锁)。

长度设为2的n次幂,利于二进制计算,例如:第12个元素存放位置12%8=12&(8-1) pos=num&(size-1)

buffer大小取决于:消息大小,内存大小

基本用法

普通写法

lamda表达式

指定生产者线程模式

  • Single(确定生产者只有一个线程时使用)
  • Multi

等待策略

Block

指定多消费者

多消费者,对应多线程

异常处理

java线程池体系

面试题

1

实现一个容器,提供两个方法,add,size
写两个线程,线程1添加10个元素到容器中,线程2监控元素的个数,当5个时,线程2给出提示并结束

wait/notify实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

public class q1 {
public static void main(String[] args) throws InterruptedException {
MyContainer myContainer = new MyContainer();
Object locker = new Object();

Thread t1 = new Thread(() -> {
synchronized (locker) {
for (int i = 0; i < 10; i++) {
myContainer.add(i);
System.out.println("size:" + myContainer.size());
if (myContainer.size() == 5) {
try {
System.out.println("t1 notify");
locker.notify();
System.out.println("t1 wait");
locker.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

}
}
});


Thread t2 = new Thread(() -> {
synchronized (locker) {
try {
System.out.println("t2 wait");
locker.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

if (myContainer.size() == 5) {
System.out.println("t2 notify");
locker.notify();
}
System.out.println("t2 end");
}
});

t2.start();
Thread.sleep(100);
t1.start();

}
}

class MyContainer {
List<Integer> list = Collections.synchronizedList(new ArrayList<>());

public void add(Integer i) {
list.add(i);
}

public int size() {
return list.size();
}
}

CountDownLatch实现

LockSupport实现

2

为什么说AQS是CAS+volatile

写一个固定容量的同步容器,有put和get方法,以及getCount方法,能支持两个生产者线程以及10个消费者线程

问题

  • ThreadGroup? new Thread(ThreadGroup)

  • synchronized reentranlock 锁升级?

  • 如何确定站点的并发量

  • wait notify
    wait 释放锁
    notify 不释放锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    public static void main(String[] args) throws InterruptedException {

    Object locker = new Thread();
    new Thread(() -> {
    System.out.println("t lock");
    synchronized (locker) {
    try {
    System.out.println("t will sleep 5s");
    TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    }

    try {
    System.out.println("t sleep end ,wait");
    locker.wait();
    System.out.println("t wait end");
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    }
    }
    }).start();

    TimeUnit.SECONDS.sleep(1);

    System.out.println("main lock");
    synchronized (locker) {
    System.out.println("main notify");
    locker.notify();
    try {
    TimeUnit.SECONDS.sleep(10);
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    }
    System.out.println("main lock end");
    }
    }
  • await signal

  • 并发/并行
    并行(同时执行)多cpu同时执行
    并发(同时提交)包含并行,并发也包括1cpu交替执行两个任务,在人看来是同时执行的

  • JDK中没有ConcurrentArrayQueue
    树状图 Queue

  • DaemonThreadFactory
    ???

使用步骤

  • 创建
  • 转换为其他流的操作
  • 终止操作,产生结果(执行之前的惰性操作)
    Read more »

图解HTTPS

  1. 第一次握手
    • client发送(client hello消息)TLS版本,密码套件,随机数1给服务端
    • 收到server发送的ack结束
      Read more »

Measuring personal growth

Rate of change

Every 3-6 years, become a different person

Time to solve problems

  1. For the first decade after graduation, you figure out what you want to do with your life
  2. For the next decade, you get married, buy a house, and have kids.
  3. For the next decade, you build out your savings to retire.

Number of future options

take actions that help me maximize future options.

0%