|
2 | 2 |
|
3 | 3 | import logging
|
4 | 4 | from datetime import datetime, timezone
|
| 5 | +from typing import Optional |
5 | 6 |
|
6 | 7 | import attr
|
7 | 8 | import elasticsearch
|
@@ -104,108 +105,118 @@ class TransactionsClient(BaseTransactionsClient):
|
104 | 105 | },
|
105 | 106 | }
|
106 | 107 |
|
107 |
| - def _create_item_index(self): |
| 108 | + def create_item_index(self): |
| 109 | + """Create the index for Items.""" |
108 | 110 | self.client.indices.create(
|
109 | 111 | index="stac_items",
|
110 |
| - body=self.mappings, |
| 112 | + body={"mappings": self.mappings}, |
111 | 113 | ignore=400, # ignore 400 already exists code
|
112 | 114 | )
|
113 | 115 |
|
114 | 116 | @overrides
|
115 |
| - def create_item(self, model: stac_types.Item, **kwargs): |
| 117 | + def create_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item: |
116 | 118 | """Create item."""
|
117 | 119 | base_url = str(kwargs["request"].base_url)
|
118 |
| - self._create_item_index() |
| 120 | + self.create_item_index() |
119 | 121 |
|
120 | 122 | # If a feature collection is posted
|
121 |
| - if model["type"] == "FeatureCollection": |
| 123 | + if item["type"] == "FeatureCollection": |
122 | 124 | bulk_client = BulkTransactionsClient()
|
123 | 125 | processed_items = [
|
124 | 126 | bulk_client._preprocess_item(item, base_url)
|
125 |
| - for item in model["features"] |
| 127 | + for item in item["features"] |
126 | 128 | ]
|
127 | 129 | return_msg = f"Successfully added {len(processed_items)} items."
|
128 | 130 | bulk_client.bulk_sync(processed_items)
|
129 | 131 |
|
130 | 132 | return return_msg
|
131 | 133 |
|
132 | 134 | # If a single item is posted
|
133 |
| - if not self.client.exists(index="stac_collections", id=model["collection"]): |
134 |
| - raise ForeignKeyError(f"Collection {model['collection']} does not exist") |
| 135 | + if not self.client.exists(index="stac_collections", id=item["collection"]): |
| 136 | + raise ForeignKeyError(f"Collection {item['collection']} does not exist") |
135 | 137 |
|
136 |
| - if self.client.exists(index="stac_items", id=model["id"]): |
| 138 | + if self.client.exists(index="stac_items", id=item["id"]): |
137 | 139 | raise ConflictError(
|
138 |
| - f"Item {model['id']} in collection {model['collection']} already exists" |
| 140 | + f"Item {item['id']} in collection {item['collection']} already exists" |
139 | 141 | )
|
140 | 142 |
|
141 |
| - data = ItemSerializer.stac_to_db(model, base_url) |
| 143 | + data = ItemSerializer.stac_to_db(item, base_url) |
142 | 144 |
|
143 | 145 | self.client.index(
|
144 |
| - index="stac_items", doc_type="_doc", id=model["id"], document=data |
| 146 | + index="stac_items", doc_type="_doc", id=item["id"], document=data |
145 | 147 | )
|
146 |
| - return ItemSerializer.db_to_stac(model, base_url) |
| 148 | + return ItemSerializer.db_to_stac(item, base_url) |
147 | 149 |
|
148 | 150 | @overrides
|
149 |
| - def create_collection(self, model: stac_types.Collection, **kwargs): |
150 |
| - """Create collection.""" |
151 |
| - base_url = str(kwargs["request"].base_url) |
152 |
| - collection_links = CollectionLinks( |
153 |
| - collection_id=model["id"], base_url=base_url |
154 |
| - ).create_links() |
155 |
| - model["links"] = collection_links |
156 |
| - |
157 |
| - self._create_item_index() |
158 |
| - |
159 |
| - if self.client.exists(index="stac_collections", id=model["id"]): |
160 |
| - raise ConflictError(f"Collection {model['id']} already exists") |
161 |
| - self.client.index( |
162 |
| - index="stac_collections", doc_type="_doc", id=model["id"], document=model |
163 |
| - ) |
164 |
| - return CollectionSerializer.db_to_stac(model, base_url) |
165 |
| - |
166 |
| - @overrides |
167 |
| - def update_item(self, model: stac_types.Item, **kwargs): |
| 151 | + def update_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item: |
168 | 152 | """Update item."""
|
169 | 153 | base_url = str(kwargs["request"].base_url)
|
170 | 154 | now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
|
171 |
| - model["properties"]["updated"] = str(now) |
| 155 | + item["properties"]["updated"] = str(now) |
172 | 156 |
|
173 |
| - if not self.client.exists(index="stac_collections", id=model["collection"]): |
174 |
| - raise ForeignKeyError(f"Collection {model['collection']} does not exist") |
175 |
| - if not self.client.exists(index="stac_items", id=model["id"]): |
| 157 | + if not self.client.exists(index="stac_collections", id=item["collection"]): |
| 158 | + raise ForeignKeyError(f"Collection {item['collection']} does not exist") |
| 159 | + if not self.client.exists(index="stac_items", id=item["id"]): |
176 | 160 | raise NotFoundError(
|
177 |
| - f"Item {model['id']} in collection {model['collection']} doesn't exist" |
| 161 | + f"Item {item['id']} in collection {item['collection']} doesn't exist" |
178 | 162 | )
|
179 |
| - self.delete_item(model["id"], model["collection"]) |
180 |
| - self.create_item(model, **kwargs) |
| 163 | + self.delete_item(item["id"], item["collection"]) |
| 164 | + self.create_item(item, **kwargs) |
181 | 165 | # self.client.update(index="stac_items",doc_type='_doc',id=model["id"],
|
182 | 166 | # body=model)
|
183 |
| - return ItemSerializer.db_to_stac(model, base_url) |
| 167 | + return ItemSerializer.db_to_stac(item, base_url) |
184 | 168 |
|
185 | 169 | @overrides
|
186 |
| - def update_collection(self, model: stac_types.Collection, **kwargs): |
187 |
| - """Update collection.""" |
188 |
| - base_url = str(kwargs["request"].base_url) |
| 170 | + def delete_item( |
| 171 | + self, item_id: str, collection_id: str, **kwargs |
| 172 | + ) -> stac_types.Item: |
| 173 | + """Delete item.""" |
189 | 174 | try:
|
190 |
| - _ = self.client.get(index="stac_collections", id=model["id"]) |
| 175 | + _ = self.client.get(index="stac_items", id=item_id) |
191 | 176 | except elasticsearch.exceptions.NotFoundError:
|
192 |
| - raise NotFoundError(f"Collection {model['id']} not found") |
193 |
| - self.delete_collection(model["id"]) |
194 |
| - self.create_collection(model, **kwargs) |
| 177 | + raise NotFoundError(f"Item {item_id} not found") |
| 178 | + self.client.delete(index="stac_items", doc_type="_doc", id=item_id) |
| 179 | + |
| 180 | + @overrides |
| 181 | + def create_collection( |
| 182 | + self, collection: stac_types.Collection, **kwargs |
| 183 | + ) -> stac_types.Collection: |
| 184 | + """Create collection.""" |
| 185 | + base_url = str(kwargs["request"].base_url) |
| 186 | + collection_links = CollectionLinks( |
| 187 | + collection_id=collection["id"], base_url=base_url |
| 188 | + ).create_links() |
| 189 | + collection["links"] = collection_links |
195 | 190 |
|
196 |
| - return CollectionSerializer.db_to_stac(model, base_url) |
| 191 | + self.create_item_index() |
| 192 | + |
| 193 | + if self.client.exists(index="stac_collections", id=collection["id"]): |
| 194 | + raise ConflictError(f"Collection {collection['id']} already exists") |
| 195 | + self.client.index( |
| 196 | + index="stac_collections", |
| 197 | + doc_type="_doc", |
| 198 | + id=collection["id"], |
| 199 | + document=collection, |
| 200 | + ) |
| 201 | + return CollectionSerializer.db_to_stac(collection, base_url) |
197 | 202 |
|
198 | 203 | @overrides
|
199 |
| - def delete_item(self, item_id: str, collection_id: str, **kwargs): |
200 |
| - """Delete item.""" |
| 204 | + def update_collection( |
| 205 | + self, collection: stac_types.Collection, **kwargs |
| 206 | + ) -> stac_types.Collection: |
| 207 | + """Update collection.""" |
| 208 | + base_url = str(kwargs["request"].base_url) |
201 | 209 | try:
|
202 |
| - _ = self.client.get(index="stac_items", id=item_id) |
| 210 | + _ = self.client.get(index="stac_collections", id=collection["id"]) |
203 | 211 | except elasticsearch.exceptions.NotFoundError:
|
204 |
| - raise NotFoundError(f"Item {item_id} not found") |
205 |
| - self.client.delete(index="stac_items", doc_type="_doc", id=item_id) |
| 212 | + raise NotFoundError(f"Collection {collection['id']} not found") |
| 213 | + self.delete_collection(collection["id"]) |
| 214 | + self.create_collection(collection, **kwargs) |
| 215 | + |
| 216 | + return CollectionSerializer.db_to_stac(collection, base_url) |
206 | 217 |
|
207 | 218 | @overrides
|
208 |
| - def delete_collection(self, collection_id: str, **kwargs): |
| 219 | + def delete_collection(self, collection_id: str, **kwargs) -> stac_types.Collection: |
209 | 220 | """Delete collection."""
|
210 | 221 | try:
|
211 | 222 | _ = self.client.get(index="stac_collections", id=collection_id)
|
@@ -246,10 +257,12 @@ def bulk_sync(self, processed_items):
|
246 | 257 | helpers.bulk(self.client, actions)
|
247 | 258 |
|
248 | 259 | @overrides
|
249 |
| - def bulk_item_insert(self, items: Items, **kwargs) -> str: |
| 260 | + def bulk_item_insert( |
| 261 | + self, items: Items, chunk_size: Optional[int] = None, **kwargs |
| 262 | + ) -> str: |
250 | 263 | """Bulk item insertion using es."""
|
251 | 264 | transactions_client = TransactionsClient()
|
252 |
| - transactions_client._create_item_index() |
| 265 | + transactions_client.create_item_index() |
253 | 266 | try:
|
254 | 267 | base_url = str(kwargs["request"].base_url)
|
255 | 268 | except Exception:
|
|
0 commit comments