Closed
Description
The Top Metrics aggregation doesn't work with the reactive client.
Steps to reproduce:
Elasticsearch server 7.13.0
Spring boot 2.5.0
Elasticsearch index:
PUT /test
{
"mappings": {
"properties": {
"m": { "type": "double" },
"s": { "type": "keyword" }
}
}
}
POST /test/_bulk?refresh
{"index": {}}
{"s": "a", "m": 3.1415}
{"index": {}}
{"s": "b", "m": 1.0}
{"index": {}}
{"s": "c", "m": 2.71828}
{"index": {}}
{"s": "a", "m": 5.4}
{"index": {}}
{"s": "b", "m": 1.8}
{"index": {}}
{"s": "c", "m": 4.15}
POST /test/_search?filter_path=aggregations
{
"aggs": {
"tm": {
"top_metrics": {
"metrics": {"field": "s"},
"sort": {"m": "desc"}
}
}
}
}
Response (using Elasticsearch Dev Tools): ES returns the expected result
{
"aggregations" : {
"tm" : {
"top" : [
{
"sort" : [
5.4
],
"metrics" : {
"s" : "a"
}
}
]
}
}
}
Using the reactive Java client:
SearchSourceBuilder searchRequestBuilder = new SearchSourceBuilder();
searchRequestBuilder.size(0);
searchRequestBuilder.timeout(TimeValue.timeValueSeconds(5));
TopMetricsAggregationBuilder builder = new TopMetricsAggregationBuilder("top_metrics_hits",
new FieldSortBuilder("m").order(SortOrder.DESC),
1,
"s");
searchRequestBuilder.aggregation(builder);
SearchRequest searchRequest = new SearchRequest("test");
searchRequest.source(searchRequestBuilder);
Flux<Aggregation> aggregationFlux = reactiveElasticsearchClient.aggregate(searchRequest);
aggregationFlux.subscribe(aggregation -> LOGGER.warn("aggregation name: {}, type: {}", aggregation.getName(), aggregation.getType()));
// Or also:
// Mono<SearchResponse> searchResponse = reactiveElasticsearchClient.searchForResponse(searchRequest);
// searchResponse.subscribe(response -> {
// Aggregations aggregations = response.getAggregations();
// for (Aggregation aggregation : aggregations) {
// LOGGER.warn("aggregation name: {}, type: {}", aggregation.getName(), aggregation.getType());
// }
// });
I am getting this error: org.elasticsearch.common.xcontent.NamedObjectNotFoundException: [1:174] unknown field [top_metrics]
Full stack trace:
reactor.core.Exceptions$ErrorCallbackNotImplemented: ElasticsearchStatusException[{"took":115,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"max_score":null,"hits":[]},"aggregations":{"top_metrics#top_metrics_hits":{"top":[{"sort":[5.4],"metrics":{"s":"a"}}]}}}]
Caused by: org.elasticsearch.ElasticsearchStatusException: {"took":115,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"max_score":null,"hits":[]},"aggregations":{"top_metrics#top_metrics_hits":{"top":[{"sort":[5.4],"metrics":{"s":"a"}}]}}}
at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.doDecode(DefaultReactiveElasticsearchClient.java:827) ~[spring-data-elasticsearch-4.2.1.jar:4.2.1]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoFlatMap] :
reactor.core.publisher.Mono.flatMap
org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:802)
Error has been observed at the following site(s):
|_ Mono.flatMap ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:802)
|_ Mono.from ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.lambda$null$19(DefaultReactiveElasticsearchClient.java:564)
|_ Mono.flatMap ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.lambda$exchangeToMono$3(DefaultWebClient.java:400)
|_ Mono.switchIfEmpty ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.lambda$exchangeToMono$3(DefaultWebClient.java:401)
|_ Mono.then ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient.releaseIfNotConsumed(DefaultWebClient.java:157)
|_ ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient.access$1000(DefaultWebClient.java:66)
|_ ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.lambda$null$2(DefaultWebClient.java:402)
|_ Mono.onErrorResume ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.lambda$exchangeToMono$3(DefaultWebClient.java:402)
|_ Mono.flatMap ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.exchangeToMono(DefaultWebClient.java:397)
|_ ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.lambda$sendRequest$20(DefaultReactiveElasticsearchClient.java:562)
|_ Mono.flatMap ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.execute(DefaultReactiveElasticsearchClient.java:501)
|_ Mono.error ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.lambda$execute$17(DefaultReactiveElasticsearchClient.java:509)
|_ Mono.onErrorResume ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.execute(DefaultReactiveElasticsearchClient.java:502)
|_ Flux.from ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.sendRequest(DefaultReactiveElasticsearchClient.java:562)
|_ ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.sendRequest(DefaultReactiveElasticsearchClient.java:554)
|_ Flux.map ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.aggregate(DefaultReactiveElasticsearchClient.java:424)
|_ Flux.flatMap ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.aggregate(DefaultReactiveElasticsearchClient.java:425)
|_ ⇢ at org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.aggregate(ReactiveElasticsearchClient.java:562)
Stack trace:
at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.doDecode(DefaultReactiveElasticsearchClient.java:827) ~[spring-data-elasticsearch-4.2.1.jar:4.2.1]
at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.lambda$readResponseBody$29(DefaultReactiveElasticsearchClient.java:802) ~[spring-data-elasticsearch-4.2.1.jar:4.2.1]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:150) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onComplete(FluxPeekFuseable.java:277) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:150) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:401) ~[reactor-netty-core-1.0.7.jar:1.0.7]
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:416) ~[reactor-netty-core-1.0.7.jar:1.0.7]
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:470) ~[reactor-netty-core-1.0.7.jar:1.0.7]
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:685) ~[reactor-netty-http-1.0.7.jar:1.0.7]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.7.jar:1.0.7]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) ~[netty-handler-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[netty-codec-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]