Skip to content

Commit c119674

Browse files
Merge branch 'master' into chore/informers-bean-constructor
2 parents 08bd040 + a61fd5a commit c119674

File tree

15 files changed

+641
-12
lines changed

15 files changed

+641
-12
lines changed

extended/src/main/java/io/kubernetes/client/extended/leaderelection/LeaderElector.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -304,11 +304,9 @@ private boolean tryAcquireOrRenew() {
304304

305305
if (observedTimeMilliSeconds + config.getLeaseDuration().toMillis() > now.getTime()
306306
&& !isLeader()) {
307-
if (log.isDebugEnabled()) {
308-
log.debug(
309-
"Lock is held by {} and has not yet expired",
310-
oldLeaderElectionRecord.getHolderIdentity());
311-
}
307+
log.debug(
308+
"Lock is held by {} and has not yet expired",
309+
oldLeaderElectionRecord.getHolderIdentity());
312310
return false;
313311
}
314312

@@ -368,7 +366,13 @@ private void maybeReportTransition() {
368366
this.reportedLeader = this.observedRecord.getHolderIdentity();
369367

370368
if (this.onNewLeaderHook != null) {
371-
this.hookWorkers.submit(() -> onNewLeaderHook.accept(this.reportedLeader));
369+
this.hookWorkers.submit(
370+
() -> {
371+
log.info(
372+
"LeaderElection lock is currently held by {}",
373+
this.observedRecord.getHolderIdentity());
374+
onNewLeaderHook.accept(this.reportedLeader);
375+
});
372376
}
373377
}
374378

extended/src/test/java/io/kubernetes/client/extended/leaderelection/LeaderElectorTest.java

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.time.Duration;
1616
import java.util.concurrent.CountDownLatch;
1717
import java.util.concurrent.TimeUnit;
18+
import java.util.function.Consumer;
1819
import org.junit.Test;
1920

2021
/** Leader Election tests using "simulated" locks created by {@link LockSmith} */
@@ -53,11 +54,64 @@ public void testLeaderGracefulShutdown() throws Exception {
5354
leaderElector2.close();
5455
}
5556

57+
@Test(timeout = 20000L)
58+
public void testLeaderTransitionHook() throws InterruptedException {
59+
LockSmith lockSmith = new LockSmith();
60+
61+
CountDownLatch startBeingLeader1 = new CountDownLatch(1);
62+
CountDownLatch stopBeingLeader1 = new CountDownLatch(1);
63+
64+
LeaderElector leaderElector1 =
65+
makeAndRunLeaderElectorAsync(
66+
lockSmith, "candidate1", startBeingLeader1, stopBeingLeader1, (id) -> {});
67+
68+
// wait for candidate1 to become leader
69+
startBeingLeader1.await();
70+
71+
// start candidate2 and the transition hook should be called on the start
72+
CountDownLatch startBeingLeader2 = new CountDownLatch(1);
73+
CountDownLatch stopBeingLeader2 = new CountDownLatch(1);
74+
CountDownLatch notifiedLeader = new CountDownLatch(1);
75+
String expectedLeader = "candidate1";
76+
LeaderElector leaderElector2 =
77+
makeAndRunLeaderElectorAsync(
78+
lockSmith,
79+
"candidate2",
80+
startBeingLeader2,
81+
stopBeingLeader2,
82+
(id) -> {
83+
if (expectedLeader.equals(id)) {
84+
notifiedLeader.countDown();
85+
}
86+
});
87+
88+
notifiedLeader.await();
89+
90+
// start candidate1
91+
leaderElector1.close();
92+
93+
// ensure stopBeingLeader hook is called
94+
stopBeingLeader1.await();
95+
96+
// wait for candidate2 to become leader
97+
startBeingLeader2.await();
98+
}
99+
56100
private LeaderElector makeAndRunLeaderElectorAsync(
57101
LockSmith lockSmith,
58102
String lockIdentity,
59103
CountDownLatch startBeingLeader,
60104
CountDownLatch stopBeingLeader) {
105+
return makeAndRunLeaderElectorAsync(
106+
lockSmith, lockIdentity, startBeingLeader, stopBeingLeader, (id) -> {});
107+
}
108+
109+
private LeaderElector makeAndRunLeaderElectorAsync(
110+
LockSmith lockSmith,
111+
String lockIdentity,
112+
CountDownLatch startBeingLeader,
113+
CountDownLatch stopBeingLeader,
114+
Consumer<String> onNewLeaderHook) {
61115
LeaderElectionConfig leaderElectionConfig =
62116
new LeaderElectionConfig(
63117
lockSmith.makeLock(lockIdentity),
@@ -70,7 +124,9 @@ private LeaderElector makeAndRunLeaderElectorAsync(
70124
new Thread(
71125
() ->
72126
leaderElector.run(
73-
() -> startBeingLeader.countDown(), () -> stopBeingLeader.countDown()),
127+
() -> startBeingLeader.countDown(),
128+
() -> stopBeingLeader.countDown(),
129+
onNewLeaderHook),
74130
String.format("%s-leader-elector-main", lockIdentity));
75131
thread.setDaemon(true);
76132
thread.start();

spring/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@
6464
<artifactId>wiremock</artifactId>
6565
<scope>test</scope>
6666
</dependency>
67+
<dependency>
68+
<groupId>org.awaitility</groupId>
69+
<artifactId>awaitility</artifactId>
70+
<scope>test</scope>
71+
</dependency>
6772

6873
</dependencies>
6974

spring/src/main/java/io/kubernetes/client/spring/extended/controller/KubernetesInformerFactoryProcessor.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@
2020
import io.kubernetes.client.openapi.ApiClient;
2121
import io.kubernetes.client.spring.extended.controller.annotation.KubernetesInformer;
2222
import io.kubernetes.client.spring.extended.controller.annotation.KubernetesInformers;
23+
import io.kubernetes.client.spring.extended.controller.config.KubernetesInformerProperties;
24+
import io.kubernetes.client.spring.extended.controller.factory.KubernetesControllerFactory;
2325
import io.kubernetes.client.util.generic.GenericKubernetesApi;
2426
import org.slf4j.Logger;
2527
import org.slf4j.LoggerFactory;
2628
import org.springframework.beans.BeansException;
2729
import org.springframework.beans.factory.BeanFactory;
2830
import org.springframework.beans.factory.BeanFactoryAware;
31+
import org.springframework.beans.factory.annotation.Autowired;
2932
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
3033
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
3134
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
@@ -51,6 +54,10 @@ public class KubernetesInformerFactoryProcessor
5154

5255
public static final int ORDER = 0;
5356

57+
private static final Logger log = LoggerFactory.getLogger(KubernetesControllerFactory.class);
58+
59+
@Autowired private KubernetesInformerProperties informerProperties;
60+
5461
private ConfigurableListableBeanFactory beanFactory;
5562

5663
@Override
@@ -133,6 +140,20 @@ private <T extends KubernetesObject> Lister<T> lister(Class<T> type) {
133140
private <T extends KubernetesObject> SharedInformer<T> informer(
134141
Class<T> type, KubernetesInformer kubernetesInformer) {
135142
ApiClient apiClient = this.beanFactory.getBean(ApiClient.class);
143+
144+
if (apiClient.getHttpClient().readTimeoutMillis() > 0) {
145+
log.warn(
146+
"Enforcing read-timeout of the ApiClient {} to {} so that the watch connection won't abort from client-side",
147+
apiClient,
148+
informerProperties.getClientReadTimeout());
149+
apiClient.setHttpClient(
150+
apiClient
151+
.getHttpClient()
152+
.newBuilder()
153+
.readTimeout(informerProperties.getClientReadTimeout())
154+
.build());
155+
}
156+
136157
SharedInformerFactory sharedInformerFactory =
137158
this.beanFactory.getBean(SharedInformerFactory.class);
138159
final GenericKubernetesApi api =

spring/src/main/java/io/kubernetes/client/spring/extended/controller/config/KubernetesInformerAutoConfiguration.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@
1818
import io.kubernetes.client.util.ClientBuilder;
1919
import java.io.IOException;
2020
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
21+
import org.springframework.boot.context.properties.EnableConfigurationProperties;
2122
import org.springframework.context.annotation.Bean;
2223
import org.springframework.context.annotation.Configuration;
2324

2425
@Configuration(proxyBeanMethods = false)
2526
@ConditionalOnKubernetesInformerEnabled
27+
@EnableConfigurationProperties({
28+
KubernetesInformerProperties.class,
29+
})
2630
public class KubernetesInformerAutoConfiguration {
2731

2832
@Bean
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
Copyright 2021 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.kubernetes.client.spring.extended.controller.config;
14+
15+
import io.kubernetes.client.informer.cache.ReflectorRunnable;
16+
import java.time.Duration;
17+
import org.springframework.boot.context.properties.ConfigurationProperties;
18+
19+
@ConfigurationProperties("kubernetes.informer")
20+
public class KubernetesInformerProperties {
21+
22+
private Duration clientReadTimeout = ReflectorRunnable.REFLECTOR_WATCH_CLIENTSIDE_TIMEOUT;
23+
24+
public Duration getClientReadTimeout() {
25+
return clientReadTimeout;
26+
}
27+
28+
public KubernetesInformerProperties setClientReadTimeout(Duration clientReadTimeout) {
29+
this.clientReadTimeout = clientReadTimeout;
30+
return this;
31+
}
32+
}
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
Copyright 2021 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.kubernetes.client.spring.extended.manifests;
14+
15+
import com.github.benmanes.caffeine.cache.CacheLoader;
16+
import com.github.benmanes.caffeine.cache.Caffeine;
17+
import com.github.benmanes.caffeine.cache.LoadingCache;
18+
import io.kubernetes.client.openapi.models.V1ConfigMap;
19+
import io.kubernetes.client.spring.extended.manifests.annotation.FromConfigMap;
20+
import io.kubernetes.client.spring.extended.manifests.config.KubernetesManifestsProperties;
21+
import io.kubernetes.client.spring.extended.manifests.configmaps.ConfigMapGetter;
22+
import java.lang.reflect.Field;
23+
import java.util.Map;
24+
import java.util.concurrent.Executors;
25+
import java.util.concurrent.ScheduledExecutorService;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.function.Supplier;
28+
import org.checkerframework.checker.nullness.qual.NonNull;
29+
import org.checkerframework.checker.nullness.qual.Nullable;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
import org.springframework.beans.BeansException;
33+
import org.springframework.beans.factory.BeanCreationException;
34+
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
35+
import org.springframework.beans.factory.annotation.Autowired;
36+
import org.springframework.beans.factory.config.BeanPostProcessor;
37+
import org.springframework.beans.factory.config.InstantiationAwareBeanPostProcessor;
38+
import org.springframework.context.ApplicationContext;
39+
import org.springframework.context.ApplicationContextAware;
40+
import org.springframework.util.ReflectionUtils;
41+
42+
public class KubernetesFromConfigMapProcessor
43+
implements InstantiationAwareBeanPostProcessor, BeanPostProcessor, ApplicationContextAware {
44+
45+
private static final Logger log = LoggerFactory.getLogger(KubernetesFromConfigMapProcessor.class);
46+
47+
private ApplicationContext applicationContext;
48+
49+
private final ScheduledExecutorService configMapKeyRefresher =
50+
Executors.newSingleThreadScheduledExecutor();
51+
52+
@Autowired private KubernetesManifestsProperties manifestsProperties;
53+
54+
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
55+
56+
for (Field field : bean.getClass().getDeclaredFields()) {
57+
ReflectionUtils.makeAccessible(field);
58+
try {
59+
if (field.get(bean) != null) {
60+
continue; // field already set, skip processing
61+
}
62+
} catch (IllegalAccessException e) {
63+
log.warn("Failed inject resource for @FromConfigMap annotated field {}", field, e);
64+
continue;
65+
}
66+
67+
FromConfigMap fromConfigMapAnnotation = field.getAnnotation(FromConfigMap.class);
68+
if (fromConfigMapAnnotation == null) {
69+
continue; // skip if the field doesn't have the annotation
70+
}
71+
72+
if (!Map.class.isAssignableFrom(field.getType())) {
73+
log.warn(
74+
"Failed inject resource for @FromConfigMap annotated field {}, the declaring type should be Map<String, String>",
75+
field);
76+
continue;
77+
}
78+
79+
ConfigMapGetter configMapGetter =
80+
getOrCreateConfigMapGetter(fromConfigMapAnnotation, applicationContext);
81+
82+
LoadingCache<String, String> configMapDataCache =
83+
Caffeine.newBuilder()
84+
.expireAfterWrite(manifestsProperties.getRefreshInterval())
85+
.build(
86+
new ConfigMapGetterCacheLoader(
87+
() -> {
88+
return configMapGetter.get(
89+
fromConfigMapAnnotation.namespace(), fromConfigMapAnnotation.name());
90+
}));
91+
fullyRefreshCache(configMapGetter, fromConfigMapAnnotation, configMapDataCache);
92+
configMapKeyRefresher.scheduleAtFixedRate(
93+
() -> {
94+
fullyRefreshCache(configMapGetter, fromConfigMapAnnotation, configMapDataCache);
95+
},
96+
manifestsProperties.getRefreshInterval().getSeconds(),
97+
manifestsProperties.getRefreshInterval().getSeconds(),
98+
TimeUnit.SECONDS);
99+
ReflectionUtils.setField(field, bean, configMapDataCache.asMap());
100+
}
101+
102+
return bean;
103+
}
104+
105+
private static void fullyRefreshCache(
106+
ConfigMapGetter configMapGetter,
107+
FromConfigMap fromConfigMapAnnotation,
108+
LoadingCache<String, String> configMapDataCache) {
109+
V1ConfigMap configMap =
110+
configMapGetter.get(fromConfigMapAnnotation.namespace(), fromConfigMapAnnotation.name());
111+
if (configMap == null || configMap.getData() == null) {
112+
return;
113+
}
114+
// TODO: make the cache data refreshment atomic
115+
configMap.getData().keySet().stream().forEach(key -> configMapDataCache.refresh(key));
116+
}
117+
118+
private ConfigMapGetter getOrCreateConfigMapGetter(
119+
FromConfigMap fromConfigMapAnnotation, ApplicationContext applicationContext) {
120+
ConfigMapGetter configMapGetter;
121+
try {
122+
configMapGetter =
123+
applicationContext
124+
.getAutowireCapableBeanFactory()
125+
.getBean(fromConfigMapAnnotation.configMapGetter());
126+
} catch (NoSuchBeanDefinitionException ne) {
127+
try {
128+
configMapGetter = fromConfigMapAnnotation.configMapGetter().newInstance();
129+
} catch (IllegalAccessException | InstantiationException e) {
130+
throw new BeanCreationException("failed creating configmap getter instance", e);
131+
}
132+
applicationContext.getAutowireCapableBeanFactory().autowireBean(configMapGetter);
133+
applicationContext
134+
.getAutowireCapableBeanFactory()
135+
.initializeBean(
136+
configMapGetter,
137+
"configmap-getter-" + fromConfigMapAnnotation.configMapGetter().getSimpleName());
138+
}
139+
return configMapGetter;
140+
}
141+
142+
@Override
143+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
144+
this.applicationContext = applicationContext;
145+
}
146+
147+
static class ConfigMapGetterCacheLoader implements CacheLoader<String, String> {
148+
149+
ConfigMapGetterCacheLoader(Supplier<V1ConfigMap> configMapSupplier) {
150+
this.configMapSupplier = configMapSupplier;
151+
}
152+
153+
private final Supplier<V1ConfigMap> configMapSupplier;
154+
155+
@Override
156+
public @Nullable String load(@NonNull String key) throws Exception {
157+
V1ConfigMap configMap = this.configMapSupplier.get();
158+
if (configMap == null || configMap.getData() == null) {
159+
return null;
160+
}
161+
return configMap.getData().get(key);
162+
}
163+
}
164+
}

0 commit comments

Comments
 (0)