Skip to content

Commit b24cd30

Browse files
committed
Use lifecycle methods on ContextPropagator so as not to leak OpenTracing implementation
1 parent cf5c9c6 commit b24cd30

File tree

10 files changed

+167
-54
lines changed

10 files changed

+167
-54
lines changed

build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,13 @@ dependencies {
5858
compile group: 'com.google.guava', name: 'guava', version: '28.1-jre'
5959
compile group: 'com.cronutils', name: 'cron-utils', version: '9.0.0'
6060
compile group: 'io.micrometer', name: 'micrometer-core', version: '1.1.2'
61+
compile group: 'io.opentracing', name: 'opentracing-api', version: '0.33.0'
62+
compile group: 'io.opentracing', name: 'opentracing-util', version: '0.33.0'
6163

6264
testCompile group: 'junit', name: 'junit', version: '4.12'
6365
testCompile group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4'
6466
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'
67+
testCompile group: 'io.opentracing', name: 'opentracing-mock', version: '0.33.0'
6568
}
6669

6770
license {

src/main/java/com/uber/cadence/activity/LocalActivityOptions.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,15 +101,19 @@ public LocalActivityOptions validateAndBuildWithDefaults() {
101101
if (retryOptions != null) {
102102
ro = new RetryOptions.Builder(retryOptions).validateBuildWithDefaults();
103103
}
104-
return new LocalActivityOptions(roundUpToSeconds(scheduleToCloseTimeout), ro, contextPropagators);
104+
return new LocalActivityOptions(
105+
roundUpToSeconds(scheduleToCloseTimeout), ro, contextPropagators);
105106
}
106107
}
107108

108109
private final Duration scheduleToCloseTimeout;
109110
private final RetryOptions retryOptions;
110111
private final List<ContextPropagator> contextPropagators;
111112

112-
private LocalActivityOptions(Duration scheduleToCloseTimeout, RetryOptions retryOptions, List<ContextPropagator> contextPropagators) {
113+
private LocalActivityOptions(
114+
Duration scheduleToCloseTimeout,
115+
RetryOptions retryOptions,
116+
List<ContextPropagator> contextPropagators) {
113117
this.scheduleToCloseTimeout = scheduleToCloseTimeout;
114118
this.retryOptions = retryOptions;
115119
this.contextPropagators = contextPropagators;

src/main/java/com/uber/cadence/context/ContextPropagator.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
* Context Propagators are used to propagate information from workflow to activity, workflow to
2424
* child workflow, and workflow to child thread (using {@link com.uber.cadence.workflow.Async}).
2525
*
26+
* <p>It is important to note that all threads share one ContextPropagator instance, so your
27+
* implementation <b>must</b> be thread-safe and store any state in ThreadLocal variables.
28+
*
2629
* <p>A sample <code>ContextPropagator</code> that copies all {@link org.slf4j.MDC} entries starting
2730
* with a given prefix along the code path looks like this:
2831
*
@@ -136,4 +139,31 @@ public interface ContextPropagator {
136139

137140
/** Sets the current context */
138141
void setCurrentContext(Object context);
142+
143+
/**
144+
* This is a lifecycle method, called after the context has been propagated to the
145+
* workflow/activity thread but the workflow/activity has not yet started.
146+
*/
147+
default void setUp() {
148+
// No-op
149+
}
150+
151+
/**
152+
* This is a lifecycle method, called after the workflow/activity has completed. If the method
153+
* finished without exception, {@code successful} will be true. Otherwise, it will be false and
154+
* {@link #onError(Throwable)} will have already been called.
155+
*/
156+
default void finish(boolean successful) {
157+
// No-op
158+
}
159+
160+
/**
161+
* This is a lifecycle method, called when the workflow/activity finishes by throwing an unhandled
162+
* exception. {@link #finish(boolean)} is called after this method.
163+
*
164+
* @param t The unhandled exception that caused the workflow/activity to terminate
165+
*/
166+
default void onError(Throwable t) {
167+
// No-op
168+
}
139169
}

src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,28 @@
1717

1818
package com.uber.cadence.context;
1919

20+
import com.uber.cadence.internal.logging.LoggerTag;
21+
import io.opentracing.Scope;
2022
import io.opentracing.Span;
2123
import io.opentracing.SpanContext;
2224
import io.opentracing.Tracer;
25+
import io.opentracing.log.Fields;
2326
import io.opentracing.propagation.Format;
2427
import io.opentracing.propagation.TextMap;
28+
import io.opentracing.tag.Tags;
2529
import io.opentracing.util.GlobalTracer;
2630
import java.nio.charset.Charset;
2731
import java.util.HashMap;
2832
import java.util.Iterator;
2933
import java.util.Map;
34+
import org.slf4j.MDC;
3035

3136
/** Support for OpenTracing spans */
3237
public class OpenTracingContextPropagator implements ContextPropagator {
3338

3439
private static ThreadLocal<SpanContext> currentOpenTracingSpanContext = new ThreadLocal<>();
40+
private static ThreadLocal<Span> currentOpenTracingSpan = new ThreadLocal<>();
41+
private static ThreadLocal<Scope> currentOpenTracingScope = new ThreadLocal<>();
3542

3643
public static void setCurrentOpenTracingSpanContext(SpanContext ctx) {
3744
if (ctx != null) {
@@ -91,6 +98,46 @@ public void setCurrentContext(Object context) {
9198
}
9299
}
93100

101+
@Override
102+
public void setUp() {
103+
Tracer openTracingTracer = GlobalTracer.get();
104+
Tracer.SpanBuilder builder =
105+
openTracingTracer
106+
.buildSpan("cadence.workflow")
107+
.withTag("resource.name", MDC.get(LoggerTag.WORKFLOW_TYPE));
108+
109+
if (getCurrentOpenTracingSpanContext() != null) {
110+
builder.asChildOf(getCurrentOpenTracingSpanContext());
111+
}
112+
113+
Span span = builder.start();
114+
openTracingTracer.activateSpan(span);
115+
currentOpenTracingSpan.set(span);
116+
Scope scope = openTracingTracer.activateSpan(span);
117+
currentOpenTracingScope.set(scope);
118+
}
119+
120+
@Override
121+
public void onError(Throwable t) {
122+
Span span = currentOpenTracingSpan.get();
123+
Tags.ERROR.set(span, true);
124+
Map<String, Object> errorData = new HashMap<>();
125+
errorData.put(Fields.EVENT, "error");
126+
if (t != null) {
127+
errorData.put(Fields.ERROR_OBJECT, t);
128+
errorData.put(Fields.MESSAGE, t.getMessage());
129+
}
130+
span.log(errorData);
131+
}
132+
133+
@Override
134+
public void finish(boolean successful) {
135+
Scope currentScope = currentOpenTracingScope.get();
136+
Span currentSpan = currentOpenTracingSpan.get();
137+
currentScope.close();
138+
currentSpan.finish();
139+
}
140+
94141
private class HashMapTextMap implements TextMap {
95142

96143
private final HashMap<String, String> backingMap = new HashMap<>();

src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,14 @@
2424
import java.util.List;
2525
import java.util.Map;
2626
import java.util.function.Supplier;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
2729

2830
/** This class holds the current set of context propagators */
2931
public class ContextThreadLocal {
3032

33+
private static final Logger log = LoggerFactory.getLogger(ContextThreadLocal.class);
34+
3135
private static WorkflowThreadLocal<List<ContextPropagator>> contextPropagators =
3236
WorkflowThreadLocal.withInitial(
3337
new Supplier<List<ContextPropagator>>() {
@@ -57,6 +61,11 @@ public static Map<String, Object> getCurrentContextForPropagation() {
5761
return contextData;
5862
}
5963

64+
/**
65+
* Injects the context data into the thread for each configured context propagator
66+
*
67+
* @param contextData The context data received from the server
68+
*/
6069
public static void propagateContextToCurrentThread(Map<String, Object> contextData) {
6170
if (contextData == null || contextData.isEmpty()) {
6271
return;
@@ -67,4 +76,49 @@ public static void propagateContextToCurrentThread(Map<String, Object> contextDa
6776
}
6877
}
6978
}
79+
80+
/** Calls {@link ContextPropagator#setUp()} for each propagator */
81+
public static void setUpContextPropagators() {
82+
for (ContextPropagator propagator : contextPropagators.get()) {
83+
try {
84+
propagator.setUp();
85+
} catch (Throwable t) {
86+
// Don't let an error in one propagator block the others
87+
log.error("Error calling setUp() on a contextpropagator", t);
88+
}
89+
}
90+
}
91+
92+
/**
93+
* Calls {@link ContextPropagator#onError(Throwable)} for each propagator
94+
*
95+
* @param t The Throwable that caused the workflow/activity to finish
96+
*/
97+
public static void onErrorContextPropagators(Throwable t) {
98+
for (ContextPropagator propagator : contextPropagators.get()) {
99+
try {
100+
propagator.onError(t);
101+
} catch (Throwable t1) {
102+
// Don't let an error in one propagator block the others
103+
log.error("Error calling onError() on a contextpropagator", t1);
104+
}
105+
}
106+
}
107+
108+
/**
109+
* Calls {@link ContextPropagator#finish(boolean))} for each propagator
110+
*
111+
* @param successful True if the workflow/activity completed without unhandled exception, false
112+
* otherwise
113+
*/
114+
public static void finishContextPropagators(boolean successful) {
115+
for (ContextPropagator propagator : contextPropagators.get()) {
116+
try {
117+
propagator.finish(successful);
118+
} catch (Throwable t) {
119+
// Don't let an error in one propagator block the others
120+
log.error("Error calling finish() on a contextpropagator", t);
121+
}
122+
}
123+
}
70124
}

src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -323,10 +323,11 @@ private ExecuteActivityParameters constructExecuteActivityParameters(
323323

324324
private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters(
325325
String name, LocalActivityOptions options, byte[] input, long elapsed, int attempt) {
326-
ExecuteLocalActivityParameters parameters = new ExecuteLocalActivityParameters()
327-
.withActivityType(new ActivityType().setName(name))
328-
.withInput(input)
329-
.withScheduleToCloseTimeoutSeconds(options.getScheduleToCloseTimeout().getSeconds());
326+
ExecuteLocalActivityParameters parameters =
327+
new ExecuteLocalActivityParameters()
328+
.withActivityType(new ActivityType().setName(name))
329+
.withInput(input)
330+
.withScheduleToCloseTimeoutSeconds(options.getScheduleToCloseTimeout().getSeconds());
330331

331332
RetryOptions retryOptions = options.getRetryOptions();
332333
if (retryOptions != null) {
@@ -337,8 +338,8 @@ private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters(
337338
parameters.setWorkflowDomain(this.context.getDomain());
338339
parameters.setWorkflowExecution(this.context.getWorkflowExecution());
339340

340-
List<ContextPropagator> propagators = Optional.ofNullable(options.getContextPropagators())
341-
.orElse(contextPropagators);
341+
List<ContextPropagator> propagators =
342+
Optional.ofNullable(options.getContextPropagators()).orElse(contextPropagators);
342343
parameters.setContext(extractContextsAndConvertToBytes(propagators));
343344

344345
return parameters;

src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java

Lines changed: 7 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,12 @@
1919

2020
import com.google.common.util.concurrent.RateLimiter;
2121
import com.uber.cadence.context.ContextPropagator;
22-
import com.uber.cadence.context.OpenTracingContextPropagator;
2322
import com.uber.cadence.internal.context.ContextThreadLocal;
2423
import com.uber.cadence.internal.logging.LoggerTag;
2524
import com.uber.cadence.internal.metrics.MetricsType;
2625
import com.uber.cadence.internal.replay.DeciderCache;
2726
import com.uber.cadence.internal.replay.DecisionContext;
2827
import com.uber.cadence.workflow.Promise;
29-
import io.opentracing.Scope;
30-
import io.opentracing.Span;
31-
import io.opentracing.Tracer;
32-
import io.opentracing.log.Fields;
33-
import io.opentracing.tag.Tags;
34-
import io.opentracing.util.GlobalTracer;
3528
import java.io.PrintWriter;
3629
import java.io.StringWriter;
3730
import java.util.HashMap;
@@ -98,27 +91,16 @@ public void run() {
9891
// Repopulate the context(s)
9992
ContextThreadLocal.setContextPropagators(this.contextPropagators);
10093
ContextThreadLocal.propagateContextToCurrentThread(this.propagatedContexts);
94+
ContextThreadLocal.setUpContextPropagators();
10195

102-
// Set up an opentracing span
103-
Tracer openTracingTracer = GlobalTracer.get();
104-
Tracer.SpanBuilder builder =
105-
openTracingTracer
106-
.buildSpan("cadence.workflow")
107-
.withTag("resource.name", decisionContext.getWorkflowType().getName());
108-
109-
if (OpenTracingContextPropagator.getCurrentOpenTracingSpanContext() != null) {
110-
builder.asChildOf(OpenTracingContextPropagator.getCurrentOpenTracingSpanContext());
111-
}
112-
Span span = builder.start();
113-
114-
try (Scope scope = openTracingTracer.activateSpan(span)) {
96+
try {
11597
// initialYield blocks thread until the first runUntilBlocked is called.
11698
// Otherwise r starts executing without control of the sync.
11799
threadContext.initialYield();
118100
cancellationScope.run();
119101
} catch (DestroyWorkflowThreadError e) {
120102
if (!threadContext.isDestroyRequested()) {
121-
setSpanError(span, e);
103+
ContextThreadLocal.onErrorContextPropagators(e);
122104
threadContext.setUnhandledException(e);
123105
}
124106
} catch (Error e) {
@@ -131,11 +113,11 @@ public void run() {
131113
log.error(
132114
String.format("Workflow thread \"%s\" run failed with Error:\n%s", name, stackTrace));
133115
}
134-
setSpanError(span, e);
116+
ContextThreadLocal.onErrorContextPropagators(e);
135117
threadContext.setUnhandledException(e);
136118
} catch (CancellationException e) {
137119
if (!isCancelRequested()) {
138-
setSpanError(span, e);
120+
ContextThreadLocal.onErrorContextPropagators(e);
139121
threadContext.setUnhandledException(e);
140122
}
141123
if (log.isDebugEnabled()) {
@@ -152,27 +134,16 @@ public void run() {
152134
"Workflow thread \"%s\" run failed with unhandled exception:\n%s",
153135
name, stackTrace));
154136
}
155-
setSpanError(span, e);
137+
ContextThreadLocal.onErrorContextPropagators(e);
156138
threadContext.setUnhandledException(e);
157139
} finally {
140+
ContextThreadLocal.finishContextPropagators(threadContext.getUnhandledException() == null);
158141
DeterministicRunnerImpl.setCurrentThreadInternal(null);
159142
threadContext.setStatus(Status.DONE);
160143
thread.setName(originalName);
161144
thread = null;
162145
MDC.clear();
163-
span.finish();
164-
}
165-
}
166-
167-
private void setSpanError(Span span, Throwable ex) {
168-
Tags.ERROR.set(span, true);
169-
Map<String, Object> errorData = new HashMap<>();
170-
errorData.put(Fields.EVENT, "error");
171-
if (ex != null) {
172-
errorData.put(Fields.ERROR_OBJECT, ex);
173-
errorData.put(Fields.MESSAGE, ex.getMessage());
174146
}
175-
span.log(errorData);
176147
}
177148

178149
public String getName() {

src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,9 @@ private void propagateContext(ExecuteLocalActivityParameters params) {
271271
}
272272

273273
private void restoreContext(Map<String, byte[]> context) {
274-
options.getContextPropagators()
275-
.forEach(propagator -> propagator.setCurrentContext(propagator.deserializeContext(context)));
274+
options
275+
.getContextPropagators()
276+
.forEach(
277+
propagator -> propagator.setCurrentContext(propagator.deserializeContext(context)));
276278
}
277279
}

src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ public class LocalActivityContextPropagationTest {
5959

6060
private final WrapperContext wrapperContext = new WrapperContext(EXPECTED_CONTEXT_NAME);
6161

62-
//let's add safe TestWorkflowEnvironment closing and make configurable propagation enabling/disabling
62+
// let's add safe TestWorkflowEnvironment closing and make configurable propagation
63+
// enabling/disabling
6364
private class TestEnvAutoCloseable implements AutoCloseable {
6465

6566
private TestWorkflowEnvironment testEnv;

0 commit comments

Comments
 (0)