Skip to content

Commit 9abf4bc

Browse files
authored
remove raw history (#1004)
What changed? Remove raw history support in client Why? History is stored as Thrift encoded binary. Sending raw history in Thrift will no longer be supported in V4 How did you test it? Unit Test
1 parent e38793d commit 9abf4bc

File tree

6 files changed

+20
-237
lines changed

6 files changed

+20
-237
lines changed

src/main/java/com/uber/cadence/internal/common/InternalUtils.java

Lines changed: 0 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,6 @@
1818
package com.uber.cadence.internal.common;
1919

2020
import com.google.common.base.Defaults;
21-
import com.google.common.collect.Lists;
22-
import com.uber.cadence.DataBlob;
23-
import com.uber.cadence.History;
24-
import com.uber.cadence.HistoryEvent;
25-
import com.uber.cadence.HistoryEventFilterType;
2621
import com.uber.cadence.Memo;
2722
import com.uber.cadence.SearchAttributes;
2823
import com.uber.cadence.TaskList;
@@ -33,15 +28,10 @@
3328
import com.uber.cadence.workflow.WorkflowMethod;
3429
import java.lang.reflect.Method;
3530
import java.nio.ByteBuffer;
36-
import java.util.Arrays;
3731
import java.util.HashMap;
38-
import java.util.List;
3932
import java.util.Map;
4033
import java.util.concurrent.ExecutorService;
4134
import java.util.concurrent.TimeUnit;
42-
import org.apache.thrift.TDeserializer;
43-
import org.apache.thrift.TException;
44-
import org.apache.thrift.TSerializer;
4535

4636
/** Utility functions shared by the implementation code. */
4737
public final class InternalUtils {
@@ -164,93 +154,6 @@ public static SearchAttributes convertMapToSearchAttributes(
164154
return new SearchAttributes().setIndexedFields(mapOfByteBuffer);
165155
}
166156

167-
// This method serializes history to blob data
168-
public static DataBlob SerializeFromHistoryToBlobData(History history) {
169-
170-
// TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
171-
TSerializer serializer = new TSerializer();
172-
DataBlob blob = new DataBlob();
173-
try {
174-
blob.setData(serializer.serialize(history));
175-
} catch (org.apache.thrift.TException err) {
176-
throw new RuntimeException("Serialize history to blob data failed", err);
177-
}
178-
179-
return blob;
180-
}
181-
182-
// This method deserialize the DataBlob data to the History data
183-
public static History DeserializeFromBlobDataToHistory(
184-
List<DataBlob> blobData, HistoryEventFilterType historyEventFilterType) throws TException {
185-
186-
// TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
187-
TDeserializer deSerializer = new TDeserializer();
188-
List<HistoryEvent> events = Lists.newArrayList();
189-
for (DataBlob data : blobData) {
190-
History history = new History();
191-
try {
192-
byte[] dataByte = data.getData();
193-
// TODO: verify the beginning index
194-
dataByte = Arrays.copyOfRange(dataByte, 0, dataByte.length);
195-
deSerializer.deserialize(history, dataByte);
196-
197-
if (history == null || history.getEvents() == null || history.getEvents().size() == 0) {
198-
return null;
199-
}
200-
} catch (org.apache.thrift.TException err) {
201-
throw new TException("Deserialize blob data to history failed with unknown error");
202-
}
203-
204-
events.addAll(history.getEvents());
205-
}
206-
207-
if (events.size() > 0 && historyEventFilterType == HistoryEventFilterType.CLOSE_EVENT) {
208-
events = events.subList(events.size() - 1, events.size());
209-
}
210-
211-
return new History().setEvents(events);
212-
}
213-
214-
// This method serializes history event to blob data
215-
public static List<DataBlob> SerializeFromHistoryEventToBlobData(List<HistoryEvent> events) {
216-
217-
// TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
218-
TSerializer serializer = new TSerializer();
219-
List<DataBlob> blobs = Lists.newArrayListWithCapacity(events.size());
220-
for (HistoryEvent event : events) {
221-
DataBlob blob = new DataBlob();
222-
try {
223-
blob.setData(serializer.serialize(event));
224-
} catch (org.apache.thrift.TException err) {
225-
throw new RuntimeException("Serialize history event to blob data failed", err);
226-
}
227-
blobs.add(blob);
228-
}
229-
return blobs;
230-
}
231-
232-
// This method serializes blob data to history event
233-
public static List<HistoryEvent> DeserializeFromBlobDataToHistoryEvents(List<DataBlob> blobData)
234-
throws TException {
235-
236-
// TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
237-
TDeserializer deSerializer = new TDeserializer();
238-
List<HistoryEvent> events = Lists.newArrayList();
239-
for (DataBlob data : blobData) {
240-
try {
241-
HistoryEvent event = new HistoryEvent();
242-
byte[] dataByte = data.getData();
243-
// TODO: verify the beginning index
244-
dataByte = Arrays.copyOfRange(dataByte, 0, dataByte.length);
245-
deSerializer.deserialize(event, dataByte);
246-
events.add(event);
247-
} catch (org.apache.thrift.TException err) {
248-
throw new TException("Deserialize blob data to history event failed with unknown error");
249-
}
250-
}
251-
return events;
252-
}
253-
254157
/** Prohibit instantiation */
255158
private InternalUtils() {}
256159
}

src/main/java/com/uber/cadence/internal/shadowing/ReplayWorkflowActivityImpl.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,9 @@
1919

2020
import com.google.common.collect.Lists;
2121
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
22-
import com.uber.cadence.History;
2322
import com.uber.cadence.HistoryEvent;
24-
import com.uber.cadence.HistoryEventFilterType;
2523
import com.uber.cadence.activity.Activity;
2624
import com.uber.cadence.common.WorkflowExecutionHistory;
27-
import com.uber.cadence.internal.common.InternalUtils;
2825
import com.uber.cadence.internal.common.RpcRetryer;
2926
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
3027
import com.uber.cadence.internal.metrics.MetricsType;
@@ -185,14 +182,10 @@ protected WorkflowExecutionHistory getFullHistory(String domain, WorkflowExecuti
185182
nextPageToken, this.serviceClient, domain, execution.toThrift()));
186183
pageToken = resp.getNextPageToken();
187184

188-
// handle raw history
185+
// TODO support raw history feature once server removes default Thrift encoding
189186
if (resp.getRawHistory() != null && resp.getRawHistory().size() > 0) {
190-
History history =
191-
InternalUtils.DeserializeFromBlobDataToHistory(
192-
resp.getRawHistory(), HistoryEventFilterType.ALL_EVENT);
193-
if (history != null && history.getEvents() != null) {
194-
histories.addAll(history.getEvents());
195-
}
187+
throw new UnsupportedOperationException(
188+
"Raw history is not supported. Please turn off frontend.sendRawWorkflowHistory feature flag in frontend service to recover");
196189
} else {
197190
histories.addAll(resp.getHistory().getEvents());
198191
}

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowStoreImpl.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package com.uber.cadence.internal.testservice;
1919

2020
import com.uber.cadence.BadRequestError;
21-
import com.uber.cadence.DataBlob;
2221
import com.uber.cadence.EntityNotExistsError;
2322
import com.uber.cadence.EventType;
2423
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
@@ -34,7 +33,6 @@
3433
import com.uber.cadence.StickyExecutionAttributes;
3534
import com.uber.cadence.WorkflowExecution;
3635
import com.uber.cadence.WorkflowExecutionInfo;
37-
import com.uber.cadence.internal.common.InternalUtils;
3836
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
3937
import com.uber.cadence.internal.testservice.RequestContext.Timer;
4038
import java.time.Duration;
@@ -348,24 +346,20 @@ public GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
348346
if (!getRequest.isWaitForNewEvent()
349347
&& getRequest.getHistoryEventFilterType() != HistoryEventFilterType.CLOSE_EVENT) {
350348
List<HistoryEvent> events = history.getEventsLocked();
351-
List<DataBlob> blobs = InternalUtils.SerializeFromHistoryEventToBlobData(events);
352349
// Copy the list as it is mutable. Individual events assumed immutable.
353350
ArrayList<HistoryEvent> eventsCopy = new ArrayList<>(events);
354351
return new GetWorkflowExecutionHistoryResponse()
355-
.setHistory(new History().setEvents(eventsCopy))
356-
.setRawHistory(blobs);
352+
.setHistory(new History().setEvents(eventsCopy));
357353
}
358354
expectedNextEventId = history.getNextEventIdLocked();
359355
} finally {
360356
lock.unlock();
361357
}
362358
List<HistoryEvent> events =
363359
history.waitForNewEvents(expectedNextEventId, getRequest.getHistoryEventFilterType());
364-
List<DataBlob> blobs = InternalUtils.SerializeFromHistoryEventToBlobData(events);
365360
GetWorkflowExecutionHistoryResponse result = new GetWorkflowExecutionHistoryResponse();
366361
if (events != null) {
367362
result.setHistory(new History().setEvents(events));
368-
result.setRawHistory(blobs);
369363
}
370364
return result;
371365
}

src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import com.uber.cadence.WorkflowService.GetWorkflowExecutionHistory_result;
2929
import com.uber.cadence.internal.Version;
3030
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
31-
import com.uber.cadence.internal.common.InternalUtils;
3231
import com.uber.cadence.internal.metrics.MetricsTag;
3332
import com.uber.cadence.internal.metrics.MetricsType;
3433
import com.uber.cadence.internal.metrics.ServiceMethod;
@@ -766,10 +765,8 @@ private GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
766765
if (response.getResponseCode() == ResponseCode.OK) {
767766
GetWorkflowExecutionHistoryResponse res = result.getSuccess();
768767
if (res.getRawHistory() != null) {
769-
History history =
770-
InternalUtils.DeserializeFromBlobDataToHistory(
771-
res.getRawHistory(), getRequest.getHistoryEventFilterType());
772-
res.setHistory(history);
768+
throw new TException(
769+
"Raw history is not supported. Please turn off frontend.sendRawWorkflowHistory feature flag in frontend service to recover");
773770
}
774771
return res;
775772
}
@@ -2593,10 +2590,8 @@ private void getWorkflowExecutionHistory(
25932590
if (r.getResponseCode() == ResponseCode.OK) {
25942591
GetWorkflowExecutionHistoryResponse res = result.getSuccess();
25952592
if (res.getRawHistory() != null) {
2596-
History history =
2597-
InternalUtils.DeserializeFromBlobDataToHistory(
2598-
res.getRawHistory(), getRequest.getHistoryEventFilterType());
2599-
res.setHistory(history);
2593+
throw new TException(
2594+
"Raw history is not supported. Please turn off frontend.sendRawWorkflowHistory feature flag in frontend service to recover");
26002595
}
26012596
resultHandler.onComplete(res);
26022597
return;

src/test/java/com/uber/cadence/internal/common/InternalUtilsTest.java

Lines changed: 0 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,14 @@
1717

1818
package com.uber.cadence.internal.common;
1919

20-
import static com.uber.cadence.EventType.WorkflowExecutionStarted;
2120
import static junit.framework.TestCase.assertEquals;
22-
import static org.junit.Assert.assertNotNull;
2321

24-
import com.google.common.collect.Lists;
25-
import com.googlecode.junittoolbox.MultithreadingTester;
26-
import com.googlecode.junittoolbox.RunnableAssert;
2722
import com.uber.cadence.*;
2823
import com.uber.cadence.converter.DataConverterException;
2924
import com.uber.cadence.workflow.WorkflowUtils;
3025
import java.io.FileOutputStream;
31-
import java.time.LocalDateTime;
32-
import java.time.ZoneOffset;
3326
import java.util.HashMap;
34-
import java.util.List;
3527
import java.util.Map;
36-
import junit.framework.TestCase;
3728
import org.junit.Test;
3829

3930
public class InternalUtilsTest {
@@ -56,101 +47,4 @@ public void testConvertMapToSearchAttributesException() throws Throwable {
5647
attr.put("InvalidValue", new FileOutputStream("dummy"));
5748
InternalUtils.convertMapToSearchAttributes(attr);
5849
}
59-
60-
@Test
61-
public void testSerialization_History() {
62-
63-
RunnableAssert r =
64-
new RunnableAssert("history_serialization") {
65-
@Override
66-
public void run() {
67-
HistoryEvent event =
68-
new HistoryEvent()
69-
.setEventId(1)
70-
.setVersion(1)
71-
.setEventType(WorkflowExecutionStarted)
72-
.setTimestamp(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC))
73-
.setWorkflowExecutionStartedEventAttributes(
74-
new WorkflowExecutionStartedEventAttributes()
75-
.setAttempt(1)
76-
.setFirstExecutionRunId("test"));
77-
78-
List<HistoryEvent> historyEvents = Lists.newArrayList(event);
79-
History history = new History().setEvents(historyEvents);
80-
DataBlob blob = InternalUtils.SerializeFromHistoryToBlobData(history);
81-
assertNotNull(blob);
82-
83-
try {
84-
History result =
85-
InternalUtils.DeserializeFromBlobDataToHistory(
86-
Lists.newArrayList(blob), HistoryEventFilterType.ALL_EVENT);
87-
assertNotNull(result);
88-
assertEquals(1, result.events.size());
89-
assertEquals(event.getEventId(), result.events.get(0).getEventId());
90-
assertEquals(event.getVersion(), result.events.get(0).getVersion());
91-
assertEquals(event.getEventType(), result.events.get(0).getEventType());
92-
assertEquals(event.getTimestamp(), result.events.get(0).getTimestamp());
93-
assertEquals(
94-
event.getWorkflowExecutionStartedEventAttributes(),
95-
result.events.get(0).getWorkflowExecutionStartedEventAttributes());
96-
} catch (Exception e) {
97-
TestCase.fail("Received unexpected error during deserialization");
98-
}
99-
}
100-
};
101-
102-
try {
103-
new MultithreadingTester().add(r).numThreads(50).numRoundsPerThread(10).run();
104-
} catch (Exception e) {
105-
TestCase.fail("Received unexpected error during concurrent deserialization");
106-
}
107-
}
108-
109-
@Test
110-
public void testSerialization_HistoryEvent() {
111-
112-
RunnableAssert r =
113-
new RunnableAssert("history_event_serialization") {
114-
@Override
115-
public void run() {
116-
HistoryEvent event =
117-
new HistoryEvent()
118-
.setEventId(1)
119-
.setVersion(1)
120-
.setEventType(WorkflowExecutionStarted)
121-
.setTimestamp(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC))
122-
.setWorkflowExecutionStartedEventAttributes(
123-
new WorkflowExecutionStartedEventAttributes()
124-
.setAttempt(1)
125-
.setFirstExecutionRunId("test"));
126-
127-
List<HistoryEvent> historyEvents = Lists.newArrayList(event);
128-
List<DataBlob> blobList =
129-
InternalUtils.SerializeFromHistoryEventToBlobData(historyEvents);
130-
assertEquals(1, blobList.size());
131-
132-
try {
133-
List<HistoryEvent> result =
134-
InternalUtils.DeserializeFromBlobDataToHistoryEvents(blobList);
135-
assertNotNull(result);
136-
assertEquals(1, result.size());
137-
assertEquals(event.getEventId(), result.get(0).getEventId());
138-
assertEquals(event.getVersion(), result.get(0).getVersion());
139-
assertEquals(event.getEventType(), result.get(0).getEventType());
140-
assertEquals(event.getTimestamp(), result.get(0).getTimestamp());
141-
assertEquals(
142-
event.getWorkflowExecutionStartedEventAttributes(),
143-
result.get(0).getWorkflowExecutionStartedEventAttributes());
144-
} catch (Exception e) {
145-
TestCase.fail("Received unexpected error during deserialization");
146-
}
147-
}
148-
};
149-
150-
try {
151-
new MultithreadingTester().add(r).numThreads(50).numRoundsPerThread(10).run();
152-
} catch (Exception e) {
153-
TestCase.fail("Received unexpected error during concurrent deserialization");
154-
}
155-
}
15650
}

0 commit comments

Comments
 (0)