31
31
ITEMS_INDEX = "stac_items"
32
32
COLLECTIONS_INDEX = "stac_collections"
33
33
34
+ DEFAULT_SORT = {
35
+ "properties.datetime" : {"order" : "desc" },
36
+ "id" : {"order" : "desc" },
37
+ "collection" : {"order" : "desc" },
38
+ }
39
+
40
+
41
+ def bbox2polygon (b0 , b1 , b2 , b3 ):
42
+ """Transform bbox to polygon."""
43
+ return [[[b0 , b1 ], [b2 , b1 ], [b2 , b3 ], [b0 , b3 ], [b0 , b1 ]]]
44
+
34
45
35
46
def mk_item_id (item_id : str , collection_id : str ):
36
47
"""Make the Elasticsearch document _id value from the Item id and collection."""
@@ -43,6 +54,7 @@ class DatabaseLogic:
43
54
44
55
client = AsyncElasticsearchSettings ().create_client
45
56
sync_client = SyncElasticsearchSettings ().create_client
57
+
46
58
item_serializer : Type [serializers .ItemSerializer ] = attr .ib (
47
59
default = serializers .ItemSerializer
48
60
)
@@ -54,43 +66,30 @@ class DatabaseLogic:
54
66
55
67
async def get_all_collections (self , base_url : str ) -> List [Collection ]:
56
68
"""Database logic to retrieve a list of all collections."""
57
- collections = await self . client . search (
58
- index = COLLECTIONS_INDEX , query = { "match_all" : {}}
59
- )
69
+ # https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/65
70
+ # collections should be paginated, but at least return more than the default 10 for now
71
+ collections = await self . client . search ( index = COLLECTIONS_INDEX , size = 1000 )
60
72
61
73
return [
62
74
self .collection_serializer .db_to_stac (c ["_source" ], base_url = base_url )
63
75
for c in collections ["hits" ]["hits" ]
64
76
]
65
77
66
- async def search_count (self , search : Search ) -> int :
78
+ async def search_count (self , search : Search ) -> Optional [ int ] :
67
79
"""Database logic to count search results."""
68
80
return (
69
81
await self .client .count (index = ITEMS_INDEX , body = search .to_dict (count = True ))
70
82
).get ("count" )
71
83
72
- async def get_item_collection (
84
+ async def get_collection_items (
73
85
self , collection_id : str , limit : int , base_url : str
74
86
) -> Tuple [List [Item ], Optional [int ]]:
75
87
"""Database logic to retrieve an ItemCollection and a count of items contained."""
76
- search = self .create_search ()
77
- search = search .filter ("term" , collection = collection_id )
78
-
79
- count = await self .search_count (search )
80
-
81
- search = search .query ()[0 :limit ]
82
-
83
- body = search .to_dict ()
84
- es_response = await self .client .search (
85
- index = ITEMS_INDEX , query = body ["query" ], sort = body .get ("sort" )
86
- )
87
-
88
- serialized_children = [
89
- self .item_serializer .db_to_stac (item ["_source" ], base_url = base_url )
90
- for item in es_response ["hits" ]["hits" ]
91
- ]
88
+ search = self .apply_collections_filter (Search (), [collection_id ])
89
+ items = await self .execute_search (search = search , limit = limit , base_url = base_url )
90
+ maybe_count = await self .search_count (search )
92
91
93
- return serialized_children , count
92
+ return items , maybe_count
94
93
95
94
async def get_one_item (self , collection_id : str , item_id : str ) -> Dict :
96
95
"""Database logic to retrieve a single item."""
@@ -105,48 +104,26 @@ async def get_one_item(self, collection_id: str, item_id: str) -> Dict:
105
104
return item ["_source" ]
106
105
107
106
@staticmethod
108
- def create_search ():
109
- """Database logic to create a nosql Search instance."""
110
- return Search ().sort (
111
- {"properties.datetime" : {"order" : "desc" }},
112
- {"id" : {"order" : "desc" }},
113
- {"collection" : {"order" : "desc" }},
114
- )
115
-
116
- @staticmethod
117
- def create_query_filter (search : Search , op : str , field : str , value : float ):
118
- """Database logic to perform query for search endpoint."""
119
- if op != "eq" :
120
- key_filter = {field : {f"{ op } " : value }}
121
- search = search .query (Q ("range" , ** key_filter ))
122
- else :
123
- search = search .query ("match_phrase" , ** {field : value })
124
-
125
- return search
107
+ def make_search ():
108
+ """Database logic to create a Search instance."""
109
+ return Search ().sort (* DEFAULT_SORT )
126
110
127
111
@staticmethod
128
- def search_ids (search : Search , item_ids : List [str ]):
112
+ def apply_ids_filter (search : Search , item_ids : List [str ]):
129
113
"""Database logic to search a list of STAC item ids."""
130
- return search .query (
131
- Q ("bool" , should = [Q ("term" , ** {"id" : i_id }) for i_id in item_ids ])
132
- )
114
+ return search .filter ("terms" , id = item_ids )
133
115
134
116
@staticmethod
135
- def filter_collections (search : Search , collection_ids : List ):
117
+ def apply_collections_filter (search : Search , collection_ids : List [ str ] ):
136
118
"""Database logic to search a list of STAC collection ids."""
137
- return search .query (
138
- Q (
139
- "bool" ,
140
- should = [Q ("term" , ** {"collection" : c_id }) for c_id in collection_ids ],
141
- )
142
- )
119
+ return search .filter ("terms" , collection = collection_ids )
143
120
144
121
@staticmethod
145
- def search_datetime (search : Search , datetime_search ):
122
+ def apply_datetime_filter (search : Search , datetime_search ):
146
123
"""Database logic to search datetime field."""
147
124
if "eq" in datetime_search :
148
- search = search .query (
149
- "match_phrase " , ** {"properties__datetime" : datetime_search ["eq" ]}
125
+ search = search .filter (
126
+ "term " , ** {"properties__datetime" : datetime_search ["eq" ]}
150
127
)
151
128
else :
152
129
search = search .filter (
@@ -158,24 +135,28 @@ def search_datetime(search: Search, datetime_search):
158
135
return search
159
136
160
137
@staticmethod
161
- def search_bbox (search : Search , bbox : List ):
138
+ def apply_bbox_filter (search : Search , bbox : List ):
162
139
"""Database logic to search on bounding box."""
163
- polygon = DatabaseLogic .bbox2polygon (bbox [0 ], bbox [1 ], bbox [2 ], bbox [3 ])
164
- bbox_filter = Q (
165
- {
166
- "geo_shape" : {
167
- "geometry" : {
168
- "shape" : {"type" : "polygon" , "coordinates" : polygon },
169
- "relation" : "intersects" ,
140
+ return search .filter (
141
+ Q (
142
+ {
143
+ "geo_shape" : {
144
+ "geometry" : {
145
+ "shape" : {
146
+ "type" : "polygon" ,
147
+ "coordinates" : bbox2polygon (
148
+ bbox [0 ], bbox [1 ], bbox [2 ], bbox [3 ]
149
+ ),
150
+ },
151
+ "relation" : "intersects" ,
152
+ }
170
153
}
171
154
}
172
- }
155
+ )
173
156
)
174
- search = search .query (bbox_filter )
175
- return search
176
157
177
158
@staticmethod
178
- def search_intersects (
159
+ def apply_intersects_filter (
179
160
search : Search ,
180
161
intersects : Union [
181
162
Point ,
@@ -188,33 +169,45 @@ def search_intersects(
188
169
],
189
170
):
190
171
"""Database logic to search a geojson object."""
191
- intersect_filter = Q (
192
- {
193
- "geo_shape" : {
194
- "geometry" : {
195
- "shape" : {
196
- "type" : intersects .type .lower (),
197
- "coordinates" : intersects .coordinates ,
198
- },
199
- "relation" : "intersects" ,
172
+ return search .filter (
173
+ Q (
174
+ {
175
+ "geo_shape" : {
176
+ "geometry" : {
177
+ "shape" : {
178
+ "type" : intersects .type .lower (),
179
+ "coordinates" : intersects .coordinates ,
180
+ },
181
+ "relation" : "intersects" ,
182
+ }
200
183
}
201
184
}
202
- }
185
+ )
203
186
)
204
- search = search .query (intersect_filter )
187
+
188
+ @staticmethod
189
+ def apply_stacql_filter (search : Search , op : str , field : str , value : float ):
190
+ """Database logic to perform query for search endpoint."""
191
+ if op != "eq" :
192
+ key_filter = {field : {f"{ op } " : value }}
193
+ search = search .filter (Q ("range" , ** key_filter ))
194
+ else :
195
+ search = search .filter ("term" , ** {field : value })
196
+
205
197
return search
206
198
207
199
@staticmethod
208
- def sort_field (search : Search , field , direction ):
200
+ def apply_sort (search : Search , field , direction ):
209
201
"""Database logic to sort search instance."""
210
202
return search .sort ({field : {"order" : direction }})
211
203
212
- async def execute_search (self , search , limit : int , base_url : str ) -> List :
204
+ async def execute_search (self , search , limit : int , base_url : str ) -> List [ Item ] :
213
205
"""Database logic to execute search with limit."""
214
- search = search . query () [0 :limit ]
206
+ search = search [0 :limit ]
215
207
body = search .to_dict ()
208
+
216
209
es_response = await self .client .search (
217
- index = ITEMS_INDEX , query = body [ "query" ] , sort = body .get ("sort" )
210
+ index = ITEMS_INDEX , query = body . get ( "query" ) , sort = body .get ("sort" )
218
211
)
219
212
220
213
return [
@@ -330,7 +323,8 @@ def bulk_sync(self, processed_items, refresh: bool = False):
330
323
self .sync_client , self ._mk_actions (processed_items ), refresh = refresh
331
324
)
332
325
333
- def _mk_actions (self , processed_items ):
326
+ @staticmethod
327
+ def _mk_actions (processed_items ):
334
328
return [
335
329
{
336
330
"_index" : ITEMS_INDEX ,
@@ -357,8 +351,3 @@ async def delete_collections(self) -> None:
357
351
body = {"query" : {"match_all" : {}}},
358
352
wait_for_completion = True ,
359
353
)
360
-
361
- @staticmethod
362
- def bbox2polygon (b0 , b1 , b2 , b3 ):
363
- """Transform bbox to polygon."""
364
- return [[[b0 , b1 ], [b2 , b1 ], [b2 , b3 ], [b0 , b3 ], [b0 , b1 ]]]
0 commit comments