Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
reorder_imports = true
merge_imports = true
max_width = 120
94 changes: 22 additions & 72 deletions lambda-runtime-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,47 +44,6 @@ impl fmt::Display for LambdaHeaders {
}
}

/// Trait that defines the methods that should be implemented by a Lambda Runtime API client.
/// Events are returned as a `Vec<u8>` and output for the `event_response()` method should
/// also be passed as a `Vec<u8>`. Serialization and deserialization are up to the runtime
/// library. When returning an error to the `event_error()` or `fail_init()` methods the objects
/// should implement the `error::RuntimeApiError` triat.
pub trait RuntimeClient {
/// Should fetch the next event from the Runtime APIs.
///
/// # Return
/// A tuple of the event raw bytes and the `EventContext` object extracted from the
/// Lambda Runtime API headers - see the `LambdaHeaders` enum.
fn next_event(&self) -> Result<(Vec<u8>, EventContext), ApiError>;
/// Should return the handler response to the Runtime APIs.
///
/// # Arguments
///
/// * `request_id` The original AWS request id received from the `poll_events()` method.
/// * `output` The object returned by the handler serialized to its raw bytes representation
/// as a `Vec<u8>`.
///
/// # Return
/// A `Result<(), RuntimeError>` signifying the API call's success or failure.
fn event_response(&self, request_id: &str, output: Vec<u8>) -> Result<(), ApiError>;
/// Should send an error response to the Runtime APIs.
///
/// # Arguments
///
/// * `request_id` The original AWS request id received from the `poll_events()` method.
/// * `e` The `errors::HandlerError` returned by the handler function.
///
/// # Return
/// A `Result<(), RuntimeError>` signifying the API call's success or failure. A unit struct
/// signifies a lack of an error.
fn event_error(&self, request_id: &str, e: &RuntimeApiError) -> Result<(), ApiError>;
/// Should tell the Runtime APIs that the custom runtime has failed to initialize and the
/// execution environment should be terminated.
fn fail_init(&self, e: &RuntimeApiError);
/// Returns the endpoint this client is communicating with. Primarily used for debugging.
fn get_endpoint(&self) -> String;
}

/// AWS Moble SDK client properties
#[derive(Deserialize, Clone)]
pub struct ClientApplication {
Expand Down Expand Up @@ -148,25 +107,16 @@ pub struct EventContext {
pub identity: Option<CognitoIdentity>,
}

/// The client SDK for Lambda's Runtime APIs that implements the `RuntimeClient` trait.
/// This object is used by the `RustRuntime` to communicate with the internal endpoint.
pub struct HttpRuntimeClient {
/// Used by the Runtime to communicate with the internal endpoint.
pub struct RuntimeClient {
_runtime: Runtime,
http_client: Client<HttpConnector, Body>,
endpoint: String,
}

impl HttpRuntimeClient {
impl RuntimeClient {
/// Creates a new instance of the Runtime APIclient SDK. The http client has timeouts disabled and
/// always send the `Connection: keep-alive` header.
///
/// # Arguments
///
/// * `http_endpoint` The http endpoint for the Runtime APIs. This value comes from the AWS_LAMBDA_RUNTIME_API
/// environment variable.
///
/// # Return
/// An instance of the Runtime APIs client SDK.
/// will always send a `Connection: keep-alive` header.
pub fn new(endpoint: String, runtime: Option<Runtime>) -> Result<Self, ApiError> {
debug!("Starting new HttpRuntimeClient for {}", endpoint);
// start a tokio core main event loop for hyper
Expand All @@ -177,20 +127,17 @@ impl HttpRuntimeClient {

let http_client = Client::builder().executor(runtime.executor()).build_http();

Ok(HttpRuntimeClient {
Ok(RuntimeClient {
_runtime: runtime,
http_client,
endpoint,
})
}
}

impl RuntimeClient for HttpRuntimeClient {
/// Makes the HTTP call to poll for new events to the Runtime APIs.
///
/// # Return
/// A `Result` containing a tuple of the new event of type `E` and its `EventContext`.
fn next_event(&self) -> Result<(Vec<u8>, EventContext), ApiError> {
impl RuntimeClient {
/// Polls for new events to the Runtime APIs.
pub fn next_event(&self) -> Result<(Vec<u8>, EventContext), ApiError> {
let uri = format!(
"http://{}/{}/runtime/invocation/next",
self.endpoint, RUNTIME_API_VERSION
Expand Down Expand Up @@ -252,7 +199,7 @@ impl RuntimeClient for HttpRuntimeClient {
///
/// # Returns
/// A `Result` object containing a bool return value for the call or an `error::ApiError` instance.
fn event_response(&self, request_id: &str, output: Vec<u8>) -> Result<(), ApiError> {
pub fn event_response(&self, request_id: &str, output: Vec<u8>) -> Result<(), ApiError> {
let uri: Uri = format!(
"http://{}/{}/runtime/invocation/{}/response",
self.endpoint, RUNTIME_API_VERSION, request_id
Expand Down Expand Up @@ -299,7 +246,7 @@ impl RuntimeClient for HttpRuntimeClient {
///
/// # Returns
/// A `Result` object containing a bool return value for the call or an `error::ApiError` instance.
fn event_error(&self, request_id: &str, e: &RuntimeApiError) -> Result<(), ApiError> {
pub fn event_error(&self, request_id: &str, e: &RuntimeApiError) -> Result<(), ApiError> {
let uri: Uri = format!(
"http://{}/{}/runtime/invocation/{}/error",
self.endpoint, RUNTIME_API_VERSION, request_id
Expand Down Expand Up @@ -346,30 +293,33 @@ impl RuntimeClient for HttpRuntimeClient {
/// # Panics
/// If it cannot send the init error. In this case we panic to force the runtime
/// to restart.
fn fail_init(&self, e: &RuntimeApiError) {
pub fn fail_init(&self, e: &RuntimeApiError) {
let uri: Uri = format!("http://{}/{}/runtime/init/error", self.endpoint, RUNTIME_API_VERSION)
.parse()
.expect("Could not generate Runtime API URI");
.expect("Could not generate Runtime URI");
error!("Calling fail_init Runtime API: {}", e.to_response().error_message);
let err_body = serde_json::to_vec(&e.to_response()).expect("Could not serialize error object");
let req = self.get_runtime_post_request(&uri, err_body);

match self.http_client.request(req).wait() {
Ok(_) => {}
Err(e) => {
let resp = self
.http_client
.request(req)
.map_err(|e| {
error!("Error while sending init failed message: {}", e);
panic!("Error while sending init failed message: {}", e);
}
}
}).map(|resp| {
info!("Successfully sent error response to the runtime API: {:?}", resp);
});
tokio::spawn(resp);
}

/// Returns the endpoint configured for this HTTP Runtime client.
fn get_endpoint(&self) -> String {
pub fn get_endpoint(&self) -> String {
self.endpoint.clone()
}
}

impl HttpRuntimeClient {
impl RuntimeClient {
/// Creates a Hyper `Request` object for the given `Uri` and `Body`. Sets the
/// HTTP method to `POST` and the `Content-Type` header value to `application/json`.
///
Expand Down
8 changes: 3 additions & 5 deletions lambda-runtime-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
#![warn(missing_docs)]
//! Rust client SDK for the AWS Lambda Runtime APIs. This crate defines
//! both a `RuntimeClient` trait and it's implementation: `HttpRuntimeClient`.
//! The `HttpRuntimeClient` object exposes all of the methods available in
//! AWS Lambda's Runtime APIs and it can be used to build any rust-related
//! custom runtime in Lambda.
//! a `RuntimeClient` that encapsulates interactions with AWS Lambda's Runtime
//! APIs.
//!
//! To return errors to the Runtime APIs through the `event_error()` or
//! `fail_init()` methods the `Error` objects must implement the `error::RuntimeApiError`
Expand Down Expand Up @@ -33,7 +31,7 @@
//!
//! fn main() {
//! let runtime_endpoint = String::from("http://localhost:8080");
//! let client: &RuntimeClient = &HttpRuntimeClient::new(runtime_endpoint, None)
//! let client = RuntimeClient::new(runtime_endpoint, None)
//! .expect("Could not initialize client");
//!
//! let (event_data, event_context) = client.next_event()
Expand Down
6 changes: 3 additions & 3 deletions lambda-runtime/examples/basic.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
extern crate lambda_runtime as lambda;
extern crate serde_derive;
extern crate log;
extern crate serde_derive;
extern crate simple_logger;

use serde_derive::{Serialize, Deserialize};
use lambda::{lambda, error::HandlerError};
use lambda::{error::HandlerError, lambda};
use log::error;
use serde_derive::{Deserialize, Serialize};
use std::error::Error;

#[derive(Deserialize)]
Expand Down
8 changes: 4 additions & 4 deletions lambda-runtime/examples/with_custom_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
extern crate lambda_runtime as lambda;
extern crate serde_derive;
extern crate log;
extern crate serde_derive;
extern crate simple_logger;
extern crate tokio;

use lambda::{lambda, error::HandlerError};
use tokio::runtime::Runtime;
use serde_derive::{Serialize, Deserialize};
use lambda::{error::HandlerError, lambda};
use log::error;
use serde_derive::{Deserialize, Serialize};
use std::error::Error;
use tokio::runtime::Runtime;

#[derive(Deserialize, Clone)]
struct CustomEvent {
Expand Down
6 changes: 6 additions & 0 deletions lambda-runtime/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ pub trait ConfigProvider {
/// used by the `start()` method of this module.
pub struct EnvConfigProvider;

impl EnvConfigProvider {
pub fn new() -> Self {
EnvConfigProvider {}
}
}

impl ConfigProvider for EnvConfigProvider {
/// Loads the function settings from the Lambda environment variables:
/// https://docs.aws.amazon.com/lambda/latest/dg/current-supported-versions.html
Expand Down
42 changes: 18 additions & 24 deletions lambda-runtime/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::{error::Error, result};

use lambda_runtime_client;
use serde;
use serde_json;

use context::Context;
use env::{ConfigProvider, EnvConfigProvider, FunctionSettings};
use error::{HandlerError, RuntimeError};
use lambda_runtime_client::RuntimeClient;
use tokio::runtime::Runtime as TokioRuntime;

const MAX_RETRIES: i8 = 3;
Expand All @@ -22,12 +22,12 @@ pub type Handler<E, O> = fn(E, Context) -> Result<O, HandlerError>;
///
/// # Panics
/// The function panics if the Lambda environment variables are not set.
pub fn start<E: 'static, O: 'static>(f: Handler<E, O>, runtime: Option<TokioRuntime>)
pub fn start<E, O>(f: Handler<E, O>, runtime: Option<TokioRuntime>)
where
for<'invocation> E: serde::Deserialize<'invocation>,
O: serde::Serialize,
{
start_with_config(f, &EnvConfigProvider {}, runtime)
start_with_config(f, EnvConfigProvider::new(), runtime)
}

#[macro_export]
Expand All @@ -53,13 +53,11 @@ macro_rules! lambda {
/// The function panics if the `ConfigProvider` returns an error from the `get_runtime_api_endpoint()`
/// or `get_function_settings()` methods. The panic forces AWS Lambda to terminate the environment
/// and spin up a new one for the next invocation.
pub(crate) fn start_with_config<E: 'static, O: 'static>(
f: Handler<E, O>,
config: &'static ConfigProvider,
runtime: Option<TokioRuntime>,
) where
pub(crate) fn start_with_config<E, O, C>(f: Handler<E, O>, config: C, runtime: Option<TokioRuntime>)
where
for<'invocation> E: serde::Deserialize<'invocation>,
O: serde::Serialize,
C: ConfigProvider,
{
// if we cannot find the endpoint we panic, nothing else we can do.
let endpoint: String;
Expand All @@ -81,10 +79,9 @@ pub(crate) fn start_with_config<E: 'static, O: 'static>(
}
}

match lambda_runtime_client::HttpRuntimeClient::new(endpoint, runtime) {
match RuntimeClient::new(endpoint, runtime) {
Ok(client) => {
let trait_client: &lambda_runtime_client::RuntimeClient = &client;
start_with_runtime_client(f, function_config, trait_client);
start_with_runtime_client(f, function_config, client);
}
Err(e) => {
panic!("Could not create runtime client SDK: {}", e);
Expand All @@ -103,11 +100,8 @@ pub(crate) fn start_with_config<E: 'static, O: 'static>(
///
/// # Panics
/// The function panics if we cannot instantiate a new `RustRuntime` object.
pub(crate) fn start_with_runtime_client<'env, E: 'static, O: 'static>(
f: Handler<E, O>,
func_settings: FunctionSettings,
client: &'env lambda_runtime_client::RuntimeClient,
) where
pub(crate) fn start_with_runtime_client<E, O>(f: Handler<E, O>, func_settings: FunctionSettings, client: RuntimeClient)
where
for<'invocation> E: serde::Deserialize<'invocation>,
O: serde::Serialize,
{
Expand All @@ -125,15 +119,15 @@ pub(crate) fn start_with_runtime_client<'env, E: 'static, O: 'static>(

/// Internal representation of the runtime object that polls for events and communicates
/// with the Runtime APIs
pub(super) struct Runtime<'env, E: 'static, O: 'static> {
runtime_client: &'env lambda_runtime_client::RuntimeClient,
pub(super) struct Runtime<E, O> {
runtime_client: RuntimeClient,
handler: Handler<E, O>,
max_retries: i8,
settings: FunctionSettings,
}

// generic methods implementation
impl<'env, E, O> Runtime<'env, E, O> {
impl<E, O> Runtime<E, O> {
/// Creates a new instance of the `Runtime` object populated with the environment
/// settings.
///
Expand All @@ -151,8 +145,8 @@ impl<'env, E, O> Runtime<'env, E, O> {
f: Handler<E, O>,
config: FunctionSettings,
retries: i8,
client: &'env lambda_runtime_client::RuntimeClient,
) -> result::Result<Runtime<'env, E, O>, RuntimeError> {
client: RuntimeClient,
) -> result::Result<Runtime<E, O>, RuntimeError> {
debug!(
"Creating new runtime with {} max retries for endpoint {}",
retries,
Expand All @@ -169,7 +163,7 @@ impl<'env, E, O> Runtime<'env, E, O> {

// implementation of methods that require the Event and Output types
// to be compatible with `serde`'s Deserialize/Serialize.
impl<'env, E, O> Runtime<'env, E, O>
impl<'env, E, O> Runtime<E, O>
where
for<'de> E: serde::Deserialize<'de>,
O: serde::Serialize,
Expand Down Expand Up @@ -300,12 +294,12 @@ pub(crate) mod tests {
use super::*;
use context;
use env;
use lambda_runtime_client as cli;
use lambda_runtime_client::RuntimeClient;

#[test]
fn runtime_invokes_handler() {
let config: &env::ConfigProvider = &env::tests::MockConfigProvider { error: false };
let client: &lambda_runtime_client::RuntimeClient = &cli::HttpRuntimeClient::new(
let client = RuntimeClient::new(
config
.get_runtime_api_endpoint()
.expect("Could not get runtime endpoint"),
Expand Down
2 changes: 1 addition & 1 deletion lambda-runtime/tests/skeptic.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
include!(concat!(env!("OUT_DIR"), "/skeptic-tests.rs"));
include!(concat!(env!("OUT_DIR"), "/skeptic-tests.rs"));