Skip to content

Commit a321643

Browse files
committed
Polish
Issue: SPR-16387
1 parent ffbc75a commit a321643

File tree

15 files changed

+125
-104
lines changed

15 files changed

+125
-104
lines changed

build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,9 @@ configure(rootProject) {
247247
imports {
248248
mavenBom "io.projectreactor:reactor-bom:${reactorVersion}"
249249
}
250+
dependencies {
251+
dependency "io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT"
252+
}
250253
resolutionStrategy {
251254
cacheChangingModulesFor 0, 'seconds'
252255
}

spring-messaging/spring-messaging.gradle

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ dependencyManagement {
77
mavenBom "io.projectreactor:reactor-bom:${reactorVersion}"
88
mavenBom "io.netty:netty-bom:${nettyVersion}"
99
}
10+
dependencies {
11+
dependency "io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT"
12+
}
1013
resolutionStrategy {
1114
cacheChangingModulesFor 0, 'seconds'
1215
}
@@ -18,7 +21,7 @@ dependencies {
1821
compile(project(":spring-core"))
1922
optional(project(":spring-context"))
2023
optional(project(":spring-oxm"))
21-
optional("io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT")
24+
optional("io.projectreactor.netty:reactor-netty")
2225
optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") {
2326
exclude group: "javax.servlet", module: "javax.servlet-api"
2427
}

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -90,25 +90,30 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
9090

9191

9292
/**
93-
* Simple constructor with a host and a port.
93+
* Simple constructor with the host and port to use to connect to.
94+
* <p>This constructor manages the lifecycle of the {@link TcpClient} and
95+
* underlying resources such as {@link ConnectionProvider},
96+
* {@link LoopResources}, and {@link ChannelGroup}.
97+
* <p>For full control over the initialization and lifecycle of the
98+
* TcpClient, use {@link #ReactorNettyTcpClient(TcpClient, ReactorNettyCodec)}.
9499
* @param host the host to connect to
95100
* @param port the port to connect to
96-
* @param codec the code to use
101+
* @param codec for encoding and decoding the input/output byte streams
97102
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
98103
*/
99104
public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec<P> codec) {
100105
Assert.notNull(host, "host is required");
101-
Assert.notNull(port, "port is required");
102106
Assert.notNull(codec, "ReactorNettyCodec is required");
103107

104108
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
105109
this.loopResources = LoopResources.create("tcp-client-loop");
106110
this.poolResources = ConnectionProvider.elastic("tcp-client-pool");
107-
this.tcpClient = TcpClient.create(poolResources)
108-
.host(host)
109-
.port(port)
110-
.runOn(loopResources, false)
111-
.doOnConnected(c -> channelGroup.add(c.channel()));
111+
112+
this.tcpClient = TcpClient.create(this.poolResources)
113+
.host(host).port(port)
114+
.runOn(this.loopResources, false)
115+
.doOnConnected(conn -> this.channelGroup.add(conn.channel()));
116+
112117
this.codec = codec;
113118
}
114119

@@ -117,7 +122,7 @@ public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec<P> codec)
117122
* lifecycle is expected to be managed externally.
118123
*
119124
* @param tcpClient the TcpClient instance to use
120-
* @param codec the code to use
125+
* @param codec for encoding and decoding the input/output byte streams
121126
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
122127
*/
123128
public ReactorNettyTcpClient(TcpClient tcpClient, ReactorNettyCodec<P> codec) {
@@ -264,16 +269,16 @@ private class ReactorNettyHandler implements BiFunction<NettyInbound, NettyOutbo
264269
@Override
265270
@SuppressWarnings("unchecked")
266271
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
267-
inbound.withConnection(c -> {
272+
inbound.withConnection(conn -> {
268273
if (logger.isDebugEnabled()) {
269-
logger.debug("Connected to " + c.address());
274+
logger.debug("Connected to " + conn.address());
270275
}
271276
});
272277
DirectProcessor<Void> completion = DirectProcessor.create();
273278
TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion);
274279
scheduler.schedule(() -> connectionHandler.afterConnected(connection));
275280

276-
inbound.withConnection(c -> c.addHandler(new StompMessageDecoder<>(codec)));
281+
inbound.withConnection(conn -> conn.addHandler(new StompMessageDecoder<>(codec)));
277282

278283
inbound.receiveObject()
279284
.cast(Message.class)

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -64,13 +64,13 @@ public ListenableFuture<Void> send(Message<P> message) {
6464
@Override
6565
@SuppressWarnings("deprecation")
6666
public void onReadInactivity(Runnable runnable, long inactivityDuration) {
67-
this.inbound.withConnection(c -> c.onReadIdle(inactivityDuration, runnable));
67+
this.inbound.withConnection(conn -> conn.onReadIdle(inactivityDuration, runnable));
6868
}
6969

7070
@Override
7171
@SuppressWarnings("deprecation")
7272
public void onWriteInactivity(Runnable runnable, long inactivityDuration) {
73-
this.inbound.withConnection(c -> c.onWriteIdle(inactivityDuration, runnable));
73+
this.inbound.withConnection(conn -> conn.onWriteIdle(inactivityDuration, runnable));
7474
}
7575

7676
@Override

spring-test/spring-test.gradle

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ dependencyManagement {
77
mavenBom "io.projectreactor:reactor-bom:${reactorVersion}"
88
mavenBom "io.netty:netty-bom:${nettyVersion}"
99
}
10+
dependencies {
11+
dependency "io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT"
12+
}
1013
resolutionStrategy {
1114
cacheChangingModulesFor 0, 'seconds'
1215
}
@@ -80,7 +83,7 @@ dependencies {
8083
testCompile("org.apache.httpcomponents:httpclient:4.5.5") {
8184
exclude group: "commons-logging", module: "commons-logging"
8285
}
83-
testCompile('io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT')
86+
testCompile('io.projectreactor.netty:reactor-netty')
8487
testCompile('de.bechte.junit:junit-hierarchicalcontextrunner:4.12.1')
8588
// Pull in the latest JUnit 5 Launcher API and the Vintage engine as well
8689
// so that we can run JUnit 4 tests in IntelliJ IDEA.

spring-web/spring-web.gradle

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ dependencyManagement {
88
mavenBom "io.projectreactor:reactor-bom:${reactorVersion}"
99
mavenBom "io.netty:netty-bom:${nettyVersion}"
1010
}
11+
dependencies {
12+
dependency "io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT"
13+
}
1114
resolutionStrategy {
1215
cacheChangingModulesFor 0, 'seconds'
1316
}
@@ -34,7 +37,7 @@ dependencies {
3437
optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}")
3538
optional("io.reactivex.rxjava2:rxjava:${rxjava2Version}")
3639
optional("io.netty:netty-all")
37-
optional("io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT")
40+
optional("io.projectreactor.netty:reactor-netty")
3841
optional("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}")
3942
optional("org.eclipse.jetty:jetty-server:${jettyVersion}") {
4043
exclude group: "javax.servlet", module: "javax.servlet-api"

spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpConnector.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,16 +35,15 @@ public interface ClientHttpConnector {
3535

3636
/**
3737
* Connect to the origin server using the given {@code HttpMethod} and
38-
* {@code URI}, then apply the given {@code requestCallback} on the
39-
* {@link ClientHttpRequest} once the connection has been established.
40-
* <p>Return a publisher of the {@link ClientHttpResponse}.
38+
* {@code URI} and apply the given {@code requestCallback} when the HTTP
39+
* request of the underlying API can be initialized and written to.
4140
* @param method the HTTP request method
4241
* @param uri the HTTP request URI
43-
* @param requestCallback a function that prepares and writes the request,
44-
* returning a publisher that signals when it's done interacting with the
45-
* request. Implementations should return a {@code Mono<Void>} by calling
42+
* @param requestCallback a function that prepares and writes to the request,
43+
* returning a publisher that signals when it's done writing.
44+
* Implementations can return a {@code Mono<Void>} by calling
4645
* {@link ClientHttpRequest#writeWith} or {@link ClientHttpRequest#setComplete}.
47-
* @return a publisher of the {@link ClientHttpResponse}
46+
* @return publisher for the {@link ClientHttpResponse}
4847
*/
4948
Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
5049
Function<? super ClientHttpRequest, Mono<Void>> requestCallback);

spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -44,16 +44,17 @@ public class ReactorClientHttpConnector implements ClientHttpConnector {
4444

4545
/**
4646
* Create a Reactor Netty {@link ClientHttpConnector}
47-
* with a default configuration and HTTP compression support enabled.
47+
* with default configuration and HTTP compression support enabled.
4848
*/
4949
public ReactorClientHttpConnector() {
50-
this.httpClient = HttpClient.create()
51-
.compress();
50+
this.httpClient = HttpClient.create().compress();
5251
}
5352

5453
/**
55-
* Create a Reactor Netty {@link ClientHttpConnector} with the given
56-
* {@link HttpClient}
54+
* Create a Reactor Netty {@link ClientHttpConnector} with a fully
55+
* configured {@code HttpClient}.
56+
* @param httpClient the client instance to use
57+
* @since 5.1
5758
*/
5859
public ReactorClientHttpConnector(HttpClient httpClient) {
5960
this.httpClient = httpClient;
@@ -69,24 +70,23 @@ public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
6970
}
7071

7172
return this.httpClient
72-
.request(adaptHttpMethod(method))
73+
.request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name()))
7374
.uri(uri.toString())
74-
.send((req, out) -> requestCallback.apply(adaptRequest(method, uri, req, out)))
75+
.send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound)))
7576
.responseConnection((res, con) -> Mono.just(adaptResponse(res, con.inbound(), con.outbound().alloc())))
7677
.next();
7778
}
7879

79-
private io.netty.handler.codec.http.HttpMethod adaptHttpMethod(HttpMethod method) {
80-
return io.netty.handler.codec.http.HttpMethod.valueOf(method.name());
81-
}
80+
private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request,
81+
NettyOutbound nettyOutbound) {
8282

83-
private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request, NettyOutbound out) {
84-
return new ReactorClientHttpRequest(method, uri, request, out);
83+
return new ReactorClientHttpRequest(method, uri, request, nettyOutbound);
8584
}
8685

8786
private ClientHttpResponse adaptResponse(HttpClientResponse response, NettyInbound nettyInbound,
88-
ByteBufAllocator alloc) {
89-
return new ReactorClientHttpResponse(response, nettyInbound, alloc);
87+
ByteBufAllocator allocator) {
88+
89+
return new ReactorClientHttpResponse(response, nettyInbound, allocator);
9090
}
9191

9292
}

spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -47,20 +47,19 @@ class ReactorClientHttpRequest extends AbstractClientHttpRequest implements Zero
4747

4848
private final URI uri;
4949

50-
private final HttpClientRequest httpRequest;
50+
private final HttpClientRequest request;
5151

52-
private final NettyOutbound out;
52+
private final NettyOutbound outbound;
5353

5454
private final NettyDataBufferFactory bufferFactory;
5555

5656

57-
public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri,
58-
HttpClientRequest httpRequest, NettyOutbound out) {
59-
this.httpMethod = httpMethod;
57+
public ReactorClientHttpRequest(HttpMethod method, URI uri, HttpClientRequest request, NettyOutbound outbound) {
58+
this.httpMethod = method;
6059
this.uri = uri;
61-
this.httpRequest = httpRequest;
62-
this.out = out;
63-
this.bufferFactory = new NettyDataBufferFactory(out.alloc());
60+
this.request = request;
61+
this.outbound = outbound;
62+
this.bufferFactory = new NettyDataBufferFactory(outbound.alloc());
6463
}
6564

6665

@@ -81,14 +80,16 @@ public URI getURI() {
8180

8281
@Override
8382
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
84-
return doCommit(() -> this.out
85-
.send(Flux.from(body).map(NettyDataBufferFactory::toByteBuf)).then());
83+
return doCommit(() -> {
84+
Flux<ByteBuf> byteBufFlux = Flux.from(body).map(NettyDataBufferFactory::toByteBuf);
85+
return this.outbound.send(byteBufFlux).then();
86+
});
8687
}
8788

8889
@Override
8990
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
9091
Publisher<Publisher<ByteBuf>> byteBufs = Flux.from(body).map(ReactorClientHttpRequest::toByteBufs);
91-
return doCommit(() -> this.out.sendGroups(byteBufs).then());
92+
return doCommit(() -> this.outbound.sendGroups(byteBufs).then());
9293
}
9394

9495
private static Publisher<ByteBuf> toByteBufs(Publisher<? extends DataBuffer> dataBuffers) {
@@ -97,24 +98,24 @@ private static Publisher<ByteBuf> toByteBufs(Publisher<? extends DataBuffer> dat
9798

9899
@Override
99100
public Mono<Void> writeWith(File file, long position, long count) {
100-
return doCommit(() -> this.out.sendFile(file.toPath(), position, count).then());
101+
return doCommit(() -> this.outbound.sendFile(file.toPath(), position, count).then());
101102
}
102103

103104
@Override
104105
public Mono<Void> setComplete() {
105-
return doCommit(() -> out.then());
106+
return doCommit(this.outbound::then);
106107
}
107108

108109
@Override
109110
protected void applyHeaders() {
110-
getHeaders().entrySet().forEach(e -> this.httpRequest.requestHeaders().set(e.getKey(), e.getValue()));
111+
getHeaders().forEach((key, value) -> this.request.requestHeaders().set(key, value));
111112
}
112113

113114
@Override
114115
protected void applyCookies() {
115116
getCookies().values().stream().flatMap(Collection::stream)
116117
.map(cookie -> new DefaultCookie(cookie.getName(), cookie.getValue()))
117-
.forEach(this.httpRequest::addCookie);
118+
.forEach(this.request::addCookie);
118119
}
119120

120121
}

spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,28 +42,26 @@
4242
*/
4343
class ReactorClientHttpResponse implements ClientHttpResponse {
4444

45-
private final NettyDataBufferFactory dataBufferFactory;
45+
private final NettyDataBufferFactory bufferFactory;
4646

4747
private final HttpClientResponse response;
4848

49-
private final NettyInbound nettyInbound;
49+
private final NettyInbound inbound;
5050

5151

52-
public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound nettyInbound,
53-
ByteBufAllocator alloc) {
52+
public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbound, ByteBufAllocator alloc) {
5453
this.response = response;
55-
this.nettyInbound = nettyInbound;
56-
this.dataBufferFactory = new NettyDataBufferFactory(alloc);
54+
this.inbound = inbound;
55+
this.bufferFactory = new NettyDataBufferFactory(alloc);
5756
}
5857

5958

6059
@Override
6160
public Flux<DataBuffer> getBody() {
62-
return nettyInbound
63-
.receive()
64-
.map(buf -> {
65-
buf.retain();
66-
return dataBufferFactory.wrap(buf);
61+
return this.inbound.receive()
62+
.map(byteBuf -> {
63+
byteBuf.retain();
64+
return this.bufferFactory.wrap(byteBuf);
6765
});
6866
}
6967

0 commit comments

Comments
 (0)