Skip to content

Commit 602629b

Browse files
committed
add to transactions client
1 parent 3a9f076 commit 602629b

File tree

4 files changed

+72
-11
lines changed

4 files changed

+72
-11
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ You can customize additional settings in your `.env` file:
115115
| `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional
116116
| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional
117117
| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` Optional |
118-
| `ES_OS_REFRESH` | Controls whether Elasticsearch/OpenSearch operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. | `false` | Optional |
118+
| `DATABASE_REFRESH` | Controls whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. | `false` | Optional |
119119

120120
> [!NOTE]
121121
> The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, and `ES_VERIFY_CERTS` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch.

stac_fastapi/core/stac_fastapi/core/core.py

Lines changed: 67 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -673,6 +673,27 @@ class TransactionsClient(AsyncBaseTransactionsClient):
673673
settings: ApiBaseSettings = attr.ib()
674674
session: Session = attr.ib(default=attr.Factory(Session.create_from_env))
675675

676+
def _resolve_refresh(self, **kwargs) -> bool:
677+
"""
678+
Resolve the `refresh` parameter from kwargs or the environment variable.
679+
680+
Args:
681+
**kwargs: Additional keyword arguments, including `refresh`.
682+
683+
Returns:
684+
bool: The resolved value of the `refresh` parameter.
685+
"""
686+
refresh = kwargs.get(
687+
"refresh", self.database.async_settings.database_refresh == "true"
688+
)
689+
if "refresh" in kwargs:
690+
logger.info(f"`refresh` parameter explicitly passed in kwargs: {refresh}")
691+
else:
692+
logger.info(
693+
f"`refresh` parameter derived from environment variable: {refresh}"
694+
)
695+
return refresh
696+
676697
@overrides
677698
async def create_item(
678699
self, collection_id: str, item: Union[Item, ItemCollection], **kwargs
@@ -696,6 +717,9 @@ async def create_item(
696717
request = kwargs.get("request")
697718
base_url = str(request.base_url)
698719

720+
# Resolve the `refresh` parameter
721+
refresh = self._resolve_refresh(**kwargs)
722+
699723
# Convert Pydantic model to dict for uniform processing
700724
item_dict = item.model_dump(mode="json")
701725

@@ -712,10 +736,11 @@ async def create_item(
712736
for feature in features
713737
]
714738
attempted = len(processed_items)
739+
715740
success, errors = await self.database.bulk_async(
716741
collection_id,
717742
processed_items,
718-
refresh=kwargs.get("refresh", False),
743+
refresh=refresh,
719744
)
720745
if errors:
721746
logger.error(
@@ -730,7 +755,7 @@ async def create_item(
730755
# Handle single item
731756
await self.database.create_item(
732757
item_dict,
733-
refresh=kwargs.get("refresh", False),
758+
refresh=refresh,
734759
base_url=base_url,
735760
exist_ok=False,
736761
)
@@ -757,11 +782,15 @@ async def update_item(
757782
"""
758783
item = item.model_dump(mode="json")
759784
base_url = str(kwargs["request"].base_url)
785+
786+
# Resolve the `refresh` parameter
787+
refresh = self._resolve_refresh(**kwargs)
788+
760789
now = datetime_type.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
761790
item["properties"]["updated"] = now
762791

763792
await self.database.create_item(
764-
item, refresh=kwargs.get("refresh", False), base_url=base_url, exist_ok=True
793+
item, refresh=refresh, base_url=base_url, exist_ok=True
765794
)
766795

767796
return ItemSerializer.db_to_stac(item, base_url)
@@ -777,7 +806,12 @@ async def delete_item(self, item_id: str, collection_id: str, **kwargs) -> None:
777806
Returns:
778807
None: Returns 204 No Content on successful deletion
779808
"""
780-
await self.database.delete_item(item_id=item_id, collection_id=collection_id)
809+
# Resolve the `refresh` parameter
810+
refresh = self._resolve_refresh(**kwargs)
811+
812+
await self.database.delete_item(
813+
item_id=item_id, collection_id=collection_id, refresh=refresh
814+
)
781815
return None
782816

783817
@overrides
@@ -798,8 +832,12 @@ async def create_collection(
798832
"""
799833
collection = collection.model_dump(mode="json")
800834
request = kwargs["request"]
835+
836+
# Resolve the `refresh` parameter
837+
refresh = self._resolve_refresh(**kwargs)
838+
801839
collection = self.database.collection_serializer.stac_to_db(collection, request)
802-
await self.database.create_collection(collection=collection)
840+
await self.database.create_collection(collection=collection, refresh=refresh)
803841
return CollectionSerializer.db_to_stac(
804842
collection,
805843
request,
@@ -833,9 +871,12 @@ async def update_collection(
833871

834872
request = kwargs["request"]
835873

874+
# Resolve the `refresh` parameter
875+
refresh = self._resolve_refresh(**kwargs)
876+
836877
collection = self.database.collection_serializer.stac_to_db(collection, request)
837878
await self.database.update_collection(
838-
collection_id=collection_id, collection=collection
879+
collection_id=collection_id, collection=collection, refresh=refresh
839880
)
840881

841882
return CollectionSerializer.db_to_stac(
@@ -860,7 +901,12 @@ async def delete_collection(self, collection_id: str, **kwargs) -> None:
860901
Raises:
861902
NotFoundError: If the collection doesn't exist
862903
"""
863-
await self.database.delete_collection(collection_id=collection_id)
904+
# Resolve the `refresh` parameter
905+
refresh = self._resolve_refresh(**kwargs)
906+
907+
await self.database.delete_collection(
908+
collection_id=collection_id, refresh=refresh
909+
)
864910
return None
865911

866912

@@ -919,6 +965,19 @@ def bulk_item_insert(
919965
else:
920966
base_url = ""
921967

968+
# Use `refresh` from kwargs if provided, otherwise fall back to the environment variable
969+
refresh = kwargs.get(
970+
"refresh", self.database.sync_settings.database_refresh == "true"
971+
)
972+
973+
# Log the value of `refresh` and its source
974+
if "refresh" in kwargs:
975+
logger.info(f"`refresh` parameter explicitly passed in kwargs: {refresh}")
976+
else:
977+
logger.info(
978+
f"`refresh` parameter derived from environment variable: {refresh}"
979+
)
980+
922981
processed_items = []
923982
for item in items.items.values():
924983
try:
@@ -937,7 +996,7 @@ def bulk_item_insert(
937996
success, errors = self.database.bulk_sync(
938997
collection_id,
939998
processed_items,
940-
refresh=kwargs.get("refresh", False),
999+
refresh=refresh,
9411000
)
9421001
if errors:
9431002
logger.error(f"Bulk sync operation encountered errors: {errors}")

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ class ElasticsearchSettings(ApiSettings, ApiBaseSettings):
8787
enable_response_models: bool = False
8888
enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False)
8989
raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False)
90-
es_os_refresh: bool = get_bool_env("ES_OS_REFRESH", default=False)
90+
database_refresh: bool = get_bool_env("DATABASE_REFRESH", default=False)
9191

9292
@property
9393
def create_client(self):
@@ -109,6 +109,7 @@ class AsyncElasticsearchSettings(ApiSettings, ApiBaseSettings):
109109
enable_response_models: bool = False
110110
enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False)
111111
raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False)
112+
database_refresh: bool = get_bool_env("DATABASE_REFRESH", default=False)
112113

113114
@property
114115
def create_client(self):

stac_fastapi/opensearch/stac_fastapi/opensearch/config.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ class OpensearchSettings(ApiSettings, ApiBaseSettings):
8484
enable_response_models: bool = False
8585
enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False)
8686
raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False)
87+
database_refresh: bool = get_bool_env("DATABASE_REFRESH", default=False)
8788

8889
@property
8990
def create_client(self):
@@ -105,7 +106,7 @@ class AsyncOpensearchSettings(ApiSettings, ApiBaseSettings):
105106
enable_response_models: bool = False
106107
enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False)
107108
raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False)
108-
es_os_refresh: bool = get_bool_env("ES_OS_REFRESH", default=False)
109+
database_refresh: bool = get_bool_env("DATABASE_REFRESH", default=False)
109110

110111
@property
111112
def create_client(self):

0 commit comments

Comments
 (0)