Description
Hello team,
I wanted to reach out reporting an issue 100% reproducible with reactive spring data elasticsearch please.
The setup is simple:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.0-M1</version>
<relativePath/>
</parent>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>4.0.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
@Configuration
@EnableReactiveElasticsearchRepositories(Constant.BASE_PACKAGE_ELASTIC)
public class ElasticConfiguration extends ReactiveElasticsearchConfiguration {
@Override
public final ClientConfiguration clientConfiguration() {
return ClientConfiguration.builder().connectedTo("1xx.xx.xx.xx:9200").build();
}
}
@Repository
public interface ElasticRepository extends ReactiveElasticsearchRepository<MyPojo, String> {
}
@org.springframework.data.elasticsearch.annotations.Document(indexName = Constant.ELASTIC_INDEX_NAME)
public record MyPojo(String name, String protocol, @Id String clientTs) {
}
The business logic is straightforward, using Reactor-Kafka; Spring Cloud Stream Kafka Reactive, or Spring Webflux, I will receive a flux of MyPojo. My goal is just to save them inside Elastic.
The rate is quite fast, therefore, I am using the reactive repository.
When using above setup, I will get 100%:
java.lang.RuntimeException: Connection lease request time out
at org.springframework.data.elasticsearch.client.elc.ReactiveElasticsearchTemplate.translateException(ReactiveElasticsearchTemplate.java:670)
at reactor.core.publisher.Flux.lambda$onErrorMap$27(Flux.java:7099)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply(MonoCompletionStage.java:117)
at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply(MonoCompletionStage.java:64)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
at co.elastic.clients.transport.rest_client.RestClientTransport$1.onFailure(RestClientTransport.java:179)
at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onDefinitiveFailure(RestClient.java:684)
at org.elasticsearch.client.RestClient$1.failed(RestClient.java:422)
at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137)
at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.executionFailed(DefaultClientExchangeHandlerImpl.java:101)
at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:432)
at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.connectionRequestFailed(AbstractClientExchangeHandler.java:352)
at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.access$100(AbstractClientExchangeHandler.java:64)
at org.apache.http.impl.nio.client.AbstractClientExchangeHandler$1.failed(AbstractClientExchangeHandler.java:396)
at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager$1.failed(PoolingNHttpClientConnectionManager.java:318)
at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137)
at org.apache.http.nio.pool.AbstractNIOConnPool.fireCallbacks(AbstractNIOConnPool.java:501)
at org.apache.http.nio.pool.AbstractNIOConnPool.release(AbstractNIOConnPool.java:360)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.releaseConnection(PoolingNHttpClientConnectionManager.java:393)
at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.releaseConnection(AbstractClientExchangeHandler.java:247)
at org.apache.http.impl.nio.client.MainClientExec.responseCompleted(MainClientExec.java:387)
at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:173)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:87)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:40)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:121)
at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.util.concurrent.TimeoutException: Connection lease request time out
at org.apache.http.nio.pool.AbstractNIOConnPool.processPendingRequest(AbstractNIOConnPool.java:411)
at org.apache.http.nio.pool.AbstractNIOConnPool.processNextPendingRequest(AbstractNIOConnPool.java:391)
at org.apache.http.nio.pool.AbstractNIOConnPool.release(AbstractNIOConnPool.java:355)
... 17 common frames omitted
The thing is:
I tried ``` .withConnectTimeout() .withSocketTimeout()`` set to some crazy value, still getting this error.
I tried switching to non reactive, and it works.
Finally, I will see this stack trace after processing many messages. Yet, I will see only 1 Hit in Elasticsearch.
May I ask what is the root cause of this issue please?
Thank you