From 7f4d19079f50d0116b3b10c82bcbb42ca5c6cc3c Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Tue, 6 May 2025 13:18:04 +0800 Subject: [PATCH 1/6] Refactor create_item method to include base_url and exist_ok parameters --- CHANGELOG.md | 2 ++ stac_fastapi/core/stac_fastapi/core/core.py | 14 +++++++++----- .../elasticsearch/database_logic.py | 18 +++++++++++------- .../stac_fastapi/opensearch/database_logic.py | 18 +++++++++++------- 4 files changed, 33 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3afc86eb..aa75da51 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Fixed +- Refactored `create_item` and `update_item` methods to share unified logic, ensuring consistent conflict detection, validation, and database operations. + ## [v4.0.0] - 2025-04-23 ### Added diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index a8821273..33df0d71 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -720,10 +720,12 @@ async def create_item( return f"Successfully added {success} Items. {attempted - success} errors occurred." else: - item = await self.database.async_prep_create_item( - item=item, base_url=base_url + await self.database.create_item( + item, + refresh=kwargs.get("refresh", False), + base_url=base_url, + exist_ok=False, ) - await self.database.create_item(item, refresh=kwargs.get("refresh", False)) return ItemSerializer.db_to_stac(item, base_url) @overrides @@ -750,8 +752,10 @@ async def update_item( now = datetime_type.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") item["properties"]["updated"] = now - await self.database.check_collection_exists(collection_id) - await self.database.create_item(item, refresh=kwargs.get("refresh", False)) + # await self.database.check_collection_exists(collection_id) + await self.database.create_item( + item, refresh=kwargs.get("refresh", False), base_url=base_url, exist_ok=True + ) return ItemSerializer.db_to_stac(item, base_url) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 2834a4de..9a773230 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -842,7 +842,13 @@ def bulk_sync_prep_create_item( logger.debug(f"Item {item['id']} prepared successfully.") return prepped_item - async def create_item(self, item: Item, refresh: bool = False): + async def create_item( + self, + item: Item, + refresh: bool = False, + base_url: str = "", + exist_ok: bool = False, + ): """Database logic for creating one item. Args: @@ -858,18 +864,16 @@ async def create_item(self, item: Item, refresh: bool = False): # todo: check if collection exists, but cache item_id = item["id"] collection_id = item["collection"] - es_resp = await self.client.index( + item = await self.async_prep_create_item( + item=item, base_url=base_url, exist_ok=exist_ok + ) + await self.client.index( index=index_alias_by_collection_id(collection_id), id=mk_item_id(item_id, collection_id), document=item, refresh=refresh, ) - if (meta := es_resp.get("meta")) and meta.get("status") == 409: - raise ConflictError( - f"Item {item_id} in collection {collection_id} already exists" - ) - async def delete_item( self, item_id: str, collection_id: str, refresh: bool = False ): diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index a555e3b0..66c8d3e6 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -861,7 +861,13 @@ def bulk_sync_prep_create_item( logger.debug(f"Item {item['id']} prepared successfully.") return prepped_item - async def create_item(self, item: Item, refresh: bool = False): + async def create_item( + self, + item: Item, + refresh: bool = False, + base_url: str = "", + exist_ok: bool = False, + ): """Database logic for creating one item. Args: @@ -877,18 +883,16 @@ async def create_item(self, item: Item, refresh: bool = False): # todo: check if collection exists, but cache item_id = item["id"] collection_id = item["collection"] - es_resp = await self.client.index( + item = await self.async_prep_create_item( + item=item, base_url=base_url, exist_ok=exist_ok + ) + await self.client.index( index=index_alias_by_collection_id(collection_id), id=mk_item_id(item_id, collection_id), body=item, refresh=refresh, ) - if (meta := es_resp.get("meta")) and meta.get("status") == 409: - raise ConflictError( - f"Item {item_id} in collection {collection_id} already exists" - ) - async def delete_item( self, item_id: str, collection_id: str, refresh: bool = False ): From 0f85dcf766446e1ee8e3bd68da1cfd57b4b39269 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Tue, 6 May 2025 13:21:07 +0800 Subject: [PATCH 2/6] update changelog --- CHANGELOG.md | 2 +- stac_fastapi/core/stac_fastapi/core/core.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index aa75da51..c3e33b33 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,7 +21,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Fixed -- Refactored `create_item` and `update_item` methods to share unified logic, ensuring consistent conflict detection, validation, and database operations. +- Refactored `create_item` and `update_item` methods to share unified logic, ensuring consistent conflict detection, validation, and database operations. [#368](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/368) ## [v4.0.0] - 2025-04-23 diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index 33df0d71..5dc91ee3 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -752,7 +752,6 @@ async def update_item( now = datetime_type.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") item["properties"]["updated"] = now - # await self.database.check_collection_exists(collection_id) await self.database.create_item( item, refresh=kwargs.get("refresh", False), base_url=base_url, exist_ok=True ) From a6b6e102d5708fa1122742d0c99a338634bf7d40 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Tue, 6 May 2025 23:49:15 +0800 Subject: [PATCH 3/6] test, validate items in bulk transacton client --- stac_fastapi/core/stac_fastapi/core/core.py | 65 +++++--- .../tests/clients/test_bulk_transactions.py | 155 ++++++++++++++++++ stac_fastapi/tests/clients/test_es_os.py | 98 +---------- 3 files changed, 199 insertions(+), 119 deletions(-) create mode 100644 stac_fastapi/tests/clients/test_bulk_transactions.py diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index 5dc91ee3..9bcf8fc0 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -693,19 +693,26 @@ async def create_item( NotFoundError: If the specified collection is not found in the database. ConflictError: If an item with the same ID already exists in the collection. """ - item = item.model_dump(mode="json") - base_url = str(kwargs["request"].base_url) + # Ensure request is present + request = kwargs.get("request") + if not request: + raise ValueError("Request must be provided in kwargs") + base_url = str(request.base_url) + + # Convert Pydantic model to dict for uniform processing + item_dict = item.model_dump(mode="json") - # If a feature collection is posted - if item["type"] == "FeatureCollection": + # Handle FeatureCollection (bulk insert) + if item_dict["type"] == "FeatureCollection": bulk_client = BulkTransactionsClient( database=self.database, settings=self.settings ) + features = item_dict["features"] processed_items = [ bulk_client.preprocess_item( - item, base_url, BulkTransactionMethod.INSERT + feature, base_url, BulkTransactionMethod.INSERT ) - for item in item["features"] + for feature in features ] attempted = len(processed_items) success, errors = await self.database.bulk_async( @@ -714,19 +721,23 @@ async def create_item( refresh=kwargs.get("refresh", False), ) if errors: - logger.error(f"Bulk async operation encountered errors: {errors}") + logger.error( + f"Bulk async operation encountered errors for collection {collection_id}: {errors} (attempted {attempted})" + ) else: - logger.info(f"Bulk async operation succeeded with {success} actions.") - + logger.info( + f"Bulk async operation succeeded with {success} actions for collection {collection_id}." + ) return f"Successfully added {success} Items. {attempted - success} errors occurred." - else: - await self.database.create_item( - item, - refresh=kwargs.get("refresh", False), - base_url=base_url, - exist_ok=False, - ) - return ItemSerializer.db_to_stac(item, base_url) + + # Handle single item + await self.database.create_item( + item_dict, + refresh=kwargs.get("refresh", False), + base_url=base_url, + exist_ok=False, + ) + return ItemSerializer.db_to_stac(item_dict, base_url) @overrides async def update_item( @@ -911,12 +922,22 @@ def bulk_item_insert( else: base_url = "" - processed_items = [ - self.preprocess_item(item, base_url, items.method) - for item in items.items.values() - ] + processed_items = [] + for item in items.items.values(): + try: + validated = Item(**item) if not isinstance(item, Item) else item + processed_items.append( + self.preprocess_item( + validated.model_dump(mode="json"), base_url, items.method + ) + ) + except ValidationError: + # Immediately raise on the first invalid item (strict mode) + raise + + if not processed_items: + return "No valid items to insert." - # not a great way to get the collection_id-- should be part of the method signature collection_id = processed_items[0]["collection"] attempted = len(processed_items) success, errors = self.database.bulk_sync( diff --git a/stac_fastapi/tests/clients/test_bulk_transactions.py b/stac_fastapi/tests/clients/test_bulk_transactions.py new file mode 100644 index 00000000..87325b23 --- /dev/null +++ b/stac_fastapi/tests/clients/test_bulk_transactions.py @@ -0,0 +1,155 @@ +import os +import uuid +from copy import deepcopy + +import pytest +from pydantic import ValidationError + +from stac_fastapi.extensions.third_party.bulk_transactions import Items +from stac_fastapi.types.errors import ConflictError + +from ..conftest import MockRequest, create_item + +if os.getenv("BACKEND", "elasticsearch").lower() == "opensearch": + from stac_fastapi.opensearch.config import OpensearchSettings as SearchSettings +else: + from stac_fastapi.elasticsearch.config import ( + ElasticsearchSettings as SearchSettings, + ) + + +@pytest.mark.asyncio +async def test_bulk_item_insert(ctx, core_client, txn_client, bulk_txn_client): + items = {} + for _ in range(10): + _item = deepcopy(ctx.item) + _item["id"] = str(uuid.uuid4()) + items[_item["id"]] = _item + + # fc = es_core.item_collection(coll["id"], request=MockStarletteRequest) + # assert len(fc["features"]) == 0 + + bulk_txn_client.bulk_item_insert(Items(items=items), refresh=True) + + fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest()) + assert len(fc["features"]) >= 10 + + # for item in items: + # es_transactions.delete_item( + # item["id"], item["collection"], request=MockStarletteRequest + # ) + + +@pytest.mark.asyncio +async def test_bulk_item_insert_with_raise_on_error( + ctx, core_client, txn_client, bulk_txn_client +): + """ + Test bulk_item_insert behavior with RAISE_ON_BULK_ERROR set to true and false. + + This test verifies that when RAISE_ON_BULK_ERROR is set to true, a ConflictError + is raised for conflicting items. When set to false, the operation logs errors + and continues gracefully. + """ + + # Insert an initial item to set up a conflict + initial_item = deepcopy(ctx.item) + initial_item["id"] = str(uuid.uuid4()) + await create_item(txn_client, initial_item) + + # Verify the initial item is inserted + fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest()) + assert len(fc["features"]) >= 1 + + # Create conflicting items (same ID as the initial item) + conflicting_items = {initial_item["id"]: deepcopy(initial_item)} + + # Test with RAISE_ON_BULK_ERROR set to true + os.environ["RAISE_ON_BULK_ERROR"] = "true" + bulk_txn_client.database.sync_settings = SearchSettings() + + with pytest.raises(ConflictError): + bulk_txn_client.bulk_item_insert(Items(items=conflicting_items), refresh=True) + + # Test with RAISE_ON_BULK_ERROR set to false + os.environ["RAISE_ON_BULK_ERROR"] = "false" + bulk_txn_client.database.sync_settings = SearchSettings() # Reinitialize settings + result = bulk_txn_client.bulk_item_insert( + Items(items=conflicting_items), refresh=True + ) + + # Validate the results + assert "Successfully added/updated 1 Items" in result + + # Clean up the inserted item + await txn_client.delete_item(initial_item["id"], ctx.item["collection"]) + + +@pytest.mark.asyncio +async def test_feature_collection_insert( + core_client, + txn_client, + ctx, +): + features = [] + for _ in range(10): + _item = deepcopy(ctx.item) + _item["id"] = str(uuid.uuid4()) + features.append(_item) + + feature_collection = {"type": "FeatureCollection", "features": features} + + await create_item(txn_client, feature_collection) + + fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest()) + assert len(fc["features"]) >= 10 + + +@pytest.mark.asyncio +async def test_bulk_item_insert_validation_error(ctx, core_client, bulk_txn_client): + items = {} + # Add 9 valid items + for _ in range(9): + _item = deepcopy(ctx.item) + _item["id"] = str(uuid.uuid4()) + items[_item["id"]] = _item + + # Add 1 invalid item (e.g., missing "datetime") + invalid_item = deepcopy(ctx.item) + invalid_item["id"] = str(uuid.uuid4()) + invalid_item["properties"].pop( + "datetime", None + ) # Remove datetime to make it invalid + items[invalid_item["id"]] = invalid_item + + # The bulk insert should raise a ValidationError due to the invalid item + with pytest.raises(ValidationError): + bulk_txn_client.bulk_item_insert(Items(items=items), refresh=True) + + +@pytest.mark.asyncio +async def test_feature_collection_insert_validation_error( + core_client, + txn_client, + ctx, +): + features = [] + # Add 9 valid items + for _ in range(9): + _item = deepcopy(ctx.item) + _item["id"] = str(uuid.uuid4()) + features.append(_item) + + # Add 1 invalid item (e.g., missing "datetime") + invalid_item = deepcopy(ctx.item) + invalid_item["id"] = str(uuid.uuid4()) + invalid_item["properties"].pop( + "datetime", None + ) # Remove datetime to make it invalid + features.append(invalid_item) + + feature_collection = {"type": "FeatureCollection", "features": features} + + # Assert that a ValidationError is raised due to the invalid item + with pytest.raises(ValidationError): + await create_item(txn_client, feature_collection) diff --git a/stac_fastapi/tests/clients/test_es_os.py b/stac_fastapi/tests/clients/test_es_os.py index e913f11f..0f200826 100644 --- a/stac_fastapi/tests/clients/test_es_os.py +++ b/stac_fastapi/tests/clients/test_es_os.py @@ -1,4 +1,3 @@ -import os import uuid from copy import deepcopy from typing import Callable @@ -6,17 +5,9 @@ import pytest from stac_pydantic import Item, api -from stac_fastapi.extensions.third_party.bulk_transactions import Items from stac_fastapi.types.errors import ConflictError, NotFoundError -from ..conftest import MockRequest, create_item - -if os.getenv("BACKEND", "elasticsearch").lower() == "opensearch": - from stac_fastapi.opensearch.config import OpensearchSettings as SearchSettings -else: - from stac_fastapi.elasticsearch.config import ( - ElasticsearchSettings as SearchSettings, - ) +from ..conftest import MockRequest @pytest.mark.asyncio @@ -283,93 +274,6 @@ async def test_delete_item(ctx, core_client, txn_client): ) -@pytest.mark.asyncio -async def test_bulk_item_insert(ctx, core_client, txn_client, bulk_txn_client): - items = {} - for _ in range(10): - _item = deepcopy(ctx.item) - _item["id"] = str(uuid.uuid4()) - items[_item["id"]] = _item - - # fc = es_core.item_collection(coll["id"], request=MockStarletteRequest) - # assert len(fc["features"]) == 0 - - bulk_txn_client.bulk_item_insert(Items(items=items), refresh=True) - - fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest()) - assert len(fc["features"]) >= 10 - - # for item in items: - # es_transactions.delete_item( - # item["id"], item["collection"], request=MockStarletteRequest - # ) - - -@pytest.mark.asyncio -async def test_bulk_item_insert_with_raise_on_error( - ctx, core_client, txn_client, bulk_txn_client -): - """ - Test bulk_item_insert behavior with RAISE_ON_BULK_ERROR set to true and false. - - This test verifies that when RAISE_ON_BULK_ERROR is set to true, a ConflictError - is raised for conflicting items. When set to false, the operation logs errors - and continues gracefully. - """ - - # Insert an initial item to set up a conflict - initial_item = deepcopy(ctx.item) - initial_item["id"] = str(uuid.uuid4()) - await create_item(txn_client, initial_item) - - # Verify the initial item is inserted - fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest()) - assert len(fc["features"]) >= 1 - - # Create conflicting items (same ID as the initial item) - conflicting_items = {initial_item["id"]: deepcopy(initial_item)} - - # Test with RAISE_ON_BULK_ERROR set to true - os.environ["RAISE_ON_BULK_ERROR"] = "true" - bulk_txn_client.database.sync_settings = SearchSettings() - - with pytest.raises(ConflictError): - bulk_txn_client.bulk_item_insert(Items(items=conflicting_items), refresh=True) - - # Test with RAISE_ON_BULK_ERROR set to false - os.environ["RAISE_ON_BULK_ERROR"] = "false" - bulk_txn_client.database.sync_settings = SearchSettings() # Reinitialize settings - result = bulk_txn_client.bulk_item_insert( - Items(items=conflicting_items), refresh=True - ) - - # Validate the results - assert "Successfully added/updated 1 Items" in result - - # Clean up the inserted item - await txn_client.delete_item(initial_item["id"], ctx.item["collection"]) - - -@pytest.mark.asyncio -async def test_feature_collection_insert( - core_client, - txn_client, - ctx, -): - features = [] - for _ in range(10): - _item = deepcopy(ctx.item) - _item["id"] = str(uuid.uuid4()) - features.append(_item) - - feature_collection = {"type": "FeatureCollection", "features": features} - - await create_item(txn_client, feature_collection) - - fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest()) - assert len(fc["features"]) >= 10 - - @pytest.mark.asyncio async def test_landing_page_no_collection_title(ctx, core_client, txn_client, app): ctx.collection["id"] = "new_id" From d6eda0458c7a56f5e01b35c44f8e4264f7406131 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Tue, 6 May 2025 23:55:30 +0800 Subject: [PATCH 4/6] update readme, changelog --- CHANGELOG.md | 2 ++ README.md | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c3e33b33..cdc77391 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Updated dynamic mapping for items to map long values to double versus float. [#326](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/326) - Extended Datetime Search to search on start_datetime and end_datetime as well as datetime fields. [#182](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/182) - Changed item update operation to use Elasticsearch index API instead of delete and create for better efficiency and atomicity. [#75](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/issues/75) +- Bulk insertion via `BulkTransactionsClient` now strictly validates all STAC Items using the Pydantic model before insertion. Any invalid item will immediately raise a `ValidationError`, ensuring consistent validation with single-item inserts and preventing invalid STAC Items from being stored. This validation is enforced regardless of the `RAISE_ON_BULK_ERROR` setting. [#368](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/368) + ### Fixed diff --git a/README.md b/README.md index 5f93608e..1ae2f085 100644 --- a/README.md +++ b/README.md @@ -114,7 +114,7 @@ You can customize additional settings in your `.env` file: | `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional | | `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional | | `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional -| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. | `false` | Optional | +| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` | Optional | > [!NOTE] > The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, and `ES_VERIFY_CERTS` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch. From b9c88c425a0db2872d2b6cfb4dd7a001a2142cc4 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Wed, 7 May 2025 10:50:58 +0800 Subject: [PATCH 5/6] remove commented out loop --- stac_fastapi/tests/clients/test_bulk_transactions.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/stac_fastapi/tests/clients/test_bulk_transactions.py b/stac_fastapi/tests/clients/test_bulk_transactions.py index 87325b23..a7405938 100644 --- a/stac_fastapi/tests/clients/test_bulk_transactions.py +++ b/stac_fastapi/tests/clients/test_bulk_transactions.py @@ -34,11 +34,6 @@ async def test_bulk_item_insert(ctx, core_client, txn_client, bulk_txn_client): fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest()) assert len(fc["features"]) >= 10 - # for item in items: - # es_transactions.delete_item( - # item["id"], item["collection"], request=MockStarletteRequest - # ) - @pytest.mark.asyncio async def test_bulk_item_insert_with_raise_on_error( From 94ad607a76c3bbd2fb3bbf1b9fc0ad74a8894244 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Wed, 7 May 2025 11:04:48 +0800 Subject: [PATCH 6/6] remove unnecessary code --- stac_fastapi/core/stac_fastapi/core/core.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index 9bcf8fc0..f994b619 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -693,10 +693,7 @@ async def create_item( NotFoundError: If the specified collection is not found in the database. ConflictError: If an item with the same ID already exists in the collection. """ - # Ensure request is present request = kwargs.get("request") - if not request: - raise ValueError("Request must be provided in kwargs") base_url = str(request.base_url) # Convert Pydantic model to dict for uniform processing @@ -935,9 +932,6 @@ def bulk_item_insert( # Immediately raise on the first invalid item (strict mode) raise - if not processed_items: - return "No valid items to insert." - collection_id = processed_items[0]["collection"] attempted = len(processed_items) success, errors = self.database.bulk_sync(