Skip to content

Commit 2755198

Browse files
authored
Merge pull request #10 from jonhealy1/bulk_trans
implement bulk transactions
2 parents e8075d3 + cfd38e1 commit 2755198

File tree

4 files changed

+92
-43
lines changed

4 files changed

+92
-43
lines changed

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ def get_collection(self, collection_id: str, **kwargs) -> Collection:
9494
return self.collection_serializer.db_to_stac(collection["_source"], base_url)
9595

9696
def item_collection(
97-
self, collection_id: str, limit: int = 10, token: str = None, **kwargs
97+
self, collection_id: str, limit: int = 10, **kwargs
9898
) -> ItemCollection:
9999
"""Read an item collection from the database."""
100100
links = []

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/serializers.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ def db_to_stac(cls, collection: dict, base_url: str) -> stac_types.Collection:
6363
if original_links:
6464
collection_links += resolve_links(original_links, base_url)
6565

66+
if "providers" not in collection:
67+
collection["providers"] = {}
68+
6669
return stac_types.Collection(
6770
type="Collection",
6871
id=collection["id"],

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/transactions.py

Lines changed: 81 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,26 @@ class TransactionsClient(BaseTransactionsClient):
3131
settings = ElasticsearchSettings()
3232
client = settings.create_client
3333

34+
def _create_item_index(self):
35+
mapping = {
36+
"mappings": {
37+
"properties": {
38+
"geometry": {"type": "geo_shape"},
39+
"id": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
40+
"properties__datetime": {
41+
"type": "text",
42+
"fields": {"keyword": {"type": "keyword"}},
43+
},
44+
}
45+
}
46+
}
47+
48+
_ = self.client.indices.create(
49+
index="stac_items",
50+
body=mapping,
51+
ignore=400, # ignore 400 already exists code
52+
)
53+
3454
def create_item(self, model: stac_types.Item, **kwargs):
3555
"""Create item."""
3656
base_url = str(kwargs["request"].base_url)
@@ -59,24 +79,7 @@ def create_item(self, model: stac_types.Item, **kwargs):
5979
if "created" not in model["properties"]:
6080
model["properties"]["created"] = str(now)
6181

62-
mapping = {
63-
"mappings": {
64-
"properties": {
65-
"geometry": {"type": "geo_shape"},
66-
"id": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
67-
"properties__datetime": {
68-
"type": "text",
69-
"fields": {"keyword": {"type": "keyword"}},
70-
},
71-
}
72-
}
73-
}
74-
75-
_ = self.client.indices.create(
76-
index="stac_items",
77-
body=mapping,
78-
ignore=400, # ignore 400 already exists code
79-
)
82+
self._create_item_index()
8083

8184
self.client.index(
8285
index="stac_items", doc_type="_doc", id=model["id"], document=model
@@ -91,6 +94,8 @@ def create_collection(self, model: stac_types.Collection, **kwargs):
9194
).create_links()
9295
model["links"] = collection_links
9396

97+
self._create_item_index()
98+
9499
if self.client.exists(index="stac_collections", id=model["id"]):
95100
raise ConflictError(f"Collection {model['id']} already exists")
96101
self.client.index(
@@ -155,39 +160,79 @@ def __attrs_post_init__(self):
155160
settings = ElasticsearchSettings()
156161
self.client = settings.create_client
157162

163+
def _create_item_index(self):
164+
mapping = {
165+
"mappings": {
166+
"properties": {
167+
"geometry": {"type": "geo_shape"},
168+
"id": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
169+
"properties__datetime": {
170+
"type": "text",
171+
"fields": {"keyword": {"type": "keyword"}},
172+
},
173+
}
174+
}
175+
}
176+
177+
_ = self.client.indices.create(
178+
index="stac_items",
179+
body=mapping,
180+
ignore=400, # ignore 400 already exists code
181+
)
182+
158183
def _preprocess_item(self, model: stac_types.Item, base_url) -> stac_types.Item:
159184
"""Preprocess items to match data model."""
160185
item_links = ItemLinks(
161186
collection_id=model["collection"], item_id=model["id"], base_url=base_url
162187
).create_links()
163188
model["links"] = item_links
164189

165-
# with self.client.start_session(causal_consistency=True) as session:
166-
# error_check = ErrorChecks(session=session, client=self.client)
167-
# error_check._check_collection_foreign_key(model)
168-
# error_check._check_item_conflict(model)
169-
# now = datetime.utcnow().strftime(DATETIME_RFC339)
170-
# if "created" not in model["properties"]:
171-
# model["properties"]["created"] = str(now)
172-
# return model
190+
if not self.client.exists(index="stac_collections", id=model["collection"]):
191+
raise ForeignKeyError(f"Collection {model['collection']} does not exist")
192+
193+
if self.client.exists(index="stac_items", id=model["id"]):
194+
raise ConflictError(
195+
f"Item {model['id']} in collection {model['collection']} already exists"
196+
)
197+
198+
now = datetime.utcnow().strftime(DATETIME_RFC339)
199+
if "created" not in model["properties"]:
200+
model["properties"]["created"] = str(now)
201+
202+
# elasticsearch doesn't like the fact that some values are float and some were int
203+
if "eo:bands" in model["properties"]:
204+
for wave in model["properties"]["eo:bands"]:
205+
for k, v in wave.items():
206+
if type(v) != str:
207+
v = float(v)
208+
wave.update({k: v})
209+
return model
173210

174211
def bulk_item_insert(self, items: Items, **kwargs) -> str:
175212
"""Bulk item insertion using es."""
213+
self._create_item_index()
176214
try:
177215
base_url = str(kwargs["request"].base_url)
178216
except Exception:
179217
base_url = ""
180218
processed_items = [self._preprocess_item(item, base_url) for item in items]
181219
return_msg = f"Successfully added {len(processed_items)} items."
182-
# with self.client.start_session(causal_consistency=True) as session:
183-
# self.item_table.insert_many(processed_items, session=session)
184-
# return return_msg
185220

186-
helpers.bulk(
187-
self.client,
188-
processed_items,
189-
index="stac_items",
190-
doc_type="_doc",
191-
request_timeout=200,
192-
)
221+
# helpers.bulk(
222+
# self.client,
223+
# processed_items,
224+
# index="stac_items",
225+
# doc_type="_doc",
226+
# request_timeout=200,
227+
# )
228+
229+
def bulk_sync(processed_items):
230+
actions = [
231+
{"_index": "stac_items", "_source": item} for item in processed_items
232+
]
233+
234+
helpers.bulk(self.client, actions)
235+
236+
bulk_sync(processed_items)
237+
193238
return return_msg

stac_fastapi/elasticsearch/tests/clients/test_elasticsearch.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import time
12
import uuid
23
from copy import deepcopy
34
from typing import Callable
@@ -256,7 +257,7 @@ def test_delete_item(
256257
es_core.get_item(item["id"], item["collection"], request=MockStarletteRequest)
257258

258259

259-
@pytest.mark.skip(reason="bulk not implemented")
260+
# @pytest.mark.skip(reason="might need a larger timeout")
260261
def test_bulk_item_insert(
261262
es_core: CoreCrudClient,
262263
es_transactions: TransactionsClient,
@@ -278,14 +279,14 @@ def test_bulk_item_insert(
278279
assert len(fc["features"]) == 0
279280

280281
es_bulk_transactions.bulk_item_insert(items=items)
281-
282+
time.sleep(3)
282283
fc = es_core.item_collection(coll["id"], request=MockStarletteRequest)
283284
assert len(fc["features"]) == 10
284285

285-
for item in items:
286-
es_transactions.delete_item(
287-
item["id"], item["collection"], request=MockStarletteRequest
288-
)
286+
# for item in items:
287+
# es_transactions.delete_item(
288+
# item["id"], item["collection"], request=MockStarletteRequest
289+
# )
289290

290291

291292
@pytest.mark.skip(reason="Not working")

0 commit comments

Comments
 (0)