Pull database logic out of core.py and transaction.py #62

Merged: 34 commits, Mar 19, 2022
Changes from all commits (34 commits):
aff6bf1 remove platform (jonhealy1, Mar 18, 2022)
a6f6ef7 start isolating db logic (jonhealy1, Mar 18, 2022)
22078aa intersects logic (jonhealy1, Mar 18, 2022)
26c1290 sort, count (jonhealy1, Mar 18, 2022)
1e260c3 add search to all functions (jonhealy1, Mar 18, 2022)
bbbc7f1 execute search (jonhealy1, Mar 18, 2022)
514045f tests pass (jonhealy1, Mar 18, 2022)
f9dc26b clean up db logic (jonhealy1, Mar 18, 2022)
8234a64 move db logic into separate file (jonhealy1, Mar 18, 2022)
fb643db add fn comments (jonhealy1, Mar 18, 2022)
8d4a062 clean up core (jonhealy1, Mar 18, 2022)
7f62f10 re-link index (jonhealy1, Mar 18, 2022)
ce2d5d2 run pre-commit (jonhealy1, Mar 18, 2022)
ac07418 change es version back (jonhealy1, Mar 18, 2022)
150fbd5 fixes (jonhealy1, Mar 18, 2022)
247da4b remove db settings from core.py (jonhealy1, Mar 18, 2022)
e050ac7 items from feature collection (jonhealy1, Mar 19, 2022)
2bad487 preprocess items (jonhealy1, Mar 19, 2022)
520d88d prep create item (jonhealy1, Mar 19, 2022)
2819621 prep update item (jonhealy1, Mar 19, 2022)
2225e76 delete item (jonhealy1, Mar 19, 2022)
ad6f393 create collection (jonhealy1, Mar 19, 2022)
cc96d23 prep update collection (jonhealy1, Mar 19, 2022)
d7b0668 bulk sync (jonhealy1, Mar 19, 2022)
f62de17 fix bulk sync (jonhealy1, Mar 19, 2022)
83c2843 remomve db refs (jonhealy1, Mar 19, 2022)
cf47504 add function descriptions (jonhealy1, Mar 19, 2022)
6692323 run pre-commit (jonhealy1, Mar 19, 2022)
33b9c49 add sleep to test (jonhealy1, Mar 19, 2022)
e2d2f28 remove duplicate count (jonhealy1, Mar 19, 2022)
20ac8d9 create find collection (jonhealy1, Mar 19, 2022)
7770152 check collection exists (jonhealy1, Mar 19, 2022)
827c68a remove uneeded code (jonhealy1, Mar 19, 2022)
cdd580b clean up (jonhealy1, Mar 19, 2022)
1 change: 0 additions & 1 deletion docker-compose.yml
@@ -8,7 +8,6 @@ services:
     build:
       context: .
       dockerfile: Dockerfile
-      platform: linux/amd64
Comment from jonhealy1 (Collaborator, Author):

My linux laptop doesn't like this line?

Reply from a Collaborator:

interesting, i'm running on macos so probably why i didn't see it

       environment:
         - APP_HOST=0.0.0.0
         - APP_PORT=8083
183 changes: 37 additions & 146 deletions stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py
@@ -6,8 +6,6 @@
 from urllib.parse import urljoin

 import attr
-import elasticsearch
-from elasticsearch_dsl import Q, Search
 from fastapi import HTTPException
 from overrides import overrides

@@ -17,26 +15,17 @@
 from stac_pydantic.shared import MimeTypes

 from stac_fastapi.elasticsearch import serializers
-from stac_fastapi.elasticsearch.config import ElasticsearchSettings
+from stac_fastapi.elasticsearch.database_logic import DatabaseLogic
 from stac_fastapi.elasticsearch.session import Session

-# from stac_fastapi.elasticsearch.types.error_checks import ErrorChecks
 from stac_fastapi.types.core import BaseCoreClient
-from stac_fastapi.types.errors import NotFoundError
 from stac_fastapi.types.stac import Collection, Collections, Item, ItemCollection

 logger = logging.getLogger(__name__)

 NumType = Union[float, int]

-ITEMS_INDEX = "stac_items"
-COLLECTIONS_INDEX = "stac_collections"
-
-
-def mk_item_id(item_id: str, collection_id: str):
-    """Make the Elasticsearch document _id value from the Item id and collection."""
-    return f"{item_id}|{collection_id}"
-

 @attr.s
 class CoreCrudClient(BaseCoreClient):
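The index constants and the mk_item_id helper deleted above presumably move into the new database_logic module, since the item lookups below still need them. A minimal sketch of that module's top level, inferred from the deleted core.py code (database_logic.py is not shown in this diff, so treat these names and locations as assumptions):

# Hypothetical top of database_logic.py, reconstructed from the code
# deleted from core.py above; the real file may differ.
import attr
import elasticsearch
from elasticsearch_dsl import Q, Search

from stac_fastapi.elasticsearch import serializers
from stac_fastapi.elasticsearch.config import ElasticsearchSettings
from stac_fastapi.types.errors import NotFoundError

ITEMS_INDEX = "stac_items"
COLLECTIONS_INDEX = "stac_collections"


def mk_item_id(item_id: str, collection_id: str):
    """Make the Elasticsearch document _id value from the Item id and collection."""
    return f"{item_id}|{collection_id}"


@attr.s
class DatabaseLogic:
    """Database logic wrapper around the Elasticsearch client."""

    settings = ElasticsearchSettings()
    client = settings.create_client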
@@ -49,25 +38,14 @@ class CoreCrudClient(BaseCoreClient):
     collection_serializer: Type[serializers.Serializer] = attr.ib(
         default=serializers.CollectionSerializer
     )
-    settings = ElasticsearchSettings()
-    client = settings.create_client
+    database = DatabaseLogic()

     @overrides
     def all_collections(self, **kwargs) -> Collections:
         """Read all collections from the database."""
         base_url = str(kwargs["request"].base_url)
-        try:
-            collections = self.client.search(
-                index=COLLECTIONS_INDEX, query={"match_all": {}}
-            )
-        except elasticsearch.exceptions.NotFoundError:
-            raise NotFoundError("No collections exist")
-        serialized_collections = [
-            self.collection_serializer.db_to_stac(
-                collection["_source"], base_url=base_url
-            )
-            for collection in collections["hits"]["hits"]
-        ]
+        serialized_collections = self.database.get_all_collections(base_url=base_url)

         links = [
             {
                 "rel": Relations.root.value,
@@ -94,12 +72,8 @@ def all_collections(self, **kwargs) -> Collections:
     def get_collection(self, collection_id: str, **kwargs) -> Collection:
         """Get collection by id."""
         base_url = str(kwargs["request"].base_url)
-        try:
-            collection = self.client.get(index=COLLECTIONS_INDEX, id=collection_id)
-        except elasticsearch.exceptions.NotFoundError:
-            raise NotFoundError(f"Collection {collection_id} not found")
-
-        return self.collection_serializer.db_to_stac(collection["_source"], base_url)
+        collection = self.database.find_collection(collection_id=collection_id)
+        return self.collection_serializer.db_to_stac(collection, base_url)
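find_collection (added in commit 20ac8d9, "create find collection") likely returns the raw _source document, since core.py now serializes the result itself. A sketch under that assumption:

# Sketch of DatabaseLogic.find_collection; returns the raw _source dict
# so that core.py can serialize it.
def find_collection(self, collection_id: str) -> dict:
    """Database logic to find and return a collection."""
    try:
        collection = self.client.get(index=COLLECTIONS_INDEX, id=collection_id)
    except elasticsearch.exceptions.NotFoundError:
        raise NotFoundError(f"Collection {collection_id} not found")
    return collection["_source"]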

     @overrides
     def item_collection(
@@ -108,24 +82,10 @@ def item_collection(
         """Read an item collection from the database."""
         links = []
         base_url = str(kwargs["request"].base_url)
-        search = Search(using=self.client, index="stac_items")
-
-        collection_filter = Q(
-            "bool", should=[Q("match_phrase", **{"collection": collection_id})]
-        )
-        search = search.query(collection_filter)
-        try:
-            count = search.count()
-        except elasticsearch.exceptions.NotFoundError:
-            raise NotFoundError("No items exist")
-        # search = search.sort({"id.keyword" : {"order" : "asc"}})
-        search = search.query()[0:limit]
-        collection_children = search.execute().to_dict()
-
-        serialized_children = [
-            self.item_serializer.db_to_stac(item["_source"], base_url=base_url)
-            for item in collection_children["hits"]["hits"]
-        ]
+        serialized_children, count = self.database.get_item_collection(
+            collection_id=collection_id, limit=limit, base_url=base_url
+        )

         context_obj = None
         if self.extension_is_enabled("ContextExtension"):
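The new get_item_collection returns a (serialized_children, count) pair, so a sketch of it presumably bundles the query, count, and serialization deleted above:

# Sketch of DatabaseLogic.get_item_collection, returning the serialized
# items plus the hit count that core.py previously computed inline.
def get_item_collection(self, collection_id: str, limit: int, base_url: str):
    """Database logic to retrieve an ItemCollection and a hit count."""
    search = Search(using=self.client, index=ITEMS_INDEX)
    search = search.query(
        Q("bool", should=[Q("match_phrase", **{"collection": collection_id})])
    )
    try:
        count = search.count()
    except elasticsearch.exceptions.NotFoundError:
        raise NotFoundError("No items exist")
    collection_children = search[0:limit].execute().to_dict()
    serialized_children = [
        serializers.ItemSerializer.db_to_stac(item["_source"], base_url=base_url)
        for item in collection_children["hits"]["hits"]
    ]
    return serialized_children, count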
@@ -146,15 +106,8 @@ def item_collection(
     def get_item(self, item_id: str, collection_id: str, **kwargs) -> Item:
         """Get item by item id, collection id."""
         base_url = str(kwargs["request"].base_url)
-        try:
-            item = self.client.get(
-                index=ITEMS_INDEX, id=mk_item_id(item_id, collection_id)
-            )
-        except elasticsearch.exceptions.NotFoundError:
-            raise NotFoundError(
-                f"Item {item_id} does not exist in Collection {collection_id}"
-            )
-        return self.item_serializer.db_to_stac(item["_source"], base_url)
+        item = self.database.get_one_item(item_id=item_id, collection_id=collection_id)
+        return self.item_serializer.db_to_stac(item, base_url)

     @staticmethod
     def _return_date(interval_str):
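get_one_item mirrors the deleted block and is the reason mk_item_id needs to move along with it; a sketch under the same inferred-names caveat:

# Sketch of DatabaseLogic.get_one_item, reusing the relocated mk_item_id
# helper to build the Elasticsearch document _id.
def get_one_item(self, item_id: str, collection_id: str) -> dict:
    """Database logic to retrieve a single item."""
    try:
        item = self.client.get(
            index=ITEMS_INDEX, id=mk_item_id(item_id, collection_id)
        )
    except elasticsearch.exceptions.NotFoundError:
        raise NotFoundError(
            f"Item {item_id} does not exist in Collection {collection_id}"
        )
    return item["_source"]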
@@ -238,125 +191,63 @@ def get_search(

         return resp

-    @staticmethod
-    def bbox2poly(b0, b1, b2, b3):
-        """Transform bbox to polygon."""
-        poly = [[[b0, b1], [b2, b1], [b2, b3], [b0, b3], [b0, b1]]]
-        return poly
-
-    def post_search(self, search_request: Search, **kwargs) -> ItemCollection:
+    def post_search(self, search_request, **kwargs) -> ItemCollection:
         """POST search catalog."""
         base_url = str(kwargs["request"].base_url)
-        search = (
-            Search()
-            .using(self.client)
-            .index(ITEMS_INDEX)
-            .sort(
-                {"properties.datetime": {"order": "desc"}},
-                {"id": {"order": "desc"}},
-                {"collection": {"order": "desc"}},
-            )
-        )
+        search = self.database.create_search_object()

         if search_request.query:
             if type(search_request.query) == str:
                 search_request.query = json.loads(search_request.query)
             for (field_name, expr) in search_request.query.items():
                 field = "properties__" + field_name
                 for (op, value) in expr.items():
-                    if op != "eq":
-                        key_filter = {field: {f"{op}": value}}
-                        search = search.query(Q("range", **key_filter))
-                    else:
-                        search = search.query("match_phrase", **{field: value})
+                    search = self.database.create_query_filter(
+                        search=search, op=op, field=field, value=value
+                    )

         if search_request.ids:
-            id_list = []
-            for item_id in search_request.ids:
-                id_list.append(Q("match_phrase", **{"id": item_id}))
-            id_filter = Q("bool", should=id_list)
-            search = search.query(id_filter)
+            search = self.database.search_ids(
+                search=search, item_ids=search_request.ids
+            )

         if search_request.collections:
-            collection_list = []
-            for collection_id in search_request.collections:
-                collection_list.append(
-                    Q("match_phrase", **{"collection": collection_id})
-                )
-            collection_filter = Q("bool", should=collection_list)
-            search = search.query(collection_filter)
+            search = self.database.search_collections(
+                search=search, collection_ids=search_request.collections
+            )

         if search_request.datetime:
             datetime_search = self._return_date(search_request.datetime)
-            if "eq" in datetime_search:
-                search = search.query(
-                    "match_phrase", **{"properties__datetime": datetime_search["eq"]}
-                )
-            else:
-                search = search.filter(
-                    "range", properties__datetime={"lte": datetime_search["lte"]}
-                )
-                search = search.filter(
-                    "range", properties__datetime={"gte": datetime_search["gte"]}
-                )
+            search = self.database.search_datetime(
+                search=search, datetime_search=datetime_search
+            )

         if search_request.bbox:
             bbox = search_request.bbox
             if len(bbox) == 6:
                 bbox = [bbox[0], bbox[1], bbox[3], bbox[4]]
-            poly = self.bbox2poly(bbox[0], bbox[1], bbox[2], bbox[3])
-
-            bbox_filter = Q(
-                {
-                    "geo_shape": {
-                        "geometry": {
-                            "shape": {"type": "polygon", "coordinates": poly},
-                            "relation": "intersects",
-                        }
-                    }
-                }
-            )
-            search = search.query(bbox_filter)
+
+            search = self.database.search_bbox(search=search, bbox=bbox)

         if search_request.intersects:
-            intersect_filter = Q(
-                {
-                    "geo_shape": {
-                        "geometry": {
-                            "shape": {
-                                "type": search_request.intersects.type.lower(),
-                                "coordinates": search_request.intersects.coordinates,
-                            },
-                            "relation": "intersects",
-                        }
-                    }
-                }
-            )
-            search = search.query(intersect_filter)
+            self.database.search_intersects(
+                search=search, intersects=search_request.intersects
+            )

         if search_request.sortby:
             for sort in search_request.sortby:
                 if sort.field == "datetime":
                     sort.field = "properties__datetime"
                 field = sort.field + ".keyword"
-                search = search.sort({field: {"order": sort.direction}})
+                search = self.database.sort_field(
+                    search=search, field=field, direction=sort.direction
+                )

-        try:
-            count = search.count()
-        except elasticsearch.exceptions.NotFoundError:
-            raise NotFoundError("No items exist")
-
-        # search = search.sort({"id.keyword" : {"order" : "asc"}})
-        search = search.query()[0 : search_request.limit]
-        response = search.execute().to_dict()
-
-        if len(response["hits"]["hits"]) > 0:
-            response_features = [
-                self.item_serializer.db_to_stac(item["_source"], base_url=base_url)
-                for item in response["hits"]["hits"]
-            ]
-        else:
-            response_features = []
+        count = self.database.search_count(search=search)
+
+        response_features = self.database.execute_search(
+            search=search, limit=search_request.limit, base_url=base_url
+        )

         # if self.extension_is_enabled("FieldsExtension"):
         #     if search_request.query is not None:
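Each branch of post_search now calls a small builder on DatabaseLogic that takes the Search object, applies one clause, and returns it. A sketch of a few representative builders, again inferred from the deleted elasticsearch_dsl code rather than from database_logic.py itself:

# Sketches of the post_search helpers on DatabaseLogic; each wraps one of
# the elasticsearch_dsl blocks deleted from core.py above.
def create_search_object(self) -> Search:
    """Database logic to create a new basic search object."""
    return (
        Search()
        .using(self.client)
        .index(ITEMS_INDEX)
        .sort(
            {"properties.datetime": {"order": "desc"}},
            {"id": {"order": "desc"}},
            {"collection": {"order": "desc"}},
        )
    )

def search_ids(self, search: Search, item_ids: list) -> Search:
    """Database logic to search a list of STAC item ids."""
    id_list = [Q("match_phrase", **{"id": item_id}) for item_id in item_ids]
    return search.query(Q("bool", should=id_list))

def search_count(self, search: Search) -> int:
    """Database logic to count the matches for a search."""
    try:
        return search.count()
    except elasticsearch.exceptions.NotFoundError:
        raise NotFoundError("No items exist")

def execute_search(self, search: Search, limit: int, base_url: str) -> list:
    """Database logic to execute a search with a limit and serialize the hits."""
    response = search[0:limit].execute().to_dict()
    return [
        serializers.ItemSerializer.db_to_stac(hit["_source"], base_url=base_url)
        for hit in response["hits"]["hits"]
    ]

One detail worth noting in the diff itself: the search_intersects call above is the only builder whose result is not assigned back to search, so if it returns a new Search rather than mutating in place, the intersects branch would have no effect.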
@@ -384,7 +275,7 @@ def post_search(self, search_request: Search, **kwargs) -> ItemCollection:
         else:
             limit = 10
         response_features = response_features[0:limit]
-        limit = 10

         context_obj = None
         if self.extension_is_enabled("ContextExtension"):
             context_obj = {