From fd01249e1fd35cf3c6f223e7c55bbc90cb0e27a8 Mon Sep 17 00:00:00 2001 From: Kamil Monicz Date: Fri, 6 Jun 2025 14:23:37 +0000 Subject: [PATCH 1/4] Drop requests dependency in favor of httpx --- data_loader.py | 111 +++++++++++++--------------- stac_fastapi/elasticsearch/setup.py | 1 - stac_fastapi/opensearch/setup.py | 1 - 3 files changed, 52 insertions(+), 61 deletions(-) diff --git a/data_loader.py b/data_loader.py index 7d157e40..317ed442 100644 --- a/data_loader.py +++ b/data_loader.py @@ -1,12 +1,14 @@ """Data Loader CLI STAC_API Ingestion Tool.""" + import json import os +from typing import Any import click -import requests +from httpx import Client -def load_data(data_dir, filename): +def load_data(data_dir: str, filename: str) -> dict[str, Any]: """Load json data from a file within the specified data directory.""" filepath = os.path.join(data_dir, filename) if not os.path.exists(filepath): @@ -16,28 +18,25 @@ def load_data(data_dir, filename): return json.load(file) -def load_collection(base_url, collection_id, data_dir): +def load_collection(client: Client, collection_id: str, data_dir: str) -> None: """Load a STAC collection into the database.""" collection = load_data(data_dir, "collection.json") collection["id"] = collection_id - try: - resp = requests.post(f"{base_url}/collections", json=collection) - if resp.status_code == 200 or resp.status_code == 201: - click.echo(f"Status code: {resp.status_code}") - click.echo(f"Added collection: {collection['id']}") - elif resp.status_code == 409: - click.echo(f"Status code: {resp.status_code}") - click.echo(f"Collection: {collection['id']} already exists") - else: - click.echo(f"Status code: {resp.status_code}") - click.echo( - f"Error writing {collection['id']} collection. Message: {resp.text}" - ) - except requests.ConnectionError: - click.secho("Failed to connect", fg="red", err=True) + resp = client.post("/collections", json=collection) + if resp.status_code == 200 or resp.status_code == 201: + click.echo(f"Status code: {resp.status_code}") + click.echo(f"Added collection: {collection['id']}") + elif resp.status_code == 409: + click.echo(f"Status code: {resp.status_code}") + click.echo(f"Collection: {collection['id']} already exists") + else: + click.echo(f"Status code: {resp.status_code}") + click.echo(f"Error writing {collection['id']} collection. Message: {resp.text}") -def load_items(base_url, collection_id, use_bulk, data_dir): +def load_items( + client: Client, collection_id: str, use_bulk: bool, data_dir: str +) -> None: """Load STAC items into the database based on the method selected.""" # Attempt to dynamically find a suitable feature collection file feature_files = [ @@ -52,55 +51,48 @@ def load_items(base_url, collection_id, use_bulk, data_dir): err=True, ) raise click.Abort() - feature_collection_file = feature_files[ - 0 - ] # Use the first found feature collection file - feature_collection = load_data(data_dir, feature_collection_file) - load_collection(base_url, collection_id, data_dir) + # Use the first found feature collection file + feature_collection = load_data(data_dir, feature_files[0]) + + load_collection(client, collection_id, data_dir) if use_bulk: - load_items_bulk_insert(base_url, collection_id, feature_collection, data_dir) + load_items_bulk_insert(client, collection_id, feature_collection) else: - load_items_one_by_one(base_url, collection_id, feature_collection, data_dir) + load_items_one_by_one(client, collection_id, feature_collection) -def load_items_one_by_one(base_url, collection_id, feature_collection, data_dir): +def load_items_one_by_one( + client: Client, collection_id: str, feature_collection: dict[str, Any] +) -> None: """Load STAC items into the database one by one.""" for feature in feature_collection["features"]: - try: - feature["collection"] = collection_id - resp = requests.post( - f"{base_url}/collections/{collection_id}/items", json=feature - ) - if resp.status_code == 200: - click.echo(f"Status code: {resp.status_code}") - click.echo(f"Added item: {feature['id']}") - elif resp.status_code == 409: - click.echo(f"Status code: {resp.status_code}") - click.echo(f"Item: {feature['id']} already exists") - except requests.ConnectionError: - click.secho("Failed to connect", fg="red", err=True) - - -def load_items_bulk_insert(base_url, collection_id, feature_collection, data_dir): - """Load STAC items into the database via bulk insert.""" - try: - for i, _ in enumerate(feature_collection["features"]): - feature_collection["features"][i]["collection"] = collection_id - resp = requests.post( - f"{base_url}/collections/{collection_id}/items", json=feature_collection - ) + feature["collection"] = collection_id + resp = client.post(f"/collections/{collection_id}/items", json=feature) if resp.status_code == 200: click.echo(f"Status code: {resp.status_code}") - click.echo("Bulk inserted items successfully.") - elif resp.status_code == 204: - click.echo(f"Status code: {resp.status_code}") - click.echo("Bulk update successful, no content returned.") + click.echo(f"Added item: {feature['id']}") elif resp.status_code == 409: click.echo(f"Status code: {resp.status_code}") - click.echo("Conflict detected, some items might already exist.") - except requests.ConnectionError: - click.secho("Failed to connect", fg="red", err=True) + click.echo(f"Item: {feature['id']} already exists") + + +def load_items_bulk_insert( + client: Client, collection_id: str, feature_collection: dict[str, Any] +) -> None: + """Load STAC items into the database via bulk insert.""" + for feature in feature_collection["features"]: + feature["collection"] = collection_id + resp = client.post(f"/collections/{collection_id}/items", json=feature_collection) + if resp.status_code == 200: + click.echo(f"Status code: {resp.status_code}") + click.echo("Bulk inserted items successfully.") + elif resp.status_code == 204: + click.echo(f"Status code: {resp.status_code}") + click.echo("Bulk update successful, no content returned.") + elif resp.status_code == 409: + click.echo(f"Status code: {resp.status_code}") + click.echo("Conflict detected, some items might already exist.") @click.command() @@ -117,9 +109,10 @@ def load_items_bulk_insert(base_url, collection_id, feature_collection, data_dir default="sample_data/", help="Directory containing collection.json and feature collection file", ) -def main(base_url, collection_id, use_bulk, data_dir): +def main(base_url: str, collection_id: str, use_bulk: bool, data_dir: str) -> None: """Load STAC items into the database.""" - load_items(base_url, collection_id, use_bulk, data_dir) + with Client(base_url=base_url) as client: + load_items(client, collection_id, use_bulk, data_dir) if __name__ == "__main__": diff --git a/stac_fastapi/elasticsearch/setup.py b/stac_fastapi/elasticsearch/setup.py index eb6b401f..4c0646a0 100644 --- a/stac_fastapi/elasticsearch/setup.py +++ b/stac_fastapi/elasticsearch/setup.py @@ -19,7 +19,6 @@ "pytest-cov~=4.0.0", "pytest-asyncio~=0.21.0", "pre-commit~=3.0.0", - "requests>=2.32.0,<3.0.0", "ciso8601~=2.3.0", "httpx>=0.24.0,<0.28.0", ], diff --git a/stac_fastapi/opensearch/setup.py b/stac_fastapi/opensearch/setup.py index d52dd96c..aad06e58 100644 --- a/stac_fastapi/opensearch/setup.py +++ b/stac_fastapi/opensearch/setup.py @@ -20,7 +20,6 @@ "pytest-cov~=4.0.0", "pytest-asyncio~=0.21.0", "pre-commit~=3.0.0", - "requests>=2.32.0,<3.0.0", "ciso8601~=2.3.0", "httpx>=0.24.0,<0.28.0", ], From 091ccaca2fa2759aea248771f754fb471cf581ae Mon Sep 17 00:00:00 2001 From: Kamil Monicz Date: Fri, 6 Jun 2025 14:24:09 +0000 Subject: [PATCH 2/4] Use orjson in data_loader --- data_loader.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/data_loader.py b/data_loader.py index 317ed442..75ba83ab 100644 --- a/data_loader.py +++ b/data_loader.py @@ -1,10 +1,10 @@ """Data Loader CLI STAC_API Ingestion Tool.""" -import json import os from typing import Any import click +import orjson from httpx import Client @@ -14,8 +14,8 @@ def load_data(data_dir: str, filename: str) -> dict[str, Any]: if not os.path.exists(filepath): click.secho(f"File not found: {filepath}", fg="red", err=True) raise click.Abort() - with open(filepath) as file: - return json.load(file) + with open(filepath, "rb") as file: + return orjson.loads(file.read()) def load_collection(client: Client, collection_id: str, data_dir: str) -> None: From 4eaef844a12659725dab6cf2c710c177569a1cf4 Mon Sep 17 00:00:00 2001 From: Kamil Monicz Date: Fri, 6 Jun 2025 14:34:32 +0000 Subject: [PATCH 3/4] More robust and efficient handling of files in data_loader --- data_loader.py | 39 +++++++++++++++++++++++---------------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/data_loader.py b/data_loader.py index 75ba83ab..dea02dce 100644 --- a/data_loader.py +++ b/data_loader.py @@ -8,19 +8,19 @@ from httpx import Client -def load_data(data_dir: str, filename: str) -> dict[str, Any]: +def load_data(filepath: str) -> dict[str, Any]: """Load json data from a file within the specified data directory.""" - filepath = os.path.join(data_dir, filename) - if not os.path.exists(filepath): + try: + with open(filepath, "rb") as file: + return orjson.loads(file.read()) + except FileNotFoundError as e: click.secho(f"File not found: {filepath}", fg="red", err=True) - raise click.Abort() - with open(filepath, "rb") as file: - return orjson.loads(file.read()) + raise click.Abort() from e def load_collection(client: Client, collection_id: str, data_dir: str) -> None: """Load a STAC collection into the database.""" - collection = load_data(data_dir, "collection.json") + collection = load_data(os.path.join(data_dir, "collection.json")) collection["id"] = collection_id resp = client.post("/collections", json=collection) if resp.status_code == 200 or resp.status_code == 201: @@ -38,13 +38,21 @@ def load_items( client: Client, collection_id: str, use_bulk: bool, data_dir: str ) -> None: """Load STAC items into the database based on the method selected.""" - # Attempt to dynamically find a suitable feature collection file - feature_files = [ - file - for file in os.listdir(data_dir) - if file.endswith(".json") and file != "collection.json" - ] - if not feature_files: + with os.scandir(data_dir) as entries: + # Attempt to dynamically find a suitable feature collection file + # Use the first found feature collection file + feature_file = next( + ( + entry.path + for entry in entries + if entry.is_file() + and entry.name.endswith(".json") + and entry.name != "collection.json" + ), + None, + ) + + if feature_file is None: click.secho( "No feature collection files found in the specified directory.", fg="red", @@ -52,8 +60,7 @@ def load_items( ) raise click.Abort() - # Use the first found feature collection file - feature_collection = load_data(data_dir, feature_files[0]) + feature_collection = load_data(feature_file) load_collection(client, collection_id, data_dir) if use_bulk: From 28eebce925aec2ba3bc134c55a660921d46c2481 Mon Sep 17 00:00:00 2001 From: Kamil Monicz Date: Fri, 6 Jun 2025 18:32:43 +0200 Subject: [PATCH 4/4] Update CHANGELOG to summarize data_loader changes --- CHANGELOG.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f056e26..0d9df98a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,14 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +### Changed + +- Optimize data_loader.py script [#395](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/395) + +### Removed + +- Removed `requests` dev dependency [#395](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/395) + ## [v5.0.0a1] - 2025-05-30 ### Changed