Skip to content

Commit 363e2cc

Browse files
authored
Merge pull request #62 from stac-utils/modular_db
Pull database logic out of core.py and transaction.py
2 parents 7e83c51 + cdd580b commit 363e2cc

File tree

6 files changed

+360
-230
lines changed

6 files changed

+360
-230
lines changed

docker-compose.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ services:
88
build:
99
context: .
1010
dockerfile: Dockerfile
11-
platform: linux/amd64
1211
environment:
1312
- APP_HOST=0.0.0.0
1413
- APP_PORT=8083

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py

Lines changed: 37 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66
from urllib.parse import urljoin
77

88
import attr
9-
import elasticsearch
10-
from elasticsearch_dsl import Q, Search
119
from fastapi import HTTPException
1210
from overrides import overrides
1311

@@ -17,26 +15,17 @@
1715
from stac_pydantic.shared import MimeTypes
1816

1917
from stac_fastapi.elasticsearch import serializers
20-
from stac_fastapi.elasticsearch.config import ElasticsearchSettings
18+
from stac_fastapi.elasticsearch.database_logic import DatabaseLogic
2119
from stac_fastapi.elasticsearch.session import Session
2220

2321
# from stac_fastapi.elasticsearch.types.error_checks import ErrorChecks
2422
from stac_fastapi.types.core import BaseCoreClient
25-
from stac_fastapi.types.errors import NotFoundError
2623
from stac_fastapi.types.stac import Collection, Collections, Item, ItemCollection
2724

2825
logger = logging.getLogger(__name__)
2926

3027
NumType = Union[float, int]
3128

32-
ITEMS_INDEX = "stac_items"
33-
COLLECTIONS_INDEX = "stac_collections"
34-
35-
36-
def mk_item_id(item_id: str, collection_id: str):
37-
"""Make the Elasticsearch document _id value from the Item id and collection."""
38-
return f"{item_id}|{collection_id}"
39-
4029

4130
@attr.s
4231
class CoreCrudClient(BaseCoreClient):
@@ -49,25 +38,14 @@ class CoreCrudClient(BaseCoreClient):
4938
collection_serializer: Type[serializers.Serializer] = attr.ib(
5039
default=serializers.CollectionSerializer
5140
)
52-
settings = ElasticsearchSettings()
53-
client = settings.create_client
41+
database = DatabaseLogic()
5442

5543
@overrides
5644
def all_collections(self, **kwargs) -> Collections:
5745
"""Read all collections from the database."""
5846
base_url = str(kwargs["request"].base_url)
59-
try:
60-
collections = self.client.search(
61-
index=COLLECTIONS_INDEX, query={"match_all": {}}
62-
)
63-
except elasticsearch.exceptions.NotFoundError:
64-
raise NotFoundError("No collections exist")
65-
serialized_collections = [
66-
self.collection_serializer.db_to_stac(
67-
collection["_source"], base_url=base_url
68-
)
69-
for collection in collections["hits"]["hits"]
70-
]
47+
serialized_collections = self.database.get_all_collections(base_url=base_url)
48+
7149
links = [
7250
{
7351
"rel": Relations.root.value,
@@ -94,12 +72,8 @@ def all_collections(self, **kwargs) -> Collections:
9472
def get_collection(self, collection_id: str, **kwargs) -> Collection:
9573
"""Get collection by id."""
9674
base_url = str(kwargs["request"].base_url)
97-
try:
98-
collection = self.client.get(index=COLLECTIONS_INDEX, id=collection_id)
99-
except elasticsearch.exceptions.NotFoundError:
100-
raise NotFoundError(f"Collection {collection_id} not found")
101-
102-
return self.collection_serializer.db_to_stac(collection["_source"], base_url)
75+
collection = self.database.find_collection(collection_id=collection_id)
76+
return self.collection_serializer.db_to_stac(collection, base_url)
10377

10478
@overrides
10579
def item_collection(
@@ -108,24 +82,10 @@ def item_collection(
10882
"""Read an item collection from the database."""
10983
links = []
11084
base_url = str(kwargs["request"].base_url)
111-
search = Search(using=self.client, index="stac_items")
11285

113-
collection_filter = Q(
114-
"bool", should=[Q("match_phrase", **{"collection": collection_id})]
86+
serialized_children, count = self.database.get_item_collection(
87+
collection_id=collection_id, limit=limit, base_url=base_url
11588
)
116-
search = search.query(collection_filter)
117-
try:
118-
count = search.count()
119-
except elasticsearch.exceptions.NotFoundError:
120-
raise NotFoundError("No items exist")
121-
# search = search.sort({"id.keyword" : {"order" : "asc"}})
122-
search = search.query()[0:limit]
123-
collection_children = search.execute().to_dict()
124-
125-
serialized_children = [
126-
self.item_serializer.db_to_stac(item["_source"], base_url=base_url)
127-
for item in collection_children["hits"]["hits"]
128-
]
12989

13090
context_obj = None
13191
if self.extension_is_enabled("ContextExtension"):
@@ -146,15 +106,8 @@ def item_collection(
146106
def get_item(self, item_id: str, collection_id: str, **kwargs) -> Item:
147107
"""Get item by item id, collection id."""
148108
base_url = str(kwargs["request"].base_url)
149-
try:
150-
item = self.client.get(
151-
index=ITEMS_INDEX, id=mk_item_id(item_id, collection_id)
152-
)
153-
except elasticsearch.exceptions.NotFoundError:
154-
raise NotFoundError(
155-
f"Item {item_id} does not exist in Collection {collection_id}"
156-
)
157-
return self.item_serializer.db_to_stac(item["_source"], base_url)
109+
item = self.database.get_one_item(item_id=item_id, collection_id=collection_id)
110+
return self.item_serializer.db_to_stac(item, base_url)
158111

159112
@staticmethod
160113
def _return_date(interval_str):
@@ -238,125 +191,63 @@ def get_search(
238191

239192
return resp
240193

241-
@staticmethod
242-
def bbox2poly(b0, b1, b2, b3):
243-
"""Transform bbox to polygon."""
244-
poly = [[[b0, b1], [b2, b1], [b2, b3], [b0, b3], [b0, b1]]]
245-
return poly
246-
247-
def post_search(self, search_request: Search, **kwargs) -> ItemCollection:
194+
def post_search(self, search_request, **kwargs) -> ItemCollection:
248195
"""POST search catalog."""
249196
base_url = str(kwargs["request"].base_url)
250-
search = (
251-
Search()
252-
.using(self.client)
253-
.index(ITEMS_INDEX)
254-
.sort(
255-
{"properties.datetime": {"order": "desc"}},
256-
{"id": {"order": "desc"}},
257-
{"collection": {"order": "desc"}},
258-
)
259-
)
197+
search = self.database.create_search_object()
260198

261199
if search_request.query:
262200
if type(search_request.query) == str:
263201
search_request.query = json.loads(search_request.query)
264202
for (field_name, expr) in search_request.query.items():
265203
field = "properties__" + field_name
266204
for (op, value) in expr.items():
267-
if op != "eq":
268-
key_filter = {field: {f"{op}": value}}
269-
search = search.query(Q("range", **key_filter))
270-
else:
271-
search = search.query("match_phrase", **{field: value})
205+
search = self.database.create_query_filter(
206+
search=search, op=op, field=field, value=value
207+
)
272208

273209
if search_request.ids:
274-
id_list = []
275-
for item_id in search_request.ids:
276-
id_list.append(Q("match_phrase", **{"id": item_id}))
277-
id_filter = Q("bool", should=id_list)
278-
search = search.query(id_filter)
210+
search = self.database.search_ids(
211+
search=search, item_ids=search_request.ids
212+
)
279213

280214
if search_request.collections:
281-
collection_list = []
282-
for collection_id in search_request.collections:
283-
collection_list.append(
284-
Q("match_phrase", **{"collection": collection_id})
285-
)
286-
collection_filter = Q("bool", should=collection_list)
287-
search = search.query(collection_filter)
215+
search = self.database.search_collections(
216+
search=search, collection_ids=search_request.collections
217+
)
288218

289219
if search_request.datetime:
290220
datetime_search = self._return_date(search_request.datetime)
291-
if "eq" in datetime_search:
292-
search = search.query(
293-
"match_phrase", **{"properties__datetime": datetime_search["eq"]}
294-
)
295-
else:
296-
search = search.filter(
297-
"range", properties__datetime={"lte": datetime_search["lte"]}
298-
)
299-
search = search.filter(
300-
"range", properties__datetime={"gte": datetime_search["gte"]}
301-
)
221+
search = self.database.search_datetime(
222+
search=search, datetime_search=datetime_search
223+
)
302224

303225
if search_request.bbox:
304226
bbox = search_request.bbox
305227
if len(bbox) == 6:
306228
bbox = [bbox[0], bbox[1], bbox[3], bbox[4]]
307-
poly = self.bbox2poly(bbox[0], bbox[1], bbox[2], bbox[3])
308-
309-
bbox_filter = Q(
310-
{
311-
"geo_shape": {
312-
"geometry": {
313-
"shape": {"type": "polygon", "coordinates": poly},
314-
"relation": "intersects",
315-
}
316-
}
317-
}
318-
)
319-
search = search.query(bbox_filter)
229+
230+
search = self.database.search_bbox(search=search, bbox=bbox)
320231

321232
if search_request.intersects:
322-
intersect_filter = Q(
323-
{
324-
"geo_shape": {
325-
"geometry": {
326-
"shape": {
327-
"type": search_request.intersects.type.lower(),
328-
"coordinates": search_request.intersects.coordinates,
329-
},
330-
"relation": "intersects",
331-
}
332-
}
333-
}
233+
self.database.search_intersects(
234+
search=search, intersects=search_request.intersects
334235
)
335-
search = search.query(intersect_filter)
336236

337237
if search_request.sortby:
338238
for sort in search_request.sortby:
339239
if sort.field == "datetime":
340240
sort.field = "properties__datetime"
341241
field = sort.field + ".keyword"
342-
search = search.sort({field: {"order": sort.direction}})
242+
search = self.database.sort_field(
243+
search=search, field=field, direction=sort.direction
244+
)
343245

344-
try:
345-
count = search.count()
346-
except elasticsearch.exceptions.NotFoundError:
347-
raise NotFoundError("No items exist")
348-
349-
# search = search.sort({"id.keyword" : {"order" : "asc"}})
350-
search = search.query()[0 : search_request.limit]
351-
response = search.execute().to_dict()
352-
353-
if len(response["hits"]["hits"]) > 0:
354-
response_features = [
355-
self.item_serializer.db_to_stac(item["_source"], base_url=base_url)
356-
for item in response["hits"]["hits"]
357-
]
358-
else:
359-
response_features = []
246+
count = self.database.search_count(search=search)
247+
248+
response_features = self.database.execute_search(
249+
search=search, limit=search_request.limit, base_url=base_url
250+
)
360251

361252
# if self.extension_is_enabled("FieldsExtension"):
362253
# if search_request.query is not None:
@@ -384,7 +275,7 @@ def post_search(self, search_request: Search, **kwargs) -> ItemCollection:
384275
else:
385276
limit = 10
386277
response_features = response_features[0:limit]
387-
limit = 10
278+
388279
context_obj = None
389280
if self.extension_is_enabled("ContextExtension"):
390281
context_obj = {

0 commit comments

Comments
 (0)