|
23 | 23 | import java.util.concurrent.locks.ReentrantReadWriteLock;
|
24 | 24 | import org.apache.commons.collections4.CollectionUtils;
|
25 | 25 | import org.joda.time.DateTime;
|
| 26 | +import org.slf4j.Logger; |
| 27 | +import org.slf4j.LoggerFactory; |
26 | 28 |
|
27 | 29 | /*
|
28 | 30 | * SharedProcessor class manages all the registered ProcessorListener and distributes notifications.
|
29 | 31 | */
|
30 | 32 | public class SharedProcessor<ApiType extends KubernetesObject> {
|
31 | 33 |
|
| 34 | + private static final Logger log = LoggerFactory.getLogger(SharedProcessor.class); |
| 35 | + |
32 | 36 | private ReadWriteLock lock = new ReentrantReadWriteLock();
|
33 | 37 |
|
34 | 38 | private List<ProcessorListener<ApiType>> listeners;
|
@@ -155,16 +159,16 @@ public void stop() {
|
155 | 159 | } finally {
|
156 | 160 | lock.writeLock().unlock();
|
157 | 161 | }
|
158 |
| - // Disable new tasks from being submitted |
159 |
| - executorService.shutdown(); |
| 162 | + // Interrupts running listeners by signalling InterruptedException |
| 163 | + executorService.shutdownNow(); |
160 | 164 | try {
|
161 |
| - // Wait a while for existing tasks to terminate |
| 165 | + // Hold until all the listeners exits |
162 | 166 | if (!executorService.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
|
163 |
| - // Cancel currently executing tasks |
164 |
| - executorService.shutdownNow(); |
| 167 | + log.warn( |
| 168 | + "SharedProcessors wasn't gracefully terminated, there can be listener thread leakage"); |
165 | 169 | }
|
166 | 170 | } catch (InterruptedException e) {
|
167 |
| - executorService.shutdownNow(); |
| 171 | + log.error("Graceful shutdown process of SharedProcessors was interrupted"); |
168 | 172 | }
|
169 | 173 | }
|
170 | 174 | }
|
0 commit comments