diff --git a/lambda-runtime/src/lib.rs b/lambda-runtime/src/lib.rs index 7664a499..cad99de4 100644 --- a/lambda-runtime/src/lib.rs +++ b/lambda-runtime/src/lib.rs @@ -26,7 +26,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio_stream::{Stream, StreamExt}; pub use tower::{self, service_fn, Service}; use tower::{util::ServiceFn, ServiceExt}; -use tracing::{error, trace}; +use tracing::{error, trace, Instrument}; mod requests; #[cfg(test)] @@ -120,15 +120,6 @@ where continue; } - let body = hyper::body::to_bytes(body).await?; - trace!("response body - {}", std::str::from_utf8(&body)?); - - #[cfg(debug_assertions)] - if parts.status.is_server_error() { - error!("Lambda Runtime server returned an unexpected error"); - return Err(parts.status.to_string().into()); - } - let ctx: Context = Context::try_from(parts.headers)?; let ctx: Context = ctx.with_config(config); let request_id = &ctx.request_id.clone(); @@ -138,55 +129,80 @@ where Some(trace_id) => env::set_var("_X_AMZN_TRACE_ID", trace_id), None => env::remove_var("_X_AMZN_TRACE_ID"), } - let body = match serde_json::from_slice(&body) { - Ok(body) => body, - Err(err) => { - let req = build_event_error_request(request_id, err)?; - client.call(req).await.expect("Unable to send response to Runtime APIs"); - return Ok(()); - } + let request_span = match xray_trace_id { + Some(trace_id) => tracing::span!( + tracing::Level::INFO, + "Lambda request", + requestId = request_id, + xrayTraceId = trace_id + ), + None => tracing::span!(tracing::Level::INFO, "Lambda request", requestId = request_id), }; - let req = match handler.ready().await { - Ok(handler) => { - // Catches panics outside of a `Future` - let task = - panic::catch_unwind(panic::AssertUnwindSafe(|| handler.call(LambdaEvent::new(body, ctx)))); - - let task = match task { - // Catches panics inside of the `Future` - Ok(task) => panic::AssertUnwindSafe(task).catch_unwind().await, - Err(err) => Err(err), - }; - - match task { - Ok(response) => match response { - Ok(response) => { - trace!("Ok response from handler (run loop)"); - EventCompletionRequest { - request_id, - body: response, + // Group the handling in one future and instrument it with the span + async { + let body = hyper::body::to_bytes(body).await?; + trace!("response body - {}", std::str::from_utf8(&body)?); + + #[cfg(debug_assertions)] + if parts.status.is_server_error() { + error!("Lambda Runtime server returned an unexpected error"); + return Err(parts.status.to_string().into()); + } + + let body = match serde_json::from_slice(&body) { + Ok(body) => body, + Err(err) => { + let req = build_event_error_request(request_id, err)?; + client.call(req).await.expect("Unable to send response to Runtime APIs"); + return Ok(()); + } + }; + + let req = match handler.ready().await { + Ok(handler) => { + // Catches panics outside of a `Future` + let task = + panic::catch_unwind(panic::AssertUnwindSafe(|| handler.call(LambdaEvent::new(body, ctx)))); + + let task = match task { + // Catches panics inside of the `Future` + Ok(task) => panic::AssertUnwindSafe(task).catch_unwind().await, + Err(err) => Err(err), + }; + + match task { + Ok(response) => match response { + Ok(response) => { + trace!("Ok response from handler (run loop)"); + EventCompletionRequest { + request_id, + body: response, + } + .into_req() } - .into_req() + Err(err) => build_event_error_request(request_id, err), + }, + Err(err) => { + error!("{:?}", err); + let error_type = type_name_of_val(&err); + let msg = if let Some(msg) = err.downcast_ref::<&str>() { + format!("Lambda panicked: {}", msg) + } else { + "Lambda panicked".to_string() + }; + EventErrorRequest::new(request_id, error_type, &msg).into_req() } - Err(err) => build_event_error_request(request_id, err), - }, - Err(err) => { - error!("{:?}", err); - let error_type = type_name_of_val(&err); - let msg = if let Some(msg) = err.downcast_ref::<&str>() { - format!("Lambda panicked: {}", msg) - } else { - "Lambda panicked".to_string() - }; - EventErrorRequest::new(request_id, error_type, &msg).into_req() } } - } - Err(err) => build_event_error_request(request_id, err), - }?; + Err(err) => build_event_error_request(request_id, err), + }?; - client.call(req).await.expect("Unable to send response to Runtime APIs"); + client.call(req).await.expect("Unable to send response to Runtime APIs"); + Ok::<(), Error>(()) + } + .instrument(request_span) + .await?; } Ok(()) }