Skip to content

create indexes at startup rather than when ingesting, add collection index schema #57

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from stac_fastapi.elasticsearch.config import ElasticsearchSettings
from stac_fastapi.elasticsearch.core import CoreCrudClient
from stac_fastapi.elasticsearch.extensions import QueryExtension
from stac_fastapi.elasticsearch.indexes import IndexesClient
from stac_fastapi.elasticsearch.session import Session
from stac_fastapi.elasticsearch.transactions import (
BulkTransactionsClient,
Expand Down Expand Up @@ -43,6 +44,11 @@
app = api.app


@app.on_event("startup")
async def _startup_event():
    """Create the Elasticsearch indexes when the application starts.

    Registered as a "startup" event handler on ``app`` so that the indexes
    are set up once before serving, rather than during ingestion.
    """
    indexes_client = IndexesClient()
    indexes_client.create_indexes()


def run():
"""Run app from command line using uvicorn if available."""
try:
Expand Down
29 changes: 15 additions & 14 deletions stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import elasticsearch
from elasticsearch_dsl import Q, Search
from fastapi import HTTPException
from overrides import overrides

# from geojson_pydantic.geometries import Polygon
from pydantic import ValidationError
Expand All @@ -22,13 +23,15 @@
# from stac_fastapi.elasticsearch.types.error_checks import ErrorChecks
from stac_fastapi.types.core import BaseCoreClient
from stac_fastapi.types.errors import NotFoundError
from stac_fastapi.types.search import BaseSearchPostRequest
from stac_fastapi.types.stac import Collection, Collections, Item, ItemCollection

logger = logging.getLogger(__name__)

NumType = Union[float, int]

ITEMS_INDEX = "stac_items"
COLLECTIONS_INDEX = "stac_collections"


@attr.s
class CoreCrudClient(BaseCoreClient):
Expand All @@ -44,17 +47,13 @@ class CoreCrudClient(BaseCoreClient):
settings = ElasticsearchSettings()
client = settings.create_client

@staticmethod
def _lookup_id(id: str, table, session):
"""Lookup row by id."""
pass

@overrides
def all_collections(self, **kwargs) -> Collections:
"""Read all collections from the database."""
base_url = str(kwargs["request"].base_url)
try:
collections = self.client.search(
index="stac_collections", doc_type="_doc", query={"match_all": {}}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doc_type is deprecated, and is assumed to be _doc

index=COLLECTIONS_INDEX, query={"match_all": {}}
)
except elasticsearch.exceptions.NotFoundError:
raise NotFoundError("No collections exist")
Expand Down Expand Up @@ -86,18 +85,20 @@ def all_collections(self, **kwargs) -> Collections:
)
return collection_list

@overrides
def get_collection(self, collection_id: str, **kwargs) -> Collection:
"""Get collection by id."""
base_url = str(kwargs["request"].base_url)
try:
collection = self.client.get(index="stac_collections", id=collection_id)
collection = self.client.get(index=COLLECTIONS_INDEX, id=collection_id)
except elasticsearch.exceptions.NotFoundError:
raise NotFoundError(f"Collection {collection_id} not found")

return self.collection_serializer.db_to_stac(collection["_source"], base_url)

@overrides
def item_collection(
self, collection_id: str, limit: int = 10, **kwargs
self, collection_id: str, limit: int = 10, token: str = None, **kwargs
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated this signature wrt the ABC

) -> ItemCollection:
"""Read an item collection from the database."""
links = []
Expand Down Expand Up @@ -136,11 +137,12 @@ def item_collection(
context=context_obj,
)

@overrides
def get_item(self, item_id: str, collection_id: str, **kwargs) -> Item:
"""Get item by item id, collection id."""
base_url = str(kwargs["request"].base_url)
try:
item = self.client.get(index="stac_items", id=item_id)
item = self.client.get(index=ITEMS_INDEX, id=item_id)
except elasticsearch.exceptions.NotFoundError:
raise NotFoundError(
f"Item {item_id} does not exist in Collection {collection_id}"
Expand Down Expand Up @@ -171,6 +173,7 @@ def _return_date(interval_str):

return {"lte": end_date, "gte": start_date}

@overrides
def get_search(
self,
collections: Optional[List[str]] = None,
Expand Down Expand Up @@ -234,15 +237,13 @@ def bbox2poly(b0, b1, b2, b3):
poly = [[[b0, b1], [b2, b1], [b2, b3], [b0, b3], [b0, b1]]]
return poly

def post_search(
self, search_request: BaseSearchPostRequest, **kwargs
) -> ItemCollection:
def post_search(self, search_request: Search, **kwargs) -> ItemCollection:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated wrt the ABC

"""POST search catalog."""
base_url = str(kwargs["request"].base_url)
search = (
Search()
.using(self.client)
.index("stac_items")
.index(ITEMS_INDEX)
.sort(
{"properties.datetime": {"order": "desc"}},
{"id": {"order": "desc"}},
Expand Down
115 changes: 115 additions & 0 deletions stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/indexes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
"""index management client."""

import logging

import attr

from stac_fastapi.elasticsearch.config import ElasticsearchSettings
from stac_fastapi.elasticsearch.core import COLLECTIONS_INDEX, ITEMS_INDEX
from stac_fastapi.elasticsearch.session import Session

logger = logging.getLogger(__name__)


@attr.s
class IndexesClient:
"""Elasticsearch client to handle index creation."""

session: Session = attr.ib(default=attr.Factory(Session.create_from_env))
client = ElasticsearchSettings().create_client

ES_MAPPINGS_DYNAMIC_TEMPLATES = [
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copied from transactions.py.

# Common https://github.com/radiantearth/stac-spec/blob/master/item-spec/common-metadata.md
{
"descriptions": {
"match_mapping_type": "string",
"match": "description",
"mapping": {"type": "text"},
}
},
{
"titles": {
"match_mapping_type": "string",
"match": "title",
"mapping": {"type": "text"},
}
},
# Projection Extension https://github.com/stac-extensions/projection
{"proj_epsg": {"match": "proj:epsg", "mapping": {"type": "integer"}}},
{
"proj_projjson": {
"match": "proj:projjson",
"mapping": {"type": "object", "enabled": False},
}
},
{
"proj_centroid": {
"match": "proj:centroid",
"mapping": {"type": "geo_point"},
}
},
{
"proj_geometry": {
"match": "proj:geometry",
"mapping": {"type": "geo_shape"},
}
},
{
"no_index_href": {
"match": "href",
"mapping": {"type": "text", "index": False},
}
},
# Default all other strings not otherwise specified to keyword
{"strings": {"match_mapping_type": "string", "mapping": {"type": "keyword"}}},
{"numerics": {"match_mapping_type": "long", "mapping": {"type": "float"}}},
]

# Mappings for the Items index. create_indexes() sends this dict as the
# "mappings" section of the body when creating ITEMS_INDEX.
ES_ITEMS_MAPPINGS = {
# NOTE(review): presumably stops Elasticsearch from guessing numeric types
# for string values in dynamically mapped fields -- confirm against the
# Elasticsearch dynamic-mapping docs for the deployed version.
"numeric_detection": False,
# Shares the dynamic templates defined on this class with the
# Collections mapping.
"dynamic_templates": ES_MAPPINGS_DYNAMIC_TEMPLATES,
"properties": {
"geometry": {"type": "geo_shape"},
# NOTE(review): `enabled: False` is understood to keep the object in
# _source without parsing or indexing its contents -- confirm.
"assets": {"type": "object", "enabled": False},
"links": {"type": "object", "enabled": False},
# The Item's own "properties" object; explicitly typed fields are listed
# below.
"properties": {
"type": "object",
"properties": {
# Common https://github.com/radiantearth/stac-spec/blob/master/item-spec/common-metadata.md
"datetime": {"type": "date"},
"start_datetime": {"type": "date"},
"end_datetime": {"type": "date"},
"created": {"type": "date"},
"updated": {"type": "date"},
# Satellite Extension https://github.com/stac-extensions/sat
"sat:absolute_orbit": {"type": "integer"},
"sat:relative_orbit": {"type": "integer"},
},
},
},
}

ES_COLLECTIONS_MAPPINGS = {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a mapping for collections -- same one used in stac-server

"numeric_detection": False,
"dynamic_templates": ES_MAPPINGS_DYNAMIC_TEMPLATES,
"properties": {
"extent.spatial.bbox": {"type": "long"},
"extent.temporal.interval": {"type": "date"},
"providers": {"type": "object", "enabled": False},
"links": {"type": "object", "enabled": False},
"item_assets": {"type": "object", "enabled": False},
},
}

def create_indexes(self):
    """Create the indexes for Items and Collections.

    Each index is created with its class-level mappings. HTTP 400
    responses are ignored so that an index which already exists does not
    abort application startup. Elasticsearch also answers 400 for other
    problems (for example an invalid mapping), so any ignored error that is
    not "resource already exists" is logged instead of being silently
    dropped. No exception is raised for ignored 400 responses.
    """
    index_mappings = (
        (ITEMS_INDEX, self.ES_ITEMS_MAPPINGS),
        (COLLECTIONS_INDEX, self.ES_COLLECTIONS_MAPPINGS),
    )
    for index_name, mappings in index_mappings:
        response = self.client.indices.create(
            index=index_name,
            body={"mappings": mappings},
            ignore=400,  # ignore 400 already exists code
        )
        # With ``ignore=400`` the client returns the error body rather than
        # raising; inspect it so genuine failures are not hidden.
        error = response.get("error") if isinstance(response, dict) else None
        if not error:
            continue
        error_type = error.get("type") if isinstance(error, dict) else None
        if error_type != "resource_already_exists_exception":
            logger.error(
                "Failed to create index %s: %s", index_name, error
            )
Loading