Skip to content

Commit 02b896a

Browse files
committed
upgrade stac-fastapi to v2.4.9, implement support for BulkTransaction method parameter
1 parent 830e678 commit 02b896a

File tree

3 files changed

+16
-12
lines changed

3 files changed

+16
-12
lines changed

stac_fastapi/elasticsearch/setup.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
"attrs",
1111
"pydantic[dotenv]<2",
1212
"stac_pydantic==2.0.*",
13-
"stac-fastapi.types==2.4.8",
14-
"stac-fastapi.api==2.4.8",
15-
"stac-fastapi.extensions==2.4.8",
13+
"stac-fastapi.types==2.4.9",
14+
"stac-fastapi.api==2.4.9",
15+
"stac-fastapi.extensions==2.4.9",
1616
"elasticsearch[async]==7.17.9",
1717
"elasticsearch-dsl==7.4.1",
1818
"pystac[validation]",

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
from stac_fastapi.elasticsearch.session import Session
2727
from stac_fastapi.extensions.third_party.bulk_transactions import (
2828
BaseBulkTransactionsClient,
29-
Items,
29+
Items, BulkTransactionMethod,
3030
)
3131
from stac_fastapi.types import stac as stac_types
3232
from stac_fastapi.types.config import Settings
@@ -568,7 +568,7 @@ async def create_item(
568568
if item["type"] == "FeatureCollection":
569569
bulk_client = BulkTransactionsClient()
570570
processed_items = [
571-
bulk_client.preprocess_item(item, base_url) for item in item["features"] # type: ignore
571+
bulk_client.preprocess_item(item, base_url, BulkTransactionMethod.INSERT) for item in item["features"] # type: ignore
572572
]
573573

574574
await self.database.bulk_async(
@@ -718,17 +718,19 @@ def __attrs_post_init__(self):
718718
settings = ElasticsearchSettings()
719719
self.client = settings.create_client
720720

721-
def preprocess_item(self, item: stac_types.Item, base_url) -> stac_types.Item:
721+
def preprocess_item(self, item: stac_types.Item, base_url, method: BulkTransactionMethod) -> stac_types.Item:
722722
"""Preprocess an item to match the data model.
723723
724724
Args:
725725
item: The item to preprocess.
726726
base_url: The base URL of the request.
727+
method: The bulk transaction method.
727728
728729
Returns:
729730
The preprocessed item.
730731
"""
731-
return self.database.sync_prep_create_item(item=item, base_url=base_url)
732+
exist_ok = (method == BulkTransactionMethod.UPSERT)
733+
return self.database.sync_prep_create_item(item=item, base_url=base_url, exist_ok=exist_ok)
732734

733735
@overrides
734736
def bulk_item_insert(
@@ -751,7 +753,7 @@ def bulk_item_insert(
751753
base_url = ""
752754

753755
processed_items = [
754-
self.preprocess_item(item, base_url) for item in items.items.values()
756+
self.preprocess_item(item, base_url, items.method) for item in items.items.values()
755757
]
756758

757759
# not a great way to get the collection_id-- should be part of the method signature

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -591,13 +591,14 @@ async def check_collection_exists(self, collection_id: str):
591591
if not await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id):
592592
raise NotFoundError(f"Collection {collection_id} does not exist")
593593

594-
async def prep_create_item(self, item: Item, base_url: str) -> Item:
594+
async def prep_create_item(self, item: Item, base_url: str, exist_ok: bool = False) -> Item:
595595
"""
596596
Preps an item for insertion into the database.
597597
598598
Args:
599599
item (Item): The item to be prepped for insertion.
600600
base_url (str): The base URL used to create the item's self URL.
601+
exist_ok (bool): Indicates whether the item can exist already.
601602
602603
Returns:
603604
Item: The prepped item.
@@ -608,7 +609,7 @@ async def prep_create_item(self, item: Item, base_url: str) -> Item:
608609
"""
609610
await self.check_collection_exists(collection_id=item["collection"])
610611

611-
if await self.client.exists(
612+
if not exist_ok and await self.client.exists(
612613
index=index_by_collection_id(item["collection"]),
613614
id=mk_item_id(item["id"], item["collection"]),
614615
):
@@ -618,7 +619,7 @@ async def prep_create_item(self, item: Item, base_url: str) -> Item:
618619

619620
return self.item_serializer.stac_to_db(item, base_url)
620621

621-
def sync_prep_create_item(self, item: Item, base_url: str) -> Item:
622+
def sync_prep_create_item(self, item: Item, base_url: str, exist_ok: bool = False) -> Item:
622623
"""
623624
Prepare an item for insertion into the database.
624625
@@ -629,6 +630,7 @@ def sync_prep_create_item(self, item: Item, base_url: str) -> Item:
629630
Args:
630631
item (Item): The item to be inserted into the database.
631632
base_url (str): The base URL used for constructing URLs for the item.
633+
exist_ok (bool): Indicates whether the item can exist already.
632634
633635
Returns:
634636
Item: The item after preparation is done.
@@ -642,7 +644,7 @@ def sync_prep_create_item(self, item: Item, base_url: str) -> Item:
642644
if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=collection_id):
643645
raise NotFoundError(f"Collection {collection_id} does not exist")
644646

645-
if self.sync_client.exists(
647+
if not exist_ok and self.sync_client.exists(
646648
index=index_by_collection_id(collection_id),
647649
id=mk_item_id(item_id, collection_id),
648650
):

0 commit comments

Comments
 (0)