Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions examples/extension-telemetry-basic/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "extension-telemetry-basic"
version = "0.1.0"
edition = "2021"


# Use cargo-edit(https://github.com/killercup/cargo-edit#installation)
# to manage dependencies.
# Running `cargo add DEPENDENCY_NAME` will
# add the latest version of a dependency to the list,
# and it will keep the alphabetic ordering for you.

[dependencies]
lambda-extension = { path = "../../lambda-extension" }
serde = "1.0.136"
tokio = { version = "1", features = ["macros", "rt"] }
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }


14 changes: 14 additions & 0 deletions examples/extension-telemetry-basic/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# AWS Lambda Telemetry extension example

## Build & Deploy

1. Install [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation)
2. Build the extension with `cargo lambda build --release --extension`
3. Deploy the extension as a layer with `cargo lambda deploy --extension`

The last command will give you an ARN for the extension layer that you can use in your functions.


## Build for ARM 64

Build the extension with `cargo lambda build --release --extension --arm64`
59 changes: 59 additions & 0 deletions examples/extension-telemetry-basic/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use lambda_extension::{service_fn, Error, Extension, LambdaTelemetry, LambdaTelemetryRecord, SharedService};
use tracing::info;

async fn handler(events: Vec<LambdaTelemetry>) -> Result<(), Error> {
for event in events {
match event.record {
LambdaTelemetryRecord::Function(record) => info!("[logs] [function] {}", record),
LambdaTelemetryRecord::PlatformInitStart {
initialization_type: _,
phase: _,
runtime_version: _,
runtime_version_arn: _,
} => info!("[platform] Initialization started"),
LambdaTelemetryRecord::PlatformInitRuntimeDone {
initialization_type: _,
phase: _,
status: _,
error_type: _,
spans: _,
} => info!("[platform] Initialization finished"),
LambdaTelemetryRecord::PlatformStart {
request_id,
version: _,
tracing: _,
} => info!("[platform] Handling of request {} started", request_id),
LambdaTelemetryRecord::PlatformRuntimeDone {
request_id,
status: _,
error_type: _,
metrics: _,
spans: _,
tracing: _,
} => info!("[platform] Handling of request {} finished", request_id),
_ => (),
}
}

Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
// The runtime logging can be enabled here by initializing `tracing` with `tracing-subscriber`
// While `tracing` is used internally, `log` can be used as well if preferred.
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();

let telemetry_processor = SharedService::new(service_fn(handler));

Extension::new()
.with_telemetry_processor(telemetry_processor)
.run()
.await?;

Ok(())
}
41 changes: 40 additions & 1 deletion lambda-extension/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

### Simple extension

The code below creates a simple extension that's registered to every `INVOKE` and `SHUTDOWN` events, and logs them in CloudWatch.
The code below creates a simple extension that's registered to every `INVOKE` and `SHUTDOWN` events.

```rust,no_run
use lambda_extension::{service_fn, Error, LambdaEvent, NextEvent};
Expand Down Expand Up @@ -75,6 +75,45 @@ async fn main() -> Result<(), Error> {

```

### Telemetry processor extension

```rust,no_run
use lambda_extension::{service_fn, Error, Extension, LambdaTelemetry, LambdaTelemetryRecord, SharedService};
use tracing::info;

async fn handler(events: Vec<LambdaTelemetry>) -> Result<(), Error> {
for event in events {
match event.record {
LambdaTelemetryRecord::Function(record) => {
// do something with the function log record
},
LambdaTelemetryRecord::PlatformInitStart {
initialization_type: _,
phase: _,
runtime_version: _,
runtime_version_arn: _,
} => {
// do something with the PlatformInitStart event
},
// more types of telemetry events are available
_ => (),
}
}

Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
let telemetry_processor = SharedService::new(service_fn(handler));

Extension::new().with_telemetry_processor(telemetry_processor).run().await?;

Ok(())
}

```

## Deployment

Lambda extensions can be added to your functions either using [Lambda layers](https://docs.aws.amazon.com/lambda/latest/dg/using-extensions.html#using-extensions-config), or adding them to [containers images](https://docs.aws.amazon.com/lambda/latest/dg/using-extensions.html#invocation-extensions-images).
Expand Down
132 changes: 123 additions & 9 deletions lambda-extension/src/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,31 @@ use tokio_stream::StreamExt;
use tower::{service_fn, MakeService, Service, ServiceExt};
use tracing::{error, trace};

use crate::{logs::*, requests, Error, ExtensionError, LambdaEvent, NextEvent};
use crate::{
logs::*,
requests::{self, Api},
telemetry_wrapper, Error, ExtensionError, LambdaEvent, LambdaTelemetry, NextEvent,
};

const DEFAULT_LOG_PORT_NUMBER: u16 = 9002;
const DEFAULT_TELEMETRY_PORT_NUMBER: u16 = 9003;

/// An Extension that runs event and log processors
pub struct Extension<'a, E, L> {
/// An Extension that runs event, log and telemetry processors
pub struct Extension<'a, E, L, T> {
extension_name: Option<&'a str>,
events: Option<&'a [&'a str]>,
events_processor: E,
log_types: Option<&'a [&'a str]>,
logs_processor: Option<L>,
log_buffering: Option<LogBuffering>,
log_port_number: u16,
telemetry_types: Option<&'a [&'a str]>,
telemetry_processor: Option<T>,
telemetry_buffering: Option<LogBuffering>,
telemetry_port_number: u16,
}

impl<'a> Extension<'a, Identity<LambdaEvent>, MakeIdentity<Vec<LambdaLog>>> {
impl<'a> Extension<'a, Identity<LambdaEvent>, MakeIdentity<Vec<LambdaLog>>, MakeIdentity<Vec<LambdaTelemetry>>> {
/// Create a new base [`Extension`] with a no-op events processor
pub fn new() -> Self {
Extension {
Expand All @@ -35,17 +44,23 @@ impl<'a> Extension<'a, Identity<LambdaEvent>, MakeIdentity<Vec<LambdaLog>>> {
log_buffering: None,
logs_processor: None,
log_port_number: DEFAULT_LOG_PORT_NUMBER,
telemetry_types: None,
telemetry_buffering: None,
telemetry_processor: None,
telemetry_port_number: DEFAULT_TELEMETRY_PORT_NUMBER,
}
}
}

impl<'a> Default for Extension<'a, Identity<LambdaEvent>, MakeIdentity<Vec<LambdaLog>>> {
impl<'a> Default
for Extension<'a, Identity<LambdaEvent>, MakeIdentity<Vec<LambdaLog>>, MakeIdentity<Vec<LambdaTelemetry>>>
{
fn default() -> Self {
Self::new()
}
}

impl<'a, E, L> Extension<'a, E, L>
impl<'a, E, L, T> Extension<'a, E, L, T>
where
E: Service<LambdaEvent>,
E::Future: Future<Output = Result<(), E::Error>>,
Expand All @@ -58,6 +73,14 @@ where
L::Error: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Debug,
L::MakeError: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Debug,
L::Future: Send,

// Fixme: 'static bound might be too restrictive
T: MakeService<(), Vec<LambdaTelemetry>, Response = ()> + Send + Sync + 'static,
T::Service: Service<Vec<LambdaTelemetry>, Response = ()> + Send + Sync,
<T::Service as Service<Vec<LambdaTelemetry>>>::Future: Send + 'a,
T::Error: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Debug,
T::MakeError: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Debug,
T::Future: Send,
{
/// Create a new [`Extension`] with a given extension name
pub fn with_extension_name(self, extension_name: &'a str) -> Self {
Expand All @@ -77,7 +100,7 @@ where
}

/// Create a new [`Extension`] with a service that receives Lambda events.
pub fn with_events_processor<N>(self, ep: N) -> Extension<'a, N, L>
pub fn with_events_processor<N>(self, ep: N) -> Extension<'a, N, L, T>
where
N: Service<LambdaEvent>,
N::Future: Future<Output = Result<(), N::Error>>,
Expand All @@ -91,11 +114,15 @@ where
log_buffering: self.log_buffering,
logs_processor: self.logs_processor,
log_port_number: self.log_port_number,
telemetry_types: self.telemetry_types,
telemetry_buffering: self.telemetry_buffering,
telemetry_processor: self.telemetry_processor,
telemetry_port_number: self.telemetry_port_number,
}
}

/// Create a new [`Extension`] with a service that receives Lambda logs.
pub fn with_logs_processor<N, NS>(self, lp: N) -> Extension<'a, E, N>
pub fn with_logs_processor<N, NS>(self, lp: N) -> Extension<'a, E, N, T>
where
N: Service<()>,
N::Future: Future<Output = Result<NS, N::Error>>,
Expand All @@ -109,6 +136,10 @@ where
log_types: self.log_types,
log_buffering: self.log_buffering,
log_port_number: self.log_port_number,
telemetry_types: self.telemetry_types,
telemetry_buffering: self.telemetry_buffering,
telemetry_processor: self.telemetry_processor,
telemetry_port_number: self.telemetry_port_number,
}
}

Expand Down Expand Up @@ -137,6 +168,53 @@ where
}
}

/// Create a new [`Extension`] with a service that receives Lambda telemetry data.
pub fn with_telemetry_processor<N, NS>(self, lp: N) -> Extension<'a, E, L, N>
where
N: Service<()>,
N::Future: Future<Output = Result<NS, N::Error>>,
N::Error: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Display,
{
Extension {
telemetry_processor: Some(lp),
events_processor: self.events_processor,
extension_name: self.extension_name,
events: self.events,
log_types: self.log_types,
log_buffering: self.log_buffering,
logs_processor: self.logs_processor,
log_port_number: self.log_port_number,
telemetry_types: self.telemetry_types,
telemetry_buffering: self.telemetry_buffering,
telemetry_port_number: self.telemetry_port_number,
}
}

/// Create a new [`Extension`] with a list of telemetry types to subscribe.
/// The only accepted telemetry types are `function`, `platform`, and `extension`.
pub fn with_telemetry_types(self, telemetry_types: &'a [&'a str]) -> Self {
Extension {
telemetry_types: Some(telemetry_types),
..self
}
}

/// Create a new [`Extension`] with specific configuration to buffer telemetry.
pub fn with_telemetry_buffering(self, lb: LogBuffering) -> Self {
Extension {
telemetry_buffering: Some(lb),
..self
}
}

/// Create a new [`Extension`] with a different port number to listen to telemetry.
pub fn with_telemetry_port_number(self, port_number: u16) -> Self {
Extension {
telemetry_port_number: port_number,
..self
}
}

/// Execute the given extension
pub async fn run(self) -> Result<(), Error> {
let client = &Client::builder().build()?;
Expand Down Expand Up @@ -166,7 +244,8 @@ where
trace!("Log processor started");

// Call Logs API to start receiving events
let req = requests::subscribe_logs_request(
let req = requests::subscribe_request(
Api::LogsApi,
extension_id,
self.log_types,
self.log_buffering,
Expand All @@ -179,6 +258,41 @@ where
trace!("Registered extension with Logs API");
}

if let Some(mut telemetry_processor) = self.telemetry_processor {
trace!("Telemetry processor found");
// Spawn task to run processor
let addr = SocketAddr::from(([0, 0, 0, 0], self.telemetry_port_number));
let make_service = service_fn(move |_socket: &AddrStream| {
trace!("Creating new telemetry processor Service");
let service = telemetry_processor.make_service(());
async move {
let service = Arc::new(Mutex::new(service.await?));
Ok::<_, T::MakeError>(service_fn(move |req| telemetry_wrapper(service.clone(), req)))
}
});
let server = Server::bind(&addr).serve(make_service);
tokio::spawn(async move {
if let Err(e) = server.await {
error!("Error while running telemetry processor: {}", e);
}
});
trace!("Telemetry processor started");

// Call Telemetry API to start receiving events
let req = requests::subscribe_request(
Api::TelemetryApi,
extension_id,
self.telemetry_types,
self.telemetry_buffering,
self.telemetry_port_number,
)?;
let res = client.call(req).await?;
if res.status() != http::StatusCode::OK {
return Err(ExtensionError::boxed("unable to initialize the telemetry api"));
}
trace!("Registered extension with Telemetry API");
}

let incoming = async_stream::stream! {
loop {
trace!("Waiting for next event (incoming loop)");
Expand Down
2 changes: 2 additions & 0 deletions lambda-extension/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ mod events;
pub use events::*;
mod logs;
pub use logs::*;
mod telemetry;
pub use telemetry::*;

/// Include several request builders to interact with the Extension API.
pub mod requests;
Expand Down
Loading