-
Notifications
You must be signed in to change notification settings - Fork 25
use concatenation of id and collection for elasticsearch _id value #58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,6 +33,11 @@ | |
COLLECTIONS_INDEX = "stac_collections" | ||
|
||
|
||
def mk_item_id(item_id: str, collection_id: str): | ||
"""Make the Elasticsearch document _id value from the Item id and collection.""" | ||
return f"{item_id}|{collection_id}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great idea! |
||
|
||
|
||
@attr.s | ||
class CoreCrudClient(BaseCoreClient): | ||
"""Client for core endpoints defined by stac.""" | ||
|
@@ -142,7 +147,9 @@ def get_item(self, item_id: str, collection_id: str, **kwargs) -> Item: | |
"""Get item by item id, collection id.""" | ||
base_url = str(kwargs["request"].base_url) | ||
try: | ||
item = self.client.get(index=ITEMS_INDEX, id=item_id) | ||
item = self.client.get( | ||
index=ITEMS_INDEX, id=mk_item_id(item_id, collection_id) | ||
) | ||
except elasticsearch.exceptions.NotFoundError: | ||
raise NotFoundError( | ||
f"Item {item_id} does not exist in Collection {collection_id}" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,7 +10,7 @@ | |
from overrides import overrides | ||
|
||
from stac_fastapi.elasticsearch.config import ElasticsearchSettings | ||
from stac_fastapi.elasticsearch.core import COLLECTIONS_INDEX, ITEMS_INDEX | ||
from stac_fastapi.elasticsearch.core import COLLECTIONS_INDEX, ITEMS_INDEX, mk_item_id | ||
from stac_fastapi.elasticsearch.serializers import CollectionSerializer, ItemSerializer | ||
from stac_fastapi.elasticsearch.session import Session | ||
from stac_fastapi.extensions.third_party.bulk_transactions import ( | ||
|
@@ -42,29 +42,38 @@ def create_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item: | |
if item["type"] == "FeatureCollection": | ||
bulk_client = BulkTransactionsClient() | ||
processed_items = [ | ||
bulk_client._preprocess_item(item, base_url) | ||
for item in item["features"] | ||
bulk_client.preprocess_item(item, base_url) for item in item["features"] | ||
] | ||
return_msg = f"Successfully added {len(processed_items)} items." | ||
bulk_client.bulk_sync(processed_items) | ||
|
||
return return_msg | ||
|
||
# If a single item is posted | ||
if not self.client.exists(index=COLLECTIONS_INDEX, id=item["collection"]): | ||
raise ForeignKeyError(f"Collection {item['collection']} does not exist") | ||
|
||
if self.client.exists(index=ITEMS_INDEX, id=item["id"]): | ||
raise ConflictError( | ||
f"Item {item['id']} in collection {item['collection']} already exists" | ||
else: | ||
# todo: check if collection exists, but cache | ||
if not self.client.exists(index=COLLECTIONS_INDEX, id=item["collection"]): | ||
raise ForeignKeyError(f"Collection {item['collection']} does not exist") | ||
|
||
if self.client.exists( | ||
index=ITEMS_INDEX, id=mk_item_id(item["id"], item["collection"]) | ||
): | ||
raise ConflictError( | ||
f"Item {item['id']} in collection {item['collection']} already exists" | ||
) | ||
|
||
item = BulkTransactionsClient().preprocess_item(item, base_url) | ||
|
||
es_resp = self.client.index( | ||
index=ITEMS_INDEX, | ||
id=mk_item_id(item["id"], item["collection"]), | ||
document=item, | ||
) | ||
|
||
data = ItemSerializer.stac_to_db(item, base_url) | ||
if (meta := es_resp.get("meta")) and meta.get("status") == 409: | ||
raise ConflictError( | ||
f"Item {item['id']} in collection {item['collection']} already exists" | ||
) | ||
|
||
self.client.index( | ||
index=ITEMS_INDEX, doc_type="_doc", id=item["id"], document=data | ||
) | ||
return ItemSerializer.db_to_stac(item, base_url) | ||
return item | ||
|
||
@overrides | ||
def update_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item: | ||
|
@@ -75,14 +84,11 @@ def update_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item: | |
|
||
if not self.client.exists(index=COLLECTIONS_INDEX, id=item["collection"]): | ||
raise ForeignKeyError(f"Collection {item['collection']} does not exist") | ||
if not self.client.exists(index=ITEMS_INDEX, id=item["id"]): | ||
raise NotFoundError( | ||
f"Item {item['id']} in collection {item['collection']} doesn't exist" | ||
) | ||
|
||
# todo: index instead of delete and create | ||
self.delete_item(item["id"], item["collection"]) | ||
self.create_item(item, **kwargs) | ||
# self.client.update(index=ITEMS_INDEX,doc_type='_doc',id=model["id"], | ||
# body=model) | ||
# self.client.update(index=ITEMS_INDEX,id=item["id"], body=item) | ||
return ItemSerializer.db_to_stac(item, base_url) | ||
|
||
@overrides | ||
|
@@ -91,10 +97,12 @@ def delete_item( | |
) -> stac_types.Item: | ||
"""Delete item.""" | ||
try: | ||
_ = self.client.get(index=ITEMS_INDEX, id=item_id) | ||
self.client.delete(index=ITEMS_INDEX, id=mk_item_id(item_id, collection_id)) | ||
except elasticsearch.exceptions.NotFoundError: | ||
raise NotFoundError(f"Item {item_id} not found") | ||
self.client.delete(index=ITEMS_INDEX, doc_type="_doc", id=item_id) | ||
raise NotFoundError( | ||
f"Item {item_id} in collection {collection_id} not found" | ||
) | ||
return None | ||
|
||
@overrides | ||
def create_collection( | ||
|
@@ -109,9 +117,9 @@ def create_collection( | |
|
||
if self.client.exists(index=COLLECTIONS_INDEX, id=collection["id"]): | ||
raise ConflictError(f"Collection {collection['id']} already exists") | ||
|
||
self.client.index( | ||
index=COLLECTIONS_INDEX, | ||
doc_type="_doc", | ||
id=collection["id"], | ||
document=collection, | ||
) | ||
|
@@ -139,7 +147,8 @@ def delete_collection(self, collection_id: str, **kwargs) -> stac_types.Collecti | |
_ = self.client.get(index=COLLECTIONS_INDEX, id=collection_id) | ||
except elasticsearch.exceptions.NotFoundError: | ||
raise NotFoundError(f"Collection {collection_id} not found") | ||
self.client.delete(index=COLLECTIONS_INDEX, doc_type="_doc", id=collection_id) | ||
self.client.delete(index=COLLECTIONS_INDEX, id=collection_id) | ||
return None | ||
|
||
|
||
@attr.s | ||
|
@@ -153,38 +162,45 @@ def __attrs_post_init__(self): | |
settings = ElasticsearchSettings() | ||
self.client = settings.create_client | ||
|
||
def _preprocess_item(self, model: stac_types.Item, base_url) -> stac_types.Item: | ||
def preprocess_item(self, item: stac_types.Item, base_url) -> stac_types.Item: | ||
"""Preprocess items to match data model.""" | ||
if not self.client.exists(index=COLLECTIONS_INDEX, id=model["collection"]): | ||
raise ForeignKeyError(f"Collection {model['collection']} does not exist") | ||
if not self.client.exists(index=COLLECTIONS_INDEX, id=item["collection"]): | ||
raise ForeignKeyError(f"Collection {item['collection']} does not exist") | ||
|
||
if self.client.exists(index=ITEMS_INDEX, id=model["id"]): | ||
if self.client.exists(index=ITEMS_INDEX, id=item["id"]): | ||
raise ConflictError( | ||
f"Item {model['id']} in collection {model['collection']} already exists" | ||
f"Item {item['id']} in collection {item['collection']} already exists" | ||
) | ||
|
||
item = ItemSerializer.stac_to_db(model, base_url) | ||
return item | ||
return ItemSerializer.stac_to_db(item, base_url) | ||
|
||
def bulk_sync(self, processed_items): | ||
"""Elasticsearch bulk insertion.""" | ||
actions = [{"_index": ITEMS_INDEX, "_source": item} for item in processed_items] | ||
actions = [ | ||
{ | ||
"_index": ITEMS_INDEX, | ||
"_id": mk_item_id(item["id"], item["collection"]), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. explicitly set the document id for bulk insert now |
||
"_source": item, | ||
} | ||
for item in processed_items | ||
] | ||
helpers.bulk(self.client, actions) | ||
|
||
@overrides | ||
def bulk_item_insert( | ||
self, items: Items, chunk_size: Optional[int] = None, **kwargs | ||
) -> str: | ||
"""Bulk item insertion using es.""" | ||
try: | ||
base_url = str(kwargs["request"].base_url) | ||
except Exception: | ||
request = kwargs.get("request") | ||
if request: | ||
base_url = str(request.base_url) | ||
else: | ||
base_url = "" | ||
|
||
processed_items = [ | ||
self._preprocess_item(item, base_url) for item in items.items.values() | ||
self.preprocess_item(item, base_url) for item in items.items.values() | ||
] | ||
return_msg = f"Successfully added {len(processed_items)} items." | ||
|
||
self.bulk_sync(processed_items) | ||
|
||
return return_msg | ||
return f"Successfully added {len(processed_items)} Items." |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -100,19 +100,19 @@ def test_get_item( | |
item_data = load_test_data("test_item.json") | ||
es_transactions.create_collection(collection_data, request=MockStarletteRequest) | ||
es_transactions.create_item(item_data, request=MockStarletteRequest) | ||
coll = es_core.get_item( | ||
got_item = es_core.get_item( | ||
item_id=item_data["id"], | ||
collection_id=item_data["collection"], | ||
request=MockStarletteRequest, | ||
) | ||
assert coll["id"] == item_data["id"] | ||
assert coll["collection"] == item_data["collection"] | ||
assert got_item["id"] == item_data["id"] | ||
assert got_item["collection"] == item_data["collection"] | ||
|
||
es_transactions.delete_collection( | ||
collection_data["id"], request=MockStarletteRequest | ||
) | ||
es_transactions.delete_item( | ||
item_data["id"], coll["id"], request=MockStarletteRequest | ||
item_data["id"], item_data["collection"], request=MockStarletteRequest | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this test started failing because the wrong collection value was passed (notice it's passing the Item ID again), but the collection was previously ignored in delete, whereas now it's used. |
||
) | ||
|
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -153,7 +153,10 @@ def app_client(api_client, load_test_data): | |
try: | ||
client.create_collection(coll, request=MockStarletteRequest) | ||
except ConflictError: | ||
pass | ||
try: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. these tests do not clean up well, especially when any of them fail. This is a stop-gap until that's better handled. |
||
client.delete_item("test-item", "test-collection") | ||
except Exception: | ||
pass | ||
|
||
with TestClient(api_client.app) as test_app: | ||
yield test_app |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -143,6 +143,8 @@ def test_update_item_missing_collection(app_client, load_test_data): | |
def test_update_item_geometry(app_client, load_test_data): | ||
test_item = load_test_data("test_item.json") | ||
|
||
test_item["id"] = "update_test_item_1" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. again, another stop-gap. The index should probably be cleared before each run, and each item should be given a random id. |
||
|
||
# Create the item | ||
resp = app_client.post( | ||
f"/collections/{test_item['collection']}/items", json=test_item | ||
|
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The `-` at the beginning will allow the execution of `docker-compose down` even if there are failing tests.