什么是数据库连接池

作用:避免重量级资源(数据库连接)频繁创建和销毁

HiKariCP性能高 微观上在字节码角度优化java代码,编译出的字节码更高效
宏观上和两个数据结构有关:FastList、ConcurrentBag

FastList解决了哪些性能问题

查找顺序变逆序查找

假设一个Connection依次创建6个Statement,分别是S1、S2、S3、S4、S5、S6,按照正常的编码习惯,关闭Statement的顺序一般是逆序的,关闭的顺序是:S6、S5、S4、S3、S2、S1,而ArrayList的remove(Object o)方法是顺序遍历查找,逆序删除而顺序查找,这样的查找效率就太慢了。如何优化呢?很简单,优化成逆序查找就可以了。

优化越界检查

保证不会越界,不需要每次进行越界检查

ConcurrentBag

add方法

创建一个数据库连接,add方法加入到ConcurrentBag中
逻辑说明:将连接加入共享队列sharedList中,如果此时有线程在等待数据库连接,那么就通过handoffQueue将其分配给等待的线程。

1
2
3
4
5
6
7
8
9
10
11
12
//将空闲连接添加到队列
void add(final T bagEntry){
//加⼊共享队列
sharedList.add(bagEntry);
//如果有等待连接的线程,
//则通过handoffQueue直接分配给等待的线程
while(waiters.get() > 0
&& bagEntry.getState() == STATE_NOT_IN_USE
&& !handoffQueue.offer(bagEntry)) {
yield();
}
}

borrow方法,获取空闲数据库连接

  1. 优先从线程本地存储获取空闲连接
    • 需要用CAS方法防止重复分配,因为本地存储中的连接是可以被其他线程窃取
  2. 没有,则到共享队列获取(CAS获取)
  3. 共享队列中没有,则请求线程等待

requite 释放连接

连接状态更改为STATE_NOT_IN_USE
有等待线程则分配给它,没有则保存到线程本地存储

总结

HiKariCP中的FastList和ConcurrentBag这两个数据结构设计巧妙,适用于数据库连接池这个特定的场景。
FastList适用于逆序删除场景
ConcurrentBag通过ThreadLocal做一次预分配,避免直接竞争共享资源,非常适合池化资源的分配。

高性能有界内存队列
高性能原因

  • 内存分配更加合理,RingBuffer结构,数组初始化一次性创建提升缓存命中,对象循环利用,避免GC
  • 避免伪共享
  • 无锁算法
  • 支持批量消费,消费者可无锁方式消费多个消息

避免为共享

由于共享缓存行导致缓存无效的场景
举例说明:
两个变量long l1 = 1; long l2=2;存储在数组中, CPU1、CPU2缓存行都加载了l1、l2,CPU1修改l1,CPU2缓存行 会失效,即使CPU2不操作l1也会失效,造成伪共享。

避免缓存行失效,填充字节避免数据在一个缓存行
缺点:牺牲内存

无锁算法

总结

并发性能优化到极致,两个方向:

  • 无锁算法避免争用
  • 将CPU性能发挥到机制(填充方式避免伪共享就)

Reactor模式

核心逻辑
while(true){} 的方式调用事件多路选择器提供的select()方法监听网络事件,有就绪网络事件就绪遍历事件处理器处理。

Netty线程模型

EventLoop

核心概念EventLoop事件循环 对应Reactor模式的reactor,负责网络事件监听和调用事件处理器处理。
多线程对应一个EventLoop,EventLoop和Java线程11对应,即一个网络连接只对应一个线程,避免并发问题

EventLoopGroup

处理TCP连接请求和读写请求是两个不同的socket
bossGroup处理连接请求,workerGroup处理读写请求;bossGroup处理完连接请求后,会将连接提交给workerGroup,轮训选择其中一个EventLoop处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//限流器流速:2个请求/秒
RateLimiter limiter = RateLimiter.create(2.0);
//执⾏任务的线程池
ExecutorService es = Executors.newFixedThreadPool(1);
//记录上⼀次执⾏时间
prev = System.nanoTime();
//测试执⾏20次
for (int i = 0; i < 20; i++) {
//限流器限流
limiter.acquire();
//提交任务异步执⾏
es.execute(() -> {
long cur = System.nanoTime();
//打印时间间隔:毫秒
System.out.println(
(cur - prev) / 1000_000);
prev = cur;
});
}

令牌桶算法

  1. 令牌以固定的速率添加到令牌桶中,假设限流的速率是r/秒,则令牌每1/r秒会添加一个;
  2. 假设令牌桶的容量是b,如果令牌桶已满,则新的令牌会被丢弃;
  3. 请求能够通过限流器的前提是令牌桶中有令牌。

令牌桶的容量b:限流器允许的最大突发流量

使用定时器定时生成令牌到桶方案,高并发情况下有问题:
当系统压力已经临近极限的时候,定时器的精度误差会非常大,同时定时器本身会创建调度线程,也会对系统的性能产生影响。

Guava如何实现令牌桶算法

记录并动态计算下一令牌发放的时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

class SimpleLimiter {
//当前令牌桶中的令牌数量
long storedPermits = 0;
//令牌桶的容量
long maxPermits = 3;
//下⼀令牌产⽣时间
long next = System.nanoTime();
//发放令牌间隔:纳秒
long interval = 1000_000_000;

//请求时间在下⼀令牌产⽣时间之后,则
// 1.重新计算令牌桶中的令牌数
// 2.将下⼀个令牌发放时间重置为当前时间
void resync(long now) {
if (now > next) {
//新产⽣的令牌数
long newPermits = (now - next) / interval;
//新令牌增加到令牌桶
storedPermits = min(maxPermits, storedPermits + newPermits);
//将下⼀个令牌发放时间重置为当前时间
next = now;
}
}

//预占令牌,返回能够获取令牌的时间
synchronized long reserve(long now) {
resync(now);
//能够获取令牌的时间
long at = next;
//令牌桶中能提供的令牌
long fb = min(1, storedPermits);
//令牌净需求:⾸先减掉令牌桶中的令牌
long nr = 1 - fb;
//重新计算下⼀令牌产⽣时间
next = next + nr * interval;
//重新计算令牌桶中的令牌
this.storedPermits -= fb;
return at;
}

//申请令牌
void acquire() {
//申请令牌时的时间
long now = System.nanoTime();
//预占令牌
long at = reserve(now);
long waitTime = max(at - now, 0);
//按照条件等待
if (waitTime > 0) {
try {
TimeUnit.NANOSECONDS.sleep(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

支持预热

漏桶算法

漏桶会按照一定的速率自动将水漏掉,只有漏桶里还能注入水的时候,请求才能通过限流器。

避免共享设计模式

  • Immutability模式
    对象属性的不可变性
  • Copy-on-Write模式
    性能问题
  • 线程本地存储模式
    异步执行问题

多线程版本IF的设计模式

  • Guarded Suspension模式
  • Balking模式

三种最简单的分工模式

  • Thread-Per-Message模式

  • Worker Thread模式

  • 生产者-消费者模式

用流水线思想提高效率

生产消费模式优点

解耦、异步削峰(异步平衡生产消费速度差)

支持批量执行以提升性能

首先是以阻塞方式获取任务队列中的一条任务,而后则是以非阻塞的方式获取任务;
阻塞方式如果任务队列中没有任务能够避免无谓的循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//任务队列
BlockingQueue<Task> bq = new LinkedBlockingQueue<>(2000);
//启动5个消费者线程
//执⾏批量任务
void start () {
ExecutorService es = xecutors
.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
es.execute(() -> {
try {
while (true) {
//获取批量任务
List<Task> ts = pollTasks(); //执⾏批量任务
execTasks(ts);
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
//从任务队列中获取批量任务
List<Task> pollTasks ()
throws InterruptedException {
List<Task> ts = new LinkedList<>();
//阻塞式获取⼀条任务
Task t = bq.take();
while (t != null) {
ts.add(t);
//⾮阻塞式获取⼀条任务
t = bq.poll();
}
return ts;
}
//批量执⾏任务
execTasks(List < Task > ts) {
//省略具体代码⽆数
}

支持分阶段提交以提升性能

自定义日志组件,采用异步刷盘方式,刷盘的时机:

  1. ERROR级别的日志需要立即刷盘;
  2. 数据积累到500条需要立即刷盘;
  3. 存在未刷盘数据,且5秒钟内未曾刷盘,需要立即刷盘。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class Logger {
//任务队列
final BlockingQueue<LogMsg> bq
= new BlockingQueue<>();
//flush批量
static final int batchSize = 500;
//只需要⼀个线程写⽇志
ExecutorService es =
Executors.newFixedThreadPool(1);

//启动写⽇志线程
void start() {
File file = File.createTempFile("foo", ".log");
final FileWriter writer =
new FileWriter(file);
this.es.execute(() -> {
try {
//未刷盘⽇志数量
int curIdx = 0;
long preFT = System.currentTimeMillis();
while (true) {
LogMsg log = bq.poll(
5, TimeUnit.SECONDS);
//写⽇志
if (log != null) {
writer.write(log.toString());
++curIdx;
}
//如果不存在未刷盘数据,则⽆需刷盘
if (curIdx <= 0) {
continue;
}
//根据规则刷盘
if (log != null && log.level == LEVEL.ERROR ||
curIdx == batchSize ||
System.currentTimeMillis() - preFT > 5000) {
writer.flush();
curIdx = 0;
preFT = System.currentTimeMillis();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
writer.flush();
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}

//写INFO级别⽇志
void info(String msg) {
bq.put(new LogMsg(
LEVEL.INFO, msg));
}

//写ERROR级别⽇志
void error(String msg) {
bq.put(new LogMsg(
LEVEL.ERROR, msg));
}
}

//⽇志级别
enum LEVEL {
INFO, ERROR
}

class LogMsg {
LEVEL level;
String msg;

//省略构造函数实现
LogMsg(LEVEL lvl, String msg) {
}

//省略toString()实现
String toString() {
}
}

总结

Java线程池本身就是一种生产者-消费者模式的实现,每次只能从任务队列中消费一个任务来执行,批量执行以及分阶段提交的场景需自己实现。

生产-消费模式在分布式场景下,借助消息队列实现。
MQ支持两种模型:一种是点对点模型,一种是发布订阅模型
点对点:一个消息只会被一个消费者消费
发布订阅:一个消息会被多个消费者消费,本质上是一种消息的广播

如何优雅地终止线程

interrupt()方法和线程终止的标志位

  • 捕获异常重新设置标志位
  • 自定义标志位,避免第三方库处理中指
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    class Proxy {
    boolean started = false;
    //采集线程
    Thread rptThread;

    //启动采集功能
    synchronized void start() {
    //不允许同时启动多个采集线程
    if (started) {
    return;
    }
    started = true;
    rptThread = new Thread(() -> {
    while (!Thread.currentThread().isInterrupted()) {
    //省略采集、回传实现
    report();
    //每隔两秒钟采集、回传⼀次数据
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    //重新设置线程中断状态
    Thread.currentThread().interrupt();
    }
    }
    //执⾏到此处说明线程⻢上终⽌
    started = false;
    });
    rptThread.start();
    }

    //终⽌采集功能
    synchronized void stop() {
    rptThread.interrupt();
    }
    }
    上面示例代码能够解决问题,但是建议工作中谨慎使用。原因在于可能在线程的run()方法中调用第三方类库的方法,没有办法保证第三方类库正确处理了线程的中断异常(例如第三方类库在捕获到Thread.sleep()方法抛出的中断异常后,没有重新设置线程的中断状态,那么就会导致线程不能够正常终止。)
    强烈建议设置自己的线程终止标志位,如下代码中,使用isTerminated作为线程终止标志位,此时无论是否正确处理了线程的中断异常,都不会影响线程优雅地终止。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    class Proxy {
    //线程终⽌标志位
    volatile boolean terminated = false;
    boolean started = false;
    //采集线程
    Thread rptThread;

    //启动采集功能
    synchronized void start() {
    //不允许同时启动多个采集线程
    if (started) {
    return;
    }
    started = true;
    terminated = false;
    rptThread = new Thread(() -> {
    while (!terminated) {
    //省略采集、回传实现
    report();
    //每隔两秒钟采集、回传⼀次数据
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    //重新设置线程中断状态
    Thread.currentThread().interrupt();
    }
    }
    //执⾏到此处说明线程⻢上终⽌
    started = false;
    });
    rptThread.start();
    }

    //终⽌采集功能
    synchronized void stop() {
    //设置中断标志位
    terminated = true;
    //中断线程rptThread
    rptThread.interrupt();
    }
    }

如何优雅地终止线程池

shutdown()和shutdownNow()

  • shutdown
    拒绝新任务,等待线程池中正在执行的任务和已进入阻塞队列的任务执行完后关闭线程池
  • shutdownNow
    拒绝接收新的任务,中断线程池中正在执行的任务,已进入阻塞队列的任务也被剥夺执行机会,不过这些被剥夺执行机会的任务会作为shutdownNow()方法的返回值返回。
    如果需要优雅地结束,需要正确处理线程中断。

正确地创建线程池

  • 使用有界队列
  • 指明拒绝策略
  • 业务相关的名字

线程死锁的场景

现象:应用每运行一段时间偶尔就会处于无响应的状态,监控数据看上去一切都正常,但是实际上已经不能正常工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//L1、L2两阶段任务共用一个线程池
ExecutorService es = Executors.
newFixedThreadPool(2);
//L1阶段的闭锁
CountDownLatch l1=new CountDownLatch(2);
for (int i=0; i<2; i++){
System.out.println("L1");
//执行L1阶段任务
es.execute(()->{
//L2阶段的闭锁
CountDownLatch l2=new CountDownLatch(2);
//执行L2阶段子任务
for (int j=0; j<2; j++){
es.execute(()->{
System.out.println("L2");
l2.countDown();
});
}
//等待L2阶段任务执行完
l2.await();
l1.countDown();
});
}
//等着L1阶段任务执行完
l1.await();
System.out.println("end");

两个线程全部都阻塞在 l2.await();没有空闲的线程执行L2阶段的任务了.
解决办法:不同阶段任务使用不同线程池

提交到相同线程池中的任务一定是相互独立的,否则就一定要慎重

总结

  • 正确创建线程池
    使用有界队列
    指明拒绝策略
    业务相关的名字
  • 避免死锁
  • ThreadLocal内存泄露问题
  • 异常处理

线程实现成本高,频繁创建销毁成本高,无限创建OOM
轻量级线程:协程

OpenJDK Loom项目,解决Java语言的轻量级线程问题,轻量级线程被叫做Fiber

0%