diff --git a/src/codeflare_sdk/job/jobs.py b/src/codeflare_sdk/job/jobs.py index 5ae09840f..74135980c 100644 --- a/src/codeflare_sdk/job/jobs.py +++ b/src/codeflare_sdk/job/jobs.py @@ -153,7 +153,7 @@ def submit(self, cluster: "Cluster" = None) -> "Job": class DDPJob(Job): - def __init__(self, job_definition: "DDPJobDefinition", cluster: "Cluster"): + def __init__(self, job_definition: "DDPJobDefinition", cluster: "Cluster" = None): self.job_definition = job_definition self.cluster = cluster if self.cluster: @@ -169,3 +169,6 @@ def status(self) -> str: def logs(self) -> str: return "".join(torchx_runner.log_lines(self._app_handle, None)) + + def cancel(self): + torchx_runner.cancel(self._app_handle) diff --git a/tests/unit_test.py b/tests/unit_test.py index bd9261c44..9338434a2 100644 --- a/tests/unit_test.py +++ b/tests/unit_test.py @@ -61,6 +61,7 @@ from torchx.specs import AppDryRunInfo, AppDef from torchx.runner import get_runner, Runner from torchx.schedulers.ray_scheduler import RayJob +from torchx.schedulers.kubernetes_mcad_scheduler import KubernetesMCADJob import pytest @@ -1686,6 +1687,40 @@ def test_DDPJobDefinition_dry_run(): assert ddp_job._scheduler == "ray" +def test_DDPJobDefinition_dry_run_no_cluster(): + """ + Test that the dry run method returns the correct type: AppDryRunInfo, + that the attributes of the returned object are of the correct type, + and that the values from cluster and job definition are correctly passed. + """ + ddp = test_DDPJobDefinition_creation() + ddp.image = "fake-image" + ddp_job = ddp._dry_run_no_cluster() + assert type(ddp_job) == AppDryRunInfo + assert ddp_job._fmt is not None + assert type(ddp_job.request) == KubernetesMCADJob + assert type(ddp_job._app) == AppDef + assert type(ddp_job._cfg) == type(dict()) + assert type(ddp_job._scheduler) == type(str()) + + assert ( + ddp_job.request.resource["spec"]["resources"]["GenericItems"][0][ + "generictemplate" + ] + .spec.containers[0] + .image + == "fake-image" + ) + + assert ddp_job._app.roles[0].resource.cpu == 1 + assert ddp_job._app.roles[0].resource.gpu == 0 + assert ddp_job._app.roles[0].resource.memMB == 1024 + + assert ddp_job._cfg["requirements"] == "test" + + assert ddp_job._scheduler == "kubernetes_mcad" + + def test_DDPJobDefinition_dry_run_no_resource_args(): """ Test that the dry run correctly gets resources from the cluster object @@ -1715,6 +1750,55 @@ def test_DDPJobDefinition_dry_run_no_resource_args(): ) +def test_DDPJobDefinition_dry_run_no_cluster_no_resource_args(): + """ + Test that the dry run method returns the correct type: AppDryRunInfo, + that the attributes of the returned object are of the correct type, + and that the values from cluster and job definition are correctly passed. + """ + ddp = test_DDPJobDefinition_creation() + try: + ddp._dry_run_no_cluster() + assert 0 == 1 + except ValueError as e: + assert str(e) == "Job definition missing arg: image" + ddp.image = "fake-image" + ddp.name = None + try: + ddp._dry_run_no_cluster() + assert 0 == 1 + except ValueError as e: + assert str(e) == "Job definition missing arg: name" + ddp.name = "fake" + ddp.cpu = None + try: + ddp._dry_run_no_cluster() + assert 0 == 1 + except ValueError as e: + assert str(e) == "Job definition missing arg: cpu (# cpus per worker)" + ddp.cpu = 1 + ddp.gpu = None + try: + ddp._dry_run_no_cluster() + assert 0 == 1 + except ValueError as e: + assert str(e) == "Job definition missing arg: gpu (# gpus per worker)" + ddp.gpu = 1 + ddp.memMB = None + try: + ddp._dry_run_no_cluster() + assert 0 == 1 + except ValueError as e: + assert str(e) == "Job definition missing arg: memMB (memory in MB)" + ddp.memMB = 1 + ddp.j = None + try: + ddp._dry_run_no_cluster() + assert 0 == 1 + except ValueError as e: + assert str(e) == "Job definition missing arg: j (`workers`x`procs`)" + + def test_DDPJobDefinition_submit(mocker): """ Tests that the submit method returns the correct type: DDPJob @@ -1733,6 +1817,14 @@ def test_DDPJobDefinition_submit(mocker): assert type(ddp_job._app_handle) == str assert ddp_job._app_handle == "fake-dashboard-url" + ddp_def.image = "fake-image" + ddp_job = ddp_def.submit() + assert type(ddp_job) == DDPJob + assert type(ddp_job.job_definition) == DDPJobDefinition + assert ddp_job.cluster == None + assert type(ddp_job._app_handle) == str + assert ddp_job._app_handle == "fake-dashboard-url" + def test_DDPJob_creation(mocker): ddp_def = test_DDPJobDefinition_creation() @@ -1757,6 +1849,29 @@ def test_DDPJob_creation(mocker): return ddp_job +def test_DDPJob_creation_no_cluster(mocker): + ddp_def = test_DDPJobDefinition_creation() + ddp_def.image = "fake-image" + mocker.patch( + "codeflare_sdk.job.jobs.torchx_runner.schedule", + return_value="fake-app-handle", + ) # a fake app_handle + ddp_job = DDPJob(ddp_def, None) + assert type(ddp_job) == DDPJob + assert type(ddp_job.job_definition) == DDPJobDefinition + assert ddp_job.cluster == None + assert type(ddp_job._app_handle) == str + assert ddp_job._app_handle == "fake-app-handle" + _, args, kwargs = torchx_runner.schedule.mock_calls[0] + assert type(args[0]) == AppDryRunInfo + job_info = args[0] + assert type(job_info.request) == KubernetesMCADJob + assert type(job_info._app) == AppDef + assert type(job_info._cfg) == type(dict()) + assert type(job_info._scheduler) == type(str()) + return ddp_job + + def test_DDPJob_status(mocker): ddp_job = test_DDPJob_creation(mocker) mocker.patch( @@ -1777,6 +1892,18 @@ def test_DDPJob_logs(mocker): assert args[0] == "fake-dashboard-url" +def arg_check_side_effect(*args): + assert args[0] == "fake-app-handle" + + +def test_DDPJob_cancel(mocker): + ddp_job = test_DDPJob_creation_no_cluster(mocker) + mocker.patch( + "codeflare_sdk.job.jobs.torchx_runner.cancel", side_effect=arg_check_side_effect + ) + ddp_job.cancel() + + def parse_j(cmd): pattern = r"--nnodes\s+\d+\s+--nproc_per_node\s+\d+"