diff --git a/CHANGELOG.md b/CHANGELOG.md index 0caca4f1..4f339041 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ All notable changes to this project will be documented in this file. ### Added - Support version `3.8.0` ([#753]). +- Add support for Kerberos authentication ([#762]). - The operator can now run on Kubernetes clusters using a non-default cluster domain. Use the env var `KUBERNETES_CLUSTER_DOMAIN` or the operator Helm chart property `kubernetesClusterDomain` to set a non-default cluster domain ([#771]). @@ -35,6 +36,7 @@ All notable changes to this project will be documented in this file. [#741]: https://github.com/stackabletech/kafka-operator/pull/741 [#750]: https://github.com/stackabletech/kafka-operator/pull/750 [#753]: https://github.com/stackabletech/kafka-operator/pull/753 +[#762]: https://github.com/stackabletech/kafka-operator/pull/762 [#771]: https://github.com/stackabletech/kafka-operator/pull/771 [#773]: https://github.com/stackabletech/kafka-operator/pull/773 diff --git a/deploy/helm/kafka-operator/crds/crds.yaml b/deploy/helm/kafka-operator/crds/crds.yaml index 4c6ead0f..98b6769d 100644 --- a/deploy/helm/kafka-operator/crds/crds.yaml +++ b/deploy/helm/kafka-operator/crds/crds.yaml @@ -565,6 +565,10 @@ spec: Only affects client connections. This setting controls: - If clients need to authenticate themselves against the broker via TLS - Which ca.crt to use when validating the provided client certs This will override the server TLS settings (if set) in `spec.clusterConfig.tls.serverSecretClass`. + + ## Kerberos provider + + This affects client connections and also requires TLS for encryption. This setting is used to reference an `AuthenticationClass` and in turn, a `SecretClass` that is used to create keytabs. 
type: string required: - authenticationClass diff --git a/docs/modules/kafka/pages/usage-guide/security.adoc b/docs/modules/kafka/pages/usage-guide/security.adoc index 8744f5d2..e829a91b 100644 --- a/docs/modules/kafka/pages/usage-guide/security.adoc +++ b/docs/modules/kafka/pages/usage-guide/security.adoc @@ -53,6 +53,10 @@ You can create your own secrets and reference them e.g. in the `spec.clusterConf == Authentication The internal or broker-to-broker communication is authenticated via TLS. +For client-to-server communication, authentication can be achieved with either TLS or Kerberos. + +=== TLS + In order to enforce TLS authentication for client-to-server communication, you can set an `AuthenticationClass` reference in the custom resource provided by the xref:commons-operator:index.adoc[Commons Operator]. [source,yaml] @@ -101,6 +105,65 @@ spec: <3> The reference to a `SecretClass`. <4> The `SecretClass` that is referenced by the `AuthenticationClass` in order to provide certificates. +=== Kerberos + +Similarly, you can set an `AuthenticationClass` reference for a Kerberos authentication provider: + +[source,yaml] +---- +apiVersion: authentication.stackable.tech/v1alpha1 +kind: AuthenticationClass +metadata: + name: kafka-client-kerberos # <2> +spec: + provider: + kerberos: + kerberosSecretClass: kafka-client-auth-secret # <3> +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: kafka-client-auth-secret # <4> +spec: + backend: + kerberosKeytab: + ... +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: simple-kafka +spec: + image: + productVersion: 3.7.1 + clusterConfig: + authentication: + - authenticationClass: kafka-client-kerberos # <1> + tls: + serverSecretClass: tls # <5> + zookeeperConfigMapName: simple-kafka-znode + brokers: + roleGroups: + default: + replicas: 3 +---- +<1> The `clusterConfig.authentication.authenticationClass` can be set to use Kerberos for authentication. 
This is optional. +<2> The referenced `AuthenticationClass` that references a `SecretClass` to provide Kerberos keytabs. +<3> The reference to a `SecretClass`. +<4> The `SecretClass` that is referenced by the `AuthenticationClass` in order to provide keytabs. +<5> The SecretClass that will be used for encryption. + +NOTE: When Kerberos is enabled it is also required to enable TLS for maximum security. + +==== Clients + +In order to keep client configuration as uncluttered as possible, each kerberized Kafka broker has two principals: one for the broker itself and one for the bootstrap service. +The client can connect to the bootstrap service, which returns the broker quorum for use in subsequent operations. +This is transparent as each connection dynamically uses the relevant principal (broker or bootstrap). +In order for this to work, it is necessary for kerberized clusters to define an extra Kafka listener for the bootstrap with a corresponding service (and port). +The bootstrap address is written to the discovery ConfigMap, using the Stackable bootstrap listener with the port being 9095 (secure) for kerberized clusters, and 9092 (non-secure) or 9093 (secure) for non-kerberized ones. + +NOTE: Port 9094 is reserved for non-secure kerberized connections which is not currently implemented. 
== [[authorization]]Authorization diff --git a/rust/crd/src/authentication.rs b/rust/crd/src/authentication.rs index 04932eed..47462883 100644 --- a/rust/crd/src/authentication.rs +++ b/rust/crd/src/authentication.rs @@ -9,7 +9,7 @@ use stackable_operator::{ schemars::{self, JsonSchema}, }; -const SUPPORTED_AUTHENTICATION_CLASS_PROVIDERS: [&str; 1] = ["TLS"]; +pub const SUPPORTED_AUTHENTICATION_CLASS_PROVIDERS: [&str; 2] = ["TLS", "Kerberos"]; #[derive(Snafu, Debug)] pub enum Error { @@ -18,9 +18,10 @@ pub enum Error { source: stackable_operator::client::Error, authentication_class: ObjectRef, }, - // TODO: Adapt message if multiple authentication classes are supported - #[snafu(display("only one authentication class is currently supported. Possible Authentication class providers are {SUPPORTED_AUTHENTICATION_CLASS_PROVIDERS:?}"))] + + #[snafu(display("only one authentication class at a time is currently supported. Possible Authentication class providers are {SUPPORTED_AUTHENTICATION_CLASS_PROVIDERS:?}"))] MultipleAuthenticationClassesProvided, + #[snafu(display( "failed to use authentication provider [{provider}] for authentication class [{authentication_class}] - supported providers: {SUPPORTED_AUTHENTICATION_CLASS_PROVIDERS:?}", ))] @@ -42,6 +43,12 @@ pub struct KafkaAuthentication { /// - Which ca.crt to use when validating the provided client certs /// /// This will override the server TLS settings (if set) in `spec.clusterConfig.tls.serverSecretClass`. + /// + /// ## Kerberos provider + /// + /// This affects client connections and also requires TLS for encryption. + /// This setting is used to reference an `AuthenticationClass` and in turn, a `SecretClass` that is + /// used to create keytabs. 
pub authentication_class: String, } @@ -90,6 +97,13 @@ impl ResolvedAuthenticationClasses { .find(|auth| matches!(auth.spec.provider, AuthenticationClassProvider::Tls(_))) } + /// Return the (first) Kerberos `AuthenticationClass` if available + pub fn get_kerberos_authentication_class(&self) -> Option<&AuthenticationClass> { + self.resolved_authentication_classes + .iter() + .find(|auth| matches!(auth.spec.provider, AuthenticationClassProvider::Kerberos(_))) + } + /// Validates the resolved AuthenticationClasses. /// Currently errors out if: /// - More than one AuthenticationClass was provided @@ -101,8 +115,11 @@ impl ResolvedAuthenticationClasses { for auth_class in &self.resolved_authentication_classes { match &auth_class.spec.provider { - AuthenticationClassProvider::Tls(_) => {} - _ => { + // explicitly list each branch so new elements do not get overlooked + AuthenticationClassProvider::Tls(_) | AuthenticationClassProvider::Kerberos(_) => {} + AuthenticationClassProvider::Static(_) + | AuthenticationClassProvider::Ldap(_) + | AuthenticationClassProvider::Oidc(_) => { return Err(Error::AuthenticationProviderNotSupported { authentication_class: ObjectRef::from_obj(auth_class), provider: auth_class.spec.provider.to_string(), diff --git a/rust/crd/src/lib.rs b/rust/crd/src/lib.rs index 2c54953e..45eb0c46 100644 --- a/rust/crd/src/lib.rs +++ b/rust/crd/src/lib.rs @@ -5,11 +5,11 @@ pub mod listener; pub mod security; pub mod tls; -use crate::authentication::KafkaAuthentication; use crate::authorization::KafkaAuthorization; use crate::tls::KafkaTls; use affinity::get_affinity; +use authentication::KafkaAuthentication; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::{ @@ -63,6 +63,9 @@ pub const STACKABLE_DATA_DIR: &str = "/stackable/data"; pub const STACKABLE_CONFIG_DIR: &str = "/stackable/config"; pub const STACKABLE_LOG_CONFIG_DIR: &str = "/stackable/log_config"; pub const STACKABLE_LOG_DIR: &str = 
"/stackable/log"; +// kerberos +pub const STACKABLE_KERBEROS_DIR: &str = "/stackable/kerberos"; +pub const STACKABLE_KERBEROS_KRB5_PATH: &str = "/stackable/kerberos/krb5.conf"; const DEFAULT_BROKER_GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_minutes_unchecked(30); @@ -335,6 +338,13 @@ impl KafkaRole { } roles } + + /// A Kerberos principal has three parts, with the form username/fully.qualified.domain.name@YOUR-REALM.COM. + /// We only have one role and will use "kafka" everywhere (which e.g. differs from the current hdfs implementation, + /// but is similar to HBase). + pub fn kerberos_service_name(&self) -> &'static str { + "kafka" + } } #[derive(Clone, Debug, Default, PartialEq, Fragment, JsonSchema)] diff --git a/rust/crd/src/listener.rs b/rust/crd/src/listener.rs index ae76c734..acbf1216 100644 --- a/rust/crd/src/listener.rs +++ b/rust/crd/src/listener.rs @@ -22,9 +22,14 @@ pub enum KafkaListenerProtocol { /// Unencrypted and unauthenticated HTTP connections #[strum(serialize = "PLAINTEXT")] Plaintext, + /// Encrypted and server-authenticated HTTPS connections #[strum(serialize = "SSL")] Ssl, + + /// Kerberos authentication + #[strum(serialize = "SASL_SSL")] + SaslSsl, } #[derive(strum::Display, Debug, EnumString, Ord, Eq, PartialEq, PartialOrd)] @@ -35,6 +40,8 @@ pub enum KafkaListenerName { ClientAuth, #[strum(serialize = "INTERNAL")] Internal, + #[strum(serialize = "BOOTSTRAP")] + Bootstrap, } #[derive(Debug)] @@ -99,6 +106,7 @@ pub fn get_kafka_listener_config( let mut advertised_listeners = vec![]; let mut listener_security_protocol_map = BTreeMap::new(); + // CLIENT if kafka_security.tls_client_authentication_class().is_some() { // 1) If client authentication required, we expose only CLIENT_AUTH connection with SSL listeners.push(KafkaListener { @@ -116,8 +124,25 @@ pub fn get_kafka_listener_config( }); listener_security_protocol_map .insert(KafkaListenerName::ClientAuth, KafkaListenerProtocol::Ssl); + } else if 
kafka_security.has_kerberos_enabled() { + // 2) Kerberos and TLS authentication classes are mutually exclusive + listeners.push(KafkaListener { + name: KafkaListenerName::Client, + host: LISTENER_LOCAL_ADDRESS.to_string(), + port: KafkaTlsSecurity::SECURE_CLIENT_PORT.to_string(), + }); + advertised_listeners.push(KafkaListener { + name: KafkaListenerName::Client, + host: node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), + port: node_port_cmd( + STACKABLE_LISTENER_BROKER_DIR, + kafka_security.client_port_name(), + ), + }); + listener_security_protocol_map + .insert(KafkaListenerName::Client, KafkaListenerProtocol::SaslSsl); } else if kafka_security.tls_server_secret_class().is_some() { - // 2) If no client authentication but tls is required we expose CLIENT with SSL + // 3) If no client authentication but tls is required we expose CLIENT with SSL listeners.push(KafkaListener { name: KafkaListenerName::Client, host: LISTENER_LOCAL_ADDRESS.to_string(), @@ -134,7 +159,7 @@ pub fn get_kafka_listener_config( listener_security_protocol_map .insert(KafkaListenerName::Client, KafkaListenerProtocol::Ssl); } else { - // 3) If no client auth or tls is required we expose CLIENT with PLAINTEXT + // 4) If no client auth or tls is required we expose CLIENT with PLAINTEXT listeners.push(KafkaListener { name: KafkaListenerName::Client, host: LISTENER_LOCAL_ADDRESS.to_string(), @@ -152,22 +177,24 @@ pub fn get_kafka_listener_config( .insert(KafkaListenerName::Client, KafkaListenerProtocol::Plaintext); } - if kafka_security.tls_internal_secret_class().is_some() { - // 4) If internal tls is required we expose INTERNAL as SSL + // INTERNAL + if kafka_security.has_kerberos_enabled() || kafka_security.tls_internal_secret_class().is_some() + { + // 5) & 6) Kerberos and TLS authentication classes are mutually exclusive but both require internal tls to be used listeners.push(KafkaListener { name: KafkaListenerName::Internal, host: LISTENER_LOCAL_ADDRESS.to_string(), - port: 
kafka_security.internal_port().to_string(), + port: KafkaTlsSecurity::SECURE_INTERNAL_PORT.to_string(), }); advertised_listeners.push(KafkaListener { name: KafkaListenerName::Internal, - host: pod_fqdn, - port: kafka_security.internal_port().to_string(), + host: pod_fqdn.to_string(), + port: KafkaTlsSecurity::SECURE_INTERNAL_PORT.to_string(), }); listener_security_protocol_map .insert(KafkaListenerName::Internal, KafkaListenerProtocol::Ssl); } else { - // 5) If no internal tls is required we expose INTERNAL as PLAINTEXT + // 7) If no internal tls is required we expose INTERNAL as PLAINTEXT listeners.push(KafkaListener { name: KafkaListenerName::Internal, host: LISTENER_LOCAL_ADDRESS.to_string(), @@ -175,7 +202,7 @@ pub fn get_kafka_listener_config( }); advertised_listeners.push(KafkaListener { name: KafkaListenerName::Internal, - host: pod_fqdn, + host: pod_fqdn.to_string(), port: kafka_security.internal_port().to_string(), }); listener_security_protocol_map.insert( @@ -184,6 +211,25 @@ pub fn get_kafka_listener_config( ); } + // BOOTSTRAP + if kafka_security.has_kerberos_enabled() { + listeners.push(KafkaListener { + name: KafkaListenerName::Bootstrap, + host: LISTENER_LOCAL_ADDRESS.to_string(), + port: kafka_security.bootstrap_port().to_string(), + }); + advertised_listeners.push(KafkaListener { + name: KafkaListenerName::Bootstrap, + host: node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), + port: node_port_cmd( + STACKABLE_LISTENER_BROKER_DIR, + kafka_security.client_port_name(), + ), + }); + listener_security_protocol_map + .insert(KafkaListenerName::Bootstrap, KafkaListenerProtocol::SaslSsl); + } + Ok(KafkaListenerConfig { listeners, advertised_listeners, @@ -191,7 +237,7 @@ pub fn get_kafka_listener_config( }) } -fn node_address_cmd(directory: &str) -> String { +pub fn node_address_cmd(directory: &str) -> String { format!("$(cat {directory}/default-address/address)") } @@ -199,7 +245,7 @@ fn node_port_cmd(directory: &str, port_name: &str) -> String { 
format!("$(cat {directory}/default-address/ports/{port_name})") } -fn pod_fqdn( +pub fn pod_fqdn( kafka: &KafkaCluster, object_name: &str, cluster_info: &KubernetesClusterInfo, @@ -221,8 +267,9 @@ mod tests { builder::meta::ObjectMetaBuilder, commons::{ authentication::{ - tls::AuthenticationProvider, AuthenticationClass, AuthenticationClassProvider, - AuthenticationClassSpec, + kerberos, + tls::{self}, + AuthenticationClass, AuthenticationClassProvider, AuthenticationClassSpec, }, networking::DomainName, }, @@ -261,7 +308,7 @@ mod tests { ResolvedAuthenticationClasses::new(vec![AuthenticationClass { metadata: ObjectMetaBuilder::new().name("auth-class").build(), spec: AuthenticationClassSpec { - provider: AuthenticationClassProvider::Tls(AuthenticationProvider { + provider: AuthenticationClassProvider::Tls(tls::AuthenticationProvider { client_cert_secret_class: Some("client-auth-secret-class".to_string()), }), }, @@ -269,6 +316,7 @@ mod tests { "internalTls".to_string(), Some("tls".to_string()), ); + let config = get_kafka_listener_config(&kafka, &kafka_security, object_name, &cluster_info).unwrap(); @@ -308,25 +356,10 @@ mod tests { name = KafkaListenerName::ClientAuth, protocol = KafkaListenerProtocol::Ssl, internal_name = KafkaListenerName::Internal, - internal_protocol = KafkaListenerProtocol::Ssl + internal_protocol = KafkaListenerProtocol::Ssl, ) ); - let input = r#" - apiVersion: kafka.stackable.tech/v1alpha1 - kind: KafkaCluster - metadata: - name: simple-kafka - namespace: default - spec: - image: - productVersion: 3.7.1 - clusterConfig: - tls: - serverSecretClass: tls - zookeeperConfigMapName: xyz - "#; - let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input"); let kafka_security = KafkaTlsSecurity::new( ResolvedAuthenticationClasses::new(vec![]), "tls".to_string(), @@ -371,11 +404,66 @@ mod tests { name = KafkaListenerName::Client, protocol = KafkaListenerProtocol::Ssl, internal_name = KafkaListenerName::Internal, - 
internal_protocol = KafkaListenerProtocol::Ssl + internal_protocol = KafkaListenerProtocol::Ssl, + ) + ); + + let kafka_security = KafkaTlsSecurity::new( + ResolvedAuthenticationClasses::new(vec![]), + "".to_string(), + None, + ); + + let config = + get_kafka_listener_config(&kafka, &kafka_security, object_name, &cluster_info).unwrap(); + + assert_eq!( + config.listeners(), + format!( + "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}", + name = KafkaListenerName::Client, + host = LISTENER_LOCAL_ADDRESS, + port = kafka_security.client_port(), + internal_name = KafkaListenerName::Internal, + internal_host = LISTENER_LOCAL_ADDRESS, + internal_port = kafka_security.internal_port(), + ) + ); + + assert_eq!( + config.advertised_listeners(), + format!( + "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}", + name = KafkaListenerName::Client, + host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), + port = node_port_cmd( + STACKABLE_LISTENER_BROKER_DIR, + kafka_security.client_port_name() + ), + internal_name = KafkaListenerName::Internal, + internal_host = pod_fqdn(&kafka, object_name, &cluster_info).unwrap(), + internal_port = kafka_security.internal_port(), + ) + ); + + assert_eq!( + config.listener_security_protocol_map(), + format!( + "{name}:{protocol},{internal_name}:{internal_protocol}", + name = KafkaListenerName::Client, + protocol = KafkaListenerProtocol::Plaintext, + internal_name = KafkaListenerName::Internal, + internal_protocol = KafkaListenerProtocol::Plaintext, ) ); + } + + #[test] + fn test_get_kafka_kerberos_listeners_config() { + let object_name = "simple-kafka-broker-default"; + let cluster_info = default_cluster_info(); - let input = r#" + let kafka_cluster = r#" apiVersion: kafka.stackable.tech/v1alpha1 kind: KafkaCluster metadata: @@ -384,39 +472,52 @@ mod tests { spec: image: productVersion: 3.7.1 - zookeeperConfigMapName: xyz clusterConfig: + authentication: + - authenticationClass: kafka-kerberos 
tls: - internalSecretClass: null - serverSecretClass: null + serverSecretClass: tls zookeeperConfigMapName: xyz "#; - let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input"); + let kafka: KafkaCluster = serde_yaml::from_str(kafka_cluster).expect("illegal test input"); let kafka_security = KafkaTlsSecurity::new( - ResolvedAuthenticationClasses::new(vec![]), - "".to_string(), - None, + ResolvedAuthenticationClasses::new(vec![AuthenticationClass { + metadata: ObjectMetaBuilder::new().name("auth-class").build(), + spec: AuthenticationClassSpec { + provider: AuthenticationClassProvider::Kerberos( + kerberos::AuthenticationProvider { + kerberos_secret_class: "kerberos-secret-class".to_string(), + }, + ), + }, + }]), + "tls".to_string(), + Some("tls".to_string()), ); + let config = get_kafka_listener_config(&kafka, &kafka_security, object_name, &cluster_info).unwrap(); assert_eq!( config.listeners(), format!( - "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}", + "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}", name = KafkaListenerName::Client, host = LISTENER_LOCAL_ADDRESS, port = kafka_security.client_port(), internal_name = KafkaListenerName::Internal, internal_host = LISTENER_LOCAL_ADDRESS, internal_port = kafka_security.internal_port(), + bootstrap_name = KafkaListenerName::Bootstrap, + bootstrap_host = LISTENER_LOCAL_ADDRESS, + bootstrap_port = kafka_security.bootstrap_port(), ) ); assert_eq!( config.advertised_listeners(), format!( - "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}", + "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}", name = KafkaListenerName::Client, host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), port = node_port_cmd( @@ -426,17 +527,26 @@ mod tests { internal_name = KafkaListenerName::Internal, 
internal_host = pod_fqdn(&kafka, object_name, &cluster_info).unwrap(), internal_port = kafka_security.internal_port(), + bootstrap_name = KafkaListenerName::Bootstrap, + bootstrap_host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), + bootstrap_port = node_port_cmd( + STACKABLE_LISTENER_BROKER_DIR, + kafka_security.client_port_name() + ), ) ); assert_eq!( config.listener_security_protocol_map(), format!( - "{name}:{protocol},{internal_name}:{internal_protocol}", + "{name}:{protocol},{internal_name}:{internal_protocol},{bootstrap_name}:{bootstrap_protocol}", name = KafkaListenerName::Client, - protocol = KafkaListenerProtocol::Plaintext, + protocol = KafkaListenerProtocol::SaslSsl, internal_name = KafkaListenerName::Internal, - internal_protocol = KafkaListenerProtocol::Plaintext + internal_protocol = KafkaListenerProtocol::Ssl, + bootstrap_name = KafkaListenerName::Bootstrap, + bootstrap_protocol = KafkaListenerProtocol::SaslSsl, + ) ); } diff --git a/rust/crd/src/security.rs b/rust/crd/src/security.rs index 48a8d2de..5b4fe9b5 100644 --- a/rust/crd/src/security.rs +++ b/rust/crd/src/security.rs @@ -7,7 +7,7 @@ use std::collections::BTreeMap; use indoc::formatdoc; -use snafu::{ResultExt, Snafu}; +use snafu::{ensure, ResultExt, Snafu}; use stackable_operator::{ builder::{ self, @@ -32,7 +32,11 @@ use crate::{ tls, KafkaCluster, LISTENER_BOOTSTRAP_VOLUME_NAME, SERVER_PROPERTIES_FILE, STACKABLE_CONFIG_DIR, }; -use crate::{LISTENER_BROKER_VOLUME_NAME, STACKABLE_LOG_DIR}; +use crate::{ + listener::node_address_cmd, STACKABLE_KERBEROS_KRB5_PATH, STACKABLE_LISTENER_BOOTSTRAP_DIR, + STACKABLE_LISTENER_BROKER_DIR, +}; +use crate::{KafkaRole, LISTENER_BROKER_VOLUME_NAME, STACKABLE_LOG_DIR}; #[derive(Snafu, Debug)] pub enum Error { @@ -51,6 +55,9 @@ pub enum Error { AddVolumeMount { source: builder::pod::container::Error, }, + + #[snafu(display("kerberos enablement requires TLS activation"))] + KerberosRequiresTls, } /// Helper struct combining TLS settings for server and 
internal with the resolved AuthenticationClasses @@ -66,6 +73,14 @@ impl KafkaTlsSecurity { pub const CLIENT_PORT: u16 = 9092; pub const SECURE_CLIENT_PORT_NAME: &'static str = "kafka-tls"; pub const SECURE_CLIENT_PORT: u16 = 9093; + // bootstrap: we will have a single named port with different values for + // secure (9095) and insecure (9094). The bootstrap listener is needed to + // be able to expose principals for both the broker and bootstrap in the + // JAAS configuration, so that clients can use both. + pub const BOOTSTRAP_PORT_NAME: &'static str = "bootstrap"; + pub const BOOTSTRAP_PORT: u16 = 9094; + pub const SECURE_BOOTSTRAP_PORT: u16 = 9095; + // internal pub const INTERNAL_PORT: u16 = 19092; pub const SECURE_INTERNAL_PORT: u16 = 19093; // - TLS global @@ -79,6 +94,18 @@ impl KafkaTlsSecurity { const CLIENT_SSL_TRUSTSTORE_PASSWORD: &'static str = "listener.name.client.ssl.truststore.password"; const CLIENT_SSL_TRUSTSTORE_TYPE: &'static str = "listener.name.client.ssl.truststore.type"; + // - Bootstrapper + const BOOTSTRAP_SSL_KEYSTORE_LOCATION: &'static str = + "listener.name.bootstrap.ssl.keystore.location"; + const BOOTSTRAP_SSL_KEYSTORE_PASSWORD: &'static str = + "listener.name.bootstrap.ssl.keystore.password"; + const BOOTSTRAP_SSL_KEYSTORE_TYPE: &'static str = "listener.name.bootstrap.ssl.keystore.type"; + const BOOTSTRAP_SSL_TRUSTSTORE_LOCATION: &'static str = + "listener.name.bootstrap.ssl.truststore.location"; + const BOOTSTRAP_SSL_TRUSTSTORE_PASSWORD: &'static str = + "listener.name.bootstrap.ssl.truststore.password"; + const BOOTSTRAP_SSL_TRUSTSTORE_TYPE: &'static str = + "listener.name.bootstrap.ssl.truststore.type"; // - TLS client authentication const CLIENT_AUTH_SSL_KEYSTORE_LOCATION: &'static str = "listener.name.client_auth.ssl.keystore.location"; @@ -187,6 +214,39 @@ impl KafkaTlsSecurity { } } + pub fn has_kerberos_enabled(&self) -> bool { + self.kerberos_secret_class().is_some() + } + + pub fn kerberos_secret_class(&self) -> Option { + 
if let Some(kerberos) = self + .resolved_authentication_classes + .get_kerberos_authentication_class() + { + match &kerberos.spec.provider { + AuthenticationClassProvider::Kerberos(kerberos) => { + Some(kerberos.kerberos_secret_class.clone()) + } + _ => None, + } + } else { + None + } + } + + pub fn validate_authentication_methods(&self) -> Result<(), Error> { + // Client TLS authentication and Kerberos authentication are mutually + // exclusive, but this has already been checked when checking the + // authentication classes. When users enable Kerberos we require them + // to also enable TLS for a) maximum security and b) to limit the + // number of combinations we need to support. + if self.has_kerberos_enabled() { + ensure!(self.server_secret_class.is_some(), KerberosRequiresTlsSnafu); + } + + Ok(()) + } + /// Return the Kafka (secure) client port depending on tls or authentication settings. pub fn client_port(&self) -> u16 { if self.tls_enabled() { @@ -196,6 +256,18 @@ impl KafkaTlsSecurity { } } + pub fn bootstrap_port(&self) -> u16 { + if self.tls_enabled() { + Self::SECURE_BOOTSTRAP_PORT + } else { + Self::BOOTSTRAP_PORT + } + } + + pub fn bootstrap_port_name(&self) -> &str { + Self::BOOTSTRAP_PORT_NAME + } + /// Return the Kafka (secure) client port name depending on tls or authentication settings. pub fn client_port_name(&self) -> &str { if self.tls_enabled() { @@ -215,19 +287,46 @@ impl KafkaTlsSecurity { } /// Returns the commands for the kcat readiness probe. 
- pub fn kcat_prober_container_commands(&self) -> Vec { - let mut args = vec!["/stackable/kcat".to_string()]; + pub fn kcat_prober_container_commands(&self, pod_fqdn: &String) -> Vec { + let mut args = vec![]; let port = self.client_port(); if self.tls_client_authentication_class().is_some() { + args.push("/stackable/kcat".to_string()); args.push("-b".to_string()); args.push(format!("localhost:{}", port)); args.extend(Self::kcat_client_auth_ssl(Self::STACKABLE_TLS_KCAT_DIR)); + } else if self.has_kerberos_enabled() { + let service_name = KafkaRole::Broker.kerberos_service_name(); + // here we need to specify a shell so that variable substitution will work + // see e.g. https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1ExecAction.md + args.push("/bin/bash".to_string()); + args.push("-x".to_string()); + args.push("-euo".to_string()); + args.push("pipefail".to_string()); + args.push("-c".to_string()); + args.push( + format!( + "export KERBEROS_REALM=$(grep -oP 'default_realm = \\K.*' {});", + STACKABLE_KERBEROS_KRB5_PATH + ) + .to_string(), + ); + args.push("/stackable/kcat".to_string()); + args.push("-b".to_string()); + args.push(format!("{pod_fqdn}:{port}")); + args.extend(Self::kcat_client_sasl_ssl( + Self::STACKABLE_TLS_KCAT_DIR, + service_name, + pod_fqdn, + )); } else if self.tls_server_secret_class().is_some() { + args.push("/stackable/kcat".to_string()); args.push("-b".to_string()); args.push(format!("localhost:{}", port)); args.extend(Self::kcat_client_ssl(Self::STACKABLE_TLS_KCAT_DIR)); } else { + args.push("/stackable/kcat".to_string()); args.push("-b".to_string()); args.push(format!("localhost:{}", port)); } @@ -241,12 +340,14 @@ impl KafkaTlsSecurity { &self, kafka_listeners: &KafkaListenerConfig, opa_connect_string: Option<&str>, + kerberos_enabled: bool, ) -> Vec { vec![formatdoc! 
{" {COMMON_BASH_TRAP_FUNCTIONS} {remove_vector_shutdown_file_command} prepare_signal_handlers - bin/kafka-server-start.sh {STACKABLE_CONFIG_DIR}/{SERVER_PROPERTIES_FILE} --override \"zookeeper.connect=$ZOOKEEPER\" --override \"listeners={listeners}\" --override \"advertised.listeners={advertised_listeners}\" --override \"listener.security.protocol.map={listener_security_protocol_map}\"{opa_config} & + {set_realm_env} + bin/kafka-server-start.sh {STACKABLE_CONFIG_DIR}/{SERVER_PROPERTIES_FILE} --override \"zookeeper.connect=$ZOOKEEPER\" --override \"listeners={listeners}\" --override \"advertised.listeners={advertised_listeners}\" --override \"listener.security.protocol.map={listener_security_protocol_map}\"{opa_config}{jaas_config} & wait_for_termination $! {create_vector_shutdown_file_command} ", @@ -254,13 +355,26 @@ impl KafkaTlsSecurity { remove_vector_shutdown_file_command(STACKABLE_LOG_DIR), create_vector_shutdown_file_command = create_vector_shutdown_file_command(STACKABLE_LOG_DIR), + set_realm_env = match kerberos_enabled { + true => format!("export KERBEROS_REALM=$(grep -oP 'default_realm = \\K.*' {})", STACKABLE_KERBEROS_KRB5_PATH), + false => "".to_string(), + }, listeners = kafka_listeners.listeners(), advertised_listeners = kafka_listeners.advertised_listeners(), listener_security_protocol_map = kafka_listeners.listener_security_protocol_map(), opa_config = match opa_connect_string { None => "".to_string(), Some(opa_connect_string) => format!(" --override \"opa.authorizer.url={opa_connect_string}\""), - } + }, + jaas_config = match kerberos_enabled { + true => { + let service_name = KafkaRole::Broker.kerberos_service_name(); + let broker_address = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR); + let bootstrap_address = node_address_cmd(STACKABLE_LISTENER_BOOTSTRAP_DIR); + // TODO replace client and bootstrap below with constants + format!(" --override \"listener.name.client.gssapi.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required 
useKeyTab=true storeKey=true isInitiator=false keyTab=\\\"/stackable/kerberos/keytab\\\" principal=\\\"{service_name}/{broker_address}@$KERBEROS_REALM\\\";\" --override \"listener.name.bootstrap.gssapi.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true isInitiator=false keyTab=\\\"/stackable/kerberos/keytab\\\" principal=\\\"{service_name}/{bootstrap_address}@$KERBEROS_REALM\\\";\"").to_string()}, + false => "".to_string(), + }, }] } @@ -385,6 +499,34 @@ impl KafkaTlsSecurity { ); } + if self.has_kerberos_enabled() { + // Bootstrap + config.insert( + Self::BOOTSTRAP_SSL_KEYSTORE_LOCATION.to_string(), + format!("{}/keystore.p12", Self::STACKABLE_TLS_KAFKA_SERVER_DIR), + ); + config.insert( + Self::BOOTSTRAP_SSL_KEYSTORE_PASSWORD.to_string(), + Self::SSL_STORE_PASSWORD.to_string(), + ); + config.insert( + Self::BOOTSTRAP_SSL_KEYSTORE_TYPE.to_string(), + "PKCS12".to_string(), + ); + config.insert( + Self::BOOTSTRAP_SSL_TRUSTSTORE_LOCATION.to_string(), + format!("{}/truststore.p12", Self::STACKABLE_TLS_KAFKA_SERVER_DIR), + ); + config.insert( + Self::BOOTSTRAP_SSL_TRUSTSTORE_PASSWORD.to_string(), + Self::SSL_STORE_PASSWORD.to_string(), + ); + config.insert( + Self::BOOTSTRAP_SSL_TRUSTSTORE_TYPE.to_string(), + "PKCS12".to_string(), + ); + } + // Internal TLS if self.tls_internal_secret_class().is_some() { config.insert( @@ -417,6 +559,20 @@ impl KafkaTlsSecurity { ); } + // Kerberos + if self.has_kerberos_enabled() { + config.insert("sasl.enabled.mechanisms".to_string(), "GSSAPI".to_string()); + config.insert( + "sasl.kerberos.service.name".to_string(), + KafkaRole::Broker.kerberos_service_name().to_string(), + ); + config.insert( + "sasl.mechanism.inter.broker.protocol".to_string(), + "GSSAPI".to_string(), + ); + tracing::debug!("Kerberos configs added: [{:#?}]", config); + } + // common config.insert( Self::INTER_BROKER_LISTENER_NAME.to_string(), @@ -489,4 +645,25 @@ impl KafkaTlsSecurity { 
format!("ssl.ca.location={cert_directory}/ca.crt"), ] } + + fn kcat_client_sasl_ssl( + cert_directory: &str, + service_name: &str, + pod_fqdn: &String, + ) -> Vec { + vec![ + "-X".to_string(), + "security.protocol=SASL_SSL".to_string(), + "-X".to_string(), + format!("ssl.ca.location={cert_directory}/ca.crt"), + "-X".to_string(), + "sasl.kerberos.keytab=/stackable/kerberos/keytab".to_string(), + "-X".to_string(), + "sasl.mechanism=GSSAPI".to_string(), + "-X".to_string(), + format!("sasl.kerberos.service.name={service_name}"), + "-X".to_string(), + format!("sasl.kerberos.principal={service_name}/{pod_fqdn}@$KERBEROS_REALM"), + ] + } } diff --git a/rust/operator-binary/src/discovery.rs b/rust/operator-binary/src/discovery.rs index 218de7ba..70e6da0d 100644 --- a/rust/operator-binary/src/discovery.rs +++ b/rust/operator-binary/src/discovery.rs @@ -60,7 +60,11 @@ pub async fn build_discovery_configmaps( listeners: &[Listener], ) -> Result, Error> { let name = owner.name_unchecked(); - let port_name = kafka_security.client_port_name(); + let port_name = if kafka_security.has_kerberos_enabled() { + kafka_security.bootstrap_port_name() + } else { + kafka_security.client_port_name() + }; Ok(vec![ build_discovery_configmap( kafka, diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/kafka_controller.rs index 9a202d88..9d1a933f 100644 --- a/rust/operator-binary/src/kafka_controller.rs +++ b/rust/operator-binary/src/kafka_controller.rs @@ -12,13 +12,14 @@ use product_config::{ }; use snafu::{OptionExt, ResultExt, Snafu}; use stackable_kafka_crd::{ - listener::get_kafka_listener_config, security::KafkaTlsSecurity, Container, KafkaCluster, - KafkaClusterStatus, KafkaConfig, KafkaRole, APP_NAME, DOCKER_IMAGE_BASE_NAME, - JVM_SECURITY_PROPERTIES_FILE, KAFKA_HEAP_OPTS, LISTENER_BOOTSTRAP_VOLUME_NAME, - LISTENER_BROKER_VOLUME_NAME, LOG_DIRS_VOLUME_NAME, METRICS_PORT, METRICS_PORT_NAME, - OPERATOR_NAME, SERVER_PROPERTIES_FILE, STACKABLE_CONFIG_DIR, 
STACKABLE_DATA_DIR, - STACKABLE_LISTENER_BOOTSTRAP_DIR, STACKABLE_LISTENER_BROKER_DIR, STACKABLE_LOG_CONFIG_DIR, - STACKABLE_LOG_DIR, + listener::{get_kafka_listener_config, pod_fqdn, KafkaListenerError}, + security::KafkaTlsSecurity, + Container, KafkaCluster, KafkaClusterStatus, KafkaConfig, KafkaRole, APP_NAME, + DOCKER_IMAGE_BASE_NAME, JVM_SECURITY_PROPERTIES_FILE, KAFKA_HEAP_OPTS, + LISTENER_BOOTSTRAP_VOLUME_NAME, LISTENER_BROKER_VOLUME_NAME, LOG_DIRS_VOLUME_NAME, + METRICS_PORT, METRICS_PORT_NAME, OPERATOR_NAME, SERVER_PROPERTIES_FILE, STACKABLE_CONFIG_DIR, + STACKABLE_DATA_DIR, STACKABLE_LISTENER_BOOTSTRAP_DIR, STACKABLE_LISTENER_BROKER_DIR, + STACKABLE_LOG_CONFIG_DIR, STACKABLE_LOG_DIR, }; use stackable_operator::{ builder::{ @@ -83,6 +84,7 @@ use strum::{EnumDiscriminants, IntoStaticStr}; use crate::{ discovery::{self, build_discovery_configmaps}, + kerberos::{self, add_kerberos_pod_config}, operations::{ graceful_shutdown::{add_graceful_shutdown_config, graceful_shutdown_config_properties}, pdb::add_pdbs, @@ -325,8 +327,16 @@ pub enum Error { source: stackable_kafka_crd::security::Error, }, - #[snafu(display("failed to configure logging"))] - ConfigureLogging { source: LoggingError }, + #[snafu(display("failed to resolve the fully-qualified pod name"))] + ResolveNamespace { source: KafkaListenerError }, + + #[snafu(display("failed to add kerberos config"))] + AddKerberosConfig { source: kerberos::Error }, + + #[snafu(display("failed to validate authentication method"))] + FailedToValidateAuthenticationMethod { + source: stackable_kafka_crd::security::Error, + }, #[snafu(display("failed to add needed volume"))] AddVolume { source: builder::pod::Error }, @@ -336,6 +346,9 @@ pub enum Error { source: builder::pod::container::Error, }, + #[snafu(display("failed to configure logging"))] + ConfigureLogging { source: LoggingError }, + #[snafu(display("KafkaCluster object is invalid"))] InvalidKafkaCluster { source: error_boundary::InvalidObject, @@ -401,6 +414,9 
@@ impl ReconcilerError for Error { Error::ConfigureLogging { .. } => None, Error::AddVolume { .. } => None, Error::AddVolumeMount { .. } => None, + Error::ResolveNamespace { .. } => None, + Error::AddKerberosConfig { .. } => None, + Error::FailedToValidateAuthenticationMethod { .. } => None, Error::InvalidKafkaCluster { .. } => None, } } @@ -467,6 +483,18 @@ pub async fn reconcile_kafka( .await .context(FailedToInitializeSecurityContextSnafu)?; + tracing::debug!( + kerberos_enabled = kafka_security.has_kerberos_enabled(), + kerberos_secret_class = ?kafka_security.kerberos_secret_class(), + tls_enabled = kafka_security.tls_enabled(), + tls_client_authentication_class = ?kafka_security.tls_client_authentication_class(), + "The following security settings are used" + ); + + kafka_security + .validate_authentication_methods() + .context(FailedToValidateAuthenticationMethodSnafu)?; + // Assemble the OPA connection string from the discovery and the given path if provided // Will be passed as --override parameter in the cli in the state ful set let opa_connect = if let Some(opa_spec) = &kafka.spec.cluster_config.authorization.opa { @@ -720,6 +748,9 @@ fn build_broker_rolegroup_config_map( })?, ); + tracing::debug!(?server_cfg, "Applied server config"); + tracing::debug!(?jvm_sec_props, "Applied JVM config"); + extend_role_group_config_map( rolegroup, vector_aggregator_address, @@ -786,7 +817,7 @@ fn build_broker_rolegroup_service( #[allow(clippy::too_many_arguments)] fn build_broker_rolegroup_statefulset( kafka: &KafkaCluster, - role: &KafkaRole, + kafka_role: &KafkaRole, resolved_product_image: &ResolvedProductImage, rolegroup_ref: &RoleGroupRef, broker_config: &HashMap>, @@ -796,7 +827,7 @@ fn build_broker_rolegroup_statefulset( sa_name: &str, cluster_info: &KubernetesClusterInfo, ) -> Result { - let role = kafka.role(role).context(InternalOperatorSnafu)?; + let role = kafka.role(kafka_role).context(InternalOperatorSnafu)?; let rolegroup = kafka 
.rolegroup(rolegroup_ref) .context(InternalOperatorSnafu)?; @@ -852,6 +883,17 @@ fn build_broker_rolegroup_statefulset( .unwrap(), ); + if kafka_security.has_kerberos_enabled() { + add_kerberos_pod_config( + kafka_security, + kafka_role, + &mut cb_kcat_prober, + &mut cb_kafka, + &mut pod_builder, + ) + .context(AddKerberosConfigSnafu)?; + } + let mut env = broker_config .get(&PropertyNameKind::Env) .into_iter() @@ -909,6 +951,7 @@ fn build_broker_rolegroup_statefulset( let jvm_args = format!( "-Djava.security.properties={STACKABLE_CONFIG_DIR}/{JVM_SECURITY_PROPERTIES_FILE} -javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={METRICS_PORT}:/stackable/jmx/broker.yaml", ); + let kafka_listeners = get_kafka_listener_config( kafka, kafka_security, @@ -927,7 +970,11 @@ fn build_broker_rolegroup_statefulset( "-c".to_string(), ]) .args(vec![kafka_security - .kafka_container_commands(&kafka_listeners, opa_connect_string) + .kafka_container_commands( + &kafka_listeners, + opa_connect_string, + kafka_security.has_kerberos_enabled(), + ) .join("\n")]) .add_env_var("EXTRA_ARGS", jvm_args) .add_env_var( @@ -953,11 +1000,24 @@ fn build_broker_rolegroup_statefulset( .context(AddVolumeMountSnafu)? 
.resources(merged_config.resources.clone().into()); + let pod_fqdn = pod_fqdn(kafka, &rolegroup_ref.object_name(), cluster_info) + .context(ResolveNamespaceSnafu)?; // Use kcat sidecar for probing container status rather than the official Kafka tools, since they incur a lot of // unacceptable perf overhead cb_kcat_prober .image_from_product_image(resolved_product_image) .command(vec!["sleep".to_string(), "infinity".to_string()]) + .add_env_vars(vec![EnvVar { + name: "POD_NAME".to_string(), + value_from: Some(EnvVarSource { + field_ref: Some(ObjectFieldSelector { + api_version: Some("v1".to_string()), + field_path: "metadata.name".to_string(), + }), + ..EnvVarSource::default() + }), + ..EnvVar::default() + }]) .resources( ResourceRequirementsBuilder::new() .with_cpu_request("100m") @@ -971,7 +1031,7 @@ fn build_broker_rolegroup_statefulset( .readiness_probe(Probe { exec: Some(ExecAction { // If the broker is able to get its fellow cluster members then it has at least completed basic registration at some point - command: Some(kafka_security.kcat_prober_container_commands()), + command: Some(kafka_security.kcat_prober_container_commands(&pod_fqdn)), }), timeout_seconds: Some(5), period_seconds: Some(2), @@ -1128,7 +1188,7 @@ pub fn error_policy( /// We only expose client HTTP / HTTPS and Metrics ports. fn listener_ports(kafka_security: &KafkaTlsSecurity) -> Vec { - vec![ + let mut ports = vec![ ListenerPort { name: METRICS_PORT_NAME.to_string(), port: METRICS_PORT.into(), @@ -1139,12 +1199,20 @@ fn listener_ports(kafka_security: &KafkaTlsSecurity) -> Vec { port: kafka_security.client_port().into(), protocol: Some("TCP".to_string()), }, - ] + ]; + if kafka_security.has_kerberos_enabled() { + ports.push(ListenerPort { + name: kafka_security.bootstrap_port_name().to_string(), + port: kafka_security.bootstrap_port().into(), + protocol: Some("TCP".to_string()), + }); + } + ports } /// We only expose client HTTP / HTTPS and Metrics ports. 
fn container_ports(kafka_security: &KafkaTlsSecurity) -> Vec { - vec![ + let mut ports = vec![ ContainerPort { name: Some(METRICS_PORT_NAME.to_string()), container_port: METRICS_PORT.into(), @@ -1157,5 +1225,14 @@ fn container_ports(kafka_security: &KafkaTlsSecurity) -> Vec { protocol: Some("TCP".to_string()), ..ContainerPort::default() }, - ] + ]; + if kafka_security.has_kerberos_enabled() { + ports.push(ContainerPort { + name: Some(kafka_security.bootstrap_port_name().to_string()), + container_port: kafka_security.bootstrap_port().into(), + protocol: Some("TCP".to_string()), + ..ContainerPort::default() + }); + } + ports } diff --git a/rust/operator-binary/src/kerberos.rs b/rust/operator-binary/src/kerberos.rs new file mode 100644 index 00000000..7558bd91 --- /dev/null +++ b/rust/operator-binary/src/kerberos.rs @@ -0,0 +1,70 @@ +use snafu::{ResultExt, Snafu}; +use stackable_kafka_crd::{security::KafkaTlsSecurity, KafkaRole}; +use stackable_kafka_crd::{ + LISTENER_BOOTSTRAP_VOLUME_NAME, LISTENER_BROKER_VOLUME_NAME, STACKABLE_KERBEROS_DIR, + STACKABLE_KERBEROS_KRB5_PATH, +}; +use stackable_operator::builder::{ + self, + pod::{ + container::ContainerBuilder, + volume::{ + SecretOperatorVolumeSourceBuilder, SecretOperatorVolumeSourceBuilderError, + VolumeBuilder, + }, + PodBuilder, + }, +}; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("failed to add Kerberos secret volume"))] + KerberosSecretVolume { + source: SecretOperatorVolumeSourceBuilderError, + }, + + #[snafu(display("failed to add needed volume"))] + AddVolume { source: builder::pod::Error }, + + #[snafu(display("failed to add needed volumeMount"))] + AddVolumeMount { + source: builder::pod::container::Error, + }, +} + +pub fn add_kerberos_pod_config( + kafka_security: &KafkaTlsSecurity, + role: &KafkaRole, + cb_kcat_prober: &mut ContainerBuilder, + cb_kafka: &mut ContainerBuilder, + pb: &mut PodBuilder, +) -> Result<(), Error> { + if let Some(kerberos_secret_class) = 
kafka_security.kerberos_secret_class() { + // Mount keytab + let kerberos_secret_operator_volume = + SecretOperatorVolumeSourceBuilder::new(kerberos_secret_class) + .with_listener_volume_scope(LISTENER_BROKER_VOLUME_NAME) + .with_listener_volume_scope(LISTENER_BOOTSTRAP_VOLUME_NAME) + .with_kerberos_service_name(role.kerberos_service_name()) + .build() + .context(KerberosSecretVolumeSnafu)?; + pb.add_volume( + VolumeBuilder::new("kerberos") + .ephemeral(kerberos_secret_operator_volume) + .build(), + ) + .context(AddVolumeSnafu)?; + + for cb in [cb_kafka, cb_kcat_prober] { + cb.add_volume_mount("kerberos", STACKABLE_KERBEROS_DIR) + .context(AddVolumeMountSnafu)?; + cb.add_env_var("KRB5_CONFIG", STACKABLE_KERBEROS_KRB5_PATH); + cb.add_env_var( + "KAFKA_OPTS", + format!("-Djava.security.krb5.conf={STACKABLE_KERBEROS_KRB5_PATH}",), + ); + } + } + + Ok(()) +} diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index fa2d427b..17ef8a14 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -24,6 +24,7 @@ use crate::kafka_controller::KAFKA_CONTROLLER_NAME; mod discovery; mod kafka_controller; +mod kerberos; mod operations; mod product_logging; mod utils; diff --git a/tests/templates/kuttl/kerberos/00-assert.yaml.j2 b/tests/templates/kuttl/kerberos/00-assert.yaml.j2 new file mode 100644 index 00000000..50b1d4c3 --- /dev/null +++ b/tests/templates/kuttl/kerberos/00-assert.yaml.j2 @@ -0,0 +1,10 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +{% endif %} diff --git a/tests/templates/kuttl/kerberos/00-install-vector-aggregator-discovery-configmap.yaml.j2 b/tests/templates/kuttl/kerberos/00-install-vector-aggregator-discovery-configmap.yaml.j2 new file mode 100644 index 00000000..2d6a0df5 --- /dev/null +++ 
b/tests/templates/kuttl/kerberos/00-install-vector-aggregator-discovery-configmap.yaml.j2 @@ -0,0 +1,9 @@ +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +data: + ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }} +{% endif %} diff --git a/tests/templates/kuttl/kerberos/00-patch-ns.yaml.j2 b/tests/templates/kuttl/kerberos/00-patch-ns.yaml.j2 new file mode 100644 index 00000000..67185acf --- /dev/null +++ b/tests/templates/kuttl/kerberos/00-patch-ns.yaml.j2 @@ -0,0 +1,9 @@ +{% if test_scenario['values']['openshift'] == 'true' %} +# see https://github.com/stackabletech/issues/issues/566 +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: kubectl patch namespace $NAMESPACE -p '{"metadata":{"labels":{"pod-security.kubernetes.io/enforce":"privileged"}}}' + timeout: 120 +{% endif %} diff --git a/tests/templates/kuttl/kerberos/00-rbac.yaml.j2 b/tests/templates/kuttl/kerberos/00-rbac.yaml.j2 new file mode 100644 index 00000000..7ee61d23 --- /dev/null +++ b/tests/templates/kuttl/kerberos/00-rbac.yaml.j2 @@ -0,0 +1,29 @@ +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: test-role +rules: +{% if test_scenario['values']['openshift'] == "true" %} + - apiGroups: ["security.openshift.io"] + resources: ["securitycontextconstraints"] + resourceNames: ["privileged"] + verbs: ["use"] +{% endif %} +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: test-sa +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: test-rb +subjects: + - kind: ServiceAccount + name: test-sa +roleRef: + kind: Role + name: test-role + apiGroup: rbac.authorization.k8s.io diff --git a/tests/templates/kuttl/kerberos/01-assert.yaml.j2 b/tests/templates/kuttl/kerberos/01-assert.yaml.j2 new file mode 100644 index 00000000..d34c1c63 --- /dev/null +++ b/tests/templates/kuttl/kerberos/01-assert.yaml.j2 @@ -0,0 +1,14 @@ +--- +apiVersion: 
kuttl.dev/v1beta1 +kind: TestAssert +timeout: 300 +{% if test_scenario['values']['kerberos-backend'] == 'mit' %} +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: krb5-kdc +status: + readyReplicas: 1 + replicas: 1 +{% endif %} diff --git a/tests/templates/kuttl/kerberos/01-install-krb5-kdc.yaml.j2 b/tests/templates/kuttl/kerberos/01-install-krb5-kdc.yaml.j2 new file mode 100644 index 00000000..fa04cc0d --- /dev/null +++ b/tests/templates/kuttl/kerberos/01-install-krb5-kdc.yaml.j2 @@ -0,0 +1,146 @@ +{% if test_scenario['values']['kerberos-backend'] == 'mit' %} +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: krb5-kdc +spec: + selector: + matchLabels: + app: krb5-kdc + template: + metadata: + labels: + app: krb5-kdc + spec: + serviceAccountName: test-sa + initContainers: + - name: init + image: docker.stackable.tech/stackable/krb5:{{ test_scenario['values']['krb5'] }}-stackable0.0.0-dev + args: + - sh + - -euo + - pipefail + - -c + - | + test -e /var/kerberos/krb5kdc/principal || kdb5_util create -s -P asdf + kadmin.local get_principal -terse root/admin || kadmin.local add_principal -pw asdf root/admin + # stackable-secret-operator principal must match the keytab specified in the SecretClass + kadmin.local get_principal -terse stackable-secret-operator || kadmin.local add_principal -e aes256-cts-hmac-sha384-192:normal -pw asdf stackable-secret-operator + env: + - name: KRB5_CONFIG + value: /stackable/config/krb5.conf + volumeMounts: + - mountPath: /stackable/config + name: config + - mountPath: /var/kerberos/krb5kdc + name: data + containers: + - name: kdc + image: docker.stackable.tech/stackable/krb5:{{ test_scenario['values']['krb5'] }}-stackable0.0.0-dev + args: + - krb5kdc + - -n + env: + - name: KRB5_CONFIG + value: /stackable/config/krb5.conf + volumeMounts: + - mountPath: /stackable/config + name: config + - mountPath: /var/kerberos/krb5kdc + name: data +# Root permissions required on Openshift to bind to privileged port numbers +{% if 
test_scenario['values']['openshift'] == "true" %} + securityContext: + runAsUser: 0 +{% endif %} + - name: kadmind + image: docker.stackable.tech/stackable/krb5:{{ test_scenario['values']['krb5'] }}-stackable0.0.0-dev + args: + - kadmind + - -nofork + env: + - name: KRB5_CONFIG + value: /stackable/config/krb5.conf + volumeMounts: + - mountPath: /stackable/config + name: config + - mountPath: /var/kerberos/krb5kdc + name: data +# Root permissions required on Openshift to bind to privileged port numbers +{% if test_scenario['values']['openshift'] == "true" %} + securityContext: + runAsUser: 0 +{% endif %} + - name: client + image: docker.stackable.tech/stackable/krb5:{{ test_scenario['values']['krb5'] }}-stackable0.0.0-dev + tty: true + stdin: true + env: + - name: KRB5_CONFIG + value: /stackable/config/krb5.conf + volumeMounts: + - mountPath: /stackable/config + name: config + volumes: + - name: config + configMap: + name: krb5-kdc + volumeClaimTemplates: + - metadata: + name: data + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi +--- +apiVersion: v1 +kind: Service +metadata: + name: krb5-kdc +spec: + selector: + app: krb5-kdc + ports: + - name: kadmin + port: 749 + - name: kdc + port: 88 + - name: kdc-udp + port: 88 + protocol: UDP +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: krb5-kdc +data: + krb5.conf: | + [logging] + default = STDERR + kdc = STDERR + admin_server = STDERR + # default = FILE:/var/log/krb5libs.log + # kdc = FILE:/var/log/krb5kdc.log + # admin_server = FILE:/vaggr/log/kadmind.log + [libdefaults] + dns_lookup_realm = false + ticket_lifetime = 24h + renew_lifetime = 7d + forwardable = true + rdns = false + default_realm = {{ test_scenario['values']['kerberos-realm'] }} + spake_preauth_groups = edwards25519 + [realms] + {{ test_scenario['values']['kerberos-realm'] }} = { + acl_file = /stackable/config/kadm5.acl + disable_encrypted_timestamp = false + } + [domain_realm] + .cluster.local = {{ 
test_scenario['values']['kerberos-realm'] }} + cluster.local = {{ test_scenario['values']['kerberos-realm'] }} + kadm5.acl: | + root/admin *e + stackable-secret-operator *e +{% endif %} diff --git a/tests/templates/kuttl/kerberos/02-create-kerberos-secretclass.yaml.j2 b/tests/templates/kuttl/kerberos/02-create-kerberos-secretclass.yaml.j2 new file mode 100644 index 00000000..04ae9a63 --- /dev/null +++ b/tests/templates/kuttl/kerberos/02-create-kerberos-secretclass.yaml.j2 @@ -0,0 +1,72 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: | + kubectl apply -n $NAMESPACE -f - < 0 }} + roleGroups: + default: + replicas: 1 diff --git a/tests/templates/kuttl/kerberos/20-assert.yaml b/tests/templates/kuttl/kerberos/20-assert.yaml new file mode 100644 index 00000000..01ba15d1 --- /dev/null +++ b/tests/templates/kuttl/kerberos/20-assert.yaml @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 600 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-kafka-broker-default +status: + readyReplicas: 3 + replicas: 3 diff --git a/tests/templates/kuttl/kerberos/20-install-kafka.yaml.j2 b/tests/templates/kuttl/kerberos/20-install-kafka.yaml.j2 new file mode 100644 index 00000000..fc3a958a --- /dev/null +++ b/tests/templates/kuttl/kerberos/20-install-kafka.yaml.j2 @@ -0,0 +1,55 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: | + kubectl apply -n $NAMESPACE -f - < 0 %} + custom: "{{ test_scenario['values']['kafka'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['kafka'].split(',')[0] }}" +{% else %} + productVersion: "{{ test_scenario['values']['kafka'] }}" +{% endif %} + pullPolicy: IfNotPresent + clusterConfig: + zookeeperConfigMapName: test-kafka-znode + authentication: + - authenticationClass: kerberos-auth-$NAMESPACE + tls: + # Kerberos requires the use of server and internal TLS! 
+ serverSecretClass: tls +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} + brokers: + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + roleGroups: + default: + replicas: 3 + EOF diff --git a/tests/templates/kuttl/kerberos/30-access-kafka.txt.j2 b/tests/templates/kuttl/kerberos/30-access-kafka.txt.j2 new file mode 100644 index 00000000..7fa56095 --- /dev/null +++ b/tests/templates/kuttl/kerberos/30-access-kafka.txt.j2 @@ -0,0 +1,120 @@ +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: access-kafka +spec: + template: + spec: + serviceAccountName: test-sa + containers: + - name: access-kafka + image: docker.stackable.tech/stackable/kafka:{{ test_scenario['values']['kafka'] }}-stackable0.0.0-dev + command: + - /bin/bash + - /tmp/script/script.sh + env: + - name: KRB5_CONFIG + value: /stackable/kerberos/krb5.conf + - name: KAFKA_OPTS + value: -Djava.security.krb5.conf=/stackable/kerberos/krb5.conf + - name: KAFKA + valueFrom: + configMapKeyRef: + name: test-kafka + key: KAFKA + volumeMounts: + - name: script + mountPath: /tmp/script + - mountPath: /stackable/tls-ca-cert-mount + name: tls-ca-cert-mount + - name: kerberos + mountPath: /stackable/kerberos + volumes: + - name: script + configMap: + name: access-kafka-script + - name: kerberos + ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: kerberos-$NAMESPACE + secrets.stackable.tech/scope: service=access-kafka + secrets.stackable.tech/kerberos.service.names: developer + spec: + storageClassName: secrets.stackable.tech + accessModes: + - ReadWriteOnce + resources: + requests: + storage: "1" + - name: tls-ca-cert-mount + ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: tls + secrets.stackable.tech/scope: pod + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: "1" + storageClassName: 
secrets.stackable.tech + volumeMode: Filesystem + securityContext: + fsGroup: 1000 + runAsGroup: 1000 + runAsUser: 1000 + restartPolicy: OnFailure +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: access-kafka-script +data: + script.sh: | + set -euxo pipefail + + export KCAT_CONFIG=/stackable/kcat.conf + TOPIC=test-topic + CONSUMER_GROUP=test-consumer-group + + echo -e -n "\ + metadata.broker.list=$KAFKA\n\ + auto.offset.reset=beginning\n\ + security.protocol=SASL_SSL\n\ + ssl.ca.location=/stackable/tls-ca-cert-mount/ca.crt\n\ + sasl.kerberos.keytab=/stackable/kerberos/keytab\n\ + sasl.kerberos.service.name=kafka\n\ + sasl.kerberos.principal=developer/access-kafka.$NAMESPACE.svc.cluster.local@{{ test_scenario['values']['kerberos-realm'] }}\n\ + sasl.mechanism=GSSAPI\n\ + " > $KCAT_CONFIG + + cat $KCAT_CONFIG + + sent_message="Hello Stackable!" + + echo $sent_message | kcat \ + -t $TOPIC \ + -P + + echo Sent message: \"$sent_message\" + + received_message=$(kcat \ + -G $CONSUMER_GROUP \ + -o stored \ + -e \ + $TOPIC) + + echo Received message: \"$received_message\" + + if [ "$received_message" = "$sent_message" ]; then + echo "Test passed" + exit 0 + else + echo "Test failed" + exit 1 + fi diff --git a/tests/templates/kuttl/kerberos/30-access-kafka.yaml b/tests/templates/kuttl/kerberos/30-access-kafka.yaml new file mode 100644 index 00000000..eecc0f08 --- /dev/null +++ b/tests/templates/kuttl/kerberos/30-access-kafka.yaml @@ -0,0 +1,6 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + # We need to replace $NAMESPACE (by KUTTL) + - script: envsubst '$NAMESPACE' < 30-access-kafka.txt | kubectl apply -n $NAMESPACE -f - diff --git a/tests/templates/kuttl/kerberos/30-assert.yaml b/tests/templates/kuttl/kerberos/30-assert.yaml new file mode 100644 index 00000000..edc6c317 --- /dev/null +++ b/tests/templates/kuttl/kerberos/30-assert.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 600 +--- +apiVersion: batch/v1 
+kind: Job +metadata: + name: access-kafka +status: + succeeded: 1 diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index 6486e1a3..ca7af2e3 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -40,6 +40,19 @@ dimensions: - name: openshift values: - "false" + - name: krb5 + values: + - 1.21.1 + - name: kerberos-realm + values: + - "CLUSTER.LOCAL" + - "PROD.MYCORP" + - name: kerberos-backend + values: + - mit + # Requires manual setup, see 02-create-kerberos-secretclass.yaml.j2 + # This will *not* respect the kerberos-realm test attribute, but instead use a hard-coded realm + # - activeDirectory tests: - name: smoke dimensions: @@ -82,6 +95,14 @@ tests: - zookeeper-latest - kafka-latest - openshift + - name: kerberos + dimensions: + - kafka + - zookeeper-latest + - krb5 + - kerberos-realm + - kerberos-backend + - openshift suites: - name: nightly