Skip to content
Next Next commit
RuntimeClient::next_event returns a Future
  • Loading branch information
illicitonion committed Jan 6, 2019
commit 2f7eb744229ed7025a702c3312206806f2b2de5c
39 changes: 18 additions & 21 deletions lambda-runtime-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use hyper::{
};
use serde_derive::Deserialize;
use serde_json;
use tokio::prelude::future::IntoFuture;
use tokio::runtime::Runtime;

use crate::error::{ApiError, ErrorResponse, RuntimeApiError};
Expand Down Expand Up @@ -149,20 +150,20 @@ impl RuntimeClient {

impl RuntimeClient {
/// Polls for new events to the Runtime APIs.
pub fn next_event(&self) -> Result<(Vec<u8>, EventContext), ApiError> {
pub fn next_event(&self) -> impl Future<Item=(Vec<u8>, EventContext), Error=ApiError> {
let uri = format!(
"http://{}/{}/runtime/invocation/next",
self.endpoint, RUNTIME_API_VERSION
)
.parse()?;
).parse();
trace!("Polling for next event");

// We wait instead of processing the future asynchronously because AWS Lambda
// itself enforces only one event per container at a time. No point in taking on
// the additional complexity.
let out = self.http_client.get(uri).wait();
match out {
Ok(resp) => {
let http_client = self.http_client.clone();
uri.into_future()
.map_err(ApiError::from)
.and_then(move |uri| http_client.get(uri).map_err(|e| {
error!("Error when fetching next event from Runtime API: {}", e);
ApiError::from(e)
}))
.and_then(|resp| {
if resp.status().is_client_error() {
error!(
"Runtime API returned client error when polling for new events: {}",
Expand All @@ -182,22 +183,18 @@ impl RuntimeClient {
.unrecoverable()
.clone());
}
let ctx = self.get_event_context(&resp.headers())?;
let out = resp.into_body().concat2().wait()?;
let buf: Vec<u8> = out.into_bytes().to_vec();
return Ok((Self::get_event_context(&resp.headers())?, resp));
}).and_then(|(ctx, resp)| Ok(ctx).into_future().join(resp.into_body().concat2().map_err(Into::into)))
.map(|(ctx, body)| {
let buf = body.into_bytes().to_vec();

trace!(
"Received new event for request id {}. Event length {} bytes",
ctx.aws_request_id,
buf.len()
);
Ok((buf, ctx))
}
Err(e) => {
error!("Error when fetching next event from Runtime API: {}", e);
Err(ApiError::from(e))
}
}
(buf, ctx)
})
}

/// Calls the Lambda Runtime APIs to submit a response to an event. In this function we treat
Expand Down Expand Up @@ -378,7 +375,7 @@ impl RuntimeClient {
/// A `Result` containing the populated `EventContext` or an `ApiError` if the required headers
/// were not present or the client context and cognito identity could not be parsed from the
/// JSON string.
fn get_event_context(&self, headers: &HeaderMap<HeaderValue>) -> Result<EventContext, ApiError> {
fn get_event_context(headers: &HeaderMap<HeaderValue>) -> Result<EventContext, ApiError> {
// let headers = resp.headers();

let aws_request_id = match headers.get(LambdaHeaders::RequestId.as_str()) {
Expand Down
4 changes: 3 additions & 1 deletion lambda-runtime-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
//!
//! ```rust,no_run
//! extern crate lambda_runtime_client;
//! extern crate tokio;
//! #[macro_use]
//! extern crate serde_derive;
//! extern crate serde_json;
//!
//! use lambda_runtime_client::{RuntimeClient, EventContext};
//! use tokio::prelude::future::Future;
//!
//! #[derive(Serialize, Deserialize, Debug)]
//! struct CustomEvent {
Expand All @@ -35,7 +37,7 @@
//! let client = RuntimeClient::new(runtime_endpoint, None)
//! .expect("Could not initialize client");
//!
//! let (event_data, event_context) = client.next_event()
//! let (event_data, event_context) = client.next_event().wait()
//! .expect("Could not retrieve next event");
//! let custom_event: CustomEvent = serde_json::from_slice(&event_data)
//! .expect("Could not turn Vec<u8> into CustomEvent object");
Expand Down
3 changes: 2 additions & 1 deletion lambda-runtime/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{error::Error, marker::PhantomData, result};
use lambda_runtime_client::RuntimeClient;
use serde;
use serde_json;
use tokio::prelude::future::Future;
use tokio::runtime::Runtime as TokioRuntime;

use crate::{
Expand Down Expand Up @@ -294,7 +295,7 @@ where
}
}

match self.runtime_client.next_event() {
match self.runtime_client.next_event().wait() {
Ok((ev_data, invocation_ctx)) => {
let parse_result = serde_json::from_slice(&ev_data);
match parse_result {
Expand Down