10
10
from overrides import overrides
11
11
12
12
from stac_fastapi .elasticsearch .config import ElasticsearchSettings
13
- from stac_fastapi .elasticsearch .core import COLLECTIONS_INDEX , ITEMS_INDEX
13
+ from stac_fastapi .elasticsearch .core import COLLECTIONS_INDEX , ITEMS_INDEX , mk_item_id
14
14
from stac_fastapi .elasticsearch .serializers import CollectionSerializer , ItemSerializer
15
15
from stac_fastapi .elasticsearch .session import Session
16
16
from stac_fastapi .extensions .third_party .bulk_transactions import (
@@ -42,29 +42,39 @@ def create_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item:
42
42
if item ["type" ] == "FeatureCollection" :
43
43
bulk_client = BulkTransactionsClient ()
44
44
processed_items = [
45
- bulk_client ._preprocess_item (item , base_url )
46
- for item in item ["features" ]
45
+ bulk_client .preprocess_item (item , base_url ) for item in item ["features" ]
47
46
]
48
47
return_msg = f"Successfully added { len (processed_items )} items."
49
48
bulk_client .bulk_sync (processed_items )
50
49
51
50
return return_msg
52
-
53
- # If a single item is posted
54
- if not self .client .exists (index = COLLECTIONS_INDEX , id = item ["collection" ]):
55
- raise ForeignKeyError (f"Collection { item ['collection' ]} does not exist" )
56
-
57
- if self .client .exists (index = ITEMS_INDEX , id = item ["id" ]):
58
- raise ConflictError (
59
- f"Item { item ['id' ]} in collection { item ['collection' ]} already exists"
51
+ else :
52
+ # TODO
53
+ if self .client .exists (
54
+ index = ITEMS_INDEX , id = mk_item_id (item ["id" ], item ["collection" ])
55
+ ):
56
+ raise ConflictError (
57
+ f"Item { item ['id' ]} in collection { item ['collection' ]} already exists"
58
+ )
59
+
60
+ # todo: check if collection exists, but cache
61
+ if not self .client .exists (index = COLLECTIONS_INDEX , id = item ["collection" ]):
62
+ raise ForeignKeyError (f"Collection { item ['collection' ]} does not exist" )
63
+
64
+ item = BulkTransactionsClient ().preprocess_item (item , base_url )
65
+
66
+ es_resp = self .client .index (
67
+ index = ITEMS_INDEX ,
68
+ id = mk_item_id (item ["id" ], item ["collection" ]),
69
+ document = item ,
60
70
)
61
71
62
- data = ItemSerializer .stac_to_db (item , base_url )
72
+ if (meta := es_resp .get ("meta" )) and meta .get ("status" ) == 409 :
73
+ raise ConflictError (
74
+ f"Item { item ['id' ]} in collection { item ['collection' ]} already exists"
75
+ )
63
76
64
- self .client .index (
65
- index = ITEMS_INDEX , doc_type = "_doc" , id = item ["id" ], document = data
66
- )
67
- return ItemSerializer .db_to_stac (item , base_url )
77
+ return item
68
78
69
79
@overrides
70
80
def update_item (self , item : stac_types .Item , ** kwargs ) -> stac_types .Item :
@@ -75,14 +85,11 @@ def update_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item:
75
85
76
86
if not self .client .exists (index = COLLECTIONS_INDEX , id = item ["collection" ]):
77
87
raise ForeignKeyError (f"Collection { item ['collection' ]} does not exist" )
78
- if not self .client .exists (index = ITEMS_INDEX , id = item ["id" ]):
79
- raise NotFoundError (
80
- f"Item { item ['id' ]} in collection { item ['collection' ]} doesn't exist"
81
- )
88
+
89
+ # todo: index instead of delete and create
82
90
self .delete_item (item ["id" ], item ["collection" ])
83
91
self .create_item (item , ** kwargs )
84
- # self.client.update(index=ITEMS_INDEX,doc_type='_doc',id=model["id"],
85
- # body=model)
92
+ # self.client.update(index=ITEMS_INDEX,id=item["id"], body=item)
86
93
return ItemSerializer .db_to_stac (item , base_url )
87
94
88
95
@overrides
@@ -91,10 +98,12 @@ def delete_item(
91
98
) -> stac_types .Item :
92
99
"""Delete item."""
93
100
try :
94
- _ = self .client .get (index = ITEMS_INDEX , id = item_id )
101
+ self .client .delete (index = ITEMS_INDEX , id = mk_item_id ( item_id , collection_id ) )
95
102
except elasticsearch .exceptions .NotFoundError :
96
- raise NotFoundError (f"Item { item_id } not found" )
97
- self .client .delete (index = ITEMS_INDEX , doc_type = "_doc" , id = item_id )
103
+ raise NotFoundError (
104
+ f"Item { item_id } in collection { collection_id } not found"
105
+ )
106
+ return None
98
107
99
108
@overrides
100
109
def create_collection (
@@ -109,9 +118,9 @@ def create_collection(
109
118
110
119
if self .client .exists (index = COLLECTIONS_INDEX , id = collection ["id" ]):
111
120
raise ConflictError (f"Collection { collection ['id' ]} already exists" )
121
+
112
122
self .client .index (
113
123
index = COLLECTIONS_INDEX ,
114
- doc_type = "_doc" ,
115
124
id = collection ["id" ],
116
125
document = collection ,
117
126
)
@@ -139,7 +148,8 @@ def delete_collection(self, collection_id: str, **kwargs) -> stac_types.Collecti
139
148
_ = self .client .get (index = COLLECTIONS_INDEX , id = collection_id )
140
149
except elasticsearch .exceptions .NotFoundError :
141
150
raise NotFoundError (f"Collection { collection_id } not found" )
142
- self .client .delete (index = COLLECTIONS_INDEX , doc_type = "_doc" , id = collection_id )
151
+ self .client .delete (index = COLLECTIONS_INDEX , id = collection_id )
152
+ return None
143
153
144
154
145
155
@attr .s
@@ -153,38 +163,45 @@ def __attrs_post_init__(self):
153
163
settings = ElasticsearchSettings ()
154
164
self .client = settings .create_client
155
165
156
- def _preprocess_item (self , model : stac_types .Item , base_url ) -> stac_types .Item :
166
+ def preprocess_item (self , item : stac_types .Item , base_url ) -> stac_types .Item :
157
167
"""Preprocess items to match data model."""
158
- if not self .client .exists (index = COLLECTIONS_INDEX , id = model ["collection" ]):
159
- raise ForeignKeyError (f"Collection { model ['collection' ]} does not exist" )
168
+ if not self .client .exists (index = COLLECTIONS_INDEX , id = item ["collection" ]):
169
+ raise ForeignKeyError (f"Collection { item ['collection' ]} does not exist" )
160
170
161
- if self .client .exists (index = ITEMS_INDEX , id = model ["id" ]):
171
+ if self .client .exists (index = ITEMS_INDEX , id = item ["id" ]):
162
172
raise ConflictError (
163
- f"Item { model ['id' ]} in collection { model ['collection' ]} already exists"
173
+ f"Item { item ['id' ]} in collection { item ['collection' ]} already exists"
164
174
)
165
175
166
- item = ItemSerializer .stac_to_db (model , base_url )
167
- return item
176
+ return ItemSerializer .stac_to_db (item , base_url )
168
177
169
178
def bulk_sync (self , processed_items ):
170
179
"""Elasticsearch bulk insertion."""
171
- actions = [{"_index" : ITEMS_INDEX , "_source" : item } for item in processed_items ]
180
+ actions = [
181
+ {
182
+ "_index" : ITEMS_INDEX ,
183
+ "_id" : mk_item_id (item ["id" ], item ["collection" ]),
184
+ "_source" : item ,
185
+ }
186
+ for item in processed_items
187
+ ]
172
188
helpers .bulk (self .client , actions )
173
189
174
190
@overrides
175
191
def bulk_item_insert (
176
192
self , items : Items , chunk_size : Optional [int ] = None , ** kwargs
177
193
) -> str :
178
194
"""Bulk item insertion using es."""
179
- try :
180
- base_url = str (kwargs ["request" ].base_url )
181
- except Exception :
195
+ request = kwargs .get ("request" )
196
+ if request :
197
+ base_url = str (request .base_url )
198
+ else :
182
199
base_url = ""
200
+
183
201
processed_items = [
184
- self ._preprocess_item (item , base_url ) for item in items .items .values ()
202
+ self .preprocess_item (item , base_url ) for item in items .items .values ()
185
203
]
186
- return_msg = f"Successfully added { len (processed_items )} items."
187
204
188
205
self .bulk_sync (processed_items )
189
206
190
- return return_msg
207
+ return f"Successfully added { len ( processed_items ) } Items."
0 commit comments