CompletableFuture
是java8新增的一个接口,它提供了对异步程序执行的另一种方式。
理解Future
在结构上看CompletableFuture
实现了Future和CompletionStage这两个接口,先来看看Future接口提供的功能
1 2 3 4 5 6 7 8
| public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
|
Future是一个将来对象,用来接收异步操作执行后的结果,提供了三个方法用来获取当前异步线程执行的状态。而调用get()
方法会阻塞异步线程,直到执行完成获取结果。
但在实际异步编程的时候,使用起来并不是那么特别好用。
一个简单的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Test public void test5() throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(3); Future<String> f = executorService.submit(()->{ System.out.println(Thread.currentThread().getName()); try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } return "async finished"; });
try { Thread.sleep(5000L); } catch (InterruptedException e) { e.printStackTrace(); } String r = f.get(); System.out.println(r); }
|
理解CompletionStage
上面提到了CompletableFuture
实现的另外一个接口是CompletionStage
,从字面上看这是一个完成阶段,在这个接口中声明了许多的方法
虽然这个接口的方法很多,但是很有规律,总的来说,分为对异步结果分别进行响应(),结果转换(Function, BiFunction),结果消费(Consumer, BiConsumer),以及异常的处理等。
对异步结果进行转换
1
| public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
|
例子:
1 2 3 4 5 6 7 8
| @Test public void test() { String result = CompletableFuture.supplyAsync(()-> "Hello").thenApplyAsync(s->{ System.out.println(Thread.currentThread().getName()); return s+"world"; }).join(); System.out.println(result); }
|
对异步结果进行消费
1
| public CompletionStage<Void> thenAccept(Consumer<? super T> action);
|
例子:
1 2 3 4
| @Test public void test() { CompletableFuture.supplyAsync(()-> "Hello").thenAccept(System.out::println); }
|
对异步结果不关心,执行下一个操作
1
| public CompletionStage<Void> thenRun(Runnable action);
|
例子:
1 2 3 4
| @Test public void test() { CompletableFuture.supplyAsync(()-> "Hello").thenRun(()-> System.out.println("World")); }
|
将CompletionStage函数作为参数
1 2
| public <U> CompletionStage<U> thenCompose (Function<? super T, ? extends CompletionStage<U>> fn);
|
例子:
1 2 3 4 5
| @Test public void test() { String combine = CompletableFuture.supplyAsync(()-> "Hello").thenComposeAsync((x)-> CompletableFuture.supplyAsync(()-> x + "World")).join(); System.out.println(combine); }
|
组合其它的CompletionStage,使用BiFunction计算转换
1 2 3
| public <U,V> CompletionStage<V> thenCombine (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
|
例子:
1 2 3 4 5
| @Test public void test() { String combine = CompletableFuture.supplyAsync(()-> "Hello").thenCombine(CompletableFuture.supplyAsync(()-> "World"), (x, y)-> x+y).join(); System.out.println(combine); }
|
组合其它的CompletionStage,使用BiConsumer消费
1 2 3
| public <U> CompletionStage<Void> thenAcceptBoth (CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
|
例子:
1 2 3 4 5 6
| @Test public void test() { CompletableFuture.supplyAsync(()-> "Hello").thenAcceptBoth(CompletableFuture.supplyAsync(()-> "World"), (x, y)-> { System.out.println(x+y); }); }
|
组合其它的CompletionStage,都完成后执行新的任务
1 2
| public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
|
例子:
1 2 3 4 5 6
| @Test public void test() { CompletableFuture.supplyAsync(()-> "Hello").runAfterBoth(CompletableFuture.supplyAsync(()-> "World"), ()-> { System.out.println("ok"); }); }
|
组合其它的CompletionStage,用计算快的CompletionStage结果进行后续计算转换
1 2 3
| public <U> CompletionStage<U> applyToEither (CompletionStage<? extends T> other, Function<? super T, U> fn);
|
例子:
1 2 3 4 5
| @Test public void test() { String e = CompletableFuture.supplyAsync(()-> "Hello").applyToEither(CompletableFuture.supplyAsync(()-> "World"), (x)-> x).join(); System.out.println(e); }
|
组合其它的CompletionStage,用计算快的CompletionStage结果进行消费
1 2 3
| public CompletionStage<Void> acceptEither (CompletionStage<? extends T> other, Consumer<? super T> action);
|
例子:
1 2 3 4
| @Test public void test() { CompletableFuture.supplyAsync(()-> "Hello").acceptEither(CompletableFuture.supplyAsync(()-> "World"), System.out::println); }
|
组合其它的CompletionStage,其中一个完成后,继续后续任务
1 2
| public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action);
|
例子:
1 2 3 4
| @Test public void test() { CompletableFuture.supplyAsync(()-> "Hello").runAfterEither(CompletableFuture.supplyAsync(()-> "World"), ()-> System.out.println("Hi")); }
|
函数完成异常, 使用exceptionally进行熔断
1 2
| public CompletionStage<T> exceptionally (Function<Throwable, ? extends T> fn);
|
例子:
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Test public void test() { String r = CompletableFuture.supplyAsync(()-> { if (true) { throw new RuntimeException("Error Test"); } return "Hello"; }).exceptionally(e -> { e.printStackTrace(); return "World"; }).join(); System.out.println(r); }
|
当运行完成时,对正常和异常的结果进行消费
1 2
| public CompletionStage<T> whenComplete (BiConsumer<? super T, ? super Throwable> action);
|
例子:
1 2 3 4 5 6 7 8 9 10 11 12
| @Test public void test() { CompletableFuture.supplyAsync(()-> { if (true) { throw new RuntimeException("Error Test"); } return "Hello"; }).whenComplete((x, e)-> { System.out.println(x); e.printStackTrace(); }); }
|
对正常和异常的运行结果进行转换
1 2
| public <U> CompletionStage<U> handle (BiFunction<? super T, Throwable, ? extends U> fn);
|
例子:
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Test public void test() { CompletableFuture.supplyAsync(()-> { if (true) { throw new RuntimeException("Error Test"); } return "Hello"; }).handle((x, e)-> { System.out.println(x); e.printStackTrace(); return ""; }); }
|
理解CompletableFuture
在看完Future和CompletionStage接口之后,我们继续看CompletableFuture里面定义的一些公共方法
构造CompletableFuture
- 空构造方法构造一个非完成状态的CompletableFuture
- 通过一个给定的对象创建一个完成状态的CompletableFuture
- 通过接受一个Supplier接口函数创建