Skip to content

Commit faba864

Browse files
committed
feat: merge EventSource and EventSourceMetadata
Signed-off-by: Chris Laprun <claprun@redhat.com>
1 parent 09280cf commit faba864

File tree

5 files changed

+57
-41
lines changed

5 files changed

+57
-41
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.Set;
88
import java.util.function.Function;
99
import java.util.stream.Collectors;
10+
import java.util.stream.Stream;
1011

1112
import org.slf4j.Logger;
1213
import org.slf4j.LoggerFactory;
@@ -81,6 +82,7 @@ public synchronized void start() {
8182
getThreadNamer("start"));
8283
}
8384

85+
@SuppressWarnings("rawtypes")
8486
private static Function<EventSource, String> getThreadNamer(String stage) {
8587
return es -> es.priority() + " " + stage + " -> " + es.name();
8688
}
@@ -106,7 +108,7 @@ private void logEventSourceEvent(EventSource eventSource, String event) {
106108
}
107109
}
108110

109-
private Void startEventSource(EventSource eventSource) {
111+
private <R> Void startEventSource(EventSource<R, P> eventSource) {
110112
try {
111113
logEventSourceEvent(eventSource, "Starting");
112114
eventSource.start();
@@ -119,7 +121,7 @@ private Void startEventSource(EventSource eventSource) {
119121
return null;
120122
}
121123

122-
private Void stopEventSource(EventSource eventSource) {
124+
private <R> Void stopEventSource(EventSource<R, P> eventSource) {
123125
try {
124126
logEventSourceEvent(eventSource, "Stopping");
125127
eventSource.stop();
@@ -131,7 +133,7 @@ private Void stopEventSource(EventSource eventSource) {
131133
}
132134

133135
@SuppressWarnings("rawtypes")
134-
public final synchronized void registerEventSource(EventSource eventSource)
136+
public final synchronized <R> void registerEventSource(EventSource<R, P> eventSource)
135137
throws OperatorException {
136138
Objects.requireNonNull(eventSource, "EventSource must not be null");
137139
try {
@@ -184,16 +186,22 @@ public void changeNamespaces(Set<String> namespaces) {
184186
getEventSourceThreadNamer("changeNamespace"));
185187
}
186188

187-
public Set<EventSource> getRegisteredEventSources() {
189+
public Set<EventSource<?, P>> getRegisteredEventSources() {
188190
return eventSources.flatMappedSources()
189-
190191
.collect(Collectors.toCollection(LinkedHashSet::new));
191192
}
192193

194+
@SuppressWarnings("rawtypes")
193195
public List<EventSource> allEventSources() {
194196
return eventSources.allEventSources().toList();
195197
}
196198

199+
200+
@SuppressWarnings("unused")
201+
public Stream<? extends EventSource<?, P>> getEventSourcesStream() {
202+
return eventSources.flatMappedSources();
203+
}
204+
197205
public ControllerEventSource<P> getControllerEventSource() {
198206
return eventSources.controllerEventSource();
199207
}
@@ -203,7 +211,7 @@ public <R> List<EventSource<R, P>> getEventSourcesFor(Class<R> dependentType) {
203211
}
204212

205213
@Override
206-
public EventSource dynamicallyRegisterEventSource(EventSource eventSource) {
214+
public <R> EventSource<R, P> dynamicallyRegisterEventSource(EventSource<R, P> eventSource) {
207215
synchronized (this) {
208216
var actual = eventSources.existingEventSourceOfSameNameAndType(eventSource);
209217
if (actual != null) {
@@ -219,8 +227,10 @@ public EventSource dynamicallyRegisterEventSource(EventSource eventSource) {
219227
}
220228

221229
@Override
222-
public synchronized Optional<EventSource> dynamicallyDeRegisterEventSource(String name) {
223-
EventSource es = eventSources.remove(name);
230+
public synchronized <R> Optional<EventSource<R, P>> dynamicallyDeRegisterEventSource(
231+
String name) {
232+
@SuppressWarnings("unchecked")
233+
EventSource<R, P> es = eventSources.remove(name);
224234
if (es != null) {
225235
es.stop();
226236
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceMetadata.java

Lines changed: 0 additions & 13 deletions
This file was deleted.

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,56 +15,60 @@
1515
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource;
1616
import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource;
1717

18-
class EventSources<R extends HasMetadata> {
18+
class EventSources<P extends HasMetadata> {
1919

20-
private final ConcurrentNavigableMap<String, Map<String, EventSource>> sources =
20+
private final ConcurrentNavigableMap<String, Map<String, EventSource<?, P>>> sources =
2121
new ConcurrentSkipListMap<>();
22-
private final TimerEventSource<R> retryAndRescheduleTimerEventSource =
22+
private final TimerEventSource<P> retryAndRescheduleTimerEventSource =
2323
new TimerEventSource<>("RetryAndRescheduleTimerEventSource");
24-
private ControllerEventSource<R> controllerEventSource;
24+
private ControllerEventSource<P> controllerEventSource;
2525

2626

27-
void createControllerEventSource(Controller<R> controller) {
27+
void createControllerEventSource(Controller<P> controller) {
2828
controllerEventSource = new ControllerEventSource<>(controller);
2929
}
3030

31-
public ControllerEventSource<R> controllerEventSource() {
31+
public ControllerEventSource<P> controllerEventSource() {
3232
return controllerEventSource;
3333
}
3434

35-
TimerEventSource<R> retryEventSource() {
35+
TimerEventSource<P> retryEventSource() {
3636
return retryAndRescheduleTimerEventSource;
3737
}
3838

39+
@SuppressWarnings("rawtypes")
3940
public Stream<EventSource> allEventSources() {
4041
return Stream.concat(
4142
Stream.of(controllerEventSource(), retryAndRescheduleTimerEventSource),
4243
flatMappedSources());
4344
}
4445

46+
@SuppressWarnings("rawtypes")
4547
Stream<EventSource> additionalEventSources() {
4648
return Stream.concat(
4749
Stream.of(retryEventSource()).filter(Objects::nonNull),
4850
flatMappedSources());
4951
}
5052

51-
Stream<EventSource> flatMappedSources() {
53+
Stream<EventSource<?, P>> flatMappedSources() {
5254
return sources.values().stream().flatMap(c -> c.values().stream());
5355
}
5456

5557
public void clear() {
5658
sources.clear();
5759
}
5860

59-
public EventSource existingEventSourceOfSameNameAndType(EventSource source) {
60-
return existingEventSourceOfSameType(source).get(source.name());
61+
@SuppressWarnings("unchecked")
62+
public <R> EventSource<R, P> existingEventSourceOfSameNameAndType(EventSource<R, P> source) {
63+
return (EventSource<R, P>) existingEventSourceOfSameType(source).get(source.name());
6164
}
6265

63-
public Map<String, EventSource> existingEventSourceOfSameType(EventSource source) {
66+
private <R> Map<String, EventSource<?, P>> existingEventSourceOfSameType(
67+
EventSource<R, P> source) {
6468
return sources.getOrDefault(keyFor(source), Collections.emptyMap());
6569
}
6670

67-
public void add(EventSource eventSource) {
71+
public <R> void add(EventSource<R, P> eventSource) {
6872
final var name = eventSource.name();
6973
final var existing = existingEventSourceOfSameType(eventSource);
7074
if (existing.get(name) != null) {
@@ -75,7 +79,7 @@ public void add(EventSource eventSource) {
7579
sources.computeIfAbsent(keyFor(eventSource), k -> new HashMap<>()).put(name, eventSource);
7680
}
7781

78-
private String keyFor(EventSource source) {
82+
private <R> String keyFor(EventSource<R, P> source) {
7983
return keyFor(source.resourceType());
8084
}
8185

@@ -84,7 +88,7 @@ private String keyFor(Class<?> dependentType) {
8488
}
8589

8690
@SuppressWarnings("unchecked")
87-
public <S> EventSource<S, R> get(Class<S> dependentType, String name) {
91+
public <S> EventSource<S, P> get(Class<S> dependentType, String name) {
8892
if (dependentType == null) {
8993
throw new IllegalArgumentException("Must pass a dependent type to retrieve event sources");
9094
}
@@ -96,17 +100,17 @@ public <S> EventSource<S, R> get(Class<S> dependentType, String name) {
96100
}
97101

98102
final var size = sourcesForType.size();
99-
EventSource source;
103+
EventSource<S, P> source;
100104
if (size == 1 && name == null) {
101-
source = sourcesForType.values().stream().findFirst().orElseThrow();
105+
source = (EventSource<S, P>) sourcesForType.values().stream().findFirst().orElseThrow();
102106
} else {
103107
if (name == null || name.isBlank()) {
104108
throw new IllegalArgumentException("There are multiple EventSources registered for type "
105109
+ dependentType.getCanonicalName()
106110
+ ", you need to provide a name to specify which EventSource you want to query. Known names: "
107111
+ String.join(",", sourcesForType.keySet()));
108112
}
109-
source = sourcesForType.get(name);
113+
source = (EventSource<S, P>) sourcesForType.get(name);
110114

111115
if (source == null) {
112116
throw new IllegalArgumentException("There is no event source found for class:" +
@@ -132,16 +136,17 @@ private String keyAsString(Class dependentType, String name) {
132136
}
133137

134138
@SuppressWarnings("unchecked")
135-
public <S> List<EventSource<S, R>> getEventSources(Class<S> dependentType) {
139+
public <S> List<EventSource<S, P>> getEventSources(Class<S> dependentType) {
136140
final var sourcesForType = sources.get(keyFor(dependentType));
137141
if (sourcesForType == null) {
138142
return Collections.emptyList();
139143
}
140144

141145
return sourcesForType.values().stream()
142-
.map(es -> (EventSource<S, R>) es).toList();
146+
.map(es -> (EventSource<S, P>) es).toList();
143147
}
144148

149+
@SuppressWarnings("rawtypes")
145150
public EventSource remove(String name) {
146151
var optionalMap = sources.values().stream().filter(m -> m.containsKey(name)).findFirst();
147152
return optionalMap.map(m -> m.remove(name)).orElse(null);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,20 @@ default EventSourceStartPriority priority() {
4444
*/
4545
Class<R> resourceType();
4646

47+
/**
48+
* Retrieves this EventSource's configuration if it exists.
49+
*
50+
* @return this EventSource's configuration if it exists
51+
* @since 5.0.0
52+
*/
53+
@SuppressWarnings({"rawtypes", "unused"})
54+
default Optional<?> optionalConfiguration() {
55+
if (this instanceof Configurable configurable) {
56+
return Optional.ofNullable(configurable.configuration());
57+
}
58+
return Optional.empty();
59+
}
60+
4761
default Optional<R> getSecondaryResource(P primary) {
4862
var resources = getSecondaryResources(primary);
4963
if (resources.isEmpty()) {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public void registersEventSource() {
3939

4040
eventSourceManager.registerEventSource(eventSource);
4141

42-
Set<EventSource> registeredSources = eventSourceManager.getRegisteredEventSources();
42+
final var registeredSources = eventSourceManager.getRegisteredEventSources();
4343
assertThat(registeredSources).contains(eventSource);
4444

4545
verify(eventSource, times(1)).setEventHandler(any());

0 commit comments

Comments
 (0)