Skip to content

TopMetricsAggregation NamedObjectNotFoundException: unknown field [top_metrics] #1834

Closed
@griand

Description

@griand

Top Metrics Aggregation don't work with reactive client.

Steps for reproduce:
Elasticsearch server 7.13.0
Spring boot 2.5.0

Elasticsearch index:

PUT /test
{
  "mappings": {
    "properties": {
      "m":    { "type": "double" },  
      "s":  { "type": "keyword"  } 
    }
  }
}

POST /test/_bulk?refresh
{"index": {}}
{"s": "a", "m": 3.1415}
{"index": {}}
{"s": "b", "m": 1.0}
{"index": {}}
{"s": "c", "m": 2.71828}
{"index": {}}
{"s": "a", "m": 5.4}
{"index": {}}
{"s": "b", "m": 1.8}
{"index": {}}
{"s": "c", "m": 4.15}


POST /test/_search?filter_path=aggregations
{
  "aggs": {
    "tm": {
      "top_metrics": {
        "metrics": {"field": "s"},
        "sort": {"m": "desc"}
      }
    }
  }
}

Response (using elasticsearch dev tools): ES return expected result

{
  "aggregations" : {
    "tm" : {
      "top" : [
        {
          "sort" : [
            5.4
          ],
          "metrics" : {
            "s" : "a"
          }
        }
      ]
    }
  }
}

Using reactive java client:

        SearchSourceBuilder searchRequestBuilder = new SearchSourceBuilder();
        searchRequestBuilder.size(0);
        searchRequestBuilder.timeout(TimeValue.timeValueSeconds(5));

        TopMetricsAggregationBuilder builder = new TopMetricsAggregationBuilder("top_metrics_hits",
                new FieldSortBuilder("m").order(SortOrder.DESC),
                1,
                "s");

        searchRequestBuilder.aggregation(builder);

        SearchRequest searchRequest = new SearchRequest("test");
        searchRequest.source(searchRequestBuilder);
        Flux<Aggregation> aggregationFlux = reactiveElasticsearchClient.aggregate(searchRequest);

        aggregationFlux.subscribe(aggregation -> LOGGER.warn("aggregation name: {}, type: {}", aggregation.getName(), aggregation.getType()));

//   Or also:
//        Mono<SearchResponse> searchResponse = reactiveElasticsearchClient.searchForResponse(searchRequest);
//        searchResponse.subscribe(response -> {
//            Aggregations aggregations = response.getAggregations();
//            for (Aggregation aggregation : aggregations) {
//                LOGGER.warn("aggregation name: {}, type: {}", aggregation.getName(), aggregation.getType());
//            }
//        });

I getting error: org.elasticsearch.common.xcontent.NamedObjectNotFoundException: [1:174] unknown field [top_metrics]

Full stack trace:

reactor.core.Exceptions$ErrorCallbackNotImplemented: ElasticsearchStatusException[{"took":115,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"max_score":null,"hits":[]},"aggregations":{"top_metrics#top_metrics_hits":{"top":[{"sort":[5.4],"metrics":{"s":"a"}}]}}}]
Caused by: org.elasticsearch.ElasticsearchStatusException: {"took":115,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"max_score":null,"hits":[]},"aggregations":{"top_metrics#top_metrics_hits":{"top":[{"sort":[5.4],"metrics":{"s":"a"}}]}}}
	at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.doDecode(DefaultReactiveElasticsearchClient.java:827) ~[spring-data-elasticsearch-4.2.1.jar:4.2.1]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoFlatMap] :
	reactor.core.publisher.Mono.flatMap
	org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:802)
Error has been observed at the following site(s):
	|_       Mono.flatMap ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:802)
	|_          Mono.from ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.lambda$null$19(DefaultReactiveElasticsearchClient.java:564)
	|_       Mono.flatMap ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.lambda$exchangeToMono$3(DefaultWebClient.java:400)
	|_ Mono.switchIfEmpty ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.lambda$exchangeToMono$3(DefaultWebClient.java:401)
	|_          Mono.then ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient.releaseIfNotConsumed(DefaultWebClient.java:157)
	|_                    ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient.access$1000(DefaultWebClient.java:66)
	|_                    ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.lambda$null$2(DefaultWebClient.java:402)
	|_ Mono.onErrorResume ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.lambda$exchangeToMono$3(DefaultWebClient.java:402)
	|_       Mono.flatMap ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.exchangeToMono(DefaultWebClient.java:397)
	|_                    ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.lambda$sendRequest$20(DefaultReactiveElasticsearchClient.java:562)
	|_       Mono.flatMap ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.execute(DefaultReactiveElasticsearchClient.java:501)
	|_         Mono.error ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.lambda$execute$17(DefaultReactiveElasticsearchClient.java:509)
	|_ Mono.onErrorResume ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.execute(DefaultReactiveElasticsearchClient.java:502)
	|_          Flux.from ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.sendRequest(DefaultReactiveElasticsearchClient.java:562)
	|_                    ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.sendRequest(DefaultReactiveElasticsearchClient.java:554)
	|_           Flux.map ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.aggregate(DefaultReactiveElasticsearchClient.java:424)
	|_       Flux.flatMap ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.aggregate(DefaultReactiveElasticsearchClient.java:425)
	|_                    ⇢ at org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.aggregate(ReactiveElasticsearchClient.java:562)
Stack trace:
		at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.doDecode(DefaultReactiveElasticsearchClient.java:827) ~[spring-data-elasticsearch-4.2.1.jar:4.2.1]
		at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.lambda$readResponseBody$29(DefaultReactiveElasticsearchClient.java:802) ~[spring-data-elasticsearch-4.2.1.jar:4.2.1]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125) ~[reactor-core-3.4.6.jar:3.4.6]
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210) ~[reactor-core-3.4.6.jar:3.4.6]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127) ~[reactor-core-3.4.6.jar:3.4.6]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127) ~[reactor-core-3.4.6.jar:3.4.6]
		at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.4.6.jar:3.4.6]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295) ~[reactor-core-3.4.6.jar:3.4.6]
		at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337) ~[reactor-core-3.4.6.jar:3.4.6]
		at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815) ~[reactor-core-3.4.6.jar:3.4.6]
		at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159) ~[reactor-core-3.4.6.jar:3.4.6]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:150) ~[reactor-core-3.4.6.jar:3.4.6]
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onComplete(FluxPeekFuseable.java:277) ~[reactor-core-3.4.6.jar:3.4.6]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:150) ~[reactor-core-3.4.6.jar:3.4.6]
		at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:401) ~[reactor-netty-core-1.0.7.jar:1.0.7]
		at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:416) ~[reactor-netty-core-1.0.7.jar:1.0.7]
		at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:470) ~[reactor-netty-core-1.0.7.jar:1.0.7]
		at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:685) ~[reactor-netty-http-1.0.7.jar:1.0.7]
		at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.7.jar:1.0.7]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) ~[netty-handler-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[netty-codec-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
		at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions