From 0fe0908da105c3a4b3c964bcd379e688b2569329 Mon Sep 17 00:00:00 2001 From: pedro-cf Date: Sun, 5 May 2024 16:41:24 +0100 Subject: [PATCH 01/21] improved pagination --- .../elasticsearch/database_logic.py | 42 +++++++------------ 1 file changed, 16 insertions(+), 26 deletions(-) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 49c44a60..4d5dd9fc 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -552,8 +552,12 @@ async def execute_search( NotFoundError: If the collections specified in `collection_ids` do not exist. """ search_after = None + page = 1 # Default page number + if token: - search_after = urlsafe_b64decode(token.encode()).decode().split(",") + decoded_token = urlsafe_b64decode(token.encode()).decode().split(",") + search_after = decoded_token[:-1] # Extract sort values from the token + page = int(decoded_token[-1]) + 1 # Extract previous page number from the token and increment for next page query = search.query.to_dict() if search.query else None @@ -569,39 +573,25 @@ async def execute_search( size=limit, ) ) - - count_task = asyncio.create_task( - self.client.count( - index=index_param, - ignore_unavailable=ignore_unavailable, - body=search.to_dict(count=True), - ) - ) - + try: es_response = await search_task except exceptions.NotFoundError: raise NotFoundError(f"Collections '{collection_ids}' do not exist") - + + matched = es_response["hits"]["total"]["value"] hits = es_response["hits"]["hits"] items = (hit["_source"] for hit in hits) next_token = None - if hits and (sort_array := hits[-1].get("sort")): - next_token = urlsafe_b64encode( - ",".join([str(x) for x in sort_array]).encode() - ).decode() - - # (1) count should not block returning results, so don't wait for it to be done - # (2) don't cancel the task so that it will populate the ES cache for subsequent counts - maybe_count = None - if count_task.done(): - try: - maybe_count = count_task.result().get("count") - except Exception as e: - logger.error(f"Count task failed: {e}") - - return items, maybe_count, next_token + + if matched > page * limit: # Check if there are more results beyond the current page + if hits and (sort_array := hits[-1].get("sort")): + next_token = urlsafe_b64encode( + ",".join([str(x) for x in sort_array] + [str(page)]).encode() + ).decode() + + return items, matched, next_token """ TRANSACTION LOGIC """ From aa070f242b402621f76cf1cde315c5a24fa6bc68 Mon Sep 17 00:00:00 2001 From: pedro-cf Date: Sun, 5 May 2024 16:43:49 +0100 Subject: [PATCH 02/21] pre-commit run --all-files --- .../stac_fastapi/elasticsearch/database_logic.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 4d5dd9fc..41ecf32e 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -552,12 +552,12 @@ async def execute_search( NotFoundError: If the collections specified in `collection_ids` do not exist. """ search_after = None - page = 1 # Default page number + page = 1 if token: decoded_token = urlsafe_b64decode(token.encode()).decode().split(",") - search_after = decoded_token[:-1] # Extract sort values from the token - page = int(decoded_token[-1]) + 1 # Extract previous page number from the token and increment for next page + search_after = decoded_token[:-1] + page = int(decoded_token[-1]) + 1 query = search.query.to_dict() if search.query else None @@ -573,19 +573,19 @@ async def execute_search( size=limit, ) ) - + try: es_response = await search_task except exceptions.NotFoundError: raise NotFoundError(f"Collections '{collection_ids}' do not exist") - + matched = es_response["hits"]["total"]["value"] hits = es_response["hits"]["hits"] items = (hit["_source"] for hit in hits) next_token = None - - if matched > page * limit: # Check if there are more results beyond the current page + + if matched > page * limit: if hits and (sort_array := hits[-1].get("sort")): next_token = urlsafe_b64encode( ",".join([str(x) for x in sort_array] + [str(page)]).encode() From 4cb40628cfdda3c797b52aa35ff97866fab44b16 Mon Sep 17 00:00:00 2001 From: pedro-cf Date: Sun, 5 May 2024 16:50:52 +0100 Subject: [PATCH 03/21] . --- .../elasticsearch/stac_fastapi/elasticsearch/database_logic.py | 1 - 1 file changed, 1 deletion(-) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 41ecf32e..e2348166 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -584,7 +584,6 @@ async def execute_search( items = (hit["_source"] for hit in hits) next_token = None - if matched > page * limit: if hits and (sort_array := hits[-1].get("sort")): next_token = urlsafe_b64encode( From 3c2b63ad77eb3980abf1a7655591a99efa765125 Mon Sep 17 00:00:00 2001 From: pedro-cf Date: Sun, 5 May 2024 18:33:15 +0100 Subject: [PATCH 04/21] wip --- stac_fastapi/core/stac_fastapi/core/core.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index 12c42a03..950a4f6f 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -318,9 +318,7 @@ async def item_collection( if maybe_count is not None: context_obj["matched"] = maybe_count - links = [] - if next_token: - links = await PagingLinks(request=request, next=next_token).get_links() + links = await PagingLinks(request=request, next=next_token).get_links() return ItemCollection( type="FeatureCollection", From ce84ef6db889e01e5e549ad20773feb953737992 Mon Sep 17 00:00:00 2001 From: pedro-cf Date: Sun, 5 May 2024 18:45:14 +0100 Subject: [PATCH 05/21] fixed test_item_search_temporal_window_timezone_get --- stac_fastapi/tests/resources/test_item.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/stac_fastapi/tests/resources/test_item.py b/stac_fastapi/tests/resources/test_item.py index f84d9759..13f8cf71 100644 --- a/stac_fastapi/tests/resources/test_item.py +++ b/stac_fastapi/tests/resources/test_item.py @@ -493,11 +493,8 @@ async def test_item_search_temporal_window_timezone_get(app_client, ctx): } resp = await app_client.get("/search", params=params) resp_json = resp.json() - next_link = next(link for link in resp_json["links"] if link["rel"] == "next")[ - "href" - ] - resp = await app_client.get(next_link) assert resp.status_code == 200 + assert resp_json["features"][0]["id"] == test_item["id"] @pytest.mark.asyncio From 127f7f15c5eea7f5f682ab0994a13f662fade4dc Mon Sep 17 00:00:00 2001 From: pedro-cf Date: Sun, 5 May 2024 18:46:01 +0100 Subject: [PATCH 06/21] test_item_search_temporal_window_timezone_get fix --- stac_fastapi/tests/resources/test_item.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stac_fastapi/tests/resources/test_item.py b/stac_fastapi/tests/resources/test_item.py index 13f8cf71..e33fadff 100644 --- a/stac_fastapi/tests/resources/test_item.py +++ b/stac_fastapi/tests/resources/test_item.py @@ -492,8 +492,8 @@ async def test_item_search_temporal_window_timezone_get(app_client, ctx): "datetime": f"{datetime_to_str(item_date_before)}/{datetime_to_str(item_date_after)}", } resp = await app_client.get("/search", params=params) - resp_json = resp.json() assert resp.status_code == 200 + resp_json = resp.json() assert resp_json["features"][0]["id"] == test_item["id"] From 7eb505fe6c8869dfe106b3695b29323096566dda Mon Sep 17 00:00:00 2001 From: pedro-cf Date: Sun, 5 May 2024 18:55:21 +0100 Subject: [PATCH 07/21] test_pagination_item_collection fix: with the new fix we only do 6 requests for 6 items --- stac_fastapi/tests/resources/test_item.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/stac_fastapi/tests/resources/test_item.py b/stac_fastapi/tests/resources/test_item.py index e33fadff..ffd663a4 100644 --- a/stac_fastapi/tests/resources/test_item.py +++ b/stac_fastapi/tests/resources/test_item.py @@ -587,18 +587,17 @@ async def test_pagination_item_collection(app_client, ctx, txn_client): await create_item(txn_client, item=ctx.item) ids.append(ctx.item["id"]) - # Paginate through all 6 items with a limit of 1 (expecting 7 requests) + # Paginate through all 6 items with a limit of 1 (expecting 6 requests) page = await app_client.get( f"/collections/{ctx.item['collection']}/items", params={"limit": 1} ) item_ids = [] - idx = 0 - for idx in range(100): + for idx in range(1, 100): page_data = page.json() next_link = list(filter(lambda link: link["rel"] == "next", page_data["links"])) if not next_link: - assert not page_data["features"] + assert idx == 6 break assert len(page_data["features"]) == 1 From 08319b266bddaebcb8c80de3b410e02a18653b74 Mon Sep 17 00:00:00 2001 From: pedro-cf Date: Sun, 5 May 2024 18:57:02 +0100 Subject: [PATCH 08/21] test_pagination_post: same reason --- stac_fastapi/tests/resources/test_item.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/stac_fastapi/tests/resources/test_item.py b/stac_fastapi/tests/resources/test_item.py index ffd663a4..8730cf53 100644 --- a/stac_fastapi/tests/resources/test_item.py +++ b/stac_fastapi/tests/resources/test_item.py @@ -626,10 +626,8 @@ async def test_pagination_post(app_client, ctx, txn_client): # Paginate through all 5 items with a limit of 1 (expecting 5 requests) request_body = {"ids": ids, "limit": 1} page = await app_client.post("/search", json=request_body) - idx = 0 item_ids = [] - for _ in range(100): - idx += 1 + for idx in range(1, 100): page_data = page.json() next_link = list(filter(lambda link: link["rel"] == "next", page_data["links"])) if not next_link: @@ -642,7 +640,7 @@ async def test_pagination_post(app_client, ctx, txn_client): page = await app_client.post("/search", json=request_body) # Our limit is 1, so we expect len(ids) number of requests before we run out of pages - assert idx == len(ids) + 1 + assert idx == len(ids) # Confirm we have paginated through all items assert not set(item_ids) - set(ids) From 405e83d02e8754b9c098eac57c570af72f559abe Mon Sep 17 00:00:00 2001 From: pedro-cf Date: Sun, 5 May 2024 18:59:25 +0100 Subject: [PATCH 09/21] test_pagination_token_idempotent fix: bad indentation --- stac_fastapi/tests/resources/test_item.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stac_fastapi/tests/resources/test_item.py b/stac_fastapi/tests/resources/test_item.py index 8730cf53..ce7e7e47 100644 --- a/stac_fastapi/tests/resources/test_item.py +++ b/stac_fastapi/tests/resources/test_item.py @@ -654,8 +654,8 @@ async def test_pagination_token_idempotent(app_client, ctx, txn_client): # Ingest 5 items for _ in range(5): ctx.item["id"] = str(uuid.uuid4()) - await create_item(txn_client, ctx.item) - ids.append(ctx.item["id"]) + await create_item(txn_client, ctx.item) + ids.append(ctx.item["id"]) page = await app_client.get("/search", params={"ids": ",".join(ids), "limit": 3}) page_data = page.json() From a1c06b942394d77654b10c60ef4807d53c023b3d Mon Sep 17 00:00:00 2001 From: pedro-cf Date: Sun, 5 May 2024 19:05:26 +0100 Subject: [PATCH 10/21] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a7759f53..b42c2629 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Fixed +- Fixed issue where paginated search queries would return a `next_token` on the last page [243](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/243) - Fixed issue where searches return an empty `links` array [241](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/241) ## [v2.4.0] From a27643cec0219d71337636d6e996178dc24d675a Mon Sep 17 00:00:00 2001 From: pedro-cf Date: Sun, 5 May 2024 19:14:53 +0100 Subject: [PATCH 11/21] opensearch --- .../stac_fastapi/opensearch/database_logic.py | 34 +++++++------------ 1 file changed, 13 insertions(+), 21 deletions(-) diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index 95129f27..48e0b0b2 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -582,9 +582,16 @@ async def execute_search( query = search.query.to_dict() if search.query else None if query: search_body["query"] = query + + search_after = None + page = 1 if token: - search_after = urlsafe_b64decode(token.encode()).decode().split(",") - search_body["search_after"] = search_after + decoded_token = urlsafe_b64decode(token.encode()).decode().split(",") + search_after = decoded_token[:-1] + page = int(decoded_token[-1]) + 1 + if search_after: + search_body["search_after"] = search_after + [page] + search_body["sort"] = sort if sort else DEFAULT_SORT index_param = indices(collection_ids) @@ -598,14 +605,6 @@ async def execute_search( ) ) - count_task = asyncio.create_task( - self.client.count( - index=index_param, - ignore_unavailable=ignore_unavailable, - body=search.to_dict(count=True), - ) - ) - try: es_response = await search_task except exceptions.NotFoundError: @@ -614,22 +613,15 @@ async def execute_search( hits = es_response["hits"]["hits"] items = (hit["_source"] for hit in hits) + matched = es_response["hits"]["total"]["value"] + next_token = None if hits and (sort_array := hits[-1].get("sort")): next_token = urlsafe_b64encode( - ",".join([str(x) for x in sort_array]).encode() + ",".join([str(x) for x in sort_array] + [str(page)]).encode() ).decode() - # (1) count should not block returning results, so don't wait for it to be done - # (2) don't cancel the task so that it will populate the ES cache for subsequent counts - maybe_count = None - if count_task.done(): - try: - maybe_count = count_task.result().get("count") - except Exception as e: - logger.error(f"Count task failed: {e}") - - return items, maybe_count, next_token + return items, matched, next_token """ TRANSACTION LOGIC """ From bcf7ee71cc0be71d371a6860d82c3159199b3e57 Mon Sep 17 00:00:00 2001 From: pedro-cf Date: Sun, 5 May 2024 19:17:43 +0100 Subject: [PATCH 12/21] pre-commit run --all-files --- .../opensearch/stac_fastapi/opensearch/database_logic.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index 48e0b0b2..1a14f222 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -582,7 +582,7 @@ async def execute_search( query = search.query.to_dict() if search.query else None if query: search_body["query"] = query - + search_after = None page = 1 if token: @@ -591,7 +591,7 @@ async def execute_search( page = int(decoded_token[-1]) + 1 if search_after: search_body["search_after"] = search_after + [page] - + search_body["sort"] = sort if sort else DEFAULT_SORT index_param = indices(collection_ids) From 87e6c951847add656ebce42387f59ae6cf291746 Mon Sep 17 00:00:00 2001 From: pedro-cf Date: Sun, 5 May 2024 19:29:08 +0100 Subject: [PATCH 13/21] opensearch fixes --- .../stac_fastapi/opensearch/database_logic.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index 1a14f222..979ea0d6 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -590,7 +590,7 @@ async def execute_search( search_after = decoded_token[:-1] page = int(decoded_token[-1]) + 1 if search_after: - search_body["search_after"] = search_after + [page] + search_body["search_after"] = search_after search_body["sort"] = sort if sort else DEFAULT_SORT @@ -612,14 +612,14 @@ async def execute_search( hits = es_response["hits"]["hits"] items = (hit["_source"] for hit in hits) - matched = es_response["hits"]["total"]["value"] next_token = None - if hits and (sort_array := hits[-1].get("sort")): - next_token = urlsafe_b64encode( - ",".join([str(x) for x in sort_array] + [str(page)]).encode() - ).decode() + if matched > page * limit: + if hits and (sort_array := hits[-1].get("sort")): + next_token = urlsafe_b64encode( + ",".join([str(x) for x in sort_array] + [str(page)]).encode() + ).decode() return items, matched, next_token From 583b959ce4a3a6d647407f673c0f42be10ab6978 Mon Sep 17 00:00:00 2001 From: pedro-cf Date: Mon, 6 May 2024 12:56:04 +0100 Subject: [PATCH 14/21] account for > 10,000 hits --- .../elasticsearch/database_logic.py | 19 ++++++++++++++++++- .../stac_fastapi/opensearch/database_logic.py | 17 +++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index e2348166..559a514c 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -574,15 +574,32 @@ async def execute_search( ) ) + count_task = asyncio.create_task( + self.client.count( + index=index_param, + ignore_unavailable=ignore_unavailable, + body=search.to_dict(count=True), + ) + ) + try: es_response = await search_task except exceptions.NotFoundError: raise NotFoundError(f"Collections '{collection_ids}' do not exist") - matched = es_response["hits"]["total"]["value"] hits = es_response["hits"]["hits"] items = (hit["_source"] for hit in hits) + matched = es_response["hits"]["total"]["value"] + if es_response["hits"]["total"]["relation"] != "eq": + if count_task.done(): + try: + matched = count_task.result().get("count") + except Exception as e: + logger.error(f"Count task failed: {e}") + else: + count_task.cancel() + next_token = None if matched > page * limit: if hits and (sort_array := hits[-1].get("sort")): diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index 979ea0d6..793c4809 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -605,6 +605,14 @@ async def execute_search( ) ) + count_task = asyncio.create_task( + self.client.count( + index=index_param, + ignore_unavailable=ignore_unavailable, + body=search.to_dict(count=True), + ) + ) + try: es_response = await search_task except exceptions.NotFoundError: @@ -612,7 +620,16 @@ async def execute_search( hits = es_response["hits"]["hits"] items = (hit["_source"] for hit in hits) + matched = es_response["hits"]["total"]["value"] + if es_response["hits"]["total"]["relation"] != "eq": + if count_task.done(): + try: + matched = count_task.result().get("count") + except Exception as e: + logger.error(f"Count task failed: {e}") + else: + count_task.cancel() next_token = None if matched > page * limit: From cf97df7bf2e3357443dc0cda5fc336c60675a716 Mon Sep 17 00:00:00 2001 From: pedro-cf Date: Mon, 6 May 2024 19:56:34 +0100 Subject: [PATCH 15/21] . --- .../elasticsearch/database_logic.py | 32 ++++++++----------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 559a514c..cbf55c90 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -552,12 +552,9 @@ async def execute_search( NotFoundError: If the collections specified in `collection_ids` do not exist. """ search_after = None - page = 1 if token: - decoded_token = urlsafe_b64decode(token.encode()).decode().split(",") - search_after = decoded_token[:-1] - page = int(decoded_token[-1]) + 1 + search_after = urlsafe_b64decode(token.encode()).decode().split(",") query = search.query.to_dict() if search.query else None @@ -570,7 +567,7 @@ async def execute_search( query=query, sort=sort or DEFAULT_SORT, search_after=search_after, - size=limit, + size=limit + 1, # Fetch one more result than the limit ) ) @@ -588,25 +585,22 @@ async def execute_search( raise NotFoundError(f"Collections '{collection_ids}' do not exist") hits = es_response["hits"]["hits"] - items = (hit["_source"] for hit in hits) - - matched = es_response["hits"]["total"]["value"] - if es_response["hits"]["total"]["relation"] != "eq": - if count_task.done(): - try: - matched = count_task.result().get("count") - except Exception as e: - logger.error(f"Count task failed: {e}") - else: - count_task.cancel() + items = (hit["_source"] for hit in hits[:limit]) next_token = None - if matched > page * limit: - if hits and (sort_array := hits[-1].get("sort")): + if len(hits) > limit: + if hits and (sort_array := hits[limit - 1].get("sort")): next_token = urlsafe_b64encode( - ",".join([str(x) for x in sort_array] + [str(page)]).encode() + ",".join([str(x) for x in sort_array]).encode() ).decode() + matched = None + if count_task.done(): + try: + matched = count_task.result().get("count") + except Exception as e: + logger.error(f"Count task failed: {e}") + return items, matched, next_token """ TRANSACTION LOGIC """ From 31513aba21d6192b1499dc1a68b5e3f213b68ac7 Mon Sep 17 00:00:00 2001 From: pedro-cf Date: Tue, 7 May 2024 00:25:51 +0100 Subject: [PATCH 16/21] . --- .../elasticsearch/database_logic.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index cbf55c90..a05ba44f 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -560,6 +560,21 @@ async def execute_search( index_param = indices(collection_ids) + print(">>>>>>>>>>>>>>>>>>>>>>>>>>") + index_settings = await self.client.indices.get_settings(index=index_param) + max_result_window = int( + index_settings[index_param]["settings"]["index"]["max_result_window"][ + "max_result_window" + ] + ) + print(max_result_window) + print(index_settings) + print(">>>>>>>>>>>>>>>>>>>>>>>>>>") + + max_result_window = 10000 + + size_limit = min(limit + 1, max_result_window) + search_task = asyncio.create_task( self.client.search( index=index_param, @@ -567,7 +582,7 @@ async def execute_search( query=query, sort=sort or DEFAULT_SORT, search_after=search_after, - size=limit + 1, # Fetch one more result than the limit + size=size_limit, ) ) @@ -588,7 +603,7 @@ async def execute_search( items = (hit["_source"] for hit in hits[:limit]) next_token = None - if len(hits) > limit: + if len(hits) > limit and limit < max_result_window: if hits and (sort_array := hits[limit - 1].get("sort")): next_token = urlsafe_b64encode( ",".join([str(x) for x in sort_array]).encode() From 1df9d2ed6cda8b73f46d08e1003636e843e2e7b4 Mon Sep 17 00:00:00 2001 From: pedro-cf Date: Tue, 7 May 2024 00:33:21 +0100 Subject: [PATCH 17/21] +1 strategy w/ edge issue handling --- .../elasticsearch/database_logic.py | 11 ------ .../stac_fastapi/opensearch/database_logic.py | 37 +++++++++---------- 2 files changed, 18 insertions(+), 30 deletions(-) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index a05ba44f..6b9c3ea6 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -560,17 +560,6 @@ async def execute_search( index_param = indices(collection_ids) - print(">>>>>>>>>>>>>>>>>>>>>>>>>>") - index_settings = await self.client.indices.get_settings(index=index_param) - max_result_window = int( - index_settings[index_param]["settings"]["index"]["max_result_window"][ - "max_result_window" - ] - ) - print(max_result_window) - print(index_settings) - print(">>>>>>>>>>>>>>>>>>>>>>>>>>") - max_result_window = 10000 size_limit = min(limit + 1, max_result_window) diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index 793c4809..331aaa1a 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -584,11 +584,9 @@ async def execute_search( search_body["query"] = query search_after = None - page = 1 + if token: - decoded_token = urlsafe_b64decode(token.encode()).decode().split(",") - search_after = decoded_token[:-1] - page = int(decoded_token[-1]) + 1 + search_after = urlsafe_b64decode(token.encode()).decode().split(",") if search_after: search_body["search_after"] = search_after @@ -596,12 +594,16 @@ async def execute_search( index_param = indices(collection_ids) + max_result_window = 10000 + + size_limit = min(limit + 1, max_result_window) + search_task = asyncio.create_task( self.client.search( index=index_param, ignore_unavailable=ignore_unavailable, body=search_body, - size=limit, + size=size_limit, ) ) @@ -619,25 +621,22 @@ async def execute_search( raise NotFoundError(f"Collections '{collection_ids}' do not exist") hits = es_response["hits"]["hits"] - items = (hit["_source"] for hit in hits) - - matched = es_response["hits"]["total"]["value"] - if es_response["hits"]["total"]["relation"] != "eq": - if count_task.done(): - try: - matched = count_task.result().get("count") - except Exception as e: - logger.error(f"Count task failed: {e}") - else: - count_task.cancel() + items = (hit["_source"] for hit in hits[:limit]) next_token = None - if matched > page * limit: - if hits and (sort_array := hits[-1].get("sort")): + if len(hits) > limit and limit < max_result_window: + if hits and (sort_array := hits[limit - 1].get("sort")): next_token = urlsafe_b64encode( - ",".join([str(x) for x in sort_array] + [str(page)]).encode() + ",".join([str(x) for x in sort_array]).encode() ).decode() + matched = None + if count_task.done(): + try: + matched = count_task.result().get("count") + except Exception as e: + logger.error(f"Count task failed: {e}") + return items, matched, next_token """ TRANSACTION LOGIC """ From 751319427c4568240ba4797c1905a366a7d53653 Mon Sep 17 00:00:00 2001 From: pedro-cf Date: Tue, 7 May 2024 09:57:25 +0100 Subject: [PATCH 18/21] es_response["hits"]["total"]["value"] --- .../elasticsearch/stac_fastapi/elasticsearch/database_logic.py | 2 +- .../opensearch/stac_fastapi/opensearch/database_logic.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 6b9c3ea6..76853f47 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -598,7 +598,7 @@ async def execute_search( ",".join([str(x) for x in sort_array]).encode() ).decode() - matched = None + matched = es_response["hits"]["total"]["value"] if count_task.done(): try: matched = count_task.result().get("count") diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index 331aaa1a..b1e3e8f7 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -630,7 +630,7 @@ async def execute_search( ",".join([str(x) for x in sort_array]).encode() ).decode() - matched = None + matched = es_response["hits"]["total"]["value"] if count_task.done(): try: matched = count_task.result().get("count") From 3e3aa21e2ecab45c5aea24d3719c9ce03213b1ff Mon Sep 17 00:00:00 2001 From: pedro-cf Date: Tue, 7 May 2024 15:42:02 +0100 Subject: [PATCH 19/21] max_result_window from stac-fastapi --- .../stac_fastapi/elasticsearch/database_logic.py | 5 ++++- .../opensearch/stac_fastapi/opensearch/database_logic.py | 3 ++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 76853f47..163efc73 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -8,6 +8,7 @@ import attr from elasticsearch_dsl import Q, Search +import stac_fastapi.types.search from elasticsearch import exceptions, helpers # type: ignore from stac_fastapi.core.extensions import filter from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer @@ -560,7 +561,9 @@ async def execute_search( index_param = indices(collection_ids) - max_result_window = 10000 + max_result_window = stac_fastapi.types.search.Limit.le + + print(max_result_window) size_limit = min(limit + 1, max_result_window) diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index b1e3e8f7..cdbf7438 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -11,6 +11,7 @@ from opensearchpy.helpers.query import Q from opensearchpy.helpers.search import Search +import stac_fastapi.types.search from stac_fastapi.core import serializers from stac_fastapi.core.extensions import filter from stac_fastapi.core.utilities import bbox2polygon @@ -594,7 +595,7 @@ async def execute_search( index_param = indices(collection_ids) - max_result_window = 10000 + max_result_window = stac_fastapi.types.search.Limit.le size_limit = min(limit + 1, max_result_window) From ab19d8c13edb21d8d637e97036355d5cdde866d8 Mon Sep 17 00:00:00 2001 From: pedro-cf Date: Tue, 7 May 2024 15:54:47 +0100 Subject: [PATCH 20/21] oops --- .../elasticsearch/stac_fastapi/elasticsearch/database_logic.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 163efc73..0ae2a920 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -563,8 +563,6 @@ async def execute_search( max_result_window = stac_fastapi.types.search.Limit.le - print(max_result_window) - size_limit = min(limit + 1, max_result_window) search_task = asyncio.create_task( From 6ea11794bafefdabc44a10913ce296ac25474b80 Mon Sep 17 00:00:00 2001 From: pedro-cf Date: Wed, 8 May 2024 19:01:30 +0100 Subject: [PATCH 21/21] matched default --- .../stac_fastapi/elasticsearch/database_logic.py | 6 +++++- .../opensearch/stac_fastapi/opensearch/database_logic.py | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 0ae2a920..3ec81c99 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -599,7 +599,11 @@ async def execute_search( ",".join([str(x) for x in sort_array]).encode() ).decode() - matched = es_response["hits"]["total"]["value"] + matched = ( + es_response["hits"]["total"]["value"] + if es_response["hits"]["total"]["relation"] == "eq" + else None + ) if count_task.done(): try: matched = count_task.result().get("count") diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index cdbf7438..3775ead3 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -631,7 +631,11 @@ async def execute_search( ",".join([str(x) for x in sort_array]).encode() ).decode() - matched = es_response["hits"]["total"]["value"] + matched = ( + es_response["hits"]["total"]["value"] + if es_response["hits"]["total"]["relation"] == "eq" + else None + ) if count_task.done(): try: matched = count_task.result().get("count")