Skip to content

Example Scripts Folder #54

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,15 @@ The attribute can be passed to the task payloads, in the ``attachment`` paramete
...
)

Example Scripts
_______________

A list of examples scripts for use.

* `cancel_batch.py`__ to concurrently cancel tasks in batches

__ https://github.com/scaleapi/scaleapi-python-client/blob/master/examples/cancel_batch.py

Evaluation tasks (For Scale Rapid projects only)
________________________________________________

Expand Down
109 changes: 109 additions & 0 deletions examples/cancel_batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import argparse
import sys
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed

import scaleapi
from scaleapi.exceptions import ScaleException, ScaleUnauthorized

# Script that takes in an array of batch names (split by comma) and
# applies a bulk action to cancel all tasks in each batch.
# By default, this script makes 50 concurrent API calls.

# Example:
# python cancel_batch.py --api_key "SCALE_API_KEY"
# --batches "batch1,batch2" --clear "True"

# Change this for update concurrency
MAX_WORKERS = 50


def cancel_batch(client, batch_name, clear_unique_id):
print(f"\nProcessing Batch: {batch_name}")
try:
batch = client.get_batch(batch_name)
except ScaleException:
print(f"-ERROR: Batch {batch_name} not found.")
return

task_count = client.get_tasks_count(
project_name=batch.project, batch_name=batch.name
)
print(f"-Batch {batch.name} contains {task_count} tasks.")

summary_metrics = defaultdict(lambda: 0)
task_in_progress = 0
processes = []

tasks = client.get_tasks(project_name=batch.project, batch_name=batch.name)

with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
for task in tasks:
task_in_progress += 1
if task_in_progress % 1000 == 0:
print(f"-Processing Task # {task_in_progress}")
processes.append(
executor.submit(
cancel_task_with_response, client, task, clear_unique_id
)
)

for process in as_completed(processes):
result = process.result()
summary_metrics[result["status"]] += 1

for k, v in summary_metrics.items():
print(f"--{k}: {v} tasks")


def cancel_task_with_response(client: scaleapi.ScaleClient, task, clear_unique_ud):
task_status = task.as_dict()["status"]
if task_status in ["completed", "canceled"]:
return {"task": task.id, "status": task_status}

try:
task = client.cancel_task(task.id, clear_unique_ud)
return {"task": task.id, "status": task.as_dict()["status"]}
except ScaleException:
return {"task": task.id, "status": "Can not cancel"}
except Exception as err:
print(err)
return {"task": task.id, "status": "Errored"}


def get_args():
ap = argparse.ArgumentParser()
ap.add_argument("--api_key", required=True, help="Please provide Scale API Key")
ap.add_argument(
"--batches", required=True, help="Please enter batch names separated by a comma"
)
ap.add_argument(
"--clear",
type=bool,
help="Set to True if you want to remove unique_id upon cancel",
)
return ap.parse_args()


def main():
args = get_args()
clear_unique_id = args.clear or False

client = scaleapi.ScaleClient(args.api_key)

# Testing API Key
try:
client.projects()
except ScaleUnauthorized as err:
print(err.message)
sys.exit(1)

batch_list = args.batches.split(",")
batches = [word.strip() for word in batch_list]

for batch_name in batches:
cancel_batch(client, batch_name, clear_unique_id)


if __name__ == "__main__":
main()