From 6f07fa31a34cefc9d3796b82cfd8cf3c80137690 Mon Sep 17 00:00:00 2001 From: maxusmusti Date: Mon, 27 Mar 2023 12:04:34 -0400 Subject: [PATCH 1/4] First pass torchx-mcad --- src/codeflare_sdk/job/jobs.py | 49 +++++++++++++++++++++++++++++++++-- 1 file changed, 47 insertions(+), 2 deletions(-) diff --git a/src/codeflare_sdk/job/jobs.py b/src/codeflare_sdk/job/jobs.py index b95a9ba07..a9c371800 100644 --- a/src/codeflare_sdk/job/jobs.py +++ b/src/codeflare_sdk/job/jobs.py @@ -61,6 +61,7 @@ def __init__( mounts: Optional[List[str]] = None, rdzv_port: int = 29500, scheduler_args: Optional[Dict[str, str]] = None, + image: Optional[str] = None, ): if bool(script) == bool(m): # logical XOR raise ValueError( @@ -82,6 +83,7 @@ def __init__( self.scheduler_args: Dict[str, str] = ( scheduler_args if scheduler_args is not None else dict() ) + self.image = image def _dry_run(self, cluster: "Cluster"): j = f"{cluster.config.max_worker}x{max(cluster.config.gpu, 1)}" # # of proc. = # of gpus @@ -108,7 +110,45 @@ def _dry_run(self, cluster: "Cluster"): workspace=f"file://{Path.cwd()}", ) - def submit(self, cluster: "Cluster") -> "Job": + def _missing_spec(spec: str): + raise ValueError(f"Job definition missing arg: {spec}") + + def _dry_run_no_cluster(self): + return torchx_runner.dryrun( + app=ddp( + *self.script_args, + script=self.script, + m=self.m, + name=self.name, + h=self.h, + cpu=self.cpu + if self.cpu is not None + else self._missing_spec("cpu (# cpus per worker)"), + gpu=self.gpu + if self.gpu is not None + else self._missing_spec("gpu (# gpus per worker)"), + memMB=self.memMB + if self.memMB is not None + else self._missing_spec("memMB (memory in MB)"), + j=self.j + if self.j is not None + else self._missing_spec( + "j (`workers`x`procs`)" + ), # # of proc. = # of gpus, + env=self.env, # should this still exist? + max_retries=self.max_retries, + rdzv_port=self.rdzv_port, # should this still exist? + mounts=self.mounts, + image=self.image + if self.image is not None + else self._missing_spec("image"), + ), + scheduler="kubernetes_mcad", + cfg=self.scheduler_args if self.scheduler_args is not None else None, + workspace=f"file://{Path.cwd()}", + ) + + def submit(self, cluster: "Cluster" = None) -> "Job": return DDPJob(self, cluster) @@ -116,7 +156,12 @@ class DDPJob(Job): def __init__(self, job_definition: "DDPJobDefinition", cluster: "Cluster"): self.job_definition = job_definition self.cluster = cluster - self._app_handle = torchx_runner.schedule(job_definition._dry_run(cluster)) + if self.cluster: + self._app_handle = torchx_runner.schedule(job_definition._dry_run(cluster)) + else: + self._app_handle = torchx_runner.schedule( + job_definition._dry_run_no_cluster() + ) all_jobs.append(self) def status(self) -> str: From 29eb5376b043c749ad161656d36fff360f008fc1 Mon Sep 17 00:00:00 2001 From: maxusmusti Date: Tue, 28 Mar 2023 12:58:36 -0400 Subject: [PATCH 2/4] Make workspace blank --- src/codeflare_sdk/job/jobs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/codeflare_sdk/job/jobs.py b/src/codeflare_sdk/job/jobs.py index a9c371800..c97d5cb84 100644 --- a/src/codeflare_sdk/job/jobs.py +++ b/src/codeflare_sdk/job/jobs.py @@ -145,7 +145,7 @@ def _dry_run_no_cluster(self): ), scheduler="kubernetes_mcad", cfg=self.scheduler_args if self.scheduler_args is not None else None, - workspace=f"file://{Path.cwd()}", + workspace="", ) def submit(self, cluster: "Cluster" = None) -> "Job": From e9d32a7d000d053793a76c23e9370fed86acbbf3 Mon Sep 17 00:00:00 2001 From: maxusmusti Date: Wed, 29 Mar 2023 10:38:32 -0400 Subject: [PATCH 3/4] Updated requirements --- pyproject.toml | 4 +++- requirements.txt | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index da65e9557..1c077c6cb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,9 @@ homepage = "https://github.com/project-codeflare/codeflare-sdk" keywords = ['codeflare', 'python', 'sdk', 'client', 'batch', 'scale'] [tool.poetry.dependencies] -python = "^3.6.3" +python = "^3.7" openshift-client = "1.0.18" rich = "^12.5" ray = {version = "2.1.0", extras = ["default"]} +kubernetes = "26.1.0" +torchx = {git = "https://github.com/project-codeflare/torchx", rev = "OCP"} diff --git a/requirements.txt b/requirements.txt index 59f8082d8..a17a43ad2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ openshift-client==1.0.18 rich==12.5.1 ray[default]==2.1.0 -git+https://github.com/project-codeflare/torchx@6517d5b060e4fe32b9ad41019c3bef647095c35f#egg=torchx \ No newline at end of file +kubernetes==26.1.0 +git+https://github.com/project-codeflare/torchx@OCP From ebcf72eaf1ae0379fa6ef4c6300eb26386e9f5b4 Mon Sep 17 00:00:00 2001 From: maxusmusti Date: Wed, 29 Mar 2023 11:12:18 -0400 Subject: [PATCH 4/4] Feedback applied --- src/codeflare_sdk/job/jobs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/codeflare_sdk/job/jobs.py b/src/codeflare_sdk/job/jobs.py index c97d5cb84..5ae09840f 100644 --- a/src/codeflare_sdk/job/jobs.py +++ b/src/codeflare_sdk/job/jobs.py @@ -110,7 +110,7 @@ def _dry_run(self, cluster: "Cluster"): workspace=f"file://{Path.cwd()}", ) - def _missing_spec(spec: str): + def _missing_spec(self, spec: str): raise ValueError(f"Job definition missing arg: {spec}") def _dry_run_no_cluster(self): @@ -119,7 +119,7 @@ def _dry_run_no_cluster(self): *self.script_args, script=self.script, m=self.m, - name=self.name, + name=self.name if self.name is not None else self._missing_spec("name"), h=self.h, cpu=self.cpu if self.cpu is not None