diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 864b52e3..abf6ebfa 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -96,6 +96,10 @@ jobs: run: | pip install ./stac_fastapi/core + - name: Install helpers library stac-fastapi + run: | + pip install ./stac_fastapi/sfeos_helpers + - name: Install elasticsearch stac-fastapi run: | pip install ./stac_fastapi/elasticsearch[dev,server] diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index eb84e7fc..8ed81ce6 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -35,6 +35,18 @@ jobs: # Publish to PyPI twine upload dist/* + - name: Build and publish sfeos_helpers + working-directory: stac_fastapi/sfeos_helpers + env: + TWINE_USERNAME: "__token__" + TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }} + run: | + # Build package + python setup.py sdist bdist_wheel + + # Publish to PyPI + twine upload dist/* + - name: Build and publish stac-fastapi-elasticsearch working-directory: stac_fastapi/elasticsearch env: diff --git a/CHANGELOG.md b/CHANGELOG.md index ea4b5993..7388cc78 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,24 +5,33 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). + ## [Unreleased] +## [v5.0.0a0] - 2025-05-29 + ### Added +- Created new `sfeos_helpers` package to improve code organization and maintainability [#376](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/376) - Added introduction section - What is stac-fastapi-elasticsearch-opensearch? - to README [#384](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/384) ### Changed +- Refactored utility functions into dedicated modules within `sfeos_helpers` [#376](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/376): + - Created `database` package with specialized modules for index, document, and utility operations + - Created `aggregation` package for Elasticsearch/OpenSearch-specific aggregation functionality + - Moved shared logic from core module to helper functions for better code reuse + - Separated utility functions from constant mappings for clearer code organization +- Updated documentation to reflect recent code refactoring [#376](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/376) - Improved README documentation with consistent formatting and enhanced sections [#381](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/381): - Added sfeos logo and banner - Added a comprehensive Quick Start guide - Reorganized sections for better navigation - Reformatted content with bullet points for improved readability - Added more detailed examples for API interaction - + ### Fixed - ## [v4.2.0] - 2025-05-15 ### Added @@ -397,7 +406,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Use genexp in execute_search and get_all_collections to return results. - Added db_to_stac serializer to item_collection method in core.py. 
-[Unreleased]: https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/compare/v4.2.0...main +[Unreleased]: https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/compare/v5.0.0a0...main +[v5.0.0a0]: https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/compare/v4.2.0...v5.0.0a0 [v4.2.0]: https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/compare/v4.1.0...v4.2.0 [v4.1.0]: https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/compare/v4.0.0...v4.1.0 [v4.0.0]: https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/compare/v3.2.5...v4.0.0 diff --git a/Makefile b/Makefile index 3440b7a2..5896e734 100644 --- a/Makefile +++ b/Makefile @@ -95,7 +95,8 @@ pybase-install: pip install -e ./stac_fastapi/api[dev] && \ pip install -e ./stac_fastapi/types[dev] && \ pip install -e ./stac_fastapi/extensions[dev] && \ - pip install -e ./stac_fastapi/core + pip install -e ./stac_fastapi/core && \ + pip install -e ./stac_fastapi/sfeos_helpers .PHONY: install-es install-es: pybase-install diff --git a/README.md b/README.md index 807a02bb..11619f86 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,6 @@ -

@@ -68,6 +67,7 @@ This project is built on the following technologies: STAC, stac-fastapi, FastAPI ## Table of Contents - [Documentation & Resources](#documentation--resources) +- [Package Structure](#package-structure) - [Examples](#examples) - [Performance](#performance) - [Quick Start](#quick-start) @@ -95,6 +95,21 @@ This project is built on the following technologies: STAC, stac-fastapi, FastAPI - [Gitter Chat](https://app.gitter.im/#/room/#stac-fastapi-elasticsearch_community:gitter.im) - For real-time discussions - [GitHub Discussions](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/discussions) - For longer-form questions and answers +## Package Structure + +This project is organized into several packages, each with a specific purpose: + +- **stac_fastapi_core**: Core functionality that's database-agnostic, including API models, extensions, and shared utilities. This package provides the foundation for building STAC API implementations with any database backend. See [stac-fastapi-mongo](https://github.com/Healy-Hyperspatial/stac-fastapi-mongo) for a working example. + +- **sfeos_helpers**: Shared helper functions and utilities used by both the Elasticsearch and OpenSearch backends. This package includes: + - `database`: Specialized modules for index, document, and database utility operations + - `aggregation`: Elasticsearch/OpenSearch-specific aggregation functionality + - Shared logic and utilities that improve code reuse between backends + +- **stac_fastapi_elasticsearch**: Complete implementation of the STAC API using Elasticsearch as the backend database. This package depends on both `stac_fastapi_core` and `sfeos_helpers`. + +- **stac_fastapi_opensearch**: Complete implementation of the STAC API using OpenSearch as the backend database. This package depends on both `stac_fastapi_core` and `sfeos_helpers`. + ## Examples The `/examples` directory contains several useful examples and reference implementations: @@ -181,6 +196,7 @@ There are two main ways to run the API locally: - **Compatibility**: The most recent Elasticsearch 7.x versions should also work. See the [opensearch-py docs](https://github.com/opensearch-project/opensearch-py/blob/main/COMPATIBILITY.md) for compatibility information. + ## Configuration Reference You can customize additional settings in your `.env` file: @@ -518,5 +534,6 @@ You can customize additional settings in your `.env` file: - Limits each client to a specified number of requests per time period (e.g., 500 requests per minute) - Helps prevent API abuse and maintains system stability - Ensures fair resource allocation among all clients - + - **Examples**: Implementation examples are available in the [examples/rate_limit](examples/rate_limit) directory. 
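The Package Structure section above maps one-to-one onto the import changes made elsewhere in this diff. A minimal sketch of the resulting import layout, using only module paths that appear in the `app.py`, `config.py`, and `database_logic.py` hunks later in this patch:

```python
# Sketch of the post-refactor package layout; every import below is taken
# from this PR's own changes, not invented.

# Database-agnostic core (stac_fastapi_core):
from stac_fastapi.core.datetime_utils import format_datetime_range
from stac_fastapi.core.utilities import get_bool_env

# Shared Elasticsearch/OpenSearch helpers (new sfeos_helpers package):
from stac_fastapi.sfeos_helpers.database import return_date, validate_refresh
from stac_fastapi.sfeos_helpers.filter import EsAsyncBaseFiltersClient
from stac_fastapi.sfeos_helpers.aggregation import EsAsyncBaseAggregationClient
```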
+ diff --git a/compose.yml b/compose.yml index 125f6539..240934d6 100644 --- a/compose.yml +++ b/compose.yml @@ -9,7 +9,7 @@ services: environment: - STAC_FASTAPI_TITLE=stac-fastapi-elasticsearch - STAC_FASTAPI_DESCRIPTION=A STAC FastAPI with an Elasticsearch backend - - STAC_FASTAPI_VERSION=4.2.0 + - STAC_FASTAPI_VERSION=5.0.0a0 - STAC_FASTAPI_LANDING_PAGE_ID=stac-fastapi-elasticsearch - APP_HOST=0.0.0.0 - APP_PORT=8080 @@ -42,7 +42,7 @@ services: environment: - STAC_FASTAPI_TITLE=stac-fastapi-opensearch - STAC_FASTAPI_DESCRIPTION=A STAC FastAPI with an Opensearch backend - - STAC_FASTAPI_VERSION=4.2.0 + - STAC_FASTAPI_VERSION=5.0.0a0 - STAC_FASTAPI_LANDING_PAGE_ID=stac-fastapi-opensearch - APP_HOST=0.0.0.0 - APP_PORT=8082 diff --git a/dockerfiles/Dockerfile.ci.es b/dockerfiles/Dockerfile.ci.es index a6fb6a53..5bd3853b 100644 --- a/dockerfiles/Dockerfile.ci.es +++ b/dockerfiles/Dockerfile.ci.es @@ -12,6 +12,7 @@ RUN apt-get update && \ COPY . /app/ RUN pip3 install --no-cache-dir -e ./stac_fastapi/core && \ + pip3 install --no-cache-dir -e ./stac_fastapi/sfeos_helpers && \ pip3 install --no-cache-dir ./stac_fastapi/elasticsearch[server] USER root diff --git a/dockerfiles/Dockerfile.ci.os b/dockerfiles/Dockerfile.ci.os index a046a3b6..e359f1a8 100644 --- a/dockerfiles/Dockerfile.ci.os +++ b/dockerfiles/Dockerfile.ci.os @@ -12,6 +12,7 @@ RUN apt-get update && \ COPY . /app/ RUN pip3 install --no-cache-dir -e ./stac_fastapi/core && \ + pip3 install --no-cache-dir -e ./stac_fastapi/sfeos_helpers && \ pip3 install --no-cache-dir ./stac_fastapi/opensearch[server] USER root diff --git a/dockerfiles/Dockerfile.deploy.es b/dockerfiles/Dockerfile.deploy.es index 2eab7b9d..2a6fc4fc 100644 --- a/dockerfiles/Dockerfile.deploy.es +++ b/dockerfiles/Dockerfile.deploy.es @@ -13,6 +13,7 @@ WORKDIR /app COPY . /app RUN pip install --no-cache-dir -e ./stac_fastapi/core +RUN pip install --no-cache-dir -e ./stac_fastapi/sfeos_helpers RUN pip install --no-cache-dir ./stac_fastapi/elasticsearch[server] EXPOSE 8080 diff --git a/dockerfiles/Dockerfile.deploy.os b/dockerfiles/Dockerfile.deploy.os index 035b181e..8a532f0c 100644 --- a/dockerfiles/Dockerfile.deploy.os +++ b/dockerfiles/Dockerfile.deploy.os @@ -13,6 +13,7 @@ WORKDIR /app COPY . /app RUN pip install --no-cache-dir -e ./stac_fastapi/core +RUN pip install --no-cache-dir -e ./stac_fastapi/sfeos_helpers RUN pip install --no-cache-dir ./stac_fastapi/opensearch[server] EXPOSE 8080 diff --git a/dockerfiles/Dockerfile.dev.es b/dockerfiles/Dockerfile.dev.es index 009f9681..7a01aca8 100644 --- a/dockerfiles/Dockerfile.dev.es +++ b/dockerfiles/Dockerfile.dev.es @@ -16,4 +16,5 @@ WORKDIR /app COPY . /app RUN pip install --no-cache-dir -e ./stac_fastapi/core +RUN pip install --no-cache-dir -e ./stac_fastapi/sfeos_helpers RUN pip install --no-cache-dir -e ./stac_fastapi/elasticsearch[dev,server] diff --git a/dockerfiles/Dockerfile.dev.os b/dockerfiles/Dockerfile.dev.os index d9dc8b0a..28012dfb 100644 --- a/dockerfiles/Dockerfile.dev.os +++ b/dockerfiles/Dockerfile.dev.os @@ -16,4 +16,5 @@ WORKDIR /app COPY . 
/app RUN pip install --no-cache-dir -e ./stac_fastapi/core +RUN pip install --no-cache-dir -e ./stac_fastapi/sfeos_helpers RUN pip install --no-cache-dir -e ./stac_fastapi/opensearch[dev,server] diff --git a/dockerfiles/Dockerfile.docs b/dockerfiles/Dockerfile.docs index f1fe63b8..aa080c7c 100644 --- a/dockerfiles/Dockerfile.docs +++ b/dockerfiles/Dockerfile.docs @@ -12,6 +12,7 @@ WORKDIR /opt/src RUN python -m pip install \ stac_fastapi/core \ + stac_fastapi/sfeos_helpers \ stac_fastapi/elasticsearch \ stac_fastapi/opensearch diff --git a/examples/auth/compose.basic_auth.yml b/examples/auth/compose.basic_auth.yml index e603f130..eb7b2d75 100644 --- a/examples/auth/compose.basic_auth.yml +++ b/examples/auth/compose.basic_auth.yml @@ -9,7 +9,7 @@ services: environment: - STAC_FASTAPI_TITLE=stac-fastapi-elasticsearch - STAC_FASTAPI_DESCRIPTION=A STAC FastAPI with an Elasticsearch backend - - STAC_FASTAPI_VERSION=4.2.0 + - STAC_FASTAPI_VERSION=5.0.0a0 - STAC_FASTAPI_LANDING_PAGE_ID=stac-fastapi-elasticsearch - APP_HOST=0.0.0.0 - APP_PORT=8080 @@ -43,7 +43,7 @@ services: environment: - STAC_FASTAPI_TITLE=stac-fastapi-opensearch - STAC_FASTAPI_DESCRIPTION=A STAC FastAPI with an Opensearch backend - - STAC_FASTAPI_VERSION=4.2.0 + - STAC_FASTAPI_VERSION=5.0.0a0 - STAC_FASTAPI_LANDING_PAGE_ID=stac-fastapi-opensearch - APP_HOST=0.0.0.0 - APP_PORT=8082 diff --git a/examples/auth/compose.oauth2.yml b/examples/auth/compose.oauth2.yml index 3a2f1982..f739e03a 100644 --- a/examples/auth/compose.oauth2.yml +++ b/examples/auth/compose.oauth2.yml @@ -9,7 +9,7 @@ services: environment: - STAC_FASTAPI_TITLE=stac-fastapi-elasticsearch - STAC_FASTAPI_DESCRIPTION=A STAC FastAPI with an Elasticsearch backend - - STAC_FASTAPI_VERSION=4.2.0 + - STAC_FASTAPI_VERSION=5.0.0a0 - STAC_FASTAPI_LANDING_PAGE_ID=stac-fastapi-elasticsearch - APP_HOST=0.0.0.0 - APP_PORT=8080 @@ -44,7 +44,7 @@ services: environment: - STAC_FASTAPI_TITLE=stac-fastapi-opensearch - STAC_FASTAPI_DESCRIPTION=A STAC FastAPI with an Opensearch backend - - STAC_FASTAPI_VERSION=4.2.0 + - STAC_FASTAPI_VERSION=5.0.0a0 - STAC_FASTAPI_LANDING_PAGE_ID=stac-fastapi-opensearch - APP_HOST=0.0.0.0 - APP_PORT=8082 diff --git a/examples/auth/compose.route_dependencies.yml b/examples/auth/compose.route_dependencies.yml index 967f9be6..3a11b1ad 100644 --- a/examples/auth/compose.route_dependencies.yml +++ b/examples/auth/compose.route_dependencies.yml @@ -9,7 +9,7 @@ services: environment: - STAC_FASTAPI_TITLE=stac-fastapi-elasticsearch - STAC_FASTAPI_DESCRIPTION=A STAC FastAPI with an Elasticsearch backend - - STAC_FASTAPI_VERSION=4.2.0 + - STAC_FASTAPI_VERSION=5.0.0a0 - STAC_FASTAPI_LANDING_PAGE_ID=stac-fastapi-elasticsearch - APP_HOST=0.0.0.0 - APP_PORT=8080 @@ -43,7 +43,7 @@ services: environment: - STAC_FASTAPI_TITLE=stac-fastapi-opensearch - STAC_FASTAPI_DESCRIPTION=A STAC FastAPI with an Opensearch backend - - STAC_FASTAPI_VERSION=4.2.0 + - STAC_FASTAPI_VERSION=5.0.0a0 - STAC_FASTAPI_LANDING_PAGE_ID=stac-fastapi-opensearch - APP_HOST=0.0.0.0 - APP_PORT=8082 diff --git a/examples/rate_limit/compose.rate_limit.yml b/examples/rate_limit/compose.rate_limit.yml index d1631f7b..448c7760 100644 --- a/examples/rate_limit/compose.rate_limit.yml +++ b/examples/rate_limit/compose.rate_limit.yml @@ -9,7 +9,7 @@ services: environment: - STAC_FASTAPI_TITLE=stac-fastapi-elasticsearch - STAC_FASTAPI_DESCRIPTION=A STAC FastAPI with an Elasticsearch backend - - STAC_FASTAPI_VERSION=4.2.0 + - STAC_FASTAPI_VERSION=5.0.0a0 - 
STAC_FASTAPI_LANDING_PAGE_ID=stac-fastapi-elasticsearch - APP_HOST=0.0.0.0 - APP_PORT=8080 @@ -43,7 +43,7 @@ services: environment: - STAC_FASTAPI_TITLE=stac-fastapi-opensearch - STAC_FASTAPI_DESCRIPTION=A STAC FastAPI with an Opensearch backend - - STAC_FASTAPI_VERSION=4.2.0 + - STAC_FASTAPI_VERSION=5.0.0a0 - APP_HOST=0.0.0.0 - APP_PORT=8082 - RELOAD=true diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index 05212f5b..866b429a 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -1,11 +1,10 @@ """Core client.""" import logging -from collections import deque from datetime import datetime as datetime_type from datetime import timezone from enum import Enum -from typing import Any, Dict, List, Literal, Optional, Set, Type, Union +from typing import List, Optional, Set, Type, Union from urllib.parse import unquote_plus, urljoin import attr @@ -22,11 +21,11 @@ from stac_fastapi.core.base_database_logic import BaseDatabaseLogic from stac_fastapi.core.base_settings import ApiBaseSettings +from stac_fastapi.core.datetime_utils import format_datetime_range from stac_fastapi.core.models.links import PagingLinks from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer from stac_fastapi.core.session import Session from stac_fastapi.core.utilities import filter_fields -from stac_fastapi.extensions.core.filter.client import AsyncBaseFiltersClient from stac_fastapi.extensions.third_party.bulk_transactions import ( BaseBulkTransactionsClient, BulkTransactionMethod, @@ -37,7 +36,6 @@ from stac_fastapi.types.core import AsyncBaseCoreClient, AsyncBaseTransactionsClient from stac_fastapi.types.extension import ApiExtension from stac_fastapi.types.requests import get_base_url -from stac_fastapi.types.rfc3339 import DateTimeType, rfc3339_str_to_datetime from stac_fastapi.types.search import BaseSearchPostRequest logger = logging.getLogger(__name__) @@ -318,9 +316,8 @@ async def item_collection( ) if datetime: - datetime_search = self._return_date(datetime) search = self.database.apply_datetime_filter( - search=search, datetime_search=datetime_search + search=search, interval=datetime ) if bbox: @@ -374,87 +371,6 @@ async def get_item( ) return self.item_serializer.db_to_stac(item, base_url) - @staticmethod - def _return_date( - interval: Optional[Union[DateTimeType, str]] - ) -> Dict[str, Optional[str]]: - """ - Convert a date interval. - - (which may be a datetime, a tuple of one or two datetimes a string - representing a datetime or range, or None) into a dictionary for filtering - search results with Elasticsearch. - - This function ensures the output dictionary contains 'gte' and 'lte' keys, - even if they are set to None, to prevent KeyError in the consuming logic. - - Args: - interval (Optional[Union[DateTimeType, str]]): The date interval, which might be a single datetime, - a tuple with one or two datetimes, a string, or None. - - Returns: - dict: A dictionary representing the date interval for use in filtering search results, - always containing 'gte' and 'lte' keys. - """ - result: Dict[str, Optional[str]] = {"gte": None, "lte": None} - - if interval is None: - return result - - if isinstance(interval, str): - if "/" in interval: - parts = interval.split("/") - result["gte"] = parts[0] if parts[0] != ".." else None - result["lte"] = ( - parts[1] if len(parts) > 1 and parts[1] != ".." else None - ) - else: - converted_time = interval if interval != ".." 
else None - result["gte"] = result["lte"] = converted_time - return result - - if isinstance(interval, datetime_type): - datetime_iso = interval.isoformat() - result["gte"] = result["lte"] = datetime_iso - elif isinstance(interval, tuple): - start, end = interval - # Ensure datetimes are converted to UTC and formatted with 'Z' - if start: - result["gte"] = start.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" - if end: - result["lte"] = end.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" - - return result - - def _format_datetime_range(self, date_str: str) -> str: - """ - Convert a datetime range string into a normalized UTC string for API requests using rfc3339_str_to_datetime. - - Args: - date_str (str): A string containing two datetime values separated by a '/'. - - Returns: - str: A string formatted as 'YYYY-MM-DDTHH:MM:SSZ/YYYY-MM-DDTHH:MM:SSZ', with '..' used if any element is None. - """ - - def normalize(dt): - dt = dt.strip() - if not dt or dt == "..": - return ".." - dt_obj = rfc3339_str_to_datetime(dt) - dt_utc = dt_obj.astimezone(timezone.utc) - return dt_utc.strftime("%Y-%m-%dT%H:%M:%SZ") - - if not isinstance(date_str, str): - return "../.." - if "/" not in date_str: - return f"{normalize(date_str)}/{normalize(date_str)}" - try: - start, end = date_str.split("/", 1) - except Exception: - return "../.." - return f"{normalize(start)}/{normalize(end)}" - async def get_search( self, request: Request, @@ -506,7 +422,7 @@ async def get_search( } if datetime: - base_args["datetime"] = self._format_datetime_range(date_str=datetime) + base_args["datetime"] = format_datetime_range(date_str=datetime) if intersects: base_args["intersects"] = orjson.loads(unquote_plus(intersects)) @@ -576,9 +492,8 @@ async def post_search( ) if search_request.datetime: - datetime_search = self._return_date(search_request.datetime) search = self.database.apply_datetime_filter( - search=search, datetime_search=datetime_search + search=search, interval=search_request.datetime ) if search_request.bbox: @@ -947,159 +862,3 @@ def bulk_item_insert( logger.info(f"Bulk sync operation succeeded with {success} actions.") return f"Successfully added/updated {success} Items. {attempted - success} errors occurred." 
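Both `get_search` and `post_search` above now delegate datetime handling: string normalization moves to `format_datetime_range` (added in the `datetime_utils.py` hunk below), and interval parsing moves behind `apply_datetime_filter`. A usage sketch of the normalization, derived directly from the relocated function body:

```python
from stac_fastapi.core.datetime_utils import format_datetime_range

# Open-ended ranges keep ".." on the open side:
format_datetime_range("2020-01-01T00:00:00Z/..")
# -> "2020-01-01T00:00:00Z/.."

# Offsets are normalized to UTC ("Z"):
format_datetime_range("2020-01-01T01:00:00+01:00/2020-02-01T00:00:00Z")
# -> "2020-01-01T00:00:00Z/2020-02-01T00:00:00Z"

# A single datetime is expanded to a closed range:
format_datetime_range("2020-01-01T00:00:00Z")
# -> "2020-01-01T00:00:00Z/2020-01-01T00:00:00Z"
```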
- - -_DEFAULT_QUERYABLES: Dict[str, Dict[str, Any]] = { - "id": { - "description": "ID", - "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/item.json#/definitions/core/allOf/2/properties/id", - }, - "collection": { - "description": "Collection", - "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/item.json#/definitions/core/allOf/2/then/properties/collection", - }, - "geometry": { - "description": "Geometry", - "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/item.json#/definitions/core/allOf/1/oneOf/0/properties/geometry", - }, - "datetime": { - "description": "Acquisition Timestamp", - "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/datetime.json#/properties/datetime", - }, - "created": { - "description": "Creation Timestamp", - "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/datetime.json#/properties/created", - }, - "updated": { - "description": "Creation Timestamp", - "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/datetime.json#/properties/updated", - }, - "cloud_cover": { - "description": "Cloud Cover", - "$ref": "https://stac-extensions.github.io/eo/v1.0.0/schema.json#/definitions/fields/properties/eo:cloud_cover", - }, - "cloud_shadow_percentage": { - "title": "Cloud Shadow Percentage", - "description": "Cloud Shadow Percentage", - "type": "number", - "minimum": 0, - "maximum": 100, - }, - "nodata_pixel_percentage": { - "title": "No Data Pixel Percentage", - "description": "No Data Pixel Percentage", - "type": "number", - "minimum": 0, - "maximum": 100, - }, -} - -_ES_MAPPING_TYPE_TO_JSON: Dict[ - str, Literal["string", "number", "boolean", "object", "array", "null"] -] = { - "date": "string", - "date_nanos": "string", - "keyword": "string", - "match_only_text": "string", - "text": "string", - "wildcard": "string", - "byte": "number", - "double": "number", - "float": "number", - "half_float": "number", - "long": "number", - "scaled_float": "number", - "short": "number", - "token_count": "number", - "unsigned_long": "number", - "geo_point": "object", - "geo_shape": "object", - "nested": "array", -} - - -@attr.s -class EsAsyncBaseFiltersClient(AsyncBaseFiltersClient): - """Defines a pattern for implementing the STAC filter extension.""" - - database: BaseDatabaseLogic = attr.ib() - - async def get_queryables( - self, collection_id: Optional[str] = None, **kwargs - ) -> Dict[str, Any]: - """Get the queryables available for the given collection_id. - - If collection_id is None, returns the intersection of all - queryables over all collections. - - This base implementation returns a blank queryable schema. This is not allowed - under OGC CQL but it is allowed by the STAC API Filter Extension - - https://github.com/radiantearth/stac-api-spec/tree/master/fragments/filter#queryables - - Args: - collection_id (str, optional): The id of the collection to get queryables for. - **kwargs: additional keyword arguments - - Returns: - Dict[str, Any]: A dictionary containing the queryables for the given collection. 
- """ - queryables: Dict[str, Any] = { - "$schema": "https://json-schema.org/draft/2019-09/schema", - "$id": "https://stac-api.example.com/queryables", - "type": "object", - "title": "Queryables for STAC API", - "description": "Queryable names for the STAC API Item Search filter.", - "properties": _DEFAULT_QUERYABLES, - "additionalProperties": True, - } - if not collection_id: - return queryables - - properties: Dict[str, Any] = queryables["properties"] - queryables.update( - { - "properties": properties, - "additionalProperties": False, - } - ) - - mapping_data = await self.database.get_items_mapping(collection_id) - mapping_properties = next(iter(mapping_data.values()))["mappings"]["properties"] - stack = deque(mapping_properties.items()) - - while stack: - field_name, field_def = stack.popleft() - - # Iterate over nested fields - field_properties = field_def.get("properties") - if field_properties: - # Fields in Item Properties should be exposed with their un-prefixed names, - # and not require expressions to prefix them with properties, - # e.g., eo:cloud_cover instead of properties.eo:cloud_cover. - if field_name == "properties": - stack.extend(field_properties.items()) - else: - stack.extend( - (f"{field_name}.{k}", v) for k, v in field_properties.items() - ) - - # Skip non-indexed or disabled fields - field_type = field_def.get("type") - if not field_type or not field_def.get("enabled", True): - continue - - # Generate field properties - field_result = _DEFAULT_QUERYABLES.get(field_name, {}) - properties[field_name] = field_result - - field_name_human = field_name.replace("_", " ").title() - field_result.setdefault("title", field_name_human) - - field_type_json = _ES_MAPPING_TYPE_TO_JSON.get(field_type, field_type) - field_result.setdefault("type", field_type_json) - - if field_type in {"date", "date_nanos"}: - field_result.setdefault("format", "date-time") - - return queryables diff --git a/stac_fastapi/core/stac_fastapi/core/datetime_utils.py b/stac_fastapi/core/stac_fastapi/core/datetime_utils.py index 3d6dd663..f9dbacf5 100644 --- a/stac_fastapi/core/stac_fastapi/core/datetime_utils.py +++ b/stac_fastapi/core/stac_fastapi/core/datetime_utils.py @@ -1,6 +1,38 @@ -"""A few datetime methods.""" +"""Utility functions to handle datetime parsing.""" from datetime import datetime, timezone +from stac_fastapi.types.rfc3339 import rfc3339_str_to_datetime + + +def format_datetime_range(date_str: str) -> str: + """ + Convert a datetime range string into a normalized UTC string for API requests using rfc3339_str_to_datetime. + + Args: + date_str (str): A string containing two datetime values separated by a '/'. + + Returns: + str: A string formatted as 'YYYY-MM-DDTHH:MM:SSZ/YYYY-MM-DDTHH:MM:SSZ', with '..' used if any element is None. + """ + + def normalize(dt): + dt = dt.strip() + if not dt or dt == "..": + return ".." + dt_obj = rfc3339_str_to_datetime(dt) + dt_utc = dt_obj.astimezone(timezone.utc) + return dt_utc.strftime("%Y-%m-%dT%H:%M:%SZ") + + if not isinstance(date_str, str): + return "../.." + if "/" not in date_str: + return f"{normalize(date_str)}/{normalize(date_str)}" + try: + start, end = date_str.split("/", 1) + except Exception: + return "../.." 
+ return f"{normalize(start)}/{normalize(end)}" + # Borrowed from pystac - https://github.com/stac-utils/pystac/blob/f5e4cf4a29b62e9ef675d4a4dac7977b09f53c8f/pystac/utils.py#L370-L394 def datetime_to_str(dt: datetime, timespec: str = "auto") -> str: diff --git a/stac_fastapi/core/stac_fastapi/core/extensions/aggregation.py b/stac_fastapi/core/stac_fastapi/core/extensions/aggregation.py index d41d763c..cdce486f 100644 --- a/stac_fastapi/core/stac_fastapi/core/extensions/aggregation.py +++ b/stac_fastapi/core/stac_fastapi/core/extensions/aggregation.py @@ -1,36 +1,19 @@ """Request model for the Aggregation extension.""" -from datetime import datetime -from datetime import datetime as datetime_type -from typing import Dict, List, Literal, Optional, Union -from urllib.parse import unquote_plus, urljoin +from typing import Literal, Optional import attr -import orjson -from fastapi import HTTPException, Path, Request -from pygeofilter.backends.cql2_json import to_cql2 -from pygeofilter.parsers.cql2_text import parse as parse_cql2_text -from stac_pydantic.shared import BBox +from fastapi import Path from typing_extensions import Annotated -from stac_fastapi.core.base_database_logic import BaseDatabaseLogic -from stac_fastapi.core.base_settings import ApiBaseSettings -from stac_fastapi.core.datetime_utils import datetime_to_str -from stac_fastapi.core.session import Session -from stac_fastapi.extensions.core.aggregation.client import AsyncBaseAggregationClient from stac_fastapi.extensions.core.aggregation.request import ( AggregationExtensionGetRequest, AggregationExtensionPostRequest, ) -from stac_fastapi.extensions.core.aggregation.types import ( - Aggregation, - AggregationCollection, -) from stac_fastapi.extensions.core.filter.request import ( FilterExtensionGetRequest, FilterExtensionPostRequest, ) -from stac_fastapi.types.rfc3339 import DateTimeType FilterLang = Literal["cql-json", "cql2-json", "cql2-text"] @@ -64,514 +47,3 @@ class EsAggregationExtensionPostRequest( geometry_geohash_grid_frequency_precision: Optional[int] = None geometry_geotile_grid_frequency_precision: Optional[int] = None datetime_frequency_interval: Optional[str] = None - - -@attr.s -class EsAsyncAggregationClient(AsyncBaseAggregationClient): - """Defines a pattern for implementing the STAC aggregation extension.""" - - database: BaseDatabaseLogic = attr.ib() - settings: ApiBaseSettings = attr.ib() - session: Session = attr.ib(default=attr.Factory(Session.create_from_env)) - - DEFAULT_AGGREGATIONS = [ - {"name": "total_count", "data_type": "integer"}, - {"name": "datetime_max", "data_type": "datetime"}, - {"name": "datetime_min", "data_type": "datetime"}, - { - "name": "datetime_frequency", - "data_type": "frequency_distribution", - "frequency_distribution_data_type": "datetime", - }, - { - "name": "collection_frequency", - "data_type": "frequency_distribution", - "frequency_distribution_data_type": "string", - }, - { - "name": "geometry_geohash_grid_frequency", - "data_type": "frequency_distribution", - "frequency_distribution_data_type": "string", - }, - { - "name": "geometry_geotile_grid_frequency", - "data_type": "frequency_distribution", - "frequency_distribution_data_type": "string", - }, - ] - - GEO_POINT_AGGREGATIONS = [ - { - "name": "grid_code_frequency", - "data_type": "frequency_distribution", - "frequency_distribution_data_type": "string", - }, - { - "name": "centroid_geohash_grid_frequency", - "data_type": "frequency_distribution", - "frequency_distribution_data_type": "string", - }, - { - "name": 
"centroid_geohex_grid_frequency", - "data_type": "frequency_distribution", - "frequency_distribution_data_type": "string", - }, - { - "name": "centroid_geotile_grid_frequency", - "data_type": "frequency_distribution", - "frequency_distribution_data_type": "string", - }, - ] - - MAX_GEOHASH_PRECISION = 12 - MAX_GEOHEX_PRECISION = 15 - MAX_GEOTILE_PRECISION = 29 - SUPPORTED_DATETIME_INTERVAL = {"day", "month", "year"} - DEFAULT_DATETIME_INTERVAL = "month" - - async def get_aggregations(self, collection_id: Optional[str] = None, **kwargs): - """Get the available aggregations for a catalog or collection defined in the STAC JSON. If no aggregations, default aggregations are used.""" - request: Request = kwargs["request"] - base_url = str(request.base_url) - links = [{"rel": "root", "type": "application/json", "href": base_url}] - - if collection_id is not None: - collection_endpoint = urljoin(base_url, f"collections/{collection_id}") - links.extend( - [ - { - "rel": "collection", - "type": "application/json", - "href": collection_endpoint, - }, - { - "rel": "self", - "type": "application/json", - "href": urljoin(collection_endpoint + "/", "aggregations"), - }, - ] - ) - if await self.database.check_collection_exists(collection_id) is None: - collection = await self.database.find_collection(collection_id) - aggregations = collection.get( - "aggregations", self.DEFAULT_AGGREGATIONS.copy() - ) - else: - raise IndexError(f"Collection {collection_id} does not exist") - else: - links.append( - { - "rel": "self", - "type": "application/json", - "href": urljoin(base_url, "aggregations"), - } - ) - - aggregations = self.DEFAULT_AGGREGATIONS - return AggregationCollection( - type="AggregationCollection", aggregations=aggregations, links=links - ) - - def extract_precision( - self, precision: Union[int, None], min_value: int, max_value: int - ) -> Optional[int]: - """Ensure that the aggregation precision value is withing the a valid range, otherwise return the minumium value.""" - if precision is not None: - if precision < min_value or precision > max_value: - raise HTTPException( - status_code=400, - detail=f"Invalid precision. Must be a number between {min_value} and {max_value} inclusive", - ) - return precision - else: - return min_value - - def extract_date_histogram_interval(self, value: Optional[str]) -> str: - """ - Ensure that the interval for the date histogram is valid. If no value is provided, the default will be returned. - - Args: - value: value entered by the user - - Returns: - string containing the date histogram interval to use. - - Raises: - HTTPException: if the supplied value is not in the supported intervals - """ - if value is not None: - if value not in self.SUPPORTED_DATETIME_INTERVAL: - raise HTTPException( - status_code=400, - detail=f"Invalid datetime interval. Must be one of {self.SUPPORTED_DATETIME_INTERVAL}", - ) - else: - return value - else: - return self.DEFAULT_DATETIME_INTERVAL - - @staticmethod - def _return_date( - interval: Optional[Union[DateTimeType, str]] - ) -> Dict[str, Optional[str]]: - """ - Convert a date interval. - - (which may be a datetime, a tuple of one or two datetimes a string - representing a datetime or range, or None) into a dictionary for filtering - search results with Elasticsearch. - - This function ensures the output dictionary contains 'gte' and 'lte' keys, - even if they are set to None, to prevent KeyError in the consuming logic. 
- - Args: - interval (Optional[Union[DateTimeType, str]]): The date interval, which might be a single datetime, - a tuple with one or two datetimes, a string, or None. - - Returns: - dict: A dictionary representing the date interval for use in filtering search results, - always containing 'gte' and 'lte' keys. - """ - result: Dict[str, Optional[str]] = {"gte": None, "lte": None} - - if interval is None: - return result - - if isinstance(interval, str): - if "/" in interval: - parts = interval.split("/") - result["gte"] = parts[0] if parts[0] != ".." else None - result["lte"] = ( - parts[1] if len(parts) > 1 and parts[1] != ".." else None - ) - else: - converted_time = interval if interval != ".." else None - result["gte"] = result["lte"] = converted_time - return result - - if isinstance(interval, datetime_type): - datetime_iso = interval.isoformat() - result["gte"] = result["lte"] = datetime_iso - elif isinstance(interval, tuple): - start, end = interval - # Ensure datetimes are converted to UTC and formatted with 'Z' - if start: - result["gte"] = start.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" - if end: - result["lte"] = end.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" - - return result - - def frequency_agg(self, es_aggs, name, data_type): - """Format an aggregation for a frequency distribution aggregation.""" - buckets = [] - for bucket in es_aggs.get(name, {}).get("buckets", []): - bucket_data = { - "key": bucket.get("key_as_string") or bucket.get("key"), - "data_type": data_type, - "frequency": bucket.get("doc_count"), - "to": bucket.get("to"), - "from": bucket.get("from"), - } - buckets.append(bucket_data) - return Aggregation( - name=name, - data_type="frequency_distribution", - overflow=es_aggs.get(name, {}).get("sum_other_doc_count", 0), - buckets=buckets, - ) - - def metric_agg(self, es_aggs, name, data_type): - """Format an aggregation for a metric aggregation.""" - value = es_aggs.get(name, {}).get("value_as_string") or es_aggs.get( - name, {} - ).get("value") - # ES 7.x does not return datetimes with a 'value_as_string' field - if "datetime" in name and isinstance(value, float): - value = datetime_to_str(datetime.fromtimestamp(value / 1e3)) - return Aggregation( - name=name, - data_type=data_type, - value=value, - ) - - def get_filter(self, filter, filter_lang): - """Format the filter parameter in cql2-json or cql2-text.""" - if filter_lang == "cql2-text": - return orjson.loads(to_cql2(parse_cql2_text(filter))) - elif filter_lang == "cql2-json": - if isinstance(filter, str): - return orjson.loads(unquote_plus(filter)) - else: - return filter - else: - raise HTTPException( - status_code=400, - detail=f"Unknown filter-lang: {filter_lang}. Only cql2-json or cql2-text are supported.", - ) - - def _format_datetime_range(self, date_tuple: DateTimeType) -> str: - """ - Convert a tuple of datetime objects or None into a formatted string for API requests. - - Args: - date_tuple (tuple): A tuple containing two elements, each can be a datetime object or None. - - Returns: - str: A string formatted as 'YYYY-MM-DDTHH:MM:SS.sssZ/YYYY-MM-DDTHH:MM:SS.sssZ', with '..' used if any element is None. - """ - - def format_datetime(dt): - """Format a single datetime object to the ISO8601 extended format with 'Z'.""" - return dt.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" if dt else ".." 
- - start, end = date_tuple - return f"{format_datetime(start)}/{format_datetime(end)}" - - async def aggregate( - self, - aggregate_request: Optional[EsAggregationExtensionPostRequest] = None, - collection_id: Optional[ - Annotated[str, Path(description="Collection ID")] - ] = None, - collections: Optional[List[str]] = [], - datetime: Optional[DateTimeType] = None, - intersects: Optional[str] = None, - filter_lang: Optional[str] = None, - filter_expr: Optional[str] = None, - aggregations: Optional[str] = None, - ids: Optional[List[str]] = None, - bbox: Optional[BBox] = None, - centroid_geohash_grid_frequency_precision: Optional[int] = None, - centroid_geohex_grid_frequency_precision: Optional[int] = None, - centroid_geotile_grid_frequency_precision: Optional[int] = None, - geometry_geohash_grid_frequency_precision: Optional[int] = None, - geometry_geotile_grid_frequency_precision: Optional[int] = None, - datetime_frequency_interval: Optional[str] = None, - **kwargs, - ) -> Union[Dict, Exception]: - """Get aggregations from the database.""" - request: Request = kwargs["request"] - base_url = str(request.base_url) - path = request.url.path - search = self.database.make_search() - - if aggregate_request is None: - - base_args = { - "collections": collections, - "ids": ids, - "bbox": bbox, - "aggregations": aggregations, - "centroid_geohash_grid_frequency_precision": centroid_geohash_grid_frequency_precision, - "centroid_geohex_grid_frequency_precision": centroid_geohex_grid_frequency_precision, - "centroid_geotile_grid_frequency_precision": centroid_geotile_grid_frequency_precision, - "geometry_geohash_grid_frequency_precision": geometry_geohash_grid_frequency_precision, - "geometry_geotile_grid_frequency_precision": geometry_geotile_grid_frequency_precision, - "datetime_frequency_interval": datetime_frequency_interval, - } - - if collection_id: - collections = [str(collection_id)] - - if intersects: - base_args["intersects"] = orjson.loads(unquote_plus(intersects)) - - if datetime: - base_args["datetime"] = self._format_datetime_range(datetime) - - if filter_expr: - base_args["filter"] = self.get_filter(filter_expr, filter_lang) - aggregate_request = EsAggregationExtensionPostRequest(**base_args) - else: - # Workaround for optional path param in POST requests - if "collections" in path: - collection_id = path.split("/")[2] - - filter_lang = "cql2-json" - if aggregate_request.filter_expr: - aggregate_request.filter_expr = self.get_filter( - aggregate_request.filter_expr, filter_lang - ) - - if collection_id: - if aggregate_request.collections: - raise HTTPException( - status_code=400, - detail="Cannot query multiple collections when executing '/collections//aggregate'. Use '/aggregate' and the collections field instead", - ) - else: - aggregate_request.collections = [collection_id] - - if ( - aggregate_request.aggregations is None - or aggregate_request.aggregations == [] - ): - raise HTTPException( - status_code=400, - detail="No 'aggregations' found. 
Use '/aggregations' to return available aggregations", - ) - - if aggregate_request.ids: - search = self.database.apply_ids_filter( - search=search, item_ids=aggregate_request.ids - ) - - if aggregate_request.datetime: - datetime_search = self._return_date(aggregate_request.datetime) - search = self.database.apply_datetime_filter( - search=search, datetime_search=datetime_search - ) - - if aggregate_request.bbox: - bbox = aggregate_request.bbox - if len(bbox) == 6: - bbox = [bbox[0], bbox[1], bbox[3], bbox[4]] - - search = self.database.apply_bbox_filter(search=search, bbox=bbox) - - if aggregate_request.intersects: - search = self.database.apply_intersects_filter( - search=search, intersects=aggregate_request.intersects - ) - - if aggregate_request.collections: - search = self.database.apply_collections_filter( - search=search, collection_ids=aggregate_request.collections - ) - # validate that aggregations are supported for all collections - for collection_id in aggregate_request.collections: - aggs = await self.get_aggregations( - collection_id=collection_id, request=request - ) - supported_aggregations = ( - aggs["aggregations"] + self.DEFAULT_AGGREGATIONS - ) - - for agg_name in aggregate_request.aggregations: - if agg_name not in set([x["name"] for x in supported_aggregations]): - raise HTTPException( - status_code=400, - detail=f"Aggregation {agg_name} not supported by collection {collection_id}", - ) - else: - # Validate that the aggregations requested are supported by the catalog - aggs = await self.get_aggregations(request=request) - supported_aggregations = aggs["aggregations"] - for agg_name in aggregate_request.aggregations: - if agg_name not in [x["name"] for x in supported_aggregations]: - raise HTTPException( - status_code=400, - detail=f"Aggregation {agg_name} not supported at catalog level", - ) - - if aggregate_request.filter_expr: - try: - search = await self.database.apply_cql2_filter( - search, aggregate_request.filter_expr - ) - except Exception as e: - raise HTTPException( - status_code=400, detail=f"Error with cql2 filter: {e}" - ) - - centroid_geohash_grid_precision = self.extract_precision( - aggregate_request.centroid_geohash_grid_frequency_precision, - 1, - self.MAX_GEOHASH_PRECISION, - ) - - centroid_geohex_grid_precision = self.extract_precision( - aggregate_request.centroid_geohex_grid_frequency_precision, - 0, - self.MAX_GEOHEX_PRECISION, - ) - - centroid_geotile_grid_precision = self.extract_precision( - aggregate_request.centroid_geotile_grid_frequency_precision, - 0, - self.MAX_GEOTILE_PRECISION, - ) - - geometry_geohash_grid_precision = self.extract_precision( - aggregate_request.geometry_geohash_grid_frequency_precision, - 1, - self.MAX_GEOHASH_PRECISION, - ) - - geometry_geotile_grid_precision = self.extract_precision( - aggregate_request.geometry_geotile_grid_frequency_precision, - 0, - self.MAX_GEOTILE_PRECISION, - ) - - datetime_frequency_interval = self.extract_date_histogram_interval( - aggregate_request.datetime_frequency_interval, - ) - - try: - db_response = await self.database.aggregate( - collections, - aggregate_request.aggregations, - search, - centroid_geohash_grid_precision, - centroid_geohex_grid_precision, - centroid_geotile_grid_precision, - geometry_geohash_grid_precision, - geometry_geotile_grid_precision, - datetime_frequency_interval, - ) - except Exception as error: - if not isinstance(error, IndexError): - raise error - aggs = [] - if db_response: - result_aggs = db_response.get("aggregations", {}) - for agg in { - 
frozenset(item.items()): item - for item in supported_aggregations + self.GEO_POINT_AGGREGATIONS - }.values(): - if agg["name"] in aggregate_request.aggregations: - if agg["name"].endswith("_frequency"): - aggs.append( - self.frequency_agg( - result_aggs, agg["name"], agg["data_type"] - ) - ) - else: - aggs.append( - self.metric_agg(result_aggs, agg["name"], agg["data_type"]) - ) - links = [ - {"rel": "root", "type": "application/json", "href": base_url}, - ] - - if collection_id: - collection_endpoint = urljoin(base_url, f"collections/{collection_id}") - links.extend( - [ - { - "rel": "collection", - "type": "application/json", - "href": collection_endpoint, - }, - { - "rel": "self", - "type": "application/json", - "href": urljoin(collection_endpoint, "aggregate"), - }, - ] - ) - else: - links.append( - { - "rel": "self", - "type": "application/json", - "href": urljoin(base_url, "aggregate"), - } - ) - results = AggregationCollection( - type="AggregationCollection", aggregations=aggs, links=links - ) - - return results diff --git a/stac_fastapi/core/stac_fastapi/core/extensions/filter.py b/stac_fastapi/core/stac_fastapi/core/extensions/filter.py index 078e7fbf..08200030 100644 --- a/stac_fastapi/core/stac_fastapi/core/extensions/filter.py +++ b/stac_fastapi/core/stac_fastapi/core/extensions/filter.py @@ -1,4 +1,4 @@ -"""Filter extension logic for es conversion.""" +"""Filter extension logic for conversion.""" # """ # Implements Filter Extension. @@ -13,47 +13,55 @@ # defines spatial operators (S_INTERSECTS, S_CONTAINS, S_WITHIN, S_DISJOINT). # """ -import re from enum import Enum from typing import Any, Dict -_cql2_like_patterns = re.compile(r"\\.|[%_]|\\$") -_valid_like_substitutions = { - "\\\\": "\\", - "\\%": "%", - "\\_": "_", - "%": "*", - "_": "?", +DEFAULT_QUERYABLES: Dict[str, Dict[str, Any]] = { + "id": { + "description": "ID", + "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/item.json#/definitions/core/allOf/2/properties/id", + }, + "collection": { + "description": "Collection", + "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/item.json#/definitions/core/allOf/2/then/properties/collection", + }, + "geometry": { + "description": "Geometry", + "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/item.json#/definitions/core/allOf/1/oneOf/0/properties/geometry", + }, + "datetime": { + "description": "Acquisition Timestamp", + "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/datetime.json#/properties/datetime", + }, + "created": { + "description": "Creation Timestamp", + "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/datetime.json#/properties/created", + }, + "updated": { + "description": "Update Timestamp", + "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/datetime.json#/properties/updated", + }, + "cloud_cover": { + "description": "Cloud Cover", + "$ref": "https://stac-extensions.github.io/eo/v1.0.0/schema.json#/definitions/fields/properties/eo:cloud_cover", + }, + "cloud_shadow_percentage": { + "title": "Cloud Shadow Percentage", + "description": "Cloud Shadow Percentage", + "type": "number", + "minimum": 0, + "maximum": 100, + }, + "nodata_pixel_percentage": { + "title": "No Data Pixel Percentage", + "description": "No Data Pixel Percentage", + "type": "number", + "minimum": 0, + "maximum": 100, + }, } -def _replace_like_patterns(match: re.Match) -> str: - pattern = match.group() - try: - return _valid_like_substitutions[pattern] - except KeyError: - raise 
ValueError(f"'{pattern}' is not a valid escape sequence") - - -def cql2_like_to_es(string: str) -> str: - """ - Convert CQL2 "LIKE" characters to Elasticsearch "wildcard" characters. - - Args: - string (str): The string containing CQL2 wildcard characters. - - Returns: - str: The converted string with Elasticsearch compatible wildcards. - - Raises: - ValueError: If an invalid escape sequence is encountered. - """ - return _cql2_like_patterns.sub( - repl=_replace_like_patterns, - string=string, - ) - - class LogicalOp(str, Enum): """Enumeration for logical operators used in constructing Elasticsearch queries.""" @@ -89,124 +97,3 @@ class SpatialOp(str, Enum): S_CONTAINS = "s_contains" S_WITHIN = "s_within" S_DISJOINT = "s_disjoint" - - -def to_es_field(queryables_mapping: Dict[str, Any], field: str) -> str: - """ - Map a given field to its corresponding Elasticsearch field according to a predefined mapping. - - Args: - field (str): The field name from a user query or filter. - - Returns: - str: The mapped field name suitable for Elasticsearch queries. - """ - return queryables_mapping.get(field, field) - - -def to_es(queryables_mapping: Dict[str, Any], query: Dict[str, Any]) -> Dict[str, Any]: - """ - Transform a simplified CQL2 query structure to an Elasticsearch compatible query DSL. - - Args: - query (Dict[str, Any]): The query dictionary containing 'op' and 'args'. - - Returns: - Dict[str, Any]: The corresponding Elasticsearch query in the form of a dictionary. - """ - if query["op"] in [LogicalOp.AND, LogicalOp.OR, LogicalOp.NOT]: - bool_type = { - LogicalOp.AND: "must", - LogicalOp.OR: "should", - LogicalOp.NOT: "must_not", - }[query["op"]] - return { - "bool": { - bool_type: [ - to_es(queryables_mapping, sub_query) for sub_query in query["args"] - ] - } - } - - elif query["op"] in [ - ComparisonOp.EQ, - ComparisonOp.NEQ, - ComparisonOp.LT, - ComparisonOp.LTE, - ComparisonOp.GT, - ComparisonOp.GTE, - ]: - range_op = { - ComparisonOp.LT: "lt", - ComparisonOp.LTE: "lte", - ComparisonOp.GT: "gt", - ComparisonOp.GTE: "gte", - } - - field = to_es_field(queryables_mapping, query["args"][0]["property"]) - value = query["args"][1] - if isinstance(value, dict) and "timestamp" in value: - value = value["timestamp"] - if query["op"] == ComparisonOp.EQ: - return {"range": {field: {"gte": value, "lte": value}}} - elif query["op"] == ComparisonOp.NEQ: - return { - "bool": { - "must_not": [{"range": {field: {"gte": value, "lte": value}}}] - } - } - else: - return {"range": {field: {range_op[query["op"]]: value}}} - else: - if query["op"] == ComparisonOp.EQ: - return {"term": {field: value}} - elif query["op"] == ComparisonOp.NEQ: - return {"bool": {"must_not": [{"term": {field: value}}]}} - else: - return {"range": {field: {range_op[query["op"]]: value}}} - - elif query["op"] == ComparisonOp.IS_NULL: - field = to_es_field(queryables_mapping, query["args"][0]["property"]) - return {"bool": {"must_not": {"exists": {"field": field}}}} - - elif query["op"] == AdvancedComparisonOp.BETWEEN: - field = to_es_field(queryables_mapping, query["args"][0]["property"]) - gte, lte = query["args"][1], query["args"][2] - if isinstance(gte, dict) and "timestamp" in gte: - gte = gte["timestamp"] - if isinstance(lte, dict) and "timestamp" in lte: - lte = lte["timestamp"] - return {"range": {field: {"gte": gte, "lte": lte}}} - - elif query["op"] == AdvancedComparisonOp.IN: - field = to_es_field(queryables_mapping, query["args"][0]["property"]) - values = query["args"][1] - if not isinstance(values, list): - raise 
ValueError(f"Arg {values} is not a list") - return {"terms": {field: values}} - - elif query["op"] == AdvancedComparisonOp.LIKE: - field = to_es_field(queryables_mapping, query["args"][0]["property"]) - pattern = cql2_like_to_es(query["args"][1]) - return {"wildcard": {field: {"value": pattern, "case_insensitive": True}}} - - elif query["op"] in [ - SpatialOp.S_INTERSECTS, - SpatialOp.S_CONTAINS, - SpatialOp.S_WITHIN, - SpatialOp.S_DISJOINT, - ]: - field = to_es_field(queryables_mapping, query["args"][0]["property"]) - geometry = query["args"][1] - - relation_mapping = { - SpatialOp.S_INTERSECTS: "intersects", - SpatialOp.S_CONTAINS: "contains", - SpatialOp.S_WITHIN: "within", - SpatialOp.S_DISJOINT: "disjoint", - } - - relation = relation_mapping[query["op"]] - return {"geo_shape": {field: {"shape": geometry, "relation": relation}}} - - return {} diff --git a/stac_fastapi/core/stac_fastapi/core/utilities.py b/stac_fastapi/core/stac_fastapi/core/utilities.py index d4a35109..be197f71 100644 --- a/stac_fastapi/core/stac_fastapi/core/utilities.py +++ b/stac_fastapi/core/stac_fastapi/core/utilities.py @@ -12,46 +12,6 @@ MAX_LIMIT = 10000 -def validate_refresh(value: Union[str, bool]) -> str: - """ - Validate the `refresh` parameter value. - - Args: - value (Union[str, bool]): The `refresh` parameter value, which can be a string or a boolean. - - Returns: - str: The validated value of the `refresh` parameter, which can be "true", "false", or "wait_for". - """ - logger = logging.getLogger(__name__) - - # Handle boolean-like values using get_bool_env - if isinstance(value, bool) or value in { - "true", - "false", - "1", - "0", - "yes", - "no", - "y", - "n", - }: - is_true = get_bool_env("DATABASE_REFRESH", default=value) - return "true" if is_true else "false" - - # Normalize to lowercase for case-insensitivity - value = value.lower() - - # Handle "wait_for" explicitly - if value == "wait_for": - return "wait_for" - - # Log a warning for invalid values and default to "false" - logger.warning( - f"Invalid value for `refresh`: '{value}'. Expected 'true', 'false', or 'wait_for'. Defaulting to 'false'." - ) - return "false" - - def get_bool_env(name: str, default: Union[bool, str] = False) -> bool: """ Retrieve a boolean value from an environment variable. 
diff --git a/stac_fastapi/core/stac_fastapi/core/version.py b/stac_fastapi/core/stac_fastapi/core/version.py index 1cd0ed04..e152cdff 100644 --- a/stac_fastapi/core/stac_fastapi/core/version.py +++ b/stac_fastapi/core/stac_fastapi/core/version.py @@ -1,2 +1,2 @@ """library version.""" -__version__ = "4.2.0" +__version__ = "5.0.0a0" diff --git a/stac_fastapi/elasticsearch/setup.py b/stac_fastapi/elasticsearch/setup.py index 06b8e880..34a139d6 100644 --- a/stac_fastapi/elasticsearch/setup.py +++ b/stac_fastapi/elasticsearch/setup.py @@ -6,7 +6,8 @@ desc = f.read() install_requires = [ - "stac-fastapi-core==4.2.0", + "stac-fastapi-core==5.0.0a0", + "sfeos-helpers==5.0.0a0", "elasticsearch[async]~=8.18.0", "uvicorn~=0.23.0", "starlette>=0.35.0,<0.36.0", diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py index 35027a63..e9a420d3 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py @@ -11,14 +11,12 @@ from stac_fastapi.core.core import ( BulkTransactionsClient, CoreClient, - EsAsyncBaseFiltersClient, TransactionsClient, ) from stac_fastapi.core.extensions import QueryExtension from stac_fastapi.core.extensions.aggregation import ( EsAggregationExtensionGetRequest, EsAggregationExtensionPostRequest, - EsAsyncAggregationClient, ) from stac_fastapi.core.extensions.fields import FieldsExtension from stac_fastapi.core.rate_limit import setup_rate_limit @@ -40,6 +38,8 @@ TransactionExtension, ) from stac_fastapi.extensions.third_party import BulkTransactionExtension +from stac_fastapi.sfeos_helpers.aggregation import EsAsyncBaseAggregationClient +from stac_fastapi.sfeos_helpers.filter import EsAsyncBaseFiltersClient logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -60,7 +60,7 @@ ) aggregation_extension = AggregationExtension( - client=EsAsyncAggregationClient( + client=EsAsyncBaseAggregationClient( database=database_logic, session=session, settings=settings ) ) @@ -106,7 +106,7 @@ api = StacApi( title=os.getenv("STAC_FASTAPI_TITLE", "stac-fastapi-elasticsearch"), description=os.getenv("STAC_FASTAPI_DESCRIPTION", "stac-fastapi-elasticsearch"), - api_version=os.getenv("STAC_FASTAPI_VERSION", "4.2.0"), + api_version=os.getenv("STAC_FASTAPI_VERSION", "5.0.0a0"), settings=settings, extensions=extensions, client=CoreClient( diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py index accbe8cc..d371c6a5 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py @@ -10,7 +10,8 @@ from elasticsearch import Elasticsearch # type: ignore[attr-defined] from stac_fastapi.core.base_settings import ApiBaseSettings -from stac_fastapi.core.utilities import get_bool_env, validate_refresh +from stac_fastapi.core.utilities import get_bool_env +from stac_fastapi.sfeos_helpers.database import validate_refresh from stac_fastapi.types.config import ApiSettings diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 958ee597..d529ce01 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -5,7 +5,7 @@ import logging from base64 import 
urlsafe_b64decode, urlsafe_b64encode from copy import deepcopy -from typing import Any, Dict, Iterable, List, Optional, Tuple, Type +from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union import attr import elasticsearch.helpers as helpers @@ -14,29 +14,38 @@ from starlette.requests import Request from stac_fastapi.core.base_database_logic import BaseDatabaseLogic -from stac_fastapi.core.database_logic import ( - COLLECTIONS_INDEX, - DEFAULT_SORT, - ES_COLLECTIONS_MAPPINGS, - ES_ITEMS_MAPPINGS, - ES_ITEMS_SETTINGS, - ITEM_INDICES, - ITEMS_INDEX_PREFIX, - Geometry, +from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer +from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon +from stac_fastapi.elasticsearch.config import AsyncElasticsearchSettings +from stac_fastapi.elasticsearch.config import ( + ElasticsearchSettings as SyncElasticsearchSettings, +) +from stac_fastapi.sfeos_helpers import filter +from stac_fastapi.sfeos_helpers.database import ( + apply_free_text_filter_shared, + apply_intersects_filter_shared, + create_index_templates_shared, + delete_item_index_shared, + get_queryables_mapping_shared, index_alias_by_collection_id, index_by_collection_id, indices, mk_actions, mk_item_id, + populate_sort_shared, + return_date, + validate_refresh, ) -from stac_fastapi.core.extensions import filter -from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer -from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon, validate_refresh -from stac_fastapi.elasticsearch.config import AsyncElasticsearchSettings -from stac_fastapi.elasticsearch.config import ( - ElasticsearchSettings as SyncElasticsearchSettings, +from stac_fastapi.sfeos_helpers.mappings import ( + AGGREGATION_MAPPING, + COLLECTIONS_INDEX, + DEFAULT_SORT, + ITEM_INDICES, + ITEMS_INDEX_PREFIX, + Geometry, ) from stac_fastapi.types.errors import ConflictError, NotFoundError +from stac_fastapi.types.rfc3339 import DateTimeType from stac_fastapi.types.stac import Collection, Item logger = logging.getLogger(__name__) @@ -50,22 +59,7 @@ async def create_index_templates() -> None: None """ - client = AsyncElasticsearchSettings().create_client - await client.indices.put_index_template( - name=f"template_{COLLECTIONS_INDEX}", - body={ - "index_patterns": [f"{COLLECTIONS_INDEX}*"], - "template": {"mappings": ES_COLLECTIONS_MAPPINGS}, - }, - ) - await client.indices.put_index_template( - name=f"template_{ITEMS_INDEX_PREFIX}", - body={ - "index_patterns": [f"{ITEMS_INDEX_PREFIX}*"], - "template": {"settings": ES_ITEMS_SETTINGS, "mappings": ES_ITEMS_MAPPINGS}, - }, - ) - await client.close() + await create_index_templates_shared(settings=AsyncElasticsearchSettings()) async def create_collection_index() -> None: @@ -110,18 +104,13 @@ async def delete_item_index(collection_id: str): Args: collection_id (str): The ID of the collection whose items index will be deleted. - """ - client = AsyncElasticsearchSettings().create_client - name = index_alias_by_collection_id(collection_id) - resolved = await client.indices.resolve_index(name=name) - if "aliases" in resolved and resolved["aliases"]: - [alias] = resolved["aliases"] - await client.indices.delete_alias(index=alias["indices"], name=alias["name"]) - await client.indices.delete(index=alias["indices"]) - else: - await client.indices.delete(index=name) - await client.close() + Notes: + This function delegates to the shared implementation in delete_item_index_shared. 
+ """ + await delete_item_index_shared( + settings=AsyncElasticsearchSettings(), collection_id=collection_id + ) @attr.s @@ -150,76 +139,7 @@ def __attrs_post_init__(self): extensions: List[str] = attr.ib(default=attr.Factory(list)) - aggregation_mapping: Dict[str, Dict[str, Any]] = { - "total_count": {"value_count": {"field": "id"}}, - "collection_frequency": {"terms": {"field": "collection", "size": 100}}, - "platform_frequency": {"terms": {"field": "properties.platform", "size": 100}}, - "cloud_cover_frequency": { - "range": { - "field": "properties.eo:cloud_cover", - "ranges": [ - {"to": 5}, - {"from": 5, "to": 15}, - {"from": 15, "to": 40}, - {"from": 40}, - ], - } - }, - "datetime_frequency": { - "date_histogram": { - "field": "properties.datetime", - "calendar_interval": "month", - } - }, - "datetime_min": {"min": {"field": "properties.datetime"}}, - "datetime_max": {"max": {"field": "properties.datetime"}}, - "grid_code_frequency": { - "terms": { - "field": "properties.grid:code", - "missing": "none", - "size": 10000, - } - }, - "sun_elevation_frequency": { - "histogram": {"field": "properties.view:sun_elevation", "interval": 5} - }, - "sun_azimuth_frequency": { - "histogram": {"field": "properties.view:sun_azimuth", "interval": 5} - }, - "off_nadir_frequency": { - "histogram": {"field": "properties.view:off_nadir", "interval": 5} - }, - "centroid_geohash_grid_frequency": { - "geohash_grid": { - "field": "properties.proj:centroid", - "precision": 1, - } - }, - "centroid_geohex_grid_frequency": { - "geohex_grid": { - "field": "properties.proj:centroid", - "precision": 0, - } - }, - "centroid_geotile_grid_frequency": { - "geotile_grid": { - "field": "properties.proj:centroid", - "precision": 0, - } - }, - "geometry_geohash_grid_frequency": { - "geohash_grid": { - "field": "geometry", - "precision": 1, - } - }, - "geometry_geotile_grid_frequency": { - "geotile_grid": { - "field": "geometry", - "precision": 0, - } - }, - } + aggregation_mapping: Dict[str, Dict[str, Any]] = AGGREGATION_MAPPING """CORE LOGIC""" @@ -300,23 +220,12 @@ async def get_queryables_mapping(self, collection_id: str = "*") -> dict: Returns: dict: A dictionary containing the Queryables mappings. """ - queryables_mapping = {} - mappings = await self.client.indices.get_mapping( index=f"{ITEMS_INDEX_PREFIX}{collection_id}", ) - - for mapping in mappings.values(): - fields = mapping["mappings"].get("properties", {}) - properties = fields.pop("properties", {}).get("properties", {}).keys() - - for field_key in fields: - queryables_mapping[field_key] = field_key - - for property_key in properties: - queryables_mapping[property_key] = f"properties.{property_key}" - - return queryables_mapping + return await get_queryables_mapping_shared( + collection_id=collection_id, mappings=mappings + ) @staticmethod def make_search(): @@ -334,17 +243,20 @@ def apply_collections_filter(search: Search, collection_ids: List[str]): return search.filter("terms", collection=collection_ids) @staticmethod - def apply_datetime_filter(search: Search, datetime_search: dict): + def apply_datetime_filter( + search: Search, interval: Optional[Union[DateTimeType, str]] + ): """Apply a filter to search on datetime, start_datetime, and end_datetime fields. Args: search (Search): The search object to filter. - datetime_search (dict): The datetime filter criteria. + interval: Optional[Union[DateTimeType, str]] Returns: Search: The filtered search object. 
""" should = [] + datetime_search = return_date(interval) # If the request is a single datetime return # items with datetimes equal to the requested datetime OR @@ -497,21 +409,8 @@ def apply_intersects_filter( Notes: A geo_shape filter is added to the search object, set to intersect with the specified geometry. """ - return search.filter( - Q( - { - "geo_shape": { - "geometry": { - "shape": { - "type": intersects.type.lower(), - "coordinates": intersects.coordinates, - }, - "relation": "intersects", - } - } - } - ) - ) + filter = apply_intersects_filter_shared(intersects=intersects) + return search.filter(Q(filter)) @staticmethod def apply_stacql_filter(search: Search, op: str, field: str, value: float): @@ -537,14 +436,21 @@ def apply_stacql_filter(search: Search, op: str, field: str, value: float): @staticmethod def apply_free_text_filter(search: Search, free_text_queries: Optional[List[str]]): - """Database logic to perform query for search endpoint.""" - if free_text_queries is not None: - free_text_query_string = '" OR properties.\\*:"'.join(free_text_queries) - search = search.query( - "query_string", query=f'properties.\\*:"{free_text_query_string}"' - ) + """Create a free text query for Elasticsearch queries. - return search + This method delegates to the shared implementation in apply_free_text_filter_shared. + + Args: + search (Search): The search object to apply the query to. + free_text_queries (Optional[List[str]]): A list of text strings to search for in the properties. + + Returns: + Search: The search object with the free text query applied, or the original search + object if no free_text_queries were provided. + """ + return apply_free_text_filter_shared( + search=search, free_text_queries=free_text_queries + ) async def apply_cql2_filter( self, search: Search, _filter: Optional[Dict[str, Any]] @@ -575,11 +481,18 @@ async def apply_cql2_filter( @staticmethod def populate_sort(sortby: List) -> Optional[Dict[str, Dict[str, str]]]: - """Database logic to sort search instance.""" - if sortby: - return {s.field: {"order": s.direction} for s in sortby} - else: - return None + """Create a sort configuration for Elasticsearch queries. + + This method delegates to the shared implementation in populate_sort_shared. + + Args: + sortby (List): A list of sort specifications, each containing a field and direction. + + Returns: + Optional[Dict[str, Dict[str, str]]]: A dictionary mapping field names to sort direction + configurations, or None if no sort was specified. 
+ """ + return populate_sort_shared(sortby=sortby) async def execute_search( self, diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/version.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/version.py index 1cd0ed04..e152cdff 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/version.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/version.py @@ -1,2 +1,2 @@ """library version.""" -__version__ = "4.2.0" +__version__ = "5.0.0a0" diff --git a/stac_fastapi/opensearch/setup.py b/stac_fastapi/opensearch/setup.py index 7fe18f87..68586236 100644 --- a/stac_fastapi/opensearch/setup.py +++ b/stac_fastapi/opensearch/setup.py @@ -6,7 +6,8 @@ desc = f.read() install_requires = [ - "stac-fastapi-core==4.2.0", + "stac-fastapi-core==5.0.0a0", + "sfeos-helpers==5.0.0a0", "opensearch-py~=2.8.0", "opensearch-py[async]~=2.8.0", "uvicorn~=0.23.0", diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/app.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/app.py index 5273e598..bd2ec073 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/app.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/app.py @@ -11,14 +11,12 @@ from stac_fastapi.core.core import ( BulkTransactionsClient, CoreClient, - EsAsyncBaseFiltersClient, TransactionsClient, ) from stac_fastapi.core.extensions import QueryExtension from stac_fastapi.core.extensions.aggregation import ( EsAggregationExtensionGetRequest, EsAggregationExtensionPostRequest, - EsAsyncAggregationClient, ) from stac_fastapi.core.extensions.fields import FieldsExtension from stac_fastapi.core.rate_limit import setup_rate_limit @@ -40,6 +38,8 @@ create_collection_index, create_index_templates, ) +from stac_fastapi.sfeos_helpers.aggregation import EsAsyncBaseAggregationClient +from stac_fastapi.sfeos_helpers.filter import EsAsyncBaseFiltersClient logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -60,7 +60,7 @@ ) aggregation_extension = AggregationExtension( - client=EsAsyncAggregationClient( + client=EsAsyncBaseAggregationClient( database=database_logic, session=session, settings=settings ) ) @@ -107,7 +107,7 @@ api = StacApi( title=os.getenv("STAC_FASTAPI_TITLE", "stac-fastapi-opensearch"), description=os.getenv("STAC_FASTAPI_DESCRIPTION", "stac-fastapi-opensearch"), - api_version=os.getenv("STAC_FASTAPI_VERSION", "4.2.0"), + api_version=os.getenv("STAC_FASTAPI_VERSION", "5.0.0a0"), settings=settings, extensions=extensions, client=CoreClient( diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py index 3a53ffdf..d3811376 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py @@ -8,7 +8,8 @@ from opensearchpy import AsyncOpenSearch, OpenSearch from stac_fastapi.core.base_settings import ApiBaseSettings -from stac_fastapi.core.utilities import get_bool_env, validate_refresh +from stac_fastapi.core.utilities import get_bool_env +from stac_fastapi.sfeos_helpers.database import validate_refresh from stac_fastapi.types.config import ApiSettings diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index 71ab9275..f93311f9 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -5,7 +5,7 @@ import logging from base64 import 
urlsafe_b64decode, urlsafe_b64encode from copy import deepcopy -from typing import Any, Dict, Iterable, List, Optional, Tuple, Type +from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union import attr from opensearchpy import exceptions, helpers @@ -14,7 +14,30 @@ from starlette.requests import Request from stac_fastapi.core.base_database_logic import BaseDatabaseLogic -from stac_fastapi.core.database_logic import ( +from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer +from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon +from stac_fastapi.opensearch.config import ( + AsyncOpensearchSettings as AsyncSearchSettings, +) +from stac_fastapi.opensearch.config import OpensearchSettings as SyncSearchSettings +from stac_fastapi.sfeos_helpers import filter +from stac_fastapi.sfeos_helpers.database import ( + apply_free_text_filter_shared, + apply_intersects_filter_shared, + create_index_templates_shared, + delete_item_index_shared, + get_queryables_mapping_shared, + index_alias_by_collection_id, + index_by_collection_id, + indices, + mk_actions, + mk_item_id, + populate_sort_shared, + return_date, + validate_refresh, +) +from stac_fastapi.sfeos_helpers.mappings import ( + AGGREGATION_MAPPING, COLLECTIONS_INDEX, DEFAULT_SORT, ES_COLLECTIONS_MAPPINGS, @@ -23,20 +46,9 @@ ITEM_INDICES, ITEMS_INDEX_PREFIX, Geometry, - index_alias_by_collection_id, - index_by_collection_id, - indices, - mk_actions, - mk_item_id, ) -from stac_fastapi.core.extensions import filter -from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer -from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon, validate_refresh -from stac_fastapi.opensearch.config import ( - AsyncOpensearchSettings as AsyncSearchSettings, -) -from stac_fastapi.opensearch.config import OpensearchSettings as SyncSearchSettings from stac_fastapi.types.errors import ConflictError, NotFoundError +from stac_fastapi.types.rfc3339 import DateTimeType from stac_fastapi.types.stac import Collection, Item logger = logging.getLogger(__name__) @@ -50,23 +62,7 @@ async def create_index_templates() -> None: None """ - client = AsyncSearchSettings().create_client - await client.indices.put_template( - name=f"template_{COLLECTIONS_INDEX}", - body={ - "index_patterns": [f"{COLLECTIONS_INDEX}*"], - "mappings": ES_COLLECTIONS_MAPPINGS, - }, - ) - await client.indices.put_template( - name=f"template_{ITEMS_INDEX_PREFIX}", - body={ - "index_patterns": [f"{ITEMS_INDEX_PREFIX}*"], - "settings": ES_ITEMS_SETTINGS, - "mappings": ES_ITEMS_MAPPINGS, - }, - ) - await client.close() + await create_index_templates_shared(settings=AsyncSearchSettings()) async def create_collection_index() -> None: @@ -125,18 +121,13 @@ async def delete_item_index(collection_id: str) -> None: Args: collection_id (str): The ID of the collection whose items index will be deleted. - """ - client = AsyncSearchSettings().create_client - name = index_alias_by_collection_id(collection_id) - resolved = await client.indices.resolve_index(name=name) - if "aliases" in resolved and resolved["aliases"]: - [alias] = resolved["aliases"] - await client.indices.delete_alias(index=alias["indices"], name=alias["name"]) - await client.indices.delete(index=alias["indices"]) - else: - await client.indices.delete(index=name) - await client.close() + Notes: + This function delegates to the shared implementation in delete_item_index_shared. 
+ """ + await delete_item_index_shared( + settings=AsyncSearchSettings(), collection_id=collection_id + ) @attr.s @@ -161,76 +152,7 @@ def __attrs_post_init__(self): extensions: List[str] = attr.ib(default=attr.Factory(list)) - aggregation_mapping: Dict[str, Dict[str, Any]] = { - "total_count": {"value_count": {"field": "id"}}, - "collection_frequency": {"terms": {"field": "collection", "size": 100}}, - "platform_frequency": {"terms": {"field": "properties.platform", "size": 100}}, - "cloud_cover_frequency": { - "range": { - "field": "properties.eo:cloud_cover", - "ranges": [ - {"to": 5}, - {"from": 5, "to": 15}, - {"from": 15, "to": 40}, - {"from": 40}, - ], - } - }, - "datetime_frequency": { - "date_histogram": { - "field": "properties.datetime", - "calendar_interval": "month", - } - }, - "datetime_min": {"min": {"field": "properties.datetime"}}, - "datetime_max": {"max": {"field": "properties.datetime"}}, - "grid_code_frequency": { - "terms": { - "field": "properties.grid:code", - "missing": "none", - "size": 10000, - } - }, - "sun_elevation_frequency": { - "histogram": {"field": "properties.view:sun_elevation", "interval": 5} - }, - "sun_azimuth_frequency": { - "histogram": {"field": "properties.view:sun_azimuth", "interval": 5} - }, - "off_nadir_frequency": { - "histogram": {"field": "properties.view:off_nadir", "interval": 5} - }, - "centroid_geohash_grid_frequency": { - "geohash_grid": { - "field": "properties.proj:centroid", - "precision": 1, - } - }, - "centroid_geohex_grid_frequency": { - "geohex_grid": { - "field": "properties.proj:centroid", - "precision": 0, - } - }, - "centroid_geotile_grid_frequency": { - "geotile_grid": { - "field": "properties.proj:centroid", - "precision": 0, - } - }, - "geometry_geohash_grid_frequency": { - "geohash_grid": { - "field": "geometry", - "precision": 1, - } - }, - "geometry_geotile_grid_frequency": { - "geotile_grid": { - "field": "geometry", - "precision": 0, - } - }, - } + aggregation_mapping: Dict[str, Dict[str, Any]] = AGGREGATION_MAPPING """CORE LOGIC""" @@ -317,23 +239,12 @@ async def get_queryables_mapping(self, collection_id: str = "*") -> dict: Returns: dict: A dictionary containing the Queryables mappings. """ - queryables_mapping = {} - mappings = await self.client.indices.get_mapping( index=f"{ITEMS_INDEX_PREFIX}{collection_id}", ) - - for mapping in mappings.values(): - fields = mapping["mappings"].get("properties", {}) - properties = fields.pop("properties", {}).get("properties", {}).keys() - - for field_key in fields: - queryables_mapping[field_key] = field_key - - for property_key in properties: - queryables_mapping[property_key] = f"properties.{property_key}" - - return queryables_mapping + return await get_queryables_mapping_shared( + collection_id=collection_id, mappings=mappings + ) @staticmethod def make_search(): @@ -352,27 +263,37 @@ def apply_collections_filter(search: Search, collection_ids: List[str]): @staticmethod def apply_free_text_filter(search: Search, free_text_queries: Optional[List[str]]): - """Database logic to perform query for search endpoint.""" - if free_text_queries is not None: - free_text_query_string = '" OR properties.\\*:"'.join(free_text_queries) - search = search.query( - "query_string", query=f'properties.\\*:"{free_text_query_string}"' - ) + """Create a free text query for OpenSearch queries. - return search + This method delegates to the shared implementation in apply_free_text_filter_shared. + + Args: + search (Search): The search object to apply the query to. 
+ free_text_queries (Optional[List[str]]): A list of text strings to search for in the properties. + + Returns: + Search: The search object with the free text query applied, or the original search + object if no free_text_queries were provided. + """ + return apply_free_text_filter_shared( + search=search, free_text_queries=free_text_queries + ) @staticmethod - def apply_datetime_filter(search: Search, datetime_search): + def apply_datetime_filter( + search: Search, interval: Optional[Union[DateTimeType, str]] + ): """Apply a filter to search based on datetime field, start_datetime, and end_datetime fields. Args: search (Search): The search object to filter. - datetime_search (dict): The datetime filter criteria. + interval: Optional[Union[DateTimeType, str]] Returns: Search: The filtered search object. """ should = [] + datetime_search = return_date(interval) # If the request is a single datetime return # items with datetimes equal to the requested datetime OR @@ -525,21 +446,8 @@ def apply_intersects_filter( Notes: A geo_shape filter is added to the search object, set to intersect with the specified geometry. """ - return search.filter( - Q( - { - "geo_shape": { - "geometry": { - "shape": { - "type": intersects.type.lower(), - "coordinates": intersects.coordinates, - }, - "relation": "intersects", - } - } - } - ) - ) + filter = apply_intersects_filter_shared(intersects=intersects) + return search.filter(Q(filter)) @staticmethod def apply_stacql_filter(search: Search, op: str, field: str, value: float): @@ -592,11 +500,18 @@ async def apply_cql2_filter( @staticmethod def populate_sort(sortby: List) -> Optional[Dict[str, Dict[str, str]]]: - """Database logic to sort search instance.""" - if sortby: - return {s.field: {"order": s.direction} for s in sortby} - else: - return None + """Create a sort configuration for OpenSearch queries. + + This method delegates to the shared implementation in populate_sort_shared. + + Args: + sortby (List): A list of sort specifications, each containing a field and direction. + + Returns: + Optional[Dict[str, Dict[str, str]]]: A dictionary mapping field names to sort direction + configurations, or None if no sort was specified. 
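As on the Elasticsearch side, the geo_shape clause is now built by `apply_intersects_filter_shared` and wrapped in a `Q` object by the caller; a sketch with a stand-in geometry (only `type` and `coordinates` are read, and the coordinates are hypothetical):

```python
from types import SimpleNamespace

from stac_fastapi.sfeos_helpers.database import apply_intersects_filter_shared

# SimpleNamespace stands in for the parsed Geometry object.
polygon = SimpleNamespace(
    type="Polygon",
    coordinates=[[[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 0.0]]],
)

print(apply_intersects_filter_shared(intersects=polygon))
# {'geo_shape': {'geometry': {'shape': {'type': 'polygon', 'coordinates':
#   [[[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 0.0]]]}, 'relation': 'intersects'}}}
```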
+ """ + return populate_sort_shared(sortby=sortby) async def execute_search( self, diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/version.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/version.py index 1cd0ed04..e152cdff 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/version.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/version.py @@ -1,2 +1,2 @@ """library version.""" -__version__ = "4.2.0" +__version__ = "5.0.0a0" diff --git a/stac_fastapi/sfeos_helpers/README.md b/stac_fastapi/sfeos_helpers/README.md new file mode 120000 index 00000000..fe840054 --- /dev/null +++ b/stac_fastapi/sfeos_helpers/README.md @@ -0,0 +1 @@ +../../README.md \ No newline at end of file diff --git a/stac_fastapi/sfeos_helpers/setup.cfg b/stac_fastapi/sfeos_helpers/setup.cfg new file mode 100644 index 00000000..a3210acb --- /dev/null +++ b/stac_fastapi/sfeos_helpers/setup.cfg @@ -0,0 +1,2 @@ +[metadata] +version = attr: stac_fastapi.sfeos_helpers.version.__version__ diff --git a/stac_fastapi/sfeos_helpers/setup.py b/stac_fastapi/sfeos_helpers/setup.py new file mode 100644 index 00000000..7228a41c --- /dev/null +++ b/stac_fastapi/sfeos_helpers/setup.py @@ -0,0 +1,34 @@ +"""stac_fastapi: helpers elasticsearch/ opensearch module.""" + +from setuptools import find_namespace_packages, setup + +with open("README.md") as f: + desc = f.read() + +install_requires = [ + "stac-fastapi.core==5.0.0a0", +] + +setup( + name="sfeos_helpers", + description="Helper library for the Elasticsearch and Opensearch stac-fastapi backends.", + long_description=desc, + long_description_content_type="text/markdown", + python_requires=">=3.9", + classifiers=[ + "Intended Audience :: Developers", + "Intended Audience :: Information Technology", + "Intended Audience :: Science/Research", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "License :: OSI Approved :: MIT License", + ], + url="https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch", + license="MIT", + packages=find_namespace_packages(), + zip_safe=False, + install_requires=install_requires, +) diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/aggregation/README.md b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/aggregation/README.md new file mode 100644 index 00000000..253855b4 --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/aggregation/README.md @@ -0,0 +1,57 @@ +# STAC FastAPI Aggregation Package + +This package contains shared aggregation functionality used by both the Elasticsearch and OpenSearch implementations of STAC FastAPI. It helps reduce code duplication and ensures consistent behavior between the two implementations. 
+
+## Package Structure
+
+The aggregation package is organized into three main modules:
+
+- **client.py**: Contains the base aggregation client implementation
+  - `EsAsyncBaseAggregationClient`: The main class that implements the STAC aggregation extension for Elasticsearch/OpenSearch
+  - Methods for handling aggregation requests, validating parameters, and formatting responses
+
+- **format.py**: Contains functions for formatting aggregation responses
+  - `frequency_agg`: Formats frequency distribution aggregation responses
+  - `metric_agg`: Formats metric aggregation responses
+
+- **__init__.py**: Package initialization and exports
+  - Exports the main classes and functions for use by other modules
+
+## Features
+
+The aggregation package provides the following features:
+
+- Support for various aggregation types:
+  - Datetime frequency
+  - Collection frequency
+  - Property frequency
+  - Geospatial grid aggregations (geohash, geohex, geotile)
+  - Metric aggregations (min, max, etc.)
+
+- Parameter validation:
+  - Precision validation for geospatial aggregations
+  - Interval validation for datetime aggregations
+
+- Response formatting:
+  - Consistent response structure
+  - Proper typing and documentation
+
+## Usage
+
+The aggregation package is used by the Elasticsearch and OpenSearch implementations to provide aggregation functionality for STAC API. The main entry point is the `EsAsyncBaseAggregationClient` class, which is instantiated in the respective app.py files.
+
+Example:
+```python
+from stac_fastapi.sfeos_helpers.aggregation import EsAsyncBaseAggregationClient
+
+# Create an instance of the aggregation client
+aggregation_client = EsAsyncBaseAggregationClient(database)
+
+# Register the aggregation extension with the API
+api = StacApi(
+    ...,
+    extensions=[
+        ...,
+        AggregationExtension(client=aggregation_client),
+    ],
+)
+```
diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/aggregation/__init__.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/aggregation/__init__.py
new file mode 100644
index 00000000..2beeff67
--- /dev/null
+++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/aggregation/__init__.py
@@ -0,0 +1,31 @@
+"""Shared aggregation extension methods for stac-fastapi elasticsearch and opensearch backends.
+
+This module provides shared functionality for implementing the STAC API Aggregation Extension
+with Elasticsearch and OpenSearch. It includes:
+
+1. Functions for formatting aggregation responses
+2. Helper functions for handling aggregation parameters
+3. Base implementation of the AsyncBaseAggregationClient for Elasticsearch/OpenSearch
+
+The aggregation package is organized as follows:
+- client.py: Aggregation client implementation
+- format.py: Response formatting functions
+
+When adding new functionality to this package, consider:
+1. Will this code be used by both Elasticsearch and OpenSearch implementations?
+2. Is the functionality stable and unlikely to diverge between implementations?
+3. Is the function well-documented with clear input/output contracts?
+
+Function Naming Conventions:
+- Function names should be descriptive and indicate their purpose
+- Parameter names should be consistent across similar functions
+"""
+
+from .client import EsAsyncBaseAggregationClient
+from .format import frequency_agg, metric_agg
+
+__all__ = [
+    "EsAsyncBaseAggregationClient",
+    "frequency_agg",
+    "metric_agg",
+]
diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/aggregation/client.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/aggregation/client.py
new file mode 100644
index 00000000..1f335245
--- /dev/null
+++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/aggregation/client.py
@@ -0,0 +1,468 @@
+"""Client implementation for the STAC API Aggregation Extension."""
+
+from typing import Annotated, Any, Dict, List, Optional, Union
+from urllib.parse import unquote_plus, urljoin
+
+import attr
+import orjson
+from fastapi import HTTPException, Path, Request
+from pygeofilter.backends.cql2_json import to_cql2
+from pygeofilter.parsers.cql2_text import parse as parse_cql2_text
+from stac_pydantic.shared import BBox
+
+from stac_fastapi.core.base_database_logic import BaseDatabaseLogic
+from stac_fastapi.core.base_settings import ApiBaseSettings
+from stac_fastapi.core.datetime_utils import format_datetime_range
+from stac_fastapi.core.extensions.aggregation import EsAggregationExtensionPostRequest
+from stac_fastapi.core.session import Session
+from stac_fastapi.extensions.core.aggregation.client import AsyncBaseAggregationClient
+from stac_fastapi.extensions.core.aggregation.types import (
+    Aggregation,
+    AggregationCollection,
+)
+from stac_fastapi.types.rfc3339 import DateTimeType
+
+from .format import frequency_agg, metric_agg
+
+
+@attr.s
+class EsAsyncBaseAggregationClient(AsyncBaseAggregationClient):
+    """Defines a pattern for implementing the STAC aggregation extension with Elasticsearch/OpenSearch."""
+
+    database: BaseDatabaseLogic = attr.ib()
+    settings: ApiBaseSettings = attr.ib()
+    session: Session = attr.ib(default=attr.Factory(Session.create_from_env))
+
+    # Default aggregations to use if none are specified
+    DEFAULT_AGGREGATIONS = [
+        {"name": "total_count", "data_type": "integer"},
+        {"name": "datetime_max", "data_type": "datetime"},
+        {"name": "datetime_min", "data_type": "datetime"},
+        {
+            "name": "datetime_frequency",
+            "data_type": "frequency_distribution",
+            "frequency_distribution_data_type": "datetime",
+        },
+        {
+            "name": "collection_frequency",
+            "data_type": "frequency_distribution",
+            "frequency_distribution_data_type": "string",
+        },
+        {
+            "name": "geometry_geohash_grid_frequency",
+            "data_type": "frequency_distribution",
+            "frequency_distribution_data_type": "string",
+        },
+        {
+            "name": "geometry_geotile_grid_frequency",
+            "data_type": "frequency_distribution",
+            "frequency_distribution_data_type": "string",
+        },
+    ]
+
+    # Geo point aggregations
+    GEO_POINT_AGGREGATIONS = [
+        {
+            "name": "grid_code_frequency",
+            "data_type": "frequency_distribution",
+            "frequency_distribution_data_type": "string",
+        },
+    ]
+
+    # Supported datetime intervals
+    SUPPORTED_DATETIME_INTERVAL = [
+        "year",
+        "quarter",
+        "month",
+        "week",
+        "day",
+        "hour",
+        "minute",
+        "second",
+    ]
+
+    # Default datetime interval
+    DEFAULT_DATETIME_INTERVAL = "month"
+
+    # Maximum precision values
+    MAX_GEOHASH_PRECISION = 12
+    MAX_GEOHEX_PRECISION = 15
+    MAX_GEOTILE_PRECISION = 29
+
+    async def get_aggregations(
+        self, collection_id: Optional[str] = None, **kwargs
+    ) -> Dict[str, Any]:
+        """Get the
available aggregations for a catalog or collection defined in the STAC JSON. + + If no aggregations are defined, default aggregations are used. + + Args: + collection_id: Optional collection ID to get aggregations for + **kwargs: Additional keyword arguments + + Returns: + Dict[str, Any]: A dictionary containing the available aggregations + """ + request: Request = kwargs.get("request") + base_url = str(request.base_url) if request else "" + links = [{"rel": "root", "type": "application/json", "href": base_url}] + + if collection_id is not None: + collection_endpoint = urljoin(base_url, f"collections/{collection_id}") + links.extend( + [ + { + "rel": "collection", + "type": "application/json", + "href": collection_endpoint, + }, + { + "rel": "self", + "type": "application/json", + "href": urljoin(collection_endpoint + "/", "aggregations"), + }, + ] + ) + if await self.database.check_collection_exists(collection_id) is None: + collection = await self.database.find_collection(collection_id) + aggregations = collection.get( + "aggregations", self.DEFAULT_AGGREGATIONS.copy() + ) + else: + raise IndexError(f"Collection {collection_id} does not exist") + else: + links.append( + { + "rel": "self", + "type": "application/json", + "href": urljoin(base_url, "aggregations"), + } + ) + aggregations = self.DEFAULT_AGGREGATIONS.copy() + + return { + "type": "AggregationCollection", + "aggregations": aggregations, + "links": links, + } + + def extract_precision( + self, precision: Union[int, None], min_value: int, max_value: int + ) -> int: + """Ensure that the aggregation precision value is within a valid range. + + Args: + precision: The precision value to validate + min_value: The minimum allowed precision value + max_value: The maximum allowed precision value + + Returns: + int: A validated precision value + + Raises: + HTTPException: If the precision is outside the valid range + """ + if precision is None: + return min_value + if precision < min_value or precision > max_value: + raise HTTPException( + status_code=400, + detail=f"Invalid precision value. Must be between {min_value} and {max_value}", + ) + return precision + + def extract_date_histogram_interval(self, value: Optional[str]) -> str: + """Ensure that the interval for the date histogram is valid. + + If no value is provided, the default will be returned. + + Args: + value: The interval value to validate + + Returns: + str: A validated date histogram interval + + Raises: + HTTPException: If the supplied value is not in the supported intervals + """ + if value is not None: + if value not in self.SUPPORTED_DATETIME_INTERVAL: + raise HTTPException( + status_code=400, + detail=f"Invalid datetime interval. Must be one of {self.SUPPORTED_DATETIME_INTERVAL}", + ) + else: + return value + else: + return self.DEFAULT_DATETIME_INTERVAL + + def get_filter(self, filter, filter_lang): + """Format the filter parameter in cql2-json or cql2-text. + + Args: + filter: The filter expression + filter_lang: The filter language (cql2-json or cql2-text) + + Returns: + dict: A formatted filter expression + + Raises: + HTTPException: If the filter language is not supported + """ + if filter_lang == "cql2-text": + return orjson.loads(to_cql2(parse_cql2_text(filter))) + elif filter_lang == "cql2-json": + if isinstance(filter, str): + return orjson.loads(unquote_plus(filter)) + else: + return filter + else: + raise HTTPException( + status_code=400, + detail=f"Unknown filter-lang: {filter_lang}. 
Only cql2-json or cql2-text are supported.", + ) + + async def aggregate( + self, + aggregate_request: Optional[EsAggregationExtensionPostRequest] = None, + collection_id: Optional[ + Annotated[str, Path(description="Collection ID")] + ] = None, + collections: Optional[List[str]] = [], + datetime: Optional[DateTimeType] = None, + intersects: Optional[str] = None, + filter_lang: Optional[str] = None, + filter_expr: Optional[str] = None, + aggregations: Optional[str] = None, + ids: Optional[List[str]] = None, + bbox: Optional[BBox] = None, + centroid_geohash_grid_frequency_precision: Optional[int] = None, + centroid_geohex_grid_frequency_precision: Optional[int] = None, + centroid_geotile_grid_frequency_precision: Optional[int] = None, + geometry_geohash_grid_frequency_precision: Optional[int] = None, + geometry_geotile_grid_frequency_precision: Optional[int] = None, + datetime_frequency_interval: Optional[str] = None, + **kwargs, + ) -> Union[Dict, Exception]: + """Get aggregations from the database.""" + request: Request = kwargs["request"] + base_url = str(request.base_url) + path = request.url.path + search = self.database.make_search() + + if aggregate_request is None: + + base_args = { + "collections": collections, + "ids": ids, + "bbox": bbox, + "aggregations": aggregations, + "centroid_geohash_grid_frequency_precision": centroid_geohash_grid_frequency_precision, + "centroid_geohex_grid_frequency_precision": centroid_geohex_grid_frequency_precision, + "centroid_geotile_grid_frequency_precision": centroid_geotile_grid_frequency_precision, + "geometry_geohash_grid_frequency_precision": geometry_geohash_grid_frequency_precision, + "geometry_geotile_grid_frequency_precision": geometry_geotile_grid_frequency_precision, + "datetime_frequency_interval": datetime_frequency_interval, + } + + if collection_id: + collections = [str(collection_id)] + + if intersects: + base_args["intersects"] = orjson.loads(unquote_plus(intersects)) + + if datetime: + base_args["datetime"] = format_datetime_range(datetime) + + if filter_expr: + base_args["filter"] = self.get_filter(filter_expr, filter_lang) + aggregate_request = EsAggregationExtensionPostRequest(**base_args) + else: + # Workaround for optional path param in POST requests + if "collections" in path: + collection_id = path.split("/")[2] + + filter_lang = "cql2-json" + if aggregate_request.filter_expr: + aggregate_request.filter_expr = self.get_filter( + aggregate_request.filter_expr, filter_lang + ) + + if collection_id: + if aggregate_request.collections: + raise HTTPException( + status_code=400, + detail="Cannot query multiple collections when executing '/collections//aggregate'. Use '/aggregate' and the collections field instead", + ) + else: + aggregate_request.collections = [collection_id] + + if ( + aggregate_request.aggregations is None + or aggregate_request.aggregations == [] + ): + raise HTTPException( + status_code=400, + detail="No 'aggregations' found. 
Use '/aggregations' to return available aggregations", + ) + + if aggregate_request.ids: + search = self.database.apply_ids_filter( + search=search, item_ids=aggregate_request.ids + ) + + if aggregate_request.datetime: + search = self.database.apply_datetime_filter( + search=search, interval=aggregate_request.datetime + ) + + if aggregate_request.bbox: + bbox = aggregate_request.bbox + if len(bbox) == 6: + bbox = [bbox[0], bbox[1], bbox[3], bbox[4]] + + search = self.database.apply_bbox_filter(search=search, bbox=bbox) + + if aggregate_request.intersects: + search = self.database.apply_intersects_filter( + search=search, intersects=aggregate_request.intersects + ) + + if aggregate_request.collections: + search = self.database.apply_collections_filter( + search=search, collection_ids=aggregate_request.collections + ) + # validate that aggregations are supported for all collections + for collection_id in aggregate_request.collections: + aggregation_info = await self.get_aggregations( + collection_id=collection_id, request=request + ) + supported_aggregations = ( + aggregation_info["aggregations"] + self.DEFAULT_AGGREGATIONS + ) + + for agg_name in aggregate_request.aggregations: + if agg_name not in set([x["name"] for x in supported_aggregations]): + raise HTTPException( + status_code=400, + detail=f"Aggregation {agg_name} not supported by collection {collection_id}", + ) + else: + # Validate that the aggregations requested are supported by the catalog + aggregation_info = await self.get_aggregations(request=request) + supported_aggregations = aggregation_info["aggregations"] + for agg_name in aggregate_request.aggregations: + if agg_name not in [x["name"] for x in supported_aggregations]: + raise HTTPException( + status_code=400, + detail=f"Aggregation {agg_name} not supported at catalog level", + ) + + if aggregate_request.filter_expr: + try: + search = await self.database.apply_cql2_filter( + search, aggregate_request.filter_expr + ) + except Exception as e: + raise HTTPException( + status_code=400, detail=f"Error with cql2 filter: {e}" + ) + + centroid_geohash_grid_precision = self.extract_precision( + aggregate_request.centroid_geohash_grid_frequency_precision, + 1, + self.MAX_GEOHASH_PRECISION, + ) + + centroid_geohex_grid_precision = self.extract_precision( + aggregate_request.centroid_geohex_grid_frequency_precision, + 0, + self.MAX_GEOHEX_PRECISION, + ) + + centroid_geotile_grid_precision = self.extract_precision( + aggregate_request.centroid_geotile_grid_frequency_precision, + 0, + self.MAX_GEOTILE_PRECISION, + ) + + geometry_geohash_grid_precision = self.extract_precision( + aggregate_request.geometry_geohash_grid_frequency_precision, + 1, + self.MAX_GEOHASH_PRECISION, + ) + + geometry_geotile_grid_precision = self.extract_precision( + aggregate_request.geometry_geotile_grid_frequency_precision, + 0, + self.MAX_GEOTILE_PRECISION, + ) + + datetime_frequency_interval = self.extract_date_histogram_interval( + aggregate_request.datetime_frequency_interval, + ) + + try: + db_response = await self.database.aggregate( + collections, + aggregate_request.aggregations, + search, + centroid_geohash_grid_precision, + centroid_geohex_grid_precision, + centroid_geotile_grid_precision, + geometry_geohash_grid_precision, + geometry_geotile_grid_precision, + datetime_frequency_interval, + ) + except Exception as error: + if not isinstance(error, IndexError): + raise error + aggs: List[Aggregation] = [] + if db_response: + result_aggs = db_response.get("aggregations", {}) + for agg in { + 
frozenset(item.items()): item + for item in supported_aggregations + self.GEO_POINT_AGGREGATIONS + }.values(): + if agg["name"] in aggregate_request.aggregations: + if agg["name"].endswith("_frequency"): + aggs.append( + frequency_agg(result_aggs, agg["name"], agg["data_type"]) + ) + else: + aggs.append( + metric_agg(result_aggs, agg["name"], agg["data_type"]) + ) + links = [ + {"rel": "root", "type": "application/json", "href": base_url}, + ] + + if collection_id: + collection_endpoint = urljoin(base_url, f"collections/{collection_id}") + links.extend( + [ + { + "rel": "collection", + "type": "application/json", + "href": collection_endpoint, + }, + { + "rel": "self", + "type": "application/json", + "href": urljoin(collection_endpoint, "aggregate"), + }, + ] + ) + else: + links.append( + { + "rel": "self", + "type": "application/json", + "href": urljoin(base_url, "aggregate"), + } + ) + results = AggregationCollection( + type="AggregationCollection", aggregations=aggs, links=links + ) + + return results diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/aggregation/format.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/aggregation/format.py new file mode 100644 index 00000000..9553ede4 --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/aggregation/format.py @@ -0,0 +1,60 @@ +"""Formatting functions for aggregation responses.""" + +from datetime import datetime +from typing import Any, Dict + +from stac_fastapi.core.datetime_utils import datetime_to_str +from stac_fastapi.extensions.core.aggregation.types import Aggregation + + +def frequency_agg(es_aggs: Dict[str, Any], name: str, data_type: str) -> Aggregation: + """Format an aggregation for a frequency distribution aggregation. + + Args: + es_aggs: The Elasticsearch/OpenSearch aggregation response + name: The name of the aggregation + data_type: The data type of the aggregation + + Returns: + Aggregation: A formatted aggregation response + """ + buckets = [] + for bucket in es_aggs.get(name, {}).get("buckets", []): + bucket_data = { + "key": bucket.get("key_as_string") or bucket.get("key"), + "data_type": data_type, + "frequency": bucket.get("doc_count"), + "to": bucket.get("to"), + "from": bucket.get("from"), + } + buckets.append(bucket_data) + return Aggregation( + name=name, + data_type="frequency_distribution", + overflow=es_aggs.get(name, {}).get("sum_other_doc_count", 0), + buckets=buckets, + ) + + +def metric_agg(es_aggs: Dict[str, Any], name: str, data_type: str) -> Aggregation: + """Format an aggregation for a metric aggregation. 
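To make the bucket shape concrete, here is `frequency_agg` applied to a hypothetical terms-aggregation response (collection names and counts are invented for illustration):

```python
from stac_fastapi.sfeos_helpers.aggregation import frequency_agg

es_aggs = {
    "collection_frequency": {
        "sum_other_doc_count": 0,
        "buckets": [
            {"key": "sentinel-2-l2a", "doc_count": 120},
            {"key": "landsat-c2-l2", "doc_count": 45},
        ],
    }
}

agg = frequency_agg(es_aggs, "collection_frequency", "string")
# Each bucket becomes {'key': 'sentinel-2-l2a', 'data_type': 'string',
# 'frequency': 120, 'to': None, 'from': None}; 'overflow' carries
# sum_other_doc_count from the backend response.
```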
+
+    Args:
+        es_aggs: The Elasticsearch/OpenSearch aggregation response
+        name: The name of the aggregation
+        data_type: The data type of the aggregation
+
+    Returns:
+        Aggregation: A formatted aggregation response
+    """
+    value = es_aggs.get(name, {}).get("value_as_string") or es_aggs.get(name, {}).get(
+        "value"
+    )
+    # ES 7.x does not return datetimes with a 'value_as_string' field
+    if "datetime" in name and isinstance(value, float):
+        value = datetime_to_str(datetime.fromtimestamp(value / 1e3))
+    return Aggregation(
+        name=name,
+        data_type=data_type,
+        value=value,
+    )
diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/README.md b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/README.md
new file mode 100644
index 00000000..5f4a6ada
--- /dev/null
+++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/README.md
@@ -0,0 +1,61 @@
+# STAC FastAPI Database Package
+
+This package contains shared database operations used by both the Elasticsearch and OpenSearch
+implementations of STAC FastAPI. It helps reduce code duplication and ensures consistent behavior
+between the two implementations.
+
+## Package Structure
+
+The database package is organized into five main modules:
+
+- **index.py**: Contains functions for managing indices
+  - `create_index_templates_shared`: Creates index templates for Collections and Items
+  - `delete_item_index_shared`: Deletes an item index for a collection
+  - `index_by_collection_id`: Translates a collection ID into an index name
+  - `index_alias_by_collection_id`: Translates a collection ID into an index alias
+  - `indices`: Gets a comma-separated string of index names
+
+- **query.py**: Contains functions for building and manipulating queries
+  - `apply_free_text_filter_shared`: Applies a free text filter to a search
+  - `apply_intersects_filter_shared`: Creates a geo_shape filter for intersecting geometry
+  - `populate_sort_shared`: Creates a sort configuration for queries
+
+- **mapping.py**: Contains functions for working with mappings
+  - `get_queryables_mapping_shared`: Retrieves mapping of Queryables for search
+
+- **document.py**: Contains functions for working with documents
+  - `mk_item_id`: Creates a document ID for an Item
+  - `mk_actions`: Creates bulk actions for indexing items
+
+- **utils.py**: Contains utility functions for database operations
+  - `validate_refresh`: Validates the refresh parameter value
+
+## Usage
+
+Import the necessary components from the database package:
+
+```python
+from stac_fastapi.sfeos_helpers.database import (
+    # Index operations
+    create_index_templates_shared,
+    delete_item_index_shared,
+    index_alias_by_collection_id,
+    index_by_collection_id,
+    indices,
+
+    # Query operations
+    apply_free_text_filter_shared,
+    apply_intersects_filter_shared,
+    populate_sort_shared,
+
+    # Mapping operations
+    get_queryables_mapping_shared,
+
+    # Document operations
+    mk_item_id,
+    mk_actions,
+
+    # Utility functions
+    validate_refresh,
+)
+```
diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/__init__.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/__init__.py
new file mode 100644
index 00000000..31bf28d8
--- /dev/null
+++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/__init__.py
@@ -0,0 +1,71 @@
+"""Shared database operations for stac-fastapi elasticsearch and opensearch backends.
+
+This module provides shared database functionality used by both the Elasticsearch and OpenSearch
+implementations of STAC FastAPI. It includes:
+
+1. Index management functions for creating and deleting indices
+2. Query building functions for constructing search queries
+3. Mapping functions for working with Elasticsearch/OpenSearch mappings
+4. Document operations for working with documents
+5. Utility functions for database operations
+6. Datetime utilities for query formatting
+
+The database package is organized as follows:
+- index.py: Index management functions
+- query.py: Query building functions
+- mapping.py: Mapping functions
+- document.py: Document operations
+- utils.py: Utility functions
+- datetime.py: Datetime utilities for query formatting
+
+When adding new functionality to this package, consider:
+1. Will this code be used by both Elasticsearch and OpenSearch implementations?
+2. Is the functionality stable and unlikely to diverge between implementations?
+3. Is the function well-documented with clear input/output contracts?
+ +Function Naming Conventions: +- All shared functions should end with `_shared` to clearly indicate they're meant to be used by both implementations +- Function names should be descriptive and indicate their purpose +- Parameter names should be consistent across similar functions +""" + +# Re-export all functions for backward compatibility +from .datetime import return_date +from .document import mk_actions, mk_item_id +from .index import ( + create_index_templates_shared, + delete_item_index_shared, + index_alias_by_collection_id, + index_by_collection_id, + indices, +) +from .mapping import get_queryables_mapping_shared +from .query import ( + apply_free_text_filter_shared, + apply_intersects_filter_shared, + populate_sort_shared, +) +from .utils import get_bool_env, validate_refresh + +__all__ = [ + # Index operations + "create_index_templates_shared", + "delete_item_index_shared", + "index_alias_by_collection_id", + "index_by_collection_id", + "indices", + # Query operations + "apply_free_text_filter_shared", + "apply_intersects_filter_shared", + "populate_sort_shared", + # Mapping operations + "get_queryables_mapping_shared", + # Document operations + "mk_item_id", + "mk_actions", + # Utility functions + "validate_refresh", + "get_bool_env", + # Datetime utilities + "return_date", +] diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/datetime.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/datetime.py new file mode 100644 index 00000000..352ed4b5 --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/datetime.py @@ -0,0 +1,60 @@ +"""Elasticsearch/OpenSearch-specific datetime utilities. + +This module provides datetime utility functions specifically designed for +Elasticsearch and OpenSearch query formatting. +""" + +from datetime import datetime as datetime_type +from typing import Dict, Optional, Union + +from stac_fastapi.types.rfc3339 import DateTimeType + + +def return_date( + interval: Optional[Union[DateTimeType, str]] +) -> Dict[str, Optional[str]]: + """ + Convert a date interval to an Elasticsearch/OpenSearch query format. + + This function converts a date interval (which may be a datetime, a tuple of one or two datetimes, + a string representing a datetime or range, or None) into a dictionary for filtering + search results with Elasticsearch/OpenSearch. + + This function ensures the output dictionary contains 'gte' and 'lte' keys, + even if they are set to None, to prevent KeyError in the consuming logic. + + Args: + interval (Optional[Union[DateTimeType, str]]): The date interval, which might be a single datetime, + a tuple with one or two datetimes, a string, or None. + + Returns: + dict: A dictionary representing the date interval for use in filtering search results, + always containing 'gte' and 'lte' keys. + """ + result: Dict[str, Optional[str]] = {"gte": None, "lte": None} + + if interval is None: + return result + + if isinstance(interval, str): + if "/" in interval: + parts = interval.split("/") + result["gte"] = parts[0] if parts[0] != ".." else None + result["lte"] = parts[1] if len(parts) > 1 and parts[1] != ".." else None + else: + converted_time = interval if interval != ".." 
else None + result["gte"] = result["lte"] = converted_time + return result + + if isinstance(interval, datetime_type): + datetime_iso = interval.isoformat() + result["gte"] = result["lte"] = datetime_iso + elif isinstance(interval, tuple): + start, end = interval + # Ensure datetimes are converted to UTC and formatted with 'Z' + if start: + result["gte"] = start.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" + if end: + result["lte"] = end.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" + + return result diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/document.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/document.py new file mode 100644 index 00000000..0ba0e025 --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/document.py @@ -0,0 +1,48 @@ +"""Document operations for Elasticsearch/OpenSearch. + +This module provides functions for working with documents in Elasticsearch/OpenSearch, +including document ID generation and bulk action creation. +""" + +from typing import Any, Dict, List + +from stac_fastapi.sfeos_helpers.database.index import index_alias_by_collection_id +from stac_fastapi.types.stac import Item + + +def mk_item_id(item_id: str, collection_id: str) -> str: + """Create the document id for an Item in Elasticsearch. + + Args: + item_id (str): The id of the Item. + collection_id (str): The id of the Collection that the Item belongs to. + + Returns: + str: The document id for the Item, combining the Item id and the Collection id, separated by a `|` character. + """ + return f"{item_id}|{collection_id}" + + +def mk_actions(collection_id: str, processed_items: List[Item]) -> List[Dict[str, Any]]: + """Create Elasticsearch bulk actions for a list of processed items. + + Args: + collection_id (str): The identifier for the collection the items belong to. + processed_items (List[Item]): The list of processed items to be bulk indexed. + + Returns: + List[Dict[str, Union[str, Dict]]]: The list of bulk actions to be executed, + each action being a dictionary with the following keys: + - `_index`: the index to store the document in. + - `_id`: the document's identifier. + - `_source`: the source of the document. + """ + index_alias = index_alias_by_collection_id(collection_id) + return [ + { + "_index": index_alias, + "_id": mk_item_id(item["id"], item["collection"]), + "_source": item, + } + for item in processed_items + ] diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/index.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/index.py new file mode 100644 index 00000000..3305f50f --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/index.py @@ -0,0 +1,130 @@ +"""Index management functions for Elasticsearch/OpenSearch. + +This module provides functions for creating and managing indices in Elasticsearch/OpenSearch. +""" + +from functools import lru_cache +from typing import Any, List, Optional + +from stac_fastapi.sfeos_helpers.mappings import ( + _ES_INDEX_NAME_UNSUPPORTED_CHARS_TABLE, + COLLECTIONS_INDEX, + ES_COLLECTIONS_MAPPINGS, + ES_ITEMS_MAPPINGS, + ES_ITEMS_SETTINGS, + ITEM_INDICES, + ITEMS_INDEX_PREFIX, +) + + +@lru_cache(256) +def index_by_collection_id(collection_id: str) -> str: + """ + Translate a collection id into an Elasticsearch index name. + + Args: + collection_id (str): The collection id to translate into an index name. + + Returns: + str: The index name derived from the collection id. 
+ """ + cleaned = collection_id.translate(_ES_INDEX_NAME_UNSUPPORTED_CHARS_TABLE) + return ( + f"{ITEMS_INDEX_PREFIX}{cleaned.lower()}_{collection_id.encode('utf-8').hex()}" + ) + + +@lru_cache(256) +def index_alias_by_collection_id(collection_id: str) -> str: + """ + Translate a collection id into an Elasticsearch index alias. + + Args: + collection_id (str): The collection id to translate into an index alias. + + Returns: + str: The index alias derived from the collection id. + """ + cleaned = collection_id.translate(_ES_INDEX_NAME_UNSUPPORTED_CHARS_TABLE) + return f"{ITEMS_INDEX_PREFIX}{cleaned}" + + +def indices(collection_ids: Optional[List[str]]) -> str: + """ + Get a comma-separated string of index names for a given list of collection ids. + + Args: + collection_ids: A list of collection ids. + + Returns: + A string of comma-separated index names. If `collection_ids` is empty, returns the default indices. + """ + return ( + ",".join(map(index_alias_by_collection_id, collection_ids)) + if collection_ids + else ITEM_INDICES + ) + + +async def create_index_templates_shared(settings: Any) -> None: + """Create index templates for Elasticsearch/OpenSearch Collection and Item indices. + + Args: + settings (Any): The settings object containing the client configuration. + Must have a create_client attribute that returns an Elasticsearch/OpenSearch client. + + Returns: + None: This function doesn't return any value but creates index templates in the database. + + Notes: + This function creates two index templates: + 1. A template for the Collections index with the appropriate mappings + 2. A template for the Items indices with both settings and mappings + + These templates ensure that any new indices created with matching patterns + will automatically have the correct structure. + """ + client = settings.create_client + await client.indices.put_index_template( + name=f"template_{COLLECTIONS_INDEX}", + body={ + "index_patterns": [f"{COLLECTIONS_INDEX}*"], + "template": {"mappings": ES_COLLECTIONS_MAPPINGS}, + }, + ) + await client.indices.put_index_template( + name=f"template_{ITEMS_INDEX_PREFIX}", + body={ + "index_patterns": [f"{ITEMS_INDEX_PREFIX}*"], + "template": {"settings": ES_ITEMS_SETTINGS, "mappings": ES_ITEMS_MAPPINGS}, + }, + ) + await client.close() + + +async def delete_item_index_shared(settings: Any, collection_id: str) -> None: + """Delete the index for items in a collection. + + Args: + settings (Any): The settings object containing the client configuration. + Must have a create_client attribute that returns an Elasticsearch/OpenSearch client. + collection_id (str): The ID of the collection whose items index will be deleted. + + Returns: + None: This function doesn't return any value but deletes an item index in the database. + + Notes: + This function deletes an item index and its alias. It first resolves the alias to find + the actual index name, then deletes both the alias and the index. 
+ """ + client = settings.create_client + + name = index_alias_by_collection_id(collection_id) + resolved = await client.indices.resolve_index(name=name) + if "aliases" in resolved and resolved["aliases"]: + [alias] = resolved["aliases"] + await client.indices.delete_alias(index=alias["indices"], name=alias["name"]) + await client.indices.delete(index=alias["indices"]) + else: + await client.indices.delete(index=name) + await client.close() diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/mapping.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/mapping.py new file mode 100644 index 00000000..8f664651 --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/mapping.py @@ -0,0 +1,38 @@ +"""Mapping functions for Elasticsearch/OpenSearch. + +This module provides functions for working with Elasticsearch/OpenSearch mappings. +""" + +from typing import Any, Dict + + +async def get_queryables_mapping_shared( + mappings: Dict[str, Dict[str, Any]], collection_id: str = "*" +) -> Dict[str, str]: + """Retrieve mapping of Queryables for search. + + Args: + mappings (Dict[str, Dict[str, Any]]): The mapping information returned from + Elasticsearch/OpenSearch client's indices.get_mapping() method. + Expected structure is {index_name: {"mappings": {...}}}. + collection_id (str, optional): The id of the Collection the Queryables + belongs to. Defaults to "*". + + Returns: + Dict[str, str]: A dictionary containing the Queryables mappings, where keys are + field names and values are the corresponding paths in the Elasticsearch/OpenSearch + document structure. + """ + queryables_mapping = {} + + for mapping in mappings.values(): + fields = mapping["mappings"].get("properties", {}) + properties = fields.pop("properties", {}).get("properties", {}).keys() + + for field_key in fields: + queryables_mapping[field_key] = field_key + + for property_key in properties: + queryables_mapping[property_key] = f"properties.{property_key}" + + return queryables_mapping diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/query.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/query.py new file mode 100644 index 00000000..dacbb590 --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/query.py @@ -0,0 +1,85 @@ +"""Query building functions for Elasticsearch/OpenSearch. + +This module provides functions for building and manipulating Elasticsearch/OpenSearch queries. +""" + +from typing import Any, Dict, List, Optional + +from stac_fastapi.sfeos_helpers.mappings import Geometry + + +def apply_free_text_filter_shared( + search: Any, free_text_queries: Optional[List[str]] +) -> Any: + """Create a free text query for Elasticsearch/OpenSearch. + + Args: + search (Any): The search object to apply the query to. + free_text_queries (Optional[List[str]]): A list of text strings to search for in the properties. + + Returns: + Any: The search object with the free text query applied, or the original search + object if no free_text_queries were provided. + + Notes: + This function creates a query_string query that searches for the specified text strings + in all properties of the documents. The query strings are joined with OR operators. 
+ """ + if free_text_queries is not None: + free_text_query_string = '" OR properties.\\*:"'.join(free_text_queries) + search = search.query( + "query_string", query=f'properties.\\*:"{free_text_query_string}"' + ) + + return search + + +def apply_intersects_filter_shared( + intersects: Geometry, +) -> Dict[str, Dict]: + """Create a geo_shape filter for intersecting geometry. + + Args: + intersects (Geometry): The intersecting geometry, represented as a GeoJSON-like object. + + Returns: + Dict[str, Dict]: A dictionary containing the geo_shape filter configuration + that can be used with Elasticsearch/OpenSearch Q objects. + + Notes: + This function creates a geo_shape filter configuration to find documents that intersect + with the specified geometry. The returned dictionary should be wrapped in a Q object + when applied to a search. + """ + return { + "geo_shape": { + "geometry": { + "shape": { + "type": intersects.type.lower(), + "coordinates": intersects.coordinates, + }, + "relation": "intersects", + } + } + } + + +def populate_sort_shared(sortby: List) -> Optional[Dict[str, Dict[str, str]]]: + """Create a sort configuration for Elasticsearch/OpenSearch queries. + + Args: + sortby (List): A list of sort specifications, each containing a field and direction. + + Returns: + Optional[Dict[str, Dict[str, str]]]: A dictionary mapping field names to sort direction + configurations, or None if no sort was specified. + + Notes: + This function transforms a list of sort specifications into the format required by + Elasticsearch/OpenSearch for sorting query results. The returned dictionary can be + directly used in search requests. + """ + if sortby: + return {s.field: {"order": s.direction} for s in sortby} + else: + return None diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/utils.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/utils.py new file mode 100644 index 00000000..0c6b4c45 --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/utils.py @@ -0,0 +1,50 @@ +"""Utility functions for database operations in Elasticsearch/OpenSearch. + +This module provides utility functions for working with database operations +in Elasticsearch/OpenSearch, such as parameter validation. +""" + +import logging +from typing import Union + +from stac_fastapi.core.utilities import get_bool_env + + +def validate_refresh(value: Union[str, bool]) -> str: + """ + Validate the `refresh` parameter value. + + Args: + value (Union[str, bool]): The `refresh` parameter value, which can be a string or a boolean. + + Returns: + str: The validated value of the `refresh` parameter, which can be "true", "false", or "wait_for". + """ + logger = logging.getLogger(__name__) + + # Handle boolean-like values using get_bool_env + if isinstance(value, bool) or value in { + "true", + "false", + "1", + "0", + "yes", + "no", + "y", + "n", + }: + is_true = get_bool_env("DATABASE_REFRESH", default=value) + return "true" if is_true else "false" + + # Normalize to lowercase for case-insensitivity + value = value.lower() + + # Handle "wait_for" explicitly + if value == "wait_for": + return "wait_for" + + # Log a warning for invalid values and default to "false" + logger.warning( + f"Invalid value for `refresh`: '{value}'. Expected 'true', 'false', or 'wait_for'. Defaulting to 'false'." 
+    )
+    return "false"
diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/README.md b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/README.md
new file mode 100644
index 00000000..d3b09167
--- /dev/null
+++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/README.md
@@ -0,0 +1,28 @@
+# STAC FastAPI Filter Package
+
+This package contains shared filter extension functionality used by both the Elasticsearch and OpenSearch
+implementations of STAC FastAPI. It helps reduce code duplication and ensures consistent behavior
+between the two implementations.
+
+## Package Structure
+
+The filter package is organized into three main modules:
+
+- **cql2.py**: Contains functions for converting CQL2 patterns to Elasticsearch/OpenSearch compatible formats
+  - `cql2_like_to_es`: Converts CQL2 "LIKE" characters to Elasticsearch "wildcard" characters
+  - `_replace_like_patterns`: Helper function for pattern replacement
+
+- **transform.py**: Contains functions for transforming CQL2 queries to Elasticsearch/OpenSearch query DSL
+  - `to_es_field`: Maps field names using queryables mapping
+  - `to_es`: Transforms CQL2 query structures to Elasticsearch/OpenSearch query DSL
+
+- **client.py**: Contains the base filter client implementation
+  - `EsAsyncBaseFiltersClient`: Base class for implementing the STAC filter extension
+
+## Usage
+
+Import the necessary components from the filter package:
+
+```python
+from stac_fastapi.sfeos_helpers.filter import cql2_like_to_es, to_es, EsAsyncBaseFiltersClient
+```
diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/__init__.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/__init__.py
new file mode 100644
index 00000000..02b5db92
--- /dev/null
+++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/__init__.py
@@ -0,0 +1,44 @@
+"""Shared filter extension methods for stac-fastapi elasticsearch and opensearch backends.
+
+This module provides shared functionality for implementing the STAC API Filter Extension
+with Elasticsearch and OpenSearch. It includes:
+
+1. Functions for converting CQL2 queries to Elasticsearch/OpenSearch query DSL
+2. Helper functions for field mapping and query transformation
+3. Base implementation of the AsyncBaseFiltersClient for Elasticsearch/OpenSearch
+
+The filter package is organized as follows:
+- cql2.py: CQL2 pattern conversion helpers
+- transform.py: Query transformation functions
+- client.py: Filter client implementation
+
+When adding new functionality to this package, consider:
+1. Will this code be used by both Elasticsearch and OpenSearch implementations?
+2. Is the functionality stable and unlikely to diverge between implementations?
+3.
Is the function well-documented with clear input/output contracts? + +Function Naming Conventions: +- Function names should be descriptive and indicate their purpose +- Parameter names should be consistent across similar functions +""" + +from .client import EsAsyncBaseFiltersClient + +# Re-export the main functions and classes for backward compatibility +from .cql2 import ( + _replace_like_patterns, + cql2_like_patterns, + cql2_like_to_es, + valid_like_substitutions, +) +from .transform import to_es, to_es_field + +__all__ = [ + "cql2_like_patterns", + "valid_like_substitutions", + "cql2_like_to_es", + "_replace_like_patterns", + "to_es_field", + "to_es", + "EsAsyncBaseFiltersClient", +] diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/client.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/client.py new file mode 100644 index 00000000..4b2a1a71 --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/client.py @@ -0,0 +1,98 @@ +"""Filter client implementation for Elasticsearch/OpenSearch.""" + +from collections import deque +from typing import Any, Dict, Optional + +import attr + +from stac_fastapi.core.base_database_logic import BaseDatabaseLogic +from stac_fastapi.core.extensions.filter import DEFAULT_QUERYABLES +from stac_fastapi.extensions.core.filter.client import AsyncBaseFiltersClient +from stac_fastapi.sfeos_helpers.mappings import ES_MAPPING_TYPE_TO_JSON + + +@attr.s +class EsAsyncBaseFiltersClient(AsyncBaseFiltersClient): + """Defines a pattern for implementing the STAC filter extension.""" + + database: BaseDatabaseLogic = attr.ib() + + async def get_queryables( + self, collection_id: Optional[str] = None, **kwargs + ) -> Dict[str, Any]: + """Get the queryables available for the given collection_id. + + If collection_id is None, returns the intersection of all + queryables over all collections. + + This base implementation returns a blank queryable schema. This is not allowed + under OGC CQL but it is allowed by the STAC API Filter Extension + + https://github.com/radiantearth/stac-api-spec/tree/master/fragments/filter#queryables + + Args: + collection_id (str, optional): The id of the collection to get queryables for. + **kwargs: additional keyword arguments + + Returns: + Dict[str, Any]: A dictionary containing the queryables for the given collection. + """ + queryables: Dict[str, Any] = { + "$schema": "https://json-schema.org/draft/2019-09/schema", + "$id": "https://stac-api.example.com/queryables", + "type": "object", + "title": "Queryables for STAC API", + "description": "Queryable names for the STAC API Item Search filter.", + "properties": DEFAULT_QUERYABLES, + "additionalProperties": True, + } + if not collection_id: + return queryables + + properties: Dict[str, Any] = queryables["properties"] + queryables.update( + { + "properties": properties, + "additionalProperties": False, + } + ) + + mapping_data = await self.database.get_items_mapping(collection_id) + mapping_properties = next(iter(mapping_data.values()))["mappings"]["properties"] + stack = deque(mapping_properties.items()) + + while stack: + field_name, field_def = stack.popleft() + + # Iterate over nested fields + field_properties = field_def.get("properties") + if field_properties: + # Fields in Item Properties should be exposed with their un-prefixed names, + # and not require expressions to prefix them with properties, + # e.g., eo:cloud_cover instead of properties.eo:cloud_cover. 
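For orientation, a hypothetical wiring sketch showing how this client could be attached to the filter extension, mirroring how the aggregation client is attached in `conftest.py`. The `stac_fastapi.extensions.core.FilterExtension` import and the `database` parameter (the backend-specific `DatabaseLogic` instance) are assumptions here, not part of this PR:

```python
from stac_fastapi.extensions.core import FilterExtension

from stac_fastapi.sfeos_helpers.filter import EsAsyncBaseFiltersClient


def build_filter_extension(database) -> FilterExtension:
    """Attach the shared filter client to the extension (sketch only)."""
    # get_queryables() calls database.get_items_mapping(collection_id), so
    # any database object exposing that coroutine will do.
    return FilterExtension(client=EsAsyncBaseFiltersClient(database=database))
```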
+ if field_name == "properties": + stack.extend(field_properties.items()) + else: + stack.extend( + (f"{field_name}.{k}", v) for k, v in field_properties.items() + ) + + # Skip non-indexed or disabled fields + field_type = field_def.get("type") + if not field_type or not field_def.get("enabled", True): + continue + + # Generate field properties + field_result = DEFAULT_QUERYABLES.get(field_name, {}) + properties[field_name] = field_result + + field_name_human = field_name.replace("_", " ").title() + field_result.setdefault("title", field_name_human) + + field_type_json = ES_MAPPING_TYPE_TO_JSON.get(field_type, field_type) + field_result.setdefault("type", field_type_json) + + if field_type in {"date", "date_nanos"}: + field_result.setdefault("format", "date-time") + + return queryables diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/cql2.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/cql2.py new file mode 100644 index 00000000..bd248c90 --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/cql2.py @@ -0,0 +1,39 @@ +"""CQL2 pattern conversion helpers for Elasticsearch/OpenSearch.""" + +import re + +cql2_like_patterns = re.compile(r"\\.|[%_]|\\$") +valid_like_substitutions = { + "\\\\": "\\", + "\\%": "%", + "\\_": "_", + "%": "*", + "_": "?", +} + + +def _replace_like_patterns(match: re.Match) -> str: + pattern = match.group() + try: + return valid_like_substitutions[pattern] + except KeyError: + raise ValueError(f"'{pattern}' is not a valid escape sequence") + + +def cql2_like_to_es(string: str) -> str: + """ + Convert CQL2 "LIKE" characters to Elasticsearch "wildcard" characters. + + Args: + string (str): The string containing CQL2 wildcard characters. + + Returns: + str: The converted string with Elasticsearch compatible wildcards. + + Raises: + ValueError: If an invalid escape sequence is encountered. + """ + return cql2_like_patterns.sub( + repl=_replace_like_patterns, + string=string, + ) diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/transform.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/transform.py new file mode 100644 index 00000000..c78b19e5 --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/transform.py @@ -0,0 +1,133 @@ +"""Query transformation functions for Elasticsearch/OpenSearch.""" + +from typing import Any, Dict + +from stac_fastapi.core.extensions.filter import ( + AdvancedComparisonOp, + ComparisonOp, + LogicalOp, + SpatialOp, +) + +from .cql2 import cql2_like_to_es + + +def to_es_field(queryables_mapping: Dict[str, Any], field: str) -> str: + """ + Map a given field to its corresponding Elasticsearch field according to a predefined mapping. + + Args: + field (str): The field name from a user query or filter. + + Returns: + str: The mapped field name suitable for Elasticsearch queries. + """ + return queryables_mapping.get(field, field) + + +def to_es(queryables_mapping: Dict[str, Any], query: Dict[str, Any]) -> Dict[str, Any]: + """ + Transform a simplified CQL2 query structure to an Elasticsearch compatible query DSL. + + Args: + query (Dict[str, Any]): The query dictionary containing 'op' and 'args'. + + Returns: + Dict[str, Any]: The corresponding Elasticsearch query in the form of a dictionary. 
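The substitution table in `cql2.py` fully determines the conversion, so a few worked examples follow directly from `valid_like_substitutions`:

```python
from stac_fastapi.sfeos_helpers.filter import cql2_like_to_es

print(cql2_like_to_es("sentinel%"))     # sentinel*  (% -> *)
print(cql2_like_to_es("B_A"))           # B?A        (_ -> ?)
print(cql2_like_to_es(r"100\% clean"))  # 100% clean (escaped literal %)

try:
    cql2_like_to_es(r"bad\q")
except ValueError as exc:
    print(exc)  # '\q' is not a valid escape sequence
```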
+ """ + if query["op"] in [LogicalOp.AND, LogicalOp.OR, LogicalOp.NOT]: + bool_type = { + LogicalOp.AND: "must", + LogicalOp.OR: "should", + LogicalOp.NOT: "must_not", + }[query["op"]] + return { + "bool": { + bool_type: [ + to_es(queryables_mapping, sub_query) for sub_query in query["args"] + ] + } + } + + elif query["op"] in [ + ComparisonOp.EQ, + ComparisonOp.NEQ, + ComparisonOp.LT, + ComparisonOp.LTE, + ComparisonOp.GT, + ComparisonOp.GTE, + ]: + range_op = { + ComparisonOp.LT: "lt", + ComparisonOp.LTE: "lte", + ComparisonOp.GT: "gt", + ComparisonOp.GTE: "gte", + } + + field = to_es_field(queryables_mapping, query["args"][0]["property"]) + value = query["args"][1] + if isinstance(value, dict) and "timestamp" in value: + value = value["timestamp"] + if query["op"] == ComparisonOp.EQ: + return {"range": {field: {"gte": value, "lte": value}}} + elif query["op"] == ComparisonOp.NEQ: + return { + "bool": { + "must_not": [{"range": {field: {"gte": value, "lte": value}}}] + } + } + else: + return {"range": {field: {range_op[query["op"]]: value}}} + else: + if query["op"] == ComparisonOp.EQ: + return {"term": {field: value}} + elif query["op"] == ComparisonOp.NEQ: + return {"bool": {"must_not": [{"term": {field: value}}]}} + else: + return {"range": {field: {range_op[query["op"]]: value}}} + + elif query["op"] == ComparisonOp.IS_NULL: + field = to_es_field(queryables_mapping, query["args"][0]["property"]) + return {"bool": {"must_not": {"exists": {"field": field}}}} + + elif query["op"] == AdvancedComparisonOp.BETWEEN: + field = to_es_field(queryables_mapping, query["args"][0]["property"]) + gte, lte = query["args"][1], query["args"][2] + if isinstance(gte, dict) and "timestamp" in gte: + gte = gte["timestamp"] + if isinstance(lte, dict) and "timestamp" in lte: + lte = lte["timestamp"] + return {"range": {field: {"gte": gte, "lte": lte}}} + + elif query["op"] == AdvancedComparisonOp.IN: + field = to_es_field(queryables_mapping, query["args"][0]["property"]) + values = query["args"][1] + if not isinstance(values, list): + raise ValueError(f"Arg {values} is not a list") + return {"terms": {field: values}} + + elif query["op"] == AdvancedComparisonOp.LIKE: + field = to_es_field(queryables_mapping, query["args"][0]["property"]) + pattern = cql2_like_to_es(query["args"][1]) + return {"wildcard": {field: {"value": pattern, "case_insensitive": True}}} + + elif query["op"] in [ + SpatialOp.S_INTERSECTS, + SpatialOp.S_CONTAINS, + SpatialOp.S_WITHIN, + SpatialOp.S_DISJOINT, + ]: + field = to_es_field(queryables_mapping, query["args"][0]["property"]) + geometry = query["args"][1] + + relation_mapping = { + SpatialOp.S_INTERSECTS: "intersects", + SpatialOp.S_CONTAINS: "contains", + SpatialOp.S_WITHIN: "within", + SpatialOp.S_DISJOINT: "disjoint", + } + + relation = relation_mapping[query["op"]] + return {"geo_shape": {field: {"shape": geometry, "relation": relation}}} + + return {} diff --git a/stac_fastapi/core/stac_fastapi/core/database_logic.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/mappings.py similarity index 50% rename from stac_fastapi/core/stac_fastapi/core/database_logic.py rename to stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/mappings.py index 85ebcf21..476d656a 100644 --- a/stac_fastapi/core/stac_fastapi/core/database_logic.py +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/mappings.py @@ -1,10 +1,32 @@ -"""Database logic core.""" +"""Shared mappings for stac-fastapi elasticsearch and opensearch backends. 
-import os
-from functools import lru_cache
-from typing import Any, Dict, List, Optional, Protocol
+This module contains shared constants, mappings, and type definitions used by both
+the Elasticsearch and OpenSearch implementations of STAC FastAPI. It includes:
+
+1. Index name constants and character translation tables
+2. Mapping definitions for Collections and Items
+3. Aggregation mappings for search queries
+4. Type conversion mappings between Elasticsearch/OpenSearch and JSON Schema types
-from stac_fastapi.types.stac import Item
+The sfeos_helpers package is organized as follows:
+- aggregation/: Shared aggregation extension implementation
+- database/: Shared database operations (index, document, mapping, query, and utility helpers)
+- filter/: Shared filter extension implementation
+- mappings.py: Shared constants and mapping definitions (this file)
+
+When adding new functionality to this package, consider:
+1. Will this code be used by both Elasticsearch and OpenSearch implementations?
+2. Is the functionality stable and unlikely to diverge between implementations?
+3. Is the function well-documented with clear input/output contracts?
+
+Function Naming Conventions:
+- All shared functions should end with `_shared` to clearly indicate they're meant to be used by both implementations
+- Function names should be descriptive and indicate their purpose
+- Parameter names should be consistent across similar functions
+"""
+
+import os
+from typing import Any, Dict, Literal, Protocol

 # stac_pydantic classes extend _GeometryBase, which doesn't have a type field,
@@ -144,89 +166,97 @@ class Geometry(Protocol):  # noqa
 },
 }

-
-@lru_cache(256)
-def index_by_collection_id(collection_id: str) -> str:
-    """
-    Translate a collection id into an Elasticsearch index name.
-
-    Args:
-        collection_id (str): The collection id to translate into an index name.
-
-    Returns:
-        str: The index name derived from the collection id.
-    """
-    cleaned = collection_id.translate(_ES_INDEX_NAME_UNSUPPORTED_CHARS_TABLE)
-    return (
-        f"{ITEMS_INDEX_PREFIX}{cleaned.lower()}_{collection_id.encode('utf-8').hex()}"
-    )
-
-
-@lru_cache(256)
-def index_alias_by_collection_id(collection_id: str) -> str:
-    """
-    Translate a collection id into an Elasticsearch index alias.
-
-    Args:
-        collection_id (str): The collection id to translate into an index alias.
-
-    Returns:
-        str: The index alias derived from the collection id.
-    """
-    cleaned = collection_id.translate(_ES_INDEX_NAME_UNSUPPORTED_CHARS_TABLE)
-    return f"{ITEMS_INDEX_PREFIX}{cleaned}"
-
-
-def indices(collection_ids: Optional[List[str]]) -> str:
-    """
-    Get a comma-separated string of index names for a given list of collection ids.
-
-    Args:
-        collection_ids: A list of collection ids.
-
-    Returns:
-        A string of comma-separated index names. If `collection_ids` is empty, returns the default indices.
-    """
-    return (
-        ",".join(map(index_alias_by_collection_id, collection_ids))
-        if collection_ids
-        else ITEM_INDICES
-    )
-
-
-def mk_item_id(item_id: str, collection_id: str) -> str:
-    """Create the document id for an Item in Elasticsearch.
-
-    Args:
-        item_id (str): The id of the Item.
-        collection_id (str): The id of the Collection that the Item belongs to.
-
-    Returns:
-        str: The document id for the Item, combining the Item id and the Collection id, separated by a `|` character.
-    """
-    return f"{item_id}|{collection_id}"
-
-
-def mk_actions(collection_id: str, processed_items: List[Item]) -> List[Dict[str, Any]]:
-    """Create Elasticsearch bulk actions for a list of processed items.
- - Args: - collection_id (str): The identifier for the collection the items belong to. - processed_items (List[Item]): The list of processed items to be bulk indexed. - - Returns: - List[Dict[str, Union[str, Dict]]]: The list of bulk actions to be executed, - each action being a dictionary with the following keys: - - `_index`: the index to store the document in. - - `_id`: the document's identifier. - - `_source`: the source of the document. - """ - index_alias = index_alias_by_collection_id(collection_id) - return [ - { - "_index": index_alias, - "_id": mk_item_id(item["id"], item["collection"]), - "_source": item, +# Shared aggregation mapping for both Elasticsearch and OpenSearch +AGGREGATION_MAPPING: Dict[str, Dict[str, Any]] = { + "total_count": {"value_count": {"field": "id"}}, + "collection_frequency": {"terms": {"field": "collection", "size": 100}}, + "platform_frequency": {"terms": {"field": "properties.platform", "size": 100}}, + "cloud_cover_frequency": { + "range": { + "field": "properties.eo:cloud_cover", + "ranges": [ + {"to": 5}, + {"from": 5, "to": 15}, + {"from": 15, "to": 40}, + {"from": 40}, + ], + } + }, + "datetime_frequency": { + "date_histogram": { + "field": "properties.datetime", + "calendar_interval": "month", + } + }, + "datetime_min": {"min": {"field": "properties.datetime"}}, + "datetime_max": {"max": {"field": "properties.datetime"}}, + "grid_code_frequency": { + "terms": { + "field": "properties.grid:code", + "missing": "none", + "size": 10000, + } + }, + "sun_elevation_frequency": { + "histogram": {"field": "properties.view:sun_elevation", "interval": 5} + }, + "sun_azimuth_frequency": { + "histogram": {"field": "properties.view:sun_azimuth", "interval": 5} + }, + "off_nadir_frequency": { + "histogram": {"field": "properties.view:off_nadir", "interval": 5} + }, + "centroid_geohash_grid_frequency": { + "geohash_grid": { + "field": "properties.proj:centroid", + "precision": 1, + } + }, + "centroid_geohex_grid_frequency": { + "geohex_grid": { + "field": "properties.proj:centroid", + "precision": 0, + } + }, + "centroid_geotile_grid_frequency": { + "geotile_grid": { + "field": "properties.proj:centroid", + "precision": 0, + } + }, + "geometry_geohash_grid_frequency": { + "geohash_grid": { + "field": "geometry", + "precision": 1, + } + }, + "geometry_geotile_grid_frequency": { + "geotile_grid": { + "field": "geometry", + "precision": 0, } - for item in processed_items - ] + }, +} + +ES_MAPPING_TYPE_TO_JSON: Dict[ + str, Literal["string", "number", "boolean", "object", "array", "null"] +] = { + "date": "string", + "date_nanos": "string", + "keyword": "string", + "match_only_text": "string", + "text": "string", + "wildcard": "string", + "byte": "number", + "double": "number", + "float": "number", + "half_float": "number", + "long": "number", + "scaled_float": "number", + "short": "number", + "token_count": "number", + "unsigned_long": "number", + "geo_point": "object", + "geo_shape": "object", + "nested": "array", +} diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/version.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/version.py new file mode 100644 index 00000000..e152cdff --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/version.py @@ -0,0 +1,2 @@ +"""library version.""" +__version__ = "5.0.0a0" diff --git a/stac_fastapi/tests/conftest.py b/stac_fastapi/tests/conftest.py index 066b014d..afb9ac9b 100644 --- a/stac_fastapi/tests/conftest.py +++ b/stac_fastapi/tests/conftest.py @@ -23,11 +23,11 @@ from 
stac_fastapi.core.extensions.aggregation import ( EsAggregationExtensionGetRequest, EsAggregationExtensionPostRequest, - EsAsyncAggregationClient, ) from stac_fastapi.core.rate_limit import setup_rate_limit from stac_fastapi.core.route_dependencies import get_route_dependencies from stac_fastapi.core.utilities import get_bool_env +from stac_fastapi.sfeos_helpers.aggregation import EsAsyncBaseAggregationClient if os.getenv("BACKEND", "elasticsearch").lower() == "opensearch": from stac_fastapi.opensearch.config import AsyncOpensearchSettings as AsyncSettings @@ -199,7 +199,7 @@ async def app(): settings = AsyncSettings() aggregation_extension = AggregationExtension( - client=EsAsyncAggregationClient( + client=EsAsyncBaseAggregationClient( database=database, session=None, settings=settings ) ) @@ -255,7 +255,7 @@ async def app_rate_limit(): settings = AsyncSettings() aggregation_extension = AggregationExtension( - client=EsAsyncAggregationClient( + client=EsAsyncBaseAggregationClient( database=database, session=None, settings=settings ) ) @@ -349,7 +349,7 @@ async def app_basic_auth(): settings = AsyncSettings() aggregation_extension = AggregationExtension( - client=EsAsyncAggregationClient( + client=EsAsyncBaseAggregationClient( database=database, session=None, settings=settings ) ) @@ -488,7 +488,7 @@ def build_test_app(): ) settings = AsyncSettings() aggregation_extension = AggregationExtension( - client=EsAsyncAggregationClient( + client=EsAsyncBaseAggregationClient( database=database, session=None, settings=settings ) ) diff --git a/stac_fastapi/tests/database/test_database.py b/stac_fastapi/tests/database/test_database.py index a5a01e60..86611235 100644 --- a/stac_fastapi/tests/database/test_database.py +++ b/stac_fastapi/tests/database/test_database.py @@ -1,25 +1,16 @@ -import os import uuid import pytest from stac_pydantic import api -from ..conftest import MockRequest, database +from stac_fastapi.sfeos_helpers.database import index_alias_by_collection_id +from stac_fastapi.sfeos_helpers.mappings import ( + COLLECTIONS_INDEX, + ES_COLLECTIONS_MAPPINGS, + ES_ITEMS_MAPPINGS, +) -if os.getenv("BACKEND", "elasticsearch").lower() == "opensearch": - from stac_fastapi.opensearch.database_logic import ( - COLLECTIONS_INDEX, - ES_COLLECTIONS_MAPPINGS, - ES_ITEMS_MAPPINGS, - index_alias_by_collection_id, - ) -else: - from stac_fastapi.elasticsearch.database_logic import ( - COLLECTIONS_INDEX, - ES_COLLECTIONS_MAPPINGS, - ES_ITEMS_MAPPINGS, - index_alias_by_collection_id, - ) +from ..conftest import MockRequest, database @pytest.mark.asyncio diff --git a/stac_fastapi/tests/extensions/test_cql2_like_to_es.py b/stac_fastapi/tests/extensions/test_cql2_like_to_es.py index 96d51272..2125eeed 100644 --- a/stac_fastapi/tests/extensions/test_cql2_like_to_es.py +++ b/stac_fastapi/tests/extensions/test_cql2_like_to_es.py @@ -1,6 +1,6 @@ import pytest -from stac_fastapi.core.extensions.filter import cql2_like_to_es +from stac_fastapi.sfeos_helpers.filter import cql2_like_to_es @pytest.mark.parametrize(
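Finally, the `ES_MAPPING_TYPE_TO_JSON` table introduced in `mappings.py` is consulted with a pass-through default in the queryables client; a small sketch of that behavior:

```python
from stac_fastapi.sfeos_helpers.mappings import ES_MAPPING_TYPE_TO_JSON

# Unknown mapping types (e.g. "boolean") fall back to the raw type name,
# matching the ES_MAPPING_TYPE_TO_JSON.get(field_type, field_type) lookup
# in filter/client.py.
for es_type in ("keyword", "half_float", "geo_shape", "date", "boolean"):
    print(es_type, "->", ES_MAPPING_TYPE_TO_JSON.get(es_type, es_type))
# keyword -> string
# half_float -> number
# geo_shape -> object
# date -> string
# boolean -> boolean
```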