Skip to content

Commit 204eac9

Browse files
committed
update os db logic
1 parent 5bb8356 commit 204eac9

File tree

1 file changed

+133
-25
lines changed

1 file changed

+133
-25
lines changed

stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py

Lines changed: 133 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
)
3232
from stac_fastapi.core.extensions import filter
3333
from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer
34-
from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon
34+
from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon, resolve_refresh
3535
from stac_fastapi.opensearch.config import (
3636
AsyncOpensearchSettings as AsyncSearchSettings,
3737
)
@@ -864,15 +864,17 @@ def bulk_sync_prep_create_item(
864864
async def create_item(
865865
self,
866866
item: Item,
867-
refresh: bool = False,
868867
base_url: str = "",
869868
exist_ok: bool = False,
869+
**kwargs: Any,
870870
):
871871
"""Database logic for creating one item.
872872
873873
Args:
874874
item (Item): The item to be created.
875-
refresh (bool, optional): Refresh the index after performing the operation. Defaults to False.
875+
base_url (str, optional): The base URL for the item. Defaults to an empty string.
876+
exist_ok (bool, optional): Whether to allow the item to exist already. Defaults to False.
877+
**kwargs: Additional keyword arguments like refresh.
876878
877879
Raises:
878880
ConflictError: If the item already exists in the database.
@@ -883,6 +885,19 @@ async def create_item(
883885
# todo: check if collection exists, but cache
884886
item_id = item["id"]
885887
collection_id = item["collection"]
888+
889+
# Ensure kwargs is a dictionary
890+
kwargs = kwargs or {}
891+
892+
# Resolve the `refresh` parameter
893+
refresh = kwargs.get("refresh", self.async_settings.database_refresh)
894+
refresh = resolve_refresh(str(refresh).lower())
895+
896+
# Log the creation attempt
897+
logger.info(
898+
f"Creating item {item_id} in collection {collection_id} with refresh={refresh}"
899+
)
900+
886901
item = await self.async_prep_create_item(
887902
item=item, base_url=base_url, exist_ok=exist_ok
888903
)
@@ -893,19 +908,29 @@ async def create_item(
893908
refresh=refresh,
894909
)
895910

896-
async def delete_item(
897-
self, item_id: str, collection_id: str, refresh: bool = False
898-
):
911+
async def delete_item(self, item_id: str, collection_id: str, **kwargs: Any):
899912
"""Delete a single item from the database.
900913
901914
Args:
902915
item_id (str): The id of the Item to be deleted.
903916
collection_id (str): The id of the Collection that the Item belongs to.
904-
refresh (bool, optional): Whether to refresh the index after the deletion. Default is False.
917+
**kwargs: Additional keyword arguments like refresh.
905918
906919
Raises:
907920
NotFoundError: If the Item does not exist in the database.
908921
"""
922+
# Ensure kwargs is a dictionary
923+
kwargs = kwargs or {}
924+
925+
# Resolve the `refresh` parameter
926+
refresh = kwargs.get("refresh", self.async_settings.database_refresh)
927+
refresh = resolve_refresh(str(refresh).lower())
928+
929+
# Log the deletion attempt
930+
logger.info(
931+
f"Deleting item {item_id} from collection {collection_id} with refresh={refresh}"
932+
)
933+
909934
try:
910935
await self.client.delete(
911936
index=index_alias_by_collection_id(collection_id),
@@ -935,12 +960,12 @@ async def get_items_mapping(self, collection_id: str) -> Dict[str, Any]:
935960
except exceptions.NotFoundError:
936961
raise NotFoundError(f"Mapping for index {index_name} not found")
937962

938-
async def create_collection(self, collection: Collection, refresh: bool = False):
963+
async def create_collection(self, collection: Collection, **kwargs: Any):
939964
"""Create a single collection in the database.
940965
941966
Args:
942967
collection (Collection): The Collection object to be created.
943-
refresh (bool, optional): Whether to refresh the index after the creation. Default is False.
968+
**kwargs: Additional keyword arguments like refresh.
944969
945970
Raises:
946971
ConflictError: If a Collection with the same id already exists in the database.
@@ -950,6 +975,16 @@ async def create_collection(self, collection: Collection, refresh: bool = False)
950975
"""
951976
collection_id = collection["id"]
952977

978+
# Ensure kwargs is a dictionary
979+
kwargs = kwargs or {}
980+
981+
# Resolve the `refresh` parameter
982+
refresh = kwargs.get("refresh", self.async_settings.database_refresh)
983+
refresh = resolve_refresh(str(refresh).lower())
984+
985+
# Log the creation attempt
986+
logger.info(f"Creating collection {collection_id} with refresh={refresh}")
987+
953988
if await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id):
954989
raise ConflictError(f"Collection {collection_id} already exists")
955990

@@ -989,14 +1024,14 @@ async def find_collection(self, collection_id: str) -> Collection:
9891024
return collection["_source"]
9901025

9911026
async def update_collection(
992-
self, collection_id: str, collection: Collection, refresh: bool = False
1027+
self, collection_id: str, collection: Collection, **kwargs: Any
9931028
):
9941029
"""Update a collection from the database.
9951030
9961031
Args:
997-
self: The instance of the object calling this function.
9981032
collection_id (str): The ID of the collection to be updated.
9991033
collection (Collection): The Collection object to be used for the update.
1034+
**kwargs: Additional keyword arguments like refresh.
10001035
10011036
Raises:
10021037
NotFoundError: If the collection with the given `collection_id` is not
@@ -1007,9 +1042,23 @@ async def update_collection(
10071042
`collection_id` and with the collection specified in the `Collection` object.
10081043
If the collection is not found, a `NotFoundError` is raised.
10091044
"""
1045+
# Ensure kwargs is a dictionary
1046+
kwargs = kwargs or {}
1047+
1048+
# Resolve the `refresh` parameter
1049+
refresh = kwargs.get("refresh", self.async_settings.database_refresh)
1050+
refresh = resolve_refresh(str(refresh).lower())
1051+
1052+
# Log the update attempt
1053+
logger.info(f"Updating collection {collection_id} with refresh={refresh}")
1054+
10101055
await self.find_collection(collection_id=collection_id)
10111056

10121057
if collection_id != collection["id"]:
1058+
logger.info(
1059+
f"Collection ID change detected: {collection_id} -> {collection['id']}"
1060+
)
1061+
10131062
await self.create_collection(collection, refresh=refresh)
10141063

10151064
await self.client.reindex(
@@ -1025,7 +1074,7 @@ async def update_collection(
10251074
refresh=refresh,
10261075
)
10271076

1028-
await self.delete_collection(collection_id)
1077+
await self.delete_collection(collection_id=collection_id, **kwargs)
10291078

10301079
else:
10311080
await self.client.index(
@@ -1035,23 +1084,34 @@ async def update_collection(
10351084
refresh=refresh,
10361085
)
10371086

1038-
async def delete_collection(self, collection_id: str, refresh: bool = False):
1087+
async def delete_collection(self, collection_id: str, **kwargs: Any):
10391088
"""Delete a collection from the database.
10401089
10411090
Parameters:
10421091
self: The instance of the object calling this function.
10431092
collection_id (str): The ID of the collection to be deleted.
1044-
refresh (bool): Whether to refresh the index after the deletion (default: False).
1093+
**kwargs: Additional keyword arguments like refresh.
10451094
10461095
Raises:
10471096
NotFoundError: If the collection with the given `collection_id` is not found in the database.
10481097
10491098
Notes:
10501099
This function first verifies that the collection with the specified `collection_id` exists in the database, and then
1051-
deletes the collection. If `refresh` is set to True, the index is refreshed after the deletion. Additionally, this
1052-
function also calls `delete_item_index` to delete the index for the items in the collection.
1100+
deletes the collection. If `refresh` is set to "true", "false", or "wait_for", the index is refreshed accordingly after
1101+
the deletion. Additionally, this function also calls `delete_item_index` to delete the index for the items in the collection.
10531102
"""
1103+
# Ensure kwargs is a dictionary
1104+
kwargs = kwargs or {}
1105+
10541106
await self.find_collection(collection_id=collection_id)
1107+
1108+
# Resolve the `refresh` parameter
1109+
refresh = kwargs.get("refresh", self.async_settings.database_refresh)
1110+
refresh = resolve_refresh(str(refresh).lower())
1111+
1112+
# Log the deletion attempt
1113+
logger.info(f"Deleting collection {collection_id} with refresh={refresh}")
1114+
10551115
await self.client.delete(
10561116
index=COLLECTIONS_INDEX, id=collection_id, refresh=refresh
10571117
)
@@ -1061,15 +1121,17 @@ async def bulk_async(
10611121
self,
10621122
collection_id: str,
10631123
processed_items: List[Item],
1064-
refresh: bool = False,
1124+
**kwargs: Any,
10651125
) -> Tuple[int, List[Dict[str, Any]]]:
10661126
"""
10671127
Perform a bulk insert of items into the database asynchronously.
10681128
10691129
Args:
10701130
collection_id (str): The ID of the collection to which the items belong.
10711131
processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
1072-
refresh (bool): Whether to refresh the index after the bulk insert (default: False).
1132+
**kwargs (Any): Additional keyword arguments, including:
1133+
- refresh (str, optional): Whether to refresh the index after the bulk insert.
1134+
Can be "true", "false", or "wait_for". Defaults to the value of `self.sync_settings.database_refresh`.
10731135
10741136
Returns:
10751137
Tuple[int, List[Dict[str, Any]]]: A tuple containing:
@@ -1078,32 +1140,58 @@ async def bulk_async(
10781140
10791141
Notes:
10801142
This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`.
1081-
The insert is performed asynchronously, and the event loop is used to run the operation in a separate executor.
1082-
The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True,
1083-
the index is refreshed after the bulk insert.
1143+
The insert is performed synchronously and blocking, meaning that the function does not return until the insert has
1144+
completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. The `refresh`
1145+
parameter determines whether the index is refreshed after the bulk insert:
1146+
- "true": Forces an immediate refresh of the index.
1147+
- "false": Does not refresh the index immediately (default behavior).
1148+
- "wait_for": Waits for the next refresh cycle to make the changes visible.
10841149
"""
1150+
# Ensure kwargs is a dictionary
1151+
kwargs = kwargs or {}
1152+
1153+
# Resolve the `refresh` parameter
1154+
refresh = kwargs.get("refresh", self.async_settings.database_refresh)
1155+
refresh = resolve_refresh(str(refresh).lower())
1156+
1157+
# Log the bulk insert attempt
1158+
logger.info(
1159+
f"Performing bulk insert for collection {collection_id} with refresh={refresh}"
1160+
)
1161+
1162+
# Handle empty processed_items
1163+
if not processed_items:
1164+
logger.warning(f"No items to insert for collection {collection_id}")
1165+
return 0, []
1166+
10851167
raise_on_error = self.async_settings.raise_on_bulk_error
10861168
success, errors = await helpers.async_bulk(
10871169
self.client,
10881170
mk_actions(collection_id, processed_items),
10891171
refresh=refresh,
10901172
raise_on_error=raise_on_error,
10911173
)
1174+
# Log the result
1175+
logger.info(
1176+
f"Bulk insert completed for collection {collection_id}: {success} successes, {len(errors)} errors"
1177+
)
10921178
return success, errors
10931179

10941180
def bulk_sync(
10951181
self,
10961182
collection_id: str,
10971183
processed_items: List[Item],
1098-
refresh: bool = False,
1184+
**kwargs: Any,
10991185
) -> Tuple[int, List[Dict[str, Any]]]:
11001186
"""
11011187
Perform a bulk insert of items into the database synchronously.
11021188
11031189
Args:
11041190
collection_id (str): The ID of the collection to which the items belong.
11051191
processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
1106-
refresh (bool): Whether to refresh the index after the bulk insert (default: False).
1192+
**kwargs (Any): Additional keyword arguments, including:
1193+
- refresh (str, optional): Whether to refresh the index after the bulk insert.
1194+
Can be "true", "false", or "wait_for". Defaults to the value of `self.sync_settings.database_refresh`.
11071195
11081196
Returns:
11091197
Tuple[int, List[Dict[str, Any]]]: A tuple containing:
@@ -1113,9 +1201,29 @@ def bulk_sync(
11131201
Notes:
11141202
This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`.
11151203
The insert is performed synchronously and blocking, meaning that the function does not return until the insert has
1116-
completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to
1117-
True, the index is refreshed after the bulk insert.
1204+
completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. The `refresh`
1205+
parameter determines whether the index is refreshed after the bulk insert:
1206+
- "true": Forces an immediate refresh of the index.
1207+
- "false": Does not refresh the index immediately (default behavior).
1208+
- "wait_for": Waits for the next refresh cycle to make the changes visible.
11181209
"""
1210+
# Ensure kwargs is a dictionary
1211+
kwargs = kwargs or {}
1212+
1213+
# Resolve the `refresh` parameter
1214+
refresh = kwargs.get("refresh", self.sync_settings.database_refresh)
1215+
refresh = resolve_refresh(str(refresh).lower())
1216+
1217+
# Log the bulk insert attempt
1218+
logger.info(
1219+
f"Performing bulk insert for collection {collection_id} with refresh={refresh}"
1220+
)
1221+
1222+
# Handle empty processed_items
1223+
if not processed_items:
1224+
logger.warning(f"No items to insert for collection {collection_id}")
1225+
return 0, []
1226+
11191227
raise_on_error = self.sync_settings.raise_on_bulk_error
11201228
success, errors = helpers.bulk(
11211229
self.sync_client,

0 commit comments

Comments
 (0)