diff --git a/CHANGELOG.md b/CHANGELOG.md index 28088184..c99c10f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Examples folder with example docker setup for running sfes from pip [#147](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/147) ### Changed + +- Use aliases on Elasticsearch indices, add number suffix in index name. [#152](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/152) + ### Fixed - Corrected the closing of client connections in ES index management functions [#132](https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/132) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index e59f4041..273947d5 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -39,7 +39,7 @@ ":", } -DEFAULT_INDICES = f"*,-*kibana*,-{COLLECTIONS_INDEX}" +ITEM_INDICES = f"{ITEMS_INDEX_PREFIX}*,-*kibana*,-{COLLECTIONS_INDEX}*" DEFAULT_SORT = { "properties.datetime": {"order": "desc"}, @@ -164,7 +164,7 @@ def indices(collection_ids: Optional[List[str]]) -> str: A string of comma-separated index names. If `collection_ids` is None, returns the default indices. """ if collection_ids is None: - return DEFAULT_INDICES + return ITEM_INDICES else: return ",".join([index_by_collection_id(c) for c in collection_ids]) @@ -178,7 +178,8 @@ async def create_collection_index() -> None: client = AsyncElasticsearchSettings().create_client await client.indices.create( - index=COLLECTIONS_INDEX, + index=f"{COLLECTIONS_INDEX}-000001", + aliases={COLLECTIONS_INDEX: {}}, mappings=ES_COLLECTIONS_MAPPINGS, ignore=400, # ignore 400 already exists code ) @@ -197,9 +198,11 @@ async def create_item_index(collection_id: str): """ client = AsyncElasticsearchSettings().create_client + index_name = index_by_collection_id(collection_id) await client.indices.create( - index=index_by_collection_id(collection_id), + index=f"{index_by_collection_id(collection_id)}-000001", + aliases={index_name: {}}, mappings=ES_ITEMS_MAPPINGS, settings=ES_ITEMS_SETTINGS, ignore=400, # ignore 400 already exists code @@ -215,7 +218,14 @@ async def delete_item_index(collection_id: str): """ client = AsyncElasticsearchSettings().create_client - await client.indices.delete(index=index_by_collection_id(collection_id)) + name = index_by_collection_id(collection_id) + resolved = await client.indices.resolve_index(name=name) + if "aliases" in resolved and resolved["aliases"]: + [alias] = resolved["aliases"] + await client.indices.delete_alias(index=alias["indices"], name=alias["name"]) + await client.indices.delete(index=alias["indices"]) + else: + await client.indices.delete(index=name) await client.close() @@ -773,14 +783,11 @@ async def bulk_async( `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the index is refreshed after the bulk insert. The function does not return any value. """ - await asyncio.get_event_loop().run_in_executor( - None, - lambda: helpers.bulk( - self.sync_client, - mk_actions(collection_id, processed_items), - refresh=refresh, - raise_on_error=False, - ), + await helpers.async_bulk( + self.client, + mk_actions(collection_id, processed_items), + refresh=refresh, + raise_on_error=False, ) def bulk_sync( @@ -811,7 +818,7 @@ def bulk_sync( async def delete_items(self) -> None: """Danger. this is only for tests.""" await self.client.delete_by_query( - index=DEFAULT_INDICES, + index=ITEM_INDICES, body={"query": {"match_all": {}}}, wait_for_completion=True, )