Spring WebFlux源码学习笔记(一)

Spring WebFlux源码学习笔记(一)

在开始学习WebFlux之前,需要对Java的函数式编程有一定的了解,熟悉相关函数式接口的用法,包括Function,BiFunction等

开始

学习开始,我们需要找一个切入点,了解一次Http请求发生后,代码响应的过程,这里我们从reactor.netty.http.serverHttpServerHandle开始看起

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void onStateChange(Connection connection, State newState) {
if (newState == HttpServerState.REQUEST_RECEIVED) { //1
try {
if (log.isDebugEnabled()) {
log.debug(format(connection.channel(), "Handler is being applied: {}"), handler);
}
HttpServerOperations ops = (HttpServerOperations) connection; //2
Mono.fromDirect(handler.apply(ops, ops))
.subscribe(ops.disposeSubscriber()); //3
}
catch (Throwable t) {
log.error(format(connection.channel(), ""), t);
connection.channel()
.close();
}
}
}
  1. 接受一个StateREQUEST_RECEIVED状态的连接
  2. 获取在前面的过程中已经将连接信息封装到了HttpServerOperations对象中,这个对象实现了HttpServerRequest, HttpServerResponse

    1
    2
      class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerResponse>
    implements HttpServerRequest, HttpServerResponse
  1. 创建了一个Mono对象(Reactor中的用来发出一个元素),并且订阅(触发)了这个Publisher。

    备注: Mono.fromDirect() 用来转换一个Publisher到Mono免去了一些基础的验证。

处理适配器:ReactorHttpHandlerAdapter

从开始的过程第3步中使用了ReactorHttpHandlerAdapter对HttpServerRequest, HttpServerResponse进行了处理

1
public class ReactorHttpHandlerAdapter implements BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>>

ReactorHttpHandlerAdapter实现了BiFunction,函数方法接收2个参数,分别是HttpServerRequest, HttpServerResponse,返回Mono,看类名可以看出这是一个适配器对象,实际的Handler为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
try {
ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory); //1
ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory); //2

if (request.getMethod() == HttpMethod.HEAD) {
response = new HttpHeadResponseDecorator(response);
}

return this.httpHandler.handle(request, response) //3
.doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage())) //4
.doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed")); //5
}
catch (URISyntaxException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to get request URI: " + ex.getMessage());
}
reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
return Mono.empty();
}
}
  1. 包装HttpServerRequest到ReactorServerHttpRequest
  2. 包装HttpServerResponse到ReactorServerHttpResponse
  3. 调用ReactiveWebServerApplicationContext的内部类ServerManager完成handle。
  4. 出错时打印错误日志
  5. 成功是打印成功日志

注:

  1. doOnError()和doOnSuccess()不会对序列造成改变

继续任务委派

在ServerManager里面包含了HttpHandler对象,实际的实现对象为HttpWebHandlerAdapter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
if (this.forwardedHeaderTransformer != null) {
request = this.forwardedHeaderTransformer.apply(request);
}
ServerWebExchange exchange = createExchange(request, response); //1

LogFormatUtils.traceDebug(logger, traceOn ->
exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
(traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));

return getDelegate().handle(exchange) //2
.doOnSuccess(aVoid -> logResponse(exchange)) //3
.onErrorResume(ex -> handleUnresolvedError(exchange, ex)) //4
.then(Mono.defer(response::setComplete)); //5
}
  1. 将request和response组合成了一个对象ServerWebExchange
  2. 调用被委派对象处理这个组合后的对象
  3. 执行成功后打印response日志
  4. 捕获handle过程中的异常,处理后返回一个Mono对象
  5. 用 Mono 来表示序列已经结束

    1
    2
    3
    4
    @Override
    public Mono<Void> setComplete() {
    return !isCommitted() ? doCommit(null) : Mono.empty(); //6
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected Mono<Void> doCommit(@Nullable Supplier<? extends Mono<Void>> writeAction) {
if (!this.state.compareAndSet(State.NEW, State.COMMITTING)) {
return Mono.empty();
}

this.commitActions.add(() ->
Mono.fromRunnable(() -> {
applyStatusCode();
applyHeaders();
applyCookies();
this.state.set(State.COMMITTED);
})); //7

if (writeAction != null) {
this.commitActions.add(writeAction);
}

List<? extends Mono<Void>> actions = this.commitActions.stream()
.map(Supplier::get).collect(Collectors.toList()); //8

return Flux.concat(actions).then(); //9
}
  1. 设置处理完成,提交了一个空的writeAction
  2. 添加响应状态码,响应头,cookie等
  3. 转换为一个类型Mono的数组
  4. 将3步骤产生的数组用打包成一个Mono返回

小结

从过程上看,当收到一个Http请求后,首先交给HttpHandlerAdapter类去处理,经过对数据的一些组合和处理,然后将请求丢给HttpWebHandlerAdapter去处理,从设计上看,WebHandler只是HttpHandler的一个子集,HttpWebHandlerAdapter本身是一个适配器对象,它会将任务继续委派给FilteringWebHandler

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×