1
1
"""Data Loader CLI STAC_API Ingestion Tool."""
2
- import json
2
+
3
3
import os
4
+ from typing import Any
4
5
5
6
import click
6
- import requests
7
+ import orjson
8
+ from httpx import Client
7
9
8
10
9
- def load_data (data_dir , filename ) :
11
+ def load_data (filepath : str ) -> dict [ str , Any ] :
10
12
"""Load json data from a file within the specified data directory."""
11
- filepath = os .path .join (data_dir , filename )
12
- if not os .path .exists (filepath ):
13
+ try :
14
+ with open (filepath , "rb" ) as file :
15
+ return orjson .loads (file .read ())
16
+ except FileNotFoundError as e :
13
17
click .secho (f"File not found: { filepath } " , fg = "red" , err = True )
14
- raise click .Abort ()
15
- with open (filepath ) as file :
16
- return json .load (file )
18
+ raise click .Abort () from e
17
19
18
20
19
- def load_collection (base_url , collection_id , data_dir ) :
21
+ def load_collection (client : Client , collection_id : str , data_dir : str ) -> None :
20
22
"""Load a STAC collection into the database."""
21
- collection = load_data (data_dir , "collection.json" )
23
+ collection = load_data (os . path . join ( data_dir , "collection.json" ) )
22
24
collection ["id" ] = collection_id
23
- try :
24
- resp = requests .post (f"{ base_url } /collections" , json = collection )
25
- if resp .status_code == 200 or resp .status_code == 201 :
26
- click .echo (f"Status code: { resp .status_code } " )
27
- click .echo (f"Added collection: { collection ['id' ]} " )
28
- elif resp .status_code == 409 :
29
- click .echo (f"Status code: { resp .status_code } " )
30
- click .echo (f"Collection: { collection ['id' ]} already exists" )
31
- else :
32
- click .echo (f"Status code: { resp .status_code } " )
33
- click .echo (
34
- f"Error writing { collection ['id' ]} collection. Message: { resp .text } "
35
- )
36
- except requests .ConnectionError :
37
- click .secho ("Failed to connect" , fg = "red" , err = True )
25
+ resp = client .post ("/collections" , json = collection )
26
+ if resp .status_code == 200 or resp .status_code == 201 :
27
+ click .echo (f"Status code: { resp .status_code } " )
28
+ click .echo (f"Added collection: { collection ['id' ]} " )
29
+ elif resp .status_code == 409 :
30
+ click .echo (f"Status code: { resp .status_code } " )
31
+ click .echo (f"Collection: { collection ['id' ]} already exists" )
32
+ else :
33
+ click .echo (f"Status code: { resp .status_code } " )
34
+ click .echo (f"Error writing { collection ['id' ]} collection. Message: { resp .text } " )
38
35
39
36
40
- def load_items (base_url , collection_id , use_bulk , data_dir ):
37
+ def load_items (
38
+ client : Client , collection_id : str , use_bulk : bool , data_dir : str
39
+ ) -> None :
41
40
"""Load STAC items into the database based on the method selected."""
42
- # Attempt to dynamically find a suitable feature collection file
43
- feature_files = [
44
- file
45
- for file in os .listdir (data_dir )
46
- if file .endswith (".json" ) and file != "collection.json"
47
- ]
48
- if not feature_files :
41
+ with os .scandir (data_dir ) as entries :
42
+ # Attempt to dynamically find a suitable feature collection file
43
+ # Use the first found feature collection file
44
+ feature_file = next (
45
+ (
46
+ entry .path
47
+ for entry in entries
48
+ if entry .is_file ()
49
+ and entry .name .endswith (".json" )
50
+ and entry .name != "collection.json"
51
+ ),
52
+ None ,
53
+ )
54
+
55
+ if feature_file is None :
49
56
click .secho (
50
57
"No feature collection files found in the specified directory." ,
51
58
fg = "red" ,
52
59
err = True ,
53
60
)
54
61
raise click .Abort ()
55
- feature_collection_file = feature_files [
56
- 0
57
- ] # Use the first found feature collection file
58
- feature_collection = load_data (data_dir , feature_collection_file )
59
62
60
- load_collection (base_url , collection_id , data_dir )
63
+ feature_collection = load_data (feature_file )
64
+
65
+ load_collection (client , collection_id , data_dir )
61
66
if use_bulk :
62
- load_items_bulk_insert (base_url , collection_id , feature_collection , data_dir )
67
+ load_items_bulk_insert (client , collection_id , feature_collection )
63
68
else :
64
- load_items_one_by_one (base_url , collection_id , feature_collection , data_dir )
69
+ load_items_one_by_one (client , collection_id , feature_collection )
65
70
66
71
67
- def load_items_one_by_one (base_url , collection_id , feature_collection , data_dir ):
72
+ def load_items_one_by_one (
73
+ client : Client , collection_id : str , feature_collection : dict [str , Any ]
74
+ ) -> None :
68
75
"""Load STAC items into the database one by one."""
69
76
for feature in feature_collection ["features" ]:
70
- try :
71
- feature ["collection" ] = collection_id
72
- resp = requests .post (
73
- f"{ base_url } /collections/{ collection_id } /items" , json = feature
74
- )
75
- if resp .status_code == 200 :
76
- click .echo (f"Status code: { resp .status_code } " )
77
- click .echo (f"Added item: { feature ['id' ]} " )
78
- elif resp .status_code == 409 :
79
- click .echo (f"Status code: { resp .status_code } " )
80
- click .echo (f"Item: { feature ['id' ]} already exists" )
81
- except requests .ConnectionError :
82
- click .secho ("Failed to connect" , fg = "red" , err = True )
83
-
84
-
85
- def load_items_bulk_insert (base_url , collection_id , feature_collection , data_dir ):
86
- """Load STAC items into the database via bulk insert."""
87
- try :
88
- for i , _ in enumerate (feature_collection ["features" ]):
89
- feature_collection ["features" ][i ]["collection" ] = collection_id
90
- resp = requests .post (
91
- f"{ base_url } /collections/{ collection_id } /items" , json = feature_collection
92
- )
77
+ feature ["collection" ] = collection_id
78
+ resp = client .post (f"/collections/{ collection_id } /items" , json = feature )
93
79
if resp .status_code == 200 :
94
80
click .echo (f"Status code: { resp .status_code } " )
95
- click .echo ("Bulk inserted items successfully." )
96
- elif resp .status_code == 204 :
97
- click .echo (f"Status code: { resp .status_code } " )
98
- click .echo ("Bulk update successful, no content returned." )
81
+ click .echo (f"Added item: { feature ['id' ]} " )
99
82
elif resp .status_code == 409 :
100
83
click .echo (f"Status code: { resp .status_code } " )
101
- click .echo ("Conflict detected, some items might already exist." )
102
- except requests .ConnectionError :
103
- click .secho ("Failed to connect" , fg = "red" , err = True )
84
+ click .echo (f"Item: { feature ['id' ]} already exists" )
85
+
86
+
87
+ def load_items_bulk_insert (
88
+ client : Client , collection_id : str , feature_collection : dict [str , Any ]
89
+ ) -> None :
90
+ """Load STAC items into the database via bulk insert."""
91
+ for feature in feature_collection ["features" ]:
92
+ feature ["collection" ] = collection_id
93
+ resp = client .post (f"/collections/{ collection_id } /items" , json = feature_collection )
94
+ if resp .status_code == 200 :
95
+ click .echo (f"Status code: { resp .status_code } " )
96
+ click .echo ("Bulk inserted items successfully." )
97
+ elif resp .status_code == 204 :
98
+ click .echo (f"Status code: { resp .status_code } " )
99
+ click .echo ("Bulk update successful, no content returned." )
100
+ elif resp .status_code == 409 :
101
+ click .echo (f"Status code: { resp .status_code } " )
102
+ click .echo ("Conflict detected, some items might already exist." )
104
103
105
104
106
105
@click .command ()
@@ -117,9 +116,10 @@ def load_items_bulk_insert(base_url, collection_id, feature_collection, data_dir
117
116
default = "sample_data/" ,
118
117
help = "Directory containing collection.json and feature collection file" ,
119
118
)
120
- def main (base_url , collection_id , use_bulk , data_dir ) :
119
+ def main (base_url : str , collection_id : str , use_bulk : bool , data_dir : str ) -> None :
121
120
"""Load STAC items into the database."""
122
- load_items (base_url , collection_id , use_bulk , data_dir )
121
+ with Client (base_url = base_url ) as client :
122
+ load_items (client , collection_id , use_bulk , data_dir )
123
123
124
124
125
125
if __name__ == "__main__" :
0 commit comments