Skip to content

feat: span processor api refactor #2962

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 31 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
ac0c3bd
feat: add a readable span interface
paullegranddc Apr 22, 2025
a0a8c23
feat: add an experimental on_ending api
paullegranddc May 13, 2025
54fa6f2
fix: formatting and clippy
paullegranddc May 13, 2025
445aa66
Merge branch 'main' into paullgdc/sdk/span_processor_api_refactor
paullegranddc May 13, 2025
b72b031
fix: some SpanProcessor calls were not finished
paullegranddc May 13, 2025
afb4197
fix: lint
paullegranddc May 14, 2025
4954b0d
fix: lint clippy
paullegranddc May 14, 2025
1d3e6d8
Merge branch 'main' into paullgdc/sdk/span_processor_api_refactor
paullegranddc May 14, 2025
224a7ed
feat: add span processor benchmarks
paullegranddc May 20, 2025
a62a9bd
fix: remove unised import
paullegranddc May 20, 2025
5ce1ba4
fix: typos, so many typos...
paullegranddc May 20, 2025
5dae8a7
fix: insert count as number instead of string in example
paullegranddc May 20, 2025
a28a461
fix: make on_ending contract more explicit
paullegranddc May 20, 2025
edb290a
fix: on_ending comment
paullegranddc May 20, 2025
07840d8
nit: move span endtime logic to callers of ensure_ended_and_exported
paullegranddc May 27, 2025
9a2d119
fix: remove panic in FinishedSpan
paullegranddc May 27, 2025
48a3edf
fix: add tests and fix clippy
paullegranddc May 27, 2025
e2edadb
Merge branch 'main' into paullgdc/sdk/span_processor_api_refactor
paullegranddc May 27, 2025
4f9b012
nit: use None rather than empty string when no recording
paullegranddc May 27, 2025
ddcf5f9
nit: add test coverage for ReadableSpan implementation
paullegranddc May 27, 2025
28dc62b
fix: add non panicking version of FinishedSpan::consume
paullegranddc May 27, 2025
b80d612
fix: test non panicking behavior
paullegranddc May 27, 2025
8601bd1
fix: remove panics in finished span reads
paullegranddc May 27, 2025
86fd3da
fix: remove no longer relevant comment
paullegranddc May 27, 2025
2dd7ccf
nit: add comment explaining why we grab the data from the span mutably
paullegranddc May 27, 2025
79270ce
fix: rename ensure_ended_and_exported to end_and_export
paullegranddc May 27, 2025
d8f2f80
fix: example return instead of panic on posined lock
paullegranddc May 27, 2025
343aba2
revert: remove on_ending API
paullegranddc May 27, 2025
2a284e5
Merge branch 'main' into paullgdc/sdk/span_processor_api_refactor
TommyCpp Jun 3, 2025
2bab32c
Merge branch 'main' into paullgdc/sdk/span_processor_api_refactor
TommyCpp Jun 8, 2025
cead725
fix: make consume non panicking
paullegranddc Jun 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 70 additions & 3 deletions examples/tracing-http-propagator/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@ use opentelemetry_sdk::{
error::OTelSdkResult,
logs::{LogProcessor, SdkLogRecord, SdkLoggerProvider},
propagation::{BaggagePropagator, TraceContextPropagator},
trace::{SdkTracerProvider, SpanProcessor},
trace::{FinishedSpan, ReadableSpan, SdkTracerProvider, SpanProcessor},
};
use opentelemetry_semantic_conventions::trace;
use opentelemetry_stdout::{LogExporter, SpanExporter};
use std::time::Duration;
use std::{convert::Infallible, net::SocketAddr, sync::OnceLock};
use std::{
collections::HashMap,
convert::Infallible,
net::SocketAddr,
sync::{Mutex, OnceLock},
};
use tokio::net::TcpListener;
use tracing::info;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
Expand Down Expand Up @@ -84,6 +89,7 @@ async fn router(
let span = tracer
.span_builder("router")
.with_kind(SpanKind::Server)
.with_attributes([KeyValue::new("http.route", req.uri().path().to_string())])
.start_with_context(tracer, &parent_cx);

info!(name = "router", message = "Dispatching request");
Expand All @@ -105,6 +111,66 @@ async fn router(
response
}

#[derive(Debug, Default)]
/// A custom span processor that counts concurrent requests for each route (indentified by the http.route
/// attribute) and adds that information to the span attributes.
struct RouteConcurrencyCounterSpanProcessor(Mutex<HashMap<opentelemetry::Key, usize>>);

impl SpanProcessor for RouteConcurrencyCounterSpanProcessor {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown_with_timeout(&self, _timeout: Duration) -> crate::OTelSdkResult {
Ok(())
}

fn on_start(&self, span: &mut opentelemetry_sdk::trace::Span, _cx: &Context) {
if !matches!(span.span_kind(), SpanKind::Server) {
return;
}
let Some(route) = span
.attributes()
.iter()
.find(|kv| kv.key.as_str() == "http.route")
else {
return;
};
let Ok(mut counts) = self.0.lock() else {
return;
};
let count = counts.entry(route.key.clone()).or_default();
*count += 1;
span.set_attribute(KeyValue::new(
"http.route.concurrent_requests",
*count as i64,
));
}

fn on_end(&self, span: &mut FinishedSpan) {
if !matches!(span.span_kind(), SpanKind::Server) {
return;
}
let Some(route) = span
.attributes()
.iter()
.find(|kv| kv.key.as_str() == "http.route")
else {
return;
};
let Ok(mut counts) = self.0.lock() else {
return;
};
let Some(count) = counts.get_mut(&route.key) else {
return;
};
*count -= 1;
if *count == 0 {
counts.remove(&route.key);
}
}
}

/// A custom log processor that enriches LogRecords with baggage attributes.
/// Baggage information is not added automatically without this processor.
#[derive(Debug)]
Expand Down Expand Up @@ -142,7 +208,7 @@ impl SpanProcessor for EnrichWithBaggageSpanProcessor {
}
}

fn on_end(&self, _span: opentelemetry_sdk::trace::SpanData) {}
fn on_end(&self, _span: &mut opentelemetry_sdk::trace::FinishedSpan) {}
}

fn init_tracer() -> SdkTracerProvider {
Expand All @@ -158,6 +224,7 @@ fn init_tracer() -> SdkTracerProvider {
// Setup tracerprovider with stdout exporter
// that prints the spans to stdout.
let provider = SdkTracerProvider::builder()
.with_span_processor(RouteConcurrencyCounterSpanProcessor::default())
.with_span_processor(EnrichWithBaggageSpanProcessor)
.with_simple_exporter(SpanExporter::default())
.build();
Expand Down
6 changes: 6 additions & 0 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ name = "log"
harness = false
required-features = ["logs"]

[[bench]]
name = "span_processor_api"
harness = false
required-features = ["testing"]


[lib]
bench = false

Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-sdk/benches/batch_span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use opentelemetry::trace::{
SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you run the benchmark suite to do a performance regression against main ? Would be great to include.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I've run the existing BatchSpanProcessor benchmarks.
With experimental_span_processor_on_ending not enabled, the benchmark show no detectable changes in performance.

If the experimental_span_processor_on_ending feature is enabled, there is a slight cost (5%) which I believe comes from the fact we have to clone the tracer field, because we have to iterate over the span processor list through a shared reference, while passing the span mutably. Thus we have to clone the tracer associated with the span so we can split the ownership (it's in an Arc cell so this is not that expensive)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we missing a benchmark? It seems like some of your changes should improve performance -->

https://github.com/open-telemetry/opentelemetry-rust/pull/2962/files/1d3e6d89637daebad09a9f792ae095f63be48e1a#r2097787712

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, some benchmarks running multiple span processors were missing, I added them un this PR. The comparison between the main branch and this branch for the benchmarks is in this PR description.

Baseline - main branch:

SpanProcessorApi/0_processors
    time:   [339.66 ns 340.56 ns 341.47 ns]
SpanProcessorApi/1_processors
    time:   [373.10 ns 374.36 ns 375.60 ns]
SpanProcessorApi/2_processors
    time:   [803.10 ns 804.99 ns 807.03 ns]
SpanProcessorApi/4_processors
    time:   [1.2096 µs 1.2137 µs 1.2179 µs]

Candidate - paullegranddc:paullgdc/sdk/span_processor_api_refactor:

SpanProcessorApi/0_processors
    time:   [385.15 ns 386.14 ns 387.25 ns]
SpanProcessorApi/1_processors
    time:   [385.73 ns 387.17 ns 388.85 ns]
SpanProcessorApi/2_processors
    time:   [384.84 ns 385.66 ns 386.50 ns]
SpanProcessorApi/4_processors
    time:   [386.78 ns 388.17 ns 389.58 ns]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! This is exactly what I was hoping to see :D

};
use opentelemetry_sdk::testing::trace::NoopSpanExporter;
use opentelemetry_sdk::trace::SpanData;
use opentelemetry_sdk::trace::{
BatchConfigBuilder, BatchSpanProcessor, SpanEvents, SpanLinks, SpanProcessor,
};
use opentelemetry_sdk::trace::{FinishedSpan, SpanData};
use std::sync::Arc;
use tokio::runtime::Runtime;

Expand Down Expand Up @@ -62,7 +62,8 @@ fn criterion_benchmark(c: &mut Criterion) {
let spans = get_span_data();
handles.push(tokio::spawn(async move {
for span in spans {
span_processor.on_end(span);
let mut span = FinishedSpan::new(span);
span_processor.on_end(&mut span);
tokio::task::yield_now().await;
}
}));
Expand Down
88 changes: 88 additions & 0 deletions opentelemetry-sdk/benches/span_processor_api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use std::time::Duration;

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use opentelemetry::{
trace::{Span, Tracer, TracerProvider},
Context, KeyValue,
};
use opentelemetry_sdk::trace as sdktrace;

#[cfg(not(target_os = "windows"))]
use pprof::criterion::{Output, PProfProfiler};

/*
Adding results in comments for a quick reference.
Chip: Apple M1 Max
Total Number of Cores: 10 (8 performance and 2 efficiency)

SpanProcessorApi/0_processors
time: [385.15 ns 386.14 ns 387.25 ns]
SpanProcessorApi/1_processors
time: [385.73 ns 387.17 ns 388.85 ns]
SpanProcessorApi/2_processors
time: [384.84 ns 385.66 ns 386.50 ns]
SpanProcessorApi/4_processors
time: [386.78 ns 388.17 ns 389.58 ns]
*/

#[derive(Debug)]
struct NoopSpanProcessor;

impl sdktrace::SpanProcessor for NoopSpanProcessor {
fn on_start(&self, _span: &mut sdktrace::Span, _parent_cx: &Context) {}
fn on_end(&self, _span: &mut sdktrace::FinishedSpan) {}
fn force_flush(&self) -> opentelemetry_sdk::error::OTelSdkResult {
Ok(())
}
fn shutdown_with_timeout(&self, _timeout: Duration) -> opentelemetry_sdk::error::OTelSdkResult {
Ok(())
}
}

fn create_tracer(span_processors_count: usize) -> sdktrace::SdkTracer {
let mut builder = sdktrace::SdkTracerProvider::builder();
for _ in 0..span_processors_count {
builder = builder.with_span_processor(NoopSpanProcessor);
}
builder.build().tracer("tracer")
}

fn create_span(tracer: &sdktrace::Tracer) -> sdktrace::Span {
let mut span = tracer.start("foo");
span.set_attribute(KeyValue::new("key1", false));
span.set_attribute(KeyValue::new("key2", "hello"));
span.set_attribute(KeyValue::new("key4", 123.456));
span.add_event("my_event", vec![KeyValue::new("key1", "value1")]);
span
}

fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("SpanProcessorApi");
for i in [0, 1, 2, 4] {
group.bench_function(format!("{}_processors", i), |b| {
let tracer = create_tracer(i);
b.iter(|| {
black_box(create_span(&tracer));
});
});
}
}

#[cfg(not(target_os = "windows"))]
criterion_group! {
name = benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)))
.warm_up_time(std::time::Duration::from_secs(1))
.measurement_time(std::time::Duration::from_secs(2));
targets = criterion_benchmark
}

#[cfg(target_os = "windows")]
criterion_group! {
name = benches;
config = Criterion::default().warm_up_time(std::time::Duration::from_secs(1))
.measurement_time(std::time::Duration::from_secs(2));
targets = criterion_benchmark
}

criterion_main!(benches);
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub use id_generator::{IdGenerator, RandomIdGenerator};
pub use links::SpanLinks;
pub use provider::{SdkTracerProvider, TracerProviderBuilder};
pub use sampler::{Sampler, ShouldSample};
pub use span::Span;
pub use span::{FinishedSpan, ReadableSpan, Span};
pub use span_limit::SpanLimits;
pub use span_processor::{
BatchConfig, BatchConfigBuilder, BatchSpanProcessor, BatchSpanProcessorBuilder,
Expand Down Expand Up @@ -137,7 +137,7 @@ mod tests {
}
}

fn on_end(&self, _span: SpanData) {
fn on_end(&self, _span: &mut FinishedSpan) {
// TODO: Accessing Context::current() will panic today and hence commented out.
// See https://github.com/open-telemetry/opentelemetry-rust/issues/2871
// let _c = Context::current();
Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-sdk/src/trace/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,8 +471,8 @@ mod tests {
SERVICE_NAME, TELEMETRY_SDK_LANGUAGE, TELEMETRY_SDK_NAME, TELEMETRY_SDK_VERSION,
};
use crate::trace::provider::TracerProviderInner;
use crate::trace::{Config, Span, SpanProcessor};
use crate::trace::{SdkTracerProvider, SpanData};
use crate::trace::SdkTracerProvider;
use crate::trace::{Config, FinishedSpan, Span, SpanProcessor};
use crate::Resource;
use opentelemetry::trace::{Tracer, TracerProvider};
use opentelemetry::{Context, Key, KeyValue, Value};
Expand Down Expand Up @@ -526,7 +526,7 @@ mod tests {
.fetch_add(1, Ordering::SeqCst);
}

fn on_end(&self, _span: SpanData) {
fn on_end(&self, _span: &mut FinishedSpan) {
// ignore
}

Expand Down Expand Up @@ -789,7 +789,7 @@ mod tests {
// No operation needed for this processor
}

fn on_end(&self, _span: SpanData) {
fn on_end(&self, _span: &mut FinishedSpan) {
// No operation needed for this processor
}

Expand Down
Loading
Loading