Java异步执行测试。
实现Runable重写run方法
1 2 3 4 5 6 7 8 9
| new Thread(() -> { System.out.println("task executing"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("task executed"); }).start();
|
原生的CompletableFuture
从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
示例一:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class Main { public static void main(String[] args) throws Exception { CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice); cf.thenAccept((result) -> { System.out.println("price: " + result); }); cf.exceptionally((e) -> { e.printStackTrace(); return null; }); Thread.sleep(200); }
static Double fetchPrice() { try { Thread.sleep(100); } catch (InterruptedException e) { } if (Math.random() < 0.3) { throw new RuntimeException("fetch price failed!"); } return 5 + Math.random() * 20; } }
|
结果
创建一个CompletableFuture是通过CompletableFuture.supplyAsync()实现的,它需要一个实现了Supplier接口的对象:
1 2 3
| public interface Supplier<T> { T get(); }
|
这里我们用lambda语法简化了一下,直接传入Main::fetchPrice,因为Main.fetchPrice()静态方法的签名符合Supplier接口的定义(除了方法名外)。
紧接着,CompletableFuture已经被提交给默认的线程池执行了,我们需要定义的是CompletableFuture完成时和异常时需要回调的实例。完成时,CompletableFuture会调用Consumer对象:
1 2 3
| public interface Consumer<T> { void accept(T t); }
|
异常时,CompletableFuture会调用Function对象:
1 2 3
| public interface Function<T, R> { R apply(T t); }
|
CompletableFuture的优点是:
- 异步任务结束时,会自动回调某个对象的方法;
- 异步任务出错时,会自动回调某个对象的方法;
- 主线程设置好回调后,不再关心异步任务的执行。
如果只是实现了异步回调机制,我们还看不出CompletableFuture相比Future的优势。CompletableFuture更强大的功能是,多个CompletableFuture可以串行执行,例如,定义两个CompletableFuture,第一个CompletableFuture根据证券名称查询证券代码,第二个CompletableFuture根据证券代码查询证券价格,这两个CompletableFuture实现串行操作如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class Main { public static void main(String[] args) throws Exception { CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> { return queryCode("中国石油"); }); CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> { return fetchPrice(code); }); cfFetch.thenAccept((result) -> { System.out.println("price: " + result); }); Thread.sleep(2000); }
static String queryCode(String name) { try { Thread.sleep(100); } catch (InterruptedException e) { } return "601857"; }
static Double fetchPrice(String code) { try { Thread.sleep(100); } catch (InterruptedException e) { } return 5 + Math.random() * 20; } }
|
除了串行执行外,多个CompletableFuture还可以并行执行。例如,我们考虑这样的场景:
同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| public class Main { public static void main(String[] args) throws Exception { CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> { return queryCode("中国石油", "https://finance.sina.com.cn/code/"); }); CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> { return queryCode("中国石油", "https://money.163.com/code/"); });
CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);
CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> { return fetchPrice((String) code, "https://finance.sina.com.cn/price/"); }); CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> { return fetchPrice((String) code, "https://money.163.com/price/"); });
CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
cfFetch.thenAccept((result) -> { System.out.println("price: " + result); }); Thread.sleep(200); }
static String queryCode(String name, String url) { System.out.println("query code from " + url + "..."); try { Thread.sleep((long) (Math.random() * 100)); } catch (InterruptedException e) { } return "601857"; }
static Double fetchPrice(String code, String url) { System.out.println("query price from " + url + "..."); try { Thread.sleep((long) (Math.random() * 100)); } catch (InterruptedException e) { } return 5 + Math.random() * 20; } }
|
上述逻辑实现的异步查询规则实际上是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| ┌─────────────┐ ┌─────────────┐ │ Query Code │ │ Query Code │ │ from sina │ │ from 163 │ └─────────────┘ └─────────────┘ │ │ └───────┬───────┘ ▼ ┌─────────────┐ │ anyOf │ └─────────────┘ │ ┌───────┴────────┐ ▼ ▼ ┌─────────────┐ ┌─────────────┐ │ Query Price │ │ Query Price │ │ from sina │ │ from 163 │ └─────────────┘ └─────────────┘ │ │ └────────┬───────┘ ▼ ┌─────────────┐ │ anyOf │ └─────────────┘ │ ▼ ┌─────────────┐ │Display Price│ └─────────────┘
|
除了anyOf()可以实现“任意个CompletableFuture只要一个成功”,allOf()可以实现“所有CompletableFuture都必须成功”,这些组合操作可以实现非常复杂的异步流程控制。
最后我们注意CompletableFuture的命名规则:
xxx():表示该方法将继续在已有的线程中执行;
xxxAsync():表示将异步在线程池中执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| int a = 1, b = 2; ExecutorService executor = Executors.newFixedThreadPool(5); for (int i = 0; i < 10; i++) { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("task start execute"); try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("task executed"); return a + b; }, executor); future.thenAcceptAsync(System.out::println); System.out.println(new Timestamp(new Date().getTime())); }
|
newFixedThreadPool()创建一个线程池,重复使用固定数量的线程在共享的无界队列上操作。
supplyAsync()返回一个新的CompletableFuture,该CompletableFuture由在给定执行器中运行的任务异步完成,该任务的值通过调用给定的Supplier获得。
thenAcceptAsync()返回一个新的CompletionStage,当此阶段正常完成时,使用此阶段的默认异步执行工具执行该阶段,并将此阶段的结果作为所提供函数的参数。
异步结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| main start task start execute 2021-06-27 23:52:22.558 2021-06-27 23:52:22.569 2021-06-27 23:52:22.569 2021-06-27 23:52:22.569 2021-06-27 23:52:22.57 2021-06-27 23:52:22.57 2021-06-27 23:52:22.57 2021-06-27 23:52:22.57 2021-06-27 23:52:22.57 2021-06-27 23:52:22.57 main finished task start execute task start execute task start execute task start execute task executed task start execute 4 task executed task executed task executed task executed 4 4 4 4 task start execute task start execute task start execute task start execute task executed 4 task executed task executed task executed task executed 4 4 4 4
|