@app.on_event("startup")
async def _startup_event():
    """Create the Item and Collection indexes when the application starts."""
    indexes_client = IndexesClient()
    indexes_client.create_indexes()
CoreCrudClient(BaseCoreClient): settings = ElasticsearchSettings() client = settings.create_client - @staticmethod - def _lookup_id(id: str, table, session): - """Lookup row by id.""" - pass - + @overrides def all_collections(self, **kwargs) -> Collections: """Read all collections from the database.""" base_url = str(kwargs["request"].base_url) try: collections = self.client.search( - index="stac_collections", doc_type="_doc", query={"match_all": {}} + index=COLLECTIONS_INDEX, query={"match_all": {}} ) except elasticsearch.exceptions.NotFoundError: raise NotFoundError("No collections exist") @@ -86,18 +85,20 @@ def all_collections(self, **kwargs) -> Collections: ) return collection_list + @overrides def get_collection(self, collection_id: str, **kwargs) -> Collection: """Get collection by id.""" base_url = str(kwargs["request"].base_url) try: - collection = self.client.get(index="stac_collections", id=collection_id) + collection = self.client.get(index=COLLECTIONS_INDEX, id=collection_id) except elasticsearch.exceptions.NotFoundError: raise NotFoundError(f"Collection {collection_id} not found") return self.collection_serializer.db_to_stac(collection["_source"], base_url) + @overrides def item_collection( - self, collection_id: str, limit: int = 10, **kwargs + self, collection_id: str, limit: int = 10, token: str = None, **kwargs ) -> ItemCollection: """Read an item collection from the database.""" links = [] @@ -136,11 +137,12 @@ def item_collection( context=context_obj, ) + @overrides def get_item(self, item_id: str, collection_id: str, **kwargs) -> Item: """Get item by item id, collection id.""" base_url = str(kwargs["request"].base_url) try: - item = self.client.get(index="stac_items", id=item_id) + item = self.client.get(index=ITEMS_INDEX, id=item_id) except elasticsearch.exceptions.NotFoundError: raise NotFoundError( f"Item {item_id} does not exist in Collection {collection_id}" @@ -171,6 +173,7 @@ def _return_date(interval_str): return {"lte": end_date, 
"""Index management client."""

import logging

import attr

from stac_fastapi.elasticsearch.config import ElasticsearchSettings
from stac_fastapi.elasticsearch.core import COLLECTIONS_INDEX, ITEMS_INDEX
from stac_fastapi.elasticsearch.session import Session

logger = logging.getLogger(__name__)


@attr.s
class IndexesClient:
    """Elasticsearch client to handle index creation.

    The mappings are plain class attributes, so they are built once when the
    module is imported and shared by every instance.
    """

    session: Session = attr.ib(default=attr.Factory(Session.create_from_env))
    # Shared client, resolved once at class-definition time.
    client = ElasticsearchSettings().create_client

    # Dynamic templates applied to fields that have no explicit mapping below.
    ES_MAPPINGS_DYNAMIC_TEMPLATES = [
        # Common https://github.com/radiantearth/stac-spec/blob/master/item-spec/common-metadata.md
        {
            "descriptions": {
                "match_mapping_type": "string",
                "match": "description",
                "mapping": {"type": "text"},
            }
        },
        {
            "titles": {
                "match_mapping_type": "string",
                "match": "title",
                "mapping": {"type": "text"},
            }
        },
        # Projection Extension https://github.com/stac-extensions/projection
        {"proj_epsg": {"match": "proj:epsg", "mapping": {"type": "integer"}}},
        {
            "proj_projjson": {
                "match": "proj:projjson",
                "mapping": {"type": "object", "enabled": False},
            }
        },
        {
            "proj_centroid": {
                "match": "proj:centroid",
                "mapping": {"type": "geo_point"},
            }
        },
        {
            "proj_geometry": {
                "match": "proj:geometry",
                "mapping": {"type": "geo_shape"},
            }
        },
        {
            "no_index_href": {
                "match": "href",
                "mapping": {"type": "text", "index": False},
            }
        },
        # Default all other strings not otherwise specified to keyword
        {"strings": {"match_mapping_type": "string", "mapping": {"type": "keyword"}}},
        {"numerics": {"match_mapping_type": "long", "mapping": {"type": "float"}}},
    ]

    # Mapping for the Items index.
    ES_ITEMS_MAPPINGS = {
        "numeric_detection": False,
        "dynamic_templates": ES_MAPPINGS_DYNAMIC_TEMPLATES,
        "properties": {
            "geometry": {"type": "geo_shape"},
            "assets": {"type": "object", "enabled": False},
            "links": {"type": "object", "enabled": False},
            "properties": {
                "type": "object",
                "properties": {
                    # Common https://github.com/radiantearth/stac-spec/blob/master/item-spec/common-metadata.md
                    "datetime": {"type": "date"},
                    "start_datetime": {"type": "date"},
                    "end_datetime": {"type": "date"},
                    "created": {"type": "date"},
                    "updated": {"type": "date"},
                    # Satellite Extension https://github.com/stac-extensions/sat
                    "sat:absolute_orbit": {"type": "integer"},
                    "sat:relative_orbit": {"type": "integer"},
                },
            },
        },
    }

    # Mapping for the Collections index.
    ES_COLLECTIONS_MAPPINGS = {
        "numeric_detection": False,
        "dynamic_templates": ES_MAPPINGS_DYNAMIC_TEMPLATES,
        "properties": {
            # NOTE(review): bbox values are presumably fractional degrees; an
            # integer "long" mapping may not index them as intended - confirm.
            "extent.spatial.bbox": {"type": "long"},
            "extent.temporal.interval": {"type": "date"},
            "providers": {"type": "object", "enabled": False},
            "links": {"type": "object", "enabled": False},
            "item_assets": {"type": "object", "enabled": False},
        },
    }

    def _create_index(self, index: str, mappings: dict) -> None:
        """Create one index, tolerating (but reporting) HTTP 400 responses.

        Args:
            index: Name of the index to create.
            mappings: Mapping definition sent as the ``mappings`` body key.

        The request is sent with ``ignore=400`` so a 400 response does not
        raise. The response is then inspected: an "already exists" error is
        expected on every start after the first and is logged at debug level;
        any other reported error is logged at error level so that a rejected
        mapping does not go unnoticed. Nothing is raised for a 400 response.
        """
        response = self.client.indices.create(
            index=index,
            body={"mappings": mappings},
            ignore=400,  # do not raise on 400; the response is checked below
        )

        # Be tolerant of the response type: only inspect it when it offers a
        # mapping-style ``get``.
        get_field = getattr(response, "get", None)
        error = get_field("error") if callable(get_field) else None
        if not error:
            return

        error_type = error.get("type") if isinstance(error, dict) else error
        if error_type == "resource_already_exists_exception":
            logger.debug("Index %s already exists; leaving it unchanged", index)
        else:
            logger.error("Index %s could not be created: %s", index, error)

    def create_indexes(self):
        """Create the index for Items and Collections."""
        self._create_index(ITEMS_INDEX, self.ES_ITEMS_MAPPINGS)
        self._create_index(COLLECTIONS_INDEX, self.ES_COLLECTIONS_MAPPINGS)
otherwise specified to keyword - {"strings": {"match_mapping_type": "string", "mapping": {"type": "keyword"}}}, - {"numerics": {"match_mapping_type": "long", "mapping": {"type": "float"}}}, - ] - - ES_MAPPINGS = { - "numeric_detection": False, - "dynamic_templates": ES_MAPPINGS_DYNAMIC_TEMPLATES, - "properties": { - "geometry": {"type": "geo_shape"}, - "assets": {"type": "object", "enabled": False}, - "links": {"type": "object", "enabled": False}, - "properties": { - "type": "object", - "properties": { - # Common https://github.com/radiantearth/stac-spec/blob/master/item-spec/common-metadata.md - "datetime": {"type": "date"}, - "start_datetime": {"type": "date"}, - "end_datetime": {"type": "date"}, - "created": {"type": "date"}, - "updated": {"type": "date"}, - # Satellite Extension https://github.com/stac-extensions/sat - "sat:absolute_orbit": {"type": "integer"}, - "sat:relative_orbit": {"type": "integer"}, - }, - }, - }, - } - - def create_item_index(self): - """Create the index for Items.""" - self.client.indices.create( - index="stac_items", - body={"mappings": self.ES_MAPPINGS}, - ignore=400, # ignore 400 already exists code - ) - @overrides def create_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item: """Create item.""" base_url = str(kwargs["request"].base_url) - self.create_item_index() # If a feature collection is posted if item["type"] == "FeatureCollection": @@ -130,10 +51,10 @@ def create_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item: return return_msg # If a single item is posted - if not self.client.exists(index="stac_collections", id=item["collection"]): + if not self.client.exists(index=COLLECTIONS_INDEX, id=item["collection"]): raise ForeignKeyError(f"Collection {item['collection']} does not exist") - if self.client.exists(index="stac_items", id=item["id"]): + if self.client.exists(index=ITEMS_INDEX, id=item["id"]): raise ConflictError( f"Item {item['id']} in collection {item['collection']} already exists" ) @@ -141,7 
+62,7 @@ def create_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item: data = ItemSerializer.stac_to_db(item, base_url) self.client.index( - index="stac_items", doc_type="_doc", id=item["id"], document=data + index=ITEMS_INDEX, doc_type="_doc", id=item["id"], document=data ) return ItemSerializer.db_to_stac(item, base_url) @@ -152,15 +73,15 @@ def update_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item: now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") item["properties"]["updated"] = str(now) - if not self.client.exists(index="stac_collections", id=item["collection"]): + if not self.client.exists(index=COLLECTIONS_INDEX, id=item["collection"]): raise ForeignKeyError(f"Collection {item['collection']} does not exist") - if not self.client.exists(index="stac_items", id=item["id"]): + if not self.client.exists(index=ITEMS_INDEX, id=item["id"]): raise NotFoundError( f"Item {item['id']} in collection {item['collection']} doesn't exist" ) self.delete_item(item["id"], item["collection"]) self.create_item(item, **kwargs) - # self.client.update(index="stac_items",doc_type='_doc',id=model["id"], + # self.client.update(index=ITEMS_INDEX,doc_type='_doc',id=model["id"], # body=model) return ItemSerializer.db_to_stac(item, base_url) @@ -170,10 +91,10 @@ def delete_item( ) -> stac_types.Item: """Delete item.""" try: - _ = self.client.get(index="stac_items", id=item_id) + _ = self.client.get(index=ITEMS_INDEX, id=item_id) except elasticsearch.exceptions.NotFoundError: raise NotFoundError(f"Item {item_id} not found") - self.client.delete(index="stac_items", doc_type="_doc", id=item_id) + self.client.delete(index=ITEMS_INDEX, doc_type="_doc", id=item_id) @overrides def create_collection( @@ -186,12 +107,10 @@ def create_collection( ).create_links() collection["links"] = collection_links - self.create_item_index() - - if self.client.exists(index="stac_collections", id=collection["id"]): + if self.client.exists(index=COLLECTIONS_INDEX, 
id=collection["id"]): raise ConflictError(f"Collection {collection['id']} already exists") self.client.index( - index="stac_collections", + index=COLLECTIONS_INDEX, doc_type="_doc", id=collection["id"], document=collection, @@ -205,7 +124,7 @@ def update_collection( """Update collection.""" base_url = str(kwargs["request"].base_url) try: - _ = self.client.get(index="stac_collections", id=collection["id"]) + _ = self.client.get(index=COLLECTIONS_INDEX, id=collection["id"]) except elasticsearch.exceptions.NotFoundError: raise NotFoundError(f"Collection {collection['id']} not found") self.delete_collection(collection["id"]) @@ -217,10 +136,10 @@ def update_collection( def delete_collection(self, collection_id: str, **kwargs) -> stac_types.Collection: """Delete collection.""" try: - _ = self.client.get(index="stac_collections", id=collection_id) + _ = self.client.get(index=COLLECTIONS_INDEX, id=collection_id) except elasticsearch.exceptions.NotFoundError: raise NotFoundError(f"Collection {collection_id} not found") - self.client.delete(index="stac_collections", doc_type="_doc", id=collection_id) + self.client.delete(index=COLLECTIONS_INDEX, doc_type="_doc", id=collection_id) @attr.s @@ -236,10 +155,10 @@ def __attrs_post_init__(self): def _preprocess_item(self, model: stac_types.Item, base_url) -> stac_types.Item: """Preprocess items to match data model.""" - if not self.client.exists(index="stac_collections", id=model["collection"]): + if not self.client.exists(index=COLLECTIONS_INDEX, id=model["collection"]): raise ForeignKeyError(f"Collection {model['collection']} does not exist") - if self.client.exists(index="stac_items", id=model["id"]): + if self.client.exists(index=ITEMS_INDEX, id=model["id"]): raise ConflictError( f"Item {model['id']} in collection {model['collection']} already exists" ) @@ -249,9 +168,7 @@ def _preprocess_item(self, model: stac_types.Item, base_url) -> stac_types.Item: def bulk_sync(self, processed_items): """Elasticsearch bulk insertion.""" 
- actions = [ - {"_index": "stac_items", "_source": item} for item in processed_items - ] + actions = [{"_index": ITEMS_INDEX, "_source": item} for item in processed_items] helpers.bulk(self.client, actions) @overrides @@ -259,8 +176,6 @@ def bulk_item_insert( self, items: Items, chunk_size: Optional[int] = None, **kwargs ) -> str: """Bulk item insertion using es.""" - transactions_client = TransactionsClient() - transactions_client.create_item_index() try: base_url = str(kwargs["request"].base_url) except Exception: diff --git a/stac_fastapi/elasticsearch/tests/conftest.py b/stac_fastapi/elasticsearch/tests/conftest.py index 1fa22274..c207117c 100644 --- a/stac_fastapi/elasticsearch/tests/conftest.py +++ b/stac_fastapi/elasticsearch/tests/conftest.py @@ -10,6 +10,7 @@ from stac_fastapi.elasticsearch.config import ElasticsearchSettings from stac_fastapi.elasticsearch.core import CoreCrudClient from stac_fastapi.elasticsearch.extensions import QueryExtension +from stac_fastapi.elasticsearch.indexes import IndexesClient from stac_fastapi.elasticsearch.transactions import ( BulkTransactionsClient, TransactionsClient, @@ -143,6 +144,8 @@ def api_client(): @pytest.fixture def app_client(api_client, load_test_data): + IndexesClient().create_indexes() + coll = load_test_data("test_collection.json") client = TransactionsClient( session=None,