diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py
index 291e981e..c17b2dce 100644
--- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py
+++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py
@@ -94,7 +94,7 @@ def get_collection(self, collection_id: str, **kwargs) -> Collection:
         return self.collection_serializer.db_to_stac(collection["_source"], base_url)
 
     def item_collection(
-        self, collection_id: str, limit: int = 10, token: str = None, **kwargs
+        self, collection_id: str, limit: int = 10, **kwargs
     ) -> ItemCollection:
         """Read an item collection from the database."""
         links = []
diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/serializers.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/serializers.py
index d016fcb3..ff098e49 100644
--- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/serializers.py
+++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/serializers.py
@@ -63,6 +63,9 @@ def db_to_stac(cls, collection: dict, base_url: str) -> stac_types.Collection:
         if original_links:
             collection_links += resolve_links(original_links, base_url)
 
+        if "providers" not in collection:
+            collection["providers"] = {}
+
         return stac_types.Collection(
             type="Collection",
             id=collection["id"],
diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/transactions.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/transactions.py
index bedf833c..85c31bd7 100644
--- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/transactions.py
+++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/transactions.py
@@ -31,6 +31,26 @@ class TransactionsClient(BaseTransactionsClient):
     settings = ElasticsearchSettings()
     client = settings.create_client
 
+    def _create_item_index(self):
+        mapping = {
+            "mappings": {
+                "properties": {
+                    "geometry": {"type": "geo_shape"},
+                    "id": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
+                    "properties__datetime": {
+                        "type": "text",
+                        "fields": {"keyword": {"type": "keyword"}},
+                    },
+                }
+            }
+        }
+
+        _ = self.client.indices.create(
+            index="stac_items",
+            body=mapping,
+            ignore=400,  # ignore 400 already exists code
+        )
+
     def create_item(self, model: stac_types.Item, **kwargs):
         """Create item."""
         base_url = str(kwargs["request"].base_url)
@@ -59,24 +79,7 @@ def create_item(self, model: stac_types.Item, **kwargs):
         if "created" not in model["properties"]:
             model["properties"]["created"] = str(now)
 
-        mapping = {
-            "mappings": {
-                "properties": {
-                    "geometry": {"type": "geo_shape"},
-                    "id": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
-                    "properties__datetime": {
-                        "type": "text",
-                        "fields": {"keyword": {"type": "keyword"}},
-                    },
-                }
-            }
-        }
-
-        _ = self.client.indices.create(
-            index="stac_items",
-            body=mapping,
-            ignore=400,  # ignore 400 already exists code
-        )
+        self._create_item_index()
 
         self.client.index(
             index="stac_items", doc_type="_doc", id=model["id"], document=model
@@ -91,6 +94,8 @@ def create_collection(self, model: stac_types.Collection, **kwargs):
         ).create_links()
         model["links"] = collection_links
 
+        self._create_item_index()
+
         if self.client.exists(index="stac_collections", id=model["id"]):
             raise ConflictError(f"Collection {model['id']} already exists")
         self.client.index(
@@ -155,6 +160,26 @@ def __attrs_post_init__(self):
         settings = ElasticsearchSettings()
         self.client = settings.create_client
 
+    def _create_item_index(self):
+        mapping = {
+            "mappings": {
+                "properties": {
+                    "geometry": {"type": "geo_shape"},
+                    "id": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
+                    "properties__datetime": {
+                        "type": "text",
+                        "fields": {"keyword": {"type": "keyword"}},
+                    },
+                }
+            }
+        }
+
+        _ = self.client.indices.create(
+            index="stac_items",
+            body=mapping,
+            ignore=400,  # ignore 400 already exists code
+        )
+
     def _preprocess_item(self, model: stac_types.Item, base_url) -> stac_types.Item:
         """Preprocess items to match data model."""
         item_links = ItemLinks(
@@ -162,32 +187,52 @@ def _preprocess_item(self, model: stac_types.Item, base_url) -> stac_types.Item:
         ).create_links()
         model["links"] = item_links
 
-        # with self.client.start_session(causal_consistency=True) as session:
-        #     error_check = ErrorChecks(session=session, client=self.client)
-        #     error_check._check_collection_foreign_key(model)
-        #     error_check._check_item_conflict(model)
-        #     now = datetime.utcnow().strftime(DATETIME_RFC339)
-        #     if "created" not in model["properties"]:
-        #         model["properties"]["created"] = str(now)
-        #     return model
+        if not self.client.exists(index="stac_collections", id=model["collection"]):
+            raise ForeignKeyError(f"Collection {model['collection']} does not exist")
+
+        if self.client.exists(index="stac_items", id=model["id"]):
+            raise ConflictError(
+                f"Item {model['id']} in collection {model['collection']} already exists"
+            )
+
+        now = datetime.utcnow().strftime(DATETIME_RFC339)
+        if "created" not in model["properties"]:
+            model["properties"]["created"] = str(now)
+
+        # elasticsearch doesn't like the fact that some values are float and some were int
+        if "eo:bands" in model["properties"]:
+            for wave in model["properties"]["eo:bands"]:
+                for k, v in wave.items():
+                    if type(v) != str:
+                        v = float(v)
+                    wave.update({k: v})
+        return model
 
     def bulk_item_insert(self, items: Items, **kwargs) -> str:
         """Bulk item insertion using es."""
+        self._create_item_index()
         try:
             base_url = str(kwargs["request"].base_url)
         except Exception:
             base_url = ""
         processed_items = [self._preprocess_item(item, base_url) for item in items]
         return_msg = f"Successfully added {len(processed_items)} items."
 
-        # with self.client.start_session(causal_consistency=True) as session:
-        #     self.item_table.insert_many(processed_items, session=session)
-        #     return return_msg
-        helpers.bulk(
-            self.client,
-            processed_items,
-            index="stac_items",
-            doc_type="_doc",
-            request_timeout=200,
-        )
+        # helpers.bulk(
+        #     self.client,
+        #     processed_items,
+        #     index="stac_items",
+        #     doc_type="_doc",
+        #     request_timeout=200,
+        # )
+
+        def bulk_sync(processed_items):
+            actions = [
+                {"_index": "stac_items", "_source": item} for item in processed_items
+            ]
+
+            helpers.bulk(self.client, actions)
+
+        bulk_sync(processed_items)
+        return return_msg
 
diff --git a/stac_fastapi/elasticsearch/tests/clients/test_elasticsearch.py b/stac_fastapi/elasticsearch/tests/clients/test_elasticsearch.py
index 5f915d1f..942d1d37 100644
--- a/stac_fastapi/elasticsearch/tests/clients/test_elasticsearch.py
+++ b/stac_fastapi/elasticsearch/tests/clients/test_elasticsearch.py
@@ -1,3 +1,4 @@
+import time
 import uuid
 from copy import deepcopy
 from typing import Callable
@@ -256,7 +257,7 @@ def test_delete_item(
     es_core.get_item(item["id"], item["collection"], request=MockStarletteRequest)
 
 
-@pytest.mark.skip(reason="bulk not implemented")
+# @pytest.mark.skip(reason="might need a larger timeout")
 def test_bulk_item_insert(
     es_core: CoreCrudClient,
     es_transactions: TransactionsClient,
@@ -278,14 +279,14 @@ def test_bulk_item_insert(
     assert len(fc["features"]) == 0
 
     es_bulk_transactions.bulk_item_insert(items=items)
-
+    time.sleep(3)
     fc = es_core.item_collection(coll["id"], request=MockStarletteRequest)
     assert len(fc["features"]) == 10
 
-    for item in items:
-        es_transactions.delete_item(
-            item["id"], item["collection"], request=MockStarletteRequest
-        )
+    # for item in items:
+    #     es_transactions.delete_item(
+    #         item["id"], item["collection"], request=MockStarletteRequest
+    #     )
 
 
 @pytest.mark.skip(reason="Not working")
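Note: the bulk_sync helper in this patch feeds elasticsearch.helpers.bulk a list of action dicts rather than raw documents. The following is a minimal standalone sketch of that action format, not part of the patch; the cluster URL, the stac_items index name, and the sample items are illustrative assumptions.

    from elasticsearch import Elasticsearch, helpers

    # Assumed local cluster; adjust the URL for your deployment.
    client = Elasticsearch("http://localhost:9200")

    # Placeholder documents standing in for preprocessed STAC items.
    items = [
        {"id": "item-1", "collection": "test-collection", "properties": {}},
        {"id": "item-2", "collection": "test-collection", "properties": {}},
    ]

    # Each bulk action names the target index and carries the document in "_source".
    actions = [{"_index": "stac_items", "_source": item} for item in items]

    # helpers.bulk returns (successful_count, errors); with the default
    # raise_on_error=True a failure raises instead of being returned.
    success, _ = helpers.bulk(client, actions)
    print(f"indexed {success} documents")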