From 3a9f0769f273e17dc77d3d0c6262ef1d6e9e973d Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Wed, 7 May 2025 14:22:56 +0800 Subject: [PATCH 01/13] add es_os_refresh env var --- README.md | 7 ++++--- .../elasticsearch/stac_fastapi/elasticsearch/config.py | 1 + stac_fastapi/opensearch/stac_fastapi/opensearch/config.py | 1 + 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 1ae2f085..16e5b77f 100644 --- a/README.md +++ b/README.md @@ -111,10 +111,11 @@ You can customize additional settings in your `.env` file: | `RELOAD` | Enable auto-reload for development. | `true` | Optional | | `STAC_FASTAPI_RATE_LIMIT` | API rate limit per client. | `200/minute` | Optional | | `BACKEND` | Tests-related variable | `elasticsearch` or `opensearch` based on the backend | Optional | -| `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional | -| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional | +| `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional | | | `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional -| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` | Optional | +| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional +| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. 
If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` Optional | +| `ES_OS_REFRESH` | Controls whether Elasticsearch/OpenSearch operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. | `false` | Optional | > [!NOTE] > The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, and `ES_VERIFY_CERTS` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch. diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py index 37e1ba5b..03ae5d64 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py @@ -87,6 +87,7 @@ class ElasticsearchSettings(ApiSettings, ApiBaseSettings): enable_response_models: bool = False enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False) raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False) + es_os_refresh: bool = get_bool_env("ES_OS_REFRESH", default=False) @property def create_client(self): diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py index 4c305fda..93b99181 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py @@ -105,6 +105,7 @@ class AsyncOpensearchSettings(ApiSettings, ApiBaseSettings): enable_response_models: bool = False enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", 
default=False) raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False) + es_os_refresh: bool = get_bool_env("ES_OS_REFRESH", default=False) @property def create_client(self): From 602629bf4ba4644d95bed9c9d5701cca6498e5ca Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Wed, 7 May 2025 15:26:03 +0800 Subject: [PATCH 02/13] add to transactions client --- README.md | 2 +- stac_fastapi/core/stac_fastapi/core/core.py | 75 +++++++++++++++++-- .../stac_fastapi/elasticsearch/config.py | 3 +- .../stac_fastapi/opensearch/config.py | 3 +- 4 files changed, 72 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 16e5b77f..7288e290 100644 --- a/README.md +++ b/README.md @@ -115,7 +115,7 @@ You can customize additional settings in your `.env` file: | `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional | `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional | `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` Optional | -| `ES_OS_REFRESH` | Controls whether Elasticsearch/OpenSearch operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. | `false` | Optional | +| `DATABASE_REFRESH` | Controls whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. 
If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. | `false` | Optional | > [!NOTE] > The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, and `ES_VERIFY_CERTS` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch. diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index f994b619..833fc6dd 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -673,6 +673,27 @@ class TransactionsClient(AsyncBaseTransactionsClient): settings: ApiBaseSettings = attr.ib() session: Session = attr.ib(default=attr.Factory(Session.create_from_env)) + def _resolve_refresh(self, **kwargs) -> bool: + """ + Resolve the `refresh` parameter from kwargs or the environment variable. + + Args: + **kwargs: Additional keyword arguments, including `refresh`. + + Returns: + bool: The resolved value of the `refresh` parameter. 
+ """ + refresh = kwargs.get( + "refresh", self.database.async_settings.database_refresh == "true" + ) + if "refresh" in kwargs: + logger.info(f"`refresh` parameter explicitly passed in kwargs: {refresh}") + else: + logger.info( + f"`refresh` parameter derived from environment variable: {refresh}" + ) + return refresh + @overrides async def create_item( self, collection_id: str, item: Union[Item, ItemCollection], **kwargs @@ -696,6 +717,9 @@ async def create_item( request = kwargs.get("request") base_url = str(request.base_url) + # Resolve the `refresh` parameter + refresh = self._resolve_refresh(**kwargs) + # Convert Pydantic model to dict for uniform processing item_dict = item.model_dump(mode="json") @@ -712,10 +736,11 @@ async def create_item( for feature in features ] attempted = len(processed_items) + success, errors = await self.database.bulk_async( collection_id, processed_items, - refresh=kwargs.get("refresh", False), + refresh=refresh, ) if errors: logger.error( @@ -730,7 +755,7 @@ async def create_item( # Handle single item await self.database.create_item( item_dict, - refresh=kwargs.get("refresh", False), + refresh=refresh, base_url=base_url, exist_ok=False, ) @@ -757,11 +782,15 @@ async def update_item( """ item = item.model_dump(mode="json") base_url = str(kwargs["request"].base_url) + + # Resolve the `refresh` parameter + refresh = self._resolve_refresh(**kwargs) + now = datetime_type.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") item["properties"]["updated"] = now await self.database.create_item( - item, refresh=kwargs.get("refresh", False), base_url=base_url, exist_ok=True + item, refresh=refresh, base_url=base_url, exist_ok=True ) return ItemSerializer.db_to_stac(item, base_url) @@ -777,7 +806,12 @@ async def delete_item(self, item_id: str, collection_id: str, **kwargs) -> None: Returns: None: Returns 204 No Content on successful deletion """ - await self.database.delete_item(item_id=item_id, collection_id=collection_id) + # Resolve the 
`refresh` parameter + refresh = self._resolve_refresh(**kwargs) + + await self.database.delete_item( + item_id=item_id, collection_id=collection_id, refresh=refresh + ) return None @overrides @@ -798,8 +832,12 @@ async def create_collection( """ collection = collection.model_dump(mode="json") request = kwargs["request"] + + # Resolve the `refresh` parameter + refresh = self._resolve_refresh(**kwargs) + collection = self.database.collection_serializer.stac_to_db(collection, request) - await self.database.create_collection(collection=collection) + await self.database.create_collection(collection=collection, refresh=refresh) return CollectionSerializer.db_to_stac( collection, request, @@ -833,9 +871,12 @@ async def update_collection( request = kwargs["request"] + # Resolve the `refresh` parameter + refresh = self._resolve_refresh(**kwargs) + collection = self.database.collection_serializer.stac_to_db(collection, request) await self.database.update_collection( - collection_id=collection_id, collection=collection + collection_id=collection_id, collection=collection, refresh=refresh ) return CollectionSerializer.db_to_stac( @@ -860,7 +901,12 @@ async def delete_collection(self, collection_id: str, **kwargs) -> None: Raises: NotFoundError: If the collection doesn't exist """ - await self.database.delete_collection(collection_id=collection_id) + # Resolve the `refresh` parameter + refresh = self._resolve_refresh(**kwargs) + + await self.database.delete_collection( + collection_id=collection_id, refresh=refresh + ) return None @@ -919,6 +965,19 @@ def bulk_item_insert( else: base_url = "" + # Use `refresh` from kwargs if provided, otherwise fall back to the environment variable + refresh = kwargs.get( + "refresh", self.database.sync_settings.database_refresh == "true" + ) + + # Log the value of `refresh` and its source + if "refresh" in kwargs: + logger.info(f"`refresh` parameter explicitly passed in kwargs: {refresh}") + else: + logger.info( + f"`refresh` parameter derived 
from environment variable: {refresh}" + ) + processed_items = [] for item in items.items.values(): try: @@ -937,7 +996,7 @@ def bulk_item_insert( success, errors = self.database.bulk_sync( collection_id, processed_items, - refresh=kwargs.get("refresh", False), + refresh=refresh, ) if errors: logger.error(f"Bulk sync operation encountered errors: {errors}") diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py index 03ae5d64..a87a7a66 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py @@ -87,7 +87,7 @@ class ElasticsearchSettings(ApiSettings, ApiBaseSettings): enable_response_models: bool = False enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False) raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False) - es_os_refresh: bool = get_bool_env("ES_OS_REFRESH", default=False) + database_refresh: bool = get_bool_env("DATABASE_REFRESH", default=False) @property def create_client(self): @@ -109,6 +109,7 @@ class AsyncElasticsearchSettings(ApiSettings, ApiBaseSettings): enable_response_models: bool = False enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False) raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False) + database_refresh: bool = get_bool_env("DATABASE_REFRESH", default=False) @property def create_client(self): diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py index 93b99181..a8902af3 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py @@ -84,6 +84,7 @@ class OpensearchSettings(ApiSettings, ApiBaseSettings): enable_response_models: bool = False enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", 
default=False) raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False) + database_refresh: bool = get_bool_env("DATABASE_REFRESH", default=False) @property def create_client(self): @@ -105,7 +106,7 @@ class AsyncOpensearchSettings(ApiSettings, ApiBaseSettings): enable_response_models: bool = False enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False) raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False) - es_os_refresh: bool = get_bool_env("ES_OS_REFRESH", default=False) + database_refresh: bool = get_bool_env("DATABASE_REFRESH", default=False) @property def create_client(self): From 6b2c2d73d6748f97088e98564aaf7e9ab7243bed Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Wed, 7 May 2025 19:26:13 +0800 Subject: [PATCH 03/13] update changelog --- CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cdc77391..e3b3545f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Added logging to bulk insertion methods to provide detailed feedback on errors encountered during operations. [#364](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/364) - Introduced the `RAISE_ON_BULK_ERROR` environment variable to control whether bulk insertion methods raise exceptions on errors (`true`) or log warnings and continue processing (`false`). [#364](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/364) - Added code coverage reporting to the test suite using pytest-cov. [#87](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/issues/87) +- Introduced the `DATABASE_REFRESH` environment variable to control whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. 
If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370) ### Changed @@ -19,11 +20,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Extended Datetime Search to search on start_datetime and end_datetime as well as datetime fields. [#182](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/182) - Changed item update operation to use Elasticsearch index API instead of delete and create for better efficiency and atomicity. [#75](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/issues/75) - Bulk insertion via `BulkTransactionsClient` now strictly validates all STAC Items using the Pydantic model before insertion. Any invalid item will immediately raise a `ValidationError`, ensuring consistent validation with single-item inserts and preventing invalid STAC Items from being stored. This validation is enforced regardless of the `RAISE_ON_BULK_ERROR` setting. [#368](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/368) - +- Refactored CRUD methods in `TransactionsClient` to use the `_resolve_refresh` helper method for consistent and reusable handling of the `refresh` parameter. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370) ### Fixed - Refactored `create_item` and `update_item` methods to share unified logic, ensuring consistent conflict detection, validation, and database operations. [#368](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/368) +- Fixed an issue where some routes were not passing the `refresh` parameter from `kwargs` to the database logic, ensuring consistent behavior across all CRUD operations. 
[#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370) ## [v4.0.0] - 2025-04-23 From b96c8f4dc4204a24409730fc38a2df9f5400d2b4 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Wed, 7 May 2025 19:31:12 +0800 Subject: [PATCH 04/13] add tests --- ...irect_response.py => test_config_settings.py} | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) rename stac_fastapi/tests/elasticsearch/{test_direct_response.py => test_config_settings.py} (70%) diff --git a/stac_fastapi/tests/elasticsearch/test_direct_response.py b/stac_fastapi/tests/elasticsearch/test_config_settings.py similarity index 70% rename from stac_fastapi/tests/elasticsearch/test_direct_response.py rename to stac_fastapi/tests/elasticsearch/test_config_settings.py index bbbceb56..963ec8aa 100644 --- a/stac_fastapi/tests/elasticsearch/test_direct_response.py +++ b/stac_fastapi/tests/elasticsearch/test_config_settings.py @@ -37,3 +37,19 @@ def test_enable_direct_response_false(monkeypatch): settings_class, _ = get_settings_class() settings = settings_class() assert settings.enable_direct_response is False + + +def test_database_refresh_true(monkeypatch): + """Test that DATABASE_REFRESH env var enables database refresh.""" + monkeypatch.setenv("DATABASE_REFRESH", "true") + settings_class, _ = get_settings_class() + settings = settings_class() + assert settings.database_refresh is True + + +def test_database_refresh_false(monkeypatch): + """Test that DATABASE_REFRESH env var disables database refresh.""" + monkeypatch.setenv("DATABASE_REFRESH", "false") + settings_class, _ = get_settings_class() + settings = settings_class() + assert settings.database_refresh is False From 203f4715ebfc2124293e35f0f4a9048689b37526 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Wed, 7 May 2025 19:32:37 +0800 Subject: [PATCH 05/13] move bulk transactions tests --- .../tests/{clients => extensions}/test_bulk_transactions.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename 
stac_fastapi/tests/{clients => extensions}/test_bulk_transactions.py (100%) diff --git a/stac_fastapi/tests/clients/test_bulk_transactions.py b/stac_fastapi/tests/extensions/test_bulk_transactions.py similarity index 100% rename from stac_fastapi/tests/clients/test_bulk_transactions.py rename to stac_fastapi/tests/extensions/test_bulk_transactions.py From e7198f507d4ba0654e6a01cff2a9e0f26d1accb0 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Wed, 7 May 2025 19:37:36 +0800 Subject: [PATCH 06/13] add init --- stac_fastapi/tests/extensions/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 stac_fastapi/tests/extensions/__init__.py diff --git a/stac_fastapi/tests/extensions/__init__.py b/stac_fastapi/tests/extensions/__init__.py new file mode 100644 index 00000000..e69de29b From 5bb835625e6063b4d7458aca087413a1008e32d3 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Thu, 8 May 2025 19:24:14 +0800 Subject: [PATCH 07/13] add wait_for, reorg es db logic --- stac_fastapi/core/stac_fastapi/core/core.py | 77 +------ .../core/stac_fastapi/core/utilities.py | 24 +++ .../stac_fastapi/elasticsearch/config.py | 40 +++- .../elasticsearch/database_logic.py | 188 +++++++++++++++--- .../stac_fastapi/opensearch/config.py | 40 +++- .../test_config_settings.py | 8 + 6 files changed, 278 insertions(+), 99 deletions(-) rename stac_fastapi/tests/{elasticsearch => config}/test_config_settings.py (86%) diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index 833fc6dd..987acdf6 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -673,27 +673,6 @@ class TransactionsClient(AsyncBaseTransactionsClient): settings: ApiBaseSettings = attr.ib() session: Session = attr.ib(default=attr.Factory(Session.create_from_env)) - def _resolve_refresh(self, **kwargs) -> bool: - """ - Resolve the `refresh` parameter from kwargs or the environment variable. 
- - Args: - **kwargs: Additional keyword arguments, including `refresh`. - - Returns: - bool: The resolved value of the `refresh` parameter. - """ - refresh = kwargs.get( - "refresh", self.database.async_settings.database_refresh == "true" - ) - if "refresh" in kwargs: - logger.info(f"`refresh` parameter explicitly passed in kwargs: {refresh}") - else: - logger.info( - f"`refresh` parameter derived from environment variable: {refresh}" - ) - return refresh - @overrides async def create_item( self, collection_id: str, item: Union[Item, ItemCollection], **kwargs @@ -717,9 +696,6 @@ async def create_item( request = kwargs.get("request") base_url = str(request.base_url) - # Resolve the `refresh` parameter - refresh = self._resolve_refresh(**kwargs) - # Convert Pydantic model to dict for uniform processing item_dict = item.model_dump(mode="json") @@ -738,9 +714,9 @@ async def create_item( attempted = len(processed_items) success, errors = await self.database.bulk_async( - collection_id, - processed_items, - refresh=refresh, + collection_id=collection_id, + processed_items=processed_items, + **kwargs, ) if errors: logger.error( @@ -754,10 +730,7 @@ async def create_item( # Handle single item await self.database.create_item( - item_dict, - refresh=refresh, - base_url=base_url, - exist_ok=False, + item_dict, base_url=base_url, exist_ok=False, **kwargs ) return ItemSerializer.db_to_stac(item_dict, base_url) @@ -783,14 +756,11 @@ async def update_item( item = item.model_dump(mode="json") base_url = str(kwargs["request"].base_url) - # Resolve the `refresh` parameter - refresh = self._resolve_refresh(**kwargs) - now = datetime_type.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") item["properties"]["updated"] = now await self.database.create_item( - item, refresh=refresh, base_url=base_url, exist_ok=True + item, base_url=base_url, exist_ok=True, **kwargs ) return ItemSerializer.db_to_stac(item, base_url) @@ -806,11 +776,8 @@ async def delete_item(self, item_id: str, 
collection_id: str, **kwargs) -> None: Returns: None: Returns 204 No Content on successful deletion """ - # Resolve the `refresh` parameter - refresh = self._resolve_refresh(**kwargs) - await self.database.delete_item( - item_id=item_id, collection_id=collection_id, refresh=refresh + item_id=item_id, collection_id=collection_id, **kwargs ) return None @@ -833,11 +800,8 @@ async def create_collection( collection = collection.model_dump(mode="json") request = kwargs["request"] - # Resolve the `refresh` parameter - refresh = self._resolve_refresh(**kwargs) - collection = self.database.collection_serializer.stac_to_db(collection, request) - await self.database.create_collection(collection=collection, refresh=refresh) + await self.database.create_collection(collection=collection, **kwargs) return CollectionSerializer.db_to_stac( collection, request, @@ -871,12 +835,9 @@ async def update_collection( request = kwargs["request"] - # Resolve the `refresh` parameter - refresh = self._resolve_refresh(**kwargs) - collection = self.database.collection_serializer.stac_to_db(collection, request) await self.database.update_collection( - collection_id=collection_id, collection=collection, refresh=refresh + collection_id=collection_id, collection=collection, **kwargs ) return CollectionSerializer.db_to_stac( @@ -901,12 +862,7 @@ async def delete_collection(self, collection_id: str, **kwargs) -> None: Raises: NotFoundError: If the collection doesn't exist """ - # Resolve the `refresh` parameter - refresh = self._resolve_refresh(**kwargs) - - await self.database.delete_collection( - collection_id=collection_id, refresh=refresh - ) + await self.database.delete_collection(collection_id=collection_id, **kwargs) return None @@ -965,19 +921,6 @@ def bulk_item_insert( else: base_url = "" - # Use `refresh` from kwargs if provided, otherwise fall back to the environment variable - refresh = kwargs.get( - "refresh", self.database.sync_settings.database_refresh == "true" - ) - - # Log the value 
of `refresh` and its source - if "refresh" in kwargs: - logger.info(f"`refresh` parameter explicitly passed in kwargs: {refresh}") - else: - logger.info( - f"`refresh` parameter derived from environment variable: {refresh}" - ) - processed_items = [] for item in items.items.values(): try: @@ -996,7 +939,7 @@ def bulk_item_insert( success, errors = self.database.bulk_sync( collection_id, processed_items, - refresh=refresh, + **kwargs, ) if errors: logger.error(f"Bulk sync operation encountered errors: {errors}") diff --git a/stac_fastapi/core/stac_fastapi/core/utilities.py b/stac_fastapi/core/stac_fastapi/core/utilities.py index e7aafe67..16f9abd8 100644 --- a/stac_fastapi/core/stac_fastapi/core/utilities.py +++ b/stac_fastapi/core/stac_fastapi/core/utilities.py @@ -39,6 +39,30 @@ def get_bool_env(name: str, default: bool = False) -> bool: return default +def resolve_refresh(refresh: str) -> str: + """ + Resolve the `refresh` parameter from kwargs or the environment variable. + + Args: + refresh (str): The `refresh` parameter value. + + Returns: + str: The resolved value of the `refresh` parameter, which can be "true", "false", or "wait_for". + """ + logger = logging.getLogger(__name__) + + # Normalize and validate the `refresh` value + refresh = refresh.lower() + if refresh not in {"true", "false", "wait_for"}: + raise ValueError( + "Invalid value for `refresh`. Must be 'true', 'false', or 'wait_for'." + ) + + # Log the resolved value + logger.info(f"`refresh` parameter resolved to: {refresh}") + return refresh + + def bbox2polygon(b0: float, b1: float, b2: float, b3: float) -> List[List[List[float]]]: """Transform a bounding box represented by its four coordinates `b0`, `b1`, `b2`, and `b3` into a polygon. 
diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py index a87a7a66..66deb2e7 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py @@ -3,7 +3,7 @@ import logging import os import ssl -from typing import Any, Dict, Set +from typing import Any, Dict, Set, Union import certifi from elasticsearch._async.client import AsyncElasticsearch @@ -87,7 +87,24 @@ class ElasticsearchSettings(ApiSettings, ApiBaseSettings): enable_response_models: bool = False enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False) raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False) - database_refresh: bool = get_bool_env("DATABASE_REFRESH", default=False) + + @property + def database_refresh(self) -> Union[bool, str]: + """ + Get the value of the DATABASE_REFRESH environment variable. + + Returns: + Union[bool, str]: The value of DATABASE_REFRESH, which can be True, False, or "wait_for". + """ + value = os.getenv("DATABASE_REFRESH", "false").lower() + if value in {"true", "false"}: + return value == "true" + elif value == "wait_for": + return "wait_for" + else: + raise ValueError( + "Invalid value for DATABASE_REFRESH. Must be 'true', 'false', or 'wait_for'." + ) @property def create_client(self): @@ -109,7 +126,24 @@ class AsyncElasticsearchSettings(ApiSettings, ApiBaseSettings): enable_response_models: bool = False enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False) raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False) - database_refresh: bool = get_bool_env("DATABASE_REFRESH", default=False) + + @property + def database_refresh(self) -> Union[bool, str]: + """ + Get the value of the DATABASE_REFRESH environment variable. 
+ + Returns: + Union[bool, str]: The value of DATABASE_REFRESH, which can be True, False, or "wait_for". + """ + value = os.getenv("DATABASE_REFRESH", "false").lower() + if value in {"true", "false"}: + return value == "true" + elif value == "wait_for": + return "wait_for" + else: + raise ValueError( + "Invalid value for DATABASE_REFRESH. Must be 'true', 'false', or 'wait_for'." + ) @property def create_client(self): diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 9a773230..522aad67 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -31,7 +31,7 @@ ) from stac_fastapi.core.extensions import filter from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer -from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon +from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon, resolve_refresh from stac_fastapi.elasticsearch.config import AsyncElasticsearchSettings from stac_fastapi.elasticsearch.config import ( ElasticsearchSettings as SyncElasticsearchSettings, @@ -845,14 +845,16 @@ def bulk_sync_prep_create_item( async def create_item( self, item: Item, - refresh: bool = False, base_url: str = "", exist_ok: bool = False, + **kwargs: Any, ): """Database logic for creating one item. Args: item (Item): The item to be created. + base_url (str, optional): The base URL for the item. Defaults to an empty string. + exist_ok (bool, optional): Whether to allow the item to exist already. Defaults to False. refresh (bool, optional): Refresh the index after performing the operation. Defaults to False. 
Raises: @@ -861,12 +863,28 @@ async def create_item( Returns: None """ - # todo: check if collection exists, but cache + # Extract item and collection IDs item_id = item["id"] collection_id = item["collection"] + + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = resolve_refresh(str(refresh).lower()) + + # Log the creation attempt + logger.info( + f"Creating item {item_id} in collection {collection_id} with refresh={refresh}" + ) + + # Prepare the item for insertion item = await self.async_prep_create_item( item=item, base_url=base_url, exist_ok=exist_ok ) + + # Index the item in the database await self.client.index( index=index_alias_by_collection_id(collection_id), id=mk_item_id(item_id, collection_id), @@ -874,9 +892,7 @@ async def create_item( refresh=refresh, ) - async def delete_item( - self, item_id: str, collection_id: str, refresh: bool = False - ): + async def delete_item(self, item_id: str, collection_id: str, **kwargs: Any): """Delete a single item from the database. Args: @@ -887,13 +903,27 @@ async def delete_item( Raises: NotFoundError: If the Item does not exist in the database. 
""" + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = resolve_refresh(str(refresh).lower()) + + # Log the deletion attempt + logger.info( + f"Deleting item {item_id} from collection {collection_id} with refresh={refresh}" + ) + try: + # Perform the delete operation await self.client.delete( index=index_alias_by_collection_id(collection_id), id=mk_item_id(item_id, collection_id), refresh=refresh, ) except ESNotFoundError: + # Raise a custom NotFoundError if the item does not exist raise NotFoundError( f"Item {item_id} in collection {collection_id} not found" ) @@ -916,12 +946,12 @@ async def get_items_mapping(self, collection_id: str) -> Dict[str, Any]: except ESNotFoundError: raise NotFoundError(f"Mapping for index {index_name} not found") - async def create_collection(self, collection: Collection, refresh: bool = False): + async def create_collection(self, collection: Collection, **kwargs: Any): """Create a single collection in the database. Args: collection (Collection): The Collection object to be created. - refresh (bool, optional): Whether to refresh the index after the creation. Default is False. + refresh (str, optional): Whether to refresh the index after the creation. Can be "true", "false", or "wait_for". Raises: ConflictError: If a Collection with the same id already exists in the database. 
@@ -931,9 +961,21 @@ async def create_collection(self, collection: Collection, refresh: bool = False) """ collection_id = collection["id"] + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = resolve_refresh(str(refresh).lower()) + + # Log the creation attempt + logger.info(f"Creating collection {collection_id} with refresh={refresh}") + + # Check if the collection already exists if await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id): raise ConflictError(f"Collection {collection_id} already exists") + # Index the collection in the database await self.client.index( index=COLLECTIONS_INDEX, id=collection_id, @@ -941,6 +983,7 @@ async def create_collection(self, collection: Collection, refresh: bool = False) refresh=refresh, ) + # Create the item index for the collection await create_item_index(collection_id) async def find_collection(self, collection_id: str) -> Collection: @@ -970,29 +1013,48 @@ async def find_collection(self, collection_id: str) -> Collection: return collection["_source"] async def update_collection( - self, collection_id: str, collection: Collection, refresh: bool = False + self, collection_id: str, collection: Collection, **kwargs: Any ): - """Update a collection from the database. + """Update a collection in the database. Args: - self: The instance of the object calling this function. collection_id (str): The ID of the collection to be updated. collection (Collection): The Collection object to be used for the update. + kwargs (Any, optional): Additional keyword arguments, including `refresh`. Raises: - NotFoundError: If the collection with the given `collection_id` is not - found in the database. + NotFoundError: If the collection with the given `collection_id` is not found in the database. + ConflictError: If a conflict occurs during the update. 
Notes: This function updates the collection in the database using the specified - `collection_id` and with the collection specified in the `Collection` object. - If the collection is not found, a `NotFoundError` is raised. + `collection_id` and the provided `Collection` object. If the collection ID + changes, the function creates a new collection, reindexes the items, and deletes + the old collection. """ + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = resolve_refresh(str(refresh).lower()) + + # Log the update attempt + logger.info(f"Updating collection {collection_id} with refresh={refresh}") + + # Ensure the collection exists await self.find_collection(collection_id=collection_id) + # Handle collection ID change if collection_id != collection["id"]: + logger.info( + f"Collection ID change detected: {collection_id} -> {collection['id']}" + ) + + # Create the new collection await self.create_collection(collection, refresh=refresh) + # Reindex items from the old collection to the new collection await self.client.reindex( body={ "dest": {"index": f"{ITEMS_INDEX_PREFIX}{collection['id']}"}, @@ -1006,9 +1068,11 @@ async def update_collection( refresh=refresh, ) + # Delete the old collection await self.delete_collection(collection_id) else: + # Update the existing collection await self.client.index( index=COLLECTIONS_INDEX, id=collection_id, @@ -1016,33 +1080,52 @@ async def update_collection( refresh=refresh, ) - async def delete_collection(self, collection_id: str, refresh: bool = False): + async def delete_collection(self, collection_id: str, **kwargs: Any): """Delete a collection from the database. Parameters: - self: The instance of the object calling this function. collection_id (str): The ID of the collection to be deleted. - refresh (bool): Whether to refresh the index after the deletion (default: False). 
+ kwargs (Any, optional): Additional keyword arguments, including `refresh`. Raises: NotFoundError: If the collection with the given `collection_id` is not found in the database. Notes: This function first verifies that the collection with the specified `collection_id` exists in the database, and then - deletes the collection. If `refresh` is set to True, the index is refreshed after the deletion. Additionally, this - function also calls `delete_item_index` to delete the index for the items in the collection. + deletes the collection. If `refresh` is set to "true", "false", or "wait_for", the index is refreshed accordingly after + the deletion. Additionally, this function also calls `delete_item_index` to delete the index for the items in the collection. """ + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + + # Verify that the collection exists await self.find_collection(collection_id=collection_id) + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = resolve_refresh(str(refresh).lower()) + + # Log the deletion attempt + logger.info(f"Deleting collection {collection_id} with refresh={refresh}") + + # Delete the collection from the database await self.client.delete( index=COLLECTIONS_INDEX, id=collection_id, refresh=refresh ) - await delete_item_index(collection_id) + + # Delete the item index for the collection + try: + await delete_item_index(collection_id) + except Exception as e: + logger.error( + f"Failed to delete item index for collection {collection_id}: {e}" + ) async def bulk_async( self, collection_id: str, processed_items: List[Item], - refresh: bool = False, + **kwargs: Any, ) -> Tuple[int, List[Dict[str, Any]]]: """ Perform a bulk insert of items into the database asynchronously. @@ -1063,6 +1146,24 @@ async def bulk_async( The `mk_actions` function is called to generate a list of actions for the bulk insert. 
If `refresh` is set to True, the index is refreshed after the bulk insert. """ + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = resolve_refresh(str(refresh).lower()) + + # Log the bulk insert attempt + logger.info( + f"Performing bulk insert for collection {collection_id} with refresh={refresh}" + ) + + # Handle empty processed_items + if not processed_items: + logger.warning(f"No items to insert for collection {collection_id}") + return 0, [] + + # Perform the bulk insert raise_on_error = self.async_settings.raise_on_bulk_error success, errors = await helpers.async_bulk( self.client, @@ -1070,13 +1171,19 @@ async def bulk_async( refresh=refresh, raise_on_error=raise_on_error, ) + + # Log the result + logger.info( + f"Bulk insert completed for collection {collection_id}: {success} successes, {len(errors)} errors" + ) + return success, errors def bulk_sync( self, collection_id: str, processed_items: List[Item], - refresh: bool = False, + **kwargs: Any, ) -> Tuple[int, List[Dict[str, Any]]]: """ Perform a bulk insert of items into the database synchronously. @@ -1084,7 +1191,9 @@ def bulk_sync( Args: collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. - refresh (bool): Whether to refresh the index after the bulk insert (default: False). + **kwargs (Any): Additional keyword arguments, including: + - refresh (str, optional): Whether to refresh the index after the bulk insert. + Can be "true", "false", or "wait_for". Defaults to the value of `self.sync_settings.database_refresh`. Returns: Tuple[int, List[Dict[str, Any]]]: A tuple containing: @@ -1094,9 +1203,30 @@ def bulk_sync( Notes: This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. 
The insert is performed synchronously and blocking, meaning that the function does not return until the insert has - completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to - True, the index is refreshed after the bulk insert. + completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. The `refresh` + parameter determines whether the index is refreshed after the bulk insert: + - "true": Forces an immediate refresh of the index. + - "false": Does not refresh the index immediately (default behavior). + - "wait_for": Waits for the next refresh cycle to make the changes visible. """ + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.sync_settings.database_refresh) + refresh = resolve_refresh(str(refresh).lower()) + + # Log the bulk insert attempt + logger.info( + f"Performing bulk insert for collection {collection_id} with refresh={refresh}" + ) + + # Handle empty processed_items + if not processed_items: + logger.warning(f"No items to insert for collection {collection_id}") + return 0, [] + + # Perform the bulk insert raise_on_error = self.sync_settings.raise_on_bulk_error success, errors = helpers.bulk( self.sync_client, @@ -1104,6 +1234,12 @@ def bulk_sync( refresh=refresh, raise_on_error=raise_on_error, ) + + # Log the result + logger.info( + f"Bulk insert completed for collection {collection_id}: {success} successes, {len(errors)} errors" + ) + return success, errors # DANGER diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py index a8902af3..217e686c 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py @@ -2,7 +2,7 @@ import logging import os import ssl -from typing import Any, Dict, Set +from typing import Any, Dict, 
Set, Union import certifi from opensearchpy import AsyncOpenSearch, OpenSearch @@ -84,7 +84,24 @@ class OpensearchSettings(ApiSettings, ApiBaseSettings): enable_response_models: bool = False enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False) raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False) - database_refresh: bool = get_bool_env("DATABASE_REFRESH", default=False) + + @property + def database_refresh(self) -> Union[bool, str]: + """ + Get the value of the DATABASE_REFRESH environment variable. + + Returns: + Union[bool, str]: The value of DATABASE_REFRESH, which can be True, False, or "wait_for". + """ + value = os.getenv("DATABASE_REFRESH", "false").lower() + if value in {"true", "false"}: + return value == "true" + elif value == "wait_for": + return "wait_for" + else: + raise ValueError( + "Invalid value for DATABASE_REFRESH. Must be 'true', 'false', or 'wait_for'." + ) @property def create_client(self): @@ -106,7 +123,24 @@ class AsyncOpensearchSettings(ApiSettings, ApiBaseSettings): enable_response_models: bool = False enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False) raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False) - database_refresh: bool = get_bool_env("DATABASE_REFRESH", default=False) + + @property + def database_refresh(self) -> Union[bool, str]: + """ + Get the value of the DATABASE_REFRESH environment variable. + + Returns: + Union[bool, str]: The value of DATABASE_REFRESH, which can be True, False, or "wait_for". + """ + value = os.getenv("DATABASE_REFRESH", "false").lower() + if value in {"true", "false"}: + return value == "true" + elif value == "wait_for": + return "wait_for" + else: + raise ValueError( + "Invalid value for DATABASE_REFRESH. Must be 'true', 'false', or 'wait_for'." 
+ ) @property def create_client(self): diff --git a/stac_fastapi/tests/elasticsearch/test_config_settings.py b/stac_fastapi/tests/config/test_config_settings.py similarity index 86% rename from stac_fastapi/tests/elasticsearch/test_config_settings.py rename to stac_fastapi/tests/config/test_config_settings.py index 963ec8aa..0a8399be 100644 --- a/stac_fastapi/tests/elasticsearch/test_config_settings.py +++ b/stac_fastapi/tests/config/test_config_settings.py @@ -53,3 +53,11 @@ def test_database_refresh_false(monkeypatch): settings_class, _ = get_settings_class() settings = settings_class() assert settings.database_refresh is False + + +def test_database_refresh_wait_for(monkeypatch): + """Test that DATABASE_REFRESH env var sets database refresh to 'wait_for'.""" + monkeypatch.setenv("DATABASE_REFRESH", "wait_for") + settings_class, _ = get_settings_class() + settings = settings_class() + assert settings.database_refresh == "wait_for" From 204eac984f8449613923dea79e60bd48a7e83f75 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Thu, 8 May 2025 19:48:17 +0800 Subject: [PATCH 08/13] update os db logic --- .../stac_fastapi/opensearch/database_logic.py | 158 +++++++++++++++--- 1 file changed, 133 insertions(+), 25 deletions(-) diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index 66c8d3e6..00967c63 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -31,7 +31,7 @@ ) from stac_fastapi.core.extensions import filter from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer -from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon +from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon, resolve_refresh from stac_fastapi.opensearch.config import ( AsyncOpensearchSettings as AsyncSearchSettings, ) @@ -864,15 +864,17 @@ def 
bulk_sync_prep_create_item( async def create_item( self, item: Item, - refresh: bool = False, base_url: str = "", exist_ok: bool = False, + **kwargs: Any, ): """Database logic for creating one item. Args: item (Item): The item to be created. - refresh (bool, optional): Refresh the index after performing the operation. Defaults to False. + base_url (str, optional): The base URL for the item. Defaults to an empty string. + exist_ok (bool, optional): Whether to allow the item to exist already. Defaults to False. + **kwargs: Additional keyword arguments like refresh. Raises: ConflictError: If the item already exists in the database. @@ -883,6 +885,19 @@ async def create_item( # todo: check if collection exists, but cache item_id = item["id"] collection_id = item["collection"] + + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = resolve_refresh(str(refresh).lower()) + + # Log the creation attempt + logger.info( + f"Creating item {item_id} in collection {collection_id} with refresh={refresh}" + ) + item = await self.async_prep_create_item( item=item, base_url=base_url, exist_ok=exist_ok ) @@ -893,19 +908,29 @@ async def create_item( refresh=refresh, ) - async def delete_item( - self, item_id: str, collection_id: str, refresh: bool = False - ): + async def delete_item(self, item_id: str, collection_id: str, **kwargs: Any): """Delete a single item from the database. Args: item_id (str): The id of the Item to be deleted. collection_id (str): The id of the Collection that the Item belongs to. - refresh (bool, optional): Whether to refresh the index after the deletion. Default is False. + **kwargs: Additional keyword arguments like refresh. Raises: NotFoundError: If the Item does not exist in the database. 
""" + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = resolve_refresh(str(refresh).lower()) + + # Log the deletion attempt + logger.info( + f"Deleting item {item_id} from collection {collection_id} with refresh={refresh}" + ) + try: await self.client.delete( index=index_alias_by_collection_id(collection_id), @@ -935,12 +960,12 @@ async def get_items_mapping(self, collection_id: str) -> Dict[str, Any]: except exceptions.NotFoundError: raise NotFoundError(f"Mapping for index {index_name} not found") - async def create_collection(self, collection: Collection, refresh: bool = False): + async def create_collection(self, collection: Collection, **kwargs: Any): """Create a single collection in the database. Args: collection (Collection): The Collection object to be created. - refresh (bool, optional): Whether to refresh the index after the creation. Default is False. + **kwargs: Additional keyword arguments like refresh. Raises: ConflictError: If a Collection with the same id already exists in the database. 
@@ -950,6 +975,16 @@ async def create_collection(self, collection: Collection, refresh: bool = False) """ collection_id = collection["id"] + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = resolve_refresh(str(refresh).lower()) + + # Log the creation attempt + logger.info(f"Creating collection {collection_id} with refresh={refresh}") + if await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id): raise ConflictError(f"Collection {collection_id} already exists") @@ -989,14 +1024,14 @@ async def find_collection(self, collection_id: str) -> Collection: return collection["_source"] async def update_collection( - self, collection_id: str, collection: Collection, refresh: bool = False + self, collection_id: str, collection: Collection, **kwargs: Any ): """Update a collection from the database. Args: - self: The instance of the object calling this function. collection_id (str): The ID of the collection to be updated. collection (Collection): The Collection object to be used for the update. + **kwargs: Additional keyword arguments like refresh. Raises: NotFoundError: If the collection with the given `collection_id` is not @@ -1007,9 +1042,23 @@ async def update_collection( `collection_id` and with the collection specified in the `Collection` object. If the collection is not found, a `NotFoundError` is raised. 
""" + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = resolve_refresh(str(refresh).lower()) + + # Log the update attempt + logger.info(f"Updating collection {collection_id} with refresh={refresh}") + await self.find_collection(collection_id=collection_id) if collection_id != collection["id"]: + logger.info( + f"Collection ID change detected: {collection_id} -> {collection['id']}" + ) + await self.create_collection(collection, refresh=refresh) await self.client.reindex( @@ -1025,7 +1074,7 @@ async def update_collection( refresh=refresh, ) - await self.delete_collection(collection_id) + await self.delete_collection(collection_id=collection_id, **kwargs) else: await self.client.index( @@ -1035,23 +1084,34 @@ async def update_collection( refresh=refresh, ) - async def delete_collection(self, collection_id: str, refresh: bool = False): + async def delete_collection(self, collection_id: str, **kwargs: Any): """Delete a collection from the database. Parameters: self: The instance of the object calling this function. collection_id (str): The ID of the collection to be deleted. - refresh (bool): Whether to refresh the index after the deletion (default: False). + **kwargs: Additional keyword arguments like refresh. Raises: NotFoundError: If the collection with the given `collection_id` is not found in the database. Notes: This function first verifies that the collection with the specified `collection_id` exists in the database, and then - deletes the collection. If `refresh` is set to True, the index is refreshed after the deletion. Additionally, this - function also calls `delete_item_index` to delete the index for the items in the collection. + deletes the collection. If `refresh` is set to "true", "false", or "wait_for", the index is refreshed accordingly after + the deletion. 
Additionally, this function also calls `delete_item_index` to delete the index for the items in the collection. """ + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + await self.find_collection(collection_id=collection_id) + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = resolve_refresh(str(refresh).lower()) + + # Log the deletion attempt + logger.info(f"Deleting collection {collection_id} with refresh={refresh}") + await self.client.delete( index=COLLECTIONS_INDEX, id=collection_id, refresh=refresh ) @@ -1061,7 +1121,7 @@ async def bulk_async( self, collection_id: str, processed_items: List[Item], - refresh: bool = False, + **kwargs: Any, ) -> Tuple[int, List[Dict[str, Any]]]: """ Perform a bulk insert of items into the database asynchronously. @@ -1069,7 +1129,9 @@ async def bulk_async( Args: collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. - refresh (bool): Whether to refresh the index after the bulk insert (default: False). + **kwargs (Any): Additional keyword arguments, including: + - refresh (str, optional): Whether to refresh the index after the bulk insert. + Can be "true", "false", or "wait_for". Defaults to the value of `self.sync_settings.database_refresh`. Returns: Tuple[int, List[Dict[str, Any]]]: A tuple containing: @@ -1078,10 +1140,30 @@ async def bulk_async( Notes: This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. - The insert is performed asynchronously, and the event loop is used to run the operation in a separate executor. - The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, - the index is refreshed after the bulk insert. 
+ The insert is performed synchronously and blocking, meaning that the function does not return until the insert has + completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. The `refresh` + parameter determines whether the index is refreshed after the bulk insert: + - "true": Forces an immediate refresh of the index. + - "false": Does not refresh the index immediately (default behavior). + - "wait_for": Waits for the next refresh cycle to make the changes visible. """ + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = resolve_refresh(str(refresh).lower()) + + # Log the bulk insert attempt + logger.info( + f"Performing bulk insert for collection {collection_id} with refresh={refresh}" + ) + + # Handle empty processed_items + if not processed_items: + logger.warning(f"No items to insert for collection {collection_id}") + return 0, [] + raise_on_error = self.async_settings.raise_on_bulk_error success, errors = await helpers.async_bulk( self.client, @@ -1089,13 +1171,17 @@ async def bulk_async( refresh=refresh, raise_on_error=raise_on_error, ) + # Log the result + logger.info( + f"Bulk insert completed for collection {collection_id}: {success} successes, {len(errors)} errors" + ) return success, errors def bulk_sync( self, collection_id: str, processed_items: List[Item], - refresh: bool = False, + **kwargs: Any, ) -> Tuple[int, List[Dict[str, Any]]]: """ Perform a bulk insert of items into the database synchronously. @@ -1103,7 +1189,9 @@ def bulk_sync( Args: collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. - refresh (bool): Whether to refresh the index after the bulk insert (default: False). 
+ **kwargs (Any): Additional keyword arguments, including: + - refresh (str, optional): Whether to refresh the index after the bulk insert. + Can be "true", "false", or "wait_for". Defaults to the value of `self.sync_settings.database_refresh`. Returns: Tuple[int, List[Dict[str, Any]]]: A tuple containing: @@ -1113,9 +1201,29 @@ def bulk_sync( Notes: This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The insert is performed synchronously and blocking, meaning that the function does not return until the insert has - completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to - True, the index is refreshed after the bulk insert. + completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. The `refresh` + parameter determines whether the index is refreshed after the bulk insert: + - "true": Forces an immediate refresh of the index. + - "false": Does not refresh the index immediately (default behavior). + - "wait_for": Waits for the next refresh cycle to make the changes visible. 
""" + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.sync_settings.database_refresh) + refresh = resolve_refresh(str(refresh).lower()) + + # Log the bulk insert attempt + logger.info( + f"Performing bulk insert for collection {collection_id} with refresh={refresh}" + ) + + # Handle empty processed_items + if not processed_items: + logger.warning(f"No items to insert for collection {collection_id}") + return 0, [] + raise_on_error = self.sync_settings.raise_on_bulk_error success, errors = helpers.bulk( self.sync_client, From bf6470d6523c0c6560e9d2fdd8c3284e2c70f711 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Thu, 8 May 2025 20:01:29 +0800 Subject: [PATCH 09/13] update changelog, readme --- CHANGELOG.md | 2 +- README.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e3b3545f..b6693f73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Added logging to bulk insertion methods to provide detailed feedback on errors encountered during operations. [#364](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/364) - Introduced the `RAISE_ON_BULK_ERROR` environment variable to control whether bulk insertion methods raise exceptions on errors (`true`) or log warnings and continue processing (`false`). [#364](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/364) - Added code coverage reporting to the test suite using pytest-cov. [#87](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/issues/87) -- Introduced the `DATABASE_REFRESH` environment variable to control whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. 
If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370) +- Introduced the `DATABASE_REFRESH` environment variable to control whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. If set to `wait_for`, changes will wait for the next refresh cycle to become visible. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370) ### Changed diff --git a/README.md b/README.md index 7288e290..2bcbbbd3 100644 --- a/README.md +++ b/README.md @@ -115,7 +115,7 @@ You can customize additional settings in your `.env` file: | `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional | `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional | `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` Optional | -| `DATABASE_REFRESH` | Controls whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. | `false` | Optional | +| `DATABASE_REFRESH` | Controls whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. 
If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. If set to `wait_for`, changes will wait for the next refresh cycle to become visible. | `false` | Optional | > [!NOTE] > The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, and `ES_VERIFY_CERTS` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch. From 0d6829842204595d075378b6d00a82f2d821010e Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Thu, 8 May 2025 20:01:39 +0800 Subject: [PATCH 10/13] update docstrings --- .../core/stac_fastapi/core/utilities.py | 3 ++ .../elasticsearch/database_logic.py | 48 +++++++++++++++---- .../stac_fastapi/opensearch/database_logic.py | 8 +++- 3 files changed, 50 insertions(+), 9 deletions(-) diff --git a/stac_fastapi/core/stac_fastapi/core/utilities.py b/stac_fastapi/core/stac_fastapi/core/utilities.py index 16f9abd8..693e8b98 100644 --- a/stac_fastapi/core/stac_fastapi/core/utilities.py +++ b/stac_fastapi/core/stac_fastapi/core/utilities.py @@ -48,6 +48,9 @@ def resolve_refresh(refresh: str) -> str: Returns: str: The resolved value of the `refresh` parameter, which can be "true", "false", or "wait_for". + + Raises: + ValueError: If the `refresh` value is not one of "true", "false", or "wait_for". """ logger = logging.getLogger(__name__) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 522aad67..d35b28b8 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -855,7 +855,9 @@ async def create_item( item (Item): The item to be created. base_url (str, optional): The base URL for the item. Defaults to an empty string. exist_ok (bool, optional): Whether to allow the item to exist already. Defaults to False. 
- refresh (bool, optional): Refresh the index after performing the operation. Defaults to False. + **kwargs: Additional keyword arguments. + - refresh (str): Whether to refresh the index after the operation. Can be "true", "false", or "wait_for". + - refresh (bool): Whether to refresh the index after the operation. Defaults to the value in `self.async_settings.database_refresh`. Raises: ConflictError: If the item already exists in the database. @@ -898,10 +900,15 @@ async def delete_item(self, item_id: str, collection_id: str, **kwargs: Any): Args: item_id (str): The id of the Item to be deleted. collection_id (str): The id of the Collection that the Item belongs to. - refresh (bool, optional): Whether to refresh the index after the deletion. Default is False. + **kwargs: Additional keyword arguments. + - refresh (str): Whether to refresh the index after the operation. Can be "true", "false", or "wait_for". + - refresh (bool): Whether to refresh the index after the operation. Defaults to the value in `self.async_settings.database_refresh`. Raises: NotFoundError: If the Item does not exist in the database. + + Returns: + None """ # Ensure kwargs is a dictionary kwargs = kwargs or {} @@ -951,11 +958,16 @@ async def create_collection(self, collection: Collection, **kwargs: Any): Args: collection (Collection): The Collection object to be created. - refresh (str, optional): Whether to refresh the index after the creation. Can be "true", "false", or "wait_for". + **kwargs: Additional keyword arguments. + - refresh (str): Whether to refresh the index after the operation. Can be "true", "false", or "wait_for". + - refresh (bool): Whether to refresh the index after the operation. Defaults to the value in `self.async_settings.database_refresh`. Raises: ConflictError: If a Collection with the same id already exists in the database. + Returns: + None + Notes: A new index is created for the items in the Collection using the `create_item_index` function. 
""" @@ -1020,7 +1032,11 @@ async def update_collection( Args: collection_id (str): The ID of the collection to be updated. collection (Collection): The Collection object to be used for the update. - kwargs (Any, optional): Additional keyword arguments, including `refresh`. + **kwargs: Additional keyword arguments. + - refresh (str): Whether to refresh the index after the operation. Can be "true", "false", or "wait_for". + - refresh (bool): Whether to refresh the index after the operation. Defaults to the value in `self.async_settings.database_refresh`. + Returns: + None Raises: NotFoundError: If the collection with the given `collection_id` is not found in the database. @@ -1086,10 +1102,15 @@ async def delete_collection(self, collection_id: str, **kwargs: Any): Parameters: collection_id (str): The ID of the collection to be deleted. kwargs (Any, optional): Additional keyword arguments, including `refresh`. + - refresh (str): Whether to refresh the index after the operation. Can be "true", "false", or "wait_for". + - refresh (bool): Whether to refresh the index after the operation. Defaults to the value in `self.async_settings.database_refresh`. Raises: NotFoundError: If the collection with the given `collection_id` is not found in the database. + Returns: + None + Notes: This function first verifies that the collection with the specified `collection_id` exists in the database, and then deletes the collection. If `refresh` is set to "true", "false", or "wait_for", the index is refreshed accordingly after @@ -1133,7 +1154,12 @@ async def bulk_async( Args: collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. - refresh (bool): Whether to refresh the index after the bulk insert (default: False). + **kwargs (Any): Additional keyword arguments, including: + - refresh (str, optional): Whether to refresh the index after the bulk insert. 
+ Can be "true", "false", or "wait_for". Defaults to the value of `self.sync_settings.database_refresh`. + - refresh (bool, optional): Whether to refresh the index after the bulk insert. + - raise_on_error (bool, optional): Whether to raise an error if any of the bulk operations fail. + Defaults to the value of `self.async_settings.raise_on_bulk_error`. Returns: Tuple[int, List[Dict[str, Any]]]: A tuple containing: @@ -1142,9 +1168,12 @@ async def bulk_async( Notes: This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. - The insert is performed asynchronously, and the event loop is used to run the operation in a separate executor. - The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, - the index is refreshed after the bulk insert. + The insert is performed synchronously and blocking, meaning that the function does not return until the insert has + completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. The `refresh` + parameter determines whether the index is refreshed after the bulk insert: + - "true": Forces an immediate refresh of the index. + - "false": Does not refresh the index immediately (default behavior). + - "wait_for": Waits for the next refresh cycle to make the changes visible. """ # Ensure kwargs is a dictionary kwargs = kwargs or {} @@ -1194,6 +1223,9 @@ def bulk_sync( **kwargs (Any): Additional keyword arguments, including: - refresh (str, optional): Whether to refresh the index after the bulk insert. Can be "true", "false", or "wait_for". Defaults to the value of `self.sync_settings.database_refresh`. + - refresh (bool, optional): Whether to refresh the index after the bulk insert. + - raise_on_error (bool, optional): Whether to raise an error if any of the bulk operations fail. + Defaults to the value of `self.async_settings.raise_on_bulk_error`. 
Returns: Tuple[int, List[Dict[str, Any]]]: A tuple containing: diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index 00967c63..067f5618 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -1132,6 +1132,9 @@ async def bulk_async( **kwargs (Any): Additional keyword arguments, including: - refresh (str, optional): Whether to refresh the index after the bulk insert. Can be "true", "false", or "wait_for". Defaults to the value of `self.sync_settings.database_refresh`. + - refresh (bool, optional): A boolean value is also accepted and normalized to "true" or "false". + - raise_on_error (bool, optional): Whether to raise an error if any of the bulk operations fail. + Defaults to the value of `self.async_settings.raise_on_bulk_error`. Returns: Tuple[int, List[Dict[str, Any]]]: A tuple containing: @@ -1184,7 +1187,7 @@ def bulk_sync( **kwargs: Any, ) -> Tuple[int, List[Dict[str, Any]]]: """ - Perform a bulk insert of items into the database synchronously. + Perform a bulk insert of items into the database synchronously, blocking until the operation completes. Args: collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. @@ -1192,6 +1195,9 @@ **kwargs (Any): Additional keyword arguments, including: - refresh (str, optional): Whether to refresh the index after the bulk insert. Can be "true", "false", or "wait_for". Defaults to the value of `self.sync_settings.database_refresh`. + - refresh (bool, optional): A boolean value is also accepted and normalized to "true" or "false". + - raise_on_error (bool, optional): Whether to raise an error if any of the bulk operations fail. + Defaults to the value of `self.async_settings.raise_on_bulk_error`.
Returns: Tuple[int, List[Dict[str, Any]]]: A tuple containing: From 7f0de2aa8fcf9026e26de03f13436df776d88a64 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Sat, 10 May 2025 11:26:59 +0800 Subject: [PATCH 11/13] move repeated logic --- .../core/stac_fastapi/core/utilities.py | 65 +++++++++++++++++-- .../stac_fastapi/elasticsearch/config.py | 24 ++----- .../stac_fastapi/opensearch/config.py | 24 ++----- .../tests/config/test_config_settings.py | 4 +- 4 files changed, 72 insertions(+), 45 deletions(-) diff --git a/stac_fastapi/core/stac_fastapi/core/utilities.py b/stac_fastapi/core/stac_fastapi/core/utilities.py index 693e8b98..243f9cb3 100644 --- a/stac_fastapi/core/stac_fastapi/core/utilities.py +++ b/stac_fastapi/core/stac_fastapi/core/utilities.py @@ -12,20 +12,75 @@ MAX_LIMIT = 10000 -def get_bool_env(name: str, default: bool = False) -> bool: +def validate_refresh(value: Union[str, bool]) -> str: + """ + Validate the `refresh` parameter value. + + Args: + value (Union[str, bool]): The `refresh` parameter value, which can be a string or a boolean. + + Returns: + str: The validated value of the `refresh` parameter, which can be "true", "false", or "wait_for". + """ + logger = logging.getLogger(__name__) + + # Handle boolean-like values using get_bool_env + if isinstance(value, bool) or value in { + "true", + "false", + "1", + "0", + "yes", + "no", + "y", + "n", + }: + is_true = get_bool_env("DATABASE_REFRESH", default=value) + return "true" if is_true else "false" + + # Normalize to lowercase for case-insensitivity + value = value.lower() + + # Handle "wait_for" explicitly + if value == "wait_for": + return "wait_for" + + # Log a warning for invalid values and default to "false" + logger.warning( + f"Invalid value for `refresh`: '{value}'. Expected 'true', 'false', or 'wait_for'. Defaulting to 'false'." + ) + return "false" + + +def get_bool_env(name: str, default: Union[bool, str] = False) -> bool: """ Retrieve a boolean value from an environment variable. 
Args: name (str): The name of the environment variable. - default (bool, optional): The default value to use if the variable is not set or unrecognized. Defaults to False. + default (Union[bool, str], optional): The default value to use if the variable is not set or unrecognized. Defaults to False. Returns: bool: The boolean value parsed from the environment variable. """ - value = os.getenv(name, str(default).lower()) true_values = ("true", "1", "yes", "y") false_values = ("false", "0", "no", "n") + + # Normalize the default value + if isinstance(default, bool): + default_str = "true" if default else "false" + elif isinstance(default, str): + default_str = default.lower() + else: + logger = logging.getLogger(__name__) + logger.warning( + f"The `default` parameter must be a boolean or string, got {type(default).__name__}. " + f"Falling back to `False`." + ) + default_str = "false" + + # Retrieve and normalize the environment variable value + value = os.getenv(name, default_str) if value.lower() in true_values: return True elif value.lower() in false_values: @@ -34,9 +89,9 @@ def get_bool_env(name: str, default: bool = False) -> bool: logger = logging.getLogger(__name__) logger.warning( f"Environment variable '{name}' has unrecognized value '{value}'. " - f"Expected one of {true_values + false_values}. Using default: {default}" + f"Expected one of {true_values + false_values}. 
Using default: {default_str}" ) - return default + return default_str in true_values def resolve_refresh(refresh: str) -> str: diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py index 66deb2e7..accbe8cc 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py @@ -10,7 +10,7 @@ from elasticsearch import Elasticsearch # type: ignore[attr-defined] from stac_fastapi.core.base_settings import ApiBaseSettings -from stac_fastapi.core.utilities import get_bool_env +from stac_fastapi.core.utilities import get_bool_env, validate_refresh from stac_fastapi.types.config import ApiSettings @@ -96,15 +96,8 @@ def database_refresh(self) -> Union[bool, str]: Returns: Union[bool, str]: The value of DATABASE_REFRESH, which can be True, False, or "wait_for". """ - value = os.getenv("DATABASE_REFRESH", "false").lower() - if value in {"true", "false"}: - return value == "true" - elif value == "wait_for": - return "wait_for" - else: - raise ValueError( - "Invalid value for DATABASE_REFRESH. Must be 'true', 'false', or 'wait_for'." - ) + value = os.getenv("DATABASE_REFRESH", "false") + return validate_refresh(value) @property def create_client(self): @@ -135,15 +128,8 @@ def database_refresh(self) -> Union[bool, str]: Returns: Union[bool, str]: The value of DATABASE_REFRESH, which can be True, False, or "wait_for". """ - value = os.getenv("DATABASE_REFRESH", "false").lower() - if value in {"true", "false"}: - return value == "true" - elif value == "wait_for": - return "wait_for" - else: - raise ValueError( - "Invalid value for DATABASE_REFRESH. Must be 'true', 'false', or 'wait_for'." 
- ) + value = os.getenv("DATABASE_REFRESH", "false") + return validate_refresh(value) @property def create_client(self): diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py index 217e686c..3a53ffdf 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py @@ -8,7 +8,7 @@ from opensearchpy import AsyncOpenSearch, OpenSearch from stac_fastapi.core.base_settings import ApiBaseSettings -from stac_fastapi.core.utilities import get_bool_env +from stac_fastapi.core.utilities import get_bool_env, validate_refresh from stac_fastapi.types.config import ApiSettings @@ -93,15 +93,8 @@ def database_refresh(self) -> Union[bool, str]: Returns: Union[bool, str]: The value of DATABASE_REFRESH, which can be True, False, or "wait_for". """ - value = os.getenv("DATABASE_REFRESH", "false").lower() - if value in {"true", "false"}: - return value == "true" - elif value == "wait_for": - return "wait_for" - else: - raise ValueError( - "Invalid value for DATABASE_REFRESH. Must be 'true', 'false', or 'wait_for'." - ) + value = os.getenv("DATABASE_REFRESH", "false") + return validate_refresh(value) @property def create_client(self): @@ -132,15 +125,8 @@ def database_refresh(self) -> Union[bool, str]: Returns: Union[bool, str]: The value of DATABASE_REFRESH, which can be True, False, or "wait_for". """ - value = os.getenv("DATABASE_REFRESH", "false").lower() - if value in {"true", "false"}: - return value == "true" - elif value == "wait_for": - return "wait_for" - else: - raise ValueError( - "Invalid value for DATABASE_REFRESH. Must be 'true', 'false', or 'wait_for'." 
- ) + value = os.getenv("DATABASE_REFRESH", "false") + return validate_refresh(value) @property def create_client(self): diff --git a/stac_fastapi/tests/config/test_config_settings.py b/stac_fastapi/tests/config/test_config_settings.py index 0a8399be..8509c259 100644 --- a/stac_fastapi/tests/config/test_config_settings.py +++ b/stac_fastapi/tests/config/test_config_settings.py @@ -44,7 +44,7 @@ def test_database_refresh_true(monkeypatch): monkeypatch.setenv("DATABASE_REFRESH", "true") settings_class, _ = get_settings_class() settings = settings_class() - assert settings.database_refresh is True + assert settings.database_refresh == "true" def test_database_refresh_false(monkeypatch): @@ -52,7 +52,7 @@ def test_database_refresh_false(monkeypatch): monkeypatch.setenv("DATABASE_REFRESH", "false") settings_class, _ = get_settings_class() settings = settings_class() - assert settings.database_refresh is False + assert settings.database_refresh == "false" def test_database_refresh_wait_for(monkeypatch): From fc31c31b299491d31f0c5ac3f54edfd0b88daf7e Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Sat, 10 May 2025 11:38:40 +0800 Subject: [PATCH 12/13] use unified verify refesh --- CHANGELOG.md | 2 +- .../elasticsearch/database_logic.py | 18 +++++++++--------- .../stac_fastapi/opensearch/database_logic.py | 18 +++++++++--------- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a8f20d45..ff7f5530 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Changed -- Refactored CRUD methods in `TransactionsClient` to use the `_resolve_refresh` helper method for consistent and reusable handling of the `refresh` parameter. 
[#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370) +- Refactored CRUD methods in `TransactionsClient` to use the `validate_refresh` helper method for consistent and reusable handling of the `refresh` parameter. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370) ### Fixed diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index d35b28b8..7afbb58d 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -31,7 +31,7 @@ ) from stac_fastapi.core.extensions import filter from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer -from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon, resolve_refresh +from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon, validate_refresh from stac_fastapi.elasticsearch.config import AsyncElasticsearchSettings from stac_fastapi.elasticsearch.config import ( ElasticsearchSettings as SyncElasticsearchSettings, @@ -874,7 +874,7 @@ async def create_item( # Resolve the `refresh` parameter refresh = kwargs.get("refresh", self.async_settings.database_refresh) - refresh = resolve_refresh(str(refresh).lower()) + refresh = validate_refresh(refresh) # Log the creation attempt logger.info( @@ -915,7 +915,7 @@ async def delete_item(self, item_id: str, collection_id: str, **kwargs: Any): # Resolve the `refresh` parameter refresh = kwargs.get("refresh", self.async_settings.database_refresh) - refresh = resolve_refresh(str(refresh).lower()) + refresh = validate_refresh(refresh) # Log the deletion attempt logger.info( @@ -978,7 +978,7 @@ async def create_collection(self, collection: Collection, **kwargs: Any): # Resolve the `refresh` parameter refresh = kwargs.get("refresh", self.async_settings.database_refresh) - refresh = 
resolve_refresh(str(refresh).lower()) + refresh = validate_refresh(refresh) # Log the creation attempt logger.info(f"Creating collection {collection_id} with refresh={refresh}") @@ -1053,7 +1053,7 @@ async def update_collection( # Resolve the `refresh` parameter refresh = kwargs.get("refresh", self.async_settings.database_refresh) - refresh = resolve_refresh(str(refresh).lower()) + refresh = validate_refresh(refresh) # Log the update attempt logger.info(f"Updating collection {collection_id} with refresh={refresh}") @@ -1124,7 +1124,7 @@ async def delete_collection(self, collection_id: str, **kwargs: Any): # Resolve the `refresh` parameter refresh = kwargs.get("refresh", self.async_settings.database_refresh) - refresh = resolve_refresh(str(refresh).lower()) + refresh = validate_refresh(refresh) # Log the deletion attempt logger.info(f"Deleting collection {collection_id} with refresh={refresh}") @@ -1180,7 +1180,7 @@ async def bulk_async( # Resolve the `refresh` parameter refresh = kwargs.get("refresh", self.async_settings.database_refresh) - refresh = resolve_refresh(str(refresh).lower()) + refresh = validate_refresh(refresh) # Log the bulk insert attempt logger.info( @@ -1245,8 +1245,8 @@ def bulk_sync( kwargs = kwargs or {} # Resolve the `refresh` parameter - refresh = kwargs.get("refresh", self.sync_settings.database_refresh) - refresh = resolve_refresh(str(refresh).lower()) + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = validate_refresh(refresh) # Log the bulk insert attempt logger.info( diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index 067f5618..5b9510f3 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -31,7 +31,7 @@ ) from stac_fastapi.core.extensions import filter from stac_fastapi.core.serializers import 
CollectionSerializer, ItemSerializer -from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon, resolve_refresh +from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon, validate_refresh from stac_fastapi.opensearch.config import ( AsyncOpensearchSettings as AsyncSearchSettings, ) @@ -891,7 +891,7 @@ async def create_item( # Resolve the `refresh` parameter refresh = kwargs.get("refresh", self.async_settings.database_refresh) - refresh = resolve_refresh(str(refresh).lower()) + refresh = validate_refresh(refresh) # Log the creation attempt logger.info( @@ -924,7 +924,7 @@ async def delete_item(self, item_id: str, collection_id: str, **kwargs: Any): # Resolve the `refresh` parameter refresh = kwargs.get("refresh", self.async_settings.database_refresh) - refresh = resolve_refresh(str(refresh).lower()) + refresh = validate_refresh(refresh) # Log the deletion attempt logger.info( @@ -980,7 +980,7 @@ async def create_collection(self, collection: Collection, **kwargs: Any): # Resolve the `refresh` parameter refresh = kwargs.get("refresh", self.async_settings.database_refresh) - refresh = resolve_refresh(str(refresh).lower()) + refresh = validate_refresh(refresh) # Log the creation attempt logger.info(f"Creating collection {collection_id} with refresh={refresh}") @@ -1047,7 +1047,7 @@ async def update_collection( # Resolve the `refresh` parameter refresh = kwargs.get("refresh", self.async_settings.database_refresh) - refresh = resolve_refresh(str(refresh).lower()) + refresh = validate_refresh(refresh) # Log the update attempt logger.info(f"Updating collection {collection_id} with refresh={refresh}") @@ -1107,7 +1107,7 @@ async def delete_collection(self, collection_id: str, **kwargs: Any): # Resolve the `refresh` parameter refresh = kwargs.get("refresh", self.async_settings.database_refresh) - refresh = resolve_refresh(str(refresh).lower()) + refresh = validate_refresh(refresh) # Log the deletion attempt logger.info(f"Deleting collection {collection_id} with 
refresh={refresh}") @@ -1155,7 +1155,7 @@ async def bulk_async( # Resolve the `refresh` parameter refresh = kwargs.get("refresh", self.async_settings.database_refresh) - refresh = resolve_refresh(str(refresh).lower()) + refresh = validate_refresh(refresh) # Log the bulk insert attempt logger.info( @@ -1217,8 +1217,8 @@ def bulk_sync( kwargs = kwargs or {} # Resolve the `refresh` parameter - refresh = kwargs.get("refresh", self.sync_settings.database_refresh) - refresh = resolve_refresh(str(refresh).lower()) + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = validate_refresh(refresh) # Log the bulk insert attempt logger.info( From 7baf5b470c4523eb2381784bc685f4a08f5b92ce Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Sat, 10 May 2025 11:42:40 +0800 Subject: [PATCH 13/13] remove unused fn --- .../core/stac_fastapi/core/utilities.py | 27 ------------------- 1 file changed, 27 deletions(-) diff --git a/stac_fastapi/core/stac_fastapi/core/utilities.py b/stac_fastapi/core/stac_fastapi/core/utilities.py index 243f9cb3..d4a35109 100644 --- a/stac_fastapi/core/stac_fastapi/core/utilities.py +++ b/stac_fastapi/core/stac_fastapi/core/utilities.py @@ -94,33 +94,6 @@ def get_bool_env(name: str, default: Union[bool, str] = False) -> bool: return default_str in true_values -def resolve_refresh(refresh: str) -> str: - """ - Resolve the `refresh` parameter from kwargs or the environment variable. - - Args: - refresh (str): The `refresh` parameter value. - - Returns: - str: The resolved value of the `refresh` parameter, which can be "true", "false", or "wait_for". - - Raises: - ValueError: If the `refresh` value is not one of "true", "false", or "wait_for". - """ - logger = logging.getLogger(__name__) - - # Normalize and validate the `refresh` value - refresh = refresh.lower() - if refresh not in {"true", "false", "wait_for"}: - raise ValueError( - "Invalid value for `refresh`. Must be 'true', 'false', or 'wait_for'." 
- ) - - # Log the resolved value - logger.info(f"`refresh` parameter resolved to: {refresh}") - return refresh - - def bbox2polygon(b0: float, b1: float, b2: float, b3: float) -> List[List[List[float]]]: """Transform a bounding box represented by its four coordinates `b0`, `b1`, `b2`, and `b3` into a polygon.