Skip to content

Commit a569587

Browse files
Zaczerojonhealy1
andauthored
Add support for optional enum queryables (#390)
**Description:** This enables support for generating "enum" fields for selected queryables. There is also now a concept of optional queryable parameters - so it's possible to indicate enum fields without requiring that field to be present in all collections. This enum generation is very efficient as it's basically an index-only scan. ``` "platform": { "description": "Satellite platform identifier", "title": "Platform", "type": "string", "enum": [ "sentinel-2a" ] }, ``` Other small changes include adding the previously missing get_items_mapping abstract to the base database logic. I also got confused by the existence of "OS_HOST" and "OS_PORT" variables in the Makefile which appear to be unused, so I simply removed them too. **PR Checklist:** - [x] Code is formatted and linted (run `pre-commit run --all-files`) - [x] Tests pass (run `make test`) - [ ] Documentation has been updated to reflect changes, if applicable - [x] Changes are added to the changelog --------- Co-authored-by: Jonathan Healy <jonathan.d.healy@gmail.com>
1 parent 69590cd commit a569587

File tree

11 files changed

+182
-25
lines changed

11 files changed

+182
-25
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
88

99
## [Unreleased]
1010

11+
### Added
12+
13+
- Added support for enum queryables [#390](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/390)
14+
1115
### Changed
1216

1317
- Optimize data_loader.py script [#395](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/395)

Makefile

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,11 @@ APP_HOST ?= 0.0.0.0
33
EXTERNAL_APP_PORT ?= 8080
44

55
ES_APP_PORT ?= 8080
6+
OS_APP_PORT ?= 8082
7+
68
ES_HOST ?= docker.for.mac.localhost
79
ES_PORT ?= 9200
810

9-
OS_APP_PORT ?= 8082
10-
OS_HOST ?= docker.for.mac.localhost
11-
OS_PORT ?= 9202
12-
1311
run_es = docker compose \
1412
run \
1513
-p ${EXTERNAL_APP_PORT}:${ES_APP_PORT} \

stac_fastapi/core/stac_fastapi/core/base_database_logic.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""Base database logic."""
22

33
import abc
4-
from typing import Any, Dict, Iterable, Optional
4+
from typing import Any, Dict, Iterable, List, Optional
55

66

77
class BaseDatabaseLogic(abc.ABC):
@@ -36,6 +36,18 @@ async def delete_item(
3636
"""Delete an item from the database."""
3737
pass
3838

39+
@abc.abstractmethod
40+
async def get_items_mapping(self, collection_id: str) -> Dict[str, Dict[str, Any]]:
41+
"""Get the mapping for the items in the collection."""
42+
pass
43+
44+
@abc.abstractmethod
45+
async def get_items_unique_values(
46+
self, collection_id: str, field_names: Iterable[str], *, limit: int = ...
47+
) -> Dict[str, List[str]]:
48+
"""Get the unique values for the given fields in the collection."""
49+
pass
50+
3951
@abc.abstractmethod
4052
async def create_collection(self, collection: Dict, refresh: bool = False) -> None:
4153
"""Create a collection in the database."""

stac_fastapi/core/stac_fastapi/core/extensions/filter.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,17 @@
6060
"maximum": 100,
6161
},
6262
}
63+
"""Queryables that are present in all collections."""
64+
65+
OPTIONAL_QUERYABLES: Dict[str, Dict[str, Any]] = {
66+
"platform": {
67+
"$enum": True,
68+
"description": "Satellite platform identifier",
69+
},
70+
}
71+
"""Queryables that are present in some collections."""
72+
73+
ALL_QUERYABLES: Dict[str, Dict[str, Any]] = DEFAULT_QUERYABLES | OPTIONAL_QUERYABLES
6374

6475

6576
class LogicalOp(str, Enum):

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
TokenPaginationExtension,
3838
TransactionExtension,
3939
)
40+
from stac_fastapi.extensions.core.filter import FilterConformanceClasses
4041
from stac_fastapi.extensions.third_party import BulkTransactionExtension
4142
from stac_fastapi.sfeos_helpers.aggregation import EsAsyncBaseAggregationClient
4243
from stac_fastapi.sfeos_helpers.filter import EsAsyncBaseFiltersClient
@@ -56,7 +57,7 @@
5657
client=EsAsyncBaseFiltersClient(database=database_logic)
5758
)
5859
filter_extension.conformance_classes.append(
59-
"http://www.opengis.net/spec/cql2/1.0/conf/advanced-comparison-operators"
60+
FilterConformanceClasses.ADVANCED_COMPARISON_OPERATORS
6061
)
6162

6263
aggregation_extension = AggregationExtension(

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -895,6 +895,37 @@ async def get_items_mapping(self, collection_id: str) -> Dict[str, Any]:
895895
except ESNotFoundError:
896896
raise NotFoundError(f"Mapping for index {index_name} not found")
897897

898+
async def get_items_unique_values(
899+
self, collection_id: str, field_names: Iterable[str], *, limit: int = 100
900+
) -> Dict[str, List[str]]:
901+
"""Get the unique values for the given fields in the collection."""
902+
limit_plus_one = limit + 1
903+
index_name = index_alias_by_collection_id(collection_id)
904+
905+
query = await self.client.search(
906+
index=index_name,
907+
body={
908+
"size": 0,
909+
"aggs": {
910+
field: {"terms": {"field": field, "size": limit_plus_one}}
911+
for field in field_names
912+
},
913+
},
914+
)
915+
916+
result: Dict[str, List[str]] = {}
917+
for field, agg in query["aggregations"].items():
918+
if len(agg["buckets"]) > limit:
919+
logger.warning(
920+
"Skipping enum field %s: exceeds limit of %d unique values. "
921+
"Consider excluding this field from enumeration or increase the limit.",
922+
field,
923+
limit,
924+
)
925+
continue
926+
result[field] = [bucket["key"] for bucket in agg["buckets"]]
927+
return result
928+
898929
async def create_collection(self, collection: Collection, **kwargs: Any):
899930
"""Create a single collection in the database.
900931

stac_fastapi/opensearch/stac_fastapi/opensearch/app.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
TokenPaginationExtension,
3232
TransactionExtension,
3333
)
34+
from stac_fastapi.extensions.core.filter import FilterConformanceClasses
3435
from stac_fastapi.extensions.third_party import BulkTransactionExtension
3536
from stac_fastapi.opensearch.config import OpensearchSettings
3637
from stac_fastapi.opensearch.database_logic import (
@@ -56,7 +57,7 @@
5657
client=EsAsyncBaseFiltersClient(database=database_logic)
5758
)
5859
filter_extension.conformance_classes.append(
59-
"http://www.opengis.net/spec/cql2/1.0/conf/advanced-comparison-operators"
60+
FilterConformanceClasses.ADVANCED_COMPARISON_OPERATORS
6061
)
6162

6263
aggregation_extension = AggregationExtension(

stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -904,6 +904,37 @@ async def get_items_mapping(self, collection_id: str) -> Dict[str, Any]:
904904
except exceptions.NotFoundError:
905905
raise NotFoundError(f"Mapping for index {index_name} not found")
906906

907+
async def get_items_unique_values(
908+
self, collection_id: str, field_names: Iterable[str], *, limit: int = 100
909+
) -> Dict[str, List[str]]:
910+
"""Get the unique values for the given fields in the collection."""
911+
limit_plus_one = limit + 1
912+
index_name = index_alias_by_collection_id(collection_id)
913+
914+
query = await self.client.search(
915+
index=index_name,
916+
body={
917+
"size": 0,
918+
"aggs": {
919+
field: {"terms": {"field": field, "size": limit_plus_one}}
920+
for field in field_names
921+
},
922+
},
923+
)
924+
925+
result: Dict[str, List[str]] = {}
926+
for field, agg in query["aggregations"].items():
927+
if len(agg["buckets"]) > limit:
928+
logger.warning(
929+
"Skipping enum field %s: exceeds limit of %d unique values. "
930+
"Consider excluding this field from enumeration or increase the limit.",
931+
field,
932+
limit,
933+
)
934+
continue
935+
result[field] = [bucket["key"] for bucket in agg["buckets"]]
936+
return result
937+
907938
async def create_collection(self, collection: Collection, **kwargs: Any):
908939
"""Create a single collection in the database.
909940

stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/client.py

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
"""Filter client implementation for Elasticsearch/OpenSearch."""
22

33
from collections import deque
4-
from typing import Any, Dict, Optional
4+
from typing import Any, Dict, Optional, Tuple
55

66
import attr
77

88
from stac_fastapi.core.base_database_logic import BaseDatabaseLogic
9-
from stac_fastapi.core.extensions.filter import DEFAULT_QUERYABLES
9+
from stac_fastapi.core.extensions.filter import ALL_QUERYABLES, DEFAULT_QUERYABLES
1010
from stac_fastapi.extensions.core.filter.client import AsyncBaseFiltersClient
1111
from stac_fastapi.sfeos_helpers.mappings import ES_MAPPING_TYPE_TO_JSON
1212

@@ -59,31 +59,31 @@ async def get_queryables(
5959

6060
mapping_data = await self.database.get_items_mapping(collection_id)
6161
mapping_properties = next(iter(mapping_data.values()))["mappings"]["properties"]
62-
stack = deque(mapping_properties.items())
62+
stack: deque[Tuple[str, Dict[str, Any]]] = deque(mapping_properties.items())
63+
enum_fields: Dict[str, Dict[str, Any]] = {}
6364

6465
while stack:
65-
field_name, field_def = stack.popleft()
66+
field_fqn, field_def = stack.popleft()
6667

6768
# Iterate over nested fields
6869
field_properties = field_def.get("properties")
6970
if field_properties:
70-
# Fields in Item Properties should be exposed with their un-prefixed names,
71-
# and not require expressions to prefix them with properties,
72-
# e.g., eo:cloud_cover instead of properties.eo:cloud_cover.
73-
if field_name == "properties":
74-
stack.extend(field_properties.items())
75-
else:
76-
stack.extend(
77-
(f"{field_name}.{k}", v) for k, v in field_properties.items()
78-
)
71+
stack.extend(
72+
(f"{field_fqn}.{k}", v) for k, v in field_properties.items()
73+
)
7974

8075
# Skip non-indexed or disabled fields
8176
field_type = field_def.get("type")
8277
if not field_type or not field_def.get("enabled", True):
8378
continue
8479

80+
# Fields in Item Properties should be exposed with their un-prefixed names,
81+
# and not require expressions to prefix them with properties,
82+
# e.g., eo:cloud_cover instead of properties.eo:cloud_cover.
83+
field_name = field_fqn.removeprefix("properties.")
84+
8585
# Generate field properties
86-
field_result = DEFAULT_QUERYABLES.get(field_name, {})
86+
field_result = ALL_QUERYABLES.get(field_name, {})
8787
properties[field_name] = field_result
8888

8989
field_name_human = field_name.replace("_", " ").title()
@@ -95,4 +95,13 @@ async def get_queryables(
9595
if field_type in {"date", "date_nanos"}:
9696
field_result.setdefault("format", "date-time")
9797

98+
if field_result.pop("$enum", False):
99+
enum_fields[field_fqn] = field_result
100+
101+
if enum_fields:
102+
for field_fqn, unique_values in (
103+
await self.database.get_items_unique_values(collection_id, enum_fields)
104+
).items():
105+
enum_fields[field_fqn]["enum"] = unique_values
106+
98107
return queryables

stac_fastapi/tests/conftest.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
from stac_fastapi.core.rate_limit import setup_rate_limit
2828
from stac_fastapi.core.route_dependencies import get_route_dependencies
2929
from stac_fastapi.core.utilities import get_bool_env
30+
from stac_fastapi.extensions.core.filter import FilterConformanceClasses
3031
from stac_fastapi.sfeos_helpers.aggregation import EsAsyncBaseAggregationClient
32+
from stac_fastapi.sfeos_helpers.filter import EsAsyncBaseFiltersClient
3133

3234
if os.getenv("BACKEND", "elasticsearch").lower() == "opensearch":
3335
from stac_fastapi.opensearch.config import AsyncOpensearchSettings as AsyncSettings
@@ -39,9 +41,11 @@
3941
)
4042
else:
4143
from stac_fastapi.elasticsearch.config import (
42-
ElasticsearchSettings as SearchSettings,
4344
AsyncElasticsearchSettings as AsyncSettings,
4445
)
46+
from stac_fastapi.elasticsearch.config import (
47+
ElasticsearchSettings as SearchSettings,
48+
)
4549
from stac_fastapi.elasticsearch.database_logic import (
4650
DatabaseLogic,
4751
create_collection_index,
@@ -198,6 +202,13 @@ def bulk_txn_client():
198202
async def app():
199203
settings = AsyncSettings()
200204

205+
filter_extension = FilterExtension(
206+
client=EsAsyncBaseFiltersClient(database=database)
207+
)
208+
filter_extension.conformance_classes.append(
209+
FilterConformanceClasses.ADVANCED_COMPARISON_OPERATORS
210+
)
211+
201212
aggregation_extension = AggregationExtension(
202213
client=EsAsyncBaseAggregationClient(
203214
database=database, session=None, settings=settings
@@ -217,7 +228,7 @@ async def app():
217228
FieldsExtension(),
218229
QueryExtension(),
219230
TokenPaginationExtension(),
220-
FilterExtension(),
231+
filter_extension,
221232
FreeTextExtension(),
222233
]
223234

@@ -313,7 +324,6 @@ async def app_client_rate_limit(app_rate_limit):
313324

314325
@pytest_asyncio.fixture(scope="session")
315326
async def app_basic_auth():
316-
317327
stac_fastapi_route_dependencies = """[
318328
{
319329
"routes":[{"method":"*","path":"*"}],

stac_fastapi/tests/extensions/test_filter.py

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import json
22
import logging
33
import os
4+
import uuid
45
from os import listdir
56
from os.path import isfile, join
7+
from typing import Callable, Dict
68

79
import pytest
10+
from httpx import AsyncClient
811

912
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
1013

@@ -40,7 +43,6 @@ async def test_filter_extension_collection_link(app_client, load_test_data):
4043

4144
@pytest.mark.asyncio
4245
async def test_search_filters_post(app_client, ctx):
43-
4446
filters = []
4547
pwd = f"{THIS_DIR}/cql2"
4648
for fn in [fn for f in listdir(pwd) if isfile(fn := join(pwd, f))]:
@@ -625,3 +627,50 @@ async def test_search_filter_extension_cql2text_s_disjoint_property(app_client,
625627
assert resp.status_code == 200
626628
resp_json = resp.json()
627629
assert len(resp_json["features"]) == 1
630+
631+
632+
@pytest.mark.asyncio
633+
async def test_queryables_enum_platform(
634+
app_client: AsyncClient,
635+
load_test_data: Callable[[str], Dict],
636+
monkeypatch: pytest.MonkeyPatch,
637+
):
638+
# Arrange
639+
# Enforce instant database refresh
640+
# TODO: Is there a better way to do this?
641+
monkeypatch.setenv("DATABASE_REFRESH", "true")
642+
643+
# Create collection
644+
collection_data = load_test_data("test_collection.json")
645+
collection_id = collection_data["id"] = f"enum-test-collection-{uuid.uuid4()}"
646+
r = await app_client.post("/collections", json=collection_data)
647+
r.raise_for_status()
648+
649+
# Create items with different platform values
650+
NUM_ITEMS = 3
651+
for i in range(1, NUM_ITEMS + 1):
652+
item_data = load_test_data("test_item.json")
653+
item_data["id"] = f"enum-test-item-{i}"
654+
item_data["collection"] = collection_id
655+
item_data["properties"]["platform"] = "landsat-8" if i % 2 else "sentinel-2"
656+
r = await app_client.post(f"/collections/{collection_id}/items", json=item_data)
657+
r.raise_for_status()
658+
659+
# Act
660+
# Test queryables endpoint
661+
queryables = (
662+
(await app_client.get(f"/collections/{collection_data['id']}/queryables"))
663+
.raise_for_status()
664+
.json()
665+
)
666+
667+
# Assert
668+
# Verify distinct values (should only have 2 unique values despite 3 items)
669+
properties = queryables["properties"]
670+
platform_info = properties["platform"]
671+
platform_values = platform_info["enum"]
672+
assert set(platform_values) == {"landsat-8", "sentinel-2"}
673+
674+
# Clean up
675+
r = await app_client.delete(f"/collections/{collection_id}")
676+
r.raise_for_status()

0 commit comments

Comments
 (0)