在开始学习WebFlux之前,需要对Java的函数式编程有一定的了解,熟悉相关函数式接口的用法,包括Function,BiFunction等
开始
学习开始,我们需要找一个切入点,了解一次Http请求发生后,代码响应的过程,这里我们从reactor.netty.http.server
的HttpServerHandle
开始看起
1 | public void onStateChange(Connection connection, State newState) { |
- 接受一个
State
为REQUEST_RECEIVED
状态的连接 获取在前面的过程中已经将连接信息封装到了
HttpServerOperations
对象中,这个对象实现了HttpServerRequest, HttpServerResponse1
2class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerResponse>
implements HttpServerRequest, HttpServerResponse
创建了一个Mono对象(Reactor中的用来发出一个元素),并且订阅(触发)了这个Publisher。
备注: Mono.fromDirect() 用来转换一个Publisher到Mono免去了一些基础的验证。
处理适配器:ReactorHttpHandlerAdapter
从开始的过程第3步中使用了ReactorHttpHandlerAdapter
对HttpServerRequest, HttpServerResponse进行了处理
1 | public class ReactorHttpHandlerAdapter implements BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> |
ReactorHttpHandlerAdapter实现了BiFunction,函数方法接收2个参数,分别是HttpServerRequest, HttpServerResponse,返回Mono
1 | public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) { |
- 包装HttpServerRequest到ReactorServerHttpRequest
- 包装HttpServerResponse到ReactorServerHttpResponse
- 调用ReactiveWebServerApplicationContext的内部类ServerManager完成handle。
- 出错时打印错误日志
- 成功是打印成功日志
注:
- doOnError()和doOnSuccess()不会对序列造成改变
继续任务委派
在ServerManager里面包含了HttpHandler对象,实际的实现对象为HttpWebHandlerAdapter
1 | @Override |
- 将request和response组合成了一个对象ServerWebExchange
- 调用被委派对象处理这个组合后的对象
- 执行成功后打印response日志
- 捕获handle过程中的异常,处理后返回一个Mono对象
用 Mono 来表示序列已经结束
1
2
3
4@Override
public Mono<Void> setComplete() {
return !isCommitted() ? doCommit(null) : Mono.empty(); //6
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected Mono<Void> doCommit(@Nullable Supplier<? extends Mono<Void>> writeAction) {
if (!this.state.compareAndSet(State.NEW, State.COMMITTING)) {
return Mono.empty();
}
this.commitActions.add(() ->
Mono.fromRunnable(() -> {
applyStatusCode();
applyHeaders();
applyCookies();
this.state.set(State.COMMITTED);
})); //7
if (writeAction != null) {
this.commitActions.add(writeAction);
}
List<? extends Mono<Void>> actions = this.commitActions.stream()
.map(Supplier::get).collect(Collectors.toList()); //8
return Flux.concat(actions).then(); //9
}
- 设置处理完成,提交了一个空的writeAction
- 添加响应状态码,响应头,cookie等
- 转换为一个类型Mono
的数组 - 将3步骤产生的数组用打包成一个Mono
返回
小结
从过程上看,当收到一个Http请求后,首先交给HttpHandlerAdapter类去处理,经过对数据的一些组合和处理,然后将请求丢给HttpWebHandlerAdapter去处理,从设计上看,WebHandler只是HttpHandler的一个子集,HttpWebHandlerAdapter本身是一个适配器对象,它会将任务继续委派给FilteringWebHandler