Skip to content

Commit 84b08d0

Browse files
morhiditweise
authored andcommitted
[FLINK-29251] Send CREATED status and Cancel event via FlinkResourceListener
1 parent e5a325c commit 84b08d0

File tree

8 files changed

+50
-27
lines changed

8 files changed

+50
-27
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,14 @@ public FlinkDeploymentController(
8484

8585
@Override
8686
public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
87-
LOG.info("Deleting FlinkDeployment");
87+
String msg = "Cleaning up " + FlinkDeployment.class.getSimpleName();
88+
LOG.info(msg);
89+
eventRecorder.triggerEvent(
90+
flinkApp,
91+
EventRecorder.Type.Normal,
92+
EventRecorder.Reason.Cleanup,
93+
EventRecorder.Component.Operator,
94+
msg);
8895
statusRecorder.updateStatusFromCache(flinkApp);
8996
try {
9097
observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,14 @@ public UpdateControl<FlinkSessionJob> reconcile(
107107

108108
@Override
109109
public DeleteControl cleanup(FlinkSessionJob sessionJob, Context context) {
110-
LOG.info("Deleting FlinkSessionJob");
110+
String msg = "Cleaning up " + FlinkSessionJob.class.getSimpleName();
111+
LOG.info(msg);
112+
eventRecorder.triggerEvent(
113+
sessionJob,
114+
EventRecorder.Type.Normal,
115+
EventRecorder.Reason.Cleanup,
116+
EventRecorder.Component.Operator,
117+
msg);
111118
statusRecorder.removeCachedStatus(sessionJob);
112119
return reconciler.cleanup(sessionJob, context);
113120
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ private void updateJobStatus(
147147
eventRecorder.triggerEvent(
148148
resource,
149149
EventRecorder.Type.Normal,
150-
EventRecorder.Reason.StatusChanged,
150+
EventRecorder.Reason.JobStatusChanged,
151151
EventRecorder.Component.Job,
152152
message);
153153
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ public DeleteControl cleanupInternal(FlinkDeployment deployment, Context<?> cont
187187
if (eventRecorder.triggerEvent(
188188
deployment,
189189
EventRecorder.Type.Warning,
190-
EventRecorder.Reason.Cleanup,
190+
EventRecorder.Reason.CleanupFailed,
191191
EventRecorder.Component.Operator,
192192
error)) {
193193
LOG.warn(error);

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,10 @@ public enum Reason {
121121
SpecChanged,
122122
Rollback,
123123
Submit,
124-
StatusChanged,
124+
JobStatusChanged,
125125
SavepointError,
126126
Cleanup,
127+
CleanupFailed,
127128
Missing,
128129
ValidationError
129130
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.kubernetes.operator.listener.AuditUtils;
2828
import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
2929
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
30+
import org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState;
3031

3132
import com.fasterxml.jackson.databind.ObjectMapper;
3233
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -140,6 +141,9 @@ public void updateStatusFromCache(CR resource) {
140141
} else {
141142
// Initialize cache with current status copy
142143
statusCache.put(key, objectMapper.convertValue(resource.getStatus(), ObjectNode.class));
144+
if (ResourceLifecycleState.CREATED.equals(resource.getStatus().getLifecycleState())) {
145+
statusUpdateListener.accept(resource, resource.getStatus());
146+
}
143147
}
144148
metricManager.onUpdate(resource);
145149
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) throws Exception
117117
assertEquals(
118118
org.apache.flink.api.common.JobStatus.RECONCILING.name(),
119119
appCluster.getStatus().getJobStatus().getState());
120-
assertEquals(2, testController.getInternalStatusUpdateCount());
120+
assertEquals(3, testController.getInternalStatusUpdateCount());
121121
assertFalse(updateControl.isUpdateStatus());
122122
assertEquals(
123123
Optional.of(
@@ -140,7 +140,7 @@ public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) throws Exception
140140
assertEquals(
141141
org.apache.flink.api.common.JobStatus.RECONCILING.name(),
142142
appCluster.getStatus().getJobStatus().getState());
143-
assertEquals(3, testController.getInternalStatusUpdateCount());
143+
assertEquals(4, testController.getInternalStatusUpdateCount());
144144
assertFalse(updateControl.isUpdateStatus());
145145
assertEquals(
146146
Optional.of(
@@ -154,7 +154,7 @@ public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) throws Exception
154154
assertEquals(
155155
org.apache.flink.api.common.JobStatus.RUNNING.name(),
156156
appCluster.getStatus().getJobStatus().getState());
157-
assertEquals(4, testController.getInternalStatusUpdateCount());
157+
assertEquals(5, testController.getInternalStatusUpdateCount());
158158
assertFalse(updateControl.isUpdateStatus());
159159
assertEquals(
160160
Optional.of(
@@ -169,7 +169,7 @@ public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) throws Exception
169169
assertEquals(
170170
org.apache.flink.api.common.JobStatus.RUNNING.name(),
171171
appCluster.getStatus().getJobStatus().getState());
172-
assertEquals(4, testController.getInternalStatusUpdateCount());
172+
assertEquals(5, testController.getInternalStatusUpdateCount());
173173
assertFalse(updateControl.isUpdateStatus());
174174
assertEquals(
175175
Optional.of(
@@ -195,7 +195,7 @@ public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) throws Exception
195195
assertEquals(
196196
org.apache.flink.api.common.JobStatus.RUNNING.name(),
197197
appCluster.getStatus().getJobStatus().getState());
198-
assertEquals(5, testController.getInternalStatusUpdateCount());
198+
assertEquals(6, testController.getInternalStatusUpdateCount());
199199
assertFalse(updateControl.isUpdateStatus());
200200

201201
reconciliationStatus = appCluster.getStatus().getReconciliationStatus();
@@ -482,7 +482,7 @@ public void verifyStatelessUpgrade(FlinkVersion flinkVersion) throws Exception {
482482

483483
assertEquals(1, testController.events().size());
484484
assertEquals(
485-
EventRecorder.Reason.StatusChanged,
485+
EventRecorder.Reason.JobStatusChanged,
486486
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
487487

488488
// Upgrade job
@@ -530,7 +530,7 @@ public void verifyStatelessUpgrade(FlinkVersion flinkVersion) throws Exception {
530530
testController.reconcile(appCluster, context);
531531
assertEquals(1, testController.events().size());
532532
assertEquals(
533-
EventRecorder.Reason.StatusChanged,
533+
EventRecorder.Reason.JobStatusChanged,
534534
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
535535

536536
// Suspend job
@@ -592,7 +592,7 @@ public void verifyStatelessUpgrade(FlinkVersion flinkVersion) throws Exception {
592592
.collect(Collectors.toList());
593593
assertEquals(1, statusEvents.size());
594594
assertEquals(
595-
EventRecorder.Reason.StatusChanged,
595+
EventRecorder.Reason.JobStatusChanged,
596596
EventRecorder.Reason.valueOf(statusEvents.get(0).getReason()));
597597

598598
assertEquals(

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -58,36 +58,40 @@ public void testListeners() {
5858
var eventRecorder = EventRecorder.create(kubernetesClient, listeners);
5959

6060
var deployment = TestUtils.buildApplicationCluster();
61-
statusRecorder.updateStatusFromCache(deployment);
6261

63-
statusRecorder.patchAndCacheStatus(deployment);
6462
assertTrue(listener1.updates.isEmpty());
6563
assertTrue(listener2.updates.isEmpty());
6664
assertTrue(listener1.events.isEmpty());
6765
assertTrue(listener2.events.isEmpty());
6866

69-
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
70-
71-
statusRecorder.patchAndCacheStatus(deployment);
67+
statusRecorder.updateStatusFromCache(deployment);
68+
assertEquals(1, listener1.updates.size());
69+
statusRecorder.updateStatusFromCache(deployment);
7270
assertEquals(1, listener1.updates.size());
7371
assertEquals(deployment, listener1.updates.get(0).getFlinkResource());
7472

75-
assertEquals(1, listener2.updates.size());
76-
assertEquals(deployment, listener2.updates.get(0).getFlinkResource());
77-
assertEquals(
78-
listener1.updates.get(0).getTimestamp(), listener2.updates.get(0).getTimestamp());
73+
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
74+
statusRecorder.patchAndCacheStatus(deployment);
75+
assertEquals(2, listener1.updates.size());
76+
assertEquals(deployment, listener1.updates.get(1).getFlinkResource());
7977

8078
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
8179
statusRecorder.patchAndCacheStatus(deployment);
80+
assertEquals(3, listener1.updates.size());
81+
assertEquals(deployment, listener1.updates.get(2).getFlinkResource());
8282

83-
assertEquals(2, listener1.updates.size());
84-
assertEquals(deployment, listener1.updates.get(0).getFlinkResource());
85-
assertEquals(2, listener2.updates.size());
86-
assertEquals(deployment, listener2.updates.get(0).getFlinkResource());
83+
for (int i = 0; i < listener1.updates.size(); i++) {
84+
assertEquals(
85+
listener1.updates.get(i).getTimestamp(),
86+
listener2.updates.get(i).getTimestamp());
87+
assertEquals(
88+
listener1.updates.get(i).getFlinkResource(),
89+
listener2.updates.get(i).getFlinkResource());
90+
}
8791

8892
var updateContext =
8993
(FlinkResourceListener.StatusUpdateContext<FlinkDeployment, FlinkDeploymentStatus>)
90-
listener1.updates.get(1);
94+
listener1.updates.get(2);
9195
assertEquals(
9296
JobManagerDeploymentStatus.ERROR,
9397
updateContext.getPreviousStatus().getJobManagerDeploymentStatus());

0 commit comments

Comments
 (0)