Skip to content

Commit 1c9f3d2

Browse files
committed
Fix reactive connection handling.
Original Pull Request #1766 Closes #1759 (cherry picked from commit 58bca88)
1 parent 2b2bc3e commit 1c9f3d2

File tree

7 files changed

+87
-32
lines changed

7 files changed

+87
-32
lines changed

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@
145145
*/
146146
public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient, Indices {
147147

148-
private final HostProvider hostProvider;
148+
private final HostProvider<?> hostProvider;
149149
private final RequestCreator requestCreator;
150150
private Supplier<HttpHeaders> headersSupplier = () -> HttpHeaders.EMPTY;
151151

@@ -155,7 +155,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
155155
*
156156
* @param hostProvider must not be {@literal null}.
157157
*/
158-
public DefaultReactiveElasticsearchClient(HostProvider hostProvider) {
158+
public DefaultReactiveElasticsearchClient(HostProvider<?> hostProvider) {
159159
this(hostProvider, new DefaultRequestCreator());
160160
}
161161

@@ -166,7 +166,7 @@ public DefaultReactiveElasticsearchClient(HostProvider hostProvider) {
166166
* @param hostProvider must not be {@literal null}.
167167
* @param requestCreator must not be {@literal null}.
168168
*/
169-
public DefaultReactiveElasticsearchClient(HostProvider hostProvider, RequestCreator requestCreator) {
169+
public DefaultReactiveElasticsearchClient(HostProvider<?> hostProvider, RequestCreator requestCreator) {
170170

171171
Assert.notNull(hostProvider, "HostProvider must not be null");
172172
Assert.notNull(requestCreator, "RequestCreator must not be null");
@@ -535,8 +535,7 @@ public <T> Mono<T> execute(ReactiveElasticsearchClientCallback<T> callback) {
535535
.flatMap(callback::doWithClient) //
536536
.onErrorResume(throwable -> {
537537

538-
if (throwable instanceof ConnectException) {
539-
538+
if (isCausedByConnectionException(throwable)) {
540539
return hostProvider.getActive(Verification.ACTIVE) //
541540
.flatMap(callback::doWithClient);
542541
}
@@ -545,6 +544,27 @@ public <T> Mono<T> execute(ReactiveElasticsearchClientCallback<T> callback) {
545544
});
546545
}
547546

547+
/**
548+
* checks if the given throwable is a {@link ConnectException} or has one in it's cause chain
549+
*
550+
* @param throwable the throwable to check
551+
* @return true if throwable is caused by a {@link ConnectException}
552+
*/
553+
private boolean isCausedByConnectionException(Throwable throwable) {
554+
555+
Throwable t = throwable;
556+
do {
557+
558+
if (t instanceof ConnectException) {
559+
return true;
560+
}
561+
562+
t = t.getCause();
563+
} while (t != null);
564+
565+
return false;
566+
}
567+
548568
@Override
549569
public Mono<Status> status() {
550570

@@ -823,10 +843,9 @@ private <T> Publisher<? extends T> handleServerError(Request request, ClientResp
823843
String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType());
824844

825845
return response.body(BodyExtractors.toMono(byte[].class)) //
826-
.switchIfEmpty(Mono
827-
.error(new ElasticsearchStatusException(String.format("%s request to %s returned error code %s and no body.",
828-
request.getMethod(), request.getEndpoint(), statusCode), status))
829-
)
846+
.switchIfEmpty(Mono.error(
847+
new ElasticsearchStatusException(String.format("%s request to %s returned error code %s and no body.",
848+
request.getMethod(), request.getEndpoint(), statusCode), status)))
830849
.map(bytes -> new String(bytes, StandardCharsets.UTF_8)) //
831850
.flatMap(content -> contentOrError(content, mediaType, status))
832851
.flatMap(unused -> Mono

src/main/java/org/springframework/data/elasticsearch/client/reactive/HostProvider.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -34,9 +34,10 @@
3434
*
3535
* @author Christoph Strobl
3636
* @author Mark Paluch
37+
* @author Peter-Josef Meisch
3738
* @since 3.2
3839
*/
39-
public interface HostProvider {
40+
public interface HostProvider<T extends HostProvider<T>> {
4041

4142
/**
4243
* Create a new {@link HostProvider} best suited for the given {@link WebClientProvider} and number of hosts.
@@ -46,7 +47,7 @@ public interface HostProvider {
4647
* @param endpoints must not be {@literal null} nor empty.
4748
* @return new instance of {@link HostProvider}.
4849
*/
49-
static HostProvider provider(WebClientProvider clientProvider, Supplier<HttpHeaders> headersSupplier,
50+
static HostProvider<?> provider(WebClientProvider clientProvider, Supplier<HttpHeaders> headersSupplier,
5051
InetSocketAddress... endpoints) {
5152

5253
Assert.notNull(clientProvider, "WebClientProvider must not be null");

src/main/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProvider.java

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020
import reactor.util.function.Tuple2;
2121

2222
import java.net.InetSocketAddress;
23+
import java.time.Duration;
2324
import java.util.ArrayList;
2425
import java.util.Collection;
2526
import java.util.Collections;
@@ -29,6 +30,8 @@
2930
import java.util.concurrent.ConcurrentHashMap;
3031
import java.util.function.Supplier;
3132

33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
3235
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
3336
import org.springframework.data.elasticsearch.client.ElasticsearchHost.State;
3437
import org.springframework.data.elasticsearch.client.NoReachableHostException;
@@ -42,22 +45,28 @@
4245
*
4346
* @author Christoph Strobl
4447
* @author Mark Paluch
48+
* @author Peter-Josef Meisch
4549
* @since 3.2
4650
*/
47-
class MultiNodeHostProvider implements HostProvider {
51+
class MultiNodeHostProvider implements HostProvider<MultiNodeHostProvider> {
52+
53+
private final static Logger LOG = LoggerFactory.getLogger(MultiNodeHostProvider.class);
4854

4955
private final WebClientProvider clientProvider;
5056
private final Supplier<HttpHeaders> headersSupplier;
5157
private final Map<InetSocketAddress, ElasticsearchHost> hosts;
5258

53-
MultiNodeHostProvider(WebClientProvider clientProvider, Supplier<HttpHeaders> headersSupplier, InetSocketAddress... endpoints) {
59+
MultiNodeHostProvider(WebClientProvider clientProvider, Supplier<HttpHeaders> headersSupplier,
60+
InetSocketAddress... endpoints) {
5461

5562
this.clientProvider = clientProvider;
5663
this.headersSupplier = headersSupplier;
5764
this.hosts = new ConcurrentHashMap<>();
5865
for (InetSocketAddress endpoint : endpoints) {
5966
this.hosts.put(endpoint, new ElasticsearchHost(endpoint, State.UNKNOWN));
6067
}
68+
69+
LOG.debug("initialized with " + hosts);
6170
}
6271

6372
/*
@@ -66,7 +75,7 @@ class MultiNodeHostProvider implements HostProvider {
6675
*/
6776
@Override
6877
public Mono<ClusterInformation> clusterInfo() {
69-
return nodes(null).map(this::updateNodeState).buffer(hosts.size())
78+
return checkNodes(null).map(this::updateNodeState).buffer(hosts.size())
7079
.then(Mono.just(new ClusterInformation(new LinkedHashSet<>(this.hosts.values()))));
7180
}
7281

@@ -86,14 +95,19 @@ public WebClient createWebClient(InetSocketAddress endpoint) {
8695
@Override
8796
public Mono<InetSocketAddress> lookupActiveHost(Verification verification) {
8897

98+
LOG.trace("lookupActiveHost " + verification + " from " + hosts());
99+
89100
if (Verification.LAZY.equals(verification)) {
90101
for (ElasticsearchHost entry : hosts()) {
91102
if (entry.isOnline()) {
103+
LOG.trace("lookupActiveHost returning " + entry);
92104
return Mono.just(entry.getEndpoint());
93105
}
94106
}
107+
LOG.trace("no online host found with LAZY");
95108
}
96109

110+
LOG.trace("searching for active host");
97111
return findActiveHostInKnownActives() //
98112
.switchIfEmpty(findActiveHostInUnresolved()) //
99113
.switchIfEmpty(findActiveHostInDead()) //
@@ -105,20 +119,30 @@ Collection<ElasticsearchHost> getCachedHostState() {
105119
}
106120

107121
private Mono<InetSocketAddress> findActiveHostInKnownActives() {
108-
return findActiveForSate(State.ONLINE);
122+
return findActiveForState(State.ONLINE);
109123
}
110124

111125
private Mono<InetSocketAddress> findActiveHostInUnresolved() {
112-
return findActiveForSate(State.UNKNOWN);
126+
return findActiveForState(State.UNKNOWN);
113127
}
114128

115129
private Mono<InetSocketAddress> findActiveHostInDead() {
116-
return findActiveForSate(State.OFFLINE);
130+
return findActiveForState(State.OFFLINE);
117131
}
118132

119-
private Mono<InetSocketAddress> findActiveForSate(State state) {
120-
return nodes(state).map(this::updateNodeState).filter(ElasticsearchHost::isOnline)
121-
.map(ElasticsearchHost::getEndpoint).next();
133+
private Mono<InetSocketAddress> findActiveForState(State state) {
134+
135+
LOG.trace("findActiveForState state " + state + ", current hosts: " + hosts);
136+
137+
return checkNodes(state) //
138+
.map(this::updateNodeState) //
139+
.filter(ElasticsearchHost::isOnline) //
140+
.map(elasticsearchHost -> {
141+
LOG.trace("findActiveForState returning host " + elasticsearchHost);
142+
return elasticsearchHost;
143+
}).map(ElasticsearchHost::getEndpoint) //
144+
.takeLast(1) //
145+
.next();
122146
}
123147

124148
private ElasticsearchHost updateNodeState(Tuple2<InetSocketAddress, State> tuple2) {
@@ -129,25 +153,34 @@ private ElasticsearchHost updateNodeState(Tuple2<InetSocketAddress, State> tuple
129153
return elasticsearchHost;
130154
}
131155

132-
private Flux<Tuple2<InetSocketAddress, State>> nodes(@Nullable State state) {
156+
private Flux<Tuple2<InetSocketAddress, State>> checkNodes(@Nullable State state) {
157+
158+
LOG.trace("checkNodes() with state " + state);
133159

134160
return Flux.fromIterable(hosts()) //
135161
.filter(entry -> state == null || entry.getState().equals(state)) //
136162
.map(ElasticsearchHost::getEndpoint) //
137-
.flatMap(host -> {
163+
.concatMap(host -> {
164+
165+
LOG.trace("checking host " + host);
138166

139167
Mono<ClientResponse> exchange = createWebClient(host) //
140168
.head().uri("/") //
141169
.headers(httpHeaders -> httpHeaders.addAll(headersSupplier.get())) //
142-
.exchange().doOnError(throwable -> {
170+
.exchange() //
171+
.timeout(Duration.ofSeconds(1)) //
172+
.doOnError(throwable -> {
143173
hosts.put(host, new ElasticsearchHost(host, State.OFFLINE));
144174
clientProvider.getErrorListener().accept(throwable);
145175
});
146176

147177
return Mono.just(host).zipWith(exchange
148178
.flatMap(it -> it.releaseBody().thenReturn(it.statusCode().isError() ? State.OFFLINE : State.ONLINE)));
149179
}) //
150-
.onErrorContinue((throwable, o) -> clientProvider.getErrorListener().accept(throwable));
180+
.map(tuple -> {
181+
LOG.trace("check result " + tuple);
182+
return tuple;
183+
}).onErrorContinue((throwable, o) -> clientProvider.getErrorListener().accept(throwable));
151184
}
152185

153186
private List<ElasticsearchHost> hosts() {

src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -32,9 +32,10 @@
3232
*
3333
* @author Christoph Strobl
3434
* @author Mark Paluch
35+
* @author Peter-Josef Meisch
3536
* @since 3.2
3637
*/
37-
class SingleNodeHostProvider implements HostProvider {
38+
class SingleNodeHostProvider implements HostProvider<SingleNodeHostProvider> {
3839

3940
private final WebClientProvider clientProvider;
4041
private final Supplier<HttpHeaders> headersSupplier;

src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -62,7 +62,7 @@ public class ReactiveElasticsearchClientUnitTests {
6262

6363
static final String HOST = ":9200";
6464

65-
MockDelegatingElasticsearchHostProvider<HostProvider> hostProvider;
65+
MockDelegatingElasticsearchHostProvider<? extends HostProvider<?>> hostProvider;
6666
ReactiveElasticsearchClient client;
6767

6868
@BeforeEach

src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ public T getDelegate() {
186186
return delegate;
187187
}
188188

189-
public MockDelegatingElasticsearchHostProvider<T> withActiveDefaultHost(String host) {
189+
public MockDelegatingElasticsearchHostProvider<? extends HostProvider<?>> withActiveDefaultHost(String host) {
190190
return new MockDelegatingElasticsearchHostProvider(HttpHeaders.EMPTY, clientProvider, errorCollector, delegate,
191191
host);
192192
}

src/test/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProviderUnitTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@
3030

3131
/**
3232
* @author Christoph Strobl
33+
* @author Peter-Josef Meisch
3334
*/
3435
public class SingleNodeHostProviderUnitTests {
3536

0 commit comments

Comments
 (0)