Skip to content

Commit fe754c3

Browse files
authored
Merge pull request #1148 from brendandburns/cherry-pick
Cherry pick some fixes to set the stage for 9.0.1
2 parents dd06710 + b6ac710 commit fe754c3

File tree

4 files changed

+29
-23
lines changed

4 files changed

+29
-23
lines changed

util/src/main/java/io/kubernetes/client/informer/cache/SharedProcessor.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,16 @@
2323
import java.util.concurrent.locks.ReentrantReadWriteLock;
2424
import org.apache.commons.collections4.CollectionUtils;
2525
import org.joda.time.DateTime;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
2628

2729
/*
2830
* SharedProcessor class manages all the registered ProcessorListener and distributes notifications.
2931
*/
3032
public class SharedProcessor<ApiType extends KubernetesObject> {
3133

34+
private static final Logger log = LoggerFactory.getLogger(SharedProcessor.class);
35+
3236
private ReadWriteLock lock = new ReentrantReadWriteLock();
3337

3438
private List<ProcessorListener<ApiType>> listeners;
@@ -155,16 +159,16 @@ public void stop() {
155159
} finally {
156160
lock.writeLock().unlock();
157161
}
158-
// Disable new tasks from being submitted
159-
executorService.shutdown();
162+
// Interrupts running listeners by signalling InterruptedException
163+
executorService.shutdownNow();
160164
try {
161-
// Wait a while for existing tasks to terminate
165+
// Hold until all the listeners exits
162166
if (!executorService.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
163-
// Cancel currently executing tasks
164-
executorService.shutdownNow();
167+
log.warn(
168+
"SharedProcessors wasn't gracefully terminated, there can be listener thread leakage");
165169
}
166170
} catch (InterruptedException e) {
167-
executorService.shutdownNow();
171+
log.error("Graceful shutdown process of SharedProcessors was interrupted");
168172
}
169173
}
170174
}

util/src/main/java/io/kubernetes/client/util/generic/GenericKubernetesApi.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -661,9 +661,10 @@ public Watchable<ApiType> watch(String namespace, final ListOptions listOptions)
661661
throw new IllegalArgumentException("invalid namespace");
662662
}
663663
Call call =
664-
customObjectsApi.listClusterCustomObjectCall(
664+
customObjectsApi.listNamespacedCustomObjectCall(
665665
this.apiGroup,
666666
this.apiVersion,
667+
namespace,
667668
this.resourcePlural,
668669
null,
669670
listOptions.getContinue(),

util/src/test/java/io/kubernetes/client/informer/cache/SharedProcessorTest.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -67,22 +67,6 @@ public void testListenerAddition() throws InterruptedException {
6767
public void testShutdownGracefully() throws InterruptedException {
6868
SharedProcessor<V1Pod> sharedProcessor =
6969
new SharedProcessor<>(Executors.newCachedThreadPool(), Duration.ofSeconds(5));
70-
TestWorker<V1Pod> quickWorker = new TestWorker<>(null, 0);
71-
quickWorker.setTask(
72-
() -> {
73-
try {
74-
// sleep 2s so that it could terminate within timeout(5s)
75-
Thread.sleep(2000);
76-
} catch (InterruptedException e) {
77-
}
78-
});
79-
long before = System.currentTimeMillis();
80-
sharedProcessor.addAndStartListener(quickWorker);
81-
sharedProcessor.stop();
82-
// the stopping worker properly blocks the processor's stop call
83-
assertTrue(System.currentTimeMillis() - before >= 2000);
84-
85-
sharedProcessor = new SharedProcessor<>(Executors.newCachedThreadPool(), Duration.ofSeconds(5));
8670
TestWorker<V1Pod> slowWorker = new TestWorker<>(null, 0);
8771
final boolean[] interrupted = {false};
8872
slowWorker.setTask(

util/src/test/java/io/kubernetes/client/util/generic/GenericKubernetesApiTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919
import com.google.gson.Gson;
2020
import io.kubernetes.client.custom.V1Patch;
2121
import io.kubernetes.client.openapi.ApiClient;
22+
import io.kubernetes.client.openapi.ApiException;
2223
import io.kubernetes.client.openapi.models.*;
2324
import io.kubernetes.client.util.ClientBuilder;
25+
import io.kubernetes.client.util.Watchable;
26+
import io.kubernetes.client.util.generic.options.ListOptions;
2427
import java.io.IOException;
2528
import java.net.SocketTimeoutException;
2629
import java.util.concurrent.TimeUnit;
@@ -163,6 +166,20 @@ public void patchNamespacedJobReturningObject() {
163166
verify(1, patchRequestedFor(urlPathEqualTo("/apis/batch/v1/namespaces/default/jobs/foo1")));
164167
}
165168

169+
@Test
170+
public void watchNamespacedJobReturningObject() throws ApiException {
171+
V1JobList jobList = new V1JobList().kind("JobList").metadata(new V1ListMeta());
172+
173+
stubFor(
174+
get(urlPathEqualTo("/apis/batch/v1/namespaces/default/jobs"))
175+
.willReturn(aResponse().withStatus(200).withBody(new Gson().toJson(jobList))));
176+
Watchable<V1Job> jobListWatch = jobClient.watch("default", new ListOptions());
177+
verify(
178+
1,
179+
getRequestedFor(urlPathEqualTo("/apis/batch/v1/namespaces/default/jobs"))
180+
.withQueryParam("watch", equalTo("true")));
181+
}
182+
166183
@Test
167184
public void testReadTimeoutShouldThrowException() {
168185
ApiClient apiClient = new ClientBuilder().setBasePath("http://localhost:" + 8181).build();

0 commit comments

Comments
 (0)