19CountDownLatch和CyclicBarrier

让多线程步调一致

业务介绍:
在线商城下单,生成电子订单,保存在订单库;
物流生成派送单发货,派送单保存在派送单库。
为防止漏派送或重复派送,对账系统每天还会校验是否存在异常订单。

串行方式

1
2
3
4
5
6
7
8
9
10
while(存在未对账订单){
// 查询未对账订单
pos = getPOrders();
// 查询派送单
dos = getDOrders();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}

并行优化对账系统

并行查询订单和派送单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
while(存在未对账订单){
// 查询未对账订单
Thread T1 = new Thread(()->{
pos = getPOrders();
});
T1.start();
// 查询派送单
Thread T2 = new Thread(()->{
dos = getDOrders();
});
T2.start();
// 等待 T1、T2 结束
T1.join();
T2.join();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}

线程池优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 创建 2 个线程的线程池
Executor executor =
Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 查询未对账订单
executor.execute(()-> {
pos = getPOrders();
});
// 查询派送单
executor.execute(()-> {
dos = getDOrders();
});

/* ??如何实现等待??*/

// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}

如下,使用线程池如何知道查询线程完成了呢

  1. 可通过管程实现计数器,计数器初始2,查询完成–,计数器为0则重置为2并执行对对账入库
  2. 使用CountDownLatch

CountDownLatch优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 创建 2 个线程的线程池
Executor executor =
Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 计数器初始化为 2
CountDownLatch latch =
new CountDownLatch(2);
// 查询未对账订单
executor.execute(()-> {
pos = getPOrders();
latch.countDown();
});
// 查询派送单
executor.execute(()-> {
dos = getDOrders();
latch.countDown();
});

// 等待两个查询操作结束
latch.await();

// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}

进一步性能优化

优化点:两个查询操作和对账操作check、save仍是串行

需要引入订单队列和派送单队列,T1查询订单,T2查询派送单,T1T2各自生产1条数据后通知T3执行对账。(T1T2要互相等待)

实现计数器优化

可以利用一个计数器来解决这两个难点,计数器初始化为 2,线程 T1 和 T2 生产完一条数据都将计数器减 1,如果计数器大于 0 则线程 T1 或者 T2 等待。如果计数器等于 0,则通知线程 T3,并唤醒等待的线程 T1 或者 T2,与此同时,将计数器重置为 2,这样线程 T1 和线程 T2 生产下一条数据的时候就可以继续使用这个计数器了。

但建议使用Java并发包工具类:CyclicBarrier。

CyclicBarrier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 订单队列
Vector<P> pos;
// 派送单队列
Vector<D> dos;
// 执行回调的线程池
Executor executor =
Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
new CyclicBarrier(2, ()->{
executor.execute(()->check());
});

void check(){
P p = pos.remove(0);
D d = dos.remove(0);
// 执行对账操作
diff = check(p, d);
// 差异写入差异库
save(diff);
}

void checkAll(){
// 循环查询订单库
Thread T1 = new Thread(()->{
while(存在未对账订单){
// 查询订单库
pos.add(getPOrders());
// 等待
barrier.await();
}
}
T1.start();
// 循环查询运单库
Thread T2 = new Thread(()->{
while(存在未对账订单){
// 查询运单库
dos.add(getDOrders());
// 等待
barrier.await();
}
}
T2.start();
}

总结

CountDownLatch 主要用来解决一个线程等待多个线程的场景,计数器是不能循环利用的,计数器为0时await直接通过

CyclicBarrier 是一组线程之间互相等待,计数器是可以循环利用的,可设置回调函数

还可使用线程池Future优化

1
2
3
CompletableFuture<List> pOrderFuture = CompletableFuture.supplyAsync(this::getPOrders);
CompletableFuture<List> dOrderFuture = CompletableFuture.supplyAsync(this::getDOrders);
pOrderFuture.thenCombine(dOrderFuture, this::check).thenAccept(this::save);

问题

CyclicBarrier的回调函数我们使用了一个固定大小的线程池,有必要吗?