diff --git a/build.gradle b/build.gradle
index d41bd07cf..ce3c655b3 100644
--- a/build.gradle
+++ b/build.gradle
@@ -58,10 +58,13 @@ dependencies {
compile group: 'com.google.guava', name: 'guava', version: '28.1-jre'
compile group: 'com.cronutils', name: 'cron-utils', version: '9.0.0'
compile group: 'io.micrometer', name: 'micrometer-core', version: '1.1.2'
+ compile group: 'io.opentracing', name: 'opentracing-api', version: '0.33.0'
+ compile group: 'io.opentracing', name: 'opentracing-util', version: '0.33.0'
testCompile group: 'junit', name: 'junit', version: '4.12'
testCompile group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4'
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'
+ testCompile group: 'io.opentracing', name: 'opentracing-mock', version: '0.33.0'
}
license {
diff --git a/src/main/java/com/uber/cadence/context/ContextPropagator.java b/src/main/java/com/uber/cadence/context/ContextPropagator.java
index 3a618f96c..3bfca5b71 100644
--- a/src/main/java/com/uber/cadence/context/ContextPropagator.java
+++ b/src/main/java/com/uber/cadence/context/ContextPropagator.java
@@ -23,6 +23,9 @@
* Context Propagators are used to propagate information from workflow to activity, workflow to
* child workflow, and workflow to child thread (using {@link com.uber.cadence.workflow.Async}).
*
+ *
It is important to note that all threads share one ContextPropagator instance, so your
+ * implementation must be thread-safe and store any state in ThreadLocal variables.
+ *
*
A sample ContextPropagator
that copies all {@link org.slf4j.MDC} entries starting
* with a given prefix along the code path looks like this:
*
@@ -136,4 +139,31 @@ public interface ContextPropagator {
/** Sets the current context */
void setCurrentContext(Object context);
+
+ /**
+ * This is a lifecycle method, called after the context has been propagated to the
+ * workflow/activity thread but the workflow/activity has not yet started.
+ */
+ default void setUp() {
+ // No-op
+ }
+
+ /**
+ * This is a lifecycle method, called after the workflow/activity has completed. If the method
+ * finished without exception, {@code successful} will be true. Otherwise, it will be false and
+ * {@link #onError(Throwable)} will have already been called.
+ */
+ default void finish(boolean successful) {
+ // No-op
+ }
+
+ /**
+ * This is a lifecycle method, called when the workflow/activity finishes by throwing an unhandled
+ * exception. {@link #finish(boolean)} is called after this method.
+ *
+ * @param t The unhandled exception that caused the workflow/activity to terminate
+ */
+ default void onError(Throwable t) {
+ // No-op
+ }
}
diff --git a/src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java b/src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java
new file mode 100644
index 000000000..e90ac29b9
--- /dev/null
+++ b/src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Modifications copyright (C) 2017 Uber Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"). You may not
+ * use this file except in compliance with the License. A copy of the License is
+ * located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.uber.cadence.context;
+
+import com.uber.cadence.internal.logging.LoggerTag;
+import io.opentracing.Scope;
+import io.opentracing.Span;
+import io.opentracing.SpanContext;
+import io.opentracing.Tracer;
+import io.opentracing.log.Fields;
+import io.opentracing.propagation.Format;
+import io.opentracing.propagation.TextMap;
+import io.opentracing.tag.Tags;
+import io.opentracing.util.GlobalTracer;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+/** Support for OpenTracing spans */
+public class OpenTracingContextPropagator implements ContextPropagator {
+
+ private static final Logger log = LoggerFactory.getLogger(OpenTracingContextPropagator.class);
+
+ private static ThreadLocal currentOpenTracingSpanContext = new ThreadLocal<>();
+ private static ThreadLocal currentOpenTracingSpan = new ThreadLocal<>();
+ private static ThreadLocal currentOpenTracingScope = new ThreadLocal<>();
+
+ public static void setCurrentOpenTracingSpanContext(SpanContext ctx) {
+ if (ctx != null) {
+ currentOpenTracingSpanContext.set(ctx);
+ }
+ }
+
+ public static SpanContext getCurrentOpenTracingSpanContext() {
+ return currentOpenTracingSpanContext.get();
+ }
+
+ @Override
+ public String getName() {
+ return "OpenTracing";
+ }
+
+ @Override
+ public Map serializeContext(Object context) {
+ Map serializedContext = new HashMap<>();
+ Map contextMap = (Map) context;
+ if (contextMap != null) {
+ for (Map.Entry entry : contextMap.entrySet()) {
+ serializedContext.put(entry.getKey(), entry.getValue().getBytes(Charset.defaultCharset()));
+ }
+ }
+ return serializedContext;
+ }
+
+ @Override
+ public Object deserializeContext(Map context) {
+ Map contextMap = new HashMap<>();
+ for (Map.Entry entry : context.entrySet()) {
+ contextMap.put(entry.getKey(), new String(entry.getValue(), Charset.defaultCharset()));
+ }
+ return contextMap;
+ }
+
+ @Override
+ public Object getCurrentContext() {
+ Tracer currentTracer = GlobalTracer.get();
+ Span currentSpan = currentTracer.scopeManager().activeSpan();
+ if (currentSpan != null) {
+ HashMapTextMap contextTextMap = new HashMapTextMap();
+ currentTracer.inject(currentSpan.context(), Format.Builtin.TEXT_MAP, contextTextMap);
+ return contextTextMap.getBackingMap();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public void setCurrentContext(Object context) {
+ Tracer currentTracer = GlobalTracer.get();
+ Map contextAsMap = (Map) context;
+ if (contextAsMap != null) {
+ HashMapTextMap contextTextMap = new HashMapTextMap(contextAsMap);
+ setCurrentOpenTracingSpanContext(
+ currentTracer.extract(Format.Builtin.TEXT_MAP, contextTextMap));
+ }
+ }
+
+ @Override
+ public void setUp() {
+ Tracer openTracingTracer = GlobalTracer.get();
+ Tracer.SpanBuilder builder =
+ openTracingTracer
+ .buildSpan("cadence.workflow")
+ .withTag("resource.name", MDC.get(LoggerTag.WORKFLOW_TYPE));
+
+ if (getCurrentOpenTracingSpanContext() != null) {
+ builder.asChildOf(getCurrentOpenTracingSpanContext());
+ }
+
+ Span span = builder.start();
+ openTracingTracer.activateSpan(span);
+ currentOpenTracingSpan.set(span);
+ Scope scope = openTracingTracer.activateSpan(span);
+ currentOpenTracingScope.set(scope);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ Span span = currentOpenTracingSpan.get();
+ if (span != null) {
+ Tags.ERROR.set(span, true);
+ Map errorData = new HashMap<>();
+ errorData.put(Fields.EVENT, "error");
+ if (t != null) {
+ errorData.put(Fields.ERROR_OBJECT, t);
+ errorData.put(Fields.MESSAGE, t.getMessage());
+ }
+ span.log(errorData);
+ }
+ }
+
+ @Override
+ public void finish(boolean successful) {
+ Scope currentScope = currentOpenTracingScope.get();
+ Span currentSpan = currentOpenTracingSpan.get();
+
+ if (currentScope != null) {
+ currentScope.close();
+ }
+
+ if (currentSpan != null) {
+ currentSpan.finish();
+ }
+
+ currentOpenTracingScope.remove();
+ currentOpenTracingSpan.remove();
+ currentOpenTracingSpanContext.remove();
+ }
+
+ /** Just check for other instances of the same class */
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+
+ if (this == obj) {
+ return true;
+ }
+
+ if (this.getClass().equals(obj.getClass())) {
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return this.getClass().hashCode();
+ }
+
+ private class HashMapTextMap implements TextMap {
+
+ private final HashMap backingMap = new HashMap<>();
+
+ public HashMapTextMap() {
+ // Noop
+ }
+
+ public HashMapTextMap(Map spanData) {
+ backingMap.putAll(spanData);
+ }
+
+ @Override
+ public Iterator> iterator() {
+ return backingMap.entrySet().iterator();
+ }
+
+ @Override
+ public void put(String key, String value) {
+ backingMap.put(key, value);
+ }
+
+ public HashMap getBackingMap() {
+ return backingMap;
+ }
+ }
+}
diff --git a/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java b/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java
index 227124170..8bbf7729e 100644
--- a/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java
+++ b/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java
@@ -24,10 +24,14 @@
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** This class holds the current set of context propagators */
public class ContextThreadLocal {
+ private static final Logger log = LoggerFactory.getLogger(ContextThreadLocal.class);
+
private static WorkflowThreadLocal> contextPropagators =
WorkflowThreadLocal.withInitial(
new Supplier>() {
@@ -57,6 +61,11 @@ public static Map getCurrentContextForPropagation() {
return contextData;
}
+ /**
+ * Injects the context data into the thread for each configured context propagator
+ *
+ * @param contextData The context data received from the server
+ */
public static void propagateContextToCurrentThread(Map contextData) {
if (contextData == null || contextData.isEmpty()) {
return;
@@ -67,4 +76,49 @@ public static void propagateContextToCurrentThread(Map contextDa
}
}
}
+
+ /** Calls {@link ContextPropagator#setUp()} for each propagator */
+ public static void setUpContextPropagators() {
+ for (ContextPropagator propagator : contextPropagators.get()) {
+ try {
+ propagator.setUp();
+ } catch (Throwable t) {
+ // Don't let an error in one propagator block the others
+ log.error("Error calling setUp() on a contextpropagator", t);
+ }
+ }
+ }
+
+ /**
+ * Calls {@link ContextPropagator#onError(Throwable)} for each propagator
+ *
+ * @param t The Throwable that caused the workflow/activity to finish
+ */
+ public static void onErrorContextPropagators(Throwable t) {
+ for (ContextPropagator propagator : contextPropagators.get()) {
+ try {
+ propagator.onError(t);
+ } catch (Throwable t1) {
+ // Don't let an error in one propagator block the others
+ log.error("Error calling onError() on a contextpropagator", t1);
+ }
+ }
+ }
+
+ /**
+ * Calls {@link ContextPropagator#finish(boolean)} for each propagator
+ *
+ * @param successful True if the workflow/activity completed without unhandled exception, false
+ * otherwise
+ */
+ public static void finishContextPropagators(boolean successful) {
+ for (ContextPropagator propagator : contextPropagators.get()) {
+ try {
+ propagator.finish(successful);
+ } catch (Throwable t) {
+ // Don't let an error in one propagator block the others
+ log.error("Error calling finish() on a contextpropagator", t);
+ }
+ }
+ }
}
diff --git a/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java b/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java
index 5f1d8f16f..729255727 100644
--- a/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java
+++ b/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
final class WorkflowContext {
@@ -166,7 +167,18 @@ Map getPropagatedContexts() {
Map contextData = new HashMap<>();
for (ContextPropagator propagator : contextPropagators) {
- contextData.put(propagator.getName(), propagator.deserializeContext(headerData));
+ // Only send the context propagator the fields that belong to them
+ // Change the map from MyPropagator:foo -> bar to foo -> bar
+ Map filteredData =
+ headerData
+ .entrySet()
+ .stream()
+ .filter(e -> e.getKey().startsWith(propagator.getName()))
+ .collect(
+ Collectors.toMap(
+ e -> e.getKey().substring(propagator.getName().length() + 1),
+ Map.Entry::getValue));
+ contextData.put(propagator.getName(), propagator.deserializeContext(filteredData));
}
return contextData;
diff --git a/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java b/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java
index 23955ce69..0e332ba7a 100644
--- a/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java
+++ b/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java
@@ -72,6 +72,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -449,7 +450,18 @@ private Map extractContextsAndConvertToBytes(
}
Map result = new HashMap<>();
for (ContextPropagator propagator : contextPropagators) {
- result.putAll(propagator.serializeContext(propagator.getCurrentContext()));
+ // Get the serialized context from the propagator
+ Map serializedContext =
+ propagator.serializeContext(propagator.getCurrentContext());
+ // Namespace each entry in case of overlaps, so foo -> bar becomes MyPropagator:foo -> bar
+ Map namespacedSerializedContext =
+ serializedContext
+ .entrySet()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ e -> propagator.getName() + ":" + e.getKey(), Map.Entry::getValue));
+ result.putAll(namespacedSerializedContext);
}
return result;
}
diff --git a/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java b/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java
index a967a5e97..8a85a857e 100644
--- a/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java
+++ b/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java
@@ -58,6 +58,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
class WorkflowStubImpl implements WorkflowStub {
@@ -206,7 +207,18 @@ private Map extractContextsAndConvertToBytes(
}
Map result = new HashMap<>();
for (ContextPropagator propagator : contextPropagators) {
- result.putAll(propagator.serializeContext(propagator.getCurrentContext()));
+ // Get the serialized context from the propagator
+ Map serializedContext =
+ propagator.serializeContext(propagator.getCurrentContext());
+ // Namespace each entry in case of overlaps, so foo -> bar becomes MyPropagator:foo -> bar
+ Map namespacedSerializedContext =
+ serializedContext
+ .entrySet()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ k -> propagator.getName() + ":" + k.getKey(), Map.Entry::getValue));
+ result.putAll(namespacedSerializedContext);
}
return result;
}
diff --git a/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java b/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java
index 9b7f5891f..295610dcf 100644
--- a/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java
+++ b/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java
@@ -91,6 +91,7 @@ public void run() {
// Repopulate the context(s)
ContextThreadLocal.setContextPropagators(this.contextPropagators);
ContextThreadLocal.propagateContextToCurrentThread(this.propagatedContexts);
+ ContextThreadLocal.setUpContextPropagators();
try {
// initialYield blocks thread until the first runUntilBlocked is called.
@@ -99,6 +100,7 @@ public void run() {
cancellationScope.run();
} catch (DestroyWorkflowThreadError e) {
if (!threadContext.isDestroyRequested()) {
+ ContextThreadLocal.onErrorContextPropagators(e);
threadContext.setUnhandledException(e);
}
} catch (Error e) {
@@ -111,9 +113,11 @@ public void run() {
log.error(
String.format("Workflow thread \"%s\" run failed with Error:\n%s", name, stackTrace));
}
+ ContextThreadLocal.onErrorContextPropagators(e);
threadContext.setUnhandledException(e);
} catch (CancellationException e) {
if (!isCancelRequested()) {
+ ContextThreadLocal.onErrorContextPropagators(e);
threadContext.setUnhandledException(e);
}
if (log.isDebugEnabled()) {
@@ -130,8 +134,10 @@ public void run() {
"Workflow thread \"%s\" run failed with unhandled exception:\n%s",
name, stackTrace));
}
+ ContextThreadLocal.onErrorContextPropagators(e);
threadContext.setUnhandledException(e);
} finally {
+ ContextThreadLocal.finishContextPropagators(threadContext.getUnhandledException() == null);
DeterministicRunnerImpl.setCurrentThreadInternal(null);
threadContext.setStatus(Status.DONE);
thread.setName(originalName);
diff --git a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java
index a0a86014a..6941f1cc4 100644
--- a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java
+++ b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java
@@ -24,6 +24,7 @@
import com.uber.cadence.RespondActivityTaskFailedRequest;
import com.uber.cadence.WorkflowExecution;
import com.uber.cadence.context.ContextPropagator;
+import com.uber.cadence.context.OpenTracingContextPropagator;
import com.uber.cadence.internal.common.RpcRetryer;
import com.uber.cadence.internal.logging.LoggerTag;
import com.uber.cadence.internal.metrics.MetricsTag;
@@ -34,12 +35,18 @@
import com.uber.m3.tally.Stopwatch;
import com.uber.m3.util.Duration;
import com.uber.m3.util.ImmutableMap;
+import io.opentracing.Span;
+import io.opentracing.Tracer;
+import io.opentracing.log.Fields;
+import io.opentracing.tag.Tags;
+import io.opentracing.util.GlobalTracer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.thrift.TException;
import org.slf4j.MDC;
@@ -126,7 +133,18 @@ public void handle(PollForActivityTaskResponse task) throws Exception {
propagateContext(task);
- try {
+ // Set up an opentracing span
+ Tracer openTracingTracer = GlobalTracer.get();
+ Tracer.SpanBuilder builder =
+ openTracingTracer
+ .buildSpan("cadence.activity")
+ .withTag("resource.name", task.getActivityType().getName());
+ if (OpenTracingContextPropagator.getCurrentOpenTracingSpanContext() != null) {
+ builder.asChildOf(OpenTracingContextPropagator.getCurrentOpenTracingSpanContext());
+ }
+ Span span = builder.start();
+
+ try (io.opentracing.Scope scope = openTracingTracer.activateSpan(span)) {
Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_EXEC_LATENCY).start();
ActivityTaskHandler.Result response = handler.handle(task, metricsScope, false);
sw.stop();
@@ -148,11 +166,22 @@ public void handle(PollForActivityTaskResponse task) throws Exception {
Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_RESP_LATENCY).start();
sendReply(task, new Result(null, null, cancelledRequest), metricsScope);
sw.stop();
+ } catch (Exception e) {
+ Tags.ERROR.set(span, true);
+ Map errorData = new HashMap<>();
+ errorData.put(Fields.EVENT, "error");
+ if (e != null) {
+ errorData.put(Fields.ERROR_OBJECT, e);
+ errorData.put(Fields.MESSAGE, e.getMessage());
+ }
+ span.log(errorData);
+ throw e;
} finally {
MDC.remove(LoggerTag.ACTIVITY_ID);
MDC.remove(LoggerTag.ACTIVITY_TYPE);
MDC.remove(LoggerTag.WORKFLOW_ID);
MDC.remove(LoggerTag.RUN_ID);
+ span.finish();
}
}
@@ -175,7 +204,18 @@ void propagateContext(PollForActivityTaskResponse response) {
});
for (ContextPropagator propagator : options.getContextPropagators()) {
- propagator.setCurrentContext(propagator.deserializeContext(headerData));
+ // Only send the context propagator the fields that belong to them
+ // Change the map from MyPropagator:foo -> bar to foo -> bar
+ Map filteredData =
+ headerData
+ .entrySet()
+ .stream()
+ .filter(e -> e.getKey().startsWith(propagator.getName()))
+ .collect(
+ Collectors.toMap(
+ e -> e.getKey().substring(propagator.getName().length() + 1),
+ Map.Entry::getValue));
+ propagator.setCurrentContext(propagator.deserializeContext(filteredData));
}
}
diff --git a/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java
index c08870adf..08e765aad 100644
--- a/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java
+++ b/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java
@@ -22,6 +22,7 @@
import com.uber.cadence.MarkerRecordedEventAttributes;
import com.uber.cadence.PollForActivityTaskResponse;
import com.uber.cadence.common.RetryOptions;
+import com.uber.cadence.context.ContextPropagator;
import com.uber.cadence.internal.common.LocalActivityMarkerData;
import com.uber.cadence.internal.metrics.MetricsTag;
import com.uber.cadence.internal.metrics.MetricsType;
@@ -37,6 +38,7 @@
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
public final class LocalActivityWorker extends SuspendableWorkerBase {
@@ -225,9 +227,19 @@ private void propagateContext(ExecuteLocalActivityParameters params) {
}
private void restoreContext(Map context) {
- options
- .getContextPropagators()
- .forEach(
- propagator -> propagator.setCurrentContext(propagator.deserializeContext(context)));
+ for (ContextPropagator propagator : options.getContextPropagators()) {
+ // Only send the context propagator the fields that belong to them
+ // Change the map from MyPropagator:foo -> bar to foo -> bar
+ Map filteredData =
+ context
+ .entrySet()
+ .stream()
+ .filter(e -> e.getKey().startsWith(propagator.getName()))
+ .collect(
+ Collectors.toMap(
+ e -> e.getKey().substring(propagator.getName().length() + 1),
+ Map.Entry::getValue));
+ propagator.setCurrentContext(propagator.deserializeContext(filteredData));
+ }
}
}
diff --git a/src/test/java/com/uber/cadence/context/ContextTests.java b/src/test/java/com/uber/cadence/context/ContextTests.java
new file mode 100644
index 000000000..320ddcb05
--- /dev/null
+++ b/src/test/java/com/uber/cadence/context/ContextTests.java
@@ -0,0 +1,442 @@
+/*
+ * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Modifications copyright (C) 2017 Uber Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"). You may not
+ * use this file except in compliance with the License. A copy of the License is
+ * located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.uber.cadence.context;
+
+import static org.junit.Assert.*;
+
+import com.uber.cadence.activity.ActivityMethod;
+import com.uber.cadence.activity.ActivityOptions;
+import com.uber.cadence.client.WorkflowClient;
+import com.uber.cadence.client.WorkflowClientOptions;
+import com.uber.cadence.client.WorkflowException;
+import com.uber.cadence.client.WorkflowOptions;
+import com.uber.cadence.internal.testing.WorkflowTestingTest.ChildWorkflow;
+import com.uber.cadence.testing.TestEnvironmentOptions;
+import com.uber.cadence.testing.TestWorkflowEnvironment;
+import com.uber.cadence.worker.Worker;
+import com.uber.cadence.workflow.Async;
+import com.uber.cadence.workflow.ChildWorkflowOptions;
+import com.uber.cadence.workflow.Promise;
+import com.uber.cadence.workflow.SignalMethod;
+import com.uber.cadence.workflow.Workflow;
+import com.uber.cadence.workflow.WorkflowMethod;
+import io.opentracing.Scope;
+import io.opentracing.Span;
+import io.opentracing.Tracer;
+import io.opentracing.mock.MockSpan;
+import io.opentracing.mock.MockTracer;
+import io.opentracing.util.GlobalTracer;
+import io.opentracing.util.ThreadLocalScopeManager;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.rules.Timeout;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+public class ContextTests {
+ private static final Logger log = LoggerFactory.getLogger(ContextTests.class);
+
+ @Rule public Timeout globalTimeout = Timeout.seconds(5000);
+
+ @Rule
+ public TestWatcher watchman =
+ new TestWatcher() {
+ @Override
+ protected void failed(Throwable e, Description description) {
+ System.err.println(testEnvironment.getDiagnostics());
+ }
+ };
+
+ private static final String TASK_LIST = "test-workflow";
+
+ private TestWorkflowEnvironment testEnvironment;
+ private MockTracer mockTracer =
+ new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP);
+
+ @Before
+ public void setUp() {
+ TestEnvironmentOptions options =
+ new TestEnvironmentOptions.Builder()
+ .setWorkflowClientOptions(
+ WorkflowClientOptions.newBuilder()
+ .setContextPropagators(
+ Arrays.asList(
+ new TestContextPropagator(), new OpenTracingContextPropagator()))
+ .build())
+ .build();
+
+ testEnvironment = TestWorkflowEnvironment.newInstance(options);
+ GlobalTracer.registerIfAbsent(mockTracer);
+ }
+
+ @After
+ public void tearDown() {
+ testEnvironment.close();
+ mockTracer.reset();
+ }
+
+ public interface TestWorkflow {
+
+ @WorkflowMethod(executionStartToCloseTimeoutSeconds = 3600 * 24, taskList = TASK_LIST)
+ String workflow1(String input);
+ }
+
+ public interface ParentWorkflow {
+
+ @WorkflowMethod(executionStartToCloseTimeoutSeconds = 3600 * 24, taskList = TASK_LIST)
+ String workflow(String input);
+
+ @SignalMethod
+ void signal(String value);
+ }
+
+ public interface TestActivity {
+
+ @ActivityMethod(scheduleToCloseTimeoutSeconds = 3600)
+ String activity1(String input);
+ }
+
+ public static class TestContextPropagator implements ContextPropagator {
+
+ @Override
+ public String getName() {
+ return "TestContextPropagator::withSomeColons";
+ }
+
+ @Override
+ public Map serializeContext(Object context) {
+ String testKey = (String) context;
+ if (testKey != null) {
+ return Collections.singletonMap("test", testKey.getBytes(StandardCharsets.UTF_8));
+ } else {
+ return Collections.emptyMap();
+ }
+ }
+
+ @Override
+ public Object deserializeContext(Map context) {
+ if (context.containsKey("test")) {
+ return new String(context.get("test"), StandardCharsets.UTF_8);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public Object getCurrentContext() {
+ return MDC.get("test");
+ }
+
+ @Override
+ public void setCurrentContext(Object context) {
+ MDC.put("test", String.valueOf(context));
+ }
+ }
+
+ public static class ContextPropagationWorkflowImpl implements TestWorkflow {
+
+ @Override
+ public String workflow1(String input) {
+ // The test value should be in the MDC
+ return MDC.get("test");
+ }
+ }
+
+ @Test()
+ public void testWorkflowContextPropagation() {
+ Worker worker = testEnvironment.newWorker(TASK_LIST);
+ worker.registerWorkflowImplementationTypes(ContextPropagationWorkflowImpl.class);
+ testEnvironment.start();
+ MDC.put("test", "testing123");
+ WorkflowClient client = testEnvironment.newWorkflowClient();
+ WorkflowOptions options =
+ new WorkflowOptions.Builder()
+ .setContextPropagators(Collections.singletonList(new TestContextPropagator()))
+ .build();
+ TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options);
+ String result = workflow.workflow1("input1");
+ assertEquals("testing123", result);
+ }
+
+ public static class ContextPropagationParentWorkflowImpl implements ParentWorkflow {
+
+ @Override
+ public String workflow(String input) {
+ // Get the MDC value
+ String mdcValue = MDC.get("test");
+
+ // Fire up a child workflow
+ ChildWorkflowOptions options =
+ new ChildWorkflowOptions.Builder()
+ .setContextPropagators(Collections.singletonList(new TestContextPropagator()))
+ .build();
+ ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options);
+
+ String result = child.workflow(mdcValue, Workflow.getWorkflowInfo().getWorkflowId());
+ return result;
+ }
+
+ @Override
+ public void signal(String value) {}
+ }
+
+ public static class ContextPropagationChildWorkflowImpl implements ChildWorkflow {
+
+ @Override
+ public String workflow(String input, String parentId) {
+ String mdcValue = MDC.get("test");
+ return input + mdcValue;
+ }
+ }
+
+ @Test
+ public void testChildWorkflowContextPropagation() {
+ Worker worker = testEnvironment.newWorker(TASK_LIST);
+ worker.registerWorkflowImplementationTypes(
+ ContextPropagationParentWorkflowImpl.class, ContextPropagationChildWorkflowImpl.class);
+ testEnvironment.start();
+ MDC.put("test", "testing123");
+ WorkflowClient client = testEnvironment.newWorkflowClient();
+ WorkflowOptions options =
+ new WorkflowOptions.Builder()
+ .setContextPropagators(Collections.singletonList(new TestContextPropagator()))
+ .build();
+ ParentWorkflow workflow = client.newWorkflowStub(ParentWorkflow.class, options);
+ String result = workflow.workflow("input1");
+ assertEquals("testing123testing123", result);
+ }
+
+ public static class ContextPropagationThreadWorkflowImpl implements TestWorkflow {
+
+ @Override
+ public String workflow1(String input) {
+ Promise asyncPromise = Async.function(this::async);
+ return asyncPromise.get();
+ }
+
+ private String async() {
+ return "async" + MDC.get("test");
+ }
+ }
+
+ @Test
+ public void testThreadContextPropagation() {
+ Worker worker = testEnvironment.newWorker(TASK_LIST);
+ worker.registerWorkflowImplementationTypes(ContextPropagationThreadWorkflowImpl.class);
+ testEnvironment.start();
+ MDC.put("test", "testing123");
+ WorkflowClient client = testEnvironment.newWorkflowClient();
+ WorkflowOptions options =
+ new WorkflowOptions.Builder()
+ .setContextPropagators(Collections.singletonList(new TestContextPropagator()))
+ .build();
+ TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options);
+ String result = workflow.workflow1("input1");
+ assertEquals("asynctesting123", result);
+ }
+
+ public static class ContextActivityImpl implements TestActivity {
+ @Override
+ public String activity1(String input) {
+ return "activity" + MDC.get("test");
+ }
+ }
+
+ public static class ContextPropagationActivityWorkflowImpl implements TestWorkflow {
+ @Override
+ public String workflow1(String input) {
+ ActivityOptions options =
+ new ActivityOptions.Builder()
+ .setScheduleToCloseTimeout(Duration.ofSeconds(5))
+ .setContextPropagators(Collections.singletonList(new TestContextPropagator()))
+ .build();
+ TestActivity activity = Workflow.newActivityStub(TestActivity.class, options);
+ return activity.activity1("foo");
+ }
+ }
+
+ @Test
+ public void testActivityContextPropagation() {
+ Worker worker = testEnvironment.newWorker(TASK_LIST);
+ worker.registerWorkflowImplementationTypes(ContextPropagationActivityWorkflowImpl.class);
+ worker.registerActivitiesImplementations(new ContextActivityImpl());
+ testEnvironment.start();
+ MDC.put("test", "testing123");
+ WorkflowClient client = testEnvironment.newWorkflowClient();
+ WorkflowOptions options =
+ new WorkflowOptions.Builder()
+ .setContextPropagators(Collections.singletonList(new TestContextPropagator()))
+ .build();
+ TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options);
+ String result = workflow.workflow1("input1");
+ assertEquals("activitytesting123", result);
+ }
+
+ public static class DefaultContextPropagationActivityWorkflowImpl implements TestWorkflow {
+ @Override
+ public String workflow1(String input) {
+ ActivityOptions options =
+ new ActivityOptions.Builder().setScheduleToCloseTimeout(Duration.ofSeconds(5)).build();
+ TestActivity activity = Workflow.newActivityStub(TestActivity.class, options);
+ return activity.activity1("foo");
+ }
+ }
+
+ @Test
+ public void testDefaultActivityContextPropagation() {
+ Worker worker = testEnvironment.newWorker(TASK_LIST);
+ worker.registerWorkflowImplementationTypes(DefaultContextPropagationActivityWorkflowImpl.class);
+ worker.registerActivitiesImplementations(new ContextActivityImpl());
+ testEnvironment.start();
+ MDC.put("test", "testing123");
+ WorkflowClient client = testEnvironment.newWorkflowClient();
+ WorkflowOptions options =
+ new WorkflowOptions.Builder()
+ .setContextPropagators(Collections.singletonList(new TestContextPropagator()))
+ .build();
+ TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options);
+ String result = workflow.workflow1("input1");
+ assertEquals("activitytesting123", result);
+ }
+
+ public static class DefaultContextPropagationParentWorkflowImpl implements ParentWorkflow {
+
+ @Override
+ public String workflow(String input) {
+ // Get the MDC value
+ String mdcValue = MDC.get("test");
+
+ // Fire up a child workflow
+ ChildWorkflowOptions options = new ChildWorkflowOptions.Builder().build();
+ ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options);
+
+ String result = child.workflow(mdcValue, Workflow.getWorkflowInfo().getWorkflowId());
+ return result;
+ }
+
+ @Override
+ public void signal(String value) {}
+ }
+
+ @Test
+ public void testDefaultChildWorkflowContextPropagation() {
+ Worker worker = testEnvironment.newWorker(TASK_LIST);
+ worker.registerWorkflowImplementationTypes(
+ DefaultContextPropagationParentWorkflowImpl.class,
+ ContextPropagationChildWorkflowImpl.class);
+ testEnvironment.start();
+ MDC.put("test", "testing123");
+ WorkflowClient client = testEnvironment.newWorkflowClient();
+ WorkflowOptions options =
+ new WorkflowOptions.Builder()
+ .setContextPropagators(Collections.singletonList(new TestContextPropagator()))
+ .build();
+ ParentWorkflow workflow = client.newWorkflowStub(ParentWorkflow.class, options);
+ String result = workflow.workflow("input1");
+ assertEquals("testing123testing123", result);
+ }
+
+ public static class OpenTracingContextPropagationWorkflowImpl implements TestWorkflow {
+ @Override
+ public String workflow1(String input) {
+ Tracer tracer = GlobalTracer.get();
+ Span activeSpan = tracer.scopeManager().activeSpan();
+ MockSpan mockSpan = (MockSpan) activeSpan;
+ assertNotNull(activeSpan);
+ assertEquals("TestWorkflow::workflow1", mockSpan.tags().get("resource.name"));
+ assertNotEquals(0, mockSpan.parentId());
+ if ("fail".equals(input)) {
+ throw new IllegalArgumentException();
+ } else {
+ return activeSpan.getBaggageItem("foo");
+ }
+ }
+ }
+
+ @Test
+ public void testOpenTracingContextPropagation() {
+ Tracer tracer = GlobalTracer.get();
+ Span span = tracer.buildSpan("testContextPropagationSuccess").start();
+
+ Worker worker = testEnvironment.newWorker(TASK_LIST);
+ worker.registerWorkflowImplementationTypes(OpenTracingContextPropagationWorkflowImpl.class);
+ testEnvironment.start();
+ WorkflowClient client = testEnvironment.newWorkflowClient();
+ WorkflowOptions options =
+ new WorkflowOptions.Builder()
+ .setContextPropagators(
+ Arrays.asList(new TestContextPropagator(), new OpenTracingContextPropagator()))
+ .build();
+
+ try (Scope scope = tracer.scopeManager().activate(span)) {
+
+ Span activeSpan = tracer.scopeManager().activeSpan();
+ activeSpan.setBaggageItem("foo", "bar");
+
+ TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options);
+ assertEquals("bar", workflow.workflow1("input1"));
+
+ } finally {
+ span.finish();
+ }
+ }
+
+ @Test
+ public void testOpenTracingContextPropagationWithFailure() {
+ Tracer tracer = GlobalTracer.get();
+ Span span = tracer.buildSpan("testContextPropagationFailure").start();
+
+ Worker worker = testEnvironment.newWorker(TASK_LIST);
+ worker.registerWorkflowImplementationTypes(OpenTracingContextPropagationWorkflowImpl.class);
+ testEnvironment.start();
+ WorkflowClient client = testEnvironment.newWorkflowClient();
+ WorkflowOptions options =
+ new WorkflowOptions.Builder()
+ .setContextPropagators(
+ Arrays.asList(new TestContextPropagator(), new OpenTracingContextPropagator()))
+ .build();
+
+ try (Scope scope = tracer.scopeManager().activate(span)) {
+
+ Span activeSpan = tracer.scopeManager().activeSpan();
+ activeSpan.setBaggageItem("foo", "bar");
+
+ TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options);
+ try {
+ workflow.workflow1("fail");
+ fail("Unreachable");
+ } catch (WorkflowException e) {
+ // Expected
+ assertEquals(IllegalArgumentException.class, e.getCause().getClass());
+ }
+
+ } finally {
+ span.finish();
+ }
+ }
+}
diff --git a/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java b/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java
index e22de1bdd..41589711d 100644
--- a/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java
+++ b/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java
@@ -17,9 +17,7 @@
package com.uber.cadence.internal.testing;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -38,8 +36,12 @@
import com.uber.cadence.activity.Activity;
import com.uber.cadence.activity.ActivityMethod;
import com.uber.cadence.activity.ActivityOptions;
-import com.uber.cadence.client.*;
-import com.uber.cadence.context.ContextPropagator;
+import com.uber.cadence.client.WorkflowClient;
+import com.uber.cadence.client.WorkflowClientOptions;
+import com.uber.cadence.client.WorkflowException;
+import com.uber.cadence.client.WorkflowOptions;
+import com.uber.cadence.client.WorkflowStub;
+import com.uber.cadence.client.WorkflowTimedOutException;
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
import com.uber.cadence.testing.SimulatedTimeoutException;
import com.uber.cadence.testing.TestEnvironmentOptions;
@@ -47,17 +49,13 @@
import com.uber.cadence.worker.Worker;
import com.uber.cadence.workflow.ActivityTimeoutException;
import com.uber.cadence.workflow.Async;
-import com.uber.cadence.workflow.ChildWorkflowOptions;
import com.uber.cadence.workflow.ChildWorkflowTimedOutException;
import com.uber.cadence.workflow.Promise;
import com.uber.cadence.workflow.SignalMethod;
import com.uber.cadence.workflow.Workflow;
import com.uber.cadence.workflow.WorkflowMethod;
-import java.nio.charset.StandardCharsets;
import java.time.Duration;
-import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CancellationException;
@@ -73,7 +71,6 @@
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
public class WorkflowTestingTest {
private static final Logger log = LoggerFactory.getLogger(WorkflowTestingTest.class);
@@ -97,10 +94,7 @@ protected void failed(Throwable e, Description description) {
public void setUp() {
TestEnvironmentOptions options =
new TestEnvironmentOptions.Builder()
- .setWorkflowClientOptions(
- WorkflowClientOptions.newBuilder()
- .setContextPropagators(Collections.singletonList(new TestContextPropagator()))
- .build())
+ .setWorkflowClientOptions(WorkflowClientOptions.newBuilder().build())
.build();
testEnvironment = TestWorkflowEnvironment.newInstance(options);
}
@@ -739,244 +733,4 @@ public void testMockedChildSimulatedTimeout() {
assertTrue(e.getCause() instanceof ChildWorkflowTimedOutException);
}
}
-
- public static class TestContextPropagator implements ContextPropagator {
-
- @Override
- public String getName() {
- return this.getClass().getName();
- }
-
- @Override
- public Map serializeContext(Object context) {
- String testKey = (String) context;
- if (testKey != null) {
- return Collections.singletonMap("test", testKey.getBytes(StandardCharsets.UTF_8));
- } else {
- return Collections.emptyMap();
- }
- }
-
- @Override
- public Object deserializeContext(Map context) {
- if (context.containsKey("test")) {
- return new String(context.get("test"), StandardCharsets.UTF_8);
- } else {
- return null;
- }
- }
-
- @Override
- public Object getCurrentContext() {
- return MDC.get("test");
- }
-
- @Override
- public void setCurrentContext(Object context) {
- MDC.put("test", String.valueOf(context));
- }
- }
-
- public static class ContextPropagationWorkflowImpl implements TestWorkflow {
-
- @Override
- public String workflow1(String input) {
- // The test value should be in the MDC
- return MDC.get("test");
- }
- }
-
- @Test
- public void testWorkflowContextPropagation() {
- Worker worker = testEnvironment.newWorker(TASK_LIST);
- worker.registerWorkflowImplementationTypes(ContextPropagationWorkflowImpl.class);
- testEnvironment.start();
- MDC.put("test", "testing123");
- WorkflowClient client = testEnvironment.newWorkflowClient();
- WorkflowOptions options =
- new WorkflowOptions.Builder()
- .setContextPropagators(Collections.singletonList(new TestContextPropagator()))
- .build();
- TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options);
- String result = workflow.workflow1("input1");
- assertEquals("testing123", result);
- }
-
- public static class ContextPropagationParentWorkflowImpl implements ParentWorkflow {
-
- @Override
- public String workflow(String input) {
- // Get the MDC value
- String mdcValue = MDC.get("test");
-
- // Fire up a child workflow
- ChildWorkflowOptions options =
- new ChildWorkflowOptions.Builder()
- .setContextPropagators(Collections.singletonList(new TestContextPropagator()))
- .build();
- ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options);
-
- String result = child.workflow(mdcValue, Workflow.getWorkflowInfo().getWorkflowId());
- return result;
- }
-
- @Override
- public void signal(String value) {}
- }
-
- public static class ContextPropagationChildWorkflowImpl implements ChildWorkflow {
-
- @Override
- public String workflow(String input, String parentId) {
- String mdcValue = MDC.get("test");
- return input + mdcValue;
- }
- }
-
- @Test
- public void testChildWorkflowContextPropagation() {
- Worker worker = testEnvironment.newWorker(TASK_LIST);
- worker.registerWorkflowImplementationTypes(
- ContextPropagationParentWorkflowImpl.class, ContextPropagationChildWorkflowImpl.class);
- testEnvironment.start();
- MDC.put("test", "testing123");
- WorkflowClient client = testEnvironment.newWorkflowClient();
- WorkflowOptions options =
- new WorkflowOptions.Builder()
- .setContextPropagators(Collections.singletonList(new TestContextPropagator()))
- .build();
- ParentWorkflow workflow = client.newWorkflowStub(ParentWorkflow.class, options);
- String result = workflow.workflow("input1");
- assertEquals("testing123testing123", result);
- }
-
- public static class ContextPropagationThreadWorkflowImpl implements TestWorkflow {
-
- @Override
- public String workflow1(String input) {
- Promise asyncPromise = Async.function(this::async);
- return asyncPromise.get();
- }
-
- private String async() {
- return "async" + MDC.get("test");
- }
- }
-
- @Test
- public void testThreadContextPropagation() {
- Worker worker = testEnvironment.newWorker(TASK_LIST);
- worker.registerWorkflowImplementationTypes(ContextPropagationThreadWorkflowImpl.class);
- testEnvironment.start();
- MDC.put("test", "testing123");
- WorkflowClient client = testEnvironment.newWorkflowClient();
- WorkflowOptions options =
- new WorkflowOptions.Builder()
- .setContextPropagators(Collections.singletonList(new TestContextPropagator()))
- .build();
- TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options);
- String result = workflow.workflow1("input1");
- assertEquals("asynctesting123", result);
- }
-
- public static class ContextActivityImpl implements TestActivity {
- @Override
- public String activity1(String input) {
- return "activity" + MDC.get("test");
- }
- }
-
- public static class ContextPropagationActivityWorkflowImpl implements TestWorkflow {
- @Override
- public String workflow1(String input) {
- ActivityOptions options =
- new ActivityOptions.Builder()
- .setScheduleToCloseTimeout(Duration.ofSeconds(5))
- .setContextPropagators(Collections.singletonList(new TestContextPropagator()))
- .build();
- TestActivity activity = Workflow.newActivityStub(TestActivity.class, options);
- return activity.activity1("foo");
- }
- }
-
- @Test
- public void testActivityContextPropagation() {
- Worker worker = testEnvironment.newWorker(TASK_LIST);
- worker.registerWorkflowImplementationTypes(ContextPropagationActivityWorkflowImpl.class);
- worker.registerActivitiesImplementations(new ContextActivityImpl());
- testEnvironment.start();
- MDC.put("test", "testing123");
- WorkflowClient client = testEnvironment.newWorkflowClient();
- WorkflowOptions options =
- new WorkflowOptions.Builder()
- .setContextPropagators(Collections.singletonList(new TestContextPropagator()))
- .build();
- TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options);
- String result = workflow.workflow1("input1");
- assertEquals("activitytesting123", result);
- }
-
- public static class DefaultContextPropagationActivityWorkflowImpl implements TestWorkflow {
- @Override
- public String workflow1(String input) {
- ActivityOptions options =
- new ActivityOptions.Builder().setScheduleToCloseTimeout(Duration.ofSeconds(5)).build();
- TestActivity activity = Workflow.newActivityStub(TestActivity.class, options);
- return activity.activity1("foo");
- }
- }
-
- @Test
- public void testDefaultActivityContextPropagation() {
- Worker worker = testEnvironment.newWorker(TASK_LIST);
- worker.registerWorkflowImplementationTypes(DefaultContextPropagationActivityWorkflowImpl.class);
- worker.registerActivitiesImplementations(new ContextActivityImpl());
- testEnvironment.start();
- MDC.put("test", "testing123");
- WorkflowClient client = testEnvironment.newWorkflowClient();
- WorkflowOptions options =
- new WorkflowOptions.Builder()
- .setContextPropagators(Collections.singletonList(new TestContextPropagator()))
- .build();
- TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options);
- String result = workflow.workflow1("input1");
- assertEquals("activitytesting123", result);
- }
-
- public static class DefaultContextPropagationParentWorkflowImpl implements ParentWorkflow {
-
- @Override
- public String workflow(String input) {
- // Get the MDC value
- String mdcValue = MDC.get("test");
-
- // Fire up a child workflow
- ChildWorkflowOptions options = new ChildWorkflowOptions.Builder().build();
- ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options);
-
- String result = child.workflow(mdcValue, Workflow.getWorkflowInfo().getWorkflowId());
- return result;
- }
-
- @Override
- public void signal(String value) {}
- }
-
- @Test
- public void testDefaultChildWorkflowContextPropagation() {
- Worker worker = testEnvironment.newWorker(TASK_LIST);
- worker.registerWorkflowImplementationTypes(
- DefaultContextPropagationParentWorkflowImpl.class,
- ContextPropagationChildWorkflowImpl.class);
- testEnvironment.start();
- MDC.put("test", "testing123");
- WorkflowClient client = testEnvironment.newWorkflowClient();
- WorkflowOptions options =
- new WorkflowOptions.Builder()
- .setContextPropagators(Collections.singletonList(new TestContextPropagator()))
- .build();
- ParentWorkflow workflow = client.newWorkflowStub(ParentWorkflow.class, options);
- String result = workflow.workflow("input1");
- assertEquals("testing123testing123", result);
- }
}
diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml
index b87fe0374..27d076198 100644
--- a/src/test/resources/logback-test.xml
+++ b/src/test/resources/logback-test.xml
@@ -25,6 +25,7 @@
+