From 4d3a160485bc12c5a24207ec667c4e9c6927637f Mon Sep 17 00:00:00 2001 From: Phil Varner Date: Wed, 16 Mar 2022 15:49:12 -0400 Subject: [PATCH 1/2] use concatenation of id and collection for elasticsearch _id value --- Makefile | 2 +- .../stac_fastapi/elasticsearch/core.py | 9 +- .../elasticsearch/transactions.py | 99 +++++++++++-------- .../tests/clients/test_elasticsearch.py | 8 +- stac_fastapi/elasticsearch/tests/conftest.py | 5 +- .../tests/resources/test_item.py | 2 + 6 files changed, 77 insertions(+), 48 deletions(-) diff --git a/Makefile b/Makefile index bb0a8c34..e7bb1f74 100644 --- a/Makefile +++ b/Makefile @@ -25,7 +25,7 @@ docker-shell: .PHONY: test test: - $(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd /app/stac_fastapi/elasticsearch/tests/ && pytest' + -$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd /app/stac_fastapi/elasticsearch/tests/ && pytest' docker-compose down .PHONY: run-database diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py index f59431de..c36b5a7b 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py @@ -33,6 +33,11 @@ COLLECTIONS_INDEX = "stac_collections" +def mk_item_id(item_id: str, collection_id: str): + """Make the Elasticsearch document _id value from the Item id and collection.""" + return f"{item_id}|{collection_id}" + + @attr.s class CoreCrudClient(BaseCoreClient): """Client for core endpoints defined by stac.""" @@ -142,7 +147,9 @@ def get_item(self, item_id: str, collection_id: str, **kwargs) -> Item: """Get item by item id, collection id.""" base_url = str(kwargs["request"].base_url) try: - item = self.client.get(index=ITEMS_INDEX, id=item_id) + item = self.client.get( + index=ITEMS_INDEX, id=mk_item_id(item_id, collection_id) + ) except elasticsearch.exceptions.NotFoundError: raise NotFoundError( f"Item {item_id} does not exist in Collection {collection_id}" diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/transactions.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/transactions.py index 0ec82ea8..ede8e25b 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/transactions.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/transactions.py @@ -10,7 +10,7 @@ from overrides import overrides from stac_fastapi.elasticsearch.config import ElasticsearchSettings -from stac_fastapi.elasticsearch.core import COLLECTIONS_INDEX, ITEMS_INDEX +from stac_fastapi.elasticsearch.core import COLLECTIONS_INDEX, ITEMS_INDEX, mk_item_id from stac_fastapi.elasticsearch.serializers import CollectionSerializer, ItemSerializer from stac_fastapi.elasticsearch.session import Session from stac_fastapi.extensions.third_party.bulk_transactions import ( @@ -42,29 +42,39 @@ def create_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item: if item["type"] == "FeatureCollection": bulk_client = BulkTransactionsClient() processed_items = [ - bulk_client._preprocess_item(item, base_url) - for item in item["features"] + bulk_client.preprocess_item(item, base_url) for item in item["features"] ] return_msg = f"Successfully added {len(processed_items)} items." bulk_client.bulk_sync(processed_items) return return_msg - - # If a single item is posted - if not self.client.exists(index=COLLECTIONS_INDEX, id=item["collection"]): - raise ForeignKeyError(f"Collection {item['collection']} does not exist") - - if self.client.exists(index=ITEMS_INDEX, id=item["id"]): - raise ConflictError( - f"Item {item['id']} in collection {item['collection']} already exists" + else: + # TODO + if self.client.exists( + index=ITEMS_INDEX, id=mk_item_id(item["id"], item["collection"]) + ): + raise ConflictError( + f"Item {item['id']} in collection {item['collection']} already exists" + ) + + # todo: check if collection exists, but cache + if not self.client.exists(index=COLLECTIONS_INDEX, id=item["collection"]): + raise ForeignKeyError(f"Collection {item['collection']} does not exist") + + item = BulkTransactionsClient().preprocess_item(item, base_url) + + es_resp = self.client.index( + index=ITEMS_INDEX, + id=mk_item_id(item["id"], item["collection"]), + document=item, ) - data = ItemSerializer.stac_to_db(item, base_url) + if (meta := es_resp.get("meta")) and meta.get("status") == 409: + raise ConflictError( + f"Item {item['id']} in collection {item['collection']} already exists" + ) - self.client.index( - index=ITEMS_INDEX, doc_type="_doc", id=item["id"], document=data - ) - return ItemSerializer.db_to_stac(item, base_url) + return item @overrides def update_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item: @@ -75,14 +85,11 @@ def update_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item: if not self.client.exists(index=COLLECTIONS_INDEX, id=item["collection"]): raise ForeignKeyError(f"Collection {item['collection']} does not exist") - if not self.client.exists(index=ITEMS_INDEX, id=item["id"]): - raise NotFoundError( - f"Item {item['id']} in collection {item['collection']} doesn't exist" - ) + + # todo: index instead of delete and create self.delete_item(item["id"], item["collection"]) self.create_item(item, **kwargs) - # self.client.update(index=ITEMS_INDEX,doc_type='_doc',id=model["id"], - # body=model) + # self.client.update(index=ITEMS_INDEX,id=item["id"], body=item) return ItemSerializer.db_to_stac(item, base_url) @overrides @@ -91,10 +98,12 @@ def delete_item( ) -> stac_types.Item: """Delete item.""" try: - _ = self.client.get(index=ITEMS_INDEX, id=item_id) + self.client.delete(index=ITEMS_INDEX, id=mk_item_id(item_id, collection_id)) except elasticsearch.exceptions.NotFoundError: - raise NotFoundError(f"Item {item_id} not found") - self.client.delete(index=ITEMS_INDEX, doc_type="_doc", id=item_id) + raise NotFoundError( + f"Item {item_id} in collection {collection_id} not found" + ) + return None @overrides def create_collection( @@ -109,9 +118,9 @@ def create_collection( if self.client.exists(index=COLLECTIONS_INDEX, id=collection["id"]): raise ConflictError(f"Collection {collection['id']} already exists") + self.client.index( index=COLLECTIONS_INDEX, - doc_type="_doc", id=collection["id"], document=collection, ) @@ -139,7 +148,8 @@ def delete_collection(self, collection_id: str, **kwargs) -> stac_types.Collecti _ = self.client.get(index=COLLECTIONS_INDEX, id=collection_id) except elasticsearch.exceptions.NotFoundError: raise NotFoundError(f"Collection {collection_id} not found") - self.client.delete(index=COLLECTIONS_INDEX, doc_type="_doc", id=collection_id) + self.client.delete(index=COLLECTIONS_INDEX, id=collection_id) + return None @attr.s @@ -153,22 +163,28 @@ def __attrs_post_init__(self): settings = ElasticsearchSettings() self.client = settings.create_client - def _preprocess_item(self, model: stac_types.Item, base_url) -> stac_types.Item: + def preprocess_item(self, item: stac_types.Item, base_url) -> stac_types.Item: """Preprocess items to match data model.""" - if not self.client.exists(index=COLLECTIONS_INDEX, id=model["collection"]): - raise ForeignKeyError(f"Collection {model['collection']} does not exist") + if not self.client.exists(index=COLLECTIONS_INDEX, id=item["collection"]): + raise ForeignKeyError(f"Collection {item['collection']} does not exist") - if self.client.exists(index=ITEMS_INDEX, id=model["id"]): + if self.client.exists(index=ITEMS_INDEX, id=item["id"]): raise ConflictError( - f"Item {model['id']} in collection {model['collection']} already exists" + f"Item {item['id']} in collection {item['collection']} already exists" ) - item = ItemSerializer.stac_to_db(model, base_url) - return item + return ItemSerializer.stac_to_db(item, base_url) def bulk_sync(self, processed_items): """Elasticsearch bulk insertion.""" - actions = [{"_index": ITEMS_INDEX, "_source": item} for item in processed_items] + actions = [ + { + "_index": ITEMS_INDEX, + "_id": mk_item_id(item["id"], item["collection"]), + "_source": item, + } + for item in processed_items + ] helpers.bulk(self.client, actions) @overrides @@ -176,15 +192,16 @@ def bulk_item_insert( self, items: Items, chunk_size: Optional[int] = None, **kwargs ) -> str: """Bulk item insertion using es.""" - try: - base_url = str(kwargs["request"].base_url) - except Exception: + request = kwargs.get("request") + if request: + base_url = str(request.base_url) + else: base_url = "" + processed_items = [ - self._preprocess_item(item, base_url) for item in items.items.values() + self.preprocess_item(item, base_url) for item in items.items.values() ] - return_msg = f"Successfully added {len(processed_items)} items." self.bulk_sync(processed_items) - return return_msg + return f"Successfully added {len(processed_items)} Items." diff --git a/stac_fastapi/elasticsearch/tests/clients/test_elasticsearch.py b/stac_fastapi/elasticsearch/tests/clients/test_elasticsearch.py index 7fc8f876..060e7eb2 100644 --- a/stac_fastapi/elasticsearch/tests/clients/test_elasticsearch.py +++ b/stac_fastapi/elasticsearch/tests/clients/test_elasticsearch.py @@ -100,19 +100,19 @@ def test_get_item( item_data = load_test_data("test_item.json") es_transactions.create_collection(collection_data, request=MockStarletteRequest) es_transactions.create_item(item_data, request=MockStarletteRequest) - coll = es_core.get_item( + got_item = es_core.get_item( item_id=item_data["id"], collection_id=item_data["collection"], request=MockStarletteRequest, ) - assert coll["id"] == item_data["id"] - assert coll["collection"] == item_data["collection"] + assert got_item["id"] == item_data["id"] + assert got_item["collection"] == item_data["collection"] es_transactions.delete_collection( collection_data["id"], request=MockStarletteRequest ) es_transactions.delete_item( - item_data["id"], coll["id"], request=MockStarletteRequest + item_data["id"], item_data["collection"], request=MockStarletteRequest ) diff --git a/stac_fastapi/elasticsearch/tests/conftest.py b/stac_fastapi/elasticsearch/tests/conftest.py index c207117c..d27d462a 100644 --- a/stac_fastapi/elasticsearch/tests/conftest.py +++ b/stac_fastapi/elasticsearch/tests/conftest.py @@ -153,7 +153,10 @@ def app_client(api_client, load_test_data): try: client.create_collection(coll, request=MockStarletteRequest) except ConflictError: - pass + try: + client.delete_item("test-item", "test-collection") + except Exception: + pass with TestClient(api_client.app) as test_app: yield test_app diff --git a/stac_fastapi/elasticsearch/tests/resources/test_item.py b/stac_fastapi/elasticsearch/tests/resources/test_item.py index 3bd07b56..9b052dc8 100644 --- a/stac_fastapi/elasticsearch/tests/resources/test_item.py +++ b/stac_fastapi/elasticsearch/tests/resources/test_item.py @@ -143,6 +143,8 @@ def test_update_item_missing_collection(app_client, load_test_data): def test_update_item_geometry(app_client, load_test_data): test_item = load_test_data("test_item.json") + test_item["id"] = "update_test_item_1" + # Create the item resp = app_client.post( f"/collections/{test_item['collection']}/items", json=test_item From 1d0aac8b7fb23d01ee29729a311d04dd3d880b1b Mon Sep 17 00:00:00 2001 From: Phil Varner Date: Wed, 16 Mar 2022 15:59:41 -0400 Subject: [PATCH 2/2] un-rearrange code --- .../stac_fastapi/elasticsearch/transactions.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/transactions.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/transactions.py index ede8e25b..6973c81b 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/transactions.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/transactions.py @@ -49,7 +49,10 @@ def create_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item: return return_msg else: - # TODO + # todo: check if collection exists, but cache + if not self.client.exists(index=COLLECTIONS_INDEX, id=item["collection"]): + raise ForeignKeyError(f"Collection {item['collection']} does not exist") + if self.client.exists( index=ITEMS_INDEX, id=mk_item_id(item["id"], item["collection"]) ): @@ -57,10 +60,6 @@ def create_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item: f"Item {item['id']} in collection {item['collection']} already exists" ) - # todo: check if collection exists, but cache - if not self.client.exists(index=COLLECTIONS_INDEX, id=item["collection"]): - raise ForeignKeyError(f"Collection {item['collection']} does not exist") - item = BulkTransactionsClient().preprocess_item(item, base_url) es_resp = self.client.index(