
Commit a25a970

Example Scripts Folder
1 parent 195b96a commit a25a970

File tree

1 file changed: +103 -0 lines changed


examples/cancel_batch.py

Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
# Script that takes a comma-separated list of batch names and applies
# a bulk action to cancel all tasks in each batch.
# By default, this script makes 50 concurrent API calls.

# Example: python cancel_batch.py --api_key "SCALE_API_KEY" --batches "batch1,batch2" --clear "True"

import argparse
import sys
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed

import scaleapi
from scaleapi.exceptions import ScaleException, ScaleUnauthorized

# Change this to adjust the number of concurrent API calls
MAX_WORKERS = 50


def cancel_batch(client, batch_name, clear_unique_id):
    """Cancel every task in the given batch and print a status summary."""
    print(f"\nProcessing Batch: {batch_name}")
    try:
        batch = client.get_batch(batch_name)
    except ScaleException:
        print(f"-ERROR: Batch {batch_name} not found.")
        return

    task_count = client.get_tasks_count(
        project_name=batch.project, batch_name=batch.name
    )
    print(f"-Batch {batch.name} contains {task_count} tasks.")

    summary_metrics = defaultdict(int)
    task_in_progress = 0
    processes = []

    tasks = client.get_tasks(project_name=batch.project, batch_name=batch.name)

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        for task in tasks:
            task_in_progress += 1
            if task_in_progress % 1000 == 0:
                print(f"-Processing Task # {task_in_progress}")
            processes.append(
                executor.submit(cancel_task_with_response, client, task, clear_unique_id)
            )

        for process in as_completed(processes):
            result = process.result()
            summary_metrics[result["status"]] += 1

    for k, v in summary_metrics.items():
        print(f"--{k}: {v} tasks")


def cancel_task_with_response(client: scaleapi.ScaleClient, task, clear_unique_id):
    """Cancel a single task and return its id along with the resulting status."""
    task_status = task.as_dict()["status"]
    if task_status in ["completed", "canceled"]:
        return {"task": task.id, "status": task_status}

    try:
        task = client.cancel_task(task.id, clear_unique_id)
        return {"task": task.id, "status": task.as_dict()["status"]}
    except ScaleException:
        return {"task": task.id, "status": "Cannot cancel"}
    except Exception as err:
        print(err)
        return {"task": task.id, "status": "Errored"}


def get_args():
    ap = argparse.ArgumentParser()
    ap.add_argument("--api_key", required=True, help="Please provide Scale API Key")
    ap.add_argument(
        "--batches", required=True, help="Please enter batch names separated by a comma"
    )
    ap.add_argument(
        "--clear",
        # type=bool would treat any non-empty string (including "False") as True,
        # so parse the flag value explicitly.
        type=lambda value: value.strip().lower() in ("true", "1", "yes"),
        default=False,
        help="Set to True if you want to remove unique_id upon cancel",
    )
    return ap.parse_args()


def main():
    args = get_args()
    clear_unique_id = args.clear or False

    client = scaleapi.ScaleClient(args.api_key)

    # Validate the API key before processing any batches
    try:
        client.projects()
    except ScaleUnauthorized as err:
        print(err.message)
        sys.exit(1)

    batch_list = args.batches.split(",")
    batches = [word.strip() for word in batch_list]

    for batch_name in batches:
        cancel_batch(client, batch_name, clear_unique_id)


if __name__ == "__main__":
    main()
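
For reference, the same cancel flow can also be driven from a Python session without the CLI wrapper. A minimal sketch using only the client calls the script already relies on (the API key and batch name below are placeholders, and unique IDs are left untouched):

import scaleapi

client = scaleapi.ScaleClient("SCALE_API_KEY")  # placeholder key
batch = client.get_batch("batch1")  # placeholder batch name
tasks = client.get_tasks(project_name=batch.project, batch_name=batch.name)
for task in tasks:
    if task.as_dict()["status"] not in ("completed", "canceled"):
        client.cancel_task(task.id, False)  # second argument mirrors clear_unique_id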

0 commit comments