Skip to content

Commit 214020a

Browse files
author
Phil Varner
committed
move classes in transactions.py to core.py, to align with parent class organization
1 parent ba7409c commit 214020a

File tree

8 files changed

+145
-160
lines changed

8 files changed

+145
-160
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pre-commit run --all-files`
1818

1919
```shell
2020
cd stac_fastapi/elasticsearch
21-
pip install .[dev]
21+
pip install -e .[dev]
2222
```
2323

2424
## Building

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,6 @@ services:
3838
discovery.type: single-node
3939
network.host: 0.0.0.0
4040
http.port: 9200
41-
ES_JAVA_OPTS: -Xms512m -Xmx512m
41+
ES_JAVA_OPTS: -Xms512m -Xmx4g
4242
ports:
4343
- "9200:9200"

stac_fastapi/elasticsearch/setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
desc = f.read()
77

88
install_requires = [
9+
"fastapi",
910
"attrs",
1011
"pydantic[dotenv]",
1112
"stac_pydantic==2.0.*",

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22
from stac_fastapi.api.app import StacApi
33
from stac_fastapi.api.models import create_get_request_model, create_post_request_model
44
from stac_fastapi.elasticsearch.config import ElasticsearchSettings
5-
from stac_fastapi.elasticsearch.core import CoreCrudClient
6-
from stac_fastapi.elasticsearch.extensions import QueryExtension
7-
from stac_fastapi.elasticsearch.indexes import IndexesClient
8-
from stac_fastapi.elasticsearch.session import Session
9-
from stac_fastapi.elasticsearch.transactions import (
5+
from stac_fastapi.elasticsearch.core import (
106
BulkTransactionsClient,
7+
CoreCrudClient,
118
TransactionsClient,
129
)
10+
from stac_fastapi.elasticsearch.extensions import QueryExtension
11+
from stac_fastapi.elasticsearch.indexes import IndexesClient
12+
from stac_fastapi.elasticsearch.session import Session
1313
from stac_fastapi.extensions.core import (
1414
ContextExtension,
1515
FieldsExtension,

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py

Lines changed: 131 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,31 @@
11
"""Item crud client."""
22
import json
33
import logging
4+
from datetime import datetime
45
from datetime import datetime as datetime_type
6+
from datetime import timezone
57
from typing import List, Optional, Type, Union
68
from urllib.parse import urljoin
79

810
import attr
911
from fastapi import HTTPException
1012
from overrides import overrides
11-
12-
# from geojson_pydantic.geometries import Polygon
1313
from pydantic import ValidationError
1414
from stac_pydantic.links import Relations
1515
from stac_pydantic.shared import MimeTypes
1616

1717
from stac_fastapi.elasticsearch import serializers
18+
from stac_fastapi.elasticsearch.config import ElasticsearchSettings
1819
from stac_fastapi.elasticsearch.database_logic import DatabaseLogic
20+
from stac_fastapi.elasticsearch.serializers import CollectionSerializer, ItemSerializer
1921
from stac_fastapi.elasticsearch.session import Session
20-
21-
# from stac_fastapi.elasticsearch.types.error_checks import ErrorChecks
22-
from stac_fastapi.types.core import BaseCoreClient
22+
from stac_fastapi.extensions.third_party.bulk_transactions import (
23+
BaseBulkTransactionsClient,
24+
Items,
25+
)
26+
from stac_fastapi.types import stac as stac_types
27+
from stac_fastapi.types.core import BaseCoreClient, BaseTransactionsClient
28+
from stac_fastapi.types.links import CollectionLinks
2329
from stac_fastapi.types.stac import Collection, Collections, Item, ItemCollection
2430

2531
logger = logging.getLogger(__name__)
@@ -291,3 +297,123 @@ def post_search(self, search_request, **kwargs) -> ItemCollection:
291297
links=links,
292298
context=context_obj,
293299
)
300+
301+
302+
@attr.s
303+
class TransactionsClient(BaseTransactionsClient):
304+
"""Transactions extension specific CRUD operations."""
305+
306+
session: Session = attr.ib(default=attr.Factory(Session.create_from_env))
307+
database = DatabaseLogic()
308+
309+
@overrides
310+
def create_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item:
311+
"""Create item."""
312+
base_url = str(kwargs["request"].base_url)
313+
314+
# If a feature collection is posted
315+
if item["type"] == "FeatureCollection":
316+
bulk_client = BulkTransactionsClient()
317+
processed_items = [
318+
bulk_client.preprocess_item(item, base_url) for item in item["features"]
319+
]
320+
return_msg = f"Successfully added {len(processed_items)} items."
321+
self.database.bulk_sync(processed_items)
322+
323+
return return_msg
324+
else:
325+
item = self.database.prep_create_item(item=item, base_url=base_url)
326+
self.database.create_item(item=item, base_url=base_url)
327+
return item
328+
329+
@overrides
330+
def update_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item:
331+
"""Update item."""
332+
base_url = str(kwargs["request"].base_url)
333+
now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
334+
item["properties"]["updated"] = str(now)
335+
336+
self.database.check_collection_exists(collection_id=item["collection"])
337+
# todo: index instead of delete and create
338+
self.delete_item(item_id=item["id"], collection_id=item["collection"])
339+
self.create_item(item=item, **kwargs)
340+
341+
return ItemSerializer.db_to_stac(item, base_url)
342+
343+
@overrides
344+
def delete_item(
345+
self, item_id: str, collection_id: str, **kwargs
346+
) -> stac_types.Item:
347+
"""Delete item."""
348+
self.database.delete_item(item_id=item_id, collection_id=collection_id)
349+
return None
350+
351+
@overrides
352+
def create_collection(
353+
self, collection: stac_types.Collection, **kwargs
354+
) -> stac_types.Collection:
355+
"""Create collection."""
356+
base_url = str(kwargs["request"].base_url)
357+
collection_links = CollectionLinks(
358+
collection_id=collection["id"], base_url=base_url
359+
).create_links()
360+
collection["links"] = collection_links
361+
self.database.create_collection(collection=collection)
362+
363+
return CollectionSerializer.db_to_stac(collection, base_url)
364+
365+
@overrides
366+
def update_collection(
367+
self, collection: stac_types.Collection, **kwargs
368+
) -> stac_types.Collection:
369+
"""Update collection."""
370+
base_url = str(kwargs["request"].base_url)
371+
372+
self.database.find_collection(collection_id=collection["id"])
373+
self.delete_collection(collection["id"])
374+
self.create_collection(collection, **kwargs)
375+
376+
return CollectionSerializer.db_to_stac(collection, base_url)
377+
378+
@overrides
379+
def delete_collection(self, collection_id: str, **kwargs) -> stac_types.Collection:
380+
"""Delete collection."""
381+
self.database.delete_collection(collection_id=collection_id)
382+
return None
383+
384+
385+
@attr.s
386+
class BulkTransactionsClient(BaseBulkTransactionsClient):
387+
"""Postgres bulk transactions."""
388+
389+
session: Session = attr.ib(default=attr.Factory(Session.create_from_env))
390+
database = DatabaseLogic()
391+
392+
def __attrs_post_init__(self):
393+
"""Create es engine."""
394+
settings = ElasticsearchSettings()
395+
self.client = settings.create_client
396+
397+
def preprocess_item(self, item: stac_types.Item, base_url) -> stac_types.Item:
398+
"""Preprocess items to match data model."""
399+
item = self.database.prep_create_item(item=item, base_url=base_url)
400+
return item
401+
402+
@overrides
403+
def bulk_item_insert(
404+
self, items: Items, chunk_size: Optional[int] = None, **kwargs
405+
) -> str:
406+
"""Bulk item insertion using es."""
407+
request = kwargs.get("request")
408+
if request:
409+
base_url = str(request.base_url)
410+
else:
411+
base_url = ""
412+
413+
processed_items = [
414+
self.preprocess_item(item, base_url) for item in items.items.values()
415+
]
416+
417+
self.database.bulk_sync(processed_items)
418+
419+
return f"Successfully added {len(processed_items)} Items."

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/transactions.py

Lines changed: 0 additions & 142 deletions
This file was deleted.

stac_fastapi/elasticsearch/tests/clients/test_elasticsearch.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88
from tests.conftest import MockStarletteRequest
99

1010
from stac_fastapi.api.app import StacApi
11-
from stac_fastapi.elasticsearch.core import CoreCrudClient
12-
from stac_fastapi.elasticsearch.transactions import (
11+
from stac_fastapi.elasticsearch.core import (
1312
BulkTransactionsClient,
13+
CoreCrudClient,
1414
TransactionsClient,
1515
)
1616
from stac_fastapi.extensions.third_party.bulk_transactions import Items

stac_fastapi/elasticsearch/tests/conftest.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@
88
from stac_fastapi.api.app import StacApi
99
from stac_fastapi.api.models import create_request_model
1010
from stac_fastapi.elasticsearch.config import ElasticsearchSettings
11-
from stac_fastapi.elasticsearch.core import CoreCrudClient
12-
from stac_fastapi.elasticsearch.extensions import QueryExtension
13-
from stac_fastapi.elasticsearch.indexes import IndexesClient
14-
from stac_fastapi.elasticsearch.transactions import (
11+
from stac_fastapi.elasticsearch.core import (
1512
BulkTransactionsClient,
13+
CoreCrudClient,
1614
TransactionsClient,
1715
)
16+
from stac_fastapi.elasticsearch.extensions import QueryExtension
17+
from stac_fastapi.elasticsearch.indexes import IndexesClient
1818
from stac_fastapi.extensions.core import (
1919
ContextExtension,
2020
FieldsExtension,

0 commit comments

Comments
 (0)