From 875f679731f9601d09bd735179275830014fdc48 Mon Sep 17 00:00:00 2001 From: Matt Anger Date: Fri, 4 Jun 2021 15:58:39 -0700 Subject: [PATCH 01/12] port over changeset from our fork --- build.gradle | 3 +- .../uber/cadence/client/WorkflowOptions.java | 48 +++- .../cadence/context/ContextPropagator.java | 30 +++ .../internal/context/ContextThreadLocal.java | 53 +++- .../internal/replay/WorkflowContext.java | 14 +- .../internal/sync/SyncDecisionContext.java | 14 +- .../internal/sync/SyncWorkflowWorker.java | 11 +- .../sync/TestActivityEnvironmentInternal.java | 6 + .../sync/TestWorkflowEnvironmentInternal.java | 5 + .../internal/sync/WorkflowStubImpl.java | 14 +- .../internal/sync/WorkflowThreadImpl.java | 6 + .../testservice/TestWorkflowService.java | 7 + .../internal/worker/ActivityWorker.java | 31 ++- .../internal/worker/LocalActivityWorker.java | 20 +- .../serviceclient/IWorkflowService.java | 4 + .../WorkflowServiceTChannel.java | 59 ++++- .../java/com/uber/cadence/worker/Worker.java | 14 + .../uber/cadence/worker/WorkerFactory.java | 12 + .../cadence/client/WorkflowOptionsTest.java | 1 + .../internal/testing/WorkflowTestingTest.java | 250 +----------------- .../workerFactory/WorkerFactoryTests.java | 8 +- src/test/resources/logback-test.xml | 2 +- 22 files changed, 342 insertions(+), 270 deletions(-) diff --git a/build.gradle b/build.gradle index 7e608194e..0fed3a527 100644 --- a/build.gradle +++ b/build.gradle @@ -53,7 +53,7 @@ dependencies { errorproneJavac('com.google.errorprone:javac:9+181-r4173-1') errorprone('com.google.errorprone:error_prone_core:2.3.4') - compile group: 'com.uber.tchannel', name: 'tchannel-core', version: '0.8.5' + compile group: 'com.uber.tchannel', name: 'tchannel-core', version: '0.8.30' compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.25' compile group: 'org.apache.thrift', name: 'libthrift', version: '0.9.3' compile group: 'com.google.code.gson', name: 'gson', version: '2.8.6' @@ -62,6 +62,7 @@ dependencies { compile group: 'com.cronutils', name: 'cron-utils', version: '9.0.0' compile group: 'io.micrometer', name: 'micrometer-core', version: '1.1.2' compile group: 'javax.annotation', name: 'javax.annotation-api', version: '1.3.2' + compile group: 'io.opentelemetry', name: 'opentelemetry-sdk', version: '1.1.0' testCompile group: 'junit', name: 'junit', version: '4.12' testCompile group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4' diff --git a/src/main/java/com/uber/cadence/client/WorkflowOptions.java b/src/main/java/com/uber/cadence/client/WorkflowOptions.java index 3d75647ec..9ab9c60ce 100644 --- a/src/main/java/com/uber/cadence/client/WorkflowOptions.java +++ b/src/main/java/com/uber/cadence/client/WorkflowOptions.java @@ -30,9 +30,11 @@ import com.uber.cadence.common.MethodRetry; import com.uber.cadence.common.RetryOptions; import com.uber.cadence.context.ContextPropagator; +import com.uber.cadence.context.OpenTelemetryContextPropagator; import com.uber.cadence.internal.common.OptionsUtils; import com.uber.cadence.workflow.WorkflowMethod; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -66,6 +68,7 @@ public static WorkflowOptions merge( .setMemo(o.getMemo()) .setSearchAttributes(o.getSearchAttributes()) .setContextPropagators(o.getContextPropagators()) + .setDefaultContextPropagators(o.useDefaultContextPropagators) .validateBuildWithDefaults(); } @@ -91,6 +94,8 @@ public static final class Builder { private List contextPropagators; + private Boolean useDefaultContextPropagators; + public Builder() {} public Builder(WorkflowOptions o) { @@ -107,6 +112,7 @@ public Builder(WorkflowOptions o) { this.memo = o.memo; this.searchAttributes = o.searchAttributes; this.contextPropagators = o.contextPropagators; + this.useDefaultContextPropagators = o.useDefaultContextPropagators; } /** @@ -214,6 +220,13 @@ public Builder setContextPropagators(List contextPropagators) return this; } + /** Specifies that the default context propagators should not be used. */ + public Builder setDefaultContextPropagators(Boolean useDefaultContextPropagators) { + this.useDefaultContextPropagators = + (useDefaultContextPropagators == null || useDefaultContextPropagators); + return this; + } + public WorkflowOptions build() { return new WorkflowOptions( workflowId, @@ -225,7 +238,8 @@ public WorkflowOptions build() { cronSchedule, memo, searchAttributes, - contextPropagators); + contextPropagators, + useDefaultContextPropagators); } /** @@ -261,6 +275,20 @@ public WorkflowOptions validateBuildWithDefaults() { cron.validate(); } + if (useDefaultContextPropagators == null || useDefaultContextPropagators) { + // Add OpenTelemetry propagator if not already present. + if (contextPropagators != null) { + contextPropagators = new ArrayList(contextPropagators); + } else { + contextPropagators = new ArrayList<>(); + } + + OpenTelemetryContextPropagator otelPropagator = new OpenTelemetryContextPropagator(); + if (!contextPropagators.contains(otelPropagator)) { + contextPropagators.add(otelPropagator); + } + } + return new WorkflowOptions( workflowId, policy, @@ -272,7 +300,8 @@ public WorkflowOptions validateBuildWithDefaults() { cronSchedule, memo, searchAttributes, - contextPropagators); + contextPropagators, + useDefaultContextPropagators); } } @@ -296,6 +325,8 @@ public WorkflowOptions validateBuildWithDefaults() { private List contextPropagators; + private Boolean useDefaultContextPropagators; + private WorkflowOptions( String workflowId, WorkflowIdReusePolicy workflowIdReusePolicy, @@ -306,7 +337,8 @@ private WorkflowOptions( String cronSchedule, Map memo, Map searchAttributes, - List contextPropagators) { + List contextPropagators, + Boolean useDefaultContextPropagators) { this.workflowId = workflowId; this.workflowIdReusePolicy = workflowIdReusePolicy; this.executionStartToCloseTimeout = executionStartToCloseTimeout; @@ -317,6 +349,7 @@ private WorkflowOptions( this.memo = memo; this.searchAttributes = searchAttributes; this.contextPropagators = contextPropagators; + this.useDefaultContextPropagators = useDefaultContextPropagators; } public String getWorkflowId() { @@ -373,7 +406,9 @@ public boolean equals(Object o) { && Objects.equals(cronSchedule, that.cronSchedule) && Objects.equals(memo, that.memo) && Objects.equals(searchAttributes, that.searchAttributes) - && Objects.equals(contextPropagators, that.contextPropagators); + && Objects.equals(contextPropagators, that.contextPropagators) + && (useDefaultContextPropagators == null || useDefaultContextPropagators) + == (that.useDefaultContextPropagators == null || that.useDefaultContextPropagators); } @Override @@ -388,7 +423,8 @@ public int hashCode() { cronSchedule, memo, searchAttributes, - contextPropagators); + contextPropagators, + useDefaultContextPropagators); } @Override @@ -418,6 +454,8 @@ public String toString() { + searchAttributes + ", contextPropagators='" + contextPropagators + + ", useDefaultContextPropagators='" + + useDefaultContextPropagators + '\'' + '}'; } diff --git a/src/main/java/com/uber/cadence/context/ContextPropagator.java b/src/main/java/com/uber/cadence/context/ContextPropagator.java index 3a618f96c..d302ffdaa 100644 --- a/src/main/java/com/uber/cadence/context/ContextPropagator.java +++ b/src/main/java/com/uber/cadence/context/ContextPropagator.java @@ -23,6 +23,9 @@ * Context Propagators are used to propagate information from workflow to activity, workflow to * child workflow, and workflow to child thread (using {@link com.uber.cadence.workflow.Async}). * + *

It is important to note that all threads share one ContextPropagator instance, so your + * implementation must be thread-safe and store any state in ThreadLocal variables. + * *

A sample ContextPropagator that copies all {@link org.slf4j.MDC} entries starting * with a given prefix along the code path looks like this: * @@ -136,4 +139,31 @@ public interface ContextPropagator { /** Sets the current context */ void setCurrentContext(Object context); + + /** + * This is a lifecycle method, called after the context has been propagated to the + * workflow/activity thread but the workflow/activity has not yet started. + */ + default void setUp() { + // No-op + } + + /** + * This is a lifecycle method, called after the workflow/activity has completed. If the method + * finished without exception, {@code successful} will be true. Otherwise, it will be false and + * {@link #onError(Throwable)} will have already been called. + */ + default void finish() { + // No-op + } + + /** + * This is a lifecycle method, called when the workflow/activity finishes by throwing an unhandled + * exception. {@link #finish()} is called after this method. + * + * @param t The unhandled exception that caused the workflow/activity to terminate + */ + default void onError(Throwable t) { + // No-op + } } diff --git a/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java b/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java index 227124170..b4423b923 100644 --- a/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java +++ b/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java @@ -24,10 +24,14 @@ import java.util.List; import java.util.Map; import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -/** This class holds the current set of context propagators */ +/** This class holds the current set of context propagators. */ public class ContextThreadLocal { + private static final Logger log = LoggerFactory.getLogger(ContextThreadLocal.class); + private static WorkflowThreadLocal> contextPropagators = WorkflowThreadLocal.withInitial( new Supplier>() { @@ -37,7 +41,7 @@ public List get() { } }); - /** Sets the list of context propagators for the thread */ + /** Sets the list of context propagators for the thread. */ public static void setContextPropagators(List propagators) { if (propagators == null || propagators.isEmpty()) { return; @@ -57,6 +61,11 @@ public static Map getCurrentContextForPropagation() { return contextData; } + /** + * Injects the context data into the thread for each configured context propagator. + * + * @param contextData The context data received from the server. + */ public static void propagateContextToCurrentThread(Map contextData) { if (contextData == null || contextData.isEmpty()) { return; @@ -67,4 +76,44 @@ public static void propagateContextToCurrentThread(Map contextDa } } } + + /** Calls {@link ContextPropagator#setUp()} for each propagator. */ + public static void setUpContextPropagators() { + for (ContextPropagator propagator : contextPropagators.get()) { + try { + propagator.setUp(); + } catch (Throwable t) { + // Don't let an error in one propagator block the others + log.error("Error calling setUp() on a contextpropagator", t); + } + } + } + + /** + * Calls {@link ContextPropagator#onError(Throwable)} for each propagator. + * + * @param t The Throwable that caused the workflow/activity to finish. + */ + public static void onErrorContextPropagators(Throwable t) { + for (ContextPropagator propagator : contextPropagators.get()) { + try { + propagator.onError(t); + } catch (Throwable t1) { + // Don't let an error in one propagator block the others + log.error("Error calling onError() on a contextpropagator", t1); + } + } + } + + /** Calls {@link ContextPropagator#finish()} for each propagator. */ + public static void finishContextPropagators() { + for (ContextPropagator propagator : contextPropagators.get()) { + try { + propagator.finish(); + } catch (Throwable t) { + // Don't let an error in one propagator block the others + log.error("Error calling finish() on a contextpropagator", t); + } + } + } } diff --git a/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java b/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java index 5f1d8f16f..729255727 100644 --- a/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java +++ b/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; final class WorkflowContext { @@ -166,7 +167,18 @@ Map getPropagatedContexts() { Map contextData = new HashMap<>(); for (ContextPropagator propagator : contextPropagators) { - contextData.put(propagator.getName(), propagator.deserializeContext(headerData)); + // Only send the context propagator the fields that belong to them + // Change the map from MyPropagator:foo -> bar to foo -> bar + Map filteredData = + headerData + .entrySet() + .stream() + .filter(e -> e.getKey().startsWith(propagator.getName())) + .collect( + Collectors.toMap( + e -> e.getKey().substring(propagator.getName().length() + 1), + Map.Entry::getValue)); + contextData.put(propagator.getName(), propagator.deserializeContext(filteredData)); } return contextData; diff --git a/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java b/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java index 23955ce69..0e332ba7a 100644 --- a/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java +++ b/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java @@ -72,6 +72,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -449,7 +450,18 @@ private Map extractContextsAndConvertToBytes( } Map result = new HashMap<>(); for (ContextPropagator propagator : contextPropagators) { - result.putAll(propagator.serializeContext(propagator.getCurrentContext())); + // Get the serialized context from the propagator + Map serializedContext = + propagator.serializeContext(propagator.getCurrentContext()); + // Namespace each entry in case of overlaps, so foo -> bar becomes MyPropagator:foo -> bar + Map namespacedSerializedContext = + serializedContext + .entrySet() + .stream() + .collect( + Collectors.toMap( + e -> propagator.getName() + ":" + e.getKey(), Map.Entry::getValue)); + result.putAll(namespacedSerializedContext); } return result; } diff --git a/src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java b/src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java index 56636d514..5bd25f76a 100644 --- a/src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java +++ b/src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java @@ -39,10 +39,7 @@ import java.lang.reflect.Type; import java.time.Duration; import java.util.Objects; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.function.Consumer; import java.util.function.Function; @@ -59,6 +56,7 @@ public class SyncWorkflowWorker private final ScheduledExecutorService ldaHeartbeatExecutor = Executors.newScheduledThreadPool(4); private SuspendableWorker ldaWorker; private POJOActivityTaskHandler ldaTaskHandler; + private final IWorkflowService service; public SyncWorkflowWorker( IWorkflowService service, @@ -74,6 +72,7 @@ public SyncWorkflowWorker( ThreadPoolExecutor workflowThreadPool) { Objects.requireNonNull(workflowThreadPool); this.dataConverter = workflowOptions.getDataConverter(); + this.service = service; factory = new POJOWorkflowImplementationFactory( @@ -252,4 +251,8 @@ public R queryWorkflowExecution( public void accept(PollForDecisionTaskResponse pollForDecisionTaskResponse) { workflowWorker.accept(pollForDecisionTaskResponse); } + + public CompletableFuture isHealthy() { + return service.isHealthy(); + } } diff --git a/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java b/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java index 613f4d776..d313aa4c7 100644 --- a/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java +++ b/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java @@ -46,6 +46,7 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; @@ -317,6 +318,11 @@ private class WorkflowServiceWrapper implements IWorkflowService { private final IWorkflowService impl; + @Override + public CompletableFuture isHealthy() { + return impl.isHealthy(); + } + private WorkflowServiceWrapper(IWorkflowService impl) { if (impl == null) { // Create empty implementation that just ignores all requests. diff --git a/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java b/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java index 90a9b00c8..e5f5cf8a0 100644 --- a/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java +++ b/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java @@ -484,6 +484,11 @@ public void GetWorkflowExecutionHistoryWithTimeout( impl.GetWorkflowExecutionHistoryWithTimeout(getRequest, resultHandler, timeoutInMillis); } + @Override + public CompletableFuture isHealthy() { + return impl.isHealthy(); + } + @Override public void PollForDecisionTask( PollForDecisionTaskRequest pollRequest, AsyncMethodCallback resultHandler) diff --git a/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java b/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java index 978720a23..48d32c12d 100644 --- a/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java +++ b/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java @@ -51,6 +51,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; class WorkflowStubImpl implements WorkflowStub { @@ -202,7 +203,18 @@ private Map extractContextsAndConvertToBytes( } Map result = new HashMap<>(); for (ContextPropagator propagator : contextPropagators) { - result.putAll(propagator.serializeContext(propagator.getCurrentContext())); + // Get the serialized context from the propagator + Map serializedContext = + propagator.serializeContext(propagator.getCurrentContext()); + // Namespace each entry in case of overlaps, so foo -> bar becomes MyPropagator:foo -> bar + Map namespacedSerializedContext = + serializedContext + .entrySet() + .stream() + .collect( + Collectors.toMap( + k -> propagator.getName() + ":" + k.getKey(), Map.Entry::getValue)); + result.putAll(namespacedSerializedContext); } return result; } diff --git a/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java b/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java index 9b7f5891f..bee091a2a 100644 --- a/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java +++ b/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java @@ -91,6 +91,7 @@ public void run() { // Repopulate the context(s) ContextThreadLocal.setContextPropagators(this.contextPropagators); ContextThreadLocal.propagateContextToCurrentThread(this.propagatedContexts); + ContextThreadLocal.setUpContextPropagators(); try { // initialYield blocks thread until the first runUntilBlocked is called. @@ -99,6 +100,7 @@ public void run() { cancellationScope.run(); } catch (DestroyWorkflowThreadError e) { if (!threadContext.isDestroyRequested()) { + ContextThreadLocal.onErrorContextPropagators(e); threadContext.setUnhandledException(e); } } catch (Error e) { @@ -111,9 +113,11 @@ public void run() { log.error( String.format("Workflow thread \"%s\" run failed with Error:\n%s", name, stackTrace)); } + ContextThreadLocal.onErrorContextPropagators(e); threadContext.setUnhandledException(e); } catch (CancellationException e) { if (!isCancelRequested()) { + ContextThreadLocal.onErrorContextPropagators(e); threadContext.setUnhandledException(e); } if (log.isDebugEnabled()) { @@ -130,8 +134,10 @@ public void run() { "Workflow thread \"%s\" run failed with unhandled exception:\n%s", name, stackTrace)); } + ContextThreadLocal.onErrorContextPropagators(e); threadContext.setUnhandledException(e); } finally { + ContextThreadLocal.finishContextPropagators(); DeterministicRunnerImpl.setCurrentThreadInternal(null); threadContext.setStatus(Status.DONE); thread.setName(originalName); diff --git a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java index 79f3385fb..dfd59136f 100644 --- a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java +++ b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java @@ -845,6 +845,13 @@ public void GetWorkflowExecutionHistoryWithTimeout( GetWorkflowExecutionHistory(getRequest, resultHandler); } + @Override + public CompletableFuture isHealthy() { + CompletableFuture rval = new CompletableFuture<>(); + rval.complete(Boolean.TRUE); + return rval; + } + @Override public void PollForDecisionTask( PollForDecisionTaskRequest pollRequest, AsyncMethodCallback resultHandler) throws TException { diff --git a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java index f1a100fb3..3b4a07c1f 100644 --- a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java @@ -41,6 +41,7 @@ import java.util.Objects; import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.thrift.TException; import org.slf4j.MDC; @@ -161,7 +162,11 @@ public void handle(PollForActivityTaskResponse task) throws Exception { Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_RESP_LATENCY).start(); sendReply(task, new Result(null, null, cancelledRequest), metricsScope); sw.stop(); + onErrorContextPropagation(e); + } catch (Exception e) { + onErrorContextPropagation(e); } finally { + finishContextPropagation(); MDC.remove(LoggerTag.ACTIVITY_ID); MDC.remove(LoggerTag.ACTIVITY_TYPE); MDC.remove(LoggerTag.WORKFLOW_ID); @@ -188,7 +193,31 @@ void propagateContext(PollForActivityTaskResponse response) { }); for (ContextPropagator propagator : options.getContextPropagators()) { - propagator.setCurrentContext(propagator.deserializeContext(headerData)); + // Only send the context propagator the fields that belong to them + // Change the map from MyPropagator:foo -> bar to foo -> bar + Map filteredData = + headerData + .entrySet() + .stream() + .filter(e -> e.getKey().startsWith(propagator.getName())) + .collect( + Collectors.toMap( + e -> e.getKey().substring(propagator.getName().length() + 1), + Map.Entry::getValue)); + propagator.setCurrentContext(propagator.deserializeContext(filteredData)); + propagator.setUp(); + } + } + + void onErrorContextPropagation(Exception error) { + for (ContextPropagator propagator : options.getContextPropagators()) { + propagator.onError(error); + } + } + + void finishContextPropagation() { + for (ContextPropagator propagator : options.getContextPropagators()) { + propagator.finish(); } } diff --git a/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java index c08870adf..08e765aad 100644 --- a/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java @@ -22,6 +22,7 @@ import com.uber.cadence.MarkerRecordedEventAttributes; import com.uber.cadence.PollForActivityTaskResponse; import com.uber.cadence.common.RetryOptions; +import com.uber.cadence.context.ContextPropagator; import com.uber.cadence.internal.common.LocalActivityMarkerData; import com.uber.cadence.internal.metrics.MetricsTag; import com.uber.cadence.internal.metrics.MetricsType; @@ -37,6 +38,7 @@ import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.LongSupplier; +import java.util.stream.Collectors; public final class LocalActivityWorker extends SuspendableWorkerBase { @@ -225,9 +227,19 @@ private void propagateContext(ExecuteLocalActivityParameters params) { } private void restoreContext(Map context) { - options - .getContextPropagators() - .forEach( - propagator -> propagator.setCurrentContext(propagator.deserializeContext(context))); + for (ContextPropagator propagator : options.getContextPropagators()) { + // Only send the context propagator the fields that belong to them + // Change the map from MyPropagator:foo -> bar to foo -> bar + Map filteredData = + context + .entrySet() + .stream() + .filter(e -> e.getKey().startsWith(propagator.getName())) + .collect( + Collectors.toMap( + e -> e.getKey().substring(propagator.getName().length() + 1), + Map.Entry::getValue)); + propagator.setCurrentContext(propagator.deserializeContext(filteredData)); + } } } diff --git a/src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java b/src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java index d275ac8f5..998fca610 100644 --- a/src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java +++ b/src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java @@ -23,6 +23,7 @@ import com.uber.cadence.StartWorkflowExecutionRequest; import com.uber.cadence.WorkflowService.AsyncIface; import com.uber.cadence.WorkflowService.Iface; +import java.util.concurrent.CompletableFuture; import org.apache.thrift.TException; import org.apache.thrift.async.AsyncMethodCallback; @@ -70,6 +71,7 @@ void GetWorkflowExecutionHistoryWithTimeout( AsyncMethodCallback resultHandler, Long timeoutInMillis) throws TException; + /** * SignalWorkflowExecutionWithTimeout signal workflow same as SignalWorkflowExecution but with * timeout @@ -84,4 +86,6 @@ void SignalWorkflowExecutionWithTimeout( AsyncMethodCallback resultHandler, Long timeoutInMillis) throws TException; + + CompletableFuture isHealthy(); } diff --git a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java index 9f950b21c..6ae12a13d 100644 --- a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java +++ b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java @@ -104,6 +104,7 @@ import com.uber.tchannel.errors.ErrorType; import com.uber.tchannel.messages.ThriftRequest; import com.uber.tchannel.messages.ThriftResponse; +import com.uber.tchannel.messages.generated.Meta; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; @@ -127,7 +128,7 @@ public class WorkflowServiceTChannel implements IWorkflowService { private final ClientOptions options; private final Map thriftHeaders; private final TChannel tChannel; - private final SubChannel subChannel; + private SubChannel subChannel; /** * Creates Cadence client that connects to the specified host and port using specified options. @@ -159,6 +160,13 @@ public WorkflowServiceTChannel(ClientOptions options) { + Version.FEATURE_VERSION); } + public void resetSubchannelPeers() throws UnknownHostException { + InetAddress address = InetAddress.getByName(options.getHost()); + ArrayList peers = new ArrayList<>(); + peers.add(new InetSocketAddress(address, options.getPort())); + this.subChannel.setPeers(peers); + } + /** * Creates Cadence client with specified sub channel and options. * @@ -207,6 +215,45 @@ private ThriftRequest buildThriftRequest(String apiName, T body) { return buildThriftRequest(apiName, body, null); } + @Override + public CompletableFuture isHealthy() { + final ThriftRequest req = + new ThriftRequest.Builder("cadence-frontend", "Meta::health") + .setBody(new Meta.health_args()) + .build(); + final CompletableFuture result = new CompletableFuture<>(); + try { + + final TFuture> future = this.subChannel.send(req); + future.addCallback( + response -> { + req.releaseQuietly(); + if (response.isError()) { + try { + this.resetSubchannelPeers(); + } catch (final Exception inner_e) { + } + result.completeExceptionally(new TException("Rpc error:" + response.getError())); + } else { + result.complete(response.getBody(Meta.health_result.class).getSuccess().isOk()); + } + try { + response.release(); + } catch (final Exception e) { + // ignore + } + }); + } catch (final TChannelError e) { + req.releaseQuietly(); + try { + this.resetSubchannelPeers(); + } catch (final Exception inner_e) { + } + result.complete(Boolean.FALSE); + } + return result; + } + private ThriftRequest buildThriftRequest(String apiName, T body, Long rpcTimeoutOverride) { String endpoint = getEndpoint(INTERFACE_NAME, apiName); ThriftRequest.Builder builder = @@ -237,6 +284,11 @@ private ThriftResponse doRemoteCall(ThriftRequest request) throws TExc } catch (ExecutionException e) { throw new TException(e); } catch (TChannelError e) { + try { + resetSubchannelPeers(); + } catch (UnknownHostException uhe) { + // do nothing? + } throw new TException("Rpc error", e); } this.throwOnRpcError(response); @@ -249,6 +301,11 @@ private CompletableFuture> doRemoteCallAsync(ThriftRequest try { future = subChannel.send(request); } catch (TChannelError tChannelError) { + try { + resetSubchannelPeers(); + } catch (UnknownHostException uhe) { + // do nothing? + } result.completeExceptionally(new TException(tChannelError)); } future.addCallback( diff --git a/src/main/java/com/uber/cadence/worker/Worker.java b/src/main/java/com/uber/cadence/worker/Worker.java index 7f2d0e706..4527403e2 100644 --- a/src/main/java/com/uber/cadence/worker/Worker.java +++ b/src/main/java/com/uber/cadence/worker/Worker.java @@ -23,6 +23,7 @@ import com.uber.cadence.client.WorkflowClient; import com.uber.cadence.common.WorkflowExecutionHistory; import com.uber.cadence.context.ContextPropagator; +import com.uber.cadence.context.OpenTelemetryContextPropagator; import com.uber.cadence.converter.DataConverter; import com.uber.cadence.internal.common.InternalUtils; import com.uber.cadence.internal.metrics.MetricsTag; @@ -36,8 +37,10 @@ import com.uber.m3.tally.Scope; import com.uber.m3.util.ImmutableMap; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -81,6 +84,13 @@ public final class Worker implements Suspendable { .getOptions() .getMetricsScope() .tagged(ImmutableMap.of(MetricsTag.TASK_LIST, taskList)); + contextPropagators = new ArrayList<>(contextPropagators); + + // Add the OpenTelemetry propagator if it's not already there + OpenTelemetryContextPropagator otelPropagator = new OpenTelemetryContextPropagator(); + if (!contextPropagators.contains(otelPropagator)) { + contextPropagators.add(otelPropagator); + } SingleWorkerOptions activityOptions = SingleWorkerOptions.newBuilder() @@ -326,4 +336,8 @@ public void resumePolling() { public boolean isSuspended() { return workflowWorker.isSuspended() && activityWorker.isSuspended(); } + + public CompletableFuture isHealthy() { + return workflowWorker.isHealthy(); + } } diff --git a/src/main/java/com/uber/cadence/worker/WorkerFactory.java b/src/main/java/com/uber/cadence/worker/WorkerFactory.java index 9a3d7d5c0..ff3ecf834 100644 --- a/src/main/java/com/uber/cadence/worker/WorkerFactory.java +++ b/src/main/java/com/uber/cadence/worker/WorkerFactory.java @@ -39,10 +39,12 @@ import java.util.List; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -287,6 +289,16 @@ public synchronized void shutdownNow() { } } + public CompletableFuture isHealthy() { + List> healthyList = + workers.stream().map(Worker::isHealthy).collect(Collectors.toList()); + CompletableFuture result = CompletableFuture.supplyAsync(() -> true); + for (CompletableFuture future : healthyList) { + result = result.thenCombine(future, (current, other) -> current && other); + } + return result; + } + /** * Blocks until all tasks have completed execution after a shutdown request, or the timeout * occurs, or the current thread is interrupted, whichever happens first. diff --git a/src/test/java/com/uber/cadence/client/WorkflowOptionsTest.java b/src/test/java/com/uber/cadence/client/WorkflowOptionsTest.java index 050a307b8..fae572646 100644 --- a/src/test/java/com/uber/cadence/client/WorkflowOptionsTest.java +++ b/src/test/java/com/uber/cadence/client/WorkflowOptionsTest.java @@ -46,6 +46,7 @@ public void testOnlyOptionsAndEmptyAnnotationsPresent() throws NoSuchMethodExcep .setWorkflowIdReusePolicy(WorkflowIdReusePolicy.RejectDuplicate) .setMemo(getTestMemo()) .setSearchAttributes(getTestSearchAttributes()) + .setDefaultContextPropagators(false) .build(); WorkflowMethod a = WorkflowOptionsTest.class diff --git a/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java b/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java index e22de1bdd..81214c1bf 100644 --- a/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java +++ b/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java @@ -17,6 +17,7 @@ package com.uber.cadence.internal.testing; +import static org.junit.Assert.*; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -39,7 +40,7 @@ import com.uber.cadence.activity.ActivityMethod; import com.uber.cadence.activity.ActivityOptions; import com.uber.cadence.client.*; -import com.uber.cadence.context.ContextPropagator; +import com.uber.cadence.context.ContextTests; import com.uber.cadence.internal.common.WorkflowExecutionUtils; import com.uber.cadence.testing.SimulatedTimeoutException; import com.uber.cadence.testing.TestEnvironmentOptions; @@ -47,17 +48,14 @@ import com.uber.cadence.worker.Worker; import com.uber.cadence.workflow.ActivityTimeoutException; import com.uber.cadence.workflow.Async; -import com.uber.cadence.workflow.ChildWorkflowOptions; import com.uber.cadence.workflow.ChildWorkflowTimedOutException; import com.uber.cadence.workflow.Promise; import com.uber.cadence.workflow.SignalMethod; import com.uber.cadence.workflow.Workflow; import com.uber.cadence.workflow.WorkflowMethod; -import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CancellationException; @@ -73,7 +71,6 @@ import org.junit.runner.Description; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.slf4j.MDC; public class WorkflowTestingTest { private static final Logger log = LoggerFactory.getLogger(WorkflowTestingTest.class); @@ -99,7 +96,8 @@ public void setUp() { new TestEnvironmentOptions.Builder() .setWorkflowClientOptions( WorkflowClientOptions.newBuilder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .setContextPropagators( + Collections.singletonList(new ContextTests.TestContextPropagator())) .build()) .build(); testEnvironment = TestWorkflowEnvironment.newInstance(options); @@ -739,244 +737,4 @@ public void testMockedChildSimulatedTimeout() { assertTrue(e.getCause() instanceof ChildWorkflowTimedOutException); } } - - public static class TestContextPropagator implements ContextPropagator { - - @Override - public String getName() { - return this.getClass().getName(); - } - - @Override - public Map serializeContext(Object context) { - String testKey = (String) context; - if (testKey != null) { - return Collections.singletonMap("test", testKey.getBytes(StandardCharsets.UTF_8)); - } else { - return Collections.emptyMap(); - } - } - - @Override - public Object deserializeContext(Map context) { - if (context.containsKey("test")) { - return new String(context.get("test"), StandardCharsets.UTF_8); - } else { - return null; - } - } - - @Override - public Object getCurrentContext() { - return MDC.get("test"); - } - - @Override - public void setCurrentContext(Object context) { - MDC.put("test", String.valueOf(context)); - } - } - - public static class ContextPropagationWorkflowImpl implements TestWorkflow { - - @Override - public String workflow1(String input) { - // The test value should be in the MDC - return MDC.get("test"); - } - } - - @Test - public void testWorkflowContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes(ContextPropagationWorkflowImpl.class); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - String result = workflow.workflow1("input1"); - assertEquals("testing123", result); - } - - public static class ContextPropagationParentWorkflowImpl implements ParentWorkflow { - - @Override - public String workflow(String input) { - // Get the MDC value - String mdcValue = MDC.get("test"); - - // Fire up a child workflow - ChildWorkflowOptions options = - new ChildWorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options); - - String result = child.workflow(mdcValue, Workflow.getWorkflowInfo().getWorkflowId()); - return result; - } - - @Override - public void signal(String value) {} - } - - public static class ContextPropagationChildWorkflowImpl implements ChildWorkflow { - - @Override - public String workflow(String input, String parentId) { - String mdcValue = MDC.get("test"); - return input + mdcValue; - } - } - - @Test - public void testChildWorkflowContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes( - ContextPropagationParentWorkflowImpl.class, ContextPropagationChildWorkflowImpl.class); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - ParentWorkflow workflow = client.newWorkflowStub(ParentWorkflow.class, options); - String result = workflow.workflow("input1"); - assertEquals("testing123testing123", result); - } - - public static class ContextPropagationThreadWorkflowImpl implements TestWorkflow { - - @Override - public String workflow1(String input) { - Promise asyncPromise = Async.function(this::async); - return asyncPromise.get(); - } - - private String async() { - return "async" + MDC.get("test"); - } - } - - @Test - public void testThreadContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes(ContextPropagationThreadWorkflowImpl.class); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - String result = workflow.workflow1("input1"); - assertEquals("asynctesting123", result); - } - - public static class ContextActivityImpl implements TestActivity { - @Override - public String activity1(String input) { - return "activity" + MDC.get("test"); - } - } - - public static class ContextPropagationActivityWorkflowImpl implements TestWorkflow { - @Override - public String workflow1(String input) { - ActivityOptions options = - new ActivityOptions.Builder() - .setScheduleToCloseTimeout(Duration.ofSeconds(5)) - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - TestActivity activity = Workflow.newActivityStub(TestActivity.class, options); - return activity.activity1("foo"); - } - } - - @Test - public void testActivityContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes(ContextPropagationActivityWorkflowImpl.class); - worker.registerActivitiesImplementations(new ContextActivityImpl()); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - String result = workflow.workflow1("input1"); - assertEquals("activitytesting123", result); - } - - public static class DefaultContextPropagationActivityWorkflowImpl implements TestWorkflow { - @Override - public String workflow1(String input) { - ActivityOptions options = - new ActivityOptions.Builder().setScheduleToCloseTimeout(Duration.ofSeconds(5)).build(); - TestActivity activity = Workflow.newActivityStub(TestActivity.class, options); - return activity.activity1("foo"); - } - } - - @Test - public void testDefaultActivityContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes(DefaultContextPropagationActivityWorkflowImpl.class); - worker.registerActivitiesImplementations(new ContextActivityImpl()); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - String result = workflow.workflow1("input1"); - assertEquals("activitytesting123", result); - } - - public static class DefaultContextPropagationParentWorkflowImpl implements ParentWorkflow { - - @Override - public String workflow(String input) { - // Get the MDC value - String mdcValue = MDC.get("test"); - - // Fire up a child workflow - ChildWorkflowOptions options = new ChildWorkflowOptions.Builder().build(); - ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options); - - String result = child.workflow(mdcValue, Workflow.getWorkflowInfo().getWorkflowId()); - return result; - } - - @Override - public void signal(String value) {} - } - - @Test - public void testDefaultChildWorkflowContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes( - DefaultContextPropagationParentWorkflowImpl.class, - ContextPropagationChildWorkflowImpl.class); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - ParentWorkflow workflow = client.newWorkflowStub(ParentWorkflow.class, options); - String result = workflow.workflow("input1"); - assertEquals("testing123testing123", result); - } } diff --git a/src/test/java/com/uber/cadence/workerFactory/WorkerFactoryTests.java b/src/test/java/com/uber/cadence/workerFactory/WorkerFactoryTests.java index 3b6135f60..eb105a4e3 100644 --- a/src/test/java/com/uber/cadence/workerFactory/WorkerFactoryTests.java +++ b/src/test/java/com/uber/cadence/workerFactory/WorkerFactoryTests.java @@ -17,8 +17,7 @@ package com.uber.cadence.workerFactory; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import com.uber.cadence.client.WorkflowClient; import com.uber.cadence.serviceclient.ClientOptions; @@ -65,6 +64,11 @@ public void whenAFactoryIsStartedAllWorkersStart() { factory.start(); assertTrue(factory.isStarted()); + try { + assertTrue(factory.isHealthy().get()); + } catch (Exception e) { + assertNull("Failed to check if cluster is health!", e); + } factory.shutdown(); factory.awaitTermination(1, TimeUnit.SECONDS); } diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index b87fe0374..19ed8e82a 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -28,4 +28,4 @@ - \ No newline at end of file + From dd73a620bc902b73808d97023ee4a0fffe484d20 Mon Sep 17 00:00:00 2001 From: Matt Anger Date: Fri, 4 Jun 2021 15:59:11 -0700 Subject: [PATCH 02/12] new files --- .../OpenTelemetryContextPropagator.java | 144 +++++ .../uber/cadence/context/ContextTests.java | 514 ++++++++++++++++++ 2 files changed, 658 insertions(+) create mode 100644 src/main/java/com/uber/cadence/context/OpenTelemetryContextPropagator.java create mode 100644 src/test/java/com/uber/cadence/context/ContextTests.java diff --git a/src/main/java/com/uber/cadence/context/OpenTelemetryContextPropagator.java b/src/main/java/com/uber/cadence/context/OpenTelemetryContextPropagator.java new file mode 100644 index 000000000..927d3c71f --- /dev/null +++ b/src/main/java/com/uber/cadence/context/OpenTelemetryContextPropagator.java @@ -0,0 +1,144 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.context; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.baggage.Baggage; +import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.context.propagation.TextMapSetter; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.slf4j.MDC; + +public class OpenTelemetryContextPropagator implements ContextPropagator { + + private static final TextMapPropagator w3cTraceContextPropagator = + W3CTraceContextPropagator.getInstance(); + private static final TextMapPropagator w3cBaggagePropagator = W3CBaggagePropagator.getInstance(); + private static ThreadLocal currentContextOtelScope = new ThreadLocal<>(); + private static ThreadLocal currentOtelSpan = new ThreadLocal<>(); + private static ThreadLocal currentOtelScope = new ThreadLocal<>(); + private static ThreadLocal> otelKeySet = new ThreadLocal<>(); + private static final TextMapSetter> setter = Map::put; + private static final TextMapGetter> getter = + new TextMapGetter>() { + @Override + public Iterable keys(Map carrier) { + return otelKeySet.get(); + } + + @Nullable + @Override + public String get(Map carrier, String key) { + return MDC.get(key); + } + }; + + @Override + public String getName() { + return "OpenTelemetry"; + } + + @Override + public Map serializeContext(Object context) { + Map serializedContext = new HashMap<>(); + Map contextMap = (Map) context; + if (contextMap != null) { + for (Map.Entry entry : contextMap.entrySet()) { + serializedContext.put(entry.getKey(), entry.getValue().getBytes(Charset.defaultCharset())); + } + } + return serializedContext; + } + + @Override + public Object deserializeContext(Map context) { + Map contextMap = new HashMap<>(); + for (Map.Entry entry : context.entrySet()) { + contextMap.put(entry.getKey(), new String(entry.getValue(), Charset.defaultCharset())); + } + return contextMap; + } + + @Override + public Object getCurrentContext() { + Map carrier = new HashMap<>(); + w3cTraceContextPropagator.inject(Context.current(), carrier, setter); + w3cBaggagePropagator.inject(Context.current(), carrier, setter); + return carrier; + } + + @Override + public void setCurrentContext(Object context) { + Map contextMap = (Map) context; + if (contextMap != null) { + for (Map.Entry entry : contextMap.entrySet()) { + MDC.put(entry.getKey(), entry.getValue()); + } + otelKeySet.set(contextMap.keySet()); + } + } + + @Override + @SuppressWarnings("MustBeClosedChecker") + public void setUp() { + Context context = + Baggage.fromContext(w3cBaggagePropagator.extract(Context.current(), null, getter)) + .toBuilder() + .build() + .storeInContext(w3cTraceContextPropagator.extract(Context.current(), null, getter)); + + currentContextOtelScope.set(context.makeCurrent()); + + Span span = + GlobalOpenTelemetry.getTracer("cadence-client") + .spanBuilder("cadence.workflow") + .setParent(context) + .setSpanKind(SpanKind.CLIENT) + .startSpan(); + + Scope scope = span.makeCurrent(); + currentOtelSpan.set(span); + currentOtelScope.set(scope); + } + + @Override + public void finish() { + Scope scope = currentOtelScope.get(); + if (scope != null) { + scope.close(); + } + Span span = currentOtelSpan.get(); + if (span != null) { + span.end(); + } + Scope contextScope = currentContextOtelScope.get(); + if (contextScope != null) { + contextScope.close(); + } + } +} diff --git a/src/test/java/com/uber/cadence/context/ContextTests.java b/src/test/java/com/uber/cadence/context/ContextTests.java new file mode 100644 index 000000000..54da2adc3 --- /dev/null +++ b/src/test/java/com/uber/cadence/context/ContextTests.java @@ -0,0 +1,514 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.context; + +import static org.junit.Assert.*; + +import com.uber.cadence.activity.ActivityMethod; +import com.uber.cadence.activity.ActivityOptions; +import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.client.WorkflowClientOptions; +import com.uber.cadence.client.WorkflowException; +import com.uber.cadence.client.WorkflowOptions; +import com.uber.cadence.internal.testing.WorkflowTestingTest.ChildWorkflow; +import com.uber.cadence.testing.TestEnvironmentOptions; +import com.uber.cadence.testing.TestWorkflowEnvironment; +import com.uber.cadence.worker.Worker; +import com.uber.cadence.workflow.Async; +import com.uber.cadence.workflow.ChildWorkflowOptions; +import com.uber.cadence.workflow.Promise; +import com.uber.cadence.workflow.SignalMethod; +import com.uber.cadence.workflow.Workflow; +import com.uber.cadence.workflow.WorkflowMethod; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.baggage.Baggage; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.rules.Timeout; +import org.junit.runner.Description; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +public class ContextTests { + private static final Logger log = LoggerFactory.getLogger(ContextTests.class); + + @Rule public Timeout globalTimeout = Timeout.seconds(5000); + + @Rule + public TestWatcher watchman = + new TestWatcher() { + @Override + protected void failed(Throwable e, Description description) { + System.err.println(testEnvironment.getDiagnostics()); + } + }; + + private static final String TASK_LIST = "test-workflow"; + + private TestWorkflowEnvironment testEnvironment; + + @Before + public void setUp() { + WorkflowClientOptions clientOptions = + WorkflowClientOptions.newBuilder() + .setContextPropagators( + Arrays.asList(new TestContextPropagator(), new OpenTelemetryContextPropagator())) + .build(); + TestEnvironmentOptions options = + new TestEnvironmentOptions.Builder().setWorkflowClientOptions(clientOptions).build(); + testEnvironment = TestWorkflowEnvironment.newInstance(options); + } + + @After + public void tearDown() { + testEnvironment.close(); + } + + public interface TestWorkflow { + + @WorkflowMethod(executionStartToCloseTimeoutSeconds = 3600 * 24, taskList = TASK_LIST) + String workflow1(String input); + } + + public interface ParentWorkflow { + + @WorkflowMethod(executionStartToCloseTimeoutSeconds = 3600 * 24, taskList = TASK_LIST) + String workflow(String input); + + @SignalMethod + void signal(String value); + } + + public interface TestActivity { + + @ActivityMethod(scheduleToCloseTimeoutSeconds = 3600) + String activity1(String input); + } + + public static class TestContextPropagator implements ContextPropagator { + + @Override + public String getName() { + return "TestContextPropagator::withSomeColons"; + } + + @Override + public Map serializeContext(Object context) { + String testKey = (String) context; + if (testKey != null) { + return Collections.singletonMap("test", testKey.getBytes(StandardCharsets.UTF_8)); + } else { + return Collections.emptyMap(); + } + } + + @Override + public Object deserializeContext(Map context) { + if (context.containsKey("test")) { + return new String(context.get("test"), StandardCharsets.UTF_8); + } else { + return null; + } + } + + @Override + public Object getCurrentContext() { + return MDC.get("test"); + } + + @Override + public void setCurrentContext(Object context) { + MDC.put("test", String.valueOf(context)); + } + } + + public static class ContextPropagationWorkflowImpl implements TestWorkflow { + + @Override + public String workflow1(String input) { + // The test value should be in the MDC + return MDC.get("test"); + } + } + + @Test() + public void testWorkflowContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(ContextPropagationWorkflowImpl.class); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + String result = workflow.workflow1("input1"); + assertEquals("testing123", result); + } + + public static class ContextPropagationParentWorkflowImpl implements ParentWorkflow { + + @Override + public String workflow(String input) { + // Get the MDC value + String mdcValue = MDC.get("test"); + + // Fire up a child workflow + ChildWorkflowOptions options = + new ChildWorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options); + + String result = child.workflow(mdcValue, Workflow.getWorkflowInfo().getWorkflowId()); + return result; + } + + @Override + public void signal(String value) {} + } + + public static class ContextPropagationChildWorkflowImpl implements ChildWorkflow { + + @Override + public String workflow(String input, String parentId) { + String mdcValue = MDC.get("test"); + return input + mdcValue; + } + } + + @Test + public void testChildWorkflowContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes( + ContextPropagationParentWorkflowImpl.class, ContextPropagationChildWorkflowImpl.class); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + ParentWorkflow workflow = client.newWorkflowStub(ParentWorkflow.class, options); + String result = workflow.workflow("input1"); + assertEquals("testing123testing123", result); + } + + public static class ContextPropagationThreadWorkflowImpl implements TestWorkflow { + + @Override + public String workflow1(String input) { + Promise asyncPromise = Async.function(this::async); + return asyncPromise.get(); + } + + private String async() { + return "async" + MDC.get("test"); + } + } + + @Test + public void testThreadContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(ContextPropagationThreadWorkflowImpl.class); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + String result = workflow.workflow1("input1"); + assertEquals("asynctesting123", result); + } + + public static class ContextActivityImpl implements TestActivity { + @Override + public String activity1(String input) { + return "activity" + MDC.get("test"); + } + } + + public static class ContextPropagationActivityWorkflowImpl implements TestWorkflow { + @Override + public String workflow1(String input) { + ActivityOptions options = + new ActivityOptions.Builder() + .setScheduleToCloseTimeout(Duration.ofSeconds(5)) + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + TestActivity activity = Workflow.newActivityStub(TestActivity.class, options); + return activity.activity1("foo"); + } + } + + @Test + public void testActivityContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(ContextPropagationActivityWorkflowImpl.class); + worker.registerActivitiesImplementations(new ContextActivityImpl()); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + String result = workflow.workflow1("input1"); + assertEquals("activitytesting123", result); + } + + public static class DefaultContextPropagationActivityWorkflowImpl implements TestWorkflow { + @Override + public String workflow1(String input) { + ActivityOptions options = + new ActivityOptions.Builder().setScheduleToCloseTimeout(Duration.ofSeconds(5)).build(); + TestActivity activity = Workflow.newActivityStub(TestActivity.class, options); + return activity.activity1("foo"); + } + } + + @Test + public void testDefaultActivityContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(DefaultContextPropagationActivityWorkflowImpl.class); + worker.registerActivitiesImplementations(new ContextActivityImpl()); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + String result = workflow.workflow1("input1"); + assertEquals("activitytesting123", result); + } + + public static class DefaultContextPropagationParentWorkflowImpl implements ParentWorkflow { + + @Override + public String workflow(String input) { + // Get the MDC value + String mdcValue = MDC.get("test"); + + // Fire up a child workflow + ChildWorkflowOptions options = new ChildWorkflowOptions.Builder().build(); + ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options); + + String result = child.workflow(mdcValue, Workflow.getWorkflowInfo().getWorkflowId()); + return result; + } + + @Override + public void signal(String value) {} + } + + @Test + public void testDefaultChildWorkflowContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes( + DefaultContextPropagationParentWorkflowImpl.class, + ContextPropagationChildWorkflowImpl.class); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + ParentWorkflow workflow = client.newWorkflowStub(ParentWorkflow.class, options); + String result = workflow.workflow("input1"); + assertEquals("testing123testing123", result); + } + + public static class OpenTelemetryContextPropagationWorkflowImpl implements TestWorkflow { + @Override + public String workflow1(String input) { + if ("fail".equals(input)) { + throw new IllegalArgumentException(); + } else if ("baggage".equals(input)) { + return Baggage.current().toString(); + } else { + SpanContext ctx = Span.current().getSpanContext(); + return ctx.getTraceId() + ":" + ctx.getSpanId() + ":" + ctx.getTraceState().toString(); + } + } + } + + @Test + public void testOpenTelemetryContextPropagation() { + TraceState TRACE_STATE = TraceState.builder().put("foo", "bar").build(); + String TRACE_ID_BASE16 = "ff000000000000000000000000000041"; + String SPAN_ID_BASE16 = "ff00000000000041"; + + Context ctx = + Context.current() + .with( + Span.wrap( + SpanContext.create( + TRACE_ID_BASE16, SPAN_ID_BASE16, TraceFlags.getDefault(), TRACE_STATE))); + + Span span = + GlobalOpenTelemetry.getTracer("test-tracer") + .spanBuilder("test-span") + .setParent(ctx) + .setSpanKind(SpanKind.CLIENT) + .startSpan(); + + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(OpenTelemetryContextPropagationWorkflowImpl.class); + testEnvironment.start(); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators( + Arrays.asList(new TestContextPropagator(), new OpenTelemetryContextPropagator())) + .build(); + + try (Scope scope = span.makeCurrent()) { + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + assertEquals( + TRACE_ID_BASE16 + ":" + SPAN_ID_BASE16 + ":" + TRACE_STATE.toString(), + workflow.workflow1("success")); + } + + try (Scope scope = span.makeCurrent()) { + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + workflow.workflow1("fail"); + } catch (WorkflowException e) { + assertEquals(IllegalArgumentException.class, e.getCause().getClass()); + } finally { + span.end(); + } + } + + @Test + public void testDefaultOpenTelemetryContextPropagation() { + TraceState TRACE_STATE = TraceState.builder().put("foo", "bar").build(); + String TRACE_ID_BASE16 = "ff000000000000000000000000000041"; + String SPAN_ID_BASE16 = "ff00000000000041"; + + Context ctx = + Context.current() + .with( + Span.wrap( + SpanContext.create( + TRACE_ID_BASE16, SPAN_ID_BASE16, TraceFlags.getDefault(), TRACE_STATE))); + + Span span = + GlobalOpenTelemetry.getTracer("test-tracer") + .spanBuilder("test-span") + .setParent(ctx) + .setSpanKind(SpanKind.CLIENT) + .startSpan(); + + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(OpenTelemetryContextPropagationWorkflowImpl.class); + testEnvironment.start(); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = new WorkflowOptions.Builder().build(); + + try (Scope scope = span.makeCurrent()) { + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + assertEquals( + TRACE_ID_BASE16 + ":" + SPAN_ID_BASE16 + ":" + TRACE_STATE.toString(), + workflow.workflow1("success")); + } + + try (Scope scope = span.makeCurrent()) { + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + workflow.workflow1("fail"); + } catch (WorkflowException e) { + assertEquals(IllegalArgumentException.class, e.getCause().getClass()); + } finally { + span.end(); + } + } + + @Test + public void testNoDefaultOpenTelemetryContextPropagation() { + TraceState TRACE_STATE = TraceState.builder().put("foo", "bar").build(); + String TRACE_ID_BASE16 = "ff000000000000000000000000000041"; + String SPAN_ID_BASE16 = "ff00000000000041"; + + Context ctx = + Context.current() + .with( + Span.wrap( + SpanContext.create( + TRACE_ID_BASE16, SPAN_ID_BASE16, TraceFlags.getDefault(), TRACE_STATE))); + + Span span = + GlobalOpenTelemetry.getTracer("test-tracer") + .spanBuilder("test-span") + .setParent(ctx) + .setSpanKind(SpanKind.CLIENT) + .startSpan(); + + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(OpenTelemetryContextPropagationWorkflowImpl.class); + testEnvironment.start(); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder().setDefaultContextPropagators(false).build(); + + try (Scope scope = span.makeCurrent()) { + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + assertNotEquals( + TRACE_ID_BASE16 + ":" + SPAN_ID_BASE16 + ":" + TRACE_STATE.toString(), + workflow.workflow1("success")); + } + } + + @Test + public void testBaggagePropagation() { + Baggage baggage = + Baggage.builder().put("keyFoo1", "valueFoo1").put("keyFoo2", "valueFoo2").build(); + + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(OpenTelemetryContextPropagationWorkflowImpl.class); + testEnvironment.start(); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = new WorkflowOptions.Builder().build(); + + try (Scope scope = Context.current().with(baggage).makeCurrent()) { + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + assertEquals(baggage.toString(), workflow.workflow1("baggage")); + } + } +} From f6ced94cab092fda6756ee7191390024d95f7772 Mon Sep 17 00:00:00 2001 From: Matt Anger Date: Fri, 4 Jun 2021 16:05:50 -0700 Subject: [PATCH 03/12] undo defaultContextProp changes --- .../uber/cadence/client/WorkflowOptions.java | 48 ++----------------- 1 file changed, 5 insertions(+), 43 deletions(-) diff --git a/src/main/java/com/uber/cadence/client/WorkflowOptions.java b/src/main/java/com/uber/cadence/client/WorkflowOptions.java index 9ab9c60ce..3d75647ec 100644 --- a/src/main/java/com/uber/cadence/client/WorkflowOptions.java +++ b/src/main/java/com/uber/cadence/client/WorkflowOptions.java @@ -30,11 +30,9 @@ import com.uber.cadence.common.MethodRetry; import com.uber.cadence.common.RetryOptions; import com.uber.cadence.context.ContextPropagator; -import com.uber.cadence.context.OpenTelemetryContextPropagator; import com.uber.cadence.internal.common.OptionsUtils; import com.uber.cadence.workflow.WorkflowMethod; import java.time.Duration; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -68,7 +66,6 @@ public static WorkflowOptions merge( .setMemo(o.getMemo()) .setSearchAttributes(o.getSearchAttributes()) .setContextPropagators(o.getContextPropagators()) - .setDefaultContextPropagators(o.useDefaultContextPropagators) .validateBuildWithDefaults(); } @@ -94,8 +91,6 @@ public static final class Builder { private List contextPropagators; - private Boolean useDefaultContextPropagators; - public Builder() {} public Builder(WorkflowOptions o) { @@ -112,7 +107,6 @@ public Builder(WorkflowOptions o) { this.memo = o.memo; this.searchAttributes = o.searchAttributes; this.contextPropagators = o.contextPropagators; - this.useDefaultContextPropagators = o.useDefaultContextPropagators; } /** @@ -220,13 +214,6 @@ public Builder setContextPropagators(List contextPropagators) return this; } - /** Specifies that the default context propagators should not be used. */ - public Builder setDefaultContextPropagators(Boolean useDefaultContextPropagators) { - this.useDefaultContextPropagators = - (useDefaultContextPropagators == null || useDefaultContextPropagators); - return this; - } - public WorkflowOptions build() { return new WorkflowOptions( workflowId, @@ -238,8 +225,7 @@ public WorkflowOptions build() { cronSchedule, memo, searchAttributes, - contextPropagators, - useDefaultContextPropagators); + contextPropagators); } /** @@ -275,20 +261,6 @@ public WorkflowOptions validateBuildWithDefaults() { cron.validate(); } - if (useDefaultContextPropagators == null || useDefaultContextPropagators) { - // Add OpenTelemetry propagator if not already present. - if (contextPropagators != null) { - contextPropagators = new ArrayList(contextPropagators); - } else { - contextPropagators = new ArrayList<>(); - } - - OpenTelemetryContextPropagator otelPropagator = new OpenTelemetryContextPropagator(); - if (!contextPropagators.contains(otelPropagator)) { - contextPropagators.add(otelPropagator); - } - } - return new WorkflowOptions( workflowId, policy, @@ -300,8 +272,7 @@ public WorkflowOptions validateBuildWithDefaults() { cronSchedule, memo, searchAttributes, - contextPropagators, - useDefaultContextPropagators); + contextPropagators); } } @@ -325,8 +296,6 @@ public WorkflowOptions validateBuildWithDefaults() { private List contextPropagators; - private Boolean useDefaultContextPropagators; - private WorkflowOptions( String workflowId, WorkflowIdReusePolicy workflowIdReusePolicy, @@ -337,8 +306,7 @@ private WorkflowOptions( String cronSchedule, Map memo, Map searchAttributes, - List contextPropagators, - Boolean useDefaultContextPropagators) { + List contextPropagators) { this.workflowId = workflowId; this.workflowIdReusePolicy = workflowIdReusePolicy; this.executionStartToCloseTimeout = executionStartToCloseTimeout; @@ -349,7 +317,6 @@ private WorkflowOptions( this.memo = memo; this.searchAttributes = searchAttributes; this.contextPropagators = contextPropagators; - this.useDefaultContextPropagators = useDefaultContextPropagators; } public String getWorkflowId() { @@ -406,9 +373,7 @@ public boolean equals(Object o) { && Objects.equals(cronSchedule, that.cronSchedule) && Objects.equals(memo, that.memo) && Objects.equals(searchAttributes, that.searchAttributes) - && Objects.equals(contextPropagators, that.contextPropagators) - && (useDefaultContextPropagators == null || useDefaultContextPropagators) - == (that.useDefaultContextPropagators == null || that.useDefaultContextPropagators); + && Objects.equals(contextPropagators, that.contextPropagators); } @Override @@ -423,8 +388,7 @@ public int hashCode() { cronSchedule, memo, searchAttributes, - contextPropagators, - useDefaultContextPropagators); + contextPropagators); } @Override @@ -454,8 +418,6 @@ public String toString() { + searchAttributes + ", contextPropagators='" + contextPropagators - + ", useDefaultContextPropagators='" - + useDefaultContextPropagators + '\'' + '}'; } From f867d9b5ddfa9068e7421fe080081698690bdb1c Mon Sep 17 00:00:00 2001 From: Matt Anger Date: Mon, 7 Jun 2021 14:04:02 -0500 Subject: [PATCH 04/12] remove bad calls to method no longer in this pr --- src/test/java/com/uber/cadence/client/WorkflowOptionsTest.java | 1 - src/test/java/com/uber/cadence/context/ContextTests.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/test/java/com/uber/cadence/client/WorkflowOptionsTest.java b/src/test/java/com/uber/cadence/client/WorkflowOptionsTest.java index fae572646..050a307b8 100644 --- a/src/test/java/com/uber/cadence/client/WorkflowOptionsTest.java +++ b/src/test/java/com/uber/cadence/client/WorkflowOptionsTest.java @@ -46,7 +46,6 @@ public void testOnlyOptionsAndEmptyAnnotationsPresent() throws NoSuchMethodExcep .setWorkflowIdReusePolicy(WorkflowIdReusePolicy.RejectDuplicate) .setMemo(getTestMemo()) .setSearchAttributes(getTestSearchAttributes()) - .setDefaultContextPropagators(false) .build(); WorkflowMethod a = WorkflowOptionsTest.class diff --git a/src/test/java/com/uber/cadence/context/ContextTests.java b/src/test/java/com/uber/cadence/context/ContextTests.java index 54da2adc3..723f75dcf 100644 --- a/src/test/java/com/uber/cadence/context/ContextTests.java +++ b/src/test/java/com/uber/cadence/context/ContextTests.java @@ -485,7 +485,7 @@ public void testNoDefaultOpenTelemetryContextPropagation() { testEnvironment.start(); WorkflowClient client = testEnvironment.newWorkflowClient(); WorkflowOptions options = - new WorkflowOptions.Builder().setDefaultContextPropagators(false).build(); + new WorkflowOptions.Builder().build(); try (Scope scope = span.makeCurrent()) { TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); From 5da79ac933d1518b69bced5ececb220aae527173 Mon Sep 17 00:00:00 2001 From: Matt Anger Date: Tue, 15 Jun 2021 09:53:03 -0700 Subject: [PATCH 05/12] remove all context prop --- build.gradle | 1 - .../cadence/context/ContextPropagator.java | 30 - .../OpenTelemetryContextPropagator.java | 144 ----- .../internal/context/ContextThreadLocal.java | 53 +- .../internal/replay/WorkflowContext.java | 14 +- .../internal/sync/SyncDecisionContext.java | 14 +- .../internal/sync/WorkflowStubImpl.java | 14 +- .../internal/sync/WorkflowThreadImpl.java | 6 - .../internal/worker/ActivityWorker.java | 31 +- .../internal/worker/LocalActivityWorker.java | 20 +- .../java/com/uber/cadence/worker/Worker.java | 10 - .../uber/cadence/context/ContextTests.java | 514 ------------------ .../internal/testing/WorkflowTestingTest.java | 250 ++++++++- 13 files changed, 256 insertions(+), 845 deletions(-) delete mode 100644 src/main/java/com/uber/cadence/context/OpenTelemetryContextPropagator.java delete mode 100644 src/test/java/com/uber/cadence/context/ContextTests.java diff --git a/build.gradle b/build.gradle index 0fed3a527..9ec15bbf9 100644 --- a/build.gradle +++ b/build.gradle @@ -62,7 +62,6 @@ dependencies { compile group: 'com.cronutils', name: 'cron-utils', version: '9.0.0' compile group: 'io.micrometer', name: 'micrometer-core', version: '1.1.2' compile group: 'javax.annotation', name: 'javax.annotation-api', version: '1.3.2' - compile group: 'io.opentelemetry', name: 'opentelemetry-sdk', version: '1.1.0' testCompile group: 'junit', name: 'junit', version: '4.12' testCompile group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4' diff --git a/src/main/java/com/uber/cadence/context/ContextPropagator.java b/src/main/java/com/uber/cadence/context/ContextPropagator.java index d302ffdaa..3a618f96c 100644 --- a/src/main/java/com/uber/cadence/context/ContextPropagator.java +++ b/src/main/java/com/uber/cadence/context/ContextPropagator.java @@ -23,9 +23,6 @@ * Context Propagators are used to propagate information from workflow to activity, workflow to * child workflow, and workflow to child thread (using {@link com.uber.cadence.workflow.Async}). * - *

It is important to note that all threads share one ContextPropagator instance, so your - * implementation must be thread-safe and store any state in ThreadLocal variables. - * *

A sample ContextPropagator that copies all {@link org.slf4j.MDC} entries starting * with a given prefix along the code path looks like this: * @@ -139,31 +136,4 @@ public interface ContextPropagator { /** Sets the current context */ void setCurrentContext(Object context); - - /** - * This is a lifecycle method, called after the context has been propagated to the - * workflow/activity thread but the workflow/activity has not yet started. - */ - default void setUp() { - // No-op - } - - /** - * This is a lifecycle method, called after the workflow/activity has completed. If the method - * finished without exception, {@code successful} will be true. Otherwise, it will be false and - * {@link #onError(Throwable)} will have already been called. - */ - default void finish() { - // No-op - } - - /** - * This is a lifecycle method, called when the workflow/activity finishes by throwing an unhandled - * exception. {@link #finish()} is called after this method. - * - * @param t The unhandled exception that caused the workflow/activity to terminate - */ - default void onError(Throwable t) { - // No-op - } } diff --git a/src/main/java/com/uber/cadence/context/OpenTelemetryContextPropagator.java b/src/main/java/com/uber/cadence/context/OpenTelemetryContextPropagator.java deleted file mode 100644 index 927d3c71f..000000000 --- a/src/main/java/com/uber/cadence/context/OpenTelemetryContextPropagator.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"). You may not - * use this file except in compliance with the License. A copy of the License is - * located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package com.uber.cadence.context; - -import io.opentelemetry.api.GlobalOpenTelemetry; -import io.opentelemetry.api.baggage.Baggage; -import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; -import io.opentelemetry.context.propagation.TextMapGetter; -import io.opentelemetry.context.propagation.TextMapPropagator; -import io.opentelemetry.context.propagation.TextMapSetter; -import java.nio.charset.Charset; -import java.util.HashMap; -import java.util.Map; -import javax.annotation.Nullable; -import org.slf4j.MDC; - -public class OpenTelemetryContextPropagator implements ContextPropagator { - - private static final TextMapPropagator w3cTraceContextPropagator = - W3CTraceContextPropagator.getInstance(); - private static final TextMapPropagator w3cBaggagePropagator = W3CBaggagePropagator.getInstance(); - private static ThreadLocal currentContextOtelScope = new ThreadLocal<>(); - private static ThreadLocal currentOtelSpan = new ThreadLocal<>(); - private static ThreadLocal currentOtelScope = new ThreadLocal<>(); - private static ThreadLocal> otelKeySet = new ThreadLocal<>(); - private static final TextMapSetter> setter = Map::put; - private static final TextMapGetter> getter = - new TextMapGetter>() { - @Override - public Iterable keys(Map carrier) { - return otelKeySet.get(); - } - - @Nullable - @Override - public String get(Map carrier, String key) { - return MDC.get(key); - } - }; - - @Override - public String getName() { - return "OpenTelemetry"; - } - - @Override - public Map serializeContext(Object context) { - Map serializedContext = new HashMap<>(); - Map contextMap = (Map) context; - if (contextMap != null) { - for (Map.Entry entry : contextMap.entrySet()) { - serializedContext.put(entry.getKey(), entry.getValue().getBytes(Charset.defaultCharset())); - } - } - return serializedContext; - } - - @Override - public Object deserializeContext(Map context) { - Map contextMap = new HashMap<>(); - for (Map.Entry entry : context.entrySet()) { - contextMap.put(entry.getKey(), new String(entry.getValue(), Charset.defaultCharset())); - } - return contextMap; - } - - @Override - public Object getCurrentContext() { - Map carrier = new HashMap<>(); - w3cTraceContextPropagator.inject(Context.current(), carrier, setter); - w3cBaggagePropagator.inject(Context.current(), carrier, setter); - return carrier; - } - - @Override - public void setCurrentContext(Object context) { - Map contextMap = (Map) context; - if (contextMap != null) { - for (Map.Entry entry : contextMap.entrySet()) { - MDC.put(entry.getKey(), entry.getValue()); - } - otelKeySet.set(contextMap.keySet()); - } - } - - @Override - @SuppressWarnings("MustBeClosedChecker") - public void setUp() { - Context context = - Baggage.fromContext(w3cBaggagePropagator.extract(Context.current(), null, getter)) - .toBuilder() - .build() - .storeInContext(w3cTraceContextPropagator.extract(Context.current(), null, getter)); - - currentContextOtelScope.set(context.makeCurrent()); - - Span span = - GlobalOpenTelemetry.getTracer("cadence-client") - .spanBuilder("cadence.workflow") - .setParent(context) - .setSpanKind(SpanKind.CLIENT) - .startSpan(); - - Scope scope = span.makeCurrent(); - currentOtelSpan.set(span); - currentOtelScope.set(scope); - } - - @Override - public void finish() { - Scope scope = currentOtelScope.get(); - if (scope != null) { - scope.close(); - } - Span span = currentOtelSpan.get(); - if (span != null) { - span.end(); - } - Scope contextScope = currentContextOtelScope.get(); - if (contextScope != null) { - contextScope.close(); - } - } -} diff --git a/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java b/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java index b4423b923..227124170 100644 --- a/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java +++ b/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java @@ -24,14 +24,10 @@ import java.util.List; import java.util.Map; import java.util.function.Supplier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -/** This class holds the current set of context propagators. */ +/** This class holds the current set of context propagators */ public class ContextThreadLocal { - private static final Logger log = LoggerFactory.getLogger(ContextThreadLocal.class); - private static WorkflowThreadLocal> contextPropagators = WorkflowThreadLocal.withInitial( new Supplier>() { @@ -41,7 +37,7 @@ public List get() { } }); - /** Sets the list of context propagators for the thread. */ + /** Sets the list of context propagators for the thread */ public static void setContextPropagators(List propagators) { if (propagators == null || propagators.isEmpty()) { return; @@ -61,11 +57,6 @@ public static Map getCurrentContextForPropagation() { return contextData; } - /** - * Injects the context data into the thread for each configured context propagator. - * - * @param contextData The context data received from the server. - */ public static void propagateContextToCurrentThread(Map contextData) { if (contextData == null || contextData.isEmpty()) { return; @@ -76,44 +67,4 @@ public static void propagateContextToCurrentThread(Map contextDa } } } - - /** Calls {@link ContextPropagator#setUp()} for each propagator. */ - public static void setUpContextPropagators() { - for (ContextPropagator propagator : contextPropagators.get()) { - try { - propagator.setUp(); - } catch (Throwable t) { - // Don't let an error in one propagator block the others - log.error("Error calling setUp() on a contextpropagator", t); - } - } - } - - /** - * Calls {@link ContextPropagator#onError(Throwable)} for each propagator. - * - * @param t The Throwable that caused the workflow/activity to finish. - */ - public static void onErrorContextPropagators(Throwable t) { - for (ContextPropagator propagator : contextPropagators.get()) { - try { - propagator.onError(t); - } catch (Throwable t1) { - // Don't let an error in one propagator block the others - log.error("Error calling onError() on a contextpropagator", t1); - } - } - } - - /** Calls {@link ContextPropagator#finish()} for each propagator. */ - public static void finishContextPropagators() { - for (ContextPropagator propagator : contextPropagators.get()) { - try { - propagator.finish(); - } catch (Throwable t) { - // Don't let an error in one propagator block the others - log.error("Error calling finish() on a contextpropagator", t); - } - } - } } diff --git a/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java b/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java index 729255727..5f1d8f16f 100644 --- a/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java +++ b/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java @@ -23,7 +23,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; final class WorkflowContext { @@ -167,18 +166,7 @@ Map getPropagatedContexts() { Map contextData = new HashMap<>(); for (ContextPropagator propagator : contextPropagators) { - // Only send the context propagator the fields that belong to them - // Change the map from MyPropagator:foo -> bar to foo -> bar - Map filteredData = - headerData - .entrySet() - .stream() - .filter(e -> e.getKey().startsWith(propagator.getName())) - .collect( - Collectors.toMap( - e -> e.getKey().substring(propagator.getName().length() + 1), - Map.Entry::getValue)); - contextData.put(propagator.getName(), propagator.deserializeContext(filteredData)); + contextData.put(propagator.getName(), propagator.deserializeContext(headerData)); } return contextData; diff --git a/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java b/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java index 0e332ba7a..23955ce69 100644 --- a/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java +++ b/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java @@ -72,7 +72,6 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -450,18 +449,7 @@ private Map extractContextsAndConvertToBytes( } Map result = new HashMap<>(); for (ContextPropagator propagator : contextPropagators) { - // Get the serialized context from the propagator - Map serializedContext = - propagator.serializeContext(propagator.getCurrentContext()); - // Namespace each entry in case of overlaps, so foo -> bar becomes MyPropagator:foo -> bar - Map namespacedSerializedContext = - serializedContext - .entrySet() - .stream() - .collect( - Collectors.toMap( - e -> propagator.getName() + ":" + e.getKey(), Map.Entry::getValue)); - result.putAll(namespacedSerializedContext); + result.putAll(propagator.serializeContext(propagator.getCurrentContext())); } return result; } diff --git a/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java b/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java index 48d32c12d..978720a23 100644 --- a/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java +++ b/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java @@ -51,7 +51,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; class WorkflowStubImpl implements WorkflowStub { @@ -203,18 +202,7 @@ private Map extractContextsAndConvertToBytes( } Map result = new HashMap<>(); for (ContextPropagator propagator : contextPropagators) { - // Get the serialized context from the propagator - Map serializedContext = - propagator.serializeContext(propagator.getCurrentContext()); - // Namespace each entry in case of overlaps, so foo -> bar becomes MyPropagator:foo -> bar - Map namespacedSerializedContext = - serializedContext - .entrySet() - .stream() - .collect( - Collectors.toMap( - k -> propagator.getName() + ":" + k.getKey(), Map.Entry::getValue)); - result.putAll(namespacedSerializedContext); + result.putAll(propagator.serializeContext(propagator.getCurrentContext())); } return result; } diff --git a/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java b/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java index bee091a2a..9b7f5891f 100644 --- a/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java +++ b/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java @@ -91,7 +91,6 @@ public void run() { // Repopulate the context(s) ContextThreadLocal.setContextPropagators(this.contextPropagators); ContextThreadLocal.propagateContextToCurrentThread(this.propagatedContexts); - ContextThreadLocal.setUpContextPropagators(); try { // initialYield blocks thread until the first runUntilBlocked is called. @@ -100,7 +99,6 @@ public void run() { cancellationScope.run(); } catch (DestroyWorkflowThreadError e) { if (!threadContext.isDestroyRequested()) { - ContextThreadLocal.onErrorContextPropagators(e); threadContext.setUnhandledException(e); } } catch (Error e) { @@ -113,11 +111,9 @@ public void run() { log.error( String.format("Workflow thread \"%s\" run failed with Error:\n%s", name, stackTrace)); } - ContextThreadLocal.onErrorContextPropagators(e); threadContext.setUnhandledException(e); } catch (CancellationException e) { if (!isCancelRequested()) { - ContextThreadLocal.onErrorContextPropagators(e); threadContext.setUnhandledException(e); } if (log.isDebugEnabled()) { @@ -134,10 +130,8 @@ public void run() { "Workflow thread \"%s\" run failed with unhandled exception:\n%s", name, stackTrace)); } - ContextThreadLocal.onErrorContextPropagators(e); threadContext.setUnhandledException(e); } finally { - ContextThreadLocal.finishContextPropagators(); DeterministicRunnerImpl.setCurrentThreadInternal(null); threadContext.setStatus(Status.DONE); thread.setName(originalName); diff --git a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java index 3b4a07c1f..f1a100fb3 100644 --- a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java @@ -41,7 +41,6 @@ import java.util.Objects; import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.apache.thrift.TException; import org.slf4j.MDC; @@ -162,11 +161,7 @@ public void handle(PollForActivityTaskResponse task) throws Exception { Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_RESP_LATENCY).start(); sendReply(task, new Result(null, null, cancelledRequest), metricsScope); sw.stop(); - onErrorContextPropagation(e); - } catch (Exception e) { - onErrorContextPropagation(e); } finally { - finishContextPropagation(); MDC.remove(LoggerTag.ACTIVITY_ID); MDC.remove(LoggerTag.ACTIVITY_TYPE); MDC.remove(LoggerTag.WORKFLOW_ID); @@ -193,31 +188,7 @@ void propagateContext(PollForActivityTaskResponse response) { }); for (ContextPropagator propagator : options.getContextPropagators()) { - // Only send the context propagator the fields that belong to them - // Change the map from MyPropagator:foo -> bar to foo -> bar - Map filteredData = - headerData - .entrySet() - .stream() - .filter(e -> e.getKey().startsWith(propagator.getName())) - .collect( - Collectors.toMap( - e -> e.getKey().substring(propagator.getName().length() + 1), - Map.Entry::getValue)); - propagator.setCurrentContext(propagator.deserializeContext(filteredData)); - propagator.setUp(); - } - } - - void onErrorContextPropagation(Exception error) { - for (ContextPropagator propagator : options.getContextPropagators()) { - propagator.onError(error); - } - } - - void finishContextPropagation() { - for (ContextPropagator propagator : options.getContextPropagators()) { - propagator.finish(); + propagator.setCurrentContext(propagator.deserializeContext(headerData)); } } diff --git a/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java index 08e765aad..c08870adf 100644 --- a/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java @@ -22,7 +22,6 @@ import com.uber.cadence.MarkerRecordedEventAttributes; import com.uber.cadence.PollForActivityTaskResponse; import com.uber.cadence.common.RetryOptions; -import com.uber.cadence.context.ContextPropagator; import com.uber.cadence.internal.common.LocalActivityMarkerData; import com.uber.cadence.internal.metrics.MetricsTag; import com.uber.cadence.internal.metrics.MetricsType; @@ -38,7 +37,6 @@ import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.LongSupplier; -import java.util.stream.Collectors; public final class LocalActivityWorker extends SuspendableWorkerBase { @@ -227,19 +225,9 @@ private void propagateContext(ExecuteLocalActivityParameters params) { } private void restoreContext(Map context) { - for (ContextPropagator propagator : options.getContextPropagators()) { - // Only send the context propagator the fields that belong to them - // Change the map from MyPropagator:foo -> bar to foo -> bar - Map filteredData = - context - .entrySet() - .stream() - .filter(e -> e.getKey().startsWith(propagator.getName())) - .collect( - Collectors.toMap( - e -> e.getKey().substring(propagator.getName().length() + 1), - Map.Entry::getValue)); - propagator.setCurrentContext(propagator.deserializeContext(filteredData)); - } + options + .getContextPropagators() + .forEach( + propagator -> propagator.setCurrentContext(propagator.deserializeContext(context))); } } diff --git a/src/main/java/com/uber/cadence/worker/Worker.java b/src/main/java/com/uber/cadence/worker/Worker.java index 4527403e2..9bdb9a87d 100644 --- a/src/main/java/com/uber/cadence/worker/Worker.java +++ b/src/main/java/com/uber/cadence/worker/Worker.java @@ -23,7 +23,6 @@ import com.uber.cadence.client.WorkflowClient; import com.uber.cadence.common.WorkflowExecutionHistory; import com.uber.cadence.context.ContextPropagator; -import com.uber.cadence.context.OpenTelemetryContextPropagator; import com.uber.cadence.converter.DataConverter; import com.uber.cadence.internal.common.InternalUtils; import com.uber.cadence.internal.metrics.MetricsTag; @@ -37,10 +36,8 @@ import com.uber.m3.tally.Scope; import com.uber.m3.util.ImmutableMap; import java.time.Duration; -import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -84,13 +81,6 @@ public final class Worker implements Suspendable { .getOptions() .getMetricsScope() .tagged(ImmutableMap.of(MetricsTag.TASK_LIST, taskList)); - contextPropagators = new ArrayList<>(contextPropagators); - - // Add the OpenTelemetry propagator if it's not already there - OpenTelemetryContextPropagator otelPropagator = new OpenTelemetryContextPropagator(); - if (!contextPropagators.contains(otelPropagator)) { - contextPropagators.add(otelPropagator); - } SingleWorkerOptions activityOptions = SingleWorkerOptions.newBuilder() diff --git a/src/test/java/com/uber/cadence/context/ContextTests.java b/src/test/java/com/uber/cadence/context/ContextTests.java deleted file mode 100644 index 723f75dcf..000000000 --- a/src/test/java/com/uber/cadence/context/ContextTests.java +++ /dev/null @@ -1,514 +0,0 @@ -/* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"). You may not - * use this file except in compliance with the License. A copy of the License is - * located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package com.uber.cadence.context; - -import static org.junit.Assert.*; - -import com.uber.cadence.activity.ActivityMethod; -import com.uber.cadence.activity.ActivityOptions; -import com.uber.cadence.client.WorkflowClient; -import com.uber.cadence.client.WorkflowClientOptions; -import com.uber.cadence.client.WorkflowException; -import com.uber.cadence.client.WorkflowOptions; -import com.uber.cadence.internal.testing.WorkflowTestingTest.ChildWorkflow; -import com.uber.cadence.testing.TestEnvironmentOptions; -import com.uber.cadence.testing.TestWorkflowEnvironment; -import com.uber.cadence.worker.Worker; -import com.uber.cadence.workflow.Async; -import com.uber.cadence.workflow.ChildWorkflowOptions; -import com.uber.cadence.workflow.Promise; -import com.uber.cadence.workflow.SignalMethod; -import com.uber.cadence.workflow.Workflow; -import com.uber.cadence.workflow.WorkflowMethod; -import io.opentelemetry.api.GlobalOpenTelemetry; -import io.opentelemetry.api.baggage.Baggage; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanContext; -import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.api.trace.TraceFlags; -import io.opentelemetry.api.trace.TraceState; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.util.Arrays; -import java.util.Collections; -import java.util.Map; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestWatcher; -import org.junit.rules.Timeout; -import org.junit.runner.Description; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.MDC; - -public class ContextTests { - private static final Logger log = LoggerFactory.getLogger(ContextTests.class); - - @Rule public Timeout globalTimeout = Timeout.seconds(5000); - - @Rule - public TestWatcher watchman = - new TestWatcher() { - @Override - protected void failed(Throwable e, Description description) { - System.err.println(testEnvironment.getDiagnostics()); - } - }; - - private static final String TASK_LIST = "test-workflow"; - - private TestWorkflowEnvironment testEnvironment; - - @Before - public void setUp() { - WorkflowClientOptions clientOptions = - WorkflowClientOptions.newBuilder() - .setContextPropagators( - Arrays.asList(new TestContextPropagator(), new OpenTelemetryContextPropagator())) - .build(); - TestEnvironmentOptions options = - new TestEnvironmentOptions.Builder().setWorkflowClientOptions(clientOptions).build(); - testEnvironment = TestWorkflowEnvironment.newInstance(options); - } - - @After - public void tearDown() { - testEnvironment.close(); - } - - public interface TestWorkflow { - - @WorkflowMethod(executionStartToCloseTimeoutSeconds = 3600 * 24, taskList = TASK_LIST) - String workflow1(String input); - } - - public interface ParentWorkflow { - - @WorkflowMethod(executionStartToCloseTimeoutSeconds = 3600 * 24, taskList = TASK_LIST) - String workflow(String input); - - @SignalMethod - void signal(String value); - } - - public interface TestActivity { - - @ActivityMethod(scheduleToCloseTimeoutSeconds = 3600) - String activity1(String input); - } - - public static class TestContextPropagator implements ContextPropagator { - - @Override - public String getName() { - return "TestContextPropagator::withSomeColons"; - } - - @Override - public Map serializeContext(Object context) { - String testKey = (String) context; - if (testKey != null) { - return Collections.singletonMap("test", testKey.getBytes(StandardCharsets.UTF_8)); - } else { - return Collections.emptyMap(); - } - } - - @Override - public Object deserializeContext(Map context) { - if (context.containsKey("test")) { - return new String(context.get("test"), StandardCharsets.UTF_8); - } else { - return null; - } - } - - @Override - public Object getCurrentContext() { - return MDC.get("test"); - } - - @Override - public void setCurrentContext(Object context) { - MDC.put("test", String.valueOf(context)); - } - } - - public static class ContextPropagationWorkflowImpl implements TestWorkflow { - - @Override - public String workflow1(String input) { - // The test value should be in the MDC - return MDC.get("test"); - } - } - - @Test() - public void testWorkflowContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes(ContextPropagationWorkflowImpl.class); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - String result = workflow.workflow1("input1"); - assertEquals("testing123", result); - } - - public static class ContextPropagationParentWorkflowImpl implements ParentWorkflow { - - @Override - public String workflow(String input) { - // Get the MDC value - String mdcValue = MDC.get("test"); - - // Fire up a child workflow - ChildWorkflowOptions options = - new ChildWorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options); - - String result = child.workflow(mdcValue, Workflow.getWorkflowInfo().getWorkflowId()); - return result; - } - - @Override - public void signal(String value) {} - } - - public static class ContextPropagationChildWorkflowImpl implements ChildWorkflow { - - @Override - public String workflow(String input, String parentId) { - String mdcValue = MDC.get("test"); - return input + mdcValue; - } - } - - @Test - public void testChildWorkflowContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes( - ContextPropagationParentWorkflowImpl.class, ContextPropagationChildWorkflowImpl.class); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - ParentWorkflow workflow = client.newWorkflowStub(ParentWorkflow.class, options); - String result = workflow.workflow("input1"); - assertEquals("testing123testing123", result); - } - - public static class ContextPropagationThreadWorkflowImpl implements TestWorkflow { - - @Override - public String workflow1(String input) { - Promise asyncPromise = Async.function(this::async); - return asyncPromise.get(); - } - - private String async() { - return "async" + MDC.get("test"); - } - } - - @Test - public void testThreadContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes(ContextPropagationThreadWorkflowImpl.class); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - String result = workflow.workflow1("input1"); - assertEquals("asynctesting123", result); - } - - public static class ContextActivityImpl implements TestActivity { - @Override - public String activity1(String input) { - return "activity" + MDC.get("test"); - } - } - - public static class ContextPropagationActivityWorkflowImpl implements TestWorkflow { - @Override - public String workflow1(String input) { - ActivityOptions options = - new ActivityOptions.Builder() - .setScheduleToCloseTimeout(Duration.ofSeconds(5)) - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - TestActivity activity = Workflow.newActivityStub(TestActivity.class, options); - return activity.activity1("foo"); - } - } - - @Test - public void testActivityContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes(ContextPropagationActivityWorkflowImpl.class); - worker.registerActivitiesImplementations(new ContextActivityImpl()); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - String result = workflow.workflow1("input1"); - assertEquals("activitytesting123", result); - } - - public static class DefaultContextPropagationActivityWorkflowImpl implements TestWorkflow { - @Override - public String workflow1(String input) { - ActivityOptions options = - new ActivityOptions.Builder().setScheduleToCloseTimeout(Duration.ofSeconds(5)).build(); - TestActivity activity = Workflow.newActivityStub(TestActivity.class, options); - return activity.activity1("foo"); - } - } - - @Test - public void testDefaultActivityContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes(DefaultContextPropagationActivityWorkflowImpl.class); - worker.registerActivitiesImplementations(new ContextActivityImpl()); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - String result = workflow.workflow1("input1"); - assertEquals("activitytesting123", result); - } - - public static class DefaultContextPropagationParentWorkflowImpl implements ParentWorkflow { - - @Override - public String workflow(String input) { - // Get the MDC value - String mdcValue = MDC.get("test"); - - // Fire up a child workflow - ChildWorkflowOptions options = new ChildWorkflowOptions.Builder().build(); - ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options); - - String result = child.workflow(mdcValue, Workflow.getWorkflowInfo().getWorkflowId()); - return result; - } - - @Override - public void signal(String value) {} - } - - @Test - public void testDefaultChildWorkflowContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes( - DefaultContextPropagationParentWorkflowImpl.class, - ContextPropagationChildWorkflowImpl.class); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - ParentWorkflow workflow = client.newWorkflowStub(ParentWorkflow.class, options); - String result = workflow.workflow("input1"); - assertEquals("testing123testing123", result); - } - - public static class OpenTelemetryContextPropagationWorkflowImpl implements TestWorkflow { - @Override - public String workflow1(String input) { - if ("fail".equals(input)) { - throw new IllegalArgumentException(); - } else if ("baggage".equals(input)) { - return Baggage.current().toString(); - } else { - SpanContext ctx = Span.current().getSpanContext(); - return ctx.getTraceId() + ":" + ctx.getSpanId() + ":" + ctx.getTraceState().toString(); - } - } - } - - @Test - public void testOpenTelemetryContextPropagation() { - TraceState TRACE_STATE = TraceState.builder().put("foo", "bar").build(); - String TRACE_ID_BASE16 = "ff000000000000000000000000000041"; - String SPAN_ID_BASE16 = "ff00000000000041"; - - Context ctx = - Context.current() - .with( - Span.wrap( - SpanContext.create( - TRACE_ID_BASE16, SPAN_ID_BASE16, TraceFlags.getDefault(), TRACE_STATE))); - - Span span = - GlobalOpenTelemetry.getTracer("test-tracer") - .spanBuilder("test-span") - .setParent(ctx) - .setSpanKind(SpanKind.CLIENT) - .startSpan(); - - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes(OpenTelemetryContextPropagationWorkflowImpl.class); - testEnvironment.start(); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators( - Arrays.asList(new TestContextPropagator(), new OpenTelemetryContextPropagator())) - .build(); - - try (Scope scope = span.makeCurrent()) { - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - assertEquals( - TRACE_ID_BASE16 + ":" + SPAN_ID_BASE16 + ":" + TRACE_STATE.toString(), - workflow.workflow1("success")); - } - - try (Scope scope = span.makeCurrent()) { - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - workflow.workflow1("fail"); - } catch (WorkflowException e) { - assertEquals(IllegalArgumentException.class, e.getCause().getClass()); - } finally { - span.end(); - } - } - - @Test - public void testDefaultOpenTelemetryContextPropagation() { - TraceState TRACE_STATE = TraceState.builder().put("foo", "bar").build(); - String TRACE_ID_BASE16 = "ff000000000000000000000000000041"; - String SPAN_ID_BASE16 = "ff00000000000041"; - - Context ctx = - Context.current() - .with( - Span.wrap( - SpanContext.create( - TRACE_ID_BASE16, SPAN_ID_BASE16, TraceFlags.getDefault(), TRACE_STATE))); - - Span span = - GlobalOpenTelemetry.getTracer("test-tracer") - .spanBuilder("test-span") - .setParent(ctx) - .setSpanKind(SpanKind.CLIENT) - .startSpan(); - - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes(OpenTelemetryContextPropagationWorkflowImpl.class); - testEnvironment.start(); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = new WorkflowOptions.Builder().build(); - - try (Scope scope = span.makeCurrent()) { - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - assertEquals( - TRACE_ID_BASE16 + ":" + SPAN_ID_BASE16 + ":" + TRACE_STATE.toString(), - workflow.workflow1("success")); - } - - try (Scope scope = span.makeCurrent()) { - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - workflow.workflow1("fail"); - } catch (WorkflowException e) { - assertEquals(IllegalArgumentException.class, e.getCause().getClass()); - } finally { - span.end(); - } - } - - @Test - public void testNoDefaultOpenTelemetryContextPropagation() { - TraceState TRACE_STATE = TraceState.builder().put("foo", "bar").build(); - String TRACE_ID_BASE16 = "ff000000000000000000000000000041"; - String SPAN_ID_BASE16 = "ff00000000000041"; - - Context ctx = - Context.current() - .with( - Span.wrap( - SpanContext.create( - TRACE_ID_BASE16, SPAN_ID_BASE16, TraceFlags.getDefault(), TRACE_STATE))); - - Span span = - GlobalOpenTelemetry.getTracer("test-tracer") - .spanBuilder("test-span") - .setParent(ctx) - .setSpanKind(SpanKind.CLIENT) - .startSpan(); - - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes(OpenTelemetryContextPropagationWorkflowImpl.class); - testEnvironment.start(); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder().build(); - - try (Scope scope = span.makeCurrent()) { - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - assertNotEquals( - TRACE_ID_BASE16 + ":" + SPAN_ID_BASE16 + ":" + TRACE_STATE.toString(), - workflow.workflow1("success")); - } - } - - @Test - public void testBaggagePropagation() { - Baggage baggage = - Baggage.builder().put("keyFoo1", "valueFoo1").put("keyFoo2", "valueFoo2").build(); - - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes(OpenTelemetryContextPropagationWorkflowImpl.class); - testEnvironment.start(); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = new WorkflowOptions.Builder().build(); - - try (Scope scope = Context.current().with(baggage).makeCurrent()) { - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - assertEquals(baggage.toString(), workflow.workflow1("baggage")); - } - } -} diff --git a/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java b/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java index 81214c1bf..e22de1bdd 100644 --- a/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java +++ b/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java @@ -17,7 +17,6 @@ package com.uber.cadence.internal.testing; -import static org.junit.Assert.*; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -40,7 +39,7 @@ import com.uber.cadence.activity.ActivityMethod; import com.uber.cadence.activity.ActivityOptions; import com.uber.cadence.client.*; -import com.uber.cadence.context.ContextTests; +import com.uber.cadence.context.ContextPropagator; import com.uber.cadence.internal.common.WorkflowExecutionUtils; import com.uber.cadence.testing.SimulatedTimeoutException; import com.uber.cadence.testing.TestEnvironmentOptions; @@ -48,14 +47,17 @@ import com.uber.cadence.worker.Worker; import com.uber.cadence.workflow.ActivityTimeoutException; import com.uber.cadence.workflow.Async; +import com.uber.cadence.workflow.ChildWorkflowOptions; import com.uber.cadence.workflow.ChildWorkflowTimedOutException; import com.uber.cadence.workflow.Promise; import com.uber.cadence.workflow.SignalMethod; import com.uber.cadence.workflow.Workflow; import com.uber.cadence.workflow.WorkflowMethod; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CancellationException; @@ -71,6 +73,7 @@ import org.junit.runner.Description; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; public class WorkflowTestingTest { private static final Logger log = LoggerFactory.getLogger(WorkflowTestingTest.class); @@ -96,8 +99,7 @@ public void setUp() { new TestEnvironmentOptions.Builder() .setWorkflowClientOptions( WorkflowClientOptions.newBuilder() - .setContextPropagators( - Collections.singletonList(new ContextTests.TestContextPropagator())) + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) .build()) .build(); testEnvironment = TestWorkflowEnvironment.newInstance(options); @@ -737,4 +739,244 @@ public void testMockedChildSimulatedTimeout() { assertTrue(e.getCause() instanceof ChildWorkflowTimedOutException); } } + + public static class TestContextPropagator implements ContextPropagator { + + @Override + public String getName() { + return this.getClass().getName(); + } + + @Override + public Map serializeContext(Object context) { + String testKey = (String) context; + if (testKey != null) { + return Collections.singletonMap("test", testKey.getBytes(StandardCharsets.UTF_8)); + } else { + return Collections.emptyMap(); + } + } + + @Override + public Object deserializeContext(Map context) { + if (context.containsKey("test")) { + return new String(context.get("test"), StandardCharsets.UTF_8); + } else { + return null; + } + } + + @Override + public Object getCurrentContext() { + return MDC.get("test"); + } + + @Override + public void setCurrentContext(Object context) { + MDC.put("test", String.valueOf(context)); + } + } + + public static class ContextPropagationWorkflowImpl implements TestWorkflow { + + @Override + public String workflow1(String input) { + // The test value should be in the MDC + return MDC.get("test"); + } + } + + @Test + public void testWorkflowContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(ContextPropagationWorkflowImpl.class); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + String result = workflow.workflow1("input1"); + assertEquals("testing123", result); + } + + public static class ContextPropagationParentWorkflowImpl implements ParentWorkflow { + + @Override + public String workflow(String input) { + // Get the MDC value + String mdcValue = MDC.get("test"); + + // Fire up a child workflow + ChildWorkflowOptions options = + new ChildWorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options); + + String result = child.workflow(mdcValue, Workflow.getWorkflowInfo().getWorkflowId()); + return result; + } + + @Override + public void signal(String value) {} + } + + public static class ContextPropagationChildWorkflowImpl implements ChildWorkflow { + + @Override + public String workflow(String input, String parentId) { + String mdcValue = MDC.get("test"); + return input + mdcValue; + } + } + + @Test + public void testChildWorkflowContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes( + ContextPropagationParentWorkflowImpl.class, ContextPropagationChildWorkflowImpl.class); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + ParentWorkflow workflow = client.newWorkflowStub(ParentWorkflow.class, options); + String result = workflow.workflow("input1"); + assertEquals("testing123testing123", result); + } + + public static class ContextPropagationThreadWorkflowImpl implements TestWorkflow { + + @Override + public String workflow1(String input) { + Promise asyncPromise = Async.function(this::async); + return asyncPromise.get(); + } + + private String async() { + return "async" + MDC.get("test"); + } + } + + @Test + public void testThreadContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(ContextPropagationThreadWorkflowImpl.class); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + String result = workflow.workflow1("input1"); + assertEquals("asynctesting123", result); + } + + public static class ContextActivityImpl implements TestActivity { + @Override + public String activity1(String input) { + return "activity" + MDC.get("test"); + } + } + + public static class ContextPropagationActivityWorkflowImpl implements TestWorkflow { + @Override + public String workflow1(String input) { + ActivityOptions options = + new ActivityOptions.Builder() + .setScheduleToCloseTimeout(Duration.ofSeconds(5)) + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + TestActivity activity = Workflow.newActivityStub(TestActivity.class, options); + return activity.activity1("foo"); + } + } + + @Test + public void testActivityContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(ContextPropagationActivityWorkflowImpl.class); + worker.registerActivitiesImplementations(new ContextActivityImpl()); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + String result = workflow.workflow1("input1"); + assertEquals("activitytesting123", result); + } + + public static class DefaultContextPropagationActivityWorkflowImpl implements TestWorkflow { + @Override + public String workflow1(String input) { + ActivityOptions options = + new ActivityOptions.Builder().setScheduleToCloseTimeout(Duration.ofSeconds(5)).build(); + TestActivity activity = Workflow.newActivityStub(TestActivity.class, options); + return activity.activity1("foo"); + } + } + + @Test + public void testDefaultActivityContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(DefaultContextPropagationActivityWorkflowImpl.class); + worker.registerActivitiesImplementations(new ContextActivityImpl()); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + String result = workflow.workflow1("input1"); + assertEquals("activitytesting123", result); + } + + public static class DefaultContextPropagationParentWorkflowImpl implements ParentWorkflow { + + @Override + public String workflow(String input) { + // Get the MDC value + String mdcValue = MDC.get("test"); + + // Fire up a child workflow + ChildWorkflowOptions options = new ChildWorkflowOptions.Builder().build(); + ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options); + + String result = child.workflow(mdcValue, Workflow.getWorkflowInfo().getWorkflowId()); + return result; + } + + @Override + public void signal(String value) {} + } + + @Test + public void testDefaultChildWorkflowContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes( + DefaultContextPropagationParentWorkflowImpl.class, + ContextPropagationChildWorkflowImpl.class); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + ParentWorkflow workflow = client.newWorkflowStub(ParentWorkflow.class, options); + String result = workflow.workflow("input1"); + assertEquals("testing123testing123", result); + } } From 17140b236b1fbef76b72ac26cc2ea3005e9fb8d4 Mon Sep 17 00:00:00 2001 From: Matt Anger Date: Tue, 15 Jun 2021 12:26:54 -0700 Subject: [PATCH 06/12] cr comments --- .../cadence/serviceclient/WorkflowServiceTChannel.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java index 6ae12a13d..c61993a22 100644 --- a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java +++ b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java @@ -284,11 +284,6 @@ private ThriftResponse doRemoteCall(ThriftRequest request) throws TExc } catch (ExecutionException e) { throw new TException(e); } catch (TChannelError e) { - try { - resetSubchannelPeers(); - } catch (UnknownHostException uhe) { - // do nothing? - } throw new TException("Rpc error", e); } this.throwOnRpcError(response); @@ -301,11 +296,6 @@ private CompletableFuture> doRemoteCallAsync(ThriftRequest try { future = subChannel.send(request); } catch (TChannelError tChannelError) { - try { - resetSubchannelPeers(); - } catch (UnknownHostException uhe) { - // do nothing? - } result.completeExceptionally(new TException(tChannelError)); } future.addCallback( From 6800bcc17612a344e0071d56333ac92abe3ae8fd Mon Sep 17 00:00:00 2001 From: Matt Anger Date: Tue, 15 Jun 2021 12:31:02 -0700 Subject: [PATCH 07/12] remove hard codeing of service name --- .../com/uber/cadence/serviceclient/WorkflowServiceTChannel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java index c61993a22..f79dd6695 100644 --- a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java +++ b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java @@ -218,7 +218,7 @@ private ThriftRequest buildThriftRequest(String apiName, T body) { @Override public CompletableFuture isHealthy() { final ThriftRequest req = - new ThriftRequest.Builder("cadence-frontend", "Meta::health") + new ThriftRequest.Builder(options.getServiceName(), "Meta::health") .setBody(new Meta.health_args()) .build(); final CompletableFuture result = new CompletableFuture<>(); From d88a16cd16c29c7b384ba5927daa32489af1fdcf Mon Sep 17 00:00:00 2001 From: Matt Anger Date: Thu, 24 Jun 2021 13:42:53 -0700 Subject: [PATCH 08/12] add comment --- .../com/uber/cadence/serviceclient/WorkflowServiceTChannel.java | 1 + src/main/java/com/uber/cadence/worker/Worker.java | 1 + src/main/java/com/uber/cadence/worker/WorkerFactory.java | 1 + 3 files changed, 3 insertions(+) diff --git a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java index f79dd6695..7a7cc07cb 100644 --- a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java +++ b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java @@ -215,6 +215,7 @@ private ThriftRequest buildThriftRequest(String apiName, T body) { return buildThriftRequest(apiName, body, null); } + /** Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer list */ @Override public CompletableFuture isHealthy() { final ThriftRequest req = diff --git a/src/main/java/com/uber/cadence/worker/Worker.java b/src/main/java/com/uber/cadence/worker/Worker.java index 9bdb9a87d..061467568 100644 --- a/src/main/java/com/uber/cadence/worker/Worker.java +++ b/src/main/java/com/uber/cadence/worker/Worker.java @@ -327,6 +327,7 @@ public boolean isSuspended() { return workflowWorker.isSuspended() && activityWorker.isSuspended(); } + /** Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer list */ public CompletableFuture isHealthy() { return workflowWorker.isHealthy(); } diff --git a/src/main/java/com/uber/cadence/worker/WorkerFactory.java b/src/main/java/com/uber/cadence/worker/WorkerFactory.java index ff3ecf834..257b79d9c 100644 --- a/src/main/java/com/uber/cadence/worker/WorkerFactory.java +++ b/src/main/java/com/uber/cadence/worker/WorkerFactory.java @@ -289,6 +289,7 @@ public synchronized void shutdownNow() { } } + /** Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer list */ public CompletableFuture isHealthy() { List> healthyList = workers.stream().map(Worker::isHealthy).collect(Collectors.toList()); From ba4dd3417eba61156f5b7b228d345124429390d3 Mon Sep 17 00:00:00 2001 From: Matt Anger Date: Thu, 24 Jun 2021 14:37:05 -0700 Subject: [PATCH 09/12] add comment to IWorkflowService --- .../java/com/uber/cadence/serviceclient/IWorkflowService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java b/src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java index 998fca610..40bfbfce1 100644 --- a/src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java +++ b/src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java @@ -87,5 +87,6 @@ void SignalWorkflowExecutionWithTimeout( Long timeoutInMillis) throws TException; + /** Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer list */ CompletableFuture isHealthy(); } From 1f7df8ca916767ce549c1ebad575956f73d1194f Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Thu, 24 Jun 2021 21:54:07 -0700 Subject: [PATCH 10/12] rerun pipeline From 3c83c69061d99308634523843bb66966022df810 Mon Sep 17 00:00:00 2001 From: Matt Anger Date: Fri, 25 Jun 2021 10:16:18 -0700 Subject: [PATCH 11/12] formatting --- .../com/uber/cadence/serviceclient/IWorkflowService.java | 5 ++++- .../uber/cadence/serviceclient/WorkflowServiceTChannel.java | 5 ++++- src/main/java/com/uber/cadence/worker/Worker.java | 6 +++++- src/main/java/com/uber/cadence/worker/WorkerFactory.java | 5 ++++- 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java b/src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java index 40bfbfce1..b04c1e2df 100644 --- a/src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java +++ b/src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java @@ -87,6 +87,9 @@ void SignalWorkflowExecutionWithTimeout( Long timeoutInMillis) throws TException; - /** Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer list */ + /** + * Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer + * list + */ CompletableFuture isHealthy(); } diff --git a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java index 7a7cc07cb..00a3bf661 100644 --- a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java +++ b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java @@ -215,7 +215,10 @@ private ThriftRequest buildThriftRequest(String apiName, T body) { return buildThriftRequest(apiName, body, null); } - /** Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer list */ + /** + * Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer + * list + */ @Override public CompletableFuture isHealthy() { final ThriftRequest req = diff --git a/src/main/java/com/uber/cadence/worker/Worker.java b/src/main/java/com/uber/cadence/worker/Worker.java index 061467568..7c0b714ea 100644 --- a/src/main/java/com/uber/cadence/worker/Worker.java +++ b/src/main/java/com/uber/cadence/worker/Worker.java @@ -38,6 +38,7 @@ import java.time.Duration; import java.util.List; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -327,7 +328,10 @@ public boolean isSuspended() { return workflowWorker.isSuspended() && activityWorker.isSuspended(); } - /** Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer list */ + /** + * Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer + * list + */ public CompletableFuture isHealthy() { return workflowWorker.isHealthy(); } diff --git a/src/main/java/com/uber/cadence/worker/WorkerFactory.java b/src/main/java/com/uber/cadence/worker/WorkerFactory.java index 257b79d9c..a1916661e 100644 --- a/src/main/java/com/uber/cadence/worker/WorkerFactory.java +++ b/src/main/java/com/uber/cadence/worker/WorkerFactory.java @@ -289,7 +289,10 @@ public synchronized void shutdownNow() { } } - /** Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer list */ + /** + * Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer + * list + */ public CompletableFuture isHealthy() { List> healthyList = workers.stream().map(Worker::isHealthy).collect(Collectors.toList()); From 90504beedf7e43a197ce26daa4cd8ae60675e964 Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Fri, 25 Jun 2021 10:21:42 -0700 Subject: [PATCH 12/12] rerun pipeline