Commit 2e63db2

Example Scripts Folder
1 parent 195b96a commit 2e63db2

File tree: 1 file changed (+107, -0 lines)


examples/cancel_batch.py

Lines changed: 107 additions & 0 deletions
# Script that takes a comma-separated list of batch names and
# applies a bulk action to cancel all tasks in each batch.
# By default, this script makes 50 concurrent API calls.

# Example: python cancel_batch.py --api_key "SCALE_API_KEY" --batches "batch1,batch2" --clear "True"

import argparse
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
import sys

import scaleapi
from scaleapi.exceptions import ScaleException, ScaleUnauthorized

# Change this to adjust the number of concurrent API calls
MAX_WORKERS = 50


def cancel_batch(client, batch_name, clear_unique_id):
    print(f"\nProcessing Batch: {batch_name}")
    try:
        batch = client.get_batch(batch_name)
    except ScaleException:
        print(f"-ERROR: Batch {batch_name} not found.")
        return

    task_count = client.get_tasks_count(
        project_name=batch.project, batch_name=batch.name
    )
    print(f"-Batch {batch.name} contains {task_count} tasks.")

    summary_metrics = defaultdict(lambda: 0)
    task_in_progress = 0
    processes = []

    tasks = client.get_tasks(project_name=batch.project, batch_name=batch.name)

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # Submit one cancel request per task to the thread pool
        for task in tasks:
            task_in_progress += 1
            if task_in_progress % 1000 == 0:
                print(f"-Processing Task # {task_in_progress}")
            processes.append(
                executor.submit(
                    cancel_task_with_response, client, task, clear_unique_id
                )
            )

        # Tally results as the cancel requests complete
        for process in as_completed(processes):
            result = process.result()
            summary_metrics[result["status"]] += 1

    for k, v in summary_metrics.items():
        print(f"--{k}: {v} tasks")


def cancel_task_with_response(client: scaleapi.ScaleClient, task, clear_unique_id):
    # Skip tasks that are already in a terminal state
    task_status = task.as_dict()["status"]
    if task_status in ["completed", "canceled"]:
        return {"task": task.id, "status": task_status}

    try:
        task = client.cancel_task(task.id, clear_unique_id)
        return {"task": task.id, "status": task.as_dict()["status"]}
    except ScaleException:
        return {"task": task.id, "status": "Cannot cancel"}
    except Exception as err:
        print(err)
        return {"task": task.id, "status": "Errored"}


def get_args():
    ap = argparse.ArgumentParser()
    ap.add_argument("--api_key", required=True, help="Please provide Scale API Key")
    ap.add_argument(
        "--batches", required=True, help="Please enter batch names separated by a comma"
    )
    ap.add_argument(
        "--clear",
        type=bool,
        help="Set to True if you want to remove unique_id upon cancel",
    )
    return ap.parse_args()


def main():
    args = get_args()
    clear_unique_id = args.clear or False

    client = scaleapi.ScaleClient(args.api_key)

    # Testing API Key
    try:
        client.projects()
    except ScaleUnauthorized as err:
        print(err.message)
        sys.exit(1)

    batch_list = args.batches.split(",")
    batches = [word.strip() for word in batch_list]

    for batch_name in batches:
        cancel_batch(client, batch_name, clear_unique_id)


if __name__ == "__main__":
    main()
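
A note for anyone adapting this example: because `--clear` is declared with `type=bool`, argparse applies `bool()` to the raw string, so any non-empty value (including `--clear "False"`) parses as True; clearing is only disabled when the flag is omitted entirely. A minimal sketch of a stricter boolean flag, using a hypothetical `str_to_bool` helper (not part of the script above), might look like this:

import argparse


def str_to_bool(value: str) -> bool:
    # Hypothetical helper: map common true/false spellings to a real bool
    # instead of relying on bool("False") being True.
    normalized = value.strip().lower()
    if normalized in {"true", "t", "yes", "y", "1"}:
        return True
    if normalized in {"false", "f", "no", "n", "0"}:
        return False
    raise argparse.ArgumentTypeError(f"Expected a boolean value, got {value!r}")


ap = argparse.ArgumentParser()
ap.add_argument(
    "--clear",
    type=str_to_bool,
    default=False,
    help="Set to True if you want to remove unique_id upon cancel",
)

With this variant, `--clear "False"` would actually disable clearing, whereas the script above treats any supplied value as True.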
