36生产者消费者模式

用流水线思想提高效率

生产消费模式优点

解耦、异步削峰(异步平衡生产消费速度差)

支持批量执行以提升性能

首先是以阻塞方式获取任务队列中的一条任务,而后则是以非阻塞的方式获取任务;
阻塞方式如果任务队列中没有任务能够避免无谓的循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//任务队列
BlockingQueue<Task> bq = new LinkedBlockingQueue<>(2000);
//启动5个消费者线程
//执⾏批量任务
void start () {
ExecutorService es = xecutors
.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
es.execute(() -> {
try {
while (true) {
//获取批量任务
List<Task> ts = pollTasks(); //执⾏批量任务
execTasks(ts);
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
//从任务队列中获取批量任务
List<Task> pollTasks ()
throws InterruptedException {
List<Task> ts = new LinkedList<>();
//阻塞式获取⼀条任务
Task t = bq.take();
while (t != null) {
ts.add(t);
//⾮阻塞式获取⼀条任务
t = bq.poll();
}
return ts;
}
//批量执⾏任务
execTasks(List < Task > ts) {
//省略具体代码⽆数
}

支持分阶段提交以提升性能

自定义日志组件,采用异步刷盘方式,刷盘的时机:

  1. ERROR级别的日志需要立即刷盘;
  2. 数据积累到500条需要立即刷盘;
  3. 存在未刷盘数据,且5秒钟内未曾刷盘,需要立即刷盘。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class Logger {
//任务队列
final BlockingQueue<LogMsg> bq
= new BlockingQueue<>();
//flush批量
static final int batchSize = 500;
//只需要⼀个线程写⽇志
ExecutorService es =
Executors.newFixedThreadPool(1);

//启动写⽇志线程
void start() {
File file = File.createTempFile("foo", ".log");
final FileWriter writer =
new FileWriter(file);
this.es.execute(() -> {
try {
//未刷盘⽇志数量
int curIdx = 0;
long preFT = System.currentTimeMillis();
while (true) {
LogMsg log = bq.poll(
5, TimeUnit.SECONDS);
//写⽇志
if (log != null) {
writer.write(log.toString());
++curIdx;
}
//如果不存在未刷盘数据,则⽆需刷盘
if (curIdx <= 0) {
continue;
}
//根据规则刷盘
if (log != null && log.level == LEVEL.ERROR ||
curIdx == batchSize ||
System.currentTimeMillis() - preFT > 5000) {
writer.flush();
curIdx = 0;
preFT = System.currentTimeMillis();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
writer.flush();
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}

//写INFO级别⽇志
void info(String msg) {
bq.put(new LogMsg(
LEVEL.INFO, msg));
}

//写ERROR级别⽇志
void error(String msg) {
bq.put(new LogMsg(
LEVEL.ERROR, msg));
}
}

//⽇志级别
enum LEVEL {
INFO, ERROR
}

class LogMsg {
LEVEL level;
String msg;

//省略构造函数实现
LogMsg(LEVEL lvl, String msg) {
}

//省略toString()实现
String toString() {
}
}

总结

Java线程池本身就是一种生产者-消费者模式的实现,每次只能从任务队列中消费一个任务来执行,批量执行以及分阶段提交的场景需自己实现。

生产-消费模式在分布式场景下,借助消息队列实现。
MQ支持两种模型:一种是点对点模型,一种是发布订阅模型
点对点:一个消息只会被一个消费者消费
发布订阅:一个消息会被多个消费者消费,本质上是一种消息的广播