Skip to content

Remove raw history support #1004

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@
package com.uber.cadence.internal.common;

import com.google.common.base.Defaults;
import com.google.common.collect.Lists;
import com.uber.cadence.DataBlob;
import com.uber.cadence.History;
import com.uber.cadence.HistoryEvent;
import com.uber.cadence.HistoryEventFilterType;
import com.uber.cadence.Memo;
import com.uber.cadence.SearchAttributes;
import com.uber.cadence.TaskList;
Expand All @@ -33,15 +28,10 @@
import com.uber.cadence.workflow.WorkflowMethod;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;

/** Utility functions shared by the implementation code. */
public final class InternalUtils {
Expand Down Expand Up @@ -164,93 +154,6 @@ public static SearchAttributes convertMapToSearchAttributes(
return new SearchAttributes().setIndexedFields(mapOfByteBuffer);
}

// Serializes a complete History into a single DataBlob using Thrift binary encoding.
public static DataBlob SerializeFromHistoryToBlobData(History history) {

  // TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
  TSerializer serializer = new TSerializer();
  DataBlob result = new DataBlob();
  try {
    result.setData(serializer.serialize(history));
  } catch (TException e) {
    throw new RuntimeException("Serialize history to blob data failed", e);
  }
  return result;
}

// Deserializes a list of DataBlobs (Thrift binary encoding) back into a single History.
// Returns null if any blob decodes to an empty event list. When the filter type is
// CLOSE_EVENT, only the final event across all blobs is retained.
public static History DeserializeFromBlobDataToHistory(
    List<DataBlob> blobData, HistoryEventFilterType historyEventFilterType) throws TException {

  // TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
  TDeserializer deSerializer = new TDeserializer();
  List<HistoryEvent> events = Lists.newArrayList();
  for (DataBlob data : blobData) {
    History history = new History();
    try {
      // The Thrift payload occupies the entire byte array; the previous full-range
      // Arrays.copyOfRange was a no-op copy and has been removed.
      deSerializer.deserialize(history, data.getData());
    } catch (TException err) {
      // Chain the original exception as the cause instead of discarding it.
      throw new TException("Deserialize blob data to history failed with unknown error", err);
    }

    // history is freshly constructed, so only its event list can be absent/empty.
    if (history.getEvents() == null || history.getEvents().isEmpty()) {
      return null;
    }
    events.addAll(history.getEvents());
  }

  if (!events.isEmpty() && historyEventFilterType == HistoryEventFilterType.CLOSE_EVENT) {
    events = events.subList(events.size() - 1, events.size());
  }

  return new History().setEvents(events);
}

// Serializes each history event into its own DataBlob using Thrift binary encoding.
public static List<DataBlob> SerializeFromHistoryEventToBlobData(List<HistoryEvent> events) {

  // TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
  TSerializer serializer = new TSerializer();
  List<DataBlob> result = Lists.newArrayListWithCapacity(events.size());
  for (HistoryEvent historyEvent : events) {
    try {
      DataBlob dataBlob = new DataBlob();
      dataBlob.setData(serializer.serialize(historyEvent));
      result.add(dataBlob);
    } catch (TException e) {
      throw new RuntimeException("Serialize history event to blob data failed", e);
    }
  }
  return result;
}

// Deserializes each DataBlob (Thrift binary encoding) into a HistoryEvent, preserving order.
public static List<HistoryEvent> DeserializeFromBlobDataToHistoryEvents(List<DataBlob> blobData)
    throws TException {

  // TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
  TDeserializer deSerializer = new TDeserializer();
  List<HistoryEvent> events = Lists.newArrayList();
  for (DataBlob data : blobData) {
    try {
      HistoryEvent event = new HistoryEvent();
      // The Thrift payload occupies the entire byte array; the previous full-range
      // Arrays.copyOfRange was a no-op copy and has been removed.
      deSerializer.deserialize(event, data.getData());
      events.add(event);
    } catch (TException err) {
      // Chain the original exception as the cause instead of discarding it.
      throw new TException("Deserialize blob data to history event failed with unknown error", err);
    }
  }
  return events;
}

/** Prohibit instantiation */
private InternalUtils() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@

import com.google.common.collect.Lists;
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
import com.uber.cadence.History;
import com.uber.cadence.HistoryEvent;
import com.uber.cadence.HistoryEventFilterType;
import com.uber.cadence.activity.Activity;
import com.uber.cadence.common.WorkflowExecutionHistory;
import com.uber.cadence.internal.common.InternalUtils;
import com.uber.cadence.internal.common.RpcRetryer;
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
import com.uber.cadence.internal.metrics.MetricsType;
Expand Down Expand Up @@ -185,14 +182,10 @@ protected WorkflowExecutionHistory getFullHistory(String domain, WorkflowExecuti
nextPageToken, this.serviceClient, domain, execution.toThrift()));
pageToken = resp.getNextPageToken();

// handle raw history
// TODO support raw history feature once server removes default Thrift encoding
if (resp.getRawHistory() != null && resp.getRawHistory().size() > 0) {
History history =
InternalUtils.DeserializeFromBlobDataToHistory(
resp.getRawHistory(), HistoryEventFilterType.ALL_EVENT);
if (history != null && history.getEvents() != null) {
histories.addAll(history.getEvents());
}
throw new UnsupportedOperationException(
"Raw history is not supported. Please turn off frontend.sendRawWorkflowHistory feature flag in frontend service to recover");
} else {
histories.addAll(resp.getHistory().getEvents());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package com.uber.cadence.internal.testservice;

import com.uber.cadence.BadRequestError;
import com.uber.cadence.DataBlob;
import com.uber.cadence.EntityNotExistsError;
import com.uber.cadence.EventType;
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
Expand All @@ -34,7 +33,6 @@
import com.uber.cadence.StickyExecutionAttributes;
import com.uber.cadence.WorkflowExecution;
import com.uber.cadence.WorkflowExecutionInfo;
import com.uber.cadence.internal.common.InternalUtils;
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
import com.uber.cadence.internal.testservice.RequestContext.Timer;
import java.time.Duration;
Expand Down Expand Up @@ -348,24 +346,20 @@ public GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
if (!getRequest.isWaitForNewEvent()
&& getRequest.getHistoryEventFilterType() != HistoryEventFilterType.CLOSE_EVENT) {
List<HistoryEvent> events = history.getEventsLocked();
List<DataBlob> blobs = InternalUtils.SerializeFromHistoryEventToBlobData(events);
// Copy the list as it is mutable. Individual events assumed immutable.
ArrayList<HistoryEvent> eventsCopy = new ArrayList<>(events);
return new GetWorkflowExecutionHistoryResponse()
.setHistory(new History().setEvents(eventsCopy))
.setRawHistory(blobs);
.setHistory(new History().setEvents(eventsCopy));
}
expectedNextEventId = history.getNextEventIdLocked();
} finally {
lock.unlock();
}
List<HistoryEvent> events =
history.waitForNewEvents(expectedNextEventId, getRequest.getHistoryEventFilterType());
List<DataBlob> blobs = InternalUtils.SerializeFromHistoryEventToBlobData(events);
GetWorkflowExecutionHistoryResponse result = new GetWorkflowExecutionHistoryResponse();
if (events != null) {
result.setHistory(new History().setEvents(events));
result.setRawHistory(blobs);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.uber.cadence.WorkflowService.GetWorkflowExecutionHistory_result;
import com.uber.cadence.internal.Version;
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
import com.uber.cadence.internal.common.InternalUtils;
import com.uber.cadence.internal.metrics.MetricsTag;
import com.uber.cadence.internal.metrics.MetricsType;
import com.uber.cadence.internal.metrics.ServiceMethod;
Expand Down Expand Up @@ -766,10 +765,8 @@ private GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
if (response.getResponseCode() == ResponseCode.OK) {
GetWorkflowExecutionHistoryResponse res = result.getSuccess();
if (res.getRawHistory() != null) {
History history =
InternalUtils.DeserializeFromBlobDataToHistory(
res.getRawHistory(), getRequest.getHistoryEventFilterType());
res.setHistory(history);
throw new TException(
"Raw history is not supported. Please turn off frontend.sendRawWorkflowHistory feature flag in frontend service to recover");
}
return res;
}
Expand Down Expand Up @@ -2593,10 +2590,8 @@ private void getWorkflowExecutionHistory(
if (r.getResponseCode() == ResponseCode.OK) {
GetWorkflowExecutionHistoryResponse res = result.getSuccess();
if (res.getRawHistory() != null) {
History history =
InternalUtils.DeserializeFromBlobDataToHistory(
res.getRawHistory(), getRequest.getHistoryEventFilterType());
res.setHistory(history);
throw new TException(
"Raw history is not supported. Please turn off frontend.sendRawWorkflowHistory feature flag in frontend service to recover");
}
resultHandler.onComplete(res);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,14 @@

package com.uber.cadence.internal.common;

import static com.uber.cadence.EventType.WorkflowExecutionStarted;
import static junit.framework.TestCase.assertEquals;
import static org.junit.Assert.assertNotNull;

import com.google.common.collect.Lists;
import com.googlecode.junittoolbox.MultithreadingTester;
import com.googlecode.junittoolbox.RunnableAssert;
import com.uber.cadence.*;
import com.uber.cadence.converter.DataConverterException;
import com.uber.cadence.workflow.WorkflowUtils;
import java.io.FileOutputStream;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import junit.framework.TestCase;
import org.junit.Test;

public class InternalUtilsTest {
Expand All @@ -56,101 +47,4 @@ public void testConvertMapToSearchAttributesException() throws Throwable {
attr.put("InvalidValue", new FileOutputStream("dummy"));
InternalUtils.convertMapToSearchAttributes(attr);
}

@Test
public void testSerialization_History() {

  RunnableAssert r =
      new RunnableAssert("history_serialization") {
        @Override
        public void run() {
          // Build a minimal started-event and round-trip it through blob serialization.
          HistoryEvent event =
              new HistoryEvent()
                  .setEventId(1)
                  .setVersion(1)
                  .setEventType(WorkflowExecutionStarted)
                  .setTimestamp(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC))
                  .setWorkflowExecutionStartedEventAttributes(
                      new WorkflowExecutionStartedEventAttributes()
                          .setAttempt(1)
                          .setFirstExecutionRunId("test"));

          List<HistoryEvent> historyEvents = Lists.newArrayList(event);
          History history = new History().setEvents(historyEvents);
          DataBlob blob = InternalUtils.SerializeFromHistoryToBlobData(history);
          assertNotNull(blob);

          try {
            History result =
                InternalUtils.DeserializeFromBlobDataToHistory(
                    Lists.newArrayList(blob), HistoryEventFilterType.ALL_EVENT);
            assertNotNull(result);
            assertEquals(1, result.getEvents().size());
            HistoryEvent decoded = result.getEvents().get(0);
            assertEquals(event.getEventId(), decoded.getEventId());
            assertEquals(event.getVersion(), decoded.getVersion());
            assertEquals(event.getEventType(), decoded.getEventType());
            assertEquals(event.getTimestamp(), decoded.getTimestamp());
            assertEquals(
                event.getWorkflowExecutionStartedEventAttributes(),
                decoded.getWorkflowExecutionStartedEventAttributes());
          } catch (Exception e) {
            // Include the cause in the failure message instead of discarding it.
            TestCase.fail("Received unexpected error during deserialization: " + e);
          }
        }
      };

  try {
    // Exercise the serializer concurrently to catch thread-safety regressions.
    new MultithreadingTester().add(r).numThreads(50).numRoundsPerThread(10).run();
  } catch (Exception e) {
    TestCase.fail("Received unexpected error during concurrent deserialization: " + e);
  }
}

@Test
public void testSerialization_HistoryEvent() {

  RunnableAssert r =
      new RunnableAssert("history_event_serialization") {
        @Override
        public void run() {
          // Build a minimal started-event and round-trip it through per-event blob serialization.
          HistoryEvent event =
              new HistoryEvent()
                  .setEventId(1)
                  .setVersion(1)
                  .setEventType(WorkflowExecutionStarted)
                  .setTimestamp(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC))
                  .setWorkflowExecutionStartedEventAttributes(
                      new WorkflowExecutionStartedEventAttributes()
                          .setAttempt(1)
                          .setFirstExecutionRunId("test"));

          List<HistoryEvent> historyEvents = Lists.newArrayList(event);
          List<DataBlob> blobList =
              InternalUtils.SerializeFromHistoryEventToBlobData(historyEvents);
          assertEquals(1, blobList.size());

          try {
            List<HistoryEvent> result =
                InternalUtils.DeserializeFromBlobDataToHistoryEvents(blobList);
            assertNotNull(result);
            assertEquals(1, result.size());
            HistoryEvent decoded = result.get(0);
            assertEquals(event.getEventId(), decoded.getEventId());
            assertEquals(event.getVersion(), decoded.getVersion());
            assertEquals(event.getEventType(), decoded.getEventType());
            assertEquals(event.getTimestamp(), decoded.getTimestamp());
            assertEquals(
                event.getWorkflowExecutionStartedEventAttributes(),
                decoded.getWorkflowExecutionStartedEventAttributes());
          } catch (Exception e) {
            // Include the cause in the failure message instead of discarding it.
            TestCase.fail("Received unexpected error during deserialization: " + e);
          }
        }
      };

  try {
    // Exercise the serializer concurrently to catch thread-safety regressions.
    new MultithreadingTester().add(r).numThreads(50).numRoundsPerThread(10).run();
  } catch (Exception e) {
    TestCase.fail("Received unexpected error during concurrent deserialization: " + e);
  }
}
}
Loading