Skip to content
Prev Previous commit
Next Next commit
cargo fmt
  • Loading branch information
illicitonion committed Jan 6, 2019
commit f2ae75f408d9761af270a29a04d3291ca830c518
125 changes: 66 additions & 59 deletions lambda-runtime-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,28 +136,28 @@ impl RuntimeClient {

let http_client = Client::builder().executor(task_executor).build_http();

Ok(RuntimeClient {
http_client,
endpoint,
})
Ok(RuntimeClient { http_client, endpoint })
}
}

impl RuntimeClient {
/// Polls for new events to the Runtime APIs.
pub fn next_event(&self) -> impl Future<Item=(Vec<u8>, EventContext), Error=ApiError> {
pub fn next_event(&self) -> impl Future<Item = (Vec<u8>, EventContext), Error = ApiError> {
let uri = format!(
"http://{}/{}/runtime/invocation/next",
self.endpoint, RUNTIME_API_VERSION
).parse();
)
.parse();
trace!("Polling for next event");
let http_client = self.http_client.clone();
uri.into_future()
.map_err(ApiError::from)
.and_then(move |uri| http_client.get(uri).map_err(|e| {
error!("Error when fetching next event from Runtime API: {}", e);
ApiError::from(e)
}))
.and_then(move |uri| {
http_client.get(uri).map_err(|e| {
error!("Error when fetching next event from Runtime API: {}", e);
ApiError::from(e)
})
})
.and_then(|resp| {
if resp.status().is_client_error() {
error!(
Expand All @@ -179,7 +179,12 @@ impl RuntimeClient {
.clone());
}
return Ok((Self::get_event_context(&resp.headers())?, resp));
}).and_then(|(ctx, resp)| Ok(ctx).into_future().join(resp.into_body().concat2().map_err(Into::into)))
})
.and_then(|(ctx, resp)| {
Ok(ctx)
.into_future()
.join(resp.into_body().concat2().map_err(Into::into))
})
.map(|(ctx, body)| {
let buf = body.into_bytes().to_vec();

Expand All @@ -204,11 +209,12 @@ impl RuntimeClient {
///
/// # Returns
/// A `Future` object containing a either resolving () for success or an `error::ApiError` instance.
pub fn event_response(&self, request_id: String, output: Vec<u8>) -> impl Future<Item=(), Error=ApiError> {
pub fn event_response(&self, request_id: String, output: Vec<u8>) -> impl Future<Item = (), Error = ApiError> {
let uri = format!(
"http://{}/{}/runtime/invocation/{}/response",
self.endpoint, RUNTIME_API_VERSION, request_id
).parse();
)
.parse();
trace!(
"Posting response for request {} to Runtime API. Response length {} bytes",
request_id,
Expand All @@ -220,26 +226,26 @@ impl RuntimeClient {
.map(move |uri| Self::get_runtime_post_request(&uri, output))
.and_then(move |req| http_client.request(req).map_err(ApiError::from))
.then(move |result| match result {
Ok(resp) => {
if !resp.status().is_success() {
error!(
"Error from Runtime API when posting response for request {}: {}",
request_id,
resp.status()
);
return Err(ApiError::new(&format!(
"Error {} while sending response",
resp.status()
)));
Ok(resp) => {
if !resp.status().is_success() {
error!(
"Error from Runtime API when posting response for request {}: {}",
request_id,
resp.status()
);
return Err(ApiError::new(&format!(
"Error {} while sending response",
resp.status()
)));
}
trace!("Posted response to Runtime API for request {}", request_id);
Ok(())
}
trace!("Posted response to Runtime API for request {}", request_id);
Ok(())
}
Err(e) => {
error!("Error when calling runtime API for request {}: {}", request_id, e);
Err(ApiError::from(e))
}
})
Err(e) => {
error!("Error when calling runtime API for request {}: {}", request_id, e);
Err(ApiError::from(e))
}
})
}

/// Calls Lambda's Runtime APIs to send an error generated by the `Handler`. Because it's rust,
Expand All @@ -254,7 +260,7 @@ impl RuntimeClient {
///
/// # Returns
/// A `Future` object containing a either resolving () for success or an `error::ApiError` instance.
pub fn event_error(&self, request_id: String, e: &dyn RuntimeApiError) -> impl Future<Item=(), Error=ApiError> {
pub fn event_error(&self, request_id: String, e: &dyn RuntimeApiError) -> impl Future<Item = (), Error = ApiError> {
let uri = format!(
"http://{}/{}/runtime/invocation/{}/error",
self.endpoint, RUNTIME_API_VERSION, request_id
Expand All @@ -267,33 +273,34 @@ impl RuntimeClient {
.map_err(ApiError::from)
.map(move |uri| (Self::get_runtime_error_request(&uri, &response), response))
.and_then(move |(req, error_response)| {
trace!(
"Posting error to runtime API for request {}: {}",
request_id,
error_response.error_message
);
http_client.request(req).map_err(ApiError::from)
}).then(move |result| match result {
Ok(resp) => {
if !resp.status().is_success() {
error!(
"Error from Runtime API when posting error response for request {}: {}",
request_id2,
resp.status()
);
return Err(ApiError::new(&format!(
"Error {} while sending response",
resp.status()
)));
trace!(
"Posting error to runtime API for request {}: {}",
request_id,
error_response.error_message
);
http_client.request(req).map_err(ApiError::from)
})
.then(move |result| match result {
Ok(resp) => {
if !resp.status().is_success() {
error!(
"Error from Runtime API when posting error response for request {}: {}",
request_id2,
resp.status()
);
return Err(ApiError::new(&format!(
"Error {} while sending response",
resp.status()
)));
}
trace!("Posted error response for request id {}", request_id2);
Ok(())
}
trace!("Posted error response for request id {}", request_id2);
Ok(())
}
Err(e) => {
error!("Error when calling runtime API for request {}: {}", request_id2, e);
Err(ApiError::from(e))
}
})
Err(e) => {
error!("Error when calling runtime API for request {}: {}", request_id2, e);
Err(ApiError::from(e))
}
})
}

/// Calls the Runtime APIs to report a failure during the init process.
Expand Down
4 changes: 2 additions & 2 deletions lambda-runtime/examples/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::error::Error;
use lambda_runtime::{error::HandlerError, lambda, Context};
use serde_derive::{Deserialize, Serialize};
use simple_logger;
use tokio::prelude::future::{Future, ok};
use tokio::prelude::future::{ok, Future};

#[derive(Deserialize)]
struct CustomEvent {
Expand All @@ -23,7 +23,7 @@ fn main() -> Result<(), Box<dyn Error>> {
Ok(())
}

fn my_handler(e: CustomEvent, _c: Context) -> impl Future<Item=CustomOutput, Error=HandlerError> {
fn my_handler(e: CustomEvent, _c: Context) -> impl Future<Item = CustomOutput, Error = HandlerError> {
ok(format!("Hello, {}!", e.first_name))
.map(|message| format!("{} (modified in a Future)", message))
.map(|message| CustomOutput { message })
Expand Down
90 changes: 54 additions & 36 deletions lambda-runtime/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,38 @@ use std::{error::Error, marker::PhantomData, result};
use lambda_runtime_client::RuntimeClient;
use serde;
use serde_json;
use tokio::prelude::future::{Future, IntoFuture, loop_fn, Loop};
use tokio::prelude::future::{loop_fn, Future, IntoFuture, Loop};
use tokio::runtime::Runtime as TokioRuntime;

use crate::{
context::Context,
env::{ConfigProvider, EnvConfigProvider, FunctionSettings},
error::{HandlerError, RuntimeError},
};
use tokio::runtime::TaskExecutor;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::runtime::TaskExecutor;

const MAX_RETRIES: i8 = 3;

/// Functions acting as a handler must conform to this type.
pub trait Handler<E, O>: Send {
/// Future of return value returned by handler.
type Future: Future<Item=O, Error=HandlerError> + Send;
type Future: Future<Item = O, Error = HandlerError> + Send;
/// IntoFuture of return value returned by handler.
type IntoFuture: IntoFuture<Future=Self::Future, Item=O, Error=HandlerError> + Send;
type IntoFuture: IntoFuture<Future = Self::Future, Item = O, Error = HandlerError> + Send;

/// Run the handler.
fn run(&mut self, event: E, ctx: Context) -> Self::IntoFuture;
}

impl<F, E, O: Send, Fut: Future<Item=O, Error=HandlerError> + Send, IntoFut: IntoFuture<Future=Fut, Item=O, Error=HandlerError> + Send> Handler<E, O> for F
impl<
F,
E,
O: Send,
Fut: Future<Item = O, Error = HandlerError> + Send,
IntoFut: IntoFuture<Future = Fut, Item = O, Error = HandlerError> + Send,
> Handler<E, O> for F
where
F: FnMut(E, Context) -> IntoFut + Send,
{
Expand All @@ -55,7 +61,9 @@ where
{
let mut runtime = runtime.unwrap_or_else(|| TokioRuntime::new().expect("Failed to start tokio runtime"));
let task_executor = runtime.executor();
runtime.block_on(start_with_config(f, &EnvConfigProvider::new(), task_executor)).unwrap();
runtime
.block_on(start_with_config(f, &EnvConfigProvider::new(), task_executor))
.unwrap();
}

#[macro_export]
Expand Down Expand Up @@ -87,7 +95,11 @@ macro_rules! lambda {
/// The function panics if the `ConfigProvider` returns an error from the `get_runtime_api_endpoint()`
/// or `get_function_settings()` methods. The panic forces AWS Lambda to terminate the environment
/// and spin up a new one for the next invocation.
pub(crate) fn start_with_config<E, O, C>(f: impl Handler<E, O>, config: &C, task_executor: TaskExecutor) -> impl Future<Item=(), Error=String> + Send
pub(crate) fn start_with_config<E, O, C>(
f: impl Handler<E, O>,
config: &C,
task_executor: TaskExecutor,
) -> impl Future<Item = (), Error = String> + Send
where
E: serde::de::DeserializeOwned + Send + 'static,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm think I'm missing why the event needs to be static—aren't we taking ownership of it here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're taking ownership of the instance of E - the 'static here is stating that that instance's lifetime doesn't depend on any other (non-static) lifetimes; otherwise the E may have a reference to some other data in it which could be dropped, which would mean the E instance was no longer valid. It's a bound on whether E itself can have borrows, rather than on how long the E instance actually lives.

O: serde::Serialize + Send,
Expand All @@ -114,9 +126,7 @@ where
}

match RuntimeClient::new(endpoint, task_executor) {
Ok(client) => {
start_with_runtime_client(f, function_config, client)
}
Ok(client) => start_with_runtime_client(f, function_config, client),
Err(e) => {
panic!("Could not create runtime client SDK: {}", e);
}
Expand All @@ -138,7 +148,8 @@ pub(crate) fn start_with_runtime_client<E, O>(
f: impl Handler<E, O>,
func_settings: FunctionSettings,
client: RuntimeClient,
) -> impl Future<Item=(), Error=String> + Send where
) -> impl Future<Item = (), Error = String> + Send
where
E: serde::de::DeserializeOwned + Send + 'static,
O: serde::Serialize + Send,
{
Expand Down Expand Up @@ -211,7 +222,7 @@ where
/// Starts the main event loop and begin polling or new events. If one of the
/// Runtime APIs returns an unrecoverable error this method calls the init failed
/// API and then panics.
fn start(&self) -> impl Future<Item=(), Error=String> + Send {
fn start(&self) -> impl Future<Item = (), Error = String> + Send {
debug!("Beginning main event loop");

let max_retries = self.max_retries;
Expand Down Expand Up @@ -310,31 +321,38 @@ where
///
/// # Return
/// A `Future` resolving to the next `Event` object to be processed.
pub(super) fn get_next_event(max_retries: i8, runtime_client: RuntimeClient, settings: FunctionSettings) -> impl Future<Item=(E, Context), Error=String> {
loop_fn((0, None), move |(iteration, maybe_error): (i8, Option<RuntimeError>)| {
if let Some(err) = maybe_error {
if iteration > max_retries {
error!("Unrecoverable error while fetching next event: {}", err);
match err.request_id.clone() {
Some(req_id) => {
return Box::new(runtime_client
.event_error(req_id, &err)
.map_err(|e| format!("Could not send event error response: {}", e))
// these errors are not recoverable. Either we can't communicate with the runtime APIs
// or we cannot parse the event. panic to restart the environment.
.then(|_| Err("Could not retrieve next event".to_owned()))) as Box<dyn Future<Item=_, Error=_> + Send>
}
None => {
runtime_client.fail_init(&err);
unreachable!();
pub(super) fn get_next_event(
max_retries: i8,
runtime_client: RuntimeClient,
settings: FunctionSettings,
) -> impl Future<Item = (E, Context), Error = String> {
loop_fn(
(0, None),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking of using tower-retry for retries in a slightly less ad-hoc way (e.g., https://gist.github.com/seanmonstar/2469a8e544ada2d31d13c8ee54f17d48). I'll talk to the Tower folks about getting them on crates.io. I've been doing that locally with some success.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll happily give this a go when the crate is published! :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good!

move |(iteration, maybe_error): (i8, Option<RuntimeError>)| {
if let Some(err) = maybe_error {
if iteration > max_retries {
error!("Unrecoverable error while fetching next event: {}", err);
match err.request_id.clone() {
Some(req_id) => {
return Box::new(
runtime_client
.event_error(req_id, &err)
.map_err(|e| format!("Could not send event error response: {}", e))
// these errors are not recoverable. Either we can't communicate with the runtime APIs
// or we cannot parse the event. panic to restart the environment.
.then(|_| Err("Could not retrieve next event".to_owned())),
) as Box<dyn Future<Item = _, Error = _> + Send>;
}
None => {
runtime_client.fail_init(&err);
unreachable!();
}
}
}
}
}

let settings = settings.clone();
Box::new(runtime_client.next_event().then(move |result| {
match result {
let settings = settings.clone();
Box::new(runtime_client.next_event().then(move |result| match result {
Ok((ev_data, invocation_ctx)) => {
let parse_result = serde_json::from_slice(&ev_data);
match parse_result {
Expand All @@ -358,9 +376,9 @@ where
}
}
Err(e) => Ok(Loop::Continue((iteration + 1, Some(RuntimeError::from(e))))),
}
})) as Box<dyn Future<Item=_, Error=_> + Send>
})
})) as Box<dyn Future<Item = _, Error = _> + Send>
},
)
}
}

Expand Down