Skip to content

Commit da36061

Browse files
committed
impl proress
Signed-off-by: Attila Mészáros <csviri@gmail.com>
1 parent 47ef983 commit da36061

File tree

6 files changed

+36
-18
lines changed

6 files changed

+36
-18
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/DependentResource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public interface DependentResource<R, P extends HasMetadata> {
3131
*/
3232
Class<R> resourceType();
3333

34+
// todo recreate the event source because of re-active use case?
3435
/**
3536
* Dependent resources are designed to by default provide event sources. There are cases where
3637
* they might not:

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowCleanupExecutor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,11 @@ protected void doRun(DependentResourceNode<R, P> dependentResourceNode,
6868
DependentResource<R, P> dependentResource) {
6969
var deletePostCondition = dependentResourceNode.getDeletePostcondition();
7070

71-
if (dependentResource.isDeletable()) {
71+
// todo test
72+
var active =
73+
isConditionMet(dependentResourceNode.getActivationCondition(), dependentResource);
74+
75+
if (dependentResource.isDeletable() && active) {
7276
((Deleter<P>) dependentResource).delete(primary, context);
7377
deleteCalled.add(dependentResourceNode);
7478
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowReconcileExecutor.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -76,22 +76,23 @@ private synchronized <R> void handleReconcile(DependentResourceNode<R, P> depend
7676
}
7777
}
7878

79+
// todo IT test re-registration
7980
private <R> void registerEventSourceForActivationCondition(boolean activationConditionMet,
8081
DependentResourceNode<R, P> dependentResourceNode) {
8182
if (dependentResourceNode.getActivationCondition().isPresent()) {
83+
var eventSource =
84+
dependentResourceNode.getDependentResource().eventSource(context.eventSourceRetriever()
85+
.eventSourceContexForDynamicRegistration());
86+
8287
if (activationConditionMet) {
83-
// todo create issue for v5 to return name also from DependentResource.eventSource
84-
var eventSource =
85-
dependentResourceNode.getDependentResource().eventSource(context.eventSourceRetriever()
86-
.eventSourceContexForDynamicRegistration());
87-
88-
eventSource.ifPresent(es -> {
89-
// todo check if event source with the name not exists yet, if does do not register.
90-
context.eventSourceRetriever()
91-
.dynamicallyRegisterEventSource(dependentResourceNode.getName(), es);
92-
});
88+
89+
var es = eventSource.orElseThrow();
90+
context.eventSourceRetriever()
91+
.dynamicallyRegisterEventSource(dependentResourceNode.getName(), es);
92+
9393
} else {
94-
// todo deregister event source
94+
context.eventSourceRetriever()
95+
.dynamicallyDeRegisterEventSource(dependentResourceNode.getName());
9596
}
9697
}
9798
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -233,14 +233,20 @@ public <R> List<ResourceEventSource<R, P>> getResourceEventSourcesFor(Class<R> d
233233
}
234234

235235
@Override
236-
public void dynamicallyRegisterEventSource(String name, EventSource eventSource) {
237-
// todo not that other thread should wait for syncing (with start() within synchronized this is
238-
// automatically ensured)
236+
public synchronized void dynamicallyRegisterEventSource(String name, EventSource eventSource) {
237+
if (eventSources.existing(name, eventSource) != null) {
238+
return;
239+
}
240+
registerEventSource(name, eventSource);
241+
eventSource.start();
239242
}
240243

241244
@Override
242-
public void dynamicallyDeRegisterEventSource(String name) {
243-
// todo
245+
public synchronized void dynamicallyDeRegisterEventSource(String name) {
246+
EventSource es = eventSources.remove(name);
247+
if (es != null) {
248+
es.stop();
249+
}
244250
}
245251

246252
@Override

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceRetriever.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ default <R> ResourceEventSource<R, P> getResourceEventSourceFor(Class<R> depende
1919

2020
// todo javadocs
2121
// this will be an idempotent synchronized operation
22+
// todo check if event source with the name not exists yet, if does do not register.
2223
void dynamicallyRegisterEventSource(String name, EventSource eventSource);
2324

2425
void dynamicallyDeRegisterEventSource(String name);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public void clear() {
7575
sources.clear();
7676
}
7777

78-
private NamedEventSource existing(String name, EventSource source) {
78+
public NamedEventSource existing(String name, EventSource source) {
7979
final var eventSources = sources.get(keyFor(source));
8080
if (eventSources == null || eventSources.isEmpty()) {
8181
return null;
@@ -183,4 +183,9 @@ public <S> List<ResourceEventSource<S, R>> getEventSources(Class<S> dependentTyp
183183
.map(es -> (ResourceEventSource<S, R>) es)
184184
.collect(Collectors.toList());
185185
}
186+
187+
public EventSource remove(String name) {
188+
var optionalMap = sources.values().stream().filter(m -> m.containsKey(name)).findFirst();
189+
return optionalMap.map(m -> m.remove(name)).orElse(null);
190+
}
186191
}

0 commit comments

Comments
 (0)