39
39
":" ,
40
40
}
41
41
42
- DEFAULT_INDICES = f"*,-*kibana*,-{ COLLECTIONS_INDEX } "
42
+ ITEM_INDICES = f"{ ITEMS_INDEX_PREFIX } *,-*kibana*,-{ COLLECTIONS_INDEX } * "
43
43
44
44
DEFAULT_SORT = {
45
45
"properties.datetime" : {"order" : "desc" },
@@ -164,7 +164,7 @@ def indices(collection_ids: Optional[List[str]]) -> str:
164
164
A string of comma-separated index names. If `collection_ids` is None, returns the default indices.
165
165
"""
166
166
if collection_ids is None :
167
- return DEFAULT_INDICES
167
+ return ITEM_INDICES
168
168
else :
169
169
return "," .join ([index_by_collection_id (c ) for c in collection_ids ])
170
170
@@ -178,7 +178,8 @@ async def create_collection_index() -> None:
178
178
client = AsyncElasticsearchSettings ().create_client
179
179
180
180
await client .indices .create (
181
- index = COLLECTIONS_INDEX ,
181
+ index = f"{ COLLECTIONS_INDEX } -000001" ,
182
+ aliases = {COLLECTIONS_INDEX : {}},
182
183
mappings = ES_COLLECTIONS_MAPPINGS ,
183
184
ignore = 400 , # ignore 400 already exists code
184
185
)
@@ -197,9 +198,11 @@ async def create_item_index(collection_id: str):
197
198
198
199
"""
199
200
client = AsyncElasticsearchSettings ().create_client
201
+ index_name = index_by_collection_id (collection_id )
200
202
201
203
await client .indices .create (
202
- index = index_by_collection_id (collection_id ),
204
+ index = f"{ index_by_collection_id (collection_id )} -000001" ,
205
+ aliases = {index_name : {}},
203
206
mappings = ES_ITEMS_MAPPINGS ,
204
207
settings = ES_ITEMS_SETTINGS ,
205
208
ignore = 400 , # ignore 400 already exists code
@@ -215,7 +218,14 @@ async def delete_item_index(collection_id: str):
215
218
"""
216
219
client = AsyncElasticsearchSettings ().create_client
217
220
218
- await client .indices .delete (index = index_by_collection_id (collection_id ))
221
+ name = index_by_collection_id (collection_id )
222
+ resolved = await client .indices .resolve_index (name = name )
223
+ if "aliases" in resolved and resolved ["aliases" ]:
224
+ [alias ] = resolved ["aliases" ]
225
+ await client .indices .delete_alias (index = alias ["indices" ], name = alias ["name" ])
226
+ await client .indices .delete (index = alias ["indices" ])
227
+ else :
228
+ await client .indices .delete (index = name )
219
229
await client .close ()
220
230
221
231
@@ -773,14 +783,11 @@ async def bulk_async(
773
783
`mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the
774
784
index is refreshed after the bulk insert. The function does not return any value.
775
785
"""
776
- await asyncio .get_event_loop ().run_in_executor (
777
- None ,
778
- lambda : helpers .bulk (
779
- self .sync_client ,
780
- mk_actions (collection_id , processed_items ),
781
- refresh = refresh ,
782
- raise_on_error = False ,
783
- ),
786
+ await helpers .async_bulk (
787
+ self .client ,
788
+ mk_actions (collection_id , processed_items ),
789
+ refresh = refresh ,
790
+ raise_on_error = False ,
784
791
)
785
792
786
793
def bulk_sync (
@@ -811,7 +818,7 @@ def bulk_sync(
811
818
async def delete_items (self ) -> None :
812
819
"""Danger. this is only for tests."""
813
820
await self .client .delete_by_query (
814
- index = DEFAULT_INDICES ,
821
+ index = ITEM_INDICES ,
815
822
body = {"query" : {"match_all" : {}}},
816
823
wait_for_completion = True ,
817
824
)
0 commit comments