Skip to content

Enable choice of raising bulk insert errors, logging #364

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added

- Added logging to bulk insertion methods to provide detailed feedback on errors encountered during operations. [#364](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/364)
- Introduced the `RAISE_ON_BULK_ERROR` environment variable to control whether bulk insertion methods raise exceptions on errors (`true`) or log warnings and continue processing (`false`). [#364](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/364)
- Added code coverage reporting to the test suite using pytest-cov. [#87](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/issues/87)

### Changed
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ You can customize additional settings in your `.env` file:
| `BACKEND` | Tests-related variable | `elasticsearch` or `opensearch` based on the backend | Optional |
| `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional |
| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional |
| `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional |
| `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional |
| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. | `false` | Optional |

> [!NOTE]
> The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, and `ES_VERIFY_CERTS` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch.
Expand Down
53 changes: 34 additions & 19 deletions stac_fastapi/core/stac_fastapi/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,21 +676,22 @@ class TransactionsClient(AsyncBaseTransactionsClient):
@overrides
async def create_item(
self, collection_id: str, item: Union[Item, ItemCollection], **kwargs
) -> Optional[stac_types.Item]:
"""Create an item in the collection.
) -> Union[stac_types.Item, str]:
"""
Create an item or a feature collection of items in the specified collection.

Args:
collection_id (str): The id of the collection to add the item to.
item (stac_types.Item): The item to be added to the collection.
kwargs: Additional keyword arguments.
collection_id (str): The ID of the collection to add the item(s) to.
item (Union[Item, ItemCollection]): A single item or a collection of items to be added.
**kwargs: Additional keyword arguments, such as `request` and `refresh`.

Returns:
stac_types.Item: The created item.
Union[stac_types.Item, str]: The created item if a single item is added, or a summary string
indicating the number of items successfully added and errors if a collection of items is added.

Raises:
NotFound: If the specified collection is not found in the database.
ConflictError: If the item in the specified collection already exists.

NotFoundError: If the specified collection is not found in the database.
ConflictError: If an item with the same ID already exists in the collection.
"""
item = item.model_dump(mode="json")
base_url = str(kwargs["request"].base_url)
Expand All @@ -706,14 +707,22 @@ async def create_item(
)
for item in item["features"]
]

await self.database.bulk_async(
collection_id, processed_items, refresh=kwargs.get("refresh", False)
attempted = len(processed_items)
success, errors = await self.database.bulk_async(
collection_id,
processed_items,
refresh=kwargs.get("refresh", False),
)
if errors:
logger.error(f"Bulk async operation encountered errors: {errors}")
else:
logger.info(f"Bulk async operation succeeded with {success} actions.")

return None
return f"Successfully added {success} Items. {attempted - success} errors occurred."
else:
item = await self.database.prep_create_item(item=item, base_url=base_url)
item = await self.database.async_prep_create_item(
item=item, base_url=base_url
)
await self.database.create_item(item, refresh=kwargs.get("refresh", False))
return ItemSerializer.db_to_stac(item, base_url)

Expand Down Expand Up @@ -875,7 +884,7 @@ def preprocess_item(
The preprocessed item.
"""
exist_ok = method == BulkTransactionMethod.UPSERT
return self.database.sync_prep_create_item(
return self.database.bulk_sync_prep_create_item(
item=item, base_url=base_url, exist_ok=exist_ok
)

Expand Down Expand Up @@ -906,12 +915,18 @@ def bulk_item_insert(

# not a great way to get the collection_id-- should be part of the method signature
collection_id = processed_items[0]["collection"]

self.database.bulk_sync(
collection_id, processed_items, refresh=kwargs.get("refresh", False)
attempted = len(processed_items)
success, errors = self.database.bulk_sync(
collection_id,
processed_items,
refresh=kwargs.get("refresh", False),
)
if errors:
logger.error(f"Bulk sync operation encountered errors: {errors}")
else:
logger.info(f"Bulk sync operation succeeded with {success} actions.")

return f"Successfully added {len(processed_items)} Items."
return f"Successfully added/updated {success} Items. {attempted - success} errors occurred."


_DEFAULT_QUERYABLES: Dict[str, Dict[str, Any]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class ElasticsearchSettings(ApiSettings, ApiBaseSettings):
indexed_fields: Set[str] = {"datetime"}
enable_response_models: bool = False
enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False)
raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False)

@property
def create_client(self):
Expand All @@ -106,6 +107,7 @@ class AsyncElasticsearchSettings(ApiSettings, ApiBaseSettings):
indexed_fields: Set[str] = {"datetime"}
enable_response_models: bool = False
enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False)
raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False)

@property
def create_client(self):
Expand Down
178 changes: 137 additions & 41 deletions stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,20 @@ async def delete_item_index(collection_id: str):
class DatabaseLogic(BaseDatabaseLogic):
"""Database logic."""

client = AsyncElasticsearchSettings().create_client
sync_client = SyncElasticsearchSettings().create_client
async_settings: AsyncElasticsearchSettings = attr.ib(
factory=AsyncElasticsearchSettings
)
sync_settings: SyncElasticsearchSettings = attr.ib(
factory=SyncElasticsearchSettings
)

client = attr.ib(init=False)
sync_client = attr.ib(init=False)

def __attrs_post_init__(self):
"""Initialize clients after the class is instantiated."""
self.client = self.async_settings.create_client
self.sync_client = self.sync_settings.create_client

item_serializer: Type[ItemSerializer] = attr.ib(default=ItemSerializer)
collection_serializer: Type[CollectionSerializer] = attr.ib(
Expand Down Expand Up @@ -699,7 +711,7 @@ async def check_collection_exists(self, collection_id: str):
if not await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id):
raise NotFoundError(f"Collection {collection_id} does not exist")

async def prep_create_item(
async def async_prep_create_item(
self, item: Item, base_url: str, exist_ok: bool = False
) -> Item:
"""
Expand Down Expand Up @@ -729,42 +741,106 @@ async def prep_create_item(

return self.item_serializer.stac_to_db(item, base_url)

def sync_prep_create_item(
async def bulk_async_prep_create_item(
self, item: Item, base_url: str, exist_ok: bool = False
) -> Item:
"""
Prepare an item for insertion into the database.
This method performs pre-insertion preparation on the given `item`,
such as checking if the collection the item belongs to exists,
and optionally verifying that an item with the same ID does not already exist in the database.
This method performs pre-insertion preparation on the given `item`, such as:
- Verifying that the collection the item belongs to exists.
- Optionally checking if an item with the same ID already exists in the database.
- Serializing the item into a database-compatible format.
Args:
item (Item): The item to be inserted into the database.
base_url (str): The base URL used for constructing URLs for the item.
exist_ok (bool): Indicates whether the item can exist already.
item (Item): The item to be prepared for insertion.
base_url (str): The base URL used to construct the item's self URL.
exist_ok (bool): Indicates whether the item can already exist in the database.
If False, a `ConflictError` is raised if the item exists.
Returns:
Item: The item after preparation is done.
Item: The prepared item, serialized into a database-compatible format.
Raises:
NotFoundError: If the collection that the item belongs to does not exist in the database.
ConflictError: If an item with the same ID already exists in the collection.
ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False,
and `RAISE_ON_BULK_ERROR` is set to `true`.
"""
item_id = item["id"]
collection_id = item["collection"]
if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=collection_id):
raise NotFoundError(f"Collection {collection_id} does not exist")
logger.debug(f"Preparing item {item['id']} in collection {item['collection']}.")

if not exist_ok and self.sync_client.exists(
index=index_alias_by_collection_id(collection_id),
id=mk_item_id(item_id, collection_id),
# Check if the collection exists
await self.check_collection_exists(collection_id=item["collection"])

# Check if the item already exists in the database
if not exist_ok and await self.client.exists(
index=index_alias_by_collection_id(item["collection"]),
id=mk_item_id(item["id"], item["collection"]),
):
raise ConflictError(
f"Item {item_id} in collection {collection_id} already exists"
error_message = (
f"Item {item['id']} in collection {item['collection']} already exists."
)
if self.async_settings.raise_on_bulk_error:
raise ConflictError(error_message)
else:
logger.warning(
f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false."
)

# Serialize the item into a database-compatible format
prepped_item = self.item_serializer.stac_to_db(item, base_url)
logger.debug(f"Item {item['id']} prepared successfully.")
return prepped_item

def bulk_sync_prep_create_item(
self, item: Item, base_url: str, exist_ok: bool = False
) -> Item:
"""
Prepare an item for insertion into the database.
return self.item_serializer.stac_to_db(item, base_url)
This method performs pre-insertion preparation on the given `item`, such as:
- Verifying that the collection the item belongs to exists.
- Optionally checking if an item with the same ID already exists in the database.
- Serializing the item into a database-compatible format.
Args:
item (Item): The item to be prepared for insertion.
base_url (str): The base URL used to construct the item's self URL.
exist_ok (bool): Indicates whether the item can already exist in the database.
If False, a `ConflictError` is raised if the item exists.
Returns:
Item: The prepared item, serialized into a database-compatible format.
Raises:
NotFoundError: If the collection that the item belongs to does not exist in the database.
ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False,
and `RAISE_ON_BULK_ERROR` is set to `true`.
"""
logger.debug(f"Preparing item {item['id']} in collection {item['collection']}.")

# Check if the collection exists
if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=item["collection"]):
raise NotFoundError(f"Collection {item['collection']} does not exist")

# Check if the item already exists in the database
if not exist_ok and self.sync_client.exists(
index=index_alias_by_collection_id(item["collection"]),
id=mk_item_id(item["id"], item["collection"]),
):
error_message = (
f"Item {item['id']} in collection {item['collection']} already exists."
)
if self.sync_settings.raise_on_bulk_error:
raise ConflictError(error_message)
else:
logger.warning(
f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false."
)

# Serialize the item into a database-compatible format
prepped_item = self.item_serializer.stac_to_db(item, base_url)
logger.debug(f"Item {item['id']} prepared successfully.")
return prepped_item

async def create_item(self, item: Item, refresh: bool = False):
"""Database logic for creating one item.
Expand Down Expand Up @@ -959,52 +1035,72 @@ async def delete_collection(self, collection_id: str, refresh: bool = False):
await delete_item_index(collection_id)

async def bulk_async(
self, collection_id: str, processed_items: List[Item], refresh: bool = False
) -> None:
"""Perform a bulk insert of items into the database asynchronously.
self,
collection_id: str,
processed_items: List[Item],
refresh: bool = False,
) -> Tuple[int, List[Dict[str, Any]]]:
"""
Perform a bulk insert of items into the database asynchronously.
Args:
self: The instance of the object calling this function.
collection_id (str): The ID of the collection to which the items belong.
processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
refresh (bool): Whether to refresh the index after the bulk insert (default: False).
Returns:
Tuple[int, List[Dict[str, Any]]]: A tuple containing:
- The number of successfully processed actions (`success`).
- A list of errors encountered during the bulk operation (`errors`).
Notes:
This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The
insert is performed asynchronously, and the event loop is used to run the operation in a separate executor. The
`mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the
index is refreshed after the bulk insert. The function does not return any value.
This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`.
The insert is performed asynchronously, and the event loop is used to run the operation in a separate executor.
The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True,
the index is refreshed after the bulk insert.
"""
await helpers.async_bulk(
raise_on_error = self.async_settings.raise_on_bulk_error
success, errors = await helpers.async_bulk(
self.client,
mk_actions(collection_id, processed_items),
refresh=refresh,
raise_on_error=False,
raise_on_error=raise_on_error,
)
return success, errors

def bulk_sync(
self, collection_id: str, processed_items: List[Item], refresh: bool = False
) -> None:
"""Perform a bulk insert of items into the database synchronously.
self,
collection_id: str,
processed_items: List[Item],
refresh: bool = False,
) -> Tuple[int, List[Dict[str, Any]]]:
"""
Perform a bulk insert of items into the database synchronously.
Args:
self: The instance of the object calling this function.
collection_id (str): The ID of the collection to which the items belong.
processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
refresh (bool): Whether to refresh the index after the bulk insert (default: False).
Returns:
Tuple[int, List[Dict[str, Any]]]: A tuple containing:
- The number of successfully processed actions (`success`).
- A list of errors encountered during the bulk operation (`errors`).
Notes:
This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The
insert is performed synchronously and blocking, meaning that the function does not return until the insert has
This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`.
The insert is performed synchronously and blocking, meaning that the function does not return until the insert has
completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to
True, the index is refreshed after the bulk insert. The function does not return any value.
True, the index is refreshed after the bulk insert.
"""
helpers.bulk(
raise_on_error = self.sync_settings.raise_on_bulk_error
success, errors = helpers.bulk(
self.sync_client,
mk_actions(collection_id, processed_items),
refresh=refresh,
raise_on_error=False,
raise_on_error=raise_on_error,
)
return success, errors

# DANGER
async def delete_items(self) -> None:
Expand Down
Loading