Skip to content

Commit b2c53e8

Browse files
authored
Merge pull request #890 from yue9944882/spring-ioc-follow-up
Spring integration follow up
2 parents 1ebe7be + 7fcd9dc commit b2c53e8

File tree

13 files changed

+256
-85
lines changed

13 files changed

+256
-85
lines changed

extended/src/main/java/io/kubernetes/client/extended/controller/ControllerWatch.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.kubernetes.client.extended.controller;
22

33
import io.kubernetes.client.informer.ResourceEventHandler;
4+
import java.time.Duration;
45

56
/**
67
* The interface Controller watch defines how a controller watches certain resources.
@@ -21,4 +22,11 @@ public interface ControllerWatch<ApiType> {
2122
* @return the resource event handler
2223
*/
2324
ResourceEventHandler<ApiType> getResourceEventHandler();
25+
26+
/**
27+
* Gets resync period for the registering event handler.
28+
*
29+
* @return the resync period
30+
*/
31+
Duration getResyncPeriod();
2432
}

extended/src/main/java/io/kubernetes/client/extended/controller/DefaultControllerWatch.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.kubernetes.client.extended.controller.reconciler.Request;
44
import io.kubernetes.client.extended.workqueue.WorkQueue;
55
import io.kubernetes.client.informer.ResourceEventHandler;
6+
import java.time.Duration;
67
import java.util.function.BiPredicate;
78
import java.util.function.Function;
89
import java.util.function.Predicate;
@@ -21,6 +22,7 @@ public class DefaultControllerWatch<ApiType> implements ControllerWatch<ApiType>
2122
private Predicate<ApiType> onAddFilterPredicate;
2223
private BiPredicate<ApiType, ApiType> onUpdateFilterPredicate;
2324
private BiPredicate<ApiType, Boolean> onDeleteFilterPredicate;
25+
private Duration resyncPeriod;
2426

2527
/**
2628
* Instantiates a new Work queue resource event handler.
@@ -31,10 +33,12 @@ public class DefaultControllerWatch<ApiType> implements ControllerWatch<ApiType>
3133
public DefaultControllerWatch(
3234
Class<ApiType> apiTypeClass,
3335
WorkQueue<Request> workQueue,
34-
Function<ApiType, Request> workKeyGenerator) {
36+
Function<ApiType, Request> workKeyGenerator,
37+
Duration resyncPeriod) {
3538
this.workQueue = workQueue;
3639
this.apiTypeClass = apiTypeClass;
3740
this.workKeyGenerator = workKeyGenerator;
41+
this.resyncPeriod = resyncPeriod;
3842
}
3943

4044
public Predicate<ApiType> getOnAddFilterPredicate() {
@@ -96,4 +100,9 @@ public void onDelete(ApiType obj, boolean deletedFinalStateUnknown) {
96100
}
97101
};
98102
}
103+
104+
@Override
105+
public Duration getResyncPeriod() {
106+
return this.resyncPeriod;
107+
}
99108
}

extended/src/main/java/io/kubernetes/client/extended/controller/builder/ControllerWatchBuilder.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.kubernetes.client.extended.controller.DefaultControllerWatch;
55
import io.kubernetes.client.extended.controller.reconciler.Request;
66
import io.kubernetes.client.extended.workqueue.WorkQueue;
7+
import java.time.Duration;
78
import java.util.function.BiPredicate;
89
import java.util.function.Function;
910
import java.util.function.Predicate;
@@ -13,6 +14,7 @@ public class ControllerWatchBuilder<ApiType> {
1314
private Function<ApiType, Request> workKeyGenerator;
1415
private WorkQueue<Request> workQueue;
1516
private Class<ApiType> apiTypeClass;
17+
private Duration resyncPeriod = Duration.ZERO;
1618

1719
private Predicate<ApiType> onAddFilterPredicate;
1820
private BiPredicate<ApiType, ApiType> onUpdateFilterPredicate;
@@ -69,6 +71,11 @@ public ControllerWatchBuilder<ApiType> withWorkQueueKeyFunc(
6971
return this;
7072
}
7173

74+
public ControllerWatchBuilder<ApiType> withResyncPeriod(Duration resyncPeriod) {
75+
this.resyncPeriod = resyncPeriod;
76+
return this;
77+
}
78+
7279
/**
7380
* End building controller-watch.
7481
*
@@ -77,7 +84,7 @@ public ControllerWatchBuilder<ApiType> withWorkQueueKeyFunc(
7784
*/
7885
public DefaultControllerWatch<ApiType> build() throws IllegalStateException {
7986
DefaultControllerWatch<ApiType> workQueueHandler =
80-
new DefaultControllerWatch<>(apiTypeClass, workQueue, workKeyGenerator);
87+
new DefaultControllerWatch<>(apiTypeClass, workQueue, workKeyGenerator, resyncPeriod);
8188
workQueueHandler.setOnAddFilterPredicate(onAddFilterPredicate);
8289
workQueueHandler.setOnUpdateFilterPredicate(onUpdateFilterPredicate);
8390
workQueueHandler.setOnDeleteFilterPredicate(onDeleteFilterPredicate);

extended/src/main/java/io/kubernetes/client/extended/controller/builder/DefaultControllerBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ public <ApiType> DefaultControllerBuilder watch(
6363
+ "check if informer already constructed in the informerFactory",
6464
apiTypeClass));
6565
}
66-
informer.addEventHandler(watch.getResourceEventHandler());
66+
informer.addEventHandlerWithResyncPeriod(
67+
watch.getResourceEventHandler(), watch.getResyncPeriod().toMillis());
6768
return this;
6869
}
6970

extended/src/test/java/io/kubernetes/client/extended/controller/ControllerWatchTest.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.kubernetes.client.openapi.models.V1ObjectMeta;
99
import io.kubernetes.client.openapi.models.V1Pod;
1010
import io.kubernetes.client.openapi.models.V1PodSpec;
11+
import java.time.Duration;
1112
import org.junit.Test;
1213

1314
public class ControllerWatchTest {
@@ -18,7 +19,8 @@ public class ControllerWatchTest {
1819
public void testOnAdd() {
1920
WorkQueue<Request> workQueue = new DefaultWorkQueue<>();
2021
DefaultControllerWatch<V1Pod> controllerWatch =
21-
new DefaultControllerWatch(V1Pod.class, workQueue, Controllers.defaultReflectiveKeyFunc());
22+
new DefaultControllerWatch(
23+
V1Pod.class, workQueue, Controllers.defaultReflectiveKeyFunc(), Duration.ZERO);
2224
controllerWatch.getResourceEventHandler().onAdd(testPod);
2325
assertEquals(1, workQueue.length());
2426

@@ -31,7 +33,8 @@ public void testOnAdd() {
3133
public void testOnUpdate() {
3234
WorkQueue<Request> workQueue = new DefaultWorkQueue<>();
3335
DefaultControllerWatch<V1Pod> controllerWatch =
34-
new DefaultControllerWatch(V1Pod.class, workQueue, Controllers.defaultReflectiveKeyFunc());
36+
new DefaultControllerWatch(
37+
V1Pod.class, workQueue, Controllers.defaultReflectiveKeyFunc(), Duration.ZERO);
3538
controllerWatch.getResourceEventHandler().onUpdate(null, testPod);
3639
assertEquals(1, workQueue.length());
3740

@@ -44,7 +47,8 @@ public void testOnUpdate() {
4447
public void testOnDelete() {
4548
WorkQueue<Request> workQueue = new DefaultWorkQueue<>();
4649
DefaultControllerWatch<V1Pod> controllerWatch =
47-
new DefaultControllerWatch(V1Pod.class, workQueue, Controllers.defaultReflectiveKeyFunc());
50+
new DefaultControllerWatch(
51+
V1Pod.class, workQueue, Controllers.defaultReflectiveKeyFunc(), Duration.ZERO);
4852
controllerWatch.getResourceEventHandler().onDelete(testPod, false);
4953
assertEquals(1, workQueue.length());
5054

spring/src/main/java/io/kubernetes/client/spring/extended/controller/KubernetesReconcilerProcessor.java

Lines changed: 119 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,20 @@
99
import io.kubernetes.client.extended.workqueue.DefaultRateLimitingQueue;
1010
import io.kubernetes.client.extended.workqueue.RateLimitingQueue;
1111
import io.kubernetes.client.informer.SharedInformerFactory;
12-
import io.kubernetes.client.spring.extended.controller.annotation.KubernetesReconciler;
13-
import io.kubernetes.client.spring.extended.controller.annotation.KubernetesReconcilerWatch;
14-
import io.kubernetes.client.spring.extended.controller.annotation.KubernetesReconcilerWatches;
15-
import io.kubernetes.client.spring.extended.controller.annotation.KubernetesReconcilerWorkerCount;
12+
import io.kubernetes.client.spring.extended.controller.annotation.*;
13+
import java.lang.reflect.InvocationTargetException;
14+
import java.lang.reflect.Method;
15+
import java.time.Duration;
16+
import java.util.ArrayList;
17+
import java.util.List;
1618
import java.util.concurrent.ExecutorService;
1719
import java.util.concurrent.Executors;
1820
import java.util.function.BiPredicate;
1921
import java.util.function.Function;
2022
import java.util.function.Predicate;
23+
import java.util.function.Supplier;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
2126
import org.springframework.beans.BeansException;
2227
import org.springframework.beans.factory.BeanCreationException;
2328
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
@@ -35,6 +40,8 @@
3540
@Component
3641
public class KubernetesReconcilerProcessor implements BeanFactoryPostProcessor, Ordered {
3742

43+
private static final Logger log = LoggerFactory.getLogger(KubernetesReconcilerProcessor.class);
44+
3845
private ControllerManager controllerManager;
3946

4047
private ExecutorService controllerManagerDaemon = Executors.newSingleThreadExecutor();
@@ -69,21 +76,43 @@ private Controller buildController(SharedInformerFactory sharedInformerFactory,
6976
DefaultControllerBuilder builder = ControllerBuilder.defaultBuilder(sharedInformerFactory);
7077
for (KubernetesReconcilerWatch watch : watches.value()) {
7178
try {
72-
Predicate addFilter = watch.addFilter().newInstance();
73-
BiPredicate updateFilter = watch.updateFilter().newInstance();
74-
BiPredicate deleteFilter = watch.deleteFilter().newInstance();
79+
Predicate addFilter = null;
80+
BiPredicate updateFilter = null;
81+
BiPredicate deleteFilter = null;
82+
final List<Supplier<Boolean>> readyFuncs = new ArrayList<>();
7583
Function<?, Request> workQueueKeyFunc = watch.workQueueKeyFunc().newInstance();
84+
for (Method method : r.getClass().getMethods()) {
85+
if (method.isAnnotationPresent(AddWatchEventFilter.class)) {
86+
addFilter = new AddFilterAdaptor(r, method);
87+
}
88+
if (method.isAnnotationPresent(UpdateWatchEventFilter.class)) {
89+
updateFilter = new UpdateFilterAdaptor(r, method);
90+
}
91+
if (method.isAnnotationPresent(DeleteWatchEventFilter.class)) {
92+
deleteFilter = new DeleteFilterAdaptor(r, method);
93+
}
94+
if (method.isAnnotationPresent(KubernetesReconcilerReadyFunc.class)) {
95+
readyFuncs.add(new ReadyFuncAdaptor(r, method));
96+
}
97+
}
7698

99+
final Predicate finalAddFilter = addFilter;
100+
final BiPredicate finalUpdateFilter = updateFilter;
101+
final BiPredicate finalDeleteFilter = deleteFilter;
77102
builder =
78103
builder.watch(
79104
(workQueue) -> {
80105
return ControllerBuilder.controllerWatchBuilder(watch.apiTypeClass(), workQueue)
81-
.withOnAddFilter(addFilter)
82-
.withOnUpdateFilter(updateFilter)
83-
.withOnDeleteFilter(deleteFilter)
106+
.withOnAddFilter(finalAddFilter)
107+
.withOnUpdateFilter(finalUpdateFilter)
108+
.withOnDeleteFilter(finalDeleteFilter)
84109
.withWorkQueueKeyFunc(workQueueKeyFunc)
110+
.withResyncPeriod(Duration.ofMillis(watch.resyncPeriodMillis()))
85111
.build();
86112
});
113+
for (Supplier<Boolean> readyFunc : readyFuncs) {
114+
builder = builder.withReadyFunc(readyFunc);
115+
}
87116
} catch (IllegalAccessException | InstantiationException e) {
88117
throw new BeanCreationException("Failed instantiating controller: " + e.getMessage());
89118
}
@@ -98,4 +127,84 @@ private Controller buildController(SharedInformerFactory sharedInformerFactory,
98127
RateLimitingQueue<Request> workQueue = new DefaultRateLimitingQueue<>();
99128
return builder.withReconciler(r).withName(reconcilerName).withWorkQueue(workQueue).build();
100129
}
130+
131+
private static class AddFilterAdaptor implements Predicate {
132+
private Method method;
133+
private Object target;
134+
135+
private AddFilterAdaptor(Object target, Method method) {
136+
this.method = method;
137+
this.target = target;
138+
}
139+
140+
@Override
141+
public boolean test(Object addedObj) {
142+
try {
143+
return (boolean) method.invoke(target, addedObj);
144+
} catch (IllegalAccessException | InvocationTargetException e) {
145+
log.error("invalid EventAddFilter method signature", e);
146+
return true;
147+
}
148+
}
149+
}
150+
151+
private static class UpdateFilterAdaptor implements BiPredicate {
152+
private Method method;
153+
private Object target;
154+
155+
private UpdateFilterAdaptor(Object target, Method method) {
156+
this.method = method;
157+
this.target = target;
158+
}
159+
160+
@Override
161+
public boolean test(Object oldObj, Object newObj) {
162+
try {
163+
return (boolean) method.invoke(target, oldObj, newObj);
164+
} catch (IllegalAccessException | InvocationTargetException e) {
165+
log.error("invalid EventUpdateFilter method signature", e);
166+
return true;
167+
}
168+
}
169+
}
170+
171+
private static class DeleteFilterAdaptor implements BiPredicate {
172+
private Method method;
173+
private Object target;
174+
175+
private DeleteFilterAdaptor(Object target, Method method) {
176+
this.method = method;
177+
this.target = target;
178+
}
179+
180+
@Override
181+
public boolean test(Object deleteObj, Object cacheStatusUnknown) {
182+
try {
183+
return (boolean) method.invoke(target, deleteObj, cacheStatusUnknown);
184+
} catch (IllegalAccessException | InvocationTargetException e) {
185+
log.error("invalid EventDeleteFilter method signature", e);
186+
return true;
187+
}
188+
}
189+
}
190+
191+
private static class ReadyFuncAdaptor implements Supplier<Boolean> {
192+
private Method method;
193+
private Object target;
194+
195+
private ReadyFuncAdaptor(Object target, Method method) {
196+
this.method = method;
197+
this.target = target;
198+
}
199+
200+
@Override
201+
public Boolean get() {
202+
try {
203+
return (boolean) method.invoke(target);
204+
} catch (IllegalAccessException | InvocationTargetException e) {
205+
log.error("invalid ReadyFunc method signature", e);
206+
return false;
207+
}
208+
}
209+
}
101210
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.kubernetes.client.spring.extended.controller.annotation;
2+
3+
import java.lang.annotation.ElementType;
4+
import java.lang.annotation.Retention;
5+
import java.lang.annotation.RetentionPolicy;
6+
import java.lang.annotation.Target;
7+
8+
/**
9+
* Indicates that the method is a filter for {@link io.kubernetes.client.informer.EventType#ADDED}
10+
* from watches.
11+
*
12+
* <p>A add-event filter must have the signature as {@link java.util.function.Predicate<ApiType>}
13+
*/
14+
@Target({ElementType.METHOD})
15+
@Retention(RetentionPolicy.RUNTIME)
16+
public @interface AddWatchEventFilter {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.kubernetes.client.spring.extended.controller.annotation;
2+
3+
import java.lang.annotation.ElementType;
4+
import java.lang.annotation.Retention;
5+
import java.lang.annotation.RetentionPolicy;
6+
import java.lang.annotation.Target;
7+
8+
/**
9+
* Indicates that the method is a filter for {@link io.kubernetes.client.informer.EventType#DELETED}
10+
* from watches.
11+
*
12+
* <p>A delete-event filter must have the signature as {@link
13+
* java.util.function.BiPredicate<ApiType,ApiType>} where the 1st parameter is the old state of the
14+
* resource and the 2nd is the new state.
15+
*/
16+
@Target({ElementType.METHOD})
17+
@Retention(RetentionPolicy.RUNTIME)
18+
public @interface DeleteWatchEventFilter {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.kubernetes.client.spring.extended.controller.annotation;
2+
3+
import java.lang.annotation.ElementType;
4+
import java.lang.annotation.Retention;
5+
import java.lang.annotation.RetentionPolicy;
6+
import java.lang.annotation.Target;
7+
8+
/**
9+
* The ready func for Kubernetes reconciler. The reconciler won't be reconciling upon any events
10+
* unless all its registered ready funcs return true.
11+
*
12+
* <p>A ready func must have the signature as {@link java.util.function.Supplier<Boolean>}
13+
*/
14+
@Target({ElementType.METHOD})
15+
@Retention(RetentionPolicy.RUNTIME)
16+
public @interface KubernetesReconcilerReadyFunc {}

spring/src/main/java/io/kubernetes/client/spring/extended/controller/annotation/KubernetesReconcilerReadyFuncs.java

Lines changed: 0 additions & 19 deletions
This file was deleted.

0 commit comments

Comments
 (0)