10
10
from overrides import overrides
11
11
12
12
from stac_fastapi .elasticsearch .config import ElasticsearchSettings
13
- from stac_fastapi .elasticsearch .core import COLLECTIONS_INDEX , ITEMS_INDEX
13
+ from stac_fastapi .elasticsearch .core import COLLECTIONS_INDEX , ITEMS_INDEX , mk_item_id
14
14
from stac_fastapi .elasticsearch .serializers import CollectionSerializer , ItemSerializer
15
15
from stac_fastapi .elasticsearch .session import Session
16
16
from stac_fastapi .extensions .third_party .bulk_transactions import (
@@ -42,29 +42,38 @@ def create_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item:
42
42
if item ["type" ] == "FeatureCollection" :
43
43
bulk_client = BulkTransactionsClient ()
44
44
processed_items = [
45
- bulk_client ._preprocess_item (item , base_url )
46
- for item in item ["features" ]
45
+ bulk_client .preprocess_item (item , base_url ) for item in item ["features" ]
47
46
]
48
47
return_msg = f"Successfully added { len (processed_items )} items."
49
48
bulk_client .bulk_sync (processed_items )
50
49
51
50
return return_msg
52
-
53
- # If a single item is posted
54
- if not self .client .exists (index = COLLECTIONS_INDEX , id = item ["collection" ]):
55
- raise ForeignKeyError (f"Collection { item ['collection' ]} does not exist" )
56
-
57
- if self .client .exists (index = ITEMS_INDEX , id = item ["id" ]):
58
- raise ConflictError (
59
- f"Item { item ['id' ]} in collection { item ['collection' ]} already exists"
51
+ else :
52
+ # todo: check if collection exists, but cache
53
+ if not self .client .exists (index = COLLECTIONS_INDEX , id = item ["collection" ]):
54
+ raise ForeignKeyError (f"Collection { item ['collection' ]} does not exist" )
55
+
56
+ if self .client .exists (
57
+ index = ITEMS_INDEX , id = mk_item_id (item ["id" ], item ["collection" ])
58
+ ):
59
+ raise ConflictError (
60
+ f"Item { item ['id' ]} in collection { item ['collection' ]} already exists"
61
+ )
62
+
63
+ item = BulkTransactionsClient ().preprocess_item (item , base_url )
64
+
65
+ es_resp = self .client .index (
66
+ index = ITEMS_INDEX ,
67
+ id = mk_item_id (item ["id" ], item ["collection" ]),
68
+ document = item ,
60
69
)
61
70
62
- data = ItemSerializer .stac_to_db (item , base_url )
71
+ if (meta := es_resp .get ("meta" )) and meta .get ("status" ) == 409 :
72
+ raise ConflictError (
73
+ f"Item { item ['id' ]} in collection { item ['collection' ]} already exists"
74
+ )
63
75
64
- self .client .index (
65
- index = ITEMS_INDEX , doc_type = "_doc" , id = item ["id" ], document = data
66
- )
67
- return ItemSerializer .db_to_stac (item , base_url )
76
+ return item
68
77
69
78
@overrides
70
79
def update_item (self , item : stac_types .Item , ** kwargs ) -> stac_types .Item :
@@ -75,14 +84,11 @@ def update_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item:
75
84
76
85
if not self .client .exists (index = COLLECTIONS_INDEX , id = item ["collection" ]):
77
86
raise ForeignKeyError (f"Collection { item ['collection' ]} does not exist" )
78
- if not self .client .exists (index = ITEMS_INDEX , id = item ["id" ]):
79
- raise NotFoundError (
80
- f"Item { item ['id' ]} in collection { item ['collection' ]} doesn't exist"
81
- )
87
+
88
+ # todo: index instead of delete and create
82
89
self .delete_item (item ["id" ], item ["collection" ])
83
90
self .create_item (item , ** kwargs )
84
- # self.client.update(index=ITEMS_INDEX,doc_type='_doc',id=model["id"],
85
- # body=model)
91
+ # self.client.update(index=ITEMS_INDEX,id=item["id"], body=item)
86
92
return ItemSerializer .db_to_stac (item , base_url )
87
93
88
94
@overrides
@@ -91,10 +97,12 @@ def delete_item(
91
97
) -> stac_types .Item :
92
98
"""Delete item."""
93
99
try :
94
- _ = self .client .get (index = ITEMS_INDEX , id = item_id )
100
+ self .client .delete (index = ITEMS_INDEX , id = mk_item_id ( item_id , collection_id ) )
95
101
except elasticsearch .exceptions .NotFoundError :
96
- raise NotFoundError (f"Item { item_id } not found" )
97
- self .client .delete (index = ITEMS_INDEX , doc_type = "_doc" , id = item_id )
102
+ raise NotFoundError (
103
+ f"Item { item_id } in collection { collection_id } not found"
104
+ )
105
+ return None
98
106
99
107
@overrides
100
108
def create_collection (
@@ -109,9 +117,9 @@ def create_collection(
109
117
110
118
if self .client .exists (index = COLLECTIONS_INDEX , id = collection ["id" ]):
111
119
raise ConflictError (f"Collection { collection ['id' ]} already exists" )
120
+
112
121
self .client .index (
113
122
index = COLLECTIONS_INDEX ,
114
- doc_type = "_doc" ,
115
123
id = collection ["id" ],
116
124
document = collection ,
117
125
)
@@ -139,7 +147,8 @@ def delete_collection(self, collection_id: str, **kwargs) -> stac_types.Collecti
139
147
_ = self .client .get (index = COLLECTIONS_INDEX , id = collection_id )
140
148
except elasticsearch .exceptions .NotFoundError :
141
149
raise NotFoundError (f"Collection { collection_id } not found" )
142
- self .client .delete (index = COLLECTIONS_INDEX , doc_type = "_doc" , id = collection_id )
150
+ self .client .delete (index = COLLECTIONS_INDEX , id = collection_id )
151
+ return None
143
152
144
153
145
154
@attr .s
@@ -153,38 +162,45 @@ def __attrs_post_init__(self):
153
162
settings = ElasticsearchSettings ()
154
163
self .client = settings .create_client
155
164
156
def preprocess_item(self, item: stac_types.Item, base_url) -> stac_types.Item:
    """Validate an item against the indices and convert it to the database model.

    Args:
        item: STAC item to insert; its "id" and "collection" keys are read.
        base_url: Passed through unchanged to ``ItemSerializer.stac_to_db``.

    Returns:
        The result of ``ItemSerializer.stac_to_db(item, base_url)``.

    Raises:
        ForeignKeyError: If ``COLLECTIONS_INDEX`` has no document whose id is
            the item's collection.
        ConflictError: If ``ITEMS_INDEX`` already has a document for this
            item id / collection pair.
    """
    item_id = item["id"]
    collection_id = item["collection"]

    if not self.client.exists(index=COLLECTIONS_INDEX, id=collection_id):
        raise ForeignKeyError(f"Collection {collection_id} does not exist")

    # Item documents are written under the composite id produced by
    # mk_item_id, so the conflict check has to look up that same id; checking
    # the bare item id would never match a stored document.
    if self.client.exists(
        index=ITEMS_INDEX, id=mk_item_id(item_id, collection_id)
    ):
        raise ConflictError(
            f"Item {item_id} in collection {collection_id} already exists"
        )

    return ItemSerializer.stac_to_db(item, base_url)
168
176
169
177
def bulk_sync(self, processed_items):
    """Send preprocessed items to the items index through ``helpers.bulk``.

    Every item is turned into one bulk action that targets ``ITEMS_INDEX``,
    uses ``mk_item_id(item["id"], item["collection"])`` as the document id and
    carries the item itself as the document source.

    Args:
        processed_items: Iterable of item mappings exposing "id" and
            "collection" keys.
    """

    def _to_action(doc):
        # One bulk action per item, keyed by the composite item id.
        return {
            "_index": ITEMS_INDEX,
            "_id": mk_item_id(doc["id"], doc["collection"]),
            "_source": doc,
        }

    bulk_actions = list(map(_to_action, processed_items))
    helpers.bulk(self.client, bulk_actions)
173
188
174
189
@overrides
def bulk_item_insert(
    self, items: Items, chunk_size: Optional[int] = None, **kwargs
) -> str:
    """Preprocess every item in ``items`` and insert them all in bulk.

    Args:
        items: Container whose ``items`` mapping holds the items to insert;
            only the mapping's values are used.
        chunk_size: Accepted for interface compatibility; this implementation
            does not use it.
        **kwargs: May carry ``request``; when present and truthy, its
            ``base_url`` is stringified and handed to ``preprocess_item``.
            Otherwise an empty base URL is used.

    Returns:
        A message stating how many items were added.
    """
    request = kwargs.get("request")
    base_url = str(request.base_url) if request else ""

    # Preprocess everything first so a failing item raises before any write.
    prepared = []
    for raw_item in items.items.values():
        prepared.append(self.preprocess_item(raw_item, base_url))

    self.bulk_sync(prepared)

    return f"Successfully added {len(prepared)} Items."
0 commit comments