Skip to content

Commit 880a2fd

Browse files
committed
Made Kueue the default queueing strategy
1 parent 5422936 commit 880a2fd

File tree

9 files changed

+117
-4
lines changed

9 files changed

+117
-4
lines changed

src/codeflare_sdk/cluster/cluster.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ def create_app_wrapper(self):
192192
dispatch_priority = self.config.dispatch_priority
193193
ingress_domain = self.config.ingress_domain
194194
ingress_options = self.config.ingress_options
195+
local_queue = self.config.local_queue
195196
return generate_appwrapper(
196197
name=name,
197198
namespace=namespace,
@@ -217,6 +218,7 @@ def create_app_wrapper(self):
217218
openshift_oauth=self.config.openshift_oauth,
218219
ingress_domain=ingress_domain,
219220
ingress_options=ingress_options,
221+
local_queue=local_queue,
220222
)
221223

222224
# creates a new cluster with the provided or default spec

src/codeflare_sdk/cluster/config.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class ClusterConfiguration:
4646
num_gpus: int = 0
4747
template: str = f"{dir}/templates/base-template.yaml"
4848
instascale: bool = False
49-
mcad: bool = True
49+
mcad: bool = False
5050
envs: dict = field(default_factory=dict)
5151
image: str = ""
5252
local_interactive: bool = False
@@ -55,3 +55,4 @@ class ClusterConfiguration:
5555
openshift_oauth: bool = False # NOTE: to use the user must have permission to create a RoleBinding for system:auth-delegator
5656
ingress_options: dict = field(default_factory=dict)
5757
ingress_domain: str = None
58+
local_queue: str = None

src/codeflare_sdk/templates/base-template.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ spec:
4242
metadata:
4343
labels:
4444
workload.codeflare.dev/appwrapper: "aw-kuberay"
45+
kueue.x-k8s.io/queue-name: local-queue
4546
controller-tools.k8s.io: "1.0"
4647
# A unique identifier for the head node and workers of this cluster.
4748
name: kuberay-cluster

src/codeflare_sdk/utils/generate_yaml.py

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,9 @@ def write_user_appwrapper(user_yaml, output_file_name):
544544
os.makedirs(directory_path)
545545

546546
with open(output_file_name, "w") as outfile:
547+
del user_yaml["spec"]["resources"]["GenericItems"][0]["generictemplate"][
548+
"metadata"
549+
]["labels"]["kueue.x-k8s.io/queue-name"]
547550
yaml.dump(user_yaml, outfile, default_flow_style=False)
548551

549552
print(f"Written to: {output_file_name}")
@@ -622,7 +625,39 @@ def _create_oauth_sidecar_object(
622625
)
623626

624627

625-
def write_components(user_yaml: dict, output_file_name: str):
628+
def update_local_kueue_label(local_queue: str, namespace: str):
    """Resolve the Kueue LocalQueue name to use for the queue-name label.

    Args:
        local_queue: Queue name supplied by the caller. When it is not
            ``None`` it is returned unchanged and the cluster is not queried.
        namespace: Namespace whose ``localqueues`` (group ``kueue.x-k8s.io``,
            version ``v1beta1``) are listed when no queue name was supplied.

    Returns:
        ``local_queue`` when provided; otherwise the name of the first listed
        LocalQueue whose ``kueue.x-k8s.io/default-queue`` annotation equals
        ``"true"`` (case-insensitive). If listing the queues raises, the
        return value of ``_kube_api_error_handling`` is returned instead.

    Raises:
        ValueError: If no LocalQueue in ``namespace`` carries the default
            queue annotation.
    """
    # An explicitly configured queue always wins; no API call is needed.
    if local_queue is not None:
        return local_queue

    try:
        config_check()
        api_instance = client.CustomObjectsApi(api_config_handler())
        local_queues = api_instance.list_namespaced_custom_object(
            group="kueue.x-k8s.io",
            version="v1beta1",
            namespace=namespace,
            plural="localqueues",
        )
    except Exception as e:  # pragma: no cover
        # NOTE(review): whatever the handler returns becomes the label value.
        # If it can return None for some errors the caller would write a null
        # queue-name label - confirm the handler always raises on this path.
        return _kube_api_error_handling(e)

    default_annotation = "kueue.x-k8s.io/default-queue"
    for lq in local_queues.get("items") or []:
        metadata = lq.get("metadata") or {}
        # `annotations` may be absent (or null) on queues without annotations.
        annotations = metadata.get("annotations") or {}
        flag = annotations.get(default_annotation)
        if isinstance(flag, str) and flag.lower() == "true":
            return metadata["name"]

    raise ValueError(
        "Default Local Queue with kueue.x-k8s.io/default-queue: true annotation not found please create a default Local Queue or provide the local_queue name in Cluster Configuration"
    )
656+
657+
658+
def write_components(
659+
user_yaml: dict, output_file_name: str, namespace: str, local_queue: str
660+
):
626661
# Create the directory if it doesn't exist
627662
directory_path = os.path.dirname(output_file_name)
628663
if not os.path.exists(directory_path):
@@ -633,6 +668,16 @@ def write_components(user_yaml: dict, output_file_name: str):
633668
with open(output_file_name, "a") as outfile:
634669
for component in components:
635670
if "generictemplate" in component:
671+
if (
672+
"workload.codeflare.dev/appwrapper"
673+
in component["generictemplate"]["metadata"]["labels"]
674+
):
675+
del component["generictemplate"]["metadata"]["labels"][
676+
"workload.codeflare.dev/appwrapper"
677+
]
678+
component["generictemplate"]["metadata"]["labels"][
679+
"kueue.x-k8s.io/queue-name"
680+
] = update_local_kueue_label(local_queue, namespace)
636681
outfile.write("---\n")
637682
yaml.dump(
638683
component["generictemplate"], outfile, default_flow_style=False
@@ -665,6 +710,7 @@ def generate_appwrapper(
665710
openshift_oauth: bool,
666711
ingress_domain: str,
667712
ingress_options: dict,
713+
local_queue: str,
668714
):
669715
user_yaml = read_template(template)
670716
appwrapper_name, cluster_name = gen_names(name)
@@ -725,7 +771,7 @@ def generate_appwrapper(
725771
directory_path = os.path.expanduser("~/.codeflare/appwrapper/")
726772
outfile = os.path.join(directory_path, appwrapper_name + ".yaml")
727773
if not mcad:
728-
write_components(user_yaml, outfile)
774+
write_components(user_yaml, outfile, namespace, local_queue)
729775
else:
730776
write_user_appwrapper(user_yaml, outfile)
731777
return outfile

tests/e2e/mnist_raycluster_sdk_test.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ def run_mnist_raycluster_sdk(self):
7070
instascale=False,
7171
image=ray_image,
7272
ingress_options=ingress_options,
73+
mcad=True,
7374
)
7475
)
7576

tests/e2e/start_ray_cluster.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
instascale=False,
3939
image=ray_image,
4040
ingress_options=ingress_options,
41+
mcad=True,
4142
)
4243
)
4344

tests/test-case-no-mcad.yamls

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ kind: RayCluster
44
metadata:
55
labels:
66
controller-tools.k8s.io: '1.0'
7-
workload.codeflare.dev/appwrapper: unit-test-cluster-ray
7+
kueue.x-k8s.io/queue-name: local-queue-default
88
name: unit-test-cluster-ray
99
namespace: ns
1010
spec:

tests/unit_test.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,12 +307,42 @@ def test_create_app_wrapper_raises_error_with_no_image():
307307
), "Error message did not match expected output."
308308

309309

310+
def get_local_queue(group, version, namespace, plural):
    """Return a canned ``LocalQueueList`` payload for the unit tests.

    The four arguments are checked against the values the tests expect for a
    LocalQueue listing in namespace ``ns``; any mismatch fails an assertion.
    The returned list holds a single LocalQueue named ``local-queue-default``
    that is annotated as the default queue.
    """
    assert group == "kueue.x-k8s.io"
    assert version == "v1beta1"
    assert namespace == "ns"
    assert plural == "localqueues"

    # The only queue in the namespace, flagged as the default one.
    default_queue = {
        "apiVersion": "kueue.x-k8s.io/v1beta1",
        "kind": "LocalQueue",
        "metadata": {
            "annotations": {"kueue.x-k8s.io/default-queue": "true"},
            "name": "local-queue-default",
            "namespace": "ns",
        },
        "spec": {"clusterQueue": "cluster-queue"},
    }

    return {
        "apiVersion": "kueue.x-k8s.io/v1beta1",
        "items": [default_queue],
        "kind": "LocalQueueList",
        "metadata": {"continue": "", "resourceVersion": "2266811"},
    }
333+
334+
310335
def test_cluster_creation_no_mcad(mocker):
336+
# Create Ray Cluster with no local queue specified
311337
mocker.patch("kubernetes.client.ApisApi.get_api_versions")
312338
mocker.patch(
313339
"kubernetes.client.CustomObjectsApi.get_cluster_custom_object",
314340
return_value={"spec": {"domain": "apps.cluster.awsroute.org"}},
315341
)
342+
mocker.patch(
343+
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
344+
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
345+
)
316346
config = createClusterConfig()
317347
config.name = "unit-test-cluster-ray"
318348
config.mcad = False
@@ -326,6 +356,27 @@ def test_cluster_creation_no_mcad(mocker):
326356
)
327357

328358

359+
def test_cluster_creation_no_mcad_local_queue(mocker):
    """Create a non-MCAD Ray cluster with an explicitly named local queue.

    Only the API-version and cluster-domain lookups are patched; the
    LocalQueue listing call is left unpatched because the queue name is set
    directly on the configuration. The generated YAML is compared against the
    ``test-case-no-mcad.yamls`` fixture.
    """
    mocker.patch("kubernetes.client.ApisApi.get_api_versions")
    mocker.patch(
        "kubernetes.client.CustomObjectsApi.get_cluster_custom_object",
        return_value={"spec": {"domain": "apps.cluster.awsroute.org"}},
    )

    cluster_config = createClusterConfig()
    cluster_config.name = "unit-test-cluster-ray"
    cluster_config.mcad = False
    cluster_config.local_queue = "local-queue-default"
    ray_cluster = Cluster(cluster_config)

    generated_yaml = f"{aw_dir}unit-test-cluster-ray.yaml"
    expected_yaml = f"{parent}/tests/test-case-no-mcad.yamls"

    assert ray_cluster.app_wrapper_yaml == generated_yaml
    assert ray_cluster.app_wrapper_name == "unit-test-cluster-ray"
    assert filecmp.cmp(generated_yaml, expected_yaml, shallow=True)
378+
379+
329380
def test_cluster_creation_priority(mocker):
330381
mocker.patch("kubernetes.client.ApisApi.get_api_versions")
331382
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
@@ -360,6 +411,7 @@ def test_default_cluster_creation(mocker):
360411
name="unit-test-default-cluster",
361412
image="quay.io/project-codeflare/ray:latest-py39-cu118",
362413
ingress_domain="apps.cluster.awsroute.org",
414+
mcad=True,
363415
)
364416
cluster = Cluster(default_config)
365417

@@ -466,6 +518,10 @@ def test_cluster_up_down(mocker):
466518

467519

468520
def test_cluster_up_down_no_mcad(mocker):
521+
mocker.patch(
522+
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
523+
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
524+
)
469525
mocker.patch("kubernetes.client.ApisApi.get_api_versions")
470526
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
471527
mocker.patch(
@@ -802,6 +858,7 @@ def test_ray_details(mocker, capsys):
802858
namespace="ns",
803859
image="quay.io/project-codeflare/ray:latest-py39-cu118",
804860
ingress_domain="apps.cluster.awsroute.org",
861+
mcad=True,
805862
)
806863
)
807864
captured = capsys.readouterr()
@@ -2055,6 +2112,7 @@ def test_cluster_status(mocker):
20552112
namespace="ns",
20562113
image="quay.io/project-codeflare/ray:latest-py39-cu118",
20572114
ingress_domain="apps.cluster.awsroute.org",
2115+
mcad=True,
20582116
)
20592117
)
20602118
mocker.patch("codeflare_sdk.cluster.cluster._app_wrapper_status", return_value=None)
@@ -2149,6 +2207,7 @@ def test_wait_ready(mocker, capsys):
21492207
namespace="ns",
21502208
image="quay.io/project-codeflare/ray:latest-py39-cu118",
21512209
ingress_domain="apps.cluster.awsroute.org",
2210+
mcad=True,
21522211
)
21532212
)
21542213
try:
@@ -2924,6 +2983,7 @@ def test_gen_app_wrapper_with_oauth(mocker: MockerFixture):
29242983
openshift_oauth=True,
29252984
image="quay.io/project-codeflare/ray:latest-py39-cu118",
29262985
ingress_domain="apps.cluster.awsroute.org",
2986+
mcad=True,
29272987
)
29282988
)
29292989
user_yaml = write_user_appwrapper.call_args.args[0]

tests/unit_test_support.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ def createClusterConfig():
4343
min_memory=5,
4444
max_memory=6,
4545
num_gpus=7,
46+
mcad=True,
4647
instascale=True,
4748
machine_types=["cpu.small", "gpu.large"],
4849
image_pull_secrets=["unit-test-pull-secret"],

0 commit comments

Comments
 (0)