diff --git a/pyproject.toml b/pyproject.toml
index da65e9557..1c077c6cb 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -20,7 +20,9 @@ homepage = "https://github.com/project-codeflare/codeflare-sdk"
 keywords = ['codeflare', 'python', 'sdk', 'client', 'batch', 'scale']
 
 [tool.poetry.dependencies]
-python = "^3.6.3"
+python = "^3.7"
 openshift-client = "1.0.18"
 rich = "^12.5"
 ray = {version = "2.1.0", extras = ["default"]}
+kubernetes = "26.1.0"
+torchx = {git = "https://github.com/project-codeflare/torchx", rev = "OCP"}
diff --git a/requirements.txt b/requirements.txt
index 59f8082d8..a17a43ad2 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,5 @@
 openshift-client==1.0.18
 rich==12.5.1
 ray[default]==2.1.0
-git+https://github.com/project-codeflare/torchx@6517d5b060e4fe32b9ad41019c3bef647095c35f#egg=torchx
\ No newline at end of file
+kubernetes==26.1.0
+git+https://github.com/project-codeflare/torchx@OCP
diff --git a/src/codeflare_sdk/job/jobs.py b/src/codeflare_sdk/job/jobs.py
index b95a9ba07..5ae09840f 100644
--- a/src/codeflare_sdk/job/jobs.py
+++ b/src/codeflare_sdk/job/jobs.py
@@ -61,6 +61,7 @@ def __init__(
         mounts: Optional[List[str]] = None,
         rdzv_port: int = 29500,
         scheduler_args: Optional[Dict[str, str]] = None,
+        image: Optional[str] = None,
     ):
         if bool(script) == bool(m):  # logical XOR
             raise ValueError(
@@ -82,6 +83,7 @@ def __init__(
         self.scheduler_args: Dict[str, str] = (
             scheduler_args if scheduler_args is not None else dict()
         )
+        self.image = image
 
     def _dry_run(self, cluster: "Cluster"):
         j = f"{cluster.config.max_worker}x{max(cluster.config.gpu, 1)}"  # # of proc. = # of gpus
@@ -108,7 +110,45 @@ def _dry_run(self, cluster: "Cluster"):
             workspace=f"file://{Path.cwd()}",
         )
 
-    def submit(self, cluster: "Cluster") -> "Job":
+    def _missing_spec(self, spec: str):
+        raise ValueError(f"Job definition missing arg: {spec}")
+
+    def _dry_run_no_cluster(self):
+        return torchx_runner.dryrun(
+            app=ddp(
+                *self.script_args,
+                script=self.script,
+                m=self.m,
+                name=self.name if self.name is not None else self._missing_spec("name"),
+                h=self.h,
+                cpu=self.cpu
+                if self.cpu is not None
+                else self._missing_spec("cpu (# cpus per worker)"),
+                gpu=self.gpu
+                if self.gpu is not None
+                else self._missing_spec("gpu (# gpus per worker)"),
+                memMB=self.memMB
+                if self.memMB is not None
+                else self._missing_spec("memMB (memory in MB)"),
+                j=self.j
+                if self.j is not None
+                else self._missing_spec(
+                    "j (`workers`x`procs`)"
+                ),  # # of proc. = # of gpus,
+                env=self.env,  # should this still exist?
+                max_retries=self.max_retries,
+                rdzv_port=self.rdzv_port,  # should this still exist?
+                mounts=self.mounts,
+                image=self.image
+                if self.image is not None
+                else self._missing_spec("image"),
+            ),
+            scheduler="kubernetes_mcad",
+            cfg=self.scheduler_args if self.scheduler_args is not None else None,
+            workspace="",
+        )
+
+    def submit(self, cluster: "Cluster" = None) -> "Job":
         return DDPJob(self, cluster)
 
 
@@ -116,7 +156,12 @@ class DDPJob(Job):
     def __init__(self, job_definition: "DDPJobDefinition", cluster: "Cluster"):
         self.job_definition = job_definition
         self.cluster = cluster
-        self._app_handle = torchx_runner.schedule(job_definition._dry_run(cluster))
+        if self.cluster:
+            self._app_handle = torchx_runner.schedule(job_definition._dry_run(cluster))
+        else:
+            self._app_handle = torchx_runner.schedule(
+                job_definition._dry_run_no_cluster()
+            )
         all_jobs.append(self)
 
     def status(self) -> str:
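
For reviewers, a minimal sketch of the cluster-less submission path this diff enables. The script, image, and resource values below are hypothetical; only the parameter names, the required-arg checks, and the kubernetes_mcad scheduler come from the change itself:

    from codeflare_sdk.job.jobs import DDPJobDefinition

    # Without a Cluster, _dry_run_no_cluster() requires name, cpu, gpu,
    # memMB, j, and image to be set explicitly; any missing one raises
    # ValueError via _missing_spec().
    job_def = DDPJobDefinition(
        name="mnist-job",                       # hypothetical job name
        script="train.py",                      # hypothetical training script
        image="quay.io/example/train:latest",   # hypothetical image; required here
        cpu=1,                                  # cpus per worker
        gpu=0,                                  # gpus per worker
        memMB=4096,                             # memory per worker, in MB
        j="2x1",                                # `workers`x`procs`
    )
    job = job_def.submit()  # no cluster arg: scheduled via kubernetes_mcad

Passing a Cluster to submit() still takes the original _dry_run(cluster) path in DDPJob.__init__, so existing callers are unaffected.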