diff --git a/CHANGELOG.md b/CHANGELOG.md index 88e554bb5..6eb6a81c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,8 +9,10 @@ All notable changes to this project will be documented in this file. ### Added - Added airlift json source and airlift json transform to vector.toml ([#553]). +- Added commons structs as well as helper functions for Affinity ([#556]). [#553]: https://github.com/stackabletech/operator-rs/pull/553 +[#556]: https://github.com/stackabletech/operator-rs/pull/556 ## [0.34.0] - 2023-02-06 diff --git a/src/builder/pod/mod.rs b/src/builder/pod/mod.rs index b4ad09b2a..5fd42256c 100644 --- a/src/builder/pod/mod.rs +++ b/src/builder/pod/mod.rs @@ -2,7 +2,10 @@ pub mod container; pub mod security; pub mod volume; +use std::collections::BTreeMap; + use crate::builder::meta::ObjectMetaBuilder; +use crate::commons::affinity::StackableAffinity; use crate::commons::product_image_selection::ResolvedProductImage; use crate::error::{Error, OperatorResult}; @@ -10,13 +13,11 @@ use super::{ListenerOperatorVolumeSourceBuilder, ListenerReference, VolumeBuilde use k8s_openapi::apimachinery::pkg::api::resource::Quantity; use k8s_openapi::{ api::core::v1::{ - Affinity, Container, LocalObjectReference, NodeAffinity, NodeSelector, - NodeSelectorRequirement, NodeSelectorTerm, Pod, PodAffinity, PodAntiAffinity, PodCondition, - PodSecurityContext, PodSpec, PodStatus, PodTemplateSpec, Toleration, Volume, + Affinity, Container, LocalObjectReference, NodeAffinity, Pod, PodAffinity, PodAntiAffinity, + PodCondition, PodSecurityContext, PodSpec, PodStatus, PodTemplateSpec, Toleration, Volume, }, - apimachinery::pkg::apis::meta::v1::{LabelSelector, LabelSelectorRequirement, ObjectMeta}, + apimachinery::pkg::apis::meta::v1::ObjectMeta, }; -use std::collections::BTreeMap; /// A builder to build [`Pod`] or [`PodTemplateSpec`] objects. #[derive(Clone, Default)] @@ -26,7 +27,8 @@ pub struct PodBuilder { init_containers: Option>, metadata: Option, node_name: Option, - node_selector: Option, + node_selector: Option>, + node_affinity: Option, pod_affinity: Option, pod_anti_affinity: Option, status: Option, @@ -82,6 +84,16 @@ impl PodBuilder { self } + pub fn node_affinity(&mut self, affinity: NodeAffinity) -> &mut Self { + self.node_affinity = Some(affinity); + self + } + + pub fn node_affinity_opt(&mut self, affinity: Option) -> &mut Self { + self.node_affinity = affinity; + self + } + pub fn pod_affinity(&mut self, affinity: PodAffinity) -> &mut Self { self.pod_affinity = Some(affinity); self @@ -102,16 +114,27 @@ impl PodBuilder { self } - pub fn node_selector(&mut self, node_selector: LabelSelector) -> &mut Self { + pub fn node_selector(&mut self, node_selector: BTreeMap) -> &mut Self { self.node_selector = Some(node_selector); self } - pub fn node_selector_opt(&mut self, node_selector: Option) -> &mut Self { + pub fn node_selector_opt( + &mut self, + node_selector: Option>, + ) -> &mut Self { self.node_selector = node_selector; self } + pub fn affinity(&mut self, affinities: &StackableAffinity) -> &mut Self { + self.pod_affinity = affinities.pod_affinity.clone(); + self.pod_anti_affinity = affinities.pod_anti_affinity.clone(); + self.node_affinity = affinities.node_affinity.clone(); + self.node_selector = affinities.node_selector.clone().map(|ns| ns.node_selector); + self + } + pub fn phase(&mut self, phase: &str) -> &mut Self { let mut status = self.status.get_or_insert_with(PodStatus::default); status.phase = Some(phase.to_string()); @@ -338,58 +361,15 @@ impl PodBuilder { self } - /// Hack because [`Pod`] predates [`LabelSelector`], and so its functionality is split between [`PodSpec::node_selector`] and [`Affinity::node_affinity`] - fn node_selector_for_label_selector( - label_selector: Option, - ) -> (Option>, Option) { - let (node_labels, node_label_exprs) = match label_selector { - Some(LabelSelector { - match_labels, - match_expressions, - }) => (match_labels, match_expressions), - None => (None, None), - }; - - let node_affinity = node_label_exprs.map(|node_label_exprs| NodeAffinity { - required_during_scheduling_ignored_during_execution: Some(NodeSelector { - node_selector_terms: vec![NodeSelectorTerm { - match_expressions: Some( - node_label_exprs - .into_iter() - .map( - |LabelSelectorRequirement { - key, - operator, - values, - }| { - NodeSelectorRequirement { - key, - operator, - values, - } - }, - ) - .collect(), - ), - ..NodeSelectorTerm::default() - }], - }), - ..NodeAffinity::default() - }); - (node_labels, node_affinity) - } - fn build_spec(&self) -> PodSpec { - let (node_selector_labels, node_affinity) = - Self::node_selector_for_label_selector(self.node_selector.clone()); PodSpec { containers: self.containers.clone(), host_network: self.host_network, init_containers: self.init_containers.clone(), node_name: self.node_name.clone(), - node_selector: node_selector_labels, + node_selector: self.node_selector.clone(), affinity: Some(Affinity { - node_affinity, + node_affinity: self.node_affinity.clone(), pod_affinity: self.pod_affinity.clone(), pod_anti_affinity: self.pod_anti_affinity.clone(), }), @@ -463,38 +443,6 @@ mod tests { builder } - // A fixture for a node selector to use on a Pod, and the resulting node selector labels and node affinity. - #[fixture] - fn node_selector1() -> ( - LabelSelector, - Option>, - Option, - ) { - let labels = BTreeMap::from([("key".to_owned(), "value".to_owned())]); - let label_selector = LabelSelector { - match_expressions: Some(vec![LabelSelectorRequirement { - key: "security".to_owned(), - operator: "In".to_owned(), - values: Some(vec!["S1".to_owned(), "S2".to_owned()]), - }]), - match_labels: Some(labels.clone()), - }; - let affinity = Some(NodeAffinity { - required_during_scheduling_ignored_during_execution: Some(NodeSelector { - node_selector_terms: vec![NodeSelectorTerm { - match_expressions: Some(vec![NodeSelectorRequirement { - key: "security".to_owned(), - operator: "In".to_owned(), - values: Some(vec!["S1".to_owned(), "S2".to_owned()]), - }]), - ..Default::default() - }], - }), - ..Default::default() - }); - (label_selector, Some(labels), affinity) - } - #[fixture] fn pod_affinity() -> PodAffinity { PodAffinity { @@ -594,88 +542,4 @@ mod tests { }] ); } - - /// Test if setting a node selector generates the correct node selector labels and node affinity on the Pod. - #[rstest] - fn test_pod_builder_node_selector( - mut pod_builder_with_name_and_container: PodBuilder, - node_selector1: ( - LabelSelector, - Option>, - Option, - ), - ) { - // destruct fixture - let (node_selector, expected_labels, expected_affinity) = node_selector1; - // first test with the normal node_selector function - let pod = pod_builder_with_name_and_container - .clone() - .node_selector(node_selector.clone()) - .build() - .unwrap(); - - let spec = pod.spec.unwrap(); - assert_eq!(spec.node_selector, expected_labels); - assert_eq!(spec.affinity.unwrap().node_affinity, expected_affinity); - - // test the node_selector_opt function - let pod = pod_builder_with_name_and_container - .node_selector_opt(Some(node_selector)) - .build() - .unwrap(); - - // asserts - let spec = pod.spec.unwrap(); - assert_eq!(spec.node_selector, expected_labels); - assert_eq!(spec.affinity.unwrap().node_affinity, expected_affinity); - } - - /// Test if setting a node selector generates the correct node selector labels and node affinity on the Pod, - /// while keeping the manually set Pod affinities. Since they are mangled together, it makes sense to make sure that - /// one is not replacing the other - #[rstest] - fn test_pod_builder_node_selector_and_affinity( - mut pod_builder_with_name_and_container: PodBuilder, - node_selector1: ( - LabelSelector, - Option>, - Option, - ), - pod_affinity: PodAffinity, - pod_anti_affinity: PodAntiAffinity, - ) { - // destruct fixture - let (node_selector, expected_labels, expected_affinity) = node_selector1; - // first test with the normal functions - let pod = pod_builder_with_name_and_container - .clone() - .node_selector(node_selector.clone()) - .pod_affinity(pod_affinity.clone()) - .pod_anti_affinity(pod_anti_affinity.clone()) - .build() - .unwrap(); - - let spec = pod.spec.unwrap(); - assert_eq!(spec.node_selector, expected_labels); - let affinity = spec.affinity.unwrap(); - assert_eq!(affinity.node_affinity, expected_affinity); - assert_eq!(affinity.pod_affinity, Some(pod_affinity.clone())); - assert_eq!(affinity.pod_anti_affinity, Some(pod_anti_affinity.clone())); - - // test the *_opt functions - let pod = pod_builder_with_name_and_container - .node_selector_opt(Some(node_selector)) - .pod_affinity_opt(Some(pod_affinity.clone())) - .pod_anti_affinity_opt(Some(pod_anti_affinity.clone())) - .build() - .unwrap(); - - // asserts - let spec = pod.spec.unwrap(); - assert_eq!(spec.node_selector, expected_labels); - let affinity = spec.affinity.unwrap(); - assert_eq!(affinity.node_affinity, expected_affinity); - assert_eq!(affinity.pod_affinity, Some(pod_affinity)); - assert_eq!(affinity.pod_anti_affinity, Some(pod_anti_affinity)); - } } diff --git a/src/commons/affinity.rs b/src/commons/affinity.rs new file mode 100644 index 000000000..c4a210045 --- /dev/null +++ b/src/commons/affinity.rs @@ -0,0 +1,407 @@ +use std::collections::BTreeMap; + +use k8s_openapi::{ + api::core::v1::{ + NodeAffinity, NodeSelector, NodeSelectorRequirement, NodeSelectorTerm, PodAffinity, + PodAffinityTerm, PodAntiAffinity, WeightedPodAffinityTerm, + }, + apimachinery::pkg::apis::meta::v1::{LabelSelector, LabelSelectorRequirement}, +}; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use stackable_operator_derive::Fragment; + +use crate::{ + config::merge::{Atomic, Merge}, + labels::{APP_COMPONENT_LABEL, APP_INSTANCE_LABEL, APP_NAME_LABEL}, +}; + +pub const TOPOLOGY_KEY_HOSTNAME: &str = "kubernetes.io/hostname"; + +#[derive(Clone, Debug, Default, Deserialize, Fragment, JsonSchema, PartialEq, Serialize)] +#[fragment(path_overrides(fragment = "crate::config::fragment"))] +#[fragment_attrs( + derive( + Clone, + Debug, + Default, + Deserialize, + JsonSchema, + Merge, + PartialEq, + Serialize + ), + merge(path_overrides(merge = "crate::config::merge")), + serde(rename_all = "camelCase") +)] +pub struct StackableAffinity { + pub pod_affinity: Option, + pub pod_anti_affinity: Option, + pub node_affinity: Option, + pub node_selector: Option, +} + +impl StackableAffinityFragment { + #[deprecated( + since = "0.36.0", + note = "During https://github.com/stackabletech/issues/issues/323 we moved from the previous selector field on a rolegroup to a more generic affinity handling. \ +We still need to support the old selector field, which has some custom magic (see the code in this function). \ +So we need a way to transform the old into the mechanism which this function offers. \ +It will be removed once we stop supporting the old mechanism." + )] + pub fn add_legacy_selector(&mut self, label_selector: &LabelSelector) { + tracing::warn!("Deprecated field `selector` was specified. Please use the new `affinity` field instead, as support for `selector` will be removed in the future. See https://docs.stackable.tech/home/stable/contributor/adr/ADR026-affinities.html for details"); + + let node_labels = label_selector.match_labels.clone(); + let node_label_exprs = label_selector.match_expressions.clone(); + + let node_affinity = node_label_exprs.map(|node_label_exprs| NodeAffinity { + required_during_scheduling_ignored_during_execution: Some(NodeSelector { + node_selector_terms: vec![NodeSelectorTerm { + match_expressions: Some( + node_label_exprs + .into_iter() + .map( + |LabelSelectorRequirement { + key, + operator, + values, + }| { + NodeSelectorRequirement { + key, + operator, + values, + } + }, + ) + .collect(), + ), + ..NodeSelectorTerm::default() + }], + }), + ..NodeAffinity::default() + }); + + self.node_selector = node_labels.map(|node_labels| StackableNodeSelector { + node_selector: node_labels, + }); + self.node_affinity = node_affinity; + } +} + +#[derive(Clone, Debug, Eq, Deserialize, JsonSchema, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct StackableNodeSelector { + #[serde(flatten)] + pub node_selector: BTreeMap, +} + +impl Atomic for StackableNodeSelector {} + +/// Creates a `WeightedPodAffinityTerm`, which expresses a affinity towards all Pods of the given product (`app_name`) instance (`cluster_name`) role (`role`). +/// This affinity can be used to attract towards (affinity) or away (anti-affinity) from the specified role. +/// One common example would be to use this to distribute all the Pods of a certain role, e.g. hdfs datanodes. +/// An other usage would be to attract the hbase regionservers towards hdfs datanodes. +pub fn affinity_between_role_pods( + app_name: &str, + cluster_name: &str, + role: &str, + weight: i32, +) -> WeightedPodAffinityTerm { + WeightedPodAffinityTerm { + pod_affinity_term: PodAffinityTerm { + label_selector: Some(LabelSelector { + match_expressions: None, + match_labels: Some(BTreeMap::from([ + (APP_NAME_LABEL.to_string(), app_name.to_string()), + (APP_INSTANCE_LABEL.to_string(), cluster_name.to_string()), + (APP_COMPONENT_LABEL.to_string(), role.to_string()), + // We don't include the role-group label here, as the affinity should be between all rolegroups of the given role + ])), + }), + namespace_selector: None, + namespaces: None, + topology_key: TOPOLOGY_KEY_HOSTNAME.to_string(), + }, + weight, + } +} + +/// Creates a `WeightedPodAffinityTerm`, which expresses a affinity towards all Pods of the given product (`app_name`) instance (`cluster_name`). +/// This affinity can be used to attract towards (affinity) or away (anti-affinity) from the specified cluster. +/// One common example would be to use this to co-locate all the Pods of a certain cluster to not have to many network trips. +pub fn affinity_between_cluster_pods( + app_name: &str, + cluster_name: &str, + weight: i32, +) -> WeightedPodAffinityTerm { + WeightedPodAffinityTerm { + pod_affinity_term: PodAffinityTerm { + label_selector: Some(LabelSelector { + match_expressions: None, + match_labels: Some(BTreeMap::from([ + (APP_NAME_LABEL.to_string(), app_name.to_string()), + (APP_INSTANCE_LABEL.to_string(), cluster_name.to_string()), + ])), + }), + namespace_selector: None, + namespaces: None, + topology_key: TOPOLOGY_KEY_HOSTNAME.to_string(), + }, + weight, + } +} + +#[cfg(test)] +mod tests { + use k8s_openapi::{ + api::core::v1::{NodeSelector, NodeSelectorRequirement, NodeSelectorTerm}, + apimachinery::pkg::apis::meta::v1::LabelSelectorRequirement, + }; + + use crate::config::fragment; + + use super::*; + + #[test] + fn test_affinity_merge_new_attributes() { + let default_affinity = StackableAffinityFragment { + pod_affinity: None, + pod_anti_affinity: Some(PodAntiAffinity { + preferred_during_scheduling_ignored_during_execution: Some(vec![ + affinity_between_role_pods("kafka", "simple-kafka", "broker", 70), + ]), + required_during_scheduling_ignored_during_execution: None, + }), + node_affinity: None, + node_selector: None, + }; + + let role_input = r#" + podAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + - labelSelector: + matchExpressions: + - key: app.kubernetes.io/name + operator: In + values: + - foo + - bar + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: topology.kubernetes.io/zone + operator: In + values: + - antarctica-east1 + - antarctica-west1 + nodeSelector: + disktype: ssd + "#; + let mut role_affinity: StackableAffinityFragment = + serde_yaml::from_str(role_input).expect("illegal test input"); + + role_affinity.merge(&default_affinity); + let merged_affinity: StackableAffinity = fragment::validate(role_affinity).unwrap(); + + assert_eq!( + merged_affinity, + StackableAffinity { + pod_affinity: Some(PodAffinity { + preferred_during_scheduling_ignored_during_execution: None, + required_during_scheduling_ignored_during_execution: Some(vec![ + PodAffinityTerm { + label_selector: Some(LabelSelector { + match_expressions: Some(vec![LabelSelectorRequirement { + key: "app.kubernetes.io/name".to_string(), + operator: "In".to_string(), + values: Some(vec!["foo".to_string(), "bar".to_string()]) + }]), + match_labels: None, + }), + namespace_selector: None, + namespaces: None, + topology_key: "".to_string(), + } + ]) + }), + pod_anti_affinity: Some(PodAntiAffinity { + preferred_during_scheduling_ignored_during_execution: Some(vec![ + WeightedPodAffinityTerm { + pod_affinity_term: PodAffinityTerm { + label_selector: Some(LabelSelector { + match_expressions: None, + match_labels: Some(BTreeMap::from([ + ("app.kubernetes.io/name".to_string(), "kafka".to_string(),), + ( + "app.kubernetes.io/instance".to_string(), + "simple-kafka".to_string(), + ), + ( + "app.kubernetes.io/component".to_string(), + "broker".to_string(), + ) + ])) + }), + namespace_selector: None, + namespaces: None, + topology_key: TOPOLOGY_KEY_HOSTNAME.to_string(), + }, + weight: 70 + } + ]), + required_during_scheduling_ignored_during_execution: None, + }), + node_affinity: Some(NodeAffinity { + preferred_during_scheduling_ignored_during_execution: None, + required_during_scheduling_ignored_during_execution: Some(NodeSelector { + node_selector_terms: vec![NodeSelectorTerm { + match_expressions: Some(vec![NodeSelectorRequirement { + key: "topology.kubernetes.io/zone".to_string(), + operator: "In".to_string(), + values: Some(vec![ + "antarctica-east1".to_string(), + "antarctica-west1".to_string() + ]), + }]), + match_fields: None, + }] + }), + }), + node_selector: Some(StackableNodeSelector { + node_selector: BTreeMap::from([("disktype".to_string(), "ssd".to_string())]) + }), + } + ); + } + + #[test] + fn test_affinity_merge_overwrite_existing_attribute() { + let default_affinity = StackableAffinityFragment { + pod_affinity: None, + pod_anti_affinity: Some(PodAntiAffinity { + preferred_during_scheduling_ignored_during_execution: Some(vec![ + affinity_between_role_pods("kafka", "simple-kafka", "broker", 70), + ]), + required_during_scheduling_ignored_during_execution: None, + }), + node_affinity: None, + node_selector: None, + }; + + // The following anti-affinity tells k8s it *must* spread the brokers across multiple zones + // It will overwrite the default anti-affinity + let role_input = r#" + podAntiAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + - labelSelector: + matchLabels: + app.kubernetes.io/name: kafka + app.kubernetes.io/instance: simple-kafka + app.kubernetes.io/component: broker + topologyKey: topology.kubernetes.io/zone + "#; + let mut role_affinity: StackableAffinityFragment = + serde_yaml::from_str(role_input).expect("illegal test input"); + + role_affinity.merge(&default_affinity); + let merged_affinity: StackableAffinity = fragment::validate(role_affinity).unwrap(); + + assert_eq!( + merged_affinity, + StackableAffinity { + pod_affinity: None, + pod_anti_affinity: Some(PodAntiAffinity { + preferred_during_scheduling_ignored_during_execution: None, + required_during_scheduling_ignored_during_execution: Some(vec![ + PodAffinityTerm { + label_selector: Some(LabelSelector { + match_expressions: None, + match_labels: Some(BTreeMap::from([ + ("app.kubernetes.io/name".to_string(), "kafka".to_string(),), + ( + "app.kubernetes.io/instance".to_string(), + "simple-kafka".to_string(), + ), + ( + "app.kubernetes.io/component".to_string(), + "broker".to_string(), + ) + ])) + }), + namespace_selector: None, + namespaces: None, + topology_key: "topology.kubernetes.io/zone".to_string(), + } + ]), + }), + node_affinity: None, + node_selector: None, + } + ); + } + + #[test] + fn test_affinity_between_role_pods() { + let app_name = "kafka"; + let cluster_name = "simple-kafka"; + let role = "broker"; + + let anti_affinity = affinity_between_role_pods(app_name, cluster_name, role, 70); + assert_eq!( + anti_affinity, + WeightedPodAffinityTerm { + pod_affinity_term: PodAffinityTerm { + label_selector: Some(LabelSelector { + match_expressions: None, + match_labels: Some(BTreeMap::from([ + ("app.kubernetes.io/name".to_string(), "kafka".to_string(),), + ( + "app.kubernetes.io/instance".to_string(), + "simple-kafka".to_string(), + ), + ( + "app.kubernetes.io/component".to_string(), + "broker".to_string(), + ) + ])) + }), + namespace_selector: None, + namespaces: None, + topology_key: TOPOLOGY_KEY_HOSTNAME.to_string(), + }, + weight: 70 + } + ); + } + + #[test] + fn test_affinity_between_cluster_pods() { + let app_name = "kafka"; + let cluster_name = "simple-kafka"; + + let anti_affinity = affinity_between_cluster_pods(app_name, cluster_name, 20); + assert_eq!( + anti_affinity, + WeightedPodAffinityTerm { + pod_affinity_term: PodAffinityTerm { + label_selector: Some(LabelSelector { + match_expressions: None, + match_labels: Some(BTreeMap::from([ + ("app.kubernetes.io/name".to_string(), "kafka".to_string(),), + ( + "app.kubernetes.io/instance".to_string(), + "simple-kafka".to_string(), + ) + ])) + }), + namespace_selector: None, + namespaces: None, + topology_key: TOPOLOGY_KEY_HOSTNAME.to_string(), + }, + weight: 20 + } + ); + } +} diff --git a/src/commons/mod.rs b/src/commons/mod.rs index 477724dbe..31a033c1f 100644 --- a/src/commons/mod.rs +++ b/src/commons/mod.rs @@ -1,5 +1,6 @@ //! This module provides common datastructures or CRDs shared between all the operators +pub mod affinity; pub mod authentication; pub mod ldap; pub mod listener; diff --git a/src/config/merge.rs b/src/config/merge.rs index 337670c62..3be0bdac1 100644 --- a/src/config/merge.rs +++ b/src/config/merge.rs @@ -1,4 +1,7 @@ -use k8s_openapi::apimachinery::pkg::{api::resource::Quantity, apis::meta::v1::LabelSelector}; +use k8s_openapi::{ + api::core::v1::{NodeAffinity, PodAffinity, PodAntiAffinity}, + apimachinery::pkg::{api::resource::Quantity, apis::meta::v1::LabelSelector}, +}; use std::{ collections::{btree_map, hash_map, BTreeMap, HashMap}, hash::Hash, @@ -134,6 +137,9 @@ impl Atomic for String {} impl Atomic for Quantity {} impl<'a> Atomic for &'a str {} impl Atomic for LabelSelector {} +impl Atomic for PodAffinity {} +impl Atomic for PodAntiAffinity {} +impl Atomic for NodeAffinity {} impl Merge for Option { fn merge(&mut self, defaults: &Self) { diff --git a/src/role_utils.rs b/src/role_utils.rs index e6478577e..06f6b0ec8 100644 --- a/src/role_utils.rs +++ b/src/role_utils.rs @@ -182,6 +182,8 @@ pub struct RoleGroup { #[serde(flatten)] pub config: CommonConfiguration, pub replicas: Option, + // TODO Can be removed after we stop supporting this field. + // See ADR 26 Affinities pub selector: Option, }