diff --git a/CHANGELOG.md b/CHANGELOG.md index 66dd8d67..94bec4de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Fixed - Exclude unset fields in search response [#166](https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/166) +- Upgrade stac-fastapi to v2.4.9 [#172](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/172) ## [v1.0.0] diff --git a/stac_fastapi/elasticsearch/setup.py b/stac_fastapi/elasticsearch/setup.py index c0f941f0..3106c512 100644 --- a/stac_fastapi/elasticsearch/setup.py +++ b/stac_fastapi/elasticsearch/setup.py @@ -10,9 +10,9 @@ "attrs", "pydantic[dotenv]<2", "stac_pydantic==2.0.*", - "stac-fastapi.types==2.4.8", - "stac-fastapi.api==2.4.8", - "stac-fastapi.extensions==2.4.8", + "stac-fastapi.types==2.4.9", + "stac-fastapi.api==2.4.9", + "stac-fastapi.extensions==2.4.9", "elasticsearch[async]==8.11.0", "elasticsearch-dsl==8.11.0", "pystac[validation]", diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py index 76bb04cd..4fb9f174 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py @@ -26,6 +26,7 @@ from stac_fastapi.elasticsearch.session import Session from stac_fastapi.extensions.third_party.bulk_transactions import ( BaseBulkTransactionsClient, + BulkTransactionMethod, Items, ) from stac_fastapi.types import stac as stac_types @@ -568,7 +569,7 @@ async def create_item( if item["type"] == "FeatureCollection": bulk_client = BulkTransactionsClient() processed_items = [ - bulk_client.preprocess_item(item, base_url) for item in item["features"] # type: ignore + bulk_client.preprocess_item(item, base_url, BulkTransactionMethod.INSERT) for item in item["features"] # type: ignore ] await self.database.bulk_async( @@ -718,17 +719,23 @@ def __attrs_post_init__(self): settings = ElasticsearchSettings() self.client = settings.create_client - def preprocess_item(self, item: stac_types.Item, base_url) -> stac_types.Item: + def preprocess_item( + self, item: stac_types.Item, base_url, method: BulkTransactionMethod + ) -> stac_types.Item: """Preprocess an item to match the data model. Args: item: The item to preprocess. base_url: The base URL of the request. + method: The bulk transaction method. Returns: The preprocessed item. """ - return self.database.sync_prep_create_item(item=item, base_url=base_url) + exist_ok = method == BulkTransactionMethod.UPSERT + return self.database.sync_prep_create_item( + item=item, base_url=base_url, exist_ok=exist_ok + ) @overrides def bulk_item_insert( @@ -751,7 +758,8 @@ def bulk_item_insert( base_url = "" processed_items = [ - self.preprocess_item(item, base_url) for item in items.items.values() + self.preprocess_item(item, base_url, items.method) + for item in items.items.values() ] # not a great way to get the collection_id-- should be part of the method signature diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 4554e74c..336c8d07 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -591,13 +591,16 @@ async def check_collection_exists(self, collection_id: str): if not await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id): raise NotFoundError(f"Collection {collection_id} does not exist") - async def prep_create_item(self, item: Item, base_url: str) -> Item: + async def prep_create_item( + self, item: Item, base_url: str, exist_ok: bool = False + ) -> Item: """ Preps an item for insertion into the database. Args: item (Item): The item to be prepped for insertion. base_url (str): The base URL used to create the item's self URL. + exist_ok (bool): Indicates whether the item can exist already. Returns: Item: The prepped item. @@ -608,7 +611,7 @@ async def prep_create_item(self, item: Item, base_url: str) -> Item: """ await self.check_collection_exists(collection_id=item["collection"]) - if await self.client.exists( + if not exist_ok and await self.client.exists( index=index_by_collection_id(item["collection"]), id=mk_item_id(item["id"], item["collection"]), ): @@ -618,17 +621,20 @@ async def prep_create_item(self, item: Item, base_url: str) -> Item: return self.item_serializer.stac_to_db(item, base_url) - def sync_prep_create_item(self, item: Item, base_url: str) -> Item: + def sync_prep_create_item( + self, item: Item, base_url: str, exist_ok: bool = False + ) -> Item: """ Prepare an item for insertion into the database. This method performs pre-insertion preparation on the given `item`, such as checking if the collection the item belongs to exists, - and verifying that an item with the same ID does not already exist in the database. + and optionally verifying that an item with the same ID does not already exist in the database. Args: item (Item): The item to be inserted into the database. base_url (str): The base URL used for constructing URLs for the item. + exist_ok (bool): Indicates whether the item can exist already. Returns: Item: The item after preparation is done. @@ -642,7 +648,7 @@ def sync_prep_create_item(self, item: Item, base_url: str) -> Item: if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=collection_id): raise NotFoundError(f"Collection {collection_id} does not exist") - if self.sync_client.exists( + if not exist_ok and self.sync_client.exists( index=index_by_collection_id(collection_id), id=mk_item_id(item_id, collection_id), ):