Skip to content

Drop requests dependency and optimize data_loader #395

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

### Changed

- Optimize data_loader.py script [#395](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/395)

### Removed

- Removed `requests` dev dependency [#395](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/395)

## [v5.0.0a1] - 2025-05-30

### Changed
Expand Down
146 changes: 73 additions & 73 deletions data_loader.py
Original file line number Diff line number Diff line change
@@ -1,106 +1,105 @@
"""Data Loader CLI STAC_API Ingestion Tool."""
import json

import os
from typing import Any

import click
import requests
import orjson
from httpx import Client


def load_data(data_dir, filename):
def load_data(filepath: str) -> dict[str, Any]:
"""Load json data from a file within the specified data directory."""
filepath = os.path.join(data_dir, filename)
if not os.path.exists(filepath):
try:
with open(filepath, "rb") as file:
return orjson.loads(file.read())
except FileNotFoundError as e:
click.secho(f"File not found: {filepath}", fg="red", err=True)
raise click.Abort()
with open(filepath) as file:
return json.load(file)
raise click.Abort() from e


def load_collection(base_url, collection_id, data_dir):
def load_collection(client: Client, collection_id: str, data_dir: str) -> None:
"""Load a STAC collection into the database."""
collection = load_data(data_dir, "collection.json")
collection = load_data(os.path.join(data_dir, "collection.json"))
collection["id"] = collection_id
try:
resp = requests.post(f"{base_url}/collections", json=collection)
if resp.status_code == 200 or resp.status_code == 201:
click.echo(f"Status code: {resp.status_code}")
click.echo(f"Added collection: {collection['id']}")
elif resp.status_code == 409:
click.echo(f"Status code: {resp.status_code}")
click.echo(f"Collection: {collection['id']} already exists")
else:
click.echo(f"Status code: {resp.status_code}")
click.echo(
f"Error writing {collection['id']} collection. Message: {resp.text}"
)
except requests.ConnectionError:
click.secho("Failed to connect", fg="red", err=True)
resp = client.post("/collections", json=collection)
if resp.status_code == 200 or resp.status_code == 201:
click.echo(f"Status code: {resp.status_code}")
click.echo(f"Added collection: {collection['id']}")
elif resp.status_code == 409:
click.echo(f"Status code: {resp.status_code}")
click.echo(f"Collection: {collection['id']} already exists")
else:
click.echo(f"Status code: {resp.status_code}")
click.echo(f"Error writing {collection['id']} collection. Message: {resp.text}")


def load_items(base_url, collection_id, use_bulk, data_dir):
def load_items(
client: Client, collection_id: str, use_bulk: bool, data_dir: str
) -> None:
"""Load STAC items into the database based on the method selected."""
# Attempt to dynamically find a suitable feature collection file
feature_files = [
file
for file in os.listdir(data_dir)
if file.endswith(".json") and file != "collection.json"
]
if not feature_files:
with os.scandir(data_dir) as entries:
# Attempt to dynamically find a suitable feature collection file
# Use the first found feature collection file
feature_file = next(
(
entry.path
for entry in entries
if entry.is_file()
and entry.name.endswith(".json")
and entry.name != "collection.json"
),
None,
)

if feature_file is None:
click.secho(
"No feature collection files found in the specified directory.",
fg="red",
err=True,
)
raise click.Abort()
feature_collection_file = feature_files[
0
] # Use the first found feature collection file
feature_collection = load_data(data_dir, feature_collection_file)

load_collection(base_url, collection_id, data_dir)
feature_collection = load_data(feature_file)

load_collection(client, collection_id, data_dir)
if use_bulk:
load_items_bulk_insert(base_url, collection_id, feature_collection, data_dir)
load_items_bulk_insert(client, collection_id, feature_collection)
else:
load_items_one_by_one(base_url, collection_id, feature_collection, data_dir)
load_items_one_by_one(client, collection_id, feature_collection)


def load_items_one_by_one(base_url, collection_id, feature_collection, data_dir):
def load_items_one_by_one(
client: Client, collection_id: str, feature_collection: dict[str, Any]
) -> None:
"""Load STAC items into the database one by one."""
for feature in feature_collection["features"]:
try:
feature["collection"] = collection_id
resp = requests.post(
f"{base_url}/collections/{collection_id}/items", json=feature
)
if resp.status_code == 200:
click.echo(f"Status code: {resp.status_code}")
click.echo(f"Added item: {feature['id']}")
elif resp.status_code == 409:
click.echo(f"Status code: {resp.status_code}")
click.echo(f"Item: {feature['id']} already exists")
except requests.ConnectionError:
click.secho("Failed to connect", fg="red", err=True)


def load_items_bulk_insert(base_url, collection_id, feature_collection, data_dir):
"""Load STAC items into the database via bulk insert."""
try:
for i, _ in enumerate(feature_collection["features"]):
feature_collection["features"][i]["collection"] = collection_id
resp = requests.post(
f"{base_url}/collections/{collection_id}/items", json=feature_collection
)
feature["collection"] = collection_id
resp = client.post(f"/collections/{collection_id}/items", json=feature)
if resp.status_code == 200:
click.echo(f"Status code: {resp.status_code}")
click.echo("Bulk inserted items successfully.")
elif resp.status_code == 204:
click.echo(f"Status code: {resp.status_code}")
click.echo("Bulk update successful, no content returned.")
click.echo(f"Added item: {feature['id']}")
elif resp.status_code == 409:
click.echo(f"Status code: {resp.status_code}")
click.echo("Conflict detected, some items might already exist.")
except requests.ConnectionError:
click.secho("Failed to connect", fg="red", err=True)
click.echo(f"Item: {feature['id']} already exists")


def load_items_bulk_insert(
client: Client, collection_id: str, feature_collection: dict[str, Any]
) -> None:
"""Load STAC items into the database via bulk insert."""
for feature in feature_collection["features"]:
feature["collection"] = collection_id
resp = client.post(f"/collections/{collection_id}/items", json=feature_collection)
if resp.status_code == 200:
click.echo(f"Status code: {resp.status_code}")
click.echo("Bulk inserted items successfully.")
elif resp.status_code == 204:
click.echo(f"Status code: {resp.status_code}")
click.echo("Bulk update successful, no content returned.")
elif resp.status_code == 409:
click.echo(f"Status code: {resp.status_code}")
click.echo("Conflict detected, some items might already exist.")


@click.command()
Expand All @@ -117,9 +116,10 @@ def load_items_bulk_insert(base_url, collection_id, feature_collection, data_dir
default="sample_data/",
help="Directory containing collection.json and feature collection file",
)
def main(base_url, collection_id, use_bulk, data_dir):
def main(base_url: str, collection_id: str, use_bulk: bool, data_dir: str) -> None:
"""Load STAC items into the database."""
load_items(base_url, collection_id, use_bulk, data_dir)
with Client(base_url=base_url) as client:
load_items(client, collection_id, use_bulk, data_dir)


if __name__ == "__main__":
Expand Down
1 change: 0 additions & 1 deletion stac_fastapi/elasticsearch/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
"pytest-cov~=4.0.0",
"pytest-asyncio~=0.21.0",
"pre-commit~=3.0.0",
"requests>=2.32.0,<3.0.0",
"ciso8601~=2.3.0",
"httpx>=0.24.0,<0.28.0",
],
Expand Down
1 change: 0 additions & 1 deletion stac_fastapi/opensearch/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
"pytest-cov~=4.0.0",
"pytest-asyncio~=0.21.0",
"pre-commit~=3.0.0",
"requests>=2.32.0,<3.0.0",
"ciso8601~=2.3.0",
"httpx>=0.24.0,<0.28.0",
],
Expand Down