From f59dd9fc1abbf9e58cdae415a455070370bad202 Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Fri, 25 Sep 2020 02:03:02 -0700 Subject: [PATCH 1/3] Fix replay error for async retry --- .../internal/replay/ClockDecisionContext.java | 12 +- .../uber/cadence/workflow/WorkflowTest.java | 83 ++++++ .../testGetVersionWithRetryHistory.json | 247 ++++++++++++++++++ 3 files changed, 336 insertions(+), 6 deletions(-) create mode 100644 src/test/resources/testGetVersionWithRetryHistory.json diff --git a/src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java b/src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java index d24b99a1e..144e3fce4 100644 --- a/src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java +++ b/src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java @@ -296,12 +296,6 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m }; decisions.addAllMissingVersionMarker(true, Optional.of(changeIdEquals)); - Integer version = versionMap.get(changeId); - if (version != null) { - validateVersion(changeId, version, minSupported, maxSupported); - return version; - } - Optional result = versionHandler.handle( changeId, @@ -313,6 +307,12 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m return Optional.of(converter.toData(maxSupported)); }); + Integer version = versionMap.get(changeId); + if (version != null) { + validateVersion(changeId, version, minSupported, maxSupported); + return version; + } + if (!result.isPresent()) { return WorkflowInternal.DEFAULT_VERSION; } diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java index 2b76badc2..f05c7d8f1 100644 --- a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java @@ -25,6 +25,8 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.google.common.base.Strings; import com.google.common.util.concurrent.UncheckedExecutionException; @@ -411,6 +413,18 @@ public interface TestWorkflow2 { List getTrace(); } + public interface TestWorkflow3 { + + @WorkflowMethod + String execute(String taskList); + + @SignalMethod(name = "testSignal") + void signal1(String arg); + + @QueryMethod(name = "getState") + String getState(); + } + public static class TestSyncWorkflowImpl implements TestWorkflow1 { @Override @@ -5846,4 +5860,73 @@ public void upsertSearchAttributes(Map searchAttributes) { next.upsertSearchAttributes(searchAttributes); } } + + public static class TestGetVersionWorkflowRetryImpl implements TestWorkflow3 { + private String result = ""; + + @Override + public String execute(String taskList) { + int version = Workflow.getVersion("test_change", Workflow.DEFAULT_VERSION, 1); + int act = 0; + if (version == 1) { + ActivityOptions options = + new ActivityOptions.Builder() + .setTaskList(taskList) + .setHeartbeatTimeout(Duration.ofSeconds(5)) + .setScheduleToCloseTimeout(Duration.ofSeconds(5)) + .setScheduleToStartTimeout(Duration.ofSeconds(5)) + .setStartToCloseTimeout(Duration.ofSeconds(10)) + .setRetryOptions( + new RetryOptions.Builder() + .setMaximumAttempts(3) + .setInitialInterval(Duration.ofSeconds(1)) + .build()) + .build(); + + TestActivities testActivities = Workflow.newActivityStub(TestActivities.class, options); + act = testActivities.activity1(1); + } + + result += "activity" + act; + return result; + } + + @Override + public void signal1(String arg) { + Workflow.sleep(1000); + } + + @Override + public String getState() { + return result; + } + } + + @Test + public void testGetVersionRetry() throws ExecutionException, InterruptedException { + TestActivities activity = mock(TestActivities.class); + when(activity.activity1(1)).thenReturn(1); + worker.registerActivitiesImplementations(activity); + + startWorkerFor(TestGetVersionWorkflowRetryImpl.class); + TestWorkflow3 workflowStub = + workflowClient.newWorkflowStub( + TestWorkflow3.class, newWorkflowOptionsBuilder(taskList).build()); + CompletableFuture result = WorkflowClient.execute(workflowStub::execute, taskList); + workflowStub.signal1("test"); + assertEquals("activity1", result.get()); + + // test replay + assertEquals("activity1", workflowStub.getState()); + } + + @Test + public void testGetVersionWithRetryReplay() throws Exception { + // Avoid executing 4 times + Assume.assumeFalse("skipping for docker tests", useExternalService); + Assume.assumeFalse("skipping for sticky off", disableStickyExecution); + + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testGetVersionWithRetryHistory.json", TestGetVersionWorkflowRetryImpl.class); + } } diff --git a/src/test/resources/testGetVersionWithRetryHistory.json b/src/test/resources/testGetVersionWithRetryHistory.json new file mode 100644 index 000000000..7ac7af29b --- /dev/null +++ b/src/test/resources/testGetVersionWithRetryHistory.json @@ -0,0 +1,247 @@ +[ + { + "eventId":1, + "timestamp":1600908280230646000, + "eventType":"WorkflowExecutionStarted", + "version":-24, + "taskId":12582912, + "workflowExecutionStartedEventAttributes":{ + "workflowType":{ + "name":"TestWorkflow3::execute" + }, + "taskList":{ + "name":"WorkflowTest-testGetVersionNew[Docker Sticky ON]-017b6e2b-89c8-41b6-8c08-7bd8e9a4b31f" + }, + "input":"IldvcmtmbG93VGVzdC10ZXN0R2V0VmVyc2lvbk5ld1tEb2NrZXIgU3RpY2t5IE9OXS0wMTdiNmUyYi04OWM4LTQxYjYtOGMwOC03YmQ4ZTlhNGIzMWYi", + "executionStartToCloseTimeoutSeconds":30, + "taskStartToCloseTimeoutSeconds":5, + "originalExecutionRunId":"fab6e1da-7f25-472d-95ca-f9c5f59e6181", + "identity":"", + "firstExecutionRunId":"fab6e1da-7f25-472d-95ca-f9c5f59e6181", + "attempt":0, + "firstDecisionTaskBackoffSeconds":0 + } + }, + { + "eventId":2, + "timestamp":1600908280230662000, + "eventType":"DecisionTaskScheduled", + "version":-24, + "taskId":12582913, + "decisionTaskScheduledEventAttributes":{ + "taskList":{ + "name":"WorkflowTest-testGetVersionNew[Docker Sticky ON]-017b6e2b-89c8-41b6-8c08-7bd8e9a4b31f" + }, + "startToCloseTimeoutSeconds":5, + "attempt":0 + } + }, + { + "eventId":3, + "timestamp":1600908280245759000, + "eventType":"DecisionTaskStarted", + "version":-24, + "taskId":12582918, + "decisionTaskStartedEventAttributes":{ + "scheduledEventId":2, + "identity":"13730@boweixu-C02V61JZHTDG", + "requestId":"23fed7ea-8bda-4b40-aacf-f674cf466ea3" + } + }, + { + "eventId":4, + "timestamp":1600908280579150000, + "eventType":"DecisionTaskCompleted", + "version":-24, + "taskId":12582921, + "decisionTaskCompletedEventAttributes":{ + "scheduledEventId":2, + "startedEventId":3, + "identity":"13730@boweixu-C02V61JZHTDG" + } + }, + { + "eventId":5, + "timestamp":1600908280579167000, + "eventType":"MarkerRecorded", + "version":-24, + "taskId":12582922, + "markerRecordedEventAttributes":{ + "markerName":"Version", + "details":"MQ==", + "decisionTaskCompletedEventId":4, + "header":{ + "fields":{ + "MutableMarkerHeader":"eyJpZCI6InRlc3RfY2hhbmdlIiwiZXZlbnRJZCI6NSwiYWNjZXNzQ291bnQiOjB9" + } + } + } + }, + { + "eventId":6, + "timestamp":1600908280579177000, + "eventType":"ActivityTaskScheduled", + "version":-24, + "taskId":12582923, + "activityTaskScheduledEventAttributes":{ + "activityId":"0", + "activityType":{ + "name":"customActivity1" + }, + "taskList":{ + "name":"WorkflowTest-testGetVersionNew[Docker Sticky ON]-017b6e2b-89c8-41b6-8c08-7bd8e9a4b31f" + }, + "input":"MQ==", + "scheduleToCloseTimeoutSeconds":30, + "scheduleToStartTimeoutSeconds":30, + "startToCloseTimeoutSeconds":10, + "heartbeatTimeoutSeconds":5, + "decisionTaskCompletedEventId":4, + "retryPolicy":{ + "initialIntervalInSeconds":1, + "backoffCoefficient":2, + "maximumIntervalInSeconds":0, + "maximumAttempts":3, + "expirationIntervalInSeconds":0 + } + } + }, + { + "eventId":7, + "timestamp":1600908280391225000, + "eventType":"WorkflowExecutionSignaled", + "version":-24, + "taskId":12582924, + "workflowExecutionSignaledEventAttributes":{ + "signalName":"testSignal", + "input":"InRlc3Qi", + "identity":"" + } + }, + { + "eventId":8, + "timestamp":1600908280579193000, + "eventType":"DecisionTaskScheduled", + "version":-24, + "taskId":12582929, + "decisionTaskScheduledEventAttributes":{ + "taskList":{ + "name":"boweixu-C02V61JZHTDG:aef436a5-3ef4-4ccb-b7b9-5fbedca492be" + }, + "startToCloseTimeoutSeconds":5, + "attempt":0 + } + }, + { + "eventId":9, + "timestamp":1600908280588213000, + "eventType":"DecisionTaskStarted", + "version":-24, + "taskId":12582936, + "decisionTaskStartedEventAttributes":{ + "scheduledEventId":8, + "identity":"aef436a5-3ef4-4ccb-b7b9-5fbedca492be", + "requestId":"a0ee4248-4230-4561-ac2f-912f731b39a1" + } + }, + { + "eventId":10, + "timestamp":1600908280624228000, + "eventType":"DecisionTaskCompleted", + "version":-24, + "taskId":12582939, + "decisionTaskCompletedEventAttributes":{ + "scheduledEventId":8, + "startedEventId":9, + "identity":"13730@boweixu-C02V61JZHTDG" + } + }, + { + "eventId":11, + "timestamp":1600908280624277000, + "eventType":"TimerStarted", + "version":-24, + "taskId":12582940, + "timerStartedEventAttributes":{ + "timerId":"1", + "startToFireTimeoutSeconds":1, + "decisionTaskCompletedEventId":10 + } + }, + { + "eventId":12, + "timestamp":1600908280584857000, + "eventType":"ActivityTaskStarted", + "version":-24, + "taskId":12582941, + "activityTaskStartedEventAttributes":{ + "scheduledEventId":6, + "identity":"13730@boweixu-C02V61JZHTDG", + "requestId":"8e553cb5-a137-4a23-a26a-6c957582a227", + "attempt":0, + "lastFailureReason":"" + } + }, + { + "eventId":13, + "timestamp":1600908280614589000, + "eventType":"ActivityTaskCompleted", + "version":-24, + "taskId":12582942, + "activityTaskCompletedEventAttributes":{ + "result":"MQ==", + "scheduledEventId":6, + "startedEventId":12, + "identity":"13730@boweixu-C02V61JZHTDG" + } + }, + { + "eventId":14, + "timestamp":1600908280624342000, + "eventType":"DecisionTaskScheduled", + "version":-24, + "taskId":12582947, + "decisionTaskScheduledEventAttributes":{ + "taskList":{ + "name":"boweixu-C02V61JZHTDG:aef436a5-3ef4-4ccb-b7b9-5fbedca492be" + }, + "startToCloseTimeoutSeconds":5, + "attempt":0 + } + }, + { + "eventId":15, + "timestamp":1600908280633848000, + "eventType":"DecisionTaskStarted", + "version":-24, + "taskId":12582952, + "decisionTaskStartedEventAttributes":{ + "scheduledEventId":14, + "identity":"aef436a5-3ef4-4ccb-b7b9-5fbedca492be", + "requestId":"aa7d5ff5-4566-485b-a325-4b9b788f13e5" + } + }, + { + "eventId":16, + "timestamp":1600908280661966000, + "eventType":"DecisionTaskCompleted", + "version":-24, + "taskId":12582955, + "decisionTaskCompletedEventAttributes":{ + "scheduledEventId":14, + "startedEventId":15, + "identity":"13730@boweixu-C02V61JZHTDG" + } + }, + { + "eventId":17, + "timestamp":1600908280661988000, + "eventType":"WorkflowExecutionCompleted", + "version":-24, + "taskId":12582956, + "workflowExecutionCompletedEventAttributes":{ + "result":"ImFjdGl2aXR5MSI=", + "decisionTaskCompletedEventId":16 + } + } +] \ No newline at end of file From f9871f97972670e7669be66fcb736a764c90a50f Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Fri, 25 Sep 2020 12:39:46 -0700 Subject: [PATCH 2/3] Add more tests --- .../uber/cadence/workflow/WorkflowTest.java | 43 +++++++++++-------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java index f05c7d8f1..5cd630465 100644 --- a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java @@ -425,6 +425,15 @@ public interface TestWorkflow3 { String getState(); } + public interface TestWorkflowQuery { + + @WorkflowMethod() + String execute(String taskList); + + @QueryMethod() + String query(); + } + public static class TestSyncWorkflowImpl implements TestWorkflow1 { @Override @@ -4348,13 +4357,13 @@ public void testGetVersion2() { static CompletableFuture executionStarted = new CompletableFuture<>(); - public static class TestGetVersionWithoutDecisionEventWorkflowImpl - implements TestWorkflowSignaled { + public static class TestGetVersionWithoutDecisionEventWorkflowImpl implements TestWorkflow3 { CompletablePromise signalReceived = Workflow.newPromise(); + String result = ""; @Override - public String execute() { + public String execute(String taskList) { try { if (!getVersionExecuted.contains("getVersionWithoutDecisionEvent")) { // Execute getVersion in non-replay mode. @@ -4367,10 +4376,11 @@ public String execute() { int version = Workflow.getVersion("test_change", Workflow.DEFAULT_VERSION, 1); if (version == Workflow.DEFAULT_VERSION) { signalReceived.get(); - return "result 1"; + result = "result 1"; } else { - return "result 2"; + result = "result 2"; } + return result; } } catch (Exception e) { throw new RuntimeException("failed to get from signal"); @@ -4383,6 +4393,11 @@ public String execute() { public void signal1(String arg) { signalReceived.complete(true); } + + @Override + public String getState() { + return result; + } } @Test @@ -4391,14 +4406,15 @@ public void testGetVersionWithoutDecisionEvent() throws Exception { executionStarted = new CompletableFuture<>(); getVersionExecuted.remove("getVersionWithoutDecisionEvent"); startWorkerFor(TestGetVersionWithoutDecisionEventWorkflowImpl.class); - TestWorkflowSignaled workflowStub = + TestWorkflow3 workflowStub = workflowClient.newWorkflowStub( - TestWorkflowSignaled.class, newWorkflowOptionsBuilder(taskList).build()); - WorkflowClient.start(workflowStub::execute); + TestWorkflow3.class, newWorkflowOptionsBuilder(taskList).build()); + WorkflowClient.start(workflowStub::execute, taskList); executionStarted.get(); workflowStub.signal1("test signal"); - String result = workflowStub.execute(); + String result = workflowStub.execute(taskList); assertEquals("result 1", result); + assertEquals("result 1", workflowStub.getState()); } // The following test covers the scenario where getVersion call is removed before a @@ -5176,15 +5192,6 @@ public void testParallelLocalActivityExecutionWorkflow() { result); } - public interface TestWorkflowQuery { - - @WorkflowMethod() - String execute(String taskList); - - @QueryMethod() - String query(); - } - public static final class TestLocalActivityAndQueryWorkflow implements TestWorkflowQuery { String message = "initial value"; From 5f7c229c2388d0340abc25227caa8e24bfc5f7e6 Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Fri, 25 Sep 2020 16:33:24 -0700 Subject: [PATCH 3/3] Improve some tests --- .../uber/cadence/workflow/WorkflowTest.java | 37 ++++++++++++++----- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java index 5cd630465..5c1c94a5e 100644 --- a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java @@ -4419,13 +4419,13 @@ public void testGetVersionWithoutDecisionEvent() throws Exception { // The following test covers the scenario where getVersion call is removed before a // non-version-marker decision. - public static class TestGetVersionRemovedInReplayWorkflowImpl implements TestWorkflow1 { + public static class TestGetVersionRemovedInReplayWorkflowImpl implements TestWorkflowQuery { + String result = ""; @Override public String execute(String taskList) { TestActivities testActivities = Workflow.newActivityStub(TestActivities.class, newActivityOptions1(taskList)); - String result; // Test removing a version check in replay code. if (!getVersionExecuted.contains(taskList)) { int version = Workflow.getVersion("test_change", Workflow.DEFAULT_VERSION, 1); @@ -4442,25 +4442,33 @@ public String execute(String taskList) { result += testActivities.activity(); return result; } + + @Override + public String query() { + return result; + } } @Test public void testGetVersionRemovedInReplay() { startWorkerFor(TestGetVersionRemovedInReplayWorkflowImpl.class); - TestWorkflow1 workflowStub = + TestWorkflowQuery workflowStub = workflowClient.newWorkflowStub( - TestWorkflow1.class, newWorkflowOptionsBuilder(taskList).build()); + TestWorkflowQuery.class, newWorkflowOptionsBuilder(taskList).build()); String result = workflowStub.execute(taskList); assertEquals("activity22activity", result); tracer.setExpected( + "registerQuery TestWorkflowQuery::query", "getVersion", "executeActivity TestActivities::activity2", "executeActivity TestActivities::activity"); + assertEquals("activity22activity", workflowStub.query()); } // The following test covers the scenario where getVersion call is removed before another // version-marker decision. - public static class TestGetVersionRemovedInReplay2WorkflowImpl implements TestWorkflow1 { + public static class TestGetVersionRemovedInReplay2WorkflowImpl implements TestWorkflowQuery { + String result = ""; @Override public String execute(String taskList) { @@ -4475,19 +4483,30 @@ public String execute(String taskList) { Workflow.getVersion("test_change_2", Workflow.DEFAULT_VERSION, 2); } - return testActivities.activity(); + result = testActivities.activity(); + return result; + } + + @Override + public String query() { + return result; } } @Test public void testGetVersionRemovedInReplay2() { startWorkerFor(TestGetVersionRemovedInReplay2WorkflowImpl.class); - TestWorkflow1 workflowStub = + TestWorkflowQuery workflowStub = workflowClient.newWorkflowStub( - TestWorkflow1.class, newWorkflowOptionsBuilder(taskList).build()); + TestWorkflowQuery.class, newWorkflowOptionsBuilder(taskList).build()); String result = workflowStub.execute(taskList); assertEquals("activity", result); - tracer.setExpected("getVersion", "getVersion", "executeActivity TestActivities::activity"); + tracer.setExpected( + "registerQuery TestWorkflowQuery::query", + "getVersion", + "getVersion", + "executeActivity TestActivities::activity"); + assertEquals("activity", workflowStub.query()); } public static class TestVersionNotSupportedWorkflowImpl implements TestWorkflow1 {