Skip to content

use concatenation of id and collection for elasticsearch _id value #58

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ docker-shell:

.PHONY: test
test:
$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd /app/stac_fastapi/elasticsearch/tests/ && pytest'
-$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd /app/stac_fastapi/elasticsearch/tests/ && pytest'
Copy link
Collaborator Author

@philvarner philvarner Mar 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The `-` prefix at the beginning of the recipe line lets make continue to `docker-compose down` even when there are failing tests.

docker-compose down

.PHONY: run-database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
COLLECTIONS_INDEX = "stac_collections"


def mk_item_id(item_id: str, collection_id: str):
    """Build the Elasticsearch document ``_id`` for an Item.

    Joins the Item id and its collection id with a ``|`` separator so that
    Items sharing an id across different collections map to distinct
    Elasticsearch documents.
    """
    return "|".join((item_id, collection_id))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great idea!



@attr.s
class CoreCrudClient(BaseCoreClient):
"""Client for core endpoints defined by stac."""
Expand Down Expand Up @@ -142,7 +147,9 @@ def get_item(self, item_id: str, collection_id: str, **kwargs) -> Item:
"""Get item by item id, collection id."""
base_url = str(kwargs["request"].base_url)
try:
item = self.client.get(index=ITEMS_INDEX, id=item_id)
item = self.client.get(
index=ITEMS_INDEX, id=mk_item_id(item_id, collection_id)
)
except elasticsearch.exceptions.NotFoundError:
raise NotFoundError(
f"Item {item_id} does not exist in Collection {collection_id}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from overrides import overrides

from stac_fastapi.elasticsearch.config import ElasticsearchSettings
from stac_fastapi.elasticsearch.core import COLLECTIONS_INDEX, ITEMS_INDEX
from stac_fastapi.elasticsearch.core import COLLECTIONS_INDEX, ITEMS_INDEX, mk_item_id
from stac_fastapi.elasticsearch.serializers import CollectionSerializer, ItemSerializer
from stac_fastapi.elasticsearch.session import Session
from stac_fastapi.extensions.third_party.bulk_transactions import (
Expand Down Expand Up @@ -42,29 +42,38 @@ def create_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item:
if item["type"] == "FeatureCollection":
bulk_client = BulkTransactionsClient()
processed_items = [
bulk_client._preprocess_item(item, base_url)
for item in item["features"]
bulk_client.preprocess_item(item, base_url) for item in item["features"]
]
return_msg = f"Successfully added {len(processed_items)} items."
bulk_client.bulk_sync(processed_items)

return return_msg

# If a single item is posted
if not self.client.exists(index=COLLECTIONS_INDEX, id=item["collection"]):
raise ForeignKeyError(f"Collection {item['collection']} does not exist")

if self.client.exists(index=ITEMS_INDEX, id=item["id"]):
raise ConflictError(
f"Item {item['id']} in collection {item['collection']} already exists"
else:
# todo: check if collection exists, but cache
if not self.client.exists(index=COLLECTIONS_INDEX, id=item["collection"]):
raise ForeignKeyError(f"Collection {item['collection']} does not exist")

if self.client.exists(
index=ITEMS_INDEX, id=mk_item_id(item["id"], item["collection"])
):
raise ConflictError(
f"Item {item['id']} in collection {item['collection']} already exists"
)

item = BulkTransactionsClient().preprocess_item(item, base_url)

es_resp = self.client.index(
index=ITEMS_INDEX,
id=mk_item_id(item["id"], item["collection"]),
document=item,
)

data = ItemSerializer.stac_to_db(item, base_url)
if (meta := es_resp.get("meta")) and meta.get("status") == 409:
raise ConflictError(
f"Item {item['id']} in collection {item['collection']} already exists"
)

self.client.index(
index=ITEMS_INDEX, doc_type="_doc", id=item["id"], document=data
)
return ItemSerializer.db_to_stac(item, base_url)
return item

@overrides
def update_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item:
Expand All @@ -75,14 +84,11 @@ def update_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item:

if not self.client.exists(index=COLLECTIONS_INDEX, id=item["collection"]):
raise ForeignKeyError(f"Collection {item['collection']} does not exist")
if not self.client.exists(index=ITEMS_INDEX, id=item["id"]):
raise NotFoundError(
f"Item {item['id']} in collection {item['collection']} doesn't exist"
)

# todo: index instead of delete and create
self.delete_item(item["id"], item["collection"])
self.create_item(item, **kwargs)
# self.client.update(index=ITEMS_INDEX,doc_type='_doc',id=model["id"],
# body=model)
# self.client.update(index=ITEMS_INDEX,id=item["id"], body=item)
return ItemSerializer.db_to_stac(item, base_url)

@overrides
Expand All @@ -91,10 +97,12 @@ def delete_item(
) -> stac_types.Item:
"""Delete item."""
try:
_ = self.client.get(index=ITEMS_INDEX, id=item_id)
self.client.delete(index=ITEMS_INDEX, id=mk_item_id(item_id, collection_id))
except elasticsearch.exceptions.NotFoundError:
raise NotFoundError(f"Item {item_id} not found")
self.client.delete(index=ITEMS_INDEX, doc_type="_doc", id=item_id)
raise NotFoundError(
f"Item {item_id} in collection {collection_id} not found"
)
return None

@overrides
def create_collection(
Expand All @@ -109,9 +117,9 @@ def create_collection(

if self.client.exists(index=COLLECTIONS_INDEX, id=collection["id"]):
raise ConflictError(f"Collection {collection['id']} already exists")

self.client.index(
index=COLLECTIONS_INDEX,
doc_type="_doc",
id=collection["id"],
document=collection,
)
Expand Down Expand Up @@ -139,7 +147,8 @@ def delete_collection(self, collection_id: str, **kwargs) -> stac_types.Collecti
_ = self.client.get(index=COLLECTIONS_INDEX, id=collection_id)
except elasticsearch.exceptions.NotFoundError:
raise NotFoundError(f"Collection {collection_id} not found")
self.client.delete(index=COLLECTIONS_INDEX, doc_type="_doc", id=collection_id)
self.client.delete(index=COLLECTIONS_INDEX, id=collection_id)
return None


@attr.s
Expand All @@ -153,38 +162,45 @@ def __attrs_post_init__(self):
settings = ElasticsearchSettings()
self.client = settings.create_client

def _preprocess_item(self, model: stac_types.Item, base_url) -> stac_types.Item:
def preprocess_item(self, item: stac_types.Item, base_url) -> stac_types.Item:
"""Preprocess items to match data model."""
if not self.client.exists(index=COLLECTIONS_INDEX, id=model["collection"]):
raise ForeignKeyError(f"Collection {model['collection']} does not exist")
if not self.client.exists(index=COLLECTIONS_INDEX, id=item["collection"]):
raise ForeignKeyError(f"Collection {item['collection']} does not exist")

if self.client.exists(index=ITEMS_INDEX, id=model["id"]):
if self.client.exists(index=ITEMS_INDEX, id=item["id"]):
raise ConflictError(
f"Item {model['id']} in collection {model['collection']} already exists"
f"Item {item['id']} in collection {item['collection']} already exists"
)

item = ItemSerializer.stac_to_db(model, base_url)
return item
return ItemSerializer.stac_to_db(item, base_url)

def bulk_sync(self, processed_items):
"""Elasticsearch bulk insertion."""
actions = [{"_index": ITEMS_INDEX, "_source": item} for item in processed_items]
actions = [
{
"_index": ITEMS_INDEX,
"_id": mk_item_id(item["id"], item["collection"]),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The document `_id` is now set explicitly for bulk inserts instead of being auto-generated by Elasticsearch.

"_source": item,
}
for item in processed_items
]
helpers.bulk(self.client, actions)

@overrides
def bulk_item_insert(
self, items: Items, chunk_size: Optional[int] = None, **kwargs
) -> str:
"""Bulk item insertion using es."""
try:
base_url = str(kwargs["request"].base_url)
except Exception:
request = kwargs.get("request")
if request:
base_url = str(request.base_url)
else:
base_url = ""

processed_items = [
self._preprocess_item(item, base_url) for item in items.items.values()
self.preprocess_item(item, base_url) for item in items.items.values()
]
return_msg = f"Successfully added {len(processed_items)} items."

self.bulk_sync(processed_items)

return return_msg
return f"Successfully added {len(processed_items)} Items."
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,19 @@ def test_get_item(
item_data = load_test_data("test_item.json")
es_transactions.create_collection(collection_data, request=MockStarletteRequest)
es_transactions.create_item(item_data, request=MockStarletteRequest)
coll = es_core.get_item(
got_item = es_core.get_item(
item_id=item_data["id"],
collection_id=item_data["collection"],
request=MockStarletteRequest,
)
assert coll["id"] == item_data["id"]
assert coll["collection"] == item_data["collection"]
assert got_item["id"] == item_data["id"]
assert got_item["collection"] == item_data["collection"]

es_transactions.delete_collection(
collection_data["id"], request=MockStarletteRequest
)
es_transactions.delete_item(
item_data["id"], coll["id"], request=MockStarletteRequest
item_data["id"], item_data["collection"], request=MockStarletteRequest
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test started failing because the wrong collection value was passed (notice it's passing the Item ID again), but the collection was previously ignored in delete, whereas now it's used.

)


Expand Down
5 changes: 4 additions & 1 deletion stac_fastapi/elasticsearch/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ def app_client(api_client, load_test_data):
try:
client.create_collection(coll, request=MockStarletteRequest)
except ConflictError:
pass
try:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these tests do not clean up well, especially when any of them fail. This is a stop-gap until that's better handled.

client.delete_item("test-item", "test-collection")
except Exception:
pass

with TestClient(api_client.app) as test_app:
yield test_app
2 changes: 2 additions & 0 deletions stac_fastapi/elasticsearch/tests/resources/test_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ def test_update_item_missing_collection(app_client, load_test_data):
def test_update_item_geometry(app_client, load_test_data):
test_item = load_test_data("test_item.json")

test_item["id"] = "update_test_item_1"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again, another stop-gap. The index should probably be cleared before each run, and each item should be given a random id.


# Create the item
resp = app_client.post(
f"/collections/{test_item['collection']}/items", json=test_item
Expand Down