Skip to content

Commit 0a87e9a

Browse files
committed
Example Scripts Folder
1 parent 195b96a commit 0a87e9a

File tree

1 file changed

+105
-0
lines changed

1 file changed

+105
-0
lines changed

examples/cancel_batch.py

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import argparse
2+
import sys
3+
from collections import defaultdict
4+
from concurrent.futures import ThreadPoolExecutor, as_completed
5+
6+
import scaleapi
7+
from scaleapi.exceptions import ScaleException, ScaleUnauthorized
8+
9+
# Script that takes in an array of batch names (split by comma) and
10+
# applies a bulk action to cancel all tasks in each batch.
11+
# By default, this script makes 50 concurrent API calls.
12+
13+
# Example:
14+
# python cancel_batch.py --api_key "SCALE_API_KEY"
15+
# --batches "batch1,batch2" --clear "True"
16+
17+
# Change this for update concurrency
18+
MAX_WORKERS = 50
19+
20+
21+
def cancel_batch(client, batch_name, clear_unique_id):
22+
print(f"\nProcessing Batch: {batch_name}")
23+
try:
24+
batch = client.get_batch(batch_name)
25+
except ScaleException:
26+
print(f"-ERROR: Batch {batch_name} not found.")
27+
return
28+
29+
task_count = client.get_tasks_count(project_name=batch.project, batch_name=batch.name)
30+
print(f"-Batch {batch.name} contains {task_count} tasks.")
31+
32+
summary_metrics = defaultdict(lambda: 0)
33+
task_in_progress = 0
34+
processes = []
35+
36+
tasks = client.get_tasks(project_name=batch.project, batch_name=batch.name)
37+
38+
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
39+
for task in tasks:
40+
task_in_progress += 1
41+
if task_in_progress % 1000 == 0:
42+
print(f"-Processing Task # {task_in_progress}")
43+
processes.append(
44+
executor.submit(cancel_task_with_response, client, task, clear_unique_id)
45+
)
46+
47+
for process in as_completed(processes):
48+
result = process.result()
49+
summary_metrics[result["status"]] += 1
50+
51+
for k, v in summary_metrics.items():
52+
print(f"--{k}: {v} tasks")
53+
54+
55+
def cancel_task_with_response(client: scaleapi.ScaleClient, task, clear_unique_ud):
56+
task_status = task.as_dict()["status"]
57+
if task_status in ["completed", "canceled"]:
58+
return {"task": task.id, "status": task_status}
59+
60+
try:
61+
task = client.cancel_task(task.id, clear_unique_ud)
62+
return {"task": task.id, "status": task.as_dict()["status"]}
63+
except ScaleException:
64+
return {"task": task.id, "status": "Can not cancel"}
65+
except Exception as err:
66+
print(err)
67+
return {"task": task.id, "status": "Errored"}
68+
69+
70+
def get_args():
71+
ap = argparse.ArgumentParser()
72+
ap.add_argument("--api_key", required=True, help="Please provide Scale API Key")
73+
ap.add_argument(
74+
"--batches", required=True, help="Please enter batch names separated by a comma"
75+
)
76+
ap.add_argument(
77+
"--clear",
78+
type=bool,
79+
help="Set to True if you want to remove unique_id upon cancel",
80+
)
81+
return ap.parse_args()
82+
83+
84+
def main():
85+
args = get_args()
86+
clear_unique_id = args.clear or False
87+
88+
client = scaleapi.ScaleClient(args.api_key)
89+
90+
# Testing API Key
91+
try:
92+
client.projects()
93+
except ScaleUnauthorized as err:
94+
print(err.message)
95+
sys.exit(1)
96+
97+
batch_list = args.batches.split(",")
98+
batches = [word.strip() for word in batch_list]
99+
100+
for batch_name in batches:
101+
cancel_batch(client, batch_name, clear_unique_id)
102+
103+
104+
if __name__ == "__main__":
105+
main()

0 commit comments

Comments
 (0)