Skip to content

Commit 9c751ca

Browse files
committed
Migrate kafka-operator to use lb-operator for exposing brokers
This is far from final (for example, we'd need to expose a configurable LoadBalancerClass, for example), but I wanted to see whether lb-op would be a good fit for this operator first.
1 parent 7e9b331 commit 9c751ca

File tree

4 files changed

+40
-162
lines changed

4 files changed

+40
-162
lines changed

deploy/helm/kafka-operator/templates/roles-kafka.yaml

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,4 @@ apiVersion: rbac.authorization.k8s.io/v1
22
kind: ClusterRole
33
metadata:
44
name: {{ include "operator.fullname" . }}-kafka-broker-clusterrole
5-
rules:
6-
- apiGroups:
7-
- ""
8-
resources:
9-
- services
10-
verbs:
11-
- get
5+
rules: []

rust/operator/src/kafka_controller.rs

Lines changed: 37 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,20 @@ use stackable_operator::{
1919
api::{
2020
apps::v1::{StatefulSet, StatefulSetSpec},
2121
core::v1::{
22-
ConfigMap, ConfigMapKeySelector, ConfigMapVolumeSource, EmptyDirVolumeSource,
23-
EnvVar, EnvVarSource, ExecAction, ObjectFieldSelector, PodSpec, Probe,
22+
ConfigMap, ConfigMapKeySelector, ConfigMapVolumeSource, EnvVar, EnvVarSource,
23+
EphemeralVolumeSource, ExecAction, PersistentVolumeClaimSpec,
24+
PersistentVolumeClaimTemplate, PodSpec, Probe, ResourceRequirements,
2425
SecurityContext, Service, ServiceAccount, ServicePort, ServiceSpec, Volume,
2526
},
2627
rbac::v1::{ClusterRole, RoleBinding, RoleRef, Subject},
2728
},
28-
apimachinery::pkg::apis::meta::v1::LabelSelector,
29+
apimachinery::pkg::{api::resource::Quantity, apis::meta::v1::LabelSelector},
2930
Resource,
3031
},
31-
kube::runtime::{controller::Action, reflector::ObjectRef},
32+
kube::{
33+
core::ObjectMeta,
34+
runtime::{controller::Action, reflector::ObjectRef},
35+
},
3236
labels::{role_group_selector_labels, role_selector_labels},
3337
logging::controller::ReconcilerError,
3438
product_config::{
@@ -41,7 +45,6 @@ use strum::{EnumDiscriminants, IntoStaticStr};
4145

4246
use crate::{
4347
discovery::{self, build_discovery_configmaps},
44-
pod_svc_controller,
4548
utils::{self, ObjectRefExt},
4649
ControllerConfig,
4750
};
@@ -492,33 +495,6 @@ fn build_broker_rolegroup_statefulset(
492495
.context(KafkaVersionParseFailureSnafu)?;
493496
let image = format!("docker.stackable.tech/stackable/kafka:{}", image_version);
494497

495-
let container_get_svc = ContainerBuilder::new("get-svc")
496-
.image("docker.stackable.tech/stackable/tools:0.2.0-stackable0.3.0")
497-
.command(vec!["bash".to_string()])
498-
.args(vec![
499-
"-euo".to_string(),
500-
"pipefail".to_string(),
501-
"-c".to_string(),
502-
[
503-
"kubectl get service \"$POD_NAME\" -o jsonpath='{.spec.ports[0].nodePort}'",
504-
"tee /stackable/tmp/nodeport",
505-
]
506-
.join(" | "),
507-
])
508-
.add_env_vars(vec![EnvVar {
509-
name: "POD_NAME".to_string(),
510-
value_from: Some(EnvVarSource {
511-
field_ref: Some(ObjectFieldSelector {
512-
api_version: Some("v1".to_string()),
513-
field_path: "metadata.name".to_string(),
514-
}),
515-
..EnvVarSource::default()
516-
}),
517-
..EnvVar::default()
518-
}])
519-
.add_volume_mount("tmp", "/stackable/tmp")
520-
.build();
521-
522498
// For most storage classes the mounts will belong to the root user and not be writeable to
523499
// other users.
524500
// Since kafka runs as the user stackable inside of the container the data directory needs to be
@@ -583,18 +559,6 @@ fn build_broker_rolegroup_statefulset(
583559
..EnvVar::default()
584560
});
585561

586-
env.push(EnvVar {
587-
name: "NODE".to_string(),
588-
value_from: Some(EnvVarSource {
589-
field_ref: Some(ObjectFieldSelector {
590-
api_version: Some("v1".to_string()),
591-
field_path: "status.hostIP".to_string(),
592-
}),
593-
..EnvVarSource::default()
594-
}),
595-
..EnvVar::default()
596-
});
597-
598562
// add env var for log4j if set
599563
if kafka.spec.log4j.is_some() {
600564
env.push(EnvVar {
@@ -609,7 +573,7 @@ fn build_broker_rolegroup_statefulset(
609573
let jvm_args = format!("-javaagent:/stackable/jmx/jmx_prometheus_javaagent-0.16.1.jar={}:/stackable/jmx/broker.yaml", METRICS_PORT);
610574
let zookeeper_override = "--override \"zookeeper.connect=$ZOOKEEPER\"";
611575
let advertised_listeners_override =
612-
"--override \"advertised.listeners=PLAINTEXT://$NODE:$(cat /stackable/tmp/nodeport)\"";
576+
"--override \"advertised.listeners=PLAINTEXT://$(cat /stackable/lb/default-address/address):$(cat /stackable/lb/default-address/ports/kafka)\"";
613577
let opa_url_override = opa_connect_string.map_or("".to_string(), |opa| {
614578
format!("--override \"opa.authorizer.url={}\"", opa)
615579
});
@@ -634,7 +598,7 @@ fn build_broker_rolegroup_statefulset(
634598
.add_container_port("metrics", METRICS_PORT.into())
635599
.add_volume_mount(LOG_DIRS_VOLUME_NAME, "/stackable/data")
636600
.add_volume_mount("config", "/stackable/config")
637-
.add_volume_mount("tmp", "/stackable/tmp")
601+
.add_volume_mount("lb", "/stackable/lb")
638602
.resources(resources)
639603
.build();
640604

@@ -670,10 +634,9 @@ fn build_broker_rolegroup_statefulset(
670634
&rolegroup_ref.role,
671635
&rolegroup_ref.role_group,
672636
)
673-
.with_label(pod_svc_controller::LABEL_ENABLE, "true")
674637
})
675638
.add_init_container(container_chown)
676-
.add_init_container(container_get_svc)
639+
// .add_init_container(container_get_svc)
677640
.add_container(container_kafka)
678641
.add_container(container_kcat_prober)
679642
.add_volume(Volume {
@@ -685,8 +648,32 @@ fn build_broker_rolegroup_statefulset(
685648
..Volume::default()
686649
})
687650
.add_volume(Volume {
688-
name: "tmp".to_string(),
689-
empty_dir: Some(EmptyDirVolumeSource::default()),
651+
name: "lb".to_string(),
652+
ephemeral: Some(EphemeralVolumeSource {
653+
volume_claim_template: Some(PersistentVolumeClaimTemplate {
654+
metadata: Some(ObjectMeta {
655+
annotations: Some(
656+
[(
657+
"lb.stackable.tech/lb-class".to_string(),
658+
"nodeport".to_string(),
659+
)]
660+
.into(),
661+
),
662+
..Default::default()
663+
}),
664+
spec: PersistentVolumeClaimSpec {
665+
access_modes: Some(vec!["ReadWriteMany".to_string()]),
666+
resources: Some(ResourceRequirements {
667+
requests: Some(
668+
[("storage".to_string(), Quantity("1".to_string()))].into(),
669+
),
670+
..Default::default()
671+
}),
672+
storage_class_name: Some("lb.stackable.tech".to_string()),
673+
..Default::default()
674+
},
675+
}),
676+
}),
690677
..Volume::default()
691678
})
692679
.build_template();

rust/operator/src/lib.rs

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
mod discovery;
22
mod kafka_controller;
3-
mod pod_svc_controller;
43
mod utils;
54

65
use std::sync::Arc;
@@ -12,7 +11,7 @@ use stackable_operator::{
1211
client::Client,
1312
k8s_openapi::api::{
1413
apps::v1::StatefulSet,
15-
core::v1::{ConfigMap, Pod, Service, ServiceAccount},
14+
core::v1::{ConfigMap, Service, ServiceAccount},
1615
rbac::v1::RoleBinding,
1716
},
1817
kube::{api::ListParams, runtime::Controller},
@@ -65,24 +64,5 @@ pub async fn create_controller(
6564
report_controller_reconciled(&client, "kafkacluster.kafka.stackable.tech", &res);
6665
});
6766

68-
let pod_svc_controller = Controller::new(
69-
namespace.get_api::<Pod>(&client),
70-
ListParams::default().labels(&format!("{}=true", pod_svc_controller::LABEL_ENABLE)),
71-
)
72-
.owns(namespace.get_api::<Pod>(&client), ListParams::default())
73-
.shutdown_on_signal()
74-
.run(
75-
pod_svc_controller::reconcile_pod,
76-
pod_svc_controller::error_policy,
77-
Arc::new(pod_svc_controller::Ctx {
78-
client: client.clone(),
79-
}),
80-
)
81-
.map(|res| {
82-
report_controller_reconciled(&client, "pod-service.kafka.stackable.tech", &res);
83-
});
84-
85-
futures::stream::select(kafka_controller, pod_svc_controller)
86-
.collect::<()>()
87-
.await;
67+
kafka_controller.collect::<()>().await;
8868
}

rust/operator/src/pod_svc_controller.rs

Lines changed: 0 additions & 83 deletions
This file was deleted.

0 commit comments

Comments
 (0)