Skip to content

Commit 61e8d4d

Browse files
committed
fixes reflector list expiration
1 parent 2242bc1 commit 61e8d4d

File tree

2 files changed

+68
-0
lines changed

2 files changed

+68
-0
lines changed

util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616
import io.kubernetes.client.common.KubernetesObject;
1717
import io.kubernetes.client.informer.EventType;
1818
import io.kubernetes.client.informer.ListerWatcher;
19+
import io.kubernetes.client.openapi.ApiException;
1920
import io.kubernetes.client.openapi.models.V1ListMeta;
2021
import io.kubernetes.client.openapi.models.V1ObjectMeta;
2122
import io.kubernetes.client.util.CallGeneratorParams;
2223
import io.kubernetes.client.util.Strings;
2324
import io.kubernetes.client.util.Watchable;
2425
import java.io.IOException;
2526
import java.net.ConnectException;
27+
import java.net.HttpURLConnection;
2628
import java.time.Duration;
2729
import java.util.List;
2830
import java.util.Optional;
@@ -38,6 +40,9 @@ public class ReflectorRunnable<
3840
private static final Logger log = LoggerFactory.getLogger(ReflectorRunnable.class);
3941

4042
private String lastSyncResourceVersion;
43+
44+
private boolean isLastSyncResourceVersionUnavailable;
45+
4146
private Watchable<ApiType> watch;
4247

4348
private ListerWatcher<ApiType, ApiListType> listerWatcher;
@@ -87,6 +92,7 @@ public void run() {
8792
}
8893
this.syncWith(items, resourceVersion);
8994
this.lastSyncResourceVersion = resourceVersion;
95+
this.isLastSyncResourceVersionUnavailable = false;
9096

9197
if (log.isDebugEnabled()) {
9298
log.debug("{}#Start watching with {}...", apiTypeClass, lastSyncResourceVersion);
@@ -146,6 +152,13 @@ public void run() {
146152
closeWatch();
147153
}
148154
}
155+
} catch (ApiException e) {
156+
if (e.getCode() == HttpURLConnection.HTTP_GONE) {
157+
log.info(
158+
"ResourceVersion {} expired, will retry w/o resourceVersion at the next time",
159+
getRelistResourceVersion());
160+
isLastSyncResourceVersionUnavailable = true;
161+
}
149162
} catch (Throwable t) {
150163
this.exceptionHandler.accept(apiTypeClass, t);
151164
}
@@ -176,8 +189,23 @@ public String getLastSyncResourceVersion() {
176189
return lastSyncResourceVersion;
177190
}
178191

192+
public boolean isLastSyncResourceVersionUnavailable() {
193+
return isLastSyncResourceVersionUnavailable;
194+
}
195+
179196
private String getRelistResourceVersion() {
197+
if (isLastSyncResourceVersionUnavailable) {
198+
// Since this reflector makes paginated list requests, and all paginated list requests skip
199+
// the watch cache
200+
// if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to
201+
// re-establish reflector
202+
// to the latest available ResourceVersion, using a consistent read from etcd.
203+
return "";
204+
}
180205
if (Strings.isNullOrEmpty(lastSyncResourceVersion)) {
206+
// For performance reasons, initial list performed by reflector uses "0" as resource version
207+
// to allow it to
208+
// be served from the watch cache if it is enabled.
181209
return "0";
182210
}
183211
return lastSyncResourceVersion;

util/src/test/java/io/kubernetes/client/informer/cache/ReflectorRunnableTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
*/
1313
package io.kubernetes.client.informer.cache;
1414

15+
import static org.junit.Assert.assertFalse;
1516
import static org.mockito.Mockito.any;
1617
import static org.mockito.Mockito.never;
1718
import static org.mockito.Mockito.times;
@@ -28,6 +29,7 @@
2829
import io.kubernetes.client.util.CallGeneratorParams;
2930
import io.kubernetes.client.util.Watch;
3031
import io.kubernetes.client.util.Watchable;
32+
import java.net.HttpURLConnection;
3133
import java.time.Duration;
3234
import java.util.concurrent.atomic.AtomicReference;
3335
import org.awaitility.Awaitility;
@@ -215,4 +217,42 @@ public Watchable<V1Pod> watch(CallGeneratorParams params) throws ApiException {
215217
reflectorRunnable.stop();
216218
}
217219
}
220+
221+
@Test
222+
public void testReflectorListShouldHandleExpiredResourceVersion() throws ApiException {
223+
String expectedResourceVersion = "100";
224+
when(listerWatcher.list(any()))
225+
.thenReturn(
226+
new V1PodList().metadata(new V1ListMeta().resourceVersion(expectedResourceVersion)));
227+
// constantly failing watches will make the reflector run only one time
228+
when(listerWatcher.watch(any())).thenThrow(new ApiException(HttpURLConnection.HTTP_GONE, ""));
229+
ReflectorRunnable<V1Pod, V1PodList> reflectorRunnable =
230+
new ReflectorRunnable<>(V1Pod.class, listerWatcher, deltaFIFO);
231+
try {
232+
Thread thread = new Thread(reflectorRunnable::run);
233+
thread.setDaemon(true);
234+
thread.start();
235+
Awaitility.await()
236+
.atMost(Duration.ofSeconds(1))
237+
.pollInterval(Duration.ofMillis(100))
238+
.until(
239+
() -> expectedResourceVersion.equals(reflectorRunnable.getLastSyncResourceVersion()));
240+
assertFalse(reflectorRunnable.isLastSyncResourceVersionUnavailable());
241+
} finally {
242+
reflectorRunnable.stop();
243+
}
244+
245+
try {
246+
Thread thread = new Thread(reflectorRunnable::run);
247+
thread.setDaemon(true);
248+
thread.start();
249+
when(listerWatcher.list(any())).thenThrow(new ApiException(HttpURLConnection.HTTP_GONE, ""));
250+
Awaitility.await()
251+
.atMost(Duration.ofSeconds(5))
252+
.pollInterval(Duration.ofMillis(100))
253+
.until(() -> reflectorRunnable.isLastSyncResourceVersionUnavailable());
254+
} finally {
255+
reflectorRunnable.stop();
256+
}
257+
}
218258
}

0 commit comments

Comments
 (0)