Skip to content

fix: improvements event source management #1239

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ public EventSourceManager(Controller<R> controller) {
this.eventSources = eventSources;
this.controller = controller;
// controller event source needs to be available before we create the event processor
final var controllerEventSource = eventSources.initControllerEventSource(controller);
eventSources.initControllerEventSource(controller);
this.eventProcessor = new EventProcessor<>(this);

// sources need to be registered after the event processor is created since it's set on the
// event source
registerEventSource(eventSources.retryEventSource());
registerEventSource(controllerEventSource);
postProcessDefaultEventSources();
}

private void postProcessDefaultEventSources() {
eventSources.controllerResourceEventSource().setEventHandler(eventProcessor);
eventSources.retryEventSource().setEventHandler(eventProcessor);
}

/**
Expand Down Expand Up @@ -146,7 +148,7 @@ public void broadcastOnResourceEvent(ResourceAction action, R resource, R oldRes
public void changeNamespaces(Set<String> namespaces) {
eventProcessor.stop();
eventSources
.allEventSources()
.eventSources()
.filter(NamespaceChangeable.class::isInstance)
.map(NamespaceChangeable.class::cast)
.filter(NamespaceChangeable::allowsNamespaceChanges)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@

class EventSources<R extends HasMetadata> implements Iterable<NamedEventSource> {

private static final String CONTROLLER_EVENT_SOURCE_KEY = "0";
public static final String CONTROLLER_RESOURCE_EVENT_SOURCE_NAME =
"ControllerResourceEventSource";
public static final String RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME =
"RetryAndRescheduleTimerEventSource";

private final ConcurrentNavigableMap<String, Map<String, EventSource>> sources =
new ConcurrentSkipListMap<>();
private final TimerEventSource<R> retryAndRescheduleTimerEventSource = new TimerEventSource<>();
Expand All @@ -38,17 +42,21 @@ TimerEventSource<R> retryEventSource() {

@Override
public Iterator<NamedEventSource> iterator() {
return flatMappedSources().iterator();
return Stream.concat(Stream.of(
new NamedEventSource(controllerResourceEventSource, CONTROLLER_RESOURCE_EVENT_SOURCE_NAME),
new NamedEventSource(retryAndRescheduleTimerEventSource,
RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME)),
flatMappedSources()).iterator();
}

Stream<NamedEventSource> flatMappedSources() {
return sources.values().stream().flatMap(c -> c.entrySet().stream()
.map(esEntry -> new NamedEventSource(esEntry.getValue(), esEntry.getKey())));
}

Stream<EventSource> allEventSources() {
Stream<EventSource> eventSources() {
return Stream.concat(
Stream.of(retryEventSource(), controllerResourceEventSource()).filter(Objects::nonNull),
Stream.of(controllerResourceEventSource(), retryEventSource()).filter(Objects::nonNull),
sources.values().stream().flatMap(c -> c.values().stream()));
}

Expand Down Expand Up @@ -81,9 +89,6 @@ private Class<?> getResourceType(EventSource source) {
}

private String keyFor(EventSource source) {
if (source instanceof ControllerResourceEventSource) {
return CONTROLLER_EVENT_SOURCE_KEY;
}
return keyFor(getResourceType(source));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.javaoperatorsdk.operator.processing.event;

import java.util.Objects;

import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;

Expand Down Expand Up @@ -40,4 +42,19 @@ public String toString() {
public EventSource original() {
return original;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
NamedEventSource that = (NamedEventSource) o;
return Objects.equals(original, that.original) && Objects.equals(name, that.name);
}

@Override
public int hashCode() {
return Objects.hash(original, name);
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package io.javaoperatorsdk.operator.processing.event;

import java.util.Set;
import java.util.stream.Collectors;

import org.junit.jupiter.api.Test;

import io.fabric8.kubernetes.api.model.HasMetadata;
Expand All @@ -12,12 +9,16 @@
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;

import static io.javaoperatorsdk.operator.processing.event.EventSources.CONTROLLER_RESOURCE_EVENT_SOURCE_NAME;
import static io.javaoperatorsdk.operator.processing.event.EventSources.RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.mock;

@SuppressWarnings({"unchecked", "rawtypes"})
class EventSourcesTest {

public static final String EVENT_SOURCE_NAME = "foo";
EventSources eventSources = new EventSources();

@Test
Expand All @@ -31,20 +32,41 @@ void cannotAddTwoEventSourcesWithSameName() {
@Test
void allEventSourcesShouldReturnAll() {
// initial state doesn't have ControllerResourceEventSource
assertEquals(Set.of(eventSources.retryEventSource()), eventSources.allEventSources().collect(
Collectors.toSet()));
assertThat(eventSources.eventSources()).containsExactly(eventSources.retryEventSource());

initControllerEventSource();

assertThat(eventSources.eventSources()).containsExactly(
eventSources.controllerResourceEventSource(),
eventSources.retryEventSource());

final var source = mock(EventSource.class);
eventSources.add(EVENT_SOURCE_NAME, source);
// order matters
assertThat(eventSources.eventSources())
.containsExactly(eventSources.controllerResourceEventSource(),
eventSources.retryEventSource(), source);
}

@Test
void eventSourcesIteratorShouldReturnControllerEventSourceAsFirst() {
initControllerEventSource();
final var source = mock(EventSource.class);
eventSources.add(EVENT_SOURCE_NAME, source);

assertThat(eventSources.iterator()).toIterable().containsExactly(
new NamedEventSource(eventSources.controllerResourceEventSource(),
CONTROLLER_RESOURCE_EVENT_SOURCE_NAME),
new NamedEventSource(eventSources.retryEventSource(),
RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME),
new NamedEventSource(source, EVENT_SOURCE_NAME));
}

private void initControllerEventSource() {
final var configuration = MockControllerConfiguration.forResource(HasMetadata.class);
final var controller = new Controller(mock(Reconciler.class), configuration,
MockKubernetesClient.client(HasMetadata.class));
eventSources.initControllerEventSource(controller);
assertEquals(
Set.of(eventSources.retryEventSource(), eventSources.controllerResourceEventSource()),
eventSources.allEventSources().collect(Collectors.toSet()));
final var source = mock(EventSource.class);
eventSources.add("foo", source);
assertEquals(Set.of(eventSources.retryEventSource(),
eventSources.controllerResourceEventSource(), source),
eventSources.allEventSources().collect(Collectors.toSet()));
}

}