Adding TorchX-MCAD Scheduler Support to Jobs #78

Merged · 4 commits · Mar 29, 2023
4 changes: 3 additions & 1 deletion pyproject.toml
@@ -20,7 +20,9 @@ homepage = "https://github.com/project-codeflare/codeflare-sdk"
 keywords = ['codeflare', 'python', 'sdk', 'client', 'batch', 'scale']

 [tool.poetry.dependencies]
-python = "^3.6.3"
+python = "^3.7"
 openshift-client = "1.0.18"
 rich = "^12.5"
 ray = {version = "2.1.0", extras = ["default"]}
+kubernetes = "26.1.0"
+torchx = {git = "https://github.com/project-codeflare/torchx", rev = "OCP"}
3 changes: 2 additions & 1 deletion requirements.txt
@@ -1,4 +1,5 @@
 openshift-client==1.0.18
 rich==12.5.1
 ray[default]==2.1.0
-git+https://github.com/project-codeflare/torchx@6517d5b060e4fe32b9ad41019c3bef647095c35f#egg=torchx
+kubernetes==26.1.0
+git+https://github.com/project-codeflare/torchx@OCP
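
As a quick sanity check that the OCP branch is what got installed, the TorchX scheduler registry can be queried for the new backend. A sketch, assuming the branch registers kubernetes_mcad through the stock get_scheduler_factories() helper like the built-in schedulers:

from torchx.schedulers import get_scheduler_factories

# The project-codeflare OCP branch is expected to register a
# "kubernetes_mcad" backend alongside the stock TorchX schedulers.
factories = get_scheduler_factories()
print(sorted(factories.keys()))
assert "kubernetes_mcad" in factories, "expected the OCP TorchX branch"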
49 changes: 47 additions & 2 deletions src/codeflare_sdk/job/jobs.py
@@ -61,6 +61,7 @@ def __init__(
         mounts: Optional[List[str]] = None,
         rdzv_port: int = 29500,
         scheduler_args: Optional[Dict[str, str]] = None,
+        image: Optional[str] = None,
     ):
         if bool(script) == bool(m):  # exactly one of script/m must be set
             raise ValueError(
@@ -82,6 +83,7 @@ def __init__(
         self.scheduler_args: Dict[str, str] = (
             scheduler_args if scheduler_args is not None else dict()
         )
+        self.image = image

     def _dry_run(self, cluster: "Cluster"):
         j = f"{cluster.config.max_worker}x{max(cluster.config.gpu, 1)}"  # # of proc. = # of gpus
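
The image argument is the only new constructor parameter; it lets a job definition name its own container image for the cluster-less path added below. A minimal construction sketch (name and script already existed on DDPJobDefinition; the values are hypothetical placeholders):

from codeflare_sdk.job.jobs import DDPJobDefinition

# Only `image` is new in this PR; the other arguments already existed.
jobdef = DDPJobDefinition(
    name="mnist-ddp",                        # hypothetical job name
    script="train.py",                       # hypothetical training script
    image="quay.io/my-org/training:latest",  # hypothetical image reference
)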
@@ -108,15 +110,58 @@ def _dry_run(self, cluster: "Cluster"):
             workspace=f"file://{Path.cwd()}",
         )

-    def submit(self, cluster: "Cluster") -> "Job":
+    def _missing_spec(self, spec: str):
+        raise ValueError(f"Job definition missing arg: {spec}")
+
+    def _dry_run_no_cluster(self):
+        return torchx_runner.dryrun(
+            app=ddp(
+                *self.script_args,
+                script=self.script,
+                m=self.m,
+                name=self.name if self.name is not None else self._missing_spec("name"),
+                h=self.h,
+                cpu=self.cpu
+                if self.cpu is not None
+                else self._missing_spec("cpu (# cpus per worker)"),
+                gpu=self.gpu
+                if self.gpu is not None
+                else self._missing_spec("gpu (# gpus per worker)"),
+                memMB=self.memMB
+                if self.memMB is not None
+                else self._missing_spec("memMB (memory in MB)"),
+                j=self.j
+                if self.j is not None
+                else self._missing_spec("j (`workers`x`procs`)"),  # # of proc. = # of gpus
+                env=self.env,  # should this still exist?
+                max_retries=self.max_retries,
+                rdzv_port=self.rdzv_port,  # should this still exist?
+                mounts=self.mounts,
+                image=self.image
+                if self.image is not None
+                else self._missing_spec("image"),
+            ),
+            scheduler="kubernetes_mcad",
+            cfg=self.scheduler_args if self.scheduler_args is not None else None,
+            workspace="",
+        )
+
+    def submit(self, cluster: "Cluster" = None) -> "Job":
         return DDPJob(self, cluster)


 class DDPJob(Job):
     def __init__(self, job_definition: "DDPJobDefinition", cluster: "Cluster"):
         self.job_definition = job_definition
         self.cluster = cluster
-        self._app_handle = torchx_runner.schedule(job_definition._dry_run(cluster))
+        if self.cluster:
+            self._app_handle = torchx_runner.schedule(job_definition._dry_run(cluster))
+        else:
+            self._app_handle = torchx_runner.schedule(
+                job_definition._dry_run_no_cluster()
+            )
         all_jobs.append(self)

     def status(self) -> str:
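
Taken together, these changes let a job be submitted without a CodeFlare Cluster: submit() with no argument builds a DDPJob whose app handle comes from _dry_run_no_cluster(), which targets the kubernetes_mcad scheduler and requires every resource spec explicitly. An end-to-end sketch, assuming the constructor accepts the attributes that _dry_run_no_cluster reads (all values are illustrative):

from codeflare_sdk.job.jobs import DDPJobDefinition

# Every field checked by _missing_spec (name, cpu, gpu, memMB, j, image)
# must be supplied, since there is no Cluster config to fill them in.
job = DDPJobDefinition(
    name="mnist-ddp",          # hypothetical job name
    script="train.py",         # hypothetical training script
    cpu=1,                     # cpus per worker
    gpu=0,                     # gpus per worker
    memMB=4096,                # memory per worker, in MB
    j="2x1",                   # `workers`x`procs`
    image="quay.io/my-org/training:latest",  # hypothetical image
).submit()                     # no cluster -> schedules via kubernetes_mcad

print(job.status())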