Skip to content

Fix replay error when querying workflow that contains activity retry #532

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -296,12 +296,6 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m
};
decisions.addAllMissingVersionMarker(true, Optional.of(changeIdEquals));

Integer version = versionMap.get(changeId);
if (version != null) {
validateVersion(changeId, version, minSupported, maxSupported);
return version;
}

Optional<byte[]> result =
versionHandler.handle(
changeId,
Expand All @@ -313,6 +307,12 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m
return Optional.of(converter.toData(maxSupported));
});

Integer version = versionMap.get(changeId);
if (version != null) {
validateVersion(changeId, version, minSupported, maxSupported);
return version;
}

if (!result.isPresent()) {
return WorkflowInternal.DEFAULT_VERSION;
}
Expand Down
163 changes: 136 additions & 27 deletions src/test/java/com/uber/cadence/workflow/WorkflowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.base.Strings;
import com.google.common.util.concurrent.UncheckedExecutionException;
Expand Down Expand Up @@ -411,6 +413,27 @@ public interface TestWorkflow2 {
List<String> getTrace();
}

public interface TestWorkflow3 {

@WorkflowMethod
String execute(String taskList);

@SignalMethod(name = "testSignal")
void signal1(String arg);

@QueryMethod(name = "getState")
String getState();
}

public interface TestWorkflowQuery {

@WorkflowMethod()
String execute(String taskList);

@QueryMethod()
String query();
}

public static class TestSyncWorkflowImpl implements TestWorkflow1 {

@Override
Expand Down Expand Up @@ -4334,13 +4357,13 @@ public void testGetVersion2() {

static CompletableFuture<Boolean> executionStarted = new CompletableFuture<>();

public static class TestGetVersionWithoutDecisionEventWorkflowImpl
implements TestWorkflowSignaled {
public static class TestGetVersionWithoutDecisionEventWorkflowImpl implements TestWorkflow3 {

CompletablePromise<Boolean> signalReceived = Workflow.newPromise();
String result = "";

@Override
public String execute() {
public String execute(String taskList) {
try {
if (!getVersionExecuted.contains("getVersionWithoutDecisionEvent")) {
// Execute getVersion in non-replay mode.
Expand All @@ -4353,10 +4376,11 @@ public String execute() {
int version = Workflow.getVersion("test_change", Workflow.DEFAULT_VERSION, 1);
if (version == Workflow.DEFAULT_VERSION) {
signalReceived.get();
return "result 1";
result = "result 1";
} else {
return "result 2";
result = "result 2";
}
return result;
}
} catch (Exception e) {
throw new RuntimeException("failed to get from signal");
Expand All @@ -4369,6 +4393,11 @@ public String execute() {
public void signal1(String arg) {
signalReceived.complete(true);
}

@Override
public String getState() {
return result;
}
}

@Test
Expand All @@ -4377,25 +4406,26 @@ public void testGetVersionWithoutDecisionEvent() throws Exception {
executionStarted = new CompletableFuture<>();
getVersionExecuted.remove("getVersionWithoutDecisionEvent");
startWorkerFor(TestGetVersionWithoutDecisionEventWorkflowImpl.class);
TestWorkflowSignaled workflowStub =
TestWorkflow3 workflowStub =
workflowClient.newWorkflowStub(
TestWorkflowSignaled.class, newWorkflowOptionsBuilder(taskList).build());
WorkflowClient.start(workflowStub::execute);
TestWorkflow3.class, newWorkflowOptionsBuilder(taskList).build());
WorkflowClient.start(workflowStub::execute, taskList);
executionStarted.get();
workflowStub.signal1("test signal");
String result = workflowStub.execute();
String result = workflowStub.execute(taskList);
assertEquals("result 1", result);
assertEquals("result 1", workflowStub.getState());
}

// The following test covers the scenario where getVersion call is removed before a
// non-version-marker decision.
public static class TestGetVersionRemovedInReplayWorkflowImpl implements TestWorkflow1 {
public static class TestGetVersionRemovedInReplayWorkflowImpl implements TestWorkflowQuery {
String result = "";

@Override
public String execute(String taskList) {
TestActivities testActivities =
Workflow.newActivityStub(TestActivities.class, newActivityOptions1(taskList));
String result;
// Test removing a version check in replay code.
if (!getVersionExecuted.contains(taskList)) {
int version = Workflow.getVersion("test_change", Workflow.DEFAULT_VERSION, 1);
Expand All @@ -4412,25 +4442,33 @@ public String execute(String taskList) {
result += testActivities.activity();
return result;
}

@Override
public String query() {
return result;
}
}

@Test
public void testGetVersionRemovedInReplay() {
startWorkerFor(TestGetVersionRemovedInReplayWorkflowImpl.class);
TestWorkflow1 workflowStub =
TestWorkflowQuery workflowStub =
workflowClient.newWorkflowStub(
TestWorkflow1.class, newWorkflowOptionsBuilder(taskList).build());
TestWorkflowQuery.class, newWorkflowOptionsBuilder(taskList).build());
String result = workflowStub.execute(taskList);
assertEquals("activity22activity", result);
tracer.setExpected(
"registerQuery TestWorkflowQuery::query",
"getVersion",
"executeActivity TestActivities::activity2",
"executeActivity TestActivities::activity");
assertEquals("activity22activity", workflowStub.query());
}

// The following test covers the scenario where getVersion call is removed before another
// version-marker decision.
public static class TestGetVersionRemovedInReplay2WorkflowImpl implements TestWorkflow1 {
public static class TestGetVersionRemovedInReplay2WorkflowImpl implements TestWorkflowQuery {
String result = "";

@Override
public String execute(String taskList) {
Expand All @@ -4445,19 +4483,30 @@ public String execute(String taskList) {
Workflow.getVersion("test_change_2", Workflow.DEFAULT_VERSION, 2);
}

return testActivities.activity();
result = testActivities.activity();
return result;
}

@Override
public String query() {
return result;
}
}

@Test
public void testGetVersionRemovedInReplay2() {
startWorkerFor(TestGetVersionRemovedInReplay2WorkflowImpl.class);
TestWorkflow1 workflowStub =
TestWorkflowQuery workflowStub =
workflowClient.newWorkflowStub(
TestWorkflow1.class, newWorkflowOptionsBuilder(taskList).build());
TestWorkflowQuery.class, newWorkflowOptionsBuilder(taskList).build());
String result = workflowStub.execute(taskList);
assertEquals("activity", result);
tracer.setExpected("getVersion", "getVersion", "executeActivity TestActivities::activity");
tracer.setExpected(
"registerQuery TestWorkflowQuery::query",
"getVersion",
"getVersion",
"executeActivity TestActivities::activity");
assertEquals("activity", workflowStub.query());
}

public static class TestVersionNotSupportedWorkflowImpl implements TestWorkflow1 {
Expand Down Expand Up @@ -5162,15 +5211,6 @@ public void testParallelLocalActivityExecutionWorkflow() {
result);
}

public interface TestWorkflowQuery {

@WorkflowMethod()
String execute(String taskList);

@QueryMethod()
String query();
}

public static final class TestLocalActivityAndQueryWorkflow implements TestWorkflowQuery {

String message = "initial value";
Expand Down Expand Up @@ -5846,4 +5886,73 @@ public void upsertSearchAttributes(Map<String, Object> searchAttributes) {
next.upsertSearchAttributes(searchAttributes);
}
}

public static class TestGetVersionWorkflowRetryImpl implements TestWorkflow3 {
private String result = "";

@Override
public String execute(String taskList) {
int version = Workflow.getVersion("test_change", Workflow.DEFAULT_VERSION, 1);
int act = 0;
if (version == 1) {
ActivityOptions options =
new ActivityOptions.Builder()
.setTaskList(taskList)
.setHeartbeatTimeout(Duration.ofSeconds(5))
.setScheduleToCloseTimeout(Duration.ofSeconds(5))
.setScheduleToStartTimeout(Duration.ofSeconds(5))
.setStartToCloseTimeout(Duration.ofSeconds(10))
.setRetryOptions(
new RetryOptions.Builder()
.setMaximumAttempts(3)
.setInitialInterval(Duration.ofSeconds(1))
.build())
.build();

TestActivities testActivities = Workflow.newActivityStub(TestActivities.class, options);
act = testActivities.activity1(1);
}

result += "activity" + act;
return result;
}

@Override
public void signal1(String arg) {
Workflow.sleep(1000);
}

@Override
public String getState() {
return result;
}
}

@Test
public void testGetVersionRetry() throws ExecutionException, InterruptedException {
TestActivities activity = mock(TestActivities.class);
when(activity.activity1(1)).thenReturn(1);
worker.registerActivitiesImplementations(activity);

startWorkerFor(TestGetVersionWorkflowRetryImpl.class);
TestWorkflow3 workflowStub =
workflowClient.newWorkflowStub(
TestWorkflow3.class, newWorkflowOptionsBuilder(taskList).build());
CompletableFuture<String> result = WorkflowClient.execute(workflowStub::execute, taskList);
workflowStub.signal1("test");
assertEquals("activity1", result.get());

// test replay
assertEquals("activity1", workflowStub.getState());
}

@Test
public void testGetVersionWithRetryReplay() throws Exception {
// Avoid executing 4 times
Assume.assumeFalse("skipping for docker tests", useExternalService);
Assume.assumeFalse("skipping for sticky off", disableStickyExecution);

WorkflowReplayer.replayWorkflowExecutionFromResource(
"testGetVersionWithRetryHistory.json", TestGetVersionWorkflowRetryImpl.class);
}
}
Loading