理解CompletableFuture

理解CompletableFuture

CompletableFuture是java8新增的一个接口,它提供了对异步程序执行的另一种方式。

理解Future

在结构上看CompletableFuture实现了Future和CompletionStage这两个接口,先来看看Future接口提供的功能

1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

Future是一个将来对象,用来接收异步操作执行后的结果,提供了三个方法用来获取当前异步线程执行的状态。而调用get()方法会阻塞异步线程,直到执行完成获取结果。

但在实际异步编程的时候,使用起来并不是那么特别好用。

一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
public void test5() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
Future<String> f = executorService.submit(()->{
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "async finished";
});

try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
String r = f.get();
System.out.println(r);
}

理解CompletionStage

上面提到了CompletableFuture实现的另外一个接口是CompletionStage,从字面上看这是一个完成阶段,在这个接口中声明了许多的方法

虽然这个接口的方法很多,但是很有规律,总的来说,分为对异步结果分别进行响应(),结果转换(Function, BiFunction),结果消费(Consumer, BiConsumer),以及异常的处理等。

对异步结果进行转换

1
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);

例子:

1
2
3
4
5
6
7
8
@Test
public void test() {
String result = CompletableFuture.supplyAsync(()-> "Hello").thenApplyAsync(s->{
System.out.println(Thread.currentThread().getName());
return s+"world";
}).join();
System.out.println(result);
}

对异步结果进行消费

1
public CompletionStage<Void> thenAccept(Consumer<? super T> action);

例子:

1
2
3
4
@Test
public void test() {
CompletableFuture.supplyAsync(()-> "Hello").thenAccept(System.out::println);
}

对异步结果不关心,执行下一个操作

1
public CompletionStage<Void> thenRun(Runnable action);

例子:

1
2
3
4
@Test
public void test() {
CompletableFuture.supplyAsync(()-> "Hello").thenRun(()-> System.out.println("World"));
}

将CompletionStage函数作为参数

1
2
public <U> CompletionStage<U> thenCompose
(Function<? super T, ? extends CompletionStage<U>> fn);

例子:

1
2
3
4
5
@Test
public void test() {
String combine = CompletableFuture.supplyAsync(()-> "Hello").thenComposeAsync((x)-> CompletableFuture.supplyAsync(()-> x + "World")).join();
System.out.println(combine);
}

组合其它的CompletionStage,使用BiFunction计算转换

1
2
3
public <U,V> CompletionStage<V> thenCombine
(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);

例子:

1
2
3
4
5
@Test
public void test() {
String combine = CompletableFuture.supplyAsync(()-> "Hello").thenCombine(CompletableFuture.supplyAsync(()-> "World"), (x, y)-> x+y).join();
System.out.println(combine);
}

组合其它的CompletionStage,使用BiConsumer消费

1
2
3
public <U> CompletionStage<Void> thenAcceptBoth
(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);

例子:

1
2
3
4
5
6
@Test
public void test() {
CompletableFuture.supplyAsync(()-> "Hello").thenAcceptBoth(CompletableFuture.supplyAsync(()-> "World"), (x, y)-> {
System.out.println(x+y);
});
}

组合其它的CompletionStage,都完成后执行新的任务

1
2
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,
Runnable action);

例子:

1
2
3
4
5
6
@Test
public void test() {
CompletableFuture.supplyAsync(()-> "Hello").runAfterBoth(CompletableFuture.supplyAsync(()-> "World"), ()-> {
System.out.println("ok");
});
}

组合其它的CompletionStage,用计算快的CompletionStage结果进行后续计算转换

1
2
3
public <U> CompletionStage<U> applyToEither
(CompletionStage<? extends T> other,
Function<? super T, U> fn);

例子:

1
2
3
4
5
@Test
public void test() {
String e = CompletableFuture.supplyAsync(()-> "Hello").applyToEither(CompletableFuture.supplyAsync(()-> "World"), (x)-> x).join();
System.out.println(e);
}

组合其它的CompletionStage,用计算快的CompletionStage结果进行消费

1
2
3
public CompletionStage<Void> acceptEither
(CompletionStage<? extends T> other,
Consumer<? super T> action);

例子:

1
2
3
4
@Test
public void test() {
CompletableFuture.supplyAsync(()-> "Hello").acceptEither(CompletableFuture.supplyAsync(()-> "World"), System.out::println);
}

组合其它的CompletionStage,其中一个完成后,继续后续任务

1
2
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,
Runnable action);

例子:

1
2
3
4
@Test
public void test() {
CompletableFuture.supplyAsync(()-> "Hello").runAfterEither(CompletableFuture.supplyAsync(()-> "World"), ()-> System.out.println("Hi"));
}

函数完成异常, 使用exceptionally进行熔断

1
2
public CompletionStage<T> exceptionally
(Function<Throwable, ? extends T> fn);

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void test() {
String r = CompletableFuture.supplyAsync(()-> {
if (true) {
throw new RuntimeException("Error Test");
}
return "Hello";
}).exceptionally(e -> {
e.printStackTrace();
return "World";
}).join();
System.out.println(r);
}

当运行完成时,对正常和异常的结果进行消费

1
2
public CompletionStage<T> whenComplete
(BiConsumer<? super T, ? super Throwable> action);

例子:

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void test() {
CompletableFuture.supplyAsync(()-> {
if (true) {
throw new RuntimeException("Error Test");
}
return "Hello";
}).whenComplete((x, e)-> {
System.out.println(x);
e.printStackTrace();
});
}

对正常和异常的运行结果进行转换

1
2
public <U> CompletionStage<U> handle
(BiFunction<? super T, Throwable, ? extends U> fn);

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void test() {
CompletableFuture.supplyAsync(()-> {
if (true) {
throw new RuntimeException("Error Test");
}
return "Hello";
}).handle((x, e)-> {
System.out.println(x);
e.printStackTrace();
return "";
});
}

理解CompletableFuture

在看完Future和CompletionStage接口之后,我们继续看CompletableFuture里面定义的一些公共方法

构造CompletableFuture

  1. 空构造方法构造一个非完成状态的CompletableFuture
  2. 通过一个给定的对象创建一个完成状态的CompletableFuture
  3. 通过接受一个Supplier接口函数创建
# java

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×