25CompletionService

批量执行异步任务

CompletionService将线程池和阻塞队列组合使用,让批量管理异步任务更简单,先执行完先进入阻塞队列。

构造方法

1
2
ExecutorCompletionService(Executor executor);
ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)。

自己创建线程池可提供线程池隔离特性,避免耗时任务拖垮系统。

接口

1
2
3
4
5
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;//空队列阻塞
Future<V> poll();//空队列返回null
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;//等待unit时间仍空队列返回null

实现Dubbo中的Forking Cluster

Dubbo中有一种叫做Forking的集群模式,并行调用服务,一个返回结果,整个服务就返回
如地址转坐标服务,为保证服务高可用和性能,可以并行调用3个地图服务商API,只要有1个正确返回,那么地址转坐标这个服务就直接返回。这种集群模式可以容忍2个地图服务商服务异常,缺点是消耗的资源偏多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class _25CompletionService {
@Test
void do1() throws InterruptedException, ExecutionException {
ExecutorService pool = Executors.newFixedThreadPool(9);
ExecutorCompletionService service = new ExecutorCompletionService(pool, new ArrayBlockingQueue<>(10));
List<Future> list = new ArrayList<>();
Future<String> f1 = service.submit(() -> {
return getPrice1Sleep10();
});
list.add(f1);
Future<String> f2 = service.submit(() -> {
return getPrice2Sleep2();
});
list.add(f2);
Future<String> f3 = service.submit(() -> {
return getPrice3Sleep5();
});
list.add(f3);

BlockingQueue<String> bq =
new LinkedBlockingQueue<>(10);

try {
for (int i = 0; i < 3; i++) {
System.out.printf("take %s\r\n", i);
Future take = service.take();
String r = take.get().toString();
System.out.printf("take %s ok,r:%s\r\n", i, r);
if (r != null) {
break;
}
}
}finally {
for (Future f : list) {
f.cancel(true);
}
}
}

private String getPrice1Sleep10() {
System.out.println("1-start will sleep 10s");
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("1 sleep end");
return "1-result";
}

private String getPrice2Sleep2() {
System.out.println("2-start will sleep 2s");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("2 sleep end");
return "2-result";
}

private String getPrice3Sleep5() {
System.out.println("3-start will sleep 5s");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("3 sleep end");
return "3-result";
}
}