Skip to content

Commit 90c4a2a

Browse files
authored
DATAES-982 - Improve refresh handling.
Original PR: #573.
1 parent 6fd35b5 commit 90c4a2a

23 files changed

+269
-80
lines changed

src/main/asciidoc/reference/elasticsearch-clients.adoc

Lines changed: 22 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -22,28 +22,32 @@ public class TransportClientConfig extends ElasticsearchConfigurationSupport {
2222
2323
@Bean
2424
public Client elasticsearchClient() throws UnknownHostException {
25-
Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build(); <1>
25+
Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build(); <.>
2626
TransportClient client = new PreBuiltTransportClient(settings);
27-
client.addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); <2>
27+
client.addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); <.>
2828
return client;
2929
}
3030
3131
@Bean(name = { "elasticsearchOperations", "elasticsearchTemplate" })
3232
public ElasticsearchTemplate elasticsearchTemplate() throws UnknownHostException {
33-
return new ElasticsearchTemplate(elasticsearchClient());
33+
34+
ElasticsearchTemplate template = new ElasticsearchTemplate(elasticsearchClient, elasticsearchConverter);
35+
template.setRefreshPolicy(refreshPolicy()); <.>
36+
37+
return template;
3438
}
3539
}
3640
3741
// ...
3842
3943
IndexRequest request = new IndexRequest("spring-data", "elasticsearch", randomID())
40-
.source(someObject)
41-
.setRefreshPolicy(IMMEDIATE);
44+
.source(someObject);
4245
4346
IndexResponse response = client.index(request);
4447
----
45-
<1> The `TransportClient` must be configured with the cluster name.
46-
<2> The host and port to connect the client to.
48+
<.> The `TransportClient` must be configured with the cluster name.
49+
<.> The host and port to connect the client to.
50+
<.> the RefreshPolicy must be set in the `ElasticsearchTemplate` (override `refreshPolicy()` to not use the default)
4751
====
4852

4953
[[elasticsearch.clients.rest]]
@@ -103,39 +107,29 @@ Calls are directly operated on the reactive stack, **not** wrapping async (threa
103107
====
104108
[source,java]
105109
----
106-
static class Config {
107-
108-
@Bean
109-
ReactiveElasticsearchClient client() {
110+
@Configuration
111+
public class ReactiveRestClientConfig extends AbstractReactiveElasticsearchConfiguration {
110112
111-
ClientConfiguration clientConfiguration = ClientConfiguration.builder() <1>
112-
.connectedTo("localhost:9200", "localhost:9291")
113-
.withWebClientConfigurer(webClient -> { <2>
114-
ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
115-
.codecs(configurer -> configurer.defaultCodecs()
116-
.maxInMemorySize(-1))
113+
@Override
114+
@Bean
115+
public ReactiveElasticsearchClient reactiveElasticsearchClient() {
116+
final ClientConfiguration clientConfiguration = ClientConfiguration.builder() <.>
117+
.connectedTo("localhost:9200") //
117118
.build();
118-
return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
119-
})
120-
.build();
119+
return ReactiveRestClients.create(clientConfiguration);
121120
122-
return ReactiveRestClients.create(clientConfiguration);
123-
}
121+
}
124122
}
125-
126123
// ...
127124
128125
Mono<IndexResponse> response = client.index(request ->
129126
130127
request.index("spring-data")
131-
.type("elasticsearch")
132128
.id(randomID())
133-
.source(singletonMap("feature", "reactive-client"))
134-
.setRefreshPolicy(IMMEDIATE);
129+
.source(singletonMap("feature", "reactive-client"));
135130
);
136131
----
137-
<1> Use the builder to provide cluster addresses, set default `HttpHeaders` or enable SSL.
138-
<2> when configuring a reactive client, the `withWebClientConfigurer` hook can be used to customize the WebClient.
132+
<.> Use the builder to provide cluster addresses, set default `HttpHeaders` or enable SSL.
139133
====
140134

141135
NOTE: The ReactiveClient response, especially for search operations, is bound to the `from` (offset) & `size` (limit) options of the request.
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
[[elasticsearch-migration-guide-4.1-4.2]]
2+
= Upgrading from 4.1.x to 4.2.x
3+
4+
This section describes breaking changes from version 4.1.x to 4.2.x and how removed features can be replaced by new introduced features.
5+
6+
[[elasticsearch-migration-guide-4.1-4.2.deprecations]]
7+
== Deprecations
8+
9+
[[elasticsearch-migration-guide-4.1-4.2.removal]]
10+
== Removals
11+
12+
[[elasticsearch-migration-guide-4.1-4.2.breaking-changes]]
13+
== Breaking Changes
14+
15+
=== RefreshPolicy
16+
17+
==== Enum package changed
18+
19+
It was possible in 4.1 to configure the refresh policy for the `ReactiveElasticsearchTemplate` by overriding the method `AbstractReactiveElasticsearchConfiguration.refreshPolicy()` in a custom configuration class. The return value of this method was an instance of the class `org.elasticsearch.action.support.WriteRequest.RefreshPolicy`.
20+
21+
Now the configuration must return `org.springframework.data.elasticsearch.core.RefreshPolicy`. This enum has the same values and triggers the same behaviour as before, so only the `import` statement has to be adjusted.
22+
23+
==== Refresh behaviour
24+
25+
`ElasticsearchOperations` and `ReactiveElasticsearchOperations` now explicitly use the `RefreshPolicy` set on the template for write requests if not null. If the refresh policy is null, then nothing special is done, so the cluster defaults are used. `ElasticsearchOperations` was always using the cluster default before this version.
26+
27+
The provided implementations for `ElasticsearchRepository` and `ReactiveElasticsearchRepository` will do an explicit refresh when the refresh policy is null. This is the same behaviour as in previous versions. If a refresh policy is set, then it will be used by the repositories as well.
28+
29+
==== Refresh configuration
30+
31+
When configuring Spring Data Elasticsearch like described in <<elasticsearch.clients>> by using `ElasticsearchConfigurationSupport`, `AbstractElasticsearchConfiguration` or `AbstractReactiveElasticsearchConfiguration` the refresh policy will be initialized to `null`. Previously the reactive code initialized this to `IMMEDIATE`, now reactive and
32+
non-reactive code show the same behaviour.

src/main/asciidoc/reference/migration-guides.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,6 @@
66
include::elasticsearch-migration-guide-3.2-4.0.adoc[]
77

88
include::elasticsearch-migration-guide-4.0-4.1.adoc[]
9+
10+
include::elasticsearch-migration-guide-4.1-4.2.adoc[]
911
:leveloffset: -1

src/main/java/org/springframework/data/elasticsearch/config/AbstractElasticsearchConfiguration.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ public abstract class AbstractElasticsearchConfiguration extends ElasticsearchCo
4545
@Bean(name = { "elasticsearchOperations", "elasticsearchTemplate" })
4646
public ElasticsearchOperations elasticsearchOperations(ElasticsearchConverter elasticsearchConverter,
4747
RestHighLevelClient elasticsearchClient) {
48-
return new ElasticsearchRestTemplate(elasticsearchClient, elasticsearchConverter);
48+
49+
ElasticsearchRestTemplate template = new ElasticsearchRestTemplate(elasticsearchClient, elasticsearchConverter);
50+
template.setRefreshPolicy(refreshPolicy());
51+
52+
return template;
4953
}
5054
}

src/main/java/org/springframework/data/elasticsearch/config/AbstractReactiveElasticsearchConfiguration.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
package org.springframework.data.elasticsearch.config;
1717

1818
import org.elasticsearch.action.support.IndicesOptions;
19-
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
2019
import org.springframework.context.annotation.Bean;
2120
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
2221
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
2322
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchTemplate;
23+
import org.springframework.data.elasticsearch.core.RefreshPolicy;
2424
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
2525
import org.springframework.lang.Nullable;
2626

@@ -58,13 +58,13 @@ public ReactiveElasticsearchOperations reactiveElasticsearchTemplate(Elasticsear
5858
}
5959

6060
/**
61-
* Set up the write {@link RefreshPolicy}. Default is set to {@link RefreshPolicy#IMMEDIATE}.
61+
* Set up the write {@link RefreshPolicy}. Default is set to null to use the cluster defaults..
6262
*
6363
* @return {@literal null} to use the server defaults.
6464
*/
6565
@Nullable
6666
protected RefreshPolicy refreshPolicy() {
67-
return RefreshPolicy.IMMEDIATE;
67+
return null;
6868
}
6969

7070
/**

src/main/java/org/springframework/data/elasticsearch/config/ElasticsearchConfigurationSupport.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@
2828
import org.springframework.core.type.filter.AnnotationTypeFilter;
2929
import org.springframework.data.annotation.Persistent;
3030
import org.springframework.data.elasticsearch.annotations.Document;
31+
import org.springframework.data.elasticsearch.core.RefreshPolicy;
3132
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
3233
import org.springframework.data.elasticsearch.core.convert.ElasticsearchCustomConversions;
3334
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
3435
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
36+
import org.springframework.lang.Nullable;
3537
import org.springframework.util.ClassUtils;
3638
import org.springframework.util.StringUtils;
3739

@@ -44,8 +46,8 @@
4446
public class ElasticsearchConfigurationSupport {
4547

4648
@Bean
47-
public ElasticsearchConverter elasticsearchEntityMapper(
48-
SimpleElasticsearchMappingContext elasticsearchMappingContext, ElasticsearchCustomConversions elasticsearchCustomConversions) {
49+
public ElasticsearchConverter elasticsearchEntityMapper(SimpleElasticsearchMappingContext elasticsearchMappingContext,
50+
ElasticsearchCustomConversions elasticsearchCustomConversions) {
4951

5052
MappingElasticsearchConverter elasticsearchConverter = new MappingElasticsearchConverter(
5153
elasticsearchMappingContext);
@@ -61,7 +63,8 @@ public ElasticsearchConverter elasticsearchEntityMapper(
6163
* @return never {@literal null}.
6264
*/
6365
@Bean
64-
public SimpleElasticsearchMappingContext elasticsearchMappingContext(ElasticsearchCustomConversions elasticsearchCustomConversions) {
66+
public SimpleElasticsearchMappingContext elasticsearchMappingContext(
67+
ElasticsearchCustomConversions elasticsearchCustomConversions) {
6568

6669
SimpleElasticsearchMappingContext mappingContext = new SimpleElasticsearchMappingContext();
6770
mappingContext.setInitialEntitySet(getInitialEntitySet());
@@ -147,4 +150,14 @@ protected Set<Class<?>> scanForEntities(String basePackage) {
147150

148151
return initialEntitySet;
149152
}
153+
154+
/**
155+
* Set up the write {@link RefreshPolicy}. Default is set to null to use the cluster defaults..
156+
*
157+
* @return {@literal null} to use the server defaults.
158+
*/
159+
@Nullable
160+
protected RefreshPolicy refreshPolicy() {
161+
return null;
162+
}
150163
}

src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.elasticsearch.action.search.MultiSearchRequest;
3232
import org.elasticsearch.action.search.MultiSearchResponse;
3333
import org.elasticsearch.action.search.SearchResponse;
34+
import org.elasticsearch.action.support.WriteRequest;
35+
import org.elasticsearch.action.support.WriteRequestBuilder;
3436
import org.elasticsearch.common.unit.TimeValue;
3537
import org.elasticsearch.index.query.MoreLikeThisQueryBuilder;
3638
import org.elasticsearch.search.suggest.SuggestBuilder;
@@ -82,6 +84,7 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper
8284
@Nullable protected RequestFactory requestFactory;
8385
@Nullable private EntityOperations entityOperations;
8486
@Nullable private EntityCallbacks entityCallbacks;
87+
@Nullable private RefreshPolicy refreshPolicy;
8588

8689
// region Initialization
8790
protected void initialize(ElasticsearchConverter elasticsearchConverter) {
@@ -130,6 +133,15 @@ public void setEntityCallbacks(EntityCallbacks entityCallbacks) {
130133

131134
this.entityCallbacks = entityCallbacks;
132135
}
136+
137+
public void setRefreshPolicy(@Nullable RefreshPolicy refreshPolicy) {
138+
this.refreshPolicy = refreshPolicy;
139+
}
140+
141+
@Nullable
142+
public RefreshPolicy getRefreshPolicy() {
143+
return refreshPolicy;
144+
}
133145
// endregion
134146

135147
// region DocumentOperations
@@ -308,6 +320,41 @@ public List<IndexedObjectInformation> bulkOperation(List<?> queries, BulkOptions
308320

309321
public abstract List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions,
310322
IndexCoordinates index);
323+
324+
/**
325+
* Pre process the write request before it is sent to the server, eg. by setting the
326+
* {@link WriteRequest#setRefreshPolicy(String) refresh policy} if applicable.
327+
*
328+
* @param request must not be {@literal null}.
329+
* @param <R>
330+
* @return the processed {@link WriteRequest}.
331+
*/
332+
protected <R extends WriteRequest<R>> R prepareWriteRequest(R request) {
333+
334+
if (refreshPolicy == null) {
335+
return request;
336+
}
337+
338+
return request.setRefreshPolicy(refreshPolicy.toRequestRefreshPolicy());
339+
}
340+
341+
/**
342+
* Pre process the write request before it is sent to the server, eg. by setting the
343+
* {@link WriteRequest#setRefreshPolicy(String) refresh policy} if applicable.
344+
*
345+
* @param requestBuilder must not be {@literal null}.
346+
* @param <R>
347+
* @return the processed {@link WriteRequest}.
348+
*/
349+
protected <R extends WriteRequestBuilder<R>> R prepareWriteRequestBuilder(R requestBuilder) {
350+
351+
if (refreshPolicy == null) {
352+
return requestBuilder;
353+
}
354+
355+
return requestBuilder.setRefreshPolicy(refreshPolicy.toRequestRefreshPolicy());
356+
}
357+
311358
// endregion
312359

313360
// region SearchOperations
@@ -609,6 +656,7 @@ private SeqNoPrimaryTerm getEntitySeqNoPrimaryTerm(Object entity) {
609656
}
610657

611658
private <T> IndexQuery getIndexQuery(T entity) {
659+
612660
String id = getEntityId(entity);
613661

614662
if (id != null) {
@@ -618,7 +666,9 @@ private <T> IndexQuery getIndexQuery(T entity) {
618666
IndexQueryBuilder builder = new IndexQueryBuilder() //
619667
.withId(id) //
620668
.withObject(entity);
669+
621670
SeqNoPrimaryTerm seqNoPrimaryTerm = getEntitySeqNoPrimaryTerm(entity);
671+
622672
if (seqNoPrimaryTerm != null) {
623673
builder.withSeqNoPrimaryTerm(seqNoPrimaryTerm);
624674
} else {
@@ -627,9 +677,11 @@ private <T> IndexQuery getIndexQuery(T entity) {
627677
}
628678

629679
String routing = getEntityRouting(entity);
680+
630681
if (routing != null) {
631682
builder.withRouting(routing);
632683
}
684+
633685
return builder.build();
634686
}
635687

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public IndexOperations indexOps(IndexCoordinates index) {
139139
// region DocumentOperations
140140
public String doIndex(IndexQuery query, IndexCoordinates index) {
141141

142-
IndexRequest request = requestFactory.indexRequest(query, index);
142+
IndexRequest request = prepareWriteRequest(requestFactory.indexRequest(query, index));
143143
IndexResponse indexResponse = execute(client -> client.index(request, RequestOptions.DEFAULT));
144144

145145
// We should call this because we are not going through a mapper.
@@ -197,7 +197,8 @@ public String delete(String id, @Nullable String routing, IndexCoordinates index
197197
Assert.notNull(id, "id must not be null");
198198
Assert.notNull(index, "index must not be null");
199199

200-
DeleteRequest request = requestFactory.deleteRequest(elasticsearchConverter.convertId(id), routing, index);
200+
DeleteRequest request = prepareWriteRequest(
201+
requestFactory.deleteRequest(elasticsearchConverter.convertId(id), routing, index));
201202
return execute(client -> client.delete(request, RequestOptions.DEFAULT).getId());
202203
}
203204

@@ -224,7 +225,7 @@ public UpdateResponse update(UpdateQuery query, IndexCoordinates index) {
224225

225226
public List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions,
226227
IndexCoordinates index) {
227-
BulkRequest bulkRequest = requestFactory.bulkRequest(queries, bulkOptions, index);
228+
BulkRequest bulkRequest = prepareWriteRequest(requestFactory.bulkRequest(queries, bulkOptions, index));
228229
List<IndexedObjectInformation> indexedObjectInformationList = checkForBulkOperationFailure(
229230
execute(client -> client.bulk(bulkRequest, RequestOptions.DEFAULT)));
230231
updateIndexedObjectsWithQueries(queries, indexedObjectInformationList);

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ public void setSearchTimeout(String searchTimeout) {
147147
public String doIndex(IndexQuery query, IndexCoordinates index) {
148148

149149
IndexRequestBuilder indexRequestBuilder = requestFactory.indexRequestBuilder(client, query, index);
150+
indexRequestBuilder = prepareWriteRequestBuilder(indexRequestBuilder);
150151
ActionFuture<IndexResponse> future = indexRequestBuilder.execute();
151152
IndexResponse response;
152153
try {
@@ -211,8 +212,8 @@ public String delete(String id, @Nullable String routing, IndexCoordinates index
211212
Assert.notNull(id, "id must not be null");
212213
Assert.notNull(index, "index must not be null");
213214

214-
DeleteRequestBuilder deleteRequestBuilder = requestFactory.deleteRequestBuilder(client,
215-
elasticsearchConverter.convertId(id), routing, index);
215+
DeleteRequestBuilder deleteRequestBuilder = prepareWriteRequestBuilder(
216+
requestFactory.deleteRequestBuilder(client, elasticsearchConverter.convertId(id), routing, index));
216217
return deleteRequestBuilder.execute().actionGet().getId();
217218
}
218219

@@ -242,9 +243,10 @@ public UpdateResponse update(UpdateQuery query, IndexCoordinates index) {
242243

243244
public List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions,
244245
IndexCoordinates index) {
245-
BulkRequestBuilder bulkRequest = requestFactory.bulkRequestBuilder(client, queries, bulkOptions, index);
246+
BulkRequestBuilder bulkRequestBuilder = requestFactory.bulkRequestBuilder(client, queries, bulkOptions, index);
247+
bulkRequestBuilder = prepareWriteRequestBuilder(bulkRequestBuilder);
246248
final List<IndexedObjectInformation> indexedObjectInformations = checkForBulkOperationFailure(
247-
bulkRequest.execute().actionGet());
249+
bulkRequestBuilder.execute().actionGet());
248250
updateIndexedObjectsWithQueries(queries, indexedObjectInformations);
249251
return indexedObjectInformations;
250252
}

0 commit comments

Comments
 (0)