24
24
import io .javaoperatorsdk .operator .ReconcilerUtils ;
25
25
import io .javaoperatorsdk .operator .api .config .Cloner ;
26
26
import io .javaoperatorsdk .operator .api .config .ConfigurationServiceProvider ;
27
- import io .javaoperatorsdk .operator .api .config .ExecutorServiceManager ;
28
27
import io .javaoperatorsdk .operator .api .config .ResourceConfiguration ;
29
28
import io .javaoperatorsdk .operator .processing .LifecycleAware ;
30
29
import io .javaoperatorsdk .operator .processing .event .ResourceID ;
@@ -47,22 +46,7 @@ public class InformerManager<T extends HasMetadata, C extends ResourceConfigurat
47
46
@ Override
48
47
public void start () throws OperatorException {
49
48
// make sure informers are all started before proceeding further
50
- ExecutorServiceManager .executeAndWaitForCompletion (
51
- () -> sources .values ().parallelStream ().forEach (source -> {
52
- // change thread name for easier debugging
53
- final var thread = Thread .currentThread ();
54
- final var name = thread .getName ();
55
- try {
56
- thread .setName (source .informerInfo () + " " + thread .getId ());
57
- source .start ();
58
- } catch (Exception e ) {
59
- throw new OperatorException ("Couldn't start informer: " + source , e );
60
- } finally {
61
- // restore original name
62
- thread .setName (name );
63
- }
64
- }),
65
- "InformerStart" );
49
+ sources .values ().parallelStream ().forEach (InformerWrapper ::start );
66
50
}
67
51
68
52
void initSources (MixedOperation <T , KubernetesResourceList <T >, Resource <T >> client ,
@@ -103,19 +87,18 @@ public void changeNamespaces(Set<String> namespaces) {
103
87
log .debug ("Stopped informer {} for namespaces: {}" , this , sourcesToRemove );
104
88
sourcesToRemove .forEach (k -> sources .remove (k ).stop ());
105
89
106
- ExecutorServiceManager .executeAndWaitForCompletion (
107
- () -> namespaces .forEach (ns -> {
108
- if (!sources .containsKey (ns )) {
109
- final var source =
110
- createEventSource (
111
- client .inNamespace (ns ).withLabelSelector (configuration .getLabelSelector ()),
112
- eventHandler , ns );
113
- source .addIndexers (this .indexers );
114
- source .start ();
115
- log .debug ("Registered new {} -> {} for namespace: {}" , this , source ,
116
- ns );
117
- }
118
- }), "InformerStart" );
90
+ namespaces .forEach (ns -> {
91
+ if (!sources .containsKey (ns )) {
92
+ final var source =
93
+ createEventSource (
94
+ client .inNamespace (ns ).withLabelSelector (configuration .getLabelSelector ()),
95
+ eventHandler , ns );
96
+ source .addIndexers (this .indexers );
97
+ source .start ();
98
+ log .debug ("Registered new {} -> {} for namespace: {}" , this , source ,
99
+ ns );
100
+ }
101
+ });
119
102
}
120
103
121
104
@@ -131,19 +114,15 @@ private InformerWrapper<T> createEventSource(
131
114
132
115
@ Override
133
116
public void stop () {
134
- ExecutorServiceManager .executeAndWaitForCompletion (
135
- () -> {
136
- log .info ("Stopping {}" , this );
137
- sources .forEach ((ns , source ) -> {
138
- try {
139
- log .debug ("Stopping informer for namespace: {} -> {}" , ns , source );
140
- source .stop ();
141
- } catch (Exception e ) {
142
- log .warn ("Error stopping informer for namespace: {} -> {}" , ns , source , e );
143
- }
144
- });
145
- },
146
- "StopInformer" );
117
+ log .info ("Stopping {}" , this );
118
+ sources .forEach ((ns , source ) -> {
119
+ try {
120
+ log .debug ("Stopping informer for namespace: {} -> {}" , ns , source );
121
+ source .stop ();
122
+ } catch (Exception e ) {
123
+ log .warn ("Error stopping informer for namespace: {} -> {}" , ns , source , e );
124
+ }
125
+ });
147
126
}
148
127
149
128
@ Override
0 commit comments