Skip to content
Prev Previous commit
Next Next commit
get_next_event uses a future loop_fn
  • Loading branch information
illicitonion committed Jan 6, 2019
commit 695333c296d176354ad25a068e8b3b8f2c97edec
1 change: 1 addition & 0 deletions lambda-runtime-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ pub struct EventContext {
}

/// Used by the Runtime to communicate with the internal endpoint.
#[derive(Clone)]
pub struct RuntimeClient {
http_client: Client<HttpConnector, Body>,
endpoint: String,
Expand Down
98 changes: 53 additions & 45 deletions lambda-runtime/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{error::Error, marker::PhantomData, result};
use lambda_runtime_client::RuntimeClient;
use serde;
use serde_json;
use tokio::prelude::future::{Future, IntoFuture};
use tokio::prelude::future::{Future, IntoFuture, loop_fn, Loop};
use tokio::runtime::Runtime as TokioRuntime;

use crate::{
Expand Down Expand Up @@ -79,7 +79,7 @@ macro_rules! lambda {
/// and spin up a new one for the next invocation.
pub(crate) fn start_with_config<E, O, C>(f: impl Handler<E, O>, config: &C, task_executor: TaskExecutor) -> impl Future<Item=(), Error=String>
where
E: serde::de::DeserializeOwned,
E: serde::de::DeserializeOwned + 'static,
O: serde::Serialize,
C: ConfigProvider,
{
Expand Down Expand Up @@ -129,7 +129,7 @@ pub(crate) fn start_with_runtime_client<E, O>(
func_settings: FunctionSettings,
client: RuntimeClient,
) -> impl Future<Item=(), Error=String> where
E: serde::de::DeserializeOwned,
E: serde::de::DeserializeOwned + 'static,
O: serde::Serialize,
{
let mut lambda_runtime: Runtime<_, E, O>;
Expand Down Expand Up @@ -195,7 +195,7 @@ impl<F, E, O> Runtime<F, E, O> {
impl<F, E, O> Runtime<F, E, O>
where
F: Handler<E, O>,
E: serde::de::DeserializeOwned,
E: serde::de::DeserializeOwned + 'static,
O: serde::Serialize,
{
/// Starts the main event loop and begin polling or new events. If one of the
Expand All @@ -204,7 +204,7 @@ where
fn start(&mut self) -> impl Future<Item=(), Error=String> {
debug!("Beginning main event loop");
loop {
let (event, ctx) = self.get_next_event(0, None);
let (event, ctx) = self.get_next_event().wait().unwrap();
let request_id = ctx.aws_request_id.clone();
info!("Received new event with AWS request id: {}", request_id);
let function_outcome = self.invoke(event, ctx);
Expand Down Expand Up @@ -279,53 +279,61 @@ where
/// unless the error throws is not recoverable.
///
/// # Return
/// The next `Event` object to be processed.
pub(super) fn get_next_event(&self, retries: i8, e: Option<RuntimeError>) -> (E, Context) {
if let Some(err) = e {
if retries > self.max_retries {
error!("Unrecoverable error while fetching next event: {}", err);
match err.request_id.clone() {
Some(req_id) => {
self.runtime_client
.event_error(req_id, &err).wait()
.expect("Could not send event error response");
}
None => {
self.runtime_client.fail_init(&err);
/// A `Future` resolving to the next `Event` object to be processed.
pub(super) fn get_next_event(&self) -> impl Future<Item=(E, Context), Error=String> {
let max_retries = self.max_retries;
let runtime_client = self.runtime_client.clone();
let settings = self.settings.clone();
loop_fn((0, None), move |(iteration, maybe_error): (i8, Option<RuntimeError>)| {
if let Some(err) = maybe_error {
if iteration > max_retries {
error!("Unrecoverable error while fetching next event: {}", err);
match err.request_id.clone() {
Some(req_id) => {
return Box::new(runtime_client
.event_error(req_id, &err)
.map_err(|e| format!("Could not send event error response: {}", e))
// these errors are not recoverable. Either we can't communicate with the runtime APIs
// or we cannot parse the event. panic to restart the environment.
.then(|_| Err("Could not retrieve next event".to_owned()))) as Box<dyn Future<Item=_, Error=_>>
}
None => {
runtime_client.fail_init(&err);
unreachable!();
}
}
}

// these errors are not recoverable. Either we can't communicate with the runtie APIs
// or we cannot parse the event. panic to restart the environment.
panic!("Could not retrieve next event");
}
}

match self.runtime_client.next_event().wait() {
Ok((ev_data, invocation_ctx)) => {
let parse_result = serde_json::from_slice(&ev_data);
match parse_result {
Ok(ev) => {
let mut handler_ctx = Context::new(self.settings.clone());
handler_ctx.invoked_function_arn = invocation_ctx.invoked_function_arn;
handler_ctx.aws_request_id = invocation_ctx.aws_request_id;
handler_ctx.xray_trace_id = invocation_ctx.xray_trace_id;
handler_ctx.client_context = invocation_ctx.client_context;
handler_ctx.identity = invocation_ctx.identity;
handler_ctx.deadline = invocation_ctx.deadline;
let settings = settings.clone();
Box::new(runtime_client.next_event().then(move |result| {
match result {
Ok((ev_data, invocation_ctx)) => {
let parse_result = serde_json::from_slice(&ev_data);
match parse_result {
Ok(ev) => {
let mut handler_ctx = Context::new(settings.clone());
handler_ctx.invoked_function_arn = invocation_ctx.invoked_function_arn;
handler_ctx.aws_request_id = invocation_ctx.aws_request_id;
handler_ctx.xray_trace_id = invocation_ctx.xray_trace_id;
handler_ctx.client_context = invocation_ctx.client_context;
handler_ctx.identity = invocation_ctx.identity;
handler_ctx.deadline = invocation_ctx.deadline;

(ev, handler_ctx)
}
Err(e) => {
error!("Could not parse event to type: {}", e);
let mut runtime_err = RuntimeError::from(e);
runtime_err.request_id = Option::from(invocation_ctx.aws_request_id);
self.get_next_event(retries + 1, Option::from(runtime_err))
Ok(Loop::Break((ev, handler_ctx)))
}
Err(e) => {
error!("Could not parse event to type: {}", e);
let mut runtime_err = RuntimeError::from(e);
runtime_err.request_id = Some(invocation_ctx.aws_request_id);
Ok(Loop::Continue((iteration + 1, Some(runtime_err))))
}
}
}
Err(e) => Ok(Loop::Continue((iteration + 1, Some(RuntimeError::from(e))))),
}
}
Err(e) => self.get_next_event(retries + 1, Option::from(RuntimeError::from(e))),
}
})) as Box<dyn Future<Item=_, Error=_>>
})
}
}

Expand Down