diff --git a/build.gradle b/build.gradle index d41bd07cf..ce3c655b3 100644 --- a/build.gradle +++ b/build.gradle @@ -58,10 +58,13 @@ dependencies { compile group: 'com.google.guava', name: 'guava', version: '28.1-jre' compile group: 'com.cronutils', name: 'cron-utils', version: '9.0.0' compile group: 'io.micrometer', name: 'micrometer-core', version: '1.1.2' + compile group: 'io.opentracing', name: 'opentracing-api', version: '0.33.0' + compile group: 'io.opentracing', name: 'opentracing-util', version: '0.33.0' testCompile group: 'junit', name: 'junit', version: '4.12' testCompile group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4' testCompile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3' + testCompile group: 'io.opentracing', name: 'opentracing-mock', version: '0.33.0' } license { diff --git a/src/main/java/com/uber/cadence/context/ContextPropagator.java b/src/main/java/com/uber/cadence/context/ContextPropagator.java index 3a618f96c..3bfca5b71 100644 --- a/src/main/java/com/uber/cadence/context/ContextPropagator.java +++ b/src/main/java/com/uber/cadence/context/ContextPropagator.java @@ -23,6 +23,9 @@ * Context Propagators are used to propagate information from workflow to activity, workflow to * child workflow, and workflow to child thread (using {@link com.uber.cadence.workflow.Async}). * + *

It is important to note that all threads share one ContextPropagator instance, so your + * implementation must be thread-safe and store any state in ThreadLocal variables. + * *

A sample ContextPropagator that copies all {@link org.slf4j.MDC} entries starting * with a given prefix along the code path looks like this: * @@ -136,4 +139,31 @@ public interface ContextPropagator { /** Sets the current context */ void setCurrentContext(Object context); + + /** + * This is a lifecycle method, called after the context has been propagated to the + * workflow/activity thread but the workflow/activity has not yet started. + */ + default void setUp() { + // No-op + } + + /** + * This is a lifecycle method, called after the workflow/activity has completed. If the method + * finished without exception, {@code successful} will be true. Otherwise, it will be false and + * {@link #onError(Throwable)} will have already been called. + */ + default void finish(boolean successful) { + // No-op + } + + /** + * This is a lifecycle method, called when the workflow/activity finishes by throwing an unhandled + * exception. {@link #finish(boolean)} is called after this method. + * + * @param t The unhandled exception that caused the workflow/activity to terminate + */ + default void onError(Throwable t) { + // No-op + } } diff --git a/src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java b/src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java new file mode 100644 index 000000000..e90ac29b9 --- /dev/null +++ b/src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java @@ -0,0 +1,208 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.context; + +import com.uber.cadence.internal.logging.LoggerTag; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.SpanContext; +import io.opentracing.Tracer; +import io.opentracing.log.Fields; +import io.opentracing.propagation.Format; +import io.opentracing.propagation.TextMap; +import io.opentracing.tag.Tags; +import io.opentracing.util.GlobalTracer; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +/** Support for OpenTracing spans */ +public class OpenTracingContextPropagator implements ContextPropagator { + + private static final Logger log = LoggerFactory.getLogger(OpenTracingContextPropagator.class); + + private static ThreadLocal currentOpenTracingSpanContext = new ThreadLocal<>(); + private static ThreadLocal currentOpenTracingSpan = new ThreadLocal<>(); + private static ThreadLocal currentOpenTracingScope = new ThreadLocal<>(); + + public static void setCurrentOpenTracingSpanContext(SpanContext ctx) { + if (ctx != null) { + currentOpenTracingSpanContext.set(ctx); + } + } + + public static SpanContext getCurrentOpenTracingSpanContext() { + return currentOpenTracingSpanContext.get(); + } + + @Override + public String getName() { + return "OpenTracing"; + } + + @Override + public Map serializeContext(Object context) { + Map serializedContext = new HashMap<>(); + Map contextMap = (Map) context; + if (contextMap != null) { + for (Map.Entry entry : contextMap.entrySet()) { + serializedContext.put(entry.getKey(), entry.getValue().getBytes(Charset.defaultCharset())); + } + } + return serializedContext; + } + + @Override + public Object deserializeContext(Map context) { + Map contextMap = new HashMap<>(); + for (Map.Entry entry : context.entrySet()) { + contextMap.put(entry.getKey(), new String(entry.getValue(), Charset.defaultCharset())); + } + return contextMap; + } + + @Override + public Object getCurrentContext() { + Tracer currentTracer = GlobalTracer.get(); + Span currentSpan = currentTracer.scopeManager().activeSpan(); + if (currentSpan != null) { + HashMapTextMap contextTextMap = new HashMapTextMap(); + currentTracer.inject(currentSpan.context(), Format.Builtin.TEXT_MAP, contextTextMap); + return contextTextMap.getBackingMap(); + } else { + return null; + } + } + + @Override + public void setCurrentContext(Object context) { + Tracer currentTracer = GlobalTracer.get(); + Map contextAsMap = (Map) context; + if (contextAsMap != null) { + HashMapTextMap contextTextMap = new HashMapTextMap(contextAsMap); + setCurrentOpenTracingSpanContext( + currentTracer.extract(Format.Builtin.TEXT_MAP, contextTextMap)); + } + } + + @Override + public void setUp() { + Tracer openTracingTracer = GlobalTracer.get(); + Tracer.SpanBuilder builder = + openTracingTracer + .buildSpan("cadence.workflow") + .withTag("resource.name", MDC.get(LoggerTag.WORKFLOW_TYPE)); + + if (getCurrentOpenTracingSpanContext() != null) { + builder.asChildOf(getCurrentOpenTracingSpanContext()); + } + + Span span = builder.start(); + openTracingTracer.activateSpan(span); + currentOpenTracingSpan.set(span); + Scope scope = openTracingTracer.activateSpan(span); + currentOpenTracingScope.set(scope); + } + + @Override + public void onError(Throwable t) { + Span span = currentOpenTracingSpan.get(); + if (span != null) { + Tags.ERROR.set(span, true); + Map errorData = new HashMap<>(); + errorData.put(Fields.EVENT, "error"); + if (t != null) { + errorData.put(Fields.ERROR_OBJECT, t); + errorData.put(Fields.MESSAGE, t.getMessage()); + } + span.log(errorData); + } + } + + @Override + public void finish(boolean successful) { + Scope currentScope = currentOpenTracingScope.get(); + Span currentSpan = currentOpenTracingSpan.get(); + + if (currentScope != null) { + currentScope.close(); + } + + if (currentSpan != null) { + currentSpan.finish(); + } + + currentOpenTracingScope.remove(); + currentOpenTracingSpan.remove(); + currentOpenTracingSpanContext.remove(); + } + + /** Just check for other instances of the same class */ + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (this == obj) { + return true; + } + + if (this.getClass().equals(obj.getClass())) { + return true; + } + + return false; + } + + @Override + public int hashCode() { + return this.getClass().hashCode(); + } + + private class HashMapTextMap implements TextMap { + + private final HashMap backingMap = new HashMap<>(); + + public HashMapTextMap() { + // Noop + } + + public HashMapTextMap(Map spanData) { + backingMap.putAll(spanData); + } + + @Override + public Iterator> iterator() { + return backingMap.entrySet().iterator(); + } + + @Override + public void put(String key, String value) { + backingMap.put(key, value); + } + + public HashMap getBackingMap() { + return backingMap; + } + } +} diff --git a/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java b/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java index 227124170..8bbf7729e 100644 --- a/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java +++ b/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java @@ -24,10 +24,14 @@ import java.util.List; import java.util.Map; import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** This class holds the current set of context propagators */ public class ContextThreadLocal { + private static final Logger log = LoggerFactory.getLogger(ContextThreadLocal.class); + private static WorkflowThreadLocal> contextPropagators = WorkflowThreadLocal.withInitial( new Supplier>() { @@ -57,6 +61,11 @@ public static Map getCurrentContextForPropagation() { return contextData; } + /** + * Injects the context data into the thread for each configured context propagator + * + * @param contextData The context data received from the server + */ public static void propagateContextToCurrentThread(Map contextData) { if (contextData == null || contextData.isEmpty()) { return; @@ -67,4 +76,49 @@ public static void propagateContextToCurrentThread(Map contextDa } } } + + /** Calls {@link ContextPropagator#setUp()} for each propagator */ + public static void setUpContextPropagators() { + for (ContextPropagator propagator : contextPropagators.get()) { + try { + propagator.setUp(); + } catch (Throwable t) { + // Don't let an error in one propagator block the others + log.error("Error calling setUp() on a contextpropagator", t); + } + } + } + + /** + * Calls {@link ContextPropagator#onError(Throwable)} for each propagator + * + * @param t The Throwable that caused the workflow/activity to finish + */ + public static void onErrorContextPropagators(Throwable t) { + for (ContextPropagator propagator : contextPropagators.get()) { + try { + propagator.onError(t); + } catch (Throwable t1) { + // Don't let an error in one propagator block the others + log.error("Error calling onError() on a contextpropagator", t1); + } + } + } + + /** + * Calls {@link ContextPropagator#finish(boolean)} for each propagator + * + * @param successful True if the workflow/activity completed without unhandled exception, false + * otherwise + */ + public static void finishContextPropagators(boolean successful) { + for (ContextPropagator propagator : contextPropagators.get()) { + try { + propagator.finish(successful); + } catch (Throwable t) { + // Don't let an error in one propagator block the others + log.error("Error calling finish() on a contextpropagator", t); + } + } + } } diff --git a/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java b/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java index 5f1d8f16f..729255727 100644 --- a/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java +++ b/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; final class WorkflowContext { @@ -166,7 +167,18 @@ Map getPropagatedContexts() { Map contextData = new HashMap<>(); for (ContextPropagator propagator : contextPropagators) { - contextData.put(propagator.getName(), propagator.deserializeContext(headerData)); + // Only send the context propagator the fields that belong to them + // Change the map from MyPropagator:foo -> bar to foo -> bar + Map filteredData = + headerData + .entrySet() + .stream() + .filter(e -> e.getKey().startsWith(propagator.getName())) + .collect( + Collectors.toMap( + e -> e.getKey().substring(propagator.getName().length() + 1), + Map.Entry::getValue)); + contextData.put(propagator.getName(), propagator.deserializeContext(filteredData)); } return contextData; diff --git a/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java b/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java index 23955ce69..0e332ba7a 100644 --- a/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java +++ b/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java @@ -72,6 +72,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -449,7 +450,18 @@ private Map extractContextsAndConvertToBytes( } Map result = new HashMap<>(); for (ContextPropagator propagator : contextPropagators) { - result.putAll(propagator.serializeContext(propagator.getCurrentContext())); + // Get the serialized context from the propagator + Map serializedContext = + propagator.serializeContext(propagator.getCurrentContext()); + // Namespace each entry in case of overlaps, so foo -> bar becomes MyPropagator:foo -> bar + Map namespacedSerializedContext = + serializedContext + .entrySet() + .stream() + .collect( + Collectors.toMap( + e -> propagator.getName() + ":" + e.getKey(), Map.Entry::getValue)); + result.putAll(namespacedSerializedContext); } return result; } diff --git a/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java b/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java index a967a5e97..8a85a857e 100644 --- a/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java +++ b/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java @@ -58,6 +58,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; class WorkflowStubImpl implements WorkflowStub { @@ -206,7 +207,18 @@ private Map extractContextsAndConvertToBytes( } Map result = new HashMap<>(); for (ContextPropagator propagator : contextPropagators) { - result.putAll(propagator.serializeContext(propagator.getCurrentContext())); + // Get the serialized context from the propagator + Map serializedContext = + propagator.serializeContext(propagator.getCurrentContext()); + // Namespace each entry in case of overlaps, so foo -> bar becomes MyPropagator:foo -> bar + Map namespacedSerializedContext = + serializedContext + .entrySet() + .stream() + .collect( + Collectors.toMap( + k -> propagator.getName() + ":" + k.getKey(), Map.Entry::getValue)); + result.putAll(namespacedSerializedContext); } return result; } diff --git a/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java b/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java index 9b7f5891f..295610dcf 100644 --- a/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java +++ b/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java @@ -91,6 +91,7 @@ public void run() { // Repopulate the context(s) ContextThreadLocal.setContextPropagators(this.contextPropagators); ContextThreadLocal.propagateContextToCurrentThread(this.propagatedContexts); + ContextThreadLocal.setUpContextPropagators(); try { // initialYield blocks thread until the first runUntilBlocked is called. @@ -99,6 +100,7 @@ public void run() { cancellationScope.run(); } catch (DestroyWorkflowThreadError e) { if (!threadContext.isDestroyRequested()) { + ContextThreadLocal.onErrorContextPropagators(e); threadContext.setUnhandledException(e); } } catch (Error e) { @@ -111,9 +113,11 @@ public void run() { log.error( String.format("Workflow thread \"%s\" run failed with Error:\n%s", name, stackTrace)); } + ContextThreadLocal.onErrorContextPropagators(e); threadContext.setUnhandledException(e); } catch (CancellationException e) { if (!isCancelRequested()) { + ContextThreadLocal.onErrorContextPropagators(e); threadContext.setUnhandledException(e); } if (log.isDebugEnabled()) { @@ -130,8 +134,10 @@ public void run() { "Workflow thread \"%s\" run failed with unhandled exception:\n%s", name, stackTrace)); } + ContextThreadLocal.onErrorContextPropagators(e); threadContext.setUnhandledException(e); } finally { + ContextThreadLocal.finishContextPropagators(threadContext.getUnhandledException() == null); DeterministicRunnerImpl.setCurrentThreadInternal(null); threadContext.setStatus(Status.DONE); thread.setName(originalName); diff --git a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java index a0a86014a..6941f1cc4 100644 --- a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java @@ -24,6 +24,7 @@ import com.uber.cadence.RespondActivityTaskFailedRequest; import com.uber.cadence.WorkflowExecution; import com.uber.cadence.context.ContextPropagator; +import com.uber.cadence.context.OpenTracingContextPropagator; import com.uber.cadence.internal.common.RpcRetryer; import com.uber.cadence.internal.logging.LoggerTag; import com.uber.cadence.internal.metrics.MetricsTag; @@ -34,12 +35,18 @@ import com.uber.m3.tally.Stopwatch; import com.uber.m3.util.Duration; import com.uber.m3.util.ImmutableMap; +import io.opentracing.Span; +import io.opentracing.Tracer; +import io.opentracing.log.Fields; +import io.opentracing.tag.Tags; +import io.opentracing.util.GlobalTracer; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.thrift.TException; import org.slf4j.MDC; @@ -126,7 +133,18 @@ public void handle(PollForActivityTaskResponse task) throws Exception { propagateContext(task); - try { + // Set up an opentracing span + Tracer openTracingTracer = GlobalTracer.get(); + Tracer.SpanBuilder builder = + openTracingTracer + .buildSpan("cadence.activity") + .withTag("resource.name", task.getActivityType().getName()); + if (OpenTracingContextPropagator.getCurrentOpenTracingSpanContext() != null) { + builder.asChildOf(OpenTracingContextPropagator.getCurrentOpenTracingSpanContext()); + } + Span span = builder.start(); + + try (io.opentracing.Scope scope = openTracingTracer.activateSpan(span)) { Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_EXEC_LATENCY).start(); ActivityTaskHandler.Result response = handler.handle(task, metricsScope, false); sw.stop(); @@ -148,11 +166,22 @@ public void handle(PollForActivityTaskResponse task) throws Exception { Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_RESP_LATENCY).start(); sendReply(task, new Result(null, null, cancelledRequest), metricsScope); sw.stop(); + } catch (Exception e) { + Tags.ERROR.set(span, true); + Map errorData = new HashMap<>(); + errorData.put(Fields.EVENT, "error"); + if (e != null) { + errorData.put(Fields.ERROR_OBJECT, e); + errorData.put(Fields.MESSAGE, e.getMessage()); + } + span.log(errorData); + throw e; } finally { MDC.remove(LoggerTag.ACTIVITY_ID); MDC.remove(LoggerTag.ACTIVITY_TYPE); MDC.remove(LoggerTag.WORKFLOW_ID); MDC.remove(LoggerTag.RUN_ID); + span.finish(); } } @@ -175,7 +204,18 @@ void propagateContext(PollForActivityTaskResponse response) { }); for (ContextPropagator propagator : options.getContextPropagators()) { - propagator.setCurrentContext(propagator.deserializeContext(headerData)); + // Only send the context propagator the fields that belong to them + // Change the map from MyPropagator:foo -> bar to foo -> bar + Map filteredData = + headerData + .entrySet() + .stream() + .filter(e -> e.getKey().startsWith(propagator.getName())) + .collect( + Collectors.toMap( + e -> e.getKey().substring(propagator.getName().length() + 1), + Map.Entry::getValue)); + propagator.setCurrentContext(propagator.deserializeContext(filteredData)); } } diff --git a/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java index c08870adf..08e765aad 100644 --- a/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java @@ -22,6 +22,7 @@ import com.uber.cadence.MarkerRecordedEventAttributes; import com.uber.cadence.PollForActivityTaskResponse; import com.uber.cadence.common.RetryOptions; +import com.uber.cadence.context.ContextPropagator; import com.uber.cadence.internal.common.LocalActivityMarkerData; import com.uber.cadence.internal.metrics.MetricsTag; import com.uber.cadence.internal.metrics.MetricsType; @@ -37,6 +38,7 @@ import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.LongSupplier; +import java.util.stream.Collectors; public final class LocalActivityWorker extends SuspendableWorkerBase { @@ -225,9 +227,19 @@ private void propagateContext(ExecuteLocalActivityParameters params) { } private void restoreContext(Map context) { - options - .getContextPropagators() - .forEach( - propagator -> propagator.setCurrentContext(propagator.deserializeContext(context))); + for (ContextPropagator propagator : options.getContextPropagators()) { + // Only send the context propagator the fields that belong to them + // Change the map from MyPropagator:foo -> bar to foo -> bar + Map filteredData = + context + .entrySet() + .stream() + .filter(e -> e.getKey().startsWith(propagator.getName())) + .collect( + Collectors.toMap( + e -> e.getKey().substring(propagator.getName().length() + 1), + Map.Entry::getValue)); + propagator.setCurrentContext(propagator.deserializeContext(filteredData)); + } } } diff --git a/src/test/java/com/uber/cadence/context/ContextTests.java b/src/test/java/com/uber/cadence/context/ContextTests.java new file mode 100644 index 000000000..320ddcb05 --- /dev/null +++ b/src/test/java/com/uber/cadence/context/ContextTests.java @@ -0,0 +1,442 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.context; + +import static org.junit.Assert.*; + +import com.uber.cadence.activity.ActivityMethod; +import com.uber.cadence.activity.ActivityOptions; +import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.client.WorkflowClientOptions; +import com.uber.cadence.client.WorkflowException; +import com.uber.cadence.client.WorkflowOptions; +import com.uber.cadence.internal.testing.WorkflowTestingTest.ChildWorkflow; +import com.uber.cadence.testing.TestEnvironmentOptions; +import com.uber.cadence.testing.TestWorkflowEnvironment; +import com.uber.cadence.worker.Worker; +import com.uber.cadence.workflow.Async; +import com.uber.cadence.workflow.ChildWorkflowOptions; +import com.uber.cadence.workflow.Promise; +import com.uber.cadence.workflow.SignalMethod; +import com.uber.cadence.workflow.Workflow; +import com.uber.cadence.workflow.WorkflowMethod; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.Tracer; +import io.opentracing.mock.MockSpan; +import io.opentracing.mock.MockTracer; +import io.opentracing.util.GlobalTracer; +import io.opentracing.util.ThreadLocalScopeManager; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.rules.Timeout; +import org.junit.runner.Description; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +public class ContextTests { + private static final Logger log = LoggerFactory.getLogger(ContextTests.class); + + @Rule public Timeout globalTimeout = Timeout.seconds(5000); + + @Rule + public TestWatcher watchman = + new TestWatcher() { + @Override + protected void failed(Throwable e, Description description) { + System.err.println(testEnvironment.getDiagnostics()); + } + }; + + private static final String TASK_LIST = "test-workflow"; + + private TestWorkflowEnvironment testEnvironment; + private MockTracer mockTracer = + new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP); + + @Before + public void setUp() { + TestEnvironmentOptions options = + new TestEnvironmentOptions.Builder() + .setWorkflowClientOptions( + WorkflowClientOptions.newBuilder() + .setContextPropagators( + Arrays.asList( + new TestContextPropagator(), new OpenTracingContextPropagator())) + .build()) + .build(); + + testEnvironment = TestWorkflowEnvironment.newInstance(options); + GlobalTracer.registerIfAbsent(mockTracer); + } + + @After + public void tearDown() { + testEnvironment.close(); + mockTracer.reset(); + } + + public interface TestWorkflow { + + @WorkflowMethod(executionStartToCloseTimeoutSeconds = 3600 * 24, taskList = TASK_LIST) + String workflow1(String input); + } + + public interface ParentWorkflow { + + @WorkflowMethod(executionStartToCloseTimeoutSeconds = 3600 * 24, taskList = TASK_LIST) + String workflow(String input); + + @SignalMethod + void signal(String value); + } + + public interface TestActivity { + + @ActivityMethod(scheduleToCloseTimeoutSeconds = 3600) + String activity1(String input); + } + + public static class TestContextPropagator implements ContextPropagator { + + @Override + public String getName() { + return "TestContextPropagator::withSomeColons"; + } + + @Override + public Map serializeContext(Object context) { + String testKey = (String) context; + if (testKey != null) { + return Collections.singletonMap("test", testKey.getBytes(StandardCharsets.UTF_8)); + } else { + return Collections.emptyMap(); + } + } + + @Override + public Object deserializeContext(Map context) { + if (context.containsKey("test")) { + return new String(context.get("test"), StandardCharsets.UTF_8); + } else { + return null; + } + } + + @Override + public Object getCurrentContext() { + return MDC.get("test"); + } + + @Override + public void setCurrentContext(Object context) { + MDC.put("test", String.valueOf(context)); + } + } + + public static class ContextPropagationWorkflowImpl implements TestWorkflow { + + @Override + public String workflow1(String input) { + // The test value should be in the MDC + return MDC.get("test"); + } + } + + @Test() + public void testWorkflowContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(ContextPropagationWorkflowImpl.class); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + String result = workflow.workflow1("input1"); + assertEquals("testing123", result); + } + + public static class ContextPropagationParentWorkflowImpl implements ParentWorkflow { + + @Override + public String workflow(String input) { + // Get the MDC value + String mdcValue = MDC.get("test"); + + // Fire up a child workflow + ChildWorkflowOptions options = + new ChildWorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options); + + String result = child.workflow(mdcValue, Workflow.getWorkflowInfo().getWorkflowId()); + return result; + } + + @Override + public void signal(String value) {} + } + + public static class ContextPropagationChildWorkflowImpl implements ChildWorkflow { + + @Override + public String workflow(String input, String parentId) { + String mdcValue = MDC.get("test"); + return input + mdcValue; + } + } + + @Test + public void testChildWorkflowContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes( + ContextPropagationParentWorkflowImpl.class, ContextPropagationChildWorkflowImpl.class); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + ParentWorkflow workflow = client.newWorkflowStub(ParentWorkflow.class, options); + String result = workflow.workflow("input1"); + assertEquals("testing123testing123", result); + } + + public static class ContextPropagationThreadWorkflowImpl implements TestWorkflow { + + @Override + public String workflow1(String input) { + Promise asyncPromise = Async.function(this::async); + return asyncPromise.get(); + } + + private String async() { + return "async" + MDC.get("test"); + } + } + + @Test + public void testThreadContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(ContextPropagationThreadWorkflowImpl.class); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + String result = workflow.workflow1("input1"); + assertEquals("asynctesting123", result); + } + + public static class ContextActivityImpl implements TestActivity { + @Override + public String activity1(String input) { + return "activity" + MDC.get("test"); + } + } + + public static class ContextPropagationActivityWorkflowImpl implements TestWorkflow { + @Override + public String workflow1(String input) { + ActivityOptions options = + new ActivityOptions.Builder() + .setScheduleToCloseTimeout(Duration.ofSeconds(5)) + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + TestActivity activity = Workflow.newActivityStub(TestActivity.class, options); + return activity.activity1("foo"); + } + } + + @Test + public void testActivityContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(ContextPropagationActivityWorkflowImpl.class); + worker.registerActivitiesImplementations(new ContextActivityImpl()); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + String result = workflow.workflow1("input1"); + assertEquals("activitytesting123", result); + } + + public static class DefaultContextPropagationActivityWorkflowImpl implements TestWorkflow { + @Override + public String workflow1(String input) { + ActivityOptions options = + new ActivityOptions.Builder().setScheduleToCloseTimeout(Duration.ofSeconds(5)).build(); + TestActivity activity = Workflow.newActivityStub(TestActivity.class, options); + return activity.activity1("foo"); + } + } + + @Test + public void testDefaultActivityContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(DefaultContextPropagationActivityWorkflowImpl.class); + worker.registerActivitiesImplementations(new ContextActivityImpl()); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + String result = workflow.workflow1("input1"); + assertEquals("activitytesting123", result); + } + + public static class DefaultContextPropagationParentWorkflowImpl implements ParentWorkflow { + + @Override + public String workflow(String input) { + // Get the MDC value + String mdcValue = MDC.get("test"); + + // Fire up a child workflow + ChildWorkflowOptions options = new ChildWorkflowOptions.Builder().build(); + ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options); + + String result = child.workflow(mdcValue, Workflow.getWorkflowInfo().getWorkflowId()); + return result; + } + + @Override + public void signal(String value) {} + } + + @Test + public void testDefaultChildWorkflowContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes( + DefaultContextPropagationParentWorkflowImpl.class, + ContextPropagationChildWorkflowImpl.class); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + ParentWorkflow workflow = client.newWorkflowStub(ParentWorkflow.class, options); + String result = workflow.workflow("input1"); + assertEquals("testing123testing123", result); + } + + public static class OpenTracingContextPropagationWorkflowImpl implements TestWorkflow { + @Override + public String workflow1(String input) { + Tracer tracer = GlobalTracer.get(); + Span activeSpan = tracer.scopeManager().activeSpan(); + MockSpan mockSpan = (MockSpan) activeSpan; + assertNotNull(activeSpan); + assertEquals("TestWorkflow::workflow1", mockSpan.tags().get("resource.name")); + assertNotEquals(0, mockSpan.parentId()); + if ("fail".equals(input)) { + throw new IllegalArgumentException(); + } else { + return activeSpan.getBaggageItem("foo"); + } + } + } + + @Test + public void testOpenTracingContextPropagation() { + Tracer tracer = GlobalTracer.get(); + Span span = tracer.buildSpan("testContextPropagationSuccess").start(); + + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(OpenTracingContextPropagationWorkflowImpl.class); + testEnvironment.start(); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators( + Arrays.asList(new TestContextPropagator(), new OpenTracingContextPropagator())) + .build(); + + try (Scope scope = tracer.scopeManager().activate(span)) { + + Span activeSpan = tracer.scopeManager().activeSpan(); + activeSpan.setBaggageItem("foo", "bar"); + + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + assertEquals("bar", workflow.workflow1("input1")); + + } finally { + span.finish(); + } + } + + @Test + public void testOpenTracingContextPropagationWithFailure() { + Tracer tracer = GlobalTracer.get(); + Span span = tracer.buildSpan("testContextPropagationFailure").start(); + + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(OpenTracingContextPropagationWorkflowImpl.class); + testEnvironment.start(); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators( + Arrays.asList(new TestContextPropagator(), new OpenTracingContextPropagator())) + .build(); + + try (Scope scope = tracer.scopeManager().activate(span)) { + + Span activeSpan = tracer.scopeManager().activeSpan(); + activeSpan.setBaggageItem("foo", "bar"); + + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + try { + workflow.workflow1("fail"); + fail("Unreachable"); + } catch (WorkflowException e) { + // Expected + assertEquals(IllegalArgumentException.class, e.getCause().getClass()); + } + + } finally { + span.finish(); + } + } +} diff --git a/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java b/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java index e22de1bdd..41589711d 100644 --- a/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java +++ b/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java @@ -17,9 +17,7 @@ package com.uber.cadence.internal.testing; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -38,8 +36,12 @@ import com.uber.cadence.activity.Activity; import com.uber.cadence.activity.ActivityMethod; import com.uber.cadence.activity.ActivityOptions; -import com.uber.cadence.client.*; -import com.uber.cadence.context.ContextPropagator; +import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.client.WorkflowClientOptions; +import com.uber.cadence.client.WorkflowException; +import com.uber.cadence.client.WorkflowOptions; +import com.uber.cadence.client.WorkflowStub; +import com.uber.cadence.client.WorkflowTimedOutException; import com.uber.cadence.internal.common.WorkflowExecutionUtils; import com.uber.cadence.testing.SimulatedTimeoutException; import com.uber.cadence.testing.TestEnvironmentOptions; @@ -47,17 +49,13 @@ import com.uber.cadence.worker.Worker; import com.uber.cadence.workflow.ActivityTimeoutException; import com.uber.cadence.workflow.Async; -import com.uber.cadence.workflow.ChildWorkflowOptions; import com.uber.cadence.workflow.ChildWorkflowTimedOutException; import com.uber.cadence.workflow.Promise; import com.uber.cadence.workflow.SignalMethod; import com.uber.cadence.workflow.Workflow; import com.uber.cadence.workflow.WorkflowMethod; -import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CancellationException; @@ -73,7 +71,6 @@ import org.junit.runner.Description; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.slf4j.MDC; public class WorkflowTestingTest { private static final Logger log = LoggerFactory.getLogger(WorkflowTestingTest.class); @@ -97,10 +94,7 @@ protected void failed(Throwable e, Description description) { public void setUp() { TestEnvironmentOptions options = new TestEnvironmentOptions.Builder() - .setWorkflowClientOptions( - WorkflowClientOptions.newBuilder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build()) + .setWorkflowClientOptions(WorkflowClientOptions.newBuilder().build()) .build(); testEnvironment = TestWorkflowEnvironment.newInstance(options); } @@ -739,244 +733,4 @@ public void testMockedChildSimulatedTimeout() { assertTrue(e.getCause() instanceof ChildWorkflowTimedOutException); } } - - public static class TestContextPropagator implements ContextPropagator { - - @Override - public String getName() { - return this.getClass().getName(); - } - - @Override - public Map serializeContext(Object context) { - String testKey = (String) context; - if (testKey != null) { - return Collections.singletonMap("test", testKey.getBytes(StandardCharsets.UTF_8)); - } else { - return Collections.emptyMap(); - } - } - - @Override - public Object deserializeContext(Map context) { - if (context.containsKey("test")) { - return new String(context.get("test"), StandardCharsets.UTF_8); - } else { - return null; - } - } - - @Override - public Object getCurrentContext() { - return MDC.get("test"); - } - - @Override - public void setCurrentContext(Object context) { - MDC.put("test", String.valueOf(context)); - } - } - - public static class ContextPropagationWorkflowImpl implements TestWorkflow { - - @Override - public String workflow1(String input) { - // The test value should be in the MDC - return MDC.get("test"); - } - } - - @Test - public void testWorkflowContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes(ContextPropagationWorkflowImpl.class); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - String result = workflow.workflow1("input1"); - assertEquals("testing123", result); - } - - public static class ContextPropagationParentWorkflowImpl implements ParentWorkflow { - - @Override - public String workflow(String input) { - // Get the MDC value - String mdcValue = MDC.get("test"); - - // Fire up a child workflow - ChildWorkflowOptions options = - new ChildWorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options); - - String result = child.workflow(mdcValue, Workflow.getWorkflowInfo().getWorkflowId()); - return result; - } - - @Override - public void signal(String value) {} - } - - public static class ContextPropagationChildWorkflowImpl implements ChildWorkflow { - - @Override - public String workflow(String input, String parentId) { - String mdcValue = MDC.get("test"); - return input + mdcValue; - } - } - - @Test - public void testChildWorkflowContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes( - ContextPropagationParentWorkflowImpl.class, ContextPropagationChildWorkflowImpl.class); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - ParentWorkflow workflow = client.newWorkflowStub(ParentWorkflow.class, options); - String result = workflow.workflow("input1"); - assertEquals("testing123testing123", result); - } - - public static class ContextPropagationThreadWorkflowImpl implements TestWorkflow { - - @Override - public String workflow1(String input) { - Promise asyncPromise = Async.function(this::async); - return asyncPromise.get(); - } - - private String async() { - return "async" + MDC.get("test"); - } - } - - @Test - public void testThreadContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes(ContextPropagationThreadWorkflowImpl.class); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - String result = workflow.workflow1("input1"); - assertEquals("asynctesting123", result); - } - - public static class ContextActivityImpl implements TestActivity { - @Override - public String activity1(String input) { - return "activity" + MDC.get("test"); - } - } - - public static class ContextPropagationActivityWorkflowImpl implements TestWorkflow { - @Override - public String workflow1(String input) { - ActivityOptions options = - new ActivityOptions.Builder() - .setScheduleToCloseTimeout(Duration.ofSeconds(5)) - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - TestActivity activity = Workflow.newActivityStub(TestActivity.class, options); - return activity.activity1("foo"); - } - } - - @Test - public void testActivityContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes(ContextPropagationActivityWorkflowImpl.class); - worker.registerActivitiesImplementations(new ContextActivityImpl()); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - String result = workflow.workflow1("input1"); - assertEquals("activitytesting123", result); - } - - public static class DefaultContextPropagationActivityWorkflowImpl implements TestWorkflow { - @Override - public String workflow1(String input) { - ActivityOptions options = - new ActivityOptions.Builder().setScheduleToCloseTimeout(Duration.ofSeconds(5)).build(); - TestActivity activity = Workflow.newActivityStub(TestActivity.class, options); - return activity.activity1("foo"); - } - } - - @Test - public void testDefaultActivityContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes(DefaultContextPropagationActivityWorkflowImpl.class); - worker.registerActivitiesImplementations(new ContextActivityImpl()); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - String result = workflow.workflow1("input1"); - assertEquals("activitytesting123", result); - } - - public static class DefaultContextPropagationParentWorkflowImpl implements ParentWorkflow { - - @Override - public String workflow(String input) { - // Get the MDC value - String mdcValue = MDC.get("test"); - - // Fire up a child workflow - ChildWorkflowOptions options = new ChildWorkflowOptions.Builder().build(); - ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options); - - String result = child.workflow(mdcValue, Workflow.getWorkflowInfo().getWorkflowId()); - return result; - } - - @Override - public void signal(String value) {} - } - - @Test - public void testDefaultChildWorkflowContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes( - DefaultContextPropagationParentWorkflowImpl.class, - ContextPropagationChildWorkflowImpl.class); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - ParentWorkflow workflow = client.newWorkflowStub(ParentWorkflow.class, options); - String result = workflow.workflow("input1"); - assertEquals("testing123testing123", result); - } } diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index b87fe0374..27d076198 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -25,6 +25,7 @@ +