diff --git a/Cargo.toml b/Cargo.toml index 84a25db..68886d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ aws-sdk-eventbridge = "0.0.22-alpha" aws-smithy-client = { version = "0.27.0-alpha", features = ["test-util"] } aws-smithy-http = "0.27.0-alpha" aws-types = "0.0.22-alpha" +chrono = "0.4" futures = { version = "0.3", features = ["std"] } lambda_runtime = { version = "0.4", optional = true } lambda_http = { version = "0.4", optional = true } @@ -19,7 +20,7 @@ rayon = { version = "1.5", optional = true } serde = "1" serde_json = "1.0" tracing = "0.1" -tracing-subscriber = { version = "0.2", features = ["fmt", "json"] } +tracing-subscriber = "0.3" tokio = { version = "1", features = ["full"] } [dev-dependencies] diff --git a/src/entrypoints/lambda/apigateway.rs b/src/entrypoints/lambda/apigateway.rs index 45f0860..a922cfb 100644 --- a/src/entrypoints/lambda/apigateway.rs +++ b/src/entrypoints/lambda/apigateway.rs @@ -1,4 +1,4 @@ -use crate::{Product, Service}; +use crate::{Product, Service, utils::inject_lambda_context}; use lambda_http::{ ext::RequestExt, lambda_runtime::Context, Body, IntoResponse, Request, Response, }; @@ -12,8 +12,11 @@ type E = Box; pub async fn delete_product( service: &Service, event: Request, - _: Context, + ctx: Context, ) -> Result { + let span = inject_lambda_context(&ctx); + let _guard = span.enter(); + // Retrieve product ID from event // // If the event doesn't contain a product ID, we return a 400 Bad Request. @@ -62,8 +65,11 @@ pub async fn delete_product( pub async fn get_product( service: &Service, event: Request, - _: Context, + ctx: Context, ) -> Result { + let span = inject_lambda_context(&ctx); + let _guard = span.enter(); + // Retrieve product ID from event. // // If the event doesn't contain a product ID, we return a 400 Bad Request. @@ -112,8 +118,11 @@ pub async fn get_product( pub async fn get_products( service: &Service, _event: Request, - _: Context, + ctx: Context, ) -> Result { + let span = inject_lambda_context(&ctx); + let _guard = span.enter(); + // Retrieve products // TODO: Add pagination let res = service.get_products(None).await; @@ -138,8 +147,11 @@ pub async fn get_products( pub async fn put_product( service: &Service, event: Request, - _: Context, + ctx: Context, ) -> Result { + let span = inject_lambda_context(&ctx); + let _guard = span.enter(); + // Retrieve product ID from event. // // If the event doesn't contain a product ID, we return a 400 Bad Request. diff --git a/src/entrypoints/lambda/dynamodb/mod.rs b/src/entrypoints/lambda/dynamodb/mod.rs index a57d93d..40e4fa1 100644 --- a/src/entrypoints/lambda/dynamodb/mod.rs +++ b/src/entrypoints/lambda/dynamodb/mod.rs @@ -1,4 +1,4 @@ -use crate::{Event, Service}; +use crate::{Event, Service, utils::inject_lambda_context}; use lambda_runtime::Context; use rayon::prelude::*; use tracing::{info, instrument}; @@ -15,8 +15,11 @@ type E = Box; pub async fn parse_events( service: &Service, event: model::DynamoDBEvent, - _: Context, + ctx: Context, ) -> Result<(), E> { + let span = inject_lambda_context(&ctx); + let _guard = span.enter(); + info!("Transform events"); let events = event .records diff --git a/src/utils.rs b/src/utils/mod.rs similarity index 86% rename from src/utils.rs rename to src/utils/mod.rs index 6156c90..039c3e6 100644 --- a/src/utils.rs +++ b/src/utils/mod.rs @@ -1,11 +1,8 @@ use crate::{event_bus, model, store, Service}; use tracing::{info, instrument}; -/// Setup tracing -pub fn setup_tracing() { - let subscriber = tracing_subscriber::fmt().json().finish(); - tracing::subscriber::set_global_default(subscriber).expect("failed to set tracing subscriber"); -} +mod trace; +pub use trace::{inject_lambda_context, setup_tracing}; /// Retrieve a service /// diff --git a/src/utils/trace.rs b/src/utils/trace.rs new file mode 100644 index 0000000..48d4a19 --- /dev/null +++ b/src/utils/trace.rs @@ -0,0 +1,156 @@ +use chrono::prelude::*; +use lambda_runtime::Context; +use std::collections::BTreeMap; +use tracing_subscriber::{prelude::*, Layer}; +use tracing::{span::Attributes, Id}; + +/// Setup tracing +pub fn setup_tracing() { + let layer = LambdaLayer::new(tracing::Level::INFO); + tracing_subscriber::registry().with(layer).init(); +} + +pub fn inject_lambda_context(ctx: &Context) -> tracing::Span { + let ctx_string = serde_json::to_string(ctx).unwrap(); + let ctx_str = ctx_string.as_str(); + tracing::span!(tracing::Level::TRACE, "lambda_handler", lambda_context = ctx_str) +} + +struct LambdaVisitor<'a> { + pub data: &'a mut BTreeMap, +} + +impl<'a> tracing::field::Visit for LambdaVisitor<'a> { + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + self.data + .insert(field.name().to_string(), format!("{:?}", value).into()); + } + + fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { + self.data.insert(field.name().to_string(), value.into()); + } + + fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { + self.data.insert(field.name().to_string(), value.into()); + } + + fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { + self.data.insert(field.name().to_string(), value.into()); + } + + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + self.data.insert(field.name().to_string(), value.into()); + } + + fn record_error( + &mut self, + field: &tracing::field::Field, + value: &(dyn std::error::Error + 'static), + ) { + self.data + .insert(field.name().to_string(), format!("{:?}", value).into()); + } +} + +struct LambdaContextVisitor { + pub context: Option, +} + +impl tracing::field::Visit for LambdaContextVisitor { + fn record_debug(&mut self, _field: &tracing::field::Field, _value: &dyn std::fmt::Debug) { + } + + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + if field.name() == "lambda_context" { + let context = serde_json::from_str(value).ok(); + self.context = context; + } + } +} + +struct LambdaLayer { + level: tracing::Level, +} + +impl LambdaLayer { + pub fn new(level: tracing::Level) -> Self { + Self { level } + } +} + +impl Layer for LambdaLayer +where + S: tracing::Subscriber, + S: for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>, +{ + fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: tracing_subscriber::layer::Context<'_, S>) { + let mut visitor = LambdaContextVisitor { context: None }; + attrs.record(&mut visitor); + if let Some(context) = visitor.context { + let span = ctx.span(id).unwrap(); + let mut extensions = span.extensions_mut(); + extensions.insert(context); + } + } + + fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) { + let metadata = event.metadata(); + if metadata.level() > &self.level { + return; + } + + // Find Lambda context + let lambda_ctxs = if let Some(scope) = ctx.event_scope(event) { + scope + .from_root() + .map(|span| { + if let Some(v) = span.extensions().get::() { + Some(v.clone()) + } else { + None + } + }) + .filter_map(|c| c) + .collect::>() + } else { + Default::default() + }; + let lambda_ctx = lambda_ctxs.first(); + + let mut data = BTreeMap::new(); + let mut visitor = LambdaVisitor { data: &mut data }; + event.record(&mut visitor); + + let output = serde_json::json!({ + "level": metadata.level().to_string(), + "location": format!("{}:{}", metadata.file().unwrap_or("UNKNOWN"), metadata.line().unwrap_or(0)), + "target": metadata.target(), + // If data has only one key named 'message', we can just use that as the message. + // This is the default key when using macros such as `info!()` or `debug!()`. + "message": if data.len() == 1 && data.contains_key("message") { + data.remove("message").unwrap().into() + } else { + serde_json::to_value(data).unwrap() + }, + "timestamp": Utc::now().to_rfc3339(), + }); + + let output = if let Some(lambda_ctx) = lambda_ctx { + if let serde_json::Value::Object(mut output) = output { + output.insert("function_name".to_string(), lambda_ctx.env_config.function_name.clone().into()); + output.insert("function_memory_size".to_string(), lambda_ctx.env_config.memory.into()); + output.insert("function_arn".to_string(), lambda_ctx.invoked_function_arn.clone().into()); + output.insert("function_request_id".to_string(), lambda_ctx.request_id.clone().into()); + output.insert("xray_trace_id".to_string(), lambda_ctx.xray_trace_id.clone().into()); + + serde_json::Value::Object(output) + } else { + output + } + } else { + output + }; + + println!("{}", serde_json::to_string(&output).unwrap()); + } +}