Skip to content

Commit 0351c61

Browse files
Add sample for signal and wait until applied and get apply results (#50)
1 parent 71b4d45 commit 0351c61

File tree

1 file changed

+297
-0
lines changed

1 file changed

+297
-0
lines changed
Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.samples.hello;
19+
20+
import static com.uber.cadence.samples.common.SampleConstants.DOMAIN;
21+
22+
import com.uber.cadence.DescribeWorkflowExecutionRequest;
23+
import com.uber.cadence.DescribeWorkflowExecutionResponse;
24+
import com.uber.cadence.EventType;
25+
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
26+
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
27+
import com.uber.cadence.HistoryEvent;
28+
import com.uber.cadence.SearchAttributes;
29+
import com.uber.cadence.WorkflowExecution;
30+
import com.uber.cadence.client.WorkflowClient;
31+
import com.uber.cadence.client.WorkflowClientOptions;
32+
import com.uber.cadence.client.WorkflowOptions;
33+
import com.uber.cadence.converter.DataConverter;
34+
import com.uber.cadence.converter.JsonDataConverter;
35+
import com.uber.cadence.serviceclient.ClientOptions;
36+
import com.uber.cadence.serviceclient.WorkflowServiceTChannel;
37+
import com.uber.cadence.worker.Worker;
38+
import com.uber.cadence.worker.WorkerFactory;
39+
import com.uber.cadence.workflow.SignalMethod;
40+
import com.uber.cadence.workflow.Workflow;
41+
import com.uber.cadence.workflow.WorkflowMethod;
42+
import com.uber.cadence.workflow.WorkflowUtils;
43+
import java.nio.charset.Charset;
44+
import java.time.Duration;
45+
import java.util.ArrayList;
46+
import java.util.Arrays;
47+
import java.util.HashMap;
48+
import java.util.List;
49+
import java.util.Map;
50+
import org.apache.commons.lang.RandomStringUtils;
51+
52+
/**
53+
* Demonstrates signalling a workflow, and wait until it's applied and get a
54+
* response. This should be much performant(lower latency) than using signal+query approach.
55+
* Requires a Cadence server to be running.
56+
*/
57+
@SuppressWarnings("ALL")
58+
public class HelloSignalAndResponse {
59+
60+
static final String TASK_LIST = "HelloSignal";
61+
62+
/** Workflow interface must have a method annotated with @WorkflowMethod. */
63+
public interface GreetingWorkflow {
64+
/**
65+
* @return list of greeting strings that were received through the receiveName Method. This
66+
* method will block until the number of greetings specified are received.
67+
*/
68+
@WorkflowMethod
69+
List<String> getGreetings();
70+
71+
/** Receives name through an external signal. */
72+
@SignalMethod
73+
void receiveName(String name);
74+
75+
@SignalMethod
76+
void exit();
77+
}
78+
79+
/** GreetingWorkflow implementation that returns a greeting. */
80+
public static class GreetingWorkflowImpl implements GreetingWorkflow {
81+
82+
List<String> messageQueue = new ArrayList<>(10);
83+
boolean exit = false;
84+
85+
@Override
86+
public List<String> getGreetings() {
87+
List<String> receivedMessages = new ArrayList<>(10);
88+
89+
while (true) {
90+
Workflow.await(() -> !messageQueue.isEmpty() || exit);
91+
if (messageQueue.isEmpty() && exit) {
92+
return receivedMessages;
93+
}
94+
String message = messageQueue.remove(0);
95+
receivedMessages.add(message);
96+
}
97+
}
98+
99+
@Override
100+
public void receiveName(String name) {
101+
Map<String, Object> upsertedMap = new HashMap<>();
102+
// Because we are going to get the response after signal, make sure first thing to do in the
103+
// signal method is to upsert search attribute with the response.
104+
// Use CustomKeywordField for response, in real code you may use other fields
105+
// If there are multiple signals processed in paralell, consider returning a map of message
106+
// to each status/result so that they won't overwrite each other
107+
upsertedMap.put("CustomKeywordField", name + ":" + "No_Error");
108+
Workflow.upsertSearchAttributes(upsertedMap);
109+
110+
messageQueue.add(name);
111+
}
112+
113+
@Override
114+
public void exit() {
115+
exit = true;
116+
}
117+
}
118+
119+
public static void main(String[] args) throws Exception {
120+
// Get a new client
121+
// NOTE: to set a different options, you can do like this:
122+
// ClientOptions.newBuilder().setRpcTimeout(5 * 1000).build();
123+
WorkflowClient workflowClient =
124+
WorkflowClient.newInstance(
125+
new WorkflowServiceTChannel(ClientOptions.defaultInstance()),
126+
WorkflowClientOptions.newBuilder().setDomain(DOMAIN).build());
127+
// Get worker to poll the task list.
128+
WorkerFactory factory = WorkerFactory.newInstance(workflowClient);
129+
Worker worker = factory.newWorker(TASK_LIST);
130+
worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class);
131+
factory.start();
132+
133+
// In a real application use a business ID like customer ID or order ID
134+
String workflowId = RandomStringUtils.randomAlphabetic(10);
135+
136+
// Start a workflow execution. Usually this is done from another program.
137+
// Get a workflow stub using the same task list the worker uses.
138+
// The newly started workflow is going to have the workflowId generated above.
139+
WorkflowOptions workflowOptions =
140+
new WorkflowOptions.Builder()
141+
.setTaskList(TASK_LIST)
142+
.setExecutionStartToCloseTimeout(Duration.ofSeconds(30))
143+
.setWorkflowId(workflowId)
144+
.build();
145+
GreetingWorkflow workflow =
146+
workflowClient.newWorkflowStub(GreetingWorkflow.class, workflowOptions);
147+
// Start workflow asynchronously to not use another thread to signal.
148+
WorkflowClient.start(workflow::getGreetings);
149+
// After start for getGreeting returns, the workflow is guaranteed to be started.
150+
// So we can send a signal to it using the workflow stub.
151+
// This workflow keeps receiving signals until exit is called
152+
String signal = "World";
153+
154+
final signalWaitResult result =
155+
signalAndWait(
156+
workflowClient,
157+
workflowId,
158+
"",
159+
() -> {
160+
workflow.receiveName(signal); // sends receiveName signal
161+
},
162+
JsonDataConverter.getInstance(),
163+
"GreetingWorkflow::receiveName",
164+
signal);
165+
166+
System.out.printf(
167+
"result: isReceived: %b, isProccessed: %b, isRunning: %b, runID: %s \n",
168+
result.isSignalReceived, result.isSignalProcessed, result.isWorkflowRunning, result.runId);
169+
if (result.isSignalProcessed) {
170+
// Get results from search attribute `CustomKeywordField`
171+
WorkflowExecution execution = new WorkflowExecution();
172+
execution.setWorkflowId(workflowId);
173+
execution.setRunId(
174+
result.runId); // make sure to sure the same runID in case the current run changes
175+
DescribeWorkflowExecutionRequest request = new DescribeWorkflowExecutionRequest();
176+
request.setDomain(DOMAIN);
177+
request.setExecution(execution);
178+
DescribeWorkflowExecutionResponse resp =
179+
workflowClient.getService().DescribeWorkflowExecution(request);
180+
SearchAttributes searchAttributes = resp.workflowExecutionInfo.getSearchAttributes();
181+
String keyword =
182+
WorkflowUtils.getValueFromSearchAttributes(
183+
searchAttributes, "CustomKeywordField", String.class);
184+
System.out.printf("Signal result is: %s\n", keyword);
185+
} else {
186+
System.out.printf("No result because signal was not processed");
187+
}
188+
}
189+
190+
/**
191+
* This signal helper not only sends signal to workflow, but also wait & return after the signal
192+
* has been applied. It will wait until the first decision task completed after the signal shows
193+
* by in the history(meaning recieved). It compare signalName and the signal method argument to
194+
* determine if that's the signal you sent. TODO: if this feature is proved to be useful, we
195+
* should move to client or server implementation
196+
*
197+
* <p>NOTE that the signalOperation should not use requestedID for deduping. (However, this
198+
* requestedID is not exposed in Java client yet anyway). Because deduping means a noop and return
199+
* success, then this helper will wait for signal forever.
200+
*
201+
* @param workflowClient
202+
* @param workflowId
203+
* @param runId
204+
* @param signalOperation the operation that will send signal
205+
* @param dataConverter for converting signalArgs to compare and determine if the signal has
206+
* received
207+
* @param signalName for comparing the signalName received in the history
208+
* @param signalArgs for comparing the signalData(converted by dataConverter) received in the
209+
* history
210+
*/
211+
private static signalWaitResult signalAndWait(
212+
WorkflowClient workflowClient,
213+
String workflowId,
214+
String runId,
215+
Runnable signalOperation,
216+
DataConverter dataConverter,
217+
String signalName,
218+
Object... signalArgs)
219+
throws Exception {
220+
final byte[] signalData = dataConverter.toData(signalArgs);
221+
signalWaitResult result = new signalWaitResult();
222+
223+
// get the current eventID
224+
WorkflowExecution execution = new WorkflowExecution();
225+
execution.setWorkflowId(workflowId);
226+
execution.setRunId(runId);
227+
DescribeWorkflowExecutionRequest request = new DescribeWorkflowExecutionRequest();
228+
request.setDomain(DOMAIN);
229+
request.setExecution(execution);
230+
DescribeWorkflowExecutionResponse resp =
231+
workflowClient.getService().DescribeWorkflowExecution(request);
232+
long currentEventId = resp.workflowExecutionInfo.historyLength;
233+
result.runId = resp.workflowExecutionInfo.execution.runId;
234+
235+
// send signal
236+
signalOperation.run();
237+
238+
// Poll history starting from currentEventId,
239+
// then wait until the signal is received, and then wait until it's
240+
// processed(decisionTaskCompleted)
241+
result.isSignalReceived = false;
242+
result.isSignalProcessed = false;
243+
result.isWorkflowRunning = !resp.workflowExecutionInfo.isSetCloseStatus();
244+
245+
while (result.isWorkflowRunning && !result.isSignalProcessed) {
246+
GetWorkflowExecutionHistoryRequest historyReq = new GetWorkflowExecutionHistoryRequest();
247+
historyReq.setDomain(DOMAIN);
248+
historyReq.setExecution(execution);
249+
historyReq.setWaitForNewEvent(true);
250+
String token =
251+
String.format(
252+
"{\"RunID\":\"%s\",\"FirstEventID\":0,\"NextEventID\":%d,\"IsWorkflowRunning\":true,\"PersistenceToken\":null,\"TransientDecision\":null,\"BranchToken\":null}",
253+
result.runId, currentEventId + 1);
254+
historyReq.setNextPageToken(token.getBytes(Charset.defaultCharset()));
255+
final GetWorkflowExecutionHistoryResponse historyResp =
256+
workflowClient.getService().GetWorkflowExecutionHistory(historyReq);
257+
token = new String(historyResp.getNextPageToken(), Charset.defaultCharset());
258+
result.isWorkflowRunning = token.contains("\"IsWorkflowRunning\":true");
259+
260+
for (HistoryEvent event : historyResp.history.events) {
261+
if (!result.isSignalReceived) {
262+
// wait for signal received
263+
if (event.getEventType() == EventType.WorkflowExecutionSignaled) {
264+
final byte[] eventSignalData =
265+
event.getWorkflowExecutionSignaledEventAttributes().getInput();
266+
final String eventSignalName =
267+
event.getWorkflowExecutionSignaledEventAttributes().getSignalName();
268+
if (Arrays.equals(eventSignalData, signalData) && eventSignalName.equals(signalName)) {
269+
result.isSignalReceived = true;
270+
} else {
271+
if (Arrays.equals(eventSignalData, signalData)
272+
|| eventSignalName.equals(signalName)) {
273+
System.out.println(
274+
"[WARN] either signal event data or signalName doesn't match, is the signalArgs and signalName correct?");
275+
}
276+
}
277+
}
278+
} else {
279+
// signal is received, now wait for first decisioin task complete
280+
if (event.getEventType() == EventType.DecisionTaskCompleted) {
281+
result.isSignalProcessed = true;
282+
break;
283+
}
284+
}
285+
currentEventId = event.getEventId();
286+
}
287+
}
288+
return result;
289+
}
290+
291+
private static class signalWaitResult {
292+
public boolean isSignalProcessed;
293+
public boolean isSignalReceived;
294+
public boolean isWorkflowRunning;
295+
public String runId;
296+
}
297+
}

0 commit comments

Comments
 (0)