From acc90b59e68998f2ad63eba938ac2b5c363b336b Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Mon, 14 Sep 2020 09:58:10 -0700 Subject: [PATCH 1/2] Fix getVersion override when added new version --- .../activity/LocalActivityOptions.java | 8 +- .../internal/replay/ClockDecisionContext.java | 20 +- .../internal/replay/DecisionsHelper.java | 4 +- .../internal/sync/SyncDecisionContext.java | 13 +- .../internal/worker/LocalActivityWorker.java | 6 +- .../LocalActivityContextPropagationTest.java | 3 +- .../uber/cadence/workflow/WorkflowTest.java | 72 +++++++ src/test/resources/testGetVersionHistory.json | 178 ++++++++++++++++++ 8 files changed, 290 insertions(+), 14 deletions(-) create mode 100644 src/test/resources/testGetVersionHistory.json diff --git a/src/main/java/com/uber/cadence/activity/LocalActivityOptions.java b/src/main/java/com/uber/cadence/activity/LocalActivityOptions.java index de13cc860..9534af9a2 100644 --- a/src/main/java/com/uber/cadence/activity/LocalActivityOptions.java +++ b/src/main/java/com/uber/cadence/activity/LocalActivityOptions.java @@ -101,7 +101,8 @@ public LocalActivityOptions validateAndBuildWithDefaults() { if (retryOptions != null) { ro = new RetryOptions.Builder(retryOptions).validateBuildWithDefaults(); } - return new LocalActivityOptions(roundUpToSeconds(scheduleToCloseTimeout), ro, contextPropagators); + return new LocalActivityOptions( + roundUpToSeconds(scheduleToCloseTimeout), ro, contextPropagators); } } @@ -109,7 +110,10 @@ public LocalActivityOptions validateAndBuildWithDefaults() { private final RetryOptions retryOptions; private final List contextPropagators; - private LocalActivityOptions(Duration scheduleToCloseTimeout, RetryOptions retryOptions, List contextPropagators) { + private LocalActivityOptions( + Duration scheduleToCloseTimeout, + RetryOptions retryOptions, + List contextPropagators) { this.scheduleToCloseTimeout = scheduleToCloseTimeout; this.retryOptions = retryOptions; this.contextPropagators = contextPropagators; diff --git a/src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java b/src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java index 118d95cc7..4de852cbb 100644 --- a/src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java +++ b/src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java @@ -88,6 +88,7 @@ public void accept(Exception reason) { private final DataConverter dataConverter; private final Condition taskCondition; private boolean taskCompleted = false; + private final Map versionMap = new HashMap<>(); ClockDecisionContext( DecisionsHelper decisions, @@ -227,6 +228,9 @@ void handleMarkerRecorded(HistoryEvent event) { sideEffectResults.put(event.getEventId(), attributes.getDetails()); } else if (LOCAL_ACTIVITY_MARKER_NAME.equals(name)) { handleLocalActivityMarker(attributes); + } else if (VERSION_MARKER_NAME.equals(name)) { + handleVersionMarker(attributes); + // record version } else if (!MUTABLE_SIDE_EFFECT_MARKER_NAME.equals(name) && !VERSION_MARKER_NAME.equals(name)) { if (log.isWarnEnabled()) { log.warn("Unexpected marker: " + event); @@ -276,6 +280,14 @@ private void handleLocalActivityMarker(MarkerRecordedEventAttributes attributes) } } + private void handleVersionMarker(MarkerRecordedEventAttributes attributes) { + MarkerHandler.MarkerInterface markerData = + MarkerHandler.MarkerInterface.fromEventAttributes(attributes, dataConverter); + String versionID = markerData.getId(); + int version = dataConverter.fromData(attributes.getDetails(), Integer.class, Integer.class); + versionMap.put(versionID, version); + } + int getVersion(String changeId, DataConverter converter, int minSupported, int maxSupported) { Predicate changeIdEquals = (attributes) -> { @@ -285,6 +297,12 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m }; decisions.addAllMissingVersionMarker(true, Optional.of(changeIdEquals)); + Integer version = versionMap.get(changeId); + if (version != null) { + validateVersion(changeId, version, minSupported, maxSupported); + return version; + } + Optional result = versionHandler.handle( changeId, @@ -299,7 +317,7 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m if (!result.isPresent()) { return WorkflowInternal.DEFAULT_VERSION; } - int version = converter.fromData(result.get(), Integer.class, Integer.class); + version = converter.fromData(result.get(), Integer.class, Integer.class); validateVersion(changeId, version, minSupported, maxSupported); return version; } diff --git a/src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java b/src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java index 6ed08a08f..b02a47b05 100644 --- a/src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java +++ b/src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java @@ -664,10 +664,10 @@ private void addDecision(DecisionId decisionId, DecisionStateMachine decision) { // is removed in replay. void addAllMissingVersionMarker( boolean isNextDecisionVersionMarker, - Optional> isDifferentChange) { + Optional> changeIdEquals) { boolean added; do { - added = addMissingVersionMarker(isNextDecisionVersionMarker, isDifferentChange); + added = addMissingVersionMarker(isNextDecisionVersionMarker, changeIdEquals); } while (added); } diff --git a/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java b/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java index 5cf6d7b01..23955ce69 100644 --- a/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java +++ b/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java @@ -323,10 +323,11 @@ private ExecuteActivityParameters constructExecuteActivityParameters( private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters( String name, LocalActivityOptions options, byte[] input, long elapsed, int attempt) { - ExecuteLocalActivityParameters parameters = new ExecuteLocalActivityParameters() - .withActivityType(new ActivityType().setName(name)) - .withInput(input) - .withScheduleToCloseTimeoutSeconds(options.getScheduleToCloseTimeout().getSeconds()); + ExecuteLocalActivityParameters parameters = + new ExecuteLocalActivityParameters() + .withActivityType(new ActivityType().setName(name)) + .withInput(input) + .withScheduleToCloseTimeoutSeconds(options.getScheduleToCloseTimeout().getSeconds()); RetryOptions retryOptions = options.getRetryOptions(); if (retryOptions != null) { @@ -337,8 +338,8 @@ private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters( parameters.setWorkflowDomain(this.context.getDomain()); parameters.setWorkflowExecution(this.context.getWorkflowExecution()); - List propagators = Optional.ofNullable(options.getContextPropagators()) - .orElse(contextPropagators); + List propagators = + Optional.ofNullable(options.getContextPropagators()).orElse(contextPropagators); parameters.setContext(extractContextsAndConvertToBytes(propagators)); return parameters; diff --git a/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java index 85f60ccb9..de6ce8da3 100644 --- a/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java @@ -271,7 +271,9 @@ private void propagateContext(ExecuteLocalActivityParameters params) { } private void restoreContext(Map context) { - options.getContextPropagators() - .forEach(propagator -> propagator.setCurrentContext(propagator.deserializeContext(context))); + options + .getContextPropagators() + .forEach( + propagator -> propagator.setCurrentContext(propagator.deserializeContext(context))); } } diff --git a/src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java b/src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java index 12c3197c6..62fa3723c 100644 --- a/src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java +++ b/src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java @@ -59,7 +59,8 @@ public class LocalActivityContextPropagationTest { private final WrapperContext wrapperContext = new WrapperContext(EXPECTED_CONTEXT_NAME); - //let's add safe TestWorkflowEnvironment closing and make configurable propagation enabling/disabling + // let's add safe TestWorkflowEnvironment closing and make configurable propagation + // enabling/disabling private class TestEnvAutoCloseable implements AutoCloseable { private TestWorkflowEnvironment testEnv; diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java index b52af98d8..c426d0d42 100644 --- a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java @@ -4427,6 +4427,78 @@ public void testVersionNotSupported() { } } + public static class TestGetVersionAddedImpl implements TestWorkflow1 { + + @Override + public String execute(String taskList) { + + int versionNew = Workflow.getVersion("cid2", Workflow.DEFAULT_VERSION, 1); + assertEquals(-1, versionNew); + int version = Workflow.getVersion("cid1", Workflow.DEFAULT_VERSION, 1); + assertEquals(1, version); + + TestActivities testActivities = + Workflow.newActivityStub(TestActivities.class, newActivityOptions1(taskList)); + return "hello" + testActivities.activity1(1); + } + } + + @Test + public void testGetVersionAdded() { + try { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testGetVersionHistory.json", TestGetVersionAddedImpl.class); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + public static class TestGetVersionRemovedImpl implements TestWorkflow1 { + + @Override + public String execute(String taskList) { + // history contains cid1, but later getVersion is removed + TestActivities testActivities = + Workflow.newActivityStub(TestActivities.class, newActivityOptions1(taskList)); + return "hello" + testActivities.activity1(1); + } + } + + @Test + public void testGetVersionRemoved() { + try { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testGetVersionHistory.json", TestGetVersionRemovedImpl.class); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + public static class TestGetVersionRemoveAndAddImpl implements TestWorkflow1 { + + @Override + public String execute(String taskList) { + int version = Workflow.getVersion("cid2", Workflow.DEFAULT_VERSION, 1); + assertEquals(-1, version); + TestActivities testActivities = + Workflow.newActivityStub(TestActivities.class, newActivityOptions1(taskList)); + return "hello" + testActivities.activity1(1); + } + } + + @Test + public void testGetVersionRemoveAndAdd() { + try { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testGetVersionHistory.json", TestGetVersionRemoveAndAddImpl.class); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + public interface DeterminismFailingWorkflow { @WorkflowMethod diff --git a/src/test/resources/testGetVersionHistory.json b/src/test/resources/testGetVersionHistory.json new file mode 100644 index 000000000..650831ec7 --- /dev/null +++ b/src/test/resources/testGetVersionHistory.json @@ -0,0 +1,178 @@ +[ + { + "eventId":1, + "timestamp":1599846349049225000, + "eventType":"WorkflowExecutionStarted", + "version":-24, + "taskId":11534336, + "workflowExecutionStartedEventAttributes":{ + "workflowType":{ + "name":"TestWorkflow1::execute" + }, + "taskList":{ + "name":"WorkflowTest-testGetVersionAdded[Docker Sticky OFF]-bf24970f-3800-427a-8e52-2a5c2d8dc08e" + }, + "input":"IldvcmtmbG93VGVzdC10ZXN0R2V0VmVyc2lvbkFkZGVkW0RvY2tlciBTdGlja3kgT0ZGXS1iZjI0OTcwZi0zODAwLTQyN2EtOGU1Mi0yYTVjMmQ4ZGMwOGUi", + "executionStartToCloseTimeoutSeconds":30, + "taskStartToCloseTimeoutSeconds":5, + "originalExecutionRunId":"249740d3-8d3c-4660-9c3c-cd61f9136db3", + "identity":"", + "firstExecutionRunId":"249740d3-8d3c-4660-9c3c-cd61f9136db3", + "attempt":0, + "firstDecisionTaskBackoffSeconds":0 + } + }, + { + "eventId":2, + "timestamp":1599846349049238000, + "eventType":"DecisionTaskScheduled", + "version":-24, + "taskId":11534337, + "decisionTaskScheduledEventAttributes":{ + "taskList":{ + "name":"WorkflowTest-testGetVersionAdded[Docker Sticky OFF]-bf24970f-3800-427a-8e52-2a5c2d8dc08e" + }, + "startToCloseTimeoutSeconds":5, + "attempt":0 + } + }, + { + "eventId":3, + "timestamp":1599846349090195000, + "eventType":"DecisionTaskStarted", + "version":-24, + "taskId":11534342, + "decisionTaskStartedEventAttributes":{ + "scheduledEventId":2, + "identity":"6275@boweixu-C02V61JZHTDG", + "requestId":"d85c4495-9902-4db1-a0d6-7a153ecf9278" + } + }, + { + "eventId":4, + "timestamp":1599846349263595000, + "eventType":"DecisionTaskCompleted", + "version":-24, + "taskId":11534345, + "decisionTaskCompletedEventAttributes":{ + "scheduledEventId":2, + "startedEventId":3, + "identity":"6275@boweixu-C02V61JZHTDG" + } + }, + { + "eventId":5, + "timestamp":1599846349263629000, + "eventType":"MarkerRecorded", + "version":-24, + "taskId":11534346, + "markerRecordedEventAttributes":{ + "markerName":"Version", + "details":"MQ==", + "decisionTaskCompletedEventId":4, + "header":{ + "fields":{ + "MutableMarkerHeader":"eyJpZCI6ImNpZDEiLCJldmVudElkIjo1LCJhY2Nlc3NDb3VudCI6MH0=" + } + } + } + }, + { + "eventId":6, + "timestamp":1599846349263639000, + "eventType":"ActivityTaskScheduled", + "version":-24, + "taskId":11534347, + "activityTaskScheduledEventAttributes":{ + "activityId":"0", + "activityType":{ + "name":"customActivity1" + }, + "taskList":{ + "name":"WorkflowTest-testGetVersionAdded[Docker Sticky OFF]-bf24970f-3800-427a-8e52-2a5c2d8dc08e" + }, + "input":"MQ==", + "scheduleToCloseTimeoutSeconds":5, + "scheduleToStartTimeoutSeconds":5, + "startToCloseTimeoutSeconds":10, + "heartbeatTimeoutSeconds":5, + "decisionTaskCompletedEventId":4 + } + }, + { + "eventId":7, + "timestamp":1599846349269715000, + "eventType":"ActivityTaskStarted", + "version":-24, + "taskId":11534351, + "activityTaskStartedEventAttributes":{ + "scheduledEventId":6, + "identity":"6275@boweixu-C02V61JZHTDG", + "requestId":"2e479cb9-9b47-4765-953c-4e219f6b828a", + "attempt":0, + "lastFailureReason":"" + } + }, + { + "eventId":8, + "timestamp":1599846349292416000, + "eventType":"ActivityTaskCompleted", + "version":-24, + "taskId":11534354, + "activityTaskCompletedEventAttributes":{ + "result":"MQ==", + "scheduledEventId":6, + "startedEventId":7, + "identity":"6275@boweixu-C02V61JZHTDG" + } + }, + { + "eventId":9, + "timestamp":1599846349292422000, + "eventType":"DecisionTaskScheduled", + "version":-24, + "taskId":11534356, + "decisionTaskScheduledEventAttributes":{ + "taskList":{ + "name":"WorkflowTest-testGetVersionAdded[Docker Sticky OFF]-bf24970f-3800-427a-8e52-2a5c2d8dc08e" + }, + "startToCloseTimeoutSeconds":5, + "attempt":0 + } + }, + { + "eventId":10, + "timestamp":1599846349295720000, + "eventType":"DecisionTaskStarted", + "version":-24, + "taskId":11534359, + "decisionTaskStartedEventAttributes":{ + "scheduledEventId":9, + "identity":"6275@boweixu-C02V61JZHTDG", + "requestId":"b275fd8b-4fd3-4201-9bc7-428c9e2c5ef7" + } + }, + { + "eventId":11, + "timestamp":1599846349316011000, + "eventType":"DecisionTaskCompleted", + "version":-24, + "taskId":11534362, + "decisionTaskCompletedEventAttributes":{ + "scheduledEventId":9, + "startedEventId":10, + "identity":"6275@boweixu-C02V61JZHTDG" + } + }, + { + "eventId":12, + "timestamp":1599846349316036000, + "eventType":"WorkflowExecutionCompleted", + "version":-24, + "taskId":11534363, + "workflowExecutionCompletedEventAttributes":{ + "result":"ImhlbGxvMSI=", + "decisionTaskCompletedEventId":11 + } + } +] \ No newline at end of file From 7490c19aaee75b45cf47a750ffda6f85b53fbcbe Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Mon, 14 Sep 2020 10:01:45 -0700 Subject: [PATCH 2/2] tiny --- .../com/uber/cadence/internal/replay/ClockDecisionContext.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java b/src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java index 4de852cbb..d24b99a1e 100644 --- a/src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java +++ b/src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java @@ -230,7 +230,6 @@ void handleMarkerRecorded(HistoryEvent event) { handleLocalActivityMarker(attributes); } else if (VERSION_MARKER_NAME.equals(name)) { handleVersionMarker(attributes); - // record version } else if (!MUTABLE_SIDE_EFFECT_MARKER_NAME.equals(name) && !VERSION_MARKER_NAME.equals(name)) { if (log.isWarnEnabled()) { log.warn("Unexpected marker: " + event);