Skip to content

Commit 02507ad

Browse files
committed
refactor: remove EventMarker altogether
1 parent bdf1ede commit 02507ad

File tree

6 files changed

+215
-247
lines changed

6 files changed

+215
-247
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventMarker.java

Lines changed: 0 additions & 106 deletions
This file was deleted.

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java

Lines changed: 51 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
1515
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
1616
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
17-
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
1817
import io.javaoperatorsdk.operator.processing.LifecycleAware;
1918
import io.javaoperatorsdk.operator.processing.MDCUtils;
2019
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
@@ -41,7 +40,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
4140
private final Metrics metrics;
4241
private final Cache<R> cache;
4342
private final EventSourceManager<R> eventSourceManager;
44-
private final EventMarker eventMarker = new EventMarker();
43+
// private final EventMarker eventMarker = new EventMarker();
4544
private final RateLimiter<? extends RateLimitState> rateLimiter;
4645

4746
private final ResourceStateManager resourceStateManager = new ResourceStateManager();
@@ -108,77 +107,76 @@ public synchronized void handleEvent(Event event) {
108107
log.debug("Received event: {}", event);
109108

110109
final var resourceID = event.getRelatedCustomResourceID();
110+
final var state = resourceStateManager.getOrCreate(event.getRelatedCustomResourceID());
111111
MDCUtils.addResourceIDInfo(resourceID);
112112
metrics.receivedEvent(event);
113-
handleEventMarking(event);
113+
handleEventMarking(event, state);
114114
if (!this.running) {
115115
// events are received and marked, but will be processed when started, see start() method.
116116
log.debug("Skipping event: {} because the event processor is not started", event);
117117
return;
118118
}
119-
handleMarkedEventForResource(resourceID);
119+
handleMarkedEventForResource(state);
120120
} finally {
121121
MDCUtils.removeResourceIDInfo();
122122
}
123123
}
124124

125-
private void handleMarkedEventForResource(ResourceID resourceID) {
126-
if (eventMarker.deleteEventPresent(resourceID)) {
127-
cleanupForDeletedEvent(resourceID);
128-
} else if (!eventMarker.processedMarkForDeletionPresent(resourceID)) {
129-
submitReconciliationExecution(resourceID);
125+
private void handleMarkedEventForResource(ResourceState state) {
126+
if (state.deleteEventPresent()) {
127+
cleanupForDeletedEvent(state.getId());
128+
} else if (!state.processedMarkForDeletionPresent()) {
129+
submitReconciliationExecution(state);
130130
}
131131
}
132132

133-
private void submitReconciliationExecution(ResourceID resourceID) {
133+
private void submitReconciliationExecution(ResourceState state) {
134134
try {
135-
boolean controllerUnderExecution = isControllerUnderExecution(resourceID);
136-
Optional<R> latest = cache.get(resourceID);
135+
boolean controllerUnderExecution = isControllerUnderExecution(state);
136+
Optional<R> latest = cache.get(state.getId());
137137
latest.ifPresent(MDCUtils::addResourceInfo);
138138
if (!controllerUnderExecution && latest.isPresent()) {
139-
final var resourceState = resourceStateManager.getOrCreate(resourceID);
140-
var rateLimit = resourceState.getRateLimit();
139+
var rateLimit = state.getRateLimit();
141140
if (rateLimit == null) {
142141
rateLimit = rateLimiter.initState();
143-
resourceState.setRateLimit(rateLimit);
142+
state.setRateLimit(rateLimit);
144143
}
145144
var rateLimiterPermission = rateLimiter.isLimited(rateLimit);
146145
if (rateLimiterPermission.isPresent()) {
147-
handleRateLimitedSubmission(resourceID, rateLimiterPermission.get());
146+
handleRateLimitedSubmission(state.getId(), rateLimiterPermission.get());
148147
return;
149148
}
150-
setUnderExecutionProcessing(resourceID);
151-
final var retryInfo = retryInfo(resourceID);
149+
state.setUnderProcessing(true);
150+
final var retryInfo = state.getRetry();
152151
ExecutionScope<R> executionScope = new ExecutionScope<>(latest.get(), retryInfo);
153-
eventMarker.unMarkEventReceived(resourceID);
154-
metrics.reconcileCustomResource(resourceID, retryInfo);
152+
state.unMarkEventReceived();
153+
metrics.reconcileCustomResource(state.getId(), retryInfo);
155154
log.debug("Executing events for custom resource. Scope: {}", executionScope);
156155
executor.execute(new ControllerExecution(executionScope));
157156
} else {
158157
log.debug(
159158
"Skipping executing controller for resource id: {}. Controller in execution: {}. Latest Resource present: {}",
160-
resourceID,
159+
state,
161160
controllerUnderExecution,
162161
latest.isPresent());
163162
if (latest.isEmpty()) {
164-
log.debug("no custom resource found in cache for ResourceID: {}", resourceID);
163+
log.debug("no custom resource found in cache for ResourceID: {}", state);
165164
}
166165
}
167166
} finally {
168167
MDCUtils.removeResourceInfo();
169168
}
170169
}
171170

172-
private void handleEventMarking(Event event) {
171+
private void handleEventMarking(Event event, ResourceState state) {
173172
final var relatedCustomResourceID = event.getRelatedCustomResourceID();
174173
if (event instanceof ResourceEvent) {
175174
var resourceEvent = (ResourceEvent) event;
176175
if (resourceEvent.getAction() == ResourceAction.DELETED) {
177176
log.debug("Marking delete event received for: {}", relatedCustomResourceID);
178-
eventMarker.markDeleteEventReceived(event);
177+
state.markDeleteEventReceived();
179178
} else {
180-
if (eventMarker.processedMarkForDeletionPresent(relatedCustomResourceID)
181-
&& isResourceMarkedForDeletion(resourceEvent)) {
179+
if (state.processedMarkForDeletionPresent() && isResourceMarkedForDeletion(resourceEvent)) {
182180
log.debug(
183181
"Skipping mark of event received, since already processed mark for deletion and resource marked for deletion: {}",
184182
relatedCustomResourceID);
@@ -190,22 +188,21 @@ && isResourceMarkedForDeletion(resourceEvent)) {
190188
// removed, but also the informers websocket is disconnected and later reconnected. So
191189
// meanwhile the resource could be deleted and recreated. In this case we just mark a new
192190
// event as below.
193-
markEventReceived(event);
191+
markEventReceived(state);
194192
}
195-
} else if (!eventMarker.deleteEventPresent(relatedCustomResourceID) ||
196-
!eventMarker.processedMarkForDeletionPresent(relatedCustomResourceID)) {
197-
markEventReceived(event);
193+
} else if (!state.deleteEventPresent() || !state.processedMarkForDeletionPresent()) {
194+
markEventReceived(state);
198195
} else if (log.isDebugEnabled()) {
199196
log.debug(
200197
"Skipped marking event as received. Delete event present: {}, processed mark for deletion: {}",
201-
eventMarker.deleteEventPresent(relatedCustomResourceID),
202-
eventMarker.processedMarkForDeletionPresent(relatedCustomResourceID));
198+
state.deleteEventPresent(),
199+
state.processedMarkForDeletionPresent());
203200
}
204201
}
205202

206-
private void markEventReceived(Event event) {
207-
log.debug("Marking event received for: {}", event.getRelatedCustomResourceID());
208-
eventMarker.markEventReceived(event);
203+
private void markEventReceived(ResourceState state) {
204+
log.debug("Marking event received for: {}", state.getId());
205+
state.markEventReceived();
209206
}
210207

211208
private boolean isResourceMarkedForDeletion(ResourceEvent resourceEvent) {
@@ -220,16 +217,13 @@ private void handleRateLimitedSubmission(ResourceID resourceID, Duration minimal
220217
Math.max(minimalDurationMillis, MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION));
221218
}
222219

223-
private RetryInfo retryInfo(ResourceID resourceID) {
224-
return resourceStateManager.getOrCreate(resourceID).getRetry();
225-
}
226-
227220
synchronized void eventProcessingFinished(
228221
ExecutionScope<R> executionScope, PostExecutionControl<R> postExecutionControl) {
229222
if (!running) {
230223
return;
231224
}
232225
ResourceID resourceID = executionScope.getResourceID();
226+
final var state = resourceStateManager.getOrCreate(resourceID);
233227
log.debug(
234228
"Event processing finished. Scope: {}, PostExecutionControl: {}",
235229
executionScope,
@@ -241,17 +235,17 @@ synchronized void eventProcessingFinished(
241235
// Either way we don't want to retry.
242236
if (isRetryConfigured()
243237
&& postExecutionControl.exceptionDuringExecution()
244-
&& !eventMarker.deleteEventPresent(resourceID)) {
238+
&& !state.deleteEventPresent()) {
245239
handleRetryOnException(
246240
executionScope, postExecutionControl.getRuntimeException().orElseThrow());
247241
return;
248242
}
249243
cleanupOnSuccessfulExecution(executionScope);
250244
metrics.finishedReconciliation(resourceID);
251-
if (eventMarker.deleteEventPresent(resourceID)) {
245+
if (state.deleteEventPresent()) {
252246
cleanupForDeletedEvent(executionScope.getResourceID());
253247
} else if (postExecutionControl.isFinalizerRemoved()) {
254-
eventMarker.markProcessedMarkForDeletion(resourceID);
248+
state.markProcessedMarkForDeletion();
255249
} else {
256250
postExecutionControl
257251
.getUpdatedCustomResource()
@@ -264,8 +258,8 @@ synchronized void eventProcessingFinished(
264258
ResourceID.fromResource(r), r, executionScope.getResource());
265259
}
266260
});
267-
if (eventMarker.eventPresent(resourceID)) {
268-
submitReconciliationExecution(resourceID);
261+
if (state.eventPresent()) {
262+
submitReconciliationExecution(state);
269263
} else {
270264
reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getResource());
271265
}
@@ -296,17 +290,17 @@ TimerEventSource<R> retryEventSource() {
296290
*/
297291
private void handleRetryOnException(
298292
ExecutionScope<R> executionScope, Exception exception) {
299-
RetryExecution execution = getOrInitRetryExecution(executionScope);
300-
var resourceID = executionScope.getResourceID();
301-
boolean eventPresent = eventMarker.eventPresent(resourceID);
302-
eventMarker.markEventReceived(resourceID);
293+
final var state = getOrInitRetryExecution(executionScope);
294+
var resourceID = state.getId();
295+
boolean eventPresent = state.eventPresent();
296+
state.markEventReceived();
303297

304298
if (eventPresent) {
305299
log.debug("New events exists for for resource id: {}", resourceID);
306-
submitReconciliationExecution(resourceID);
300+
submitReconciliationExecution(state);
307301
return;
308302
}
309-
Optional<Long> nextDelay = execution.nextDelay();
303+
Optional<Long> nextDelay = state.getRetry().nextDelay();
310304

311305
nextDelay.ifPresentOrElse(
312306
delay -> {
@@ -329,29 +323,24 @@ private void cleanupOnSuccessfulExecution(ExecutionScope<R> executionScope) {
329323
retryEventSource().cancelOnceSchedule(executionScope.getResourceID());
330324
}
331325

332-
private RetryExecution getOrInitRetryExecution(ExecutionScope<R> executionScope) {
326+
private ResourceState getOrInitRetryExecution(ExecutionScope<R> executionScope) {
333327
final var state = resourceStateManager.getOrCreate(executionScope.getResourceID());
334328
RetryExecution retryExecution = state.getRetry();
335329
if (retryExecution == null) {
336330
retryExecution = retry.initExecution();
337331
state.setRetry(retryExecution);
338332
}
339-
return retryExecution;
333+
return state;
340334
}
341335

342336
private void cleanupForDeletedEvent(ResourceID resourceID) {
343337
log.debug("Cleaning up for delete event for: {}", resourceID);
344-
eventMarker.cleanup(resourceID);
345338
resourceStateManager.remove(resourceID);
346339
metrics.cleanupDoneFor(resourceID);
347340
}
348341

349-
private boolean isControllerUnderExecution(ResourceID resourceID) {
350-
return resourceStateManager.getOrCreate(resourceID).isUnderProcessing();
351-
}
352-
353-
private void setUnderExecutionProcessing(ResourceID resourceID) {
354-
resourceStateManager.getOrCreate(resourceID).setUnderProcessing(true);
342+
private boolean isControllerUnderExecution(ResourceState state) {
343+
return state.isUnderProcessing();
355344
}
356345

357346
private void unsetUnderExecution(ResourceID resourceID) {
@@ -374,8 +363,8 @@ public void start() throws OperatorException {
374363
}
375364

376365
private void handleAlreadyMarkedEvents() {
377-
for (ResourceID resourceID : eventMarker.resourceIDsWithEventPresent()) {
378-
handleMarkedEventForResource(resourceID);
366+
for (var state : resourceStateManager.resourcesWithEventPresent()) {
367+
handleMarkedEventForResource(state);
379368
}
380369
}
381370

@@ -411,6 +400,6 @@ public String toString() {
411400
}
412401

413402
public synchronized boolean isUnderProcessing(ResourceID resourceID) {
414-
return isControllerUnderExecution(resourceID);
403+
return isControllerUnderExecution(resourceStateManager.getOrCreate(resourceID));
415404
}
416405
}

0 commit comments

Comments
 (0)