Skip to content

DATAES-982 - Improve refresh handling. #573

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 22 additions & 28 deletions src/main/asciidoc/reference/elasticsearch-clients.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,32 @@ public class TransportClientConfig extends ElasticsearchConfigurationSupport {

@Bean
public Client elasticsearchClient() throws UnknownHostException {
Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build(); <1>
Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build(); <.>
TransportClient client = new PreBuiltTransportClient(settings);
client.addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); <2>
client.addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); <.>
return client;
}

@Bean(name = { "elasticsearchOperations", "elasticsearchTemplate" })
public ElasticsearchTemplate elasticsearchTemplate() throws UnknownHostException {
return new ElasticsearchTemplate(elasticsearchClient());

ElasticsearchTemplate template = new ElasticsearchTemplate(elasticsearchClient, elasticsearchConverter);
template.setRefreshPolicy(refreshPolicy()); <.>

return template;
}
}

// ...

IndexRequest request = new IndexRequest("spring-data", "elasticsearch", randomID())
.source(someObject)
.setRefreshPolicy(IMMEDIATE);
.source(someObject);

IndexResponse response = client.index(request);
----
<1> The `TransportClient` must be configured with the cluster name.
<2> The host and port to connect the client to.
<.> The `TransportClient` must be configured with the cluster name.
<.> The host and port to connect the client to.
<.> the RefreshPolicy must be set in the `ElasticsearchTemplate` (override `refreshPolicy()` to not use the default)
====

[[elasticsearch.clients.rest]]
Expand Down Expand Up @@ -103,39 +107,29 @@ Calls are directly operated on the reactive stack, **not** wrapping async (threa
====
[source,java]
----
static class Config {

@Bean
ReactiveElasticsearchClient client() {
@Configuration
public class ReactiveRestClientConfig extends AbstractReactiveElasticsearchConfiguration {

ClientConfiguration clientConfiguration = ClientConfiguration.builder() <1>
.connectedTo("localhost:9200", "localhost:9291")
.withWebClientConfigurer(webClient -> { <2>
ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
.codecs(configurer -> configurer.defaultCodecs()
.maxInMemorySize(-1))
@Override
@Bean
public ReactiveElasticsearchClient reactiveElasticsearchClient() {
final ClientConfiguration clientConfiguration = ClientConfiguration.builder() <.>
.connectedTo("localhost:9200") //
.build();
return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
})
.build();
return ReactiveRestClients.create(clientConfiguration);

return ReactiveRestClients.create(clientConfiguration);
}
}
}

// ...

Mono<IndexResponse> response = client.index(request ->

request.index("spring-data")
.type("elasticsearch")
.id(randomID())
.source(singletonMap("feature", "reactive-client"))
.setRefreshPolicy(IMMEDIATE);
.source(singletonMap("feature", "reactive-client"));
);
----
<1> Use the builder to provide cluster addresses, set default `HttpHeaders` or enable SSL.
<2> when configuring a reactive client, the `withWebClientConfigurer` hook can be used to customize the WebClient.
<.> Use the builder to provide cluster addresses, set default `HttpHeaders` or enable SSL.
====

NOTE: The ReactiveClient response, especially for search operations, is bound to the `from` (offset) & `size` (limit) options of the request.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
[[elasticsearch-migration-guide-4.1-4.2]]
= Upgrading from 4.1.x to 4.2.x

This section describes breaking changes from version 4.1.x to 4.2.x and how removed features can be replaced by new introduced features.

[[elasticsearch-migration-guide-4.1-4.2.deprecations]]
== Deprecations

[[elasticsearch-migration-guide-4.1-4.2.removal]]
== Removals

[[elasticsearch-migration-guide-4.1-4.2.breaking-changes]]
== Breaking Changes

=== RefreshPolicy

==== Enum package changed

It was possible in 4.1 to configure the refresh policy for the `ReactiveElasticsearchTemplate` by overriding the method `AbstractReactiveElasticsearchConfiguration.refreshPolicy()` in a custom configuration class. The return value of this method was an instance of the class `org.elasticsearch.action.support.WriteRequest.RefreshPolicy`.

Now the configuration must return `org.springframework.data.elasticsearch.core.RefreshPolicy`. This enum has the same values and triggers the same behaviour as before, so only the `import` statement has to be adjusted.

==== Refresh behaviour

`ElasticsearchOperations` and `ReactiveElasticsearchOperations` now explicitly use the `RefreshPolicy` set on the template for write requests if not null. If the refresh policy is null, then nothing special is done, so the cluster defaults are used. `ElasticsearchOperations` was always using the cluster default before this version.

The provided implementations for `ElasticsearchRepository` and `ReactiveElasticsearchRepository` will do an explicit refresh when the refresh policy is null. This is the same behaviour as in previous versions. If a refresh policy is set, then it will be used by the repositories as well.

==== Refresh configuration

When configuring Spring Data Elasticsearch like described in <<elasticsearch.clients>> by using `ElasticsearchConfigurationSupport`, `AbstractElasticsearchConfiguration` or `AbstractReactiveElasticsearchConfiguration` the refresh policy will be initialized to `null`. Previously the reactive code initialized this to `IMMEDIATE`, now reactive and
non-reactive code show the same behaviour.
2 changes: 2 additions & 0 deletions src/main/asciidoc/reference/migration-guides.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@
include::elasticsearch-migration-guide-3.2-4.0.adoc[]

include::elasticsearch-migration-guide-4.0-4.1.adoc[]

include::elasticsearch-migration-guide-4.1-4.2.adoc[]
:leveloffset: -1
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public abstract class AbstractElasticsearchConfiguration extends ElasticsearchCo
@Bean(name = { "elasticsearchOperations", "elasticsearchTemplate" })
public ElasticsearchOperations elasticsearchOperations(ElasticsearchConverter elasticsearchConverter,
RestHighLevelClient elasticsearchClient) {
return new ElasticsearchRestTemplate(elasticsearchClient, elasticsearchConverter);

ElasticsearchRestTemplate template = new ElasticsearchRestTemplate(elasticsearchClient, elasticsearchConverter);
template.setRefreshPolicy(refreshPolicy());

return template;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
package org.springframework.data.elasticsearch.config;

import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.springframework.context.annotation.Bean;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.RefreshPolicy;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.lang.Nullable;

Expand Down Expand Up @@ -58,13 +58,13 @@ public ReactiveElasticsearchOperations reactiveElasticsearchTemplate(Elasticsear
}

/**
* Set up the write {@link RefreshPolicy}. Default is set to {@link RefreshPolicy#IMMEDIATE}.
* Set up the write {@link RefreshPolicy}. Default is set to null to use the cluster defaults..
*
* @return {@literal null} to use the server defaults.
*/
@Nullable
protected RefreshPolicy refreshPolicy() {
return RefreshPolicy.IMMEDIATE;
return null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.data.annotation.Persistent;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.core.RefreshPolicy;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchCustomConversions;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;

Expand All @@ -44,8 +46,8 @@
public class ElasticsearchConfigurationSupport {

@Bean
public ElasticsearchConverter elasticsearchEntityMapper(
SimpleElasticsearchMappingContext elasticsearchMappingContext, ElasticsearchCustomConversions elasticsearchCustomConversions) {
public ElasticsearchConverter elasticsearchEntityMapper(SimpleElasticsearchMappingContext elasticsearchMappingContext,
ElasticsearchCustomConversions elasticsearchCustomConversions) {

MappingElasticsearchConverter elasticsearchConverter = new MappingElasticsearchConverter(
elasticsearchMappingContext);
Expand All @@ -61,7 +63,8 @@ public ElasticsearchConverter elasticsearchEntityMapper(
* @return never {@literal null}.
*/
@Bean
public SimpleElasticsearchMappingContext elasticsearchMappingContext(ElasticsearchCustomConversions elasticsearchCustomConversions) {
public SimpleElasticsearchMappingContext elasticsearchMappingContext(
ElasticsearchCustomConversions elasticsearchCustomConversions) {

SimpleElasticsearchMappingContext mappingContext = new SimpleElasticsearchMappingContext();
mappingContext.setInitialEntitySet(getInitialEntitySet());
Expand Down Expand Up @@ -147,4 +150,14 @@ protected Set<Class<?>> scanForEntities(String basePackage) {

return initialEntitySet;
}

/**
* Set up the write {@link RefreshPolicy}. Default is set to null to use the cluster defaults..
*
* @return {@literal null} to use the server defaults.
*/
@Nullable
protected RefreshPolicy refreshPolicy() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteRequestBuilder;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.MoreLikeThisQueryBuilder;
import org.elasticsearch.search.suggest.SuggestBuilder;
Expand Down Expand Up @@ -82,6 +84,7 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper
@Nullable protected RequestFactory requestFactory;
@Nullable private EntityOperations entityOperations;
@Nullable private EntityCallbacks entityCallbacks;
@Nullable private RefreshPolicy refreshPolicy;

// region Initialization
protected void initialize(ElasticsearchConverter elasticsearchConverter) {
Expand Down Expand Up @@ -130,6 +133,15 @@ public void setEntityCallbacks(EntityCallbacks entityCallbacks) {

this.entityCallbacks = entityCallbacks;
}

public void setRefreshPolicy(@Nullable RefreshPolicy refreshPolicy) {
this.refreshPolicy = refreshPolicy;
}

@Nullable
public RefreshPolicy getRefreshPolicy() {
return refreshPolicy;
}
// endregion

// region DocumentOperations
Expand Down Expand Up @@ -308,6 +320,41 @@ public List<IndexedObjectInformation> bulkOperation(List<?> queries, BulkOptions

public abstract List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions,
IndexCoordinates index);

/**
* Pre process the write request before it is sent to the server, eg. by setting the
* {@link WriteRequest#setRefreshPolicy(String) refresh policy} if applicable.
*
* @param request must not be {@literal null}.
* @param <R>
* @return the processed {@link WriteRequest}.
*/
protected <R extends WriteRequest<R>> R prepareWriteRequest(R request) {

if (refreshPolicy == null) {
return request;
}

return request.setRefreshPolicy(refreshPolicy.toRequestRefreshPolicy());
}

/**
* Pre process the write request before it is sent to the server, eg. by setting the
* {@link WriteRequest#setRefreshPolicy(String) refresh policy} if applicable.
*
* @param requestBuilder must not be {@literal null}.
* @param <R>
* @return the processed {@link WriteRequest}.
*/
protected <R extends WriteRequestBuilder<R>> R prepareWriteRequestBuilder(R requestBuilder) {

if (refreshPolicy == null) {
return requestBuilder;
}

return requestBuilder.setRefreshPolicy(refreshPolicy.toRequestRefreshPolicy());
}

// endregion

// region SearchOperations
Expand Down Expand Up @@ -609,6 +656,7 @@ private SeqNoPrimaryTerm getEntitySeqNoPrimaryTerm(Object entity) {
}

private <T> IndexQuery getIndexQuery(T entity) {

String id = getEntityId(entity);

if (id != null) {
Expand All @@ -618,7 +666,9 @@ private <T> IndexQuery getIndexQuery(T entity) {
IndexQueryBuilder builder = new IndexQueryBuilder() //
.withId(id) //
.withObject(entity);

SeqNoPrimaryTerm seqNoPrimaryTerm = getEntitySeqNoPrimaryTerm(entity);

if (seqNoPrimaryTerm != null) {
builder.withSeqNoPrimaryTerm(seqNoPrimaryTerm);
} else {
Expand All @@ -627,9 +677,11 @@ private <T> IndexQuery getIndexQuery(T entity) {
}

String routing = getEntityRouting(entity);

if (routing != null) {
builder.withRouting(routing);
}

return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public IndexOperations indexOps(IndexCoordinates index) {
// region DocumentOperations
public String doIndex(IndexQuery query, IndexCoordinates index) {

IndexRequest request = requestFactory.indexRequest(query, index);
IndexRequest request = prepareWriteRequest(requestFactory.indexRequest(query, index));
IndexResponse indexResponse = execute(client -> client.index(request, RequestOptions.DEFAULT));

// We should call this because we are not going through a mapper.
Expand Down Expand Up @@ -197,7 +197,8 @@ public String delete(String id, @Nullable String routing, IndexCoordinates index
Assert.notNull(id, "id must not be null");
Assert.notNull(index, "index must not be null");

DeleteRequest request = requestFactory.deleteRequest(elasticsearchConverter.convertId(id), routing, index);
DeleteRequest request = prepareWriteRequest(
requestFactory.deleteRequest(elasticsearchConverter.convertId(id), routing, index));
return execute(client -> client.delete(request, RequestOptions.DEFAULT).getId());
}

Expand All @@ -224,7 +225,7 @@ public UpdateResponse update(UpdateQuery query, IndexCoordinates index) {

public List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions,
IndexCoordinates index) {
BulkRequest bulkRequest = requestFactory.bulkRequest(queries, bulkOptions, index);
BulkRequest bulkRequest = prepareWriteRequest(requestFactory.bulkRequest(queries, bulkOptions, index));
List<IndexedObjectInformation> indexedObjectInformationList = checkForBulkOperationFailure(
execute(client -> client.bulk(bulkRequest, RequestOptions.DEFAULT)));
updateIndexedObjectsWithQueries(queries, indexedObjectInformationList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public void setSearchTimeout(String searchTimeout) {
public String doIndex(IndexQuery query, IndexCoordinates index) {

IndexRequestBuilder indexRequestBuilder = requestFactory.indexRequestBuilder(client, query, index);
indexRequestBuilder = prepareWriteRequestBuilder(indexRequestBuilder);
ActionFuture<IndexResponse> future = indexRequestBuilder.execute();
IndexResponse response;
try {
Expand Down Expand Up @@ -211,8 +212,8 @@ public String delete(String id, @Nullable String routing, IndexCoordinates index
Assert.notNull(id, "id must not be null");
Assert.notNull(index, "index must not be null");

DeleteRequestBuilder deleteRequestBuilder = requestFactory.deleteRequestBuilder(client,
elasticsearchConverter.convertId(id), routing, index);
DeleteRequestBuilder deleteRequestBuilder = prepareWriteRequestBuilder(
requestFactory.deleteRequestBuilder(client, elasticsearchConverter.convertId(id), routing, index));
return deleteRequestBuilder.execute().actionGet().getId();
}

Expand Down Expand Up @@ -242,9 +243,10 @@ public UpdateResponse update(UpdateQuery query, IndexCoordinates index) {

public List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions,
IndexCoordinates index) {
BulkRequestBuilder bulkRequest = requestFactory.bulkRequestBuilder(client, queries, bulkOptions, index);
BulkRequestBuilder bulkRequestBuilder = requestFactory.bulkRequestBuilder(client, queries, bulkOptions, index);
bulkRequestBuilder = prepareWriteRequestBuilder(bulkRequestBuilder);
final List<IndexedObjectInformation> indexedObjectInformations = checkForBulkOperationFailure(
bulkRequest.execute().actionGet());
bulkRequestBuilder.execute().actionGet());
updateIndexedObjectsWithQueries(queries, indexedObjectInformations);
return indexedObjectInformations;
}
Expand Down
Loading