用流水线思想提高效率
生产消费模式优点 解耦、异步削峰(异步平衡生产消费速度差)
支持批量执行以提升性能 首先是以阻塞方式获取任务队列中的一条任务,而后则是以非阻塞的方式获取任务; 阻塞方式如果任务队列中没有任务能够避免无谓的循环
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 //任务队列 BlockingQueue<Task> bq = new LinkedBlockingQueue<>(2000); //启动5个消费者线程 //执⾏批量任务 void start () { ExecutorService es = xecutors .newFixedThreadPool(5); for (int i = 0; i < 5; i++) { es.execute(() -> { try { while (true) { //获取批量任务 List<Task> ts = pollTasks(); //执⾏批量任务 execTasks(ts); } } catch (Exception e) { e.printStackTrace(); } }); } } //从任务队列中获取批量任务 List<Task> pollTasks () throws InterruptedException { List<Task> ts = new LinkedList<>(); //阻塞式获取⼀条任务 Task t = bq.take(); while (t != null) { ts.add(t); //⾮阻塞式获取⼀条任务 t = bq.poll(); } return ts; } //批量执⾏任务 execTasks(List < Task > ts) { //省略具体代码⽆数 }
支持分阶段提交以提升性能 自定义日志组件,采用异步刷盘方式,刷盘的时机:
ERROR级别的日志需要立即刷盘;
数据积累到500条需要立即刷盘;
存在未刷盘数据,且5秒钟内未曾刷盘,需要立即刷盘。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 class Logger { //任务队列 final BlockingQueue<LogMsg> bq = new BlockingQueue<>(); //flush批量 static final int batchSize = 500; //只需要⼀个线程写⽇志 ExecutorService es = Executors.newFixedThreadPool(1); //启动写⽇志线程 void start() { File file = File.createTempFile("foo", ".log"); final FileWriter writer = new FileWriter(file); this.es.execute(() -> { try { //未刷盘⽇志数量 int curIdx = 0; long preFT = System.currentTimeMillis(); while (true) { LogMsg log = bq.poll( 5, TimeUnit.SECONDS); //写⽇志 if (log != null) { writer.write(log.toString()); ++curIdx; } //如果不存在未刷盘数据,则⽆需刷盘 if (curIdx <= 0) { continue; } //根据规则刷盘 if (log != null && log.level == LEVEL.ERROR || curIdx == batchSize || System.currentTimeMillis() - preFT > 5000) { writer.flush(); curIdx = 0; preFT = System.currentTimeMillis(); } } } catch (Exception e) { e.printStackTrace(); } finally { try { writer.flush(); writer.close(); } catch (IOException e) { e.printStackTrace(); } } }); } //写INFO级别⽇志 void info(String msg) { bq.put(new LogMsg( LEVEL.INFO, msg)); } //写ERROR级别⽇志 void error(String msg) { bq.put(new LogMsg( LEVEL.ERROR, msg)); } } //⽇志级别 enum LEVEL { INFO, ERROR } class LogMsg { LEVEL level; String msg; //省略构造函数实现 LogMsg(LEVEL lvl, String msg) { } //省略toString()实现 String toString() { } }
总结 Java线程池本身就是一种生产者-消费者模式的实现,每次只能从任务队列中消费一个任务来执行,批量执行以及分阶段提交的场景需自己实现。
生产-消费模式在分布式场景下,借助消息队列实现。 MQ支持两种模型:一种是点对点模型,一种是发布订阅模型 点对点:一个消息只会被一个消费者消费 发布订阅:一个消息会被多个消费者消费,本质上是一种消息的广播