Skip to content

Commit 7c0e2ab

Browse files
authored
Fix getVersion override when added new version (#526)
1 parent 4fb1cd5 commit 7c0e2ab

File tree

8 files changed

+289
-14
lines changed

8 files changed

+289
-14
lines changed

src/main/java/com/uber/cadence/activity/LocalActivityOptions.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,15 +101,19 @@ public LocalActivityOptions validateAndBuildWithDefaults() {
101101
if (retryOptions != null) {
102102
ro = new RetryOptions.Builder(retryOptions).validateBuildWithDefaults();
103103
}
104-
return new LocalActivityOptions(roundUpToSeconds(scheduleToCloseTimeout), ro, contextPropagators);
104+
return new LocalActivityOptions(
105+
roundUpToSeconds(scheduleToCloseTimeout), ro, contextPropagators);
105106
}
106107
}
107108

108109
private final Duration scheduleToCloseTimeout;
109110
private final RetryOptions retryOptions;
110111
private final List<ContextPropagator> contextPropagators;
111112

112-
private LocalActivityOptions(Duration scheduleToCloseTimeout, RetryOptions retryOptions, List<ContextPropagator> contextPropagators) {
113+
private LocalActivityOptions(
114+
Duration scheduleToCloseTimeout,
115+
RetryOptions retryOptions,
116+
List<ContextPropagator> contextPropagators) {
113117
this.scheduleToCloseTimeout = scheduleToCloseTimeout;
114118
this.retryOptions = retryOptions;
115119
this.contextPropagators = contextPropagators;

src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public void accept(Exception reason) {
8888
private final DataConverter dataConverter;
8989
private final Condition taskCondition;
9090
private boolean taskCompleted = false;
91+
private final Map<String, Integer> versionMap = new HashMap<>();
9192

9293
ClockDecisionContext(
9394
DecisionsHelper decisions,
@@ -227,6 +228,8 @@ void handleMarkerRecorded(HistoryEvent event) {
227228
sideEffectResults.put(event.getEventId(), attributes.getDetails());
228229
} else if (LOCAL_ACTIVITY_MARKER_NAME.equals(name)) {
229230
handleLocalActivityMarker(attributes);
231+
} else if (VERSION_MARKER_NAME.equals(name)) {
232+
handleVersionMarker(attributes);
230233
} else if (!MUTABLE_SIDE_EFFECT_MARKER_NAME.equals(name) && !VERSION_MARKER_NAME.equals(name)) {
231234
if (log.isWarnEnabled()) {
232235
log.warn("Unexpected marker: " + event);
@@ -276,6 +279,14 @@ private void handleLocalActivityMarker(MarkerRecordedEventAttributes attributes)
276279
}
277280
}
278281

282+
private void handleVersionMarker(MarkerRecordedEventAttributes attributes) {
283+
MarkerHandler.MarkerInterface markerData =
284+
MarkerHandler.MarkerInterface.fromEventAttributes(attributes, dataConverter);
285+
String versionID = markerData.getId();
286+
int version = dataConverter.fromData(attributes.getDetails(), Integer.class, Integer.class);
287+
versionMap.put(versionID, version);
288+
}
289+
279290
int getVersion(String changeId, DataConverter converter, int minSupported, int maxSupported) {
280291
Predicate<MarkerRecordedEventAttributes> changeIdEquals =
281292
(attributes) -> {
@@ -285,6 +296,12 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m
285296
};
286297
decisions.addAllMissingVersionMarker(true, Optional.of(changeIdEquals));
287298

299+
Integer version = versionMap.get(changeId);
300+
if (version != null) {
301+
validateVersion(changeId, version, minSupported, maxSupported);
302+
return version;
303+
}
304+
288305
Optional<byte[]> result =
289306
versionHandler.handle(
290307
changeId,
@@ -299,7 +316,7 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m
299316
if (!result.isPresent()) {
300317
return WorkflowInternal.DEFAULT_VERSION;
301318
}
302-
int version = converter.fromData(result.get(), Integer.class, Integer.class);
319+
version = converter.fromData(result.get(), Integer.class, Integer.class);
303320
validateVersion(changeId, version, minSupported, maxSupported);
304321
return version;
305322
}

src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -664,10 +664,10 @@ private void addDecision(DecisionId decisionId, DecisionStateMachine decision) {
664664
// is removed in replay.
665665
void addAllMissingVersionMarker(
666666
boolean isNextDecisionVersionMarker,
667-
Optional<Predicate<MarkerRecordedEventAttributes>> isDifferentChange) {
667+
Optional<Predicate<MarkerRecordedEventAttributes>> changeIdEquals) {
668668
boolean added;
669669
do {
670-
added = addMissingVersionMarker(isNextDecisionVersionMarker, isDifferentChange);
670+
added = addMissingVersionMarker(isNextDecisionVersionMarker, changeIdEquals);
671671
} while (added);
672672
}
673673

src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -323,10 +323,11 @@ private ExecuteActivityParameters constructExecuteActivityParameters(
323323

324324
private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters(
325325
String name, LocalActivityOptions options, byte[] input, long elapsed, int attempt) {
326-
ExecuteLocalActivityParameters parameters = new ExecuteLocalActivityParameters()
327-
.withActivityType(new ActivityType().setName(name))
328-
.withInput(input)
329-
.withScheduleToCloseTimeoutSeconds(options.getScheduleToCloseTimeout().getSeconds());
326+
ExecuteLocalActivityParameters parameters =
327+
new ExecuteLocalActivityParameters()
328+
.withActivityType(new ActivityType().setName(name))
329+
.withInput(input)
330+
.withScheduleToCloseTimeoutSeconds(options.getScheduleToCloseTimeout().getSeconds());
330331

331332
RetryOptions retryOptions = options.getRetryOptions();
332333
if (retryOptions != null) {
@@ -337,8 +338,8 @@ private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters(
337338
parameters.setWorkflowDomain(this.context.getDomain());
338339
parameters.setWorkflowExecution(this.context.getWorkflowExecution());
339340

340-
List<ContextPropagator> propagators = Optional.ofNullable(options.getContextPropagators())
341-
.orElse(contextPropagators);
341+
List<ContextPropagator> propagators =
342+
Optional.ofNullable(options.getContextPropagators()).orElse(contextPropagators);
342343
parameters.setContext(extractContextsAndConvertToBytes(propagators));
343344

344345
return parameters;

src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,9 @@ private void propagateContext(ExecuteLocalActivityParameters params) {
271271
}
272272

273273
private void restoreContext(Map<String, byte[]> context) {
274-
options.getContextPropagators()
275-
.forEach(propagator -> propagator.setCurrentContext(propagator.deserializeContext(context)));
274+
options
275+
.getContextPropagators()
276+
.forEach(
277+
propagator -> propagator.setCurrentContext(propagator.deserializeContext(context)));
276278
}
277279
}

src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ public class LocalActivityContextPropagationTest {
5959

6060
private final WrapperContext wrapperContext = new WrapperContext(EXPECTED_CONTEXT_NAME);
6161

62-
//let's add safe TestWorkflowEnvironment closing and make configurable propagation enabling/disabling
62+
// let's add safe TestWorkflowEnvironment closing and make configurable propagation
63+
// enabling/disabling
6364
private class TestEnvAutoCloseable implements AutoCloseable {
6465

6566
private TestWorkflowEnvironment testEnv;

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4427,6 +4427,78 @@ public void testVersionNotSupported() {
44274427
}
44284428
}
44294429

4430+
public static class TestGetVersionAddedImpl implements TestWorkflow1 {
4431+
4432+
@Override
4433+
public String execute(String taskList) {
4434+
4435+
int versionNew = Workflow.getVersion("cid2", Workflow.DEFAULT_VERSION, 1);
4436+
assertEquals(-1, versionNew);
4437+
int version = Workflow.getVersion("cid1", Workflow.DEFAULT_VERSION, 1);
4438+
assertEquals(1, version);
4439+
4440+
TestActivities testActivities =
4441+
Workflow.newActivityStub(TestActivities.class, newActivityOptions1(taskList));
4442+
return "hello" + testActivities.activity1(1);
4443+
}
4444+
}
4445+
4446+
@Test
4447+
public void testGetVersionAdded() {
4448+
try {
4449+
WorkflowReplayer.replayWorkflowExecutionFromResource(
4450+
"testGetVersionHistory.json", TestGetVersionAddedImpl.class);
4451+
} catch (Exception e) {
4452+
e.printStackTrace();
4453+
fail();
4454+
}
4455+
}
4456+
4457+
public static class TestGetVersionRemovedImpl implements TestWorkflow1 {
4458+
4459+
@Override
4460+
public String execute(String taskList) {
4461+
// history contains cid1, but later getVersion is removed
4462+
TestActivities testActivities =
4463+
Workflow.newActivityStub(TestActivities.class, newActivityOptions1(taskList));
4464+
return "hello" + testActivities.activity1(1);
4465+
}
4466+
}
4467+
4468+
@Test
4469+
public void testGetVersionRemoved() {
4470+
try {
4471+
WorkflowReplayer.replayWorkflowExecutionFromResource(
4472+
"testGetVersionHistory.json", TestGetVersionRemovedImpl.class);
4473+
} catch (Exception e) {
4474+
e.printStackTrace();
4475+
fail();
4476+
}
4477+
}
4478+
4479+
public static class TestGetVersionRemoveAndAddImpl implements TestWorkflow1 {
4480+
4481+
@Override
4482+
public String execute(String taskList) {
4483+
int version = Workflow.getVersion("cid2", Workflow.DEFAULT_VERSION, 1);
4484+
assertEquals(-1, version);
4485+
TestActivities testActivities =
4486+
Workflow.newActivityStub(TestActivities.class, newActivityOptions1(taskList));
4487+
return "hello" + testActivities.activity1(1);
4488+
}
4489+
}
4490+
4491+
@Test
4492+
public void testGetVersionRemoveAndAdd() {
4493+
try {
4494+
WorkflowReplayer.replayWorkflowExecutionFromResource(
4495+
"testGetVersionHistory.json", TestGetVersionRemoveAndAddImpl.class);
4496+
} catch (Exception e) {
4497+
e.printStackTrace();
4498+
fail();
4499+
}
4500+
}
4501+
44304502
public interface DeterminismFailingWorkflow {
44314503

44324504
@WorkflowMethod
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
[
2+
{
3+
"eventId":1,
4+
"timestamp":1599846349049225000,
5+
"eventType":"WorkflowExecutionStarted",
6+
"version":-24,
7+
"taskId":11534336,
8+
"workflowExecutionStartedEventAttributes":{
9+
"workflowType":{
10+
"name":"TestWorkflow1::execute"
11+
},
12+
"taskList":{
13+
"name":"WorkflowTest-testGetVersionAdded[Docker Sticky OFF]-bf24970f-3800-427a-8e52-2a5c2d8dc08e"
14+
},
15+
"input":"IldvcmtmbG93VGVzdC10ZXN0R2V0VmVyc2lvbkFkZGVkW0RvY2tlciBTdGlja3kgT0ZGXS1iZjI0OTcwZi0zODAwLTQyN2EtOGU1Mi0yYTVjMmQ4ZGMwOGUi",
16+
"executionStartToCloseTimeoutSeconds":30,
17+
"taskStartToCloseTimeoutSeconds":5,
18+
"originalExecutionRunId":"249740d3-8d3c-4660-9c3c-cd61f9136db3",
19+
"identity":"",
20+
"firstExecutionRunId":"249740d3-8d3c-4660-9c3c-cd61f9136db3",
21+
"attempt":0,
22+
"firstDecisionTaskBackoffSeconds":0
23+
}
24+
},
25+
{
26+
"eventId":2,
27+
"timestamp":1599846349049238000,
28+
"eventType":"DecisionTaskScheduled",
29+
"version":-24,
30+
"taskId":11534337,
31+
"decisionTaskScheduledEventAttributes":{
32+
"taskList":{
33+
"name":"WorkflowTest-testGetVersionAdded[Docker Sticky OFF]-bf24970f-3800-427a-8e52-2a5c2d8dc08e"
34+
},
35+
"startToCloseTimeoutSeconds":5,
36+
"attempt":0
37+
}
38+
},
39+
{
40+
"eventId":3,
41+
"timestamp":1599846349090195000,
42+
"eventType":"DecisionTaskStarted",
43+
"version":-24,
44+
"taskId":11534342,
45+
"decisionTaskStartedEventAttributes":{
46+
"scheduledEventId":2,
47+
"identity":"6275@boweixu-C02V61JZHTDG",
48+
"requestId":"d85c4495-9902-4db1-a0d6-7a153ecf9278"
49+
}
50+
},
51+
{
52+
"eventId":4,
53+
"timestamp":1599846349263595000,
54+
"eventType":"DecisionTaskCompleted",
55+
"version":-24,
56+
"taskId":11534345,
57+
"decisionTaskCompletedEventAttributes":{
58+
"scheduledEventId":2,
59+
"startedEventId":3,
60+
"identity":"6275@boweixu-C02V61JZHTDG"
61+
}
62+
},
63+
{
64+
"eventId":5,
65+
"timestamp":1599846349263629000,
66+
"eventType":"MarkerRecorded",
67+
"version":-24,
68+
"taskId":11534346,
69+
"markerRecordedEventAttributes":{
70+
"markerName":"Version",
71+
"details":"MQ==",
72+
"decisionTaskCompletedEventId":4,
73+
"header":{
74+
"fields":{
75+
"MutableMarkerHeader":"eyJpZCI6ImNpZDEiLCJldmVudElkIjo1LCJhY2Nlc3NDb3VudCI6MH0="
76+
}
77+
}
78+
}
79+
},
80+
{
81+
"eventId":6,
82+
"timestamp":1599846349263639000,
83+
"eventType":"ActivityTaskScheduled",
84+
"version":-24,
85+
"taskId":11534347,
86+
"activityTaskScheduledEventAttributes":{
87+
"activityId":"0",
88+
"activityType":{
89+
"name":"customActivity1"
90+
},
91+
"taskList":{
92+
"name":"WorkflowTest-testGetVersionAdded[Docker Sticky OFF]-bf24970f-3800-427a-8e52-2a5c2d8dc08e"
93+
},
94+
"input":"MQ==",
95+
"scheduleToCloseTimeoutSeconds":5,
96+
"scheduleToStartTimeoutSeconds":5,
97+
"startToCloseTimeoutSeconds":10,
98+
"heartbeatTimeoutSeconds":5,
99+
"decisionTaskCompletedEventId":4
100+
}
101+
},
102+
{
103+
"eventId":7,
104+
"timestamp":1599846349269715000,
105+
"eventType":"ActivityTaskStarted",
106+
"version":-24,
107+
"taskId":11534351,
108+
"activityTaskStartedEventAttributes":{
109+
"scheduledEventId":6,
110+
"identity":"6275@boweixu-C02V61JZHTDG",
111+
"requestId":"2e479cb9-9b47-4765-953c-4e219f6b828a",
112+
"attempt":0,
113+
"lastFailureReason":""
114+
}
115+
},
116+
{
117+
"eventId":8,
118+
"timestamp":1599846349292416000,
119+
"eventType":"ActivityTaskCompleted",
120+
"version":-24,
121+
"taskId":11534354,
122+
"activityTaskCompletedEventAttributes":{
123+
"result":"MQ==",
124+
"scheduledEventId":6,
125+
"startedEventId":7,
126+
"identity":"6275@boweixu-C02V61JZHTDG"
127+
}
128+
},
129+
{
130+
"eventId":9,
131+
"timestamp":1599846349292422000,
132+
"eventType":"DecisionTaskScheduled",
133+
"version":-24,
134+
"taskId":11534356,
135+
"decisionTaskScheduledEventAttributes":{
136+
"taskList":{
137+
"name":"WorkflowTest-testGetVersionAdded[Docker Sticky OFF]-bf24970f-3800-427a-8e52-2a5c2d8dc08e"
138+
},
139+
"startToCloseTimeoutSeconds":5,
140+
"attempt":0
141+
}
142+
},
143+
{
144+
"eventId":10,
145+
"timestamp":1599846349295720000,
146+
"eventType":"DecisionTaskStarted",
147+
"version":-24,
148+
"taskId":11534359,
149+
"decisionTaskStartedEventAttributes":{
150+
"scheduledEventId":9,
151+
"identity":"6275@boweixu-C02V61JZHTDG",
152+
"requestId":"b275fd8b-4fd3-4201-9bc7-428c9e2c5ef7"
153+
}
154+
},
155+
{
156+
"eventId":11,
157+
"timestamp":1599846349316011000,
158+
"eventType":"DecisionTaskCompleted",
159+
"version":-24,
160+
"taskId":11534362,
161+
"decisionTaskCompletedEventAttributes":{
162+
"scheduledEventId":9,
163+
"startedEventId":10,
164+
"identity":"6275@boweixu-C02V61JZHTDG"
165+
}
166+
},
167+
{
168+
"eventId":12,
169+
"timestamp":1599846349316036000,
170+
"eventType":"WorkflowExecutionCompleted",
171+
"version":-24,
172+
"taskId":11534363,
173+
"workflowExecutionCompletedEventAttributes":{
174+
"result":"ImhlbGxvMSI=",
175+
"decisionTaskCompletedEventId":11
176+
}
177+
}
178+
]

0 commit comments

Comments
 (0)