Skip to content

Commit b787b70

Browse files
authored
Error reporting and retry fixes (awslabs#5)
* Updated header values for provided runtime and switched to ms time remaining * Fixed issue with deadline not being set in context and switched to realtime clock to get correct epoch * Removed libc dependency and switched to chrono for deadline calculation * Fixed docs example not to import HttpRuntimeClient * New error reporting method with correct content type and error type header. * Two fixes to honor max retries in the get_next_event method and call the error_reponse rather than init_fail when an error is thrown after the call to /next
1 parent 8c583c1 commit b787b70

File tree

3 files changed

+63
-35
lines changed

3 files changed

+63
-35
lines changed

lambda-runtime-client/src/client.rs

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use error::{ApiError, RuntimeApiError};
1+
use error::{ApiError, ErrorResponse, RuntimeApiError};
22
use hyper::{
33
client::HttpConnector,
44
header::{self, HeaderMap, HeaderValue},
@@ -11,6 +11,8 @@ use tokio::runtime::Runtime;
1111

1212
const RUNTIME_API_VERSION: &str = "2018-06-01";
1313
const API_CONTENT_TYPE: &str = "application/json";
14+
const API_ERROR_CONTENT_TYPE: &str = "application/vnd.aws.lambda.error+json";
15+
const RUNTIME_ERROR_HEADER: &str = "Lambda-Runtime-Function-Error-Type";
1416

1517
/// Enum of the headers returned by Lambda's `/next` API call.
1618
pub enum LambdaHeaders {
@@ -141,7 +143,8 @@ impl RuntimeClient {
141143
let uri = format!(
142144
"http://{}/{}/runtime/invocation/next",
143145
self.endpoint, RUNTIME_API_VERSION
144-
).parse()?;
146+
)
147+
.parse()?;
145148
trace!("Polling for next event");
146149

147150
// We wait instead of processing the future asynchronously because AWS Lambda
@@ -203,7 +206,8 @@ impl RuntimeClient {
203206
let uri: Uri = format!(
204207
"http://{}/{}/runtime/invocation/{}/response",
205208
self.endpoint, RUNTIME_API_VERSION, request_id
206-
).parse()?;
209+
)
210+
.parse()?;
207211
trace!(
208212
"Posting response for request {} to Runtime API. Response length {} bytes",
209213
request_id,
@@ -250,14 +254,14 @@ impl RuntimeClient {
250254
let uri: Uri = format!(
251255
"http://{}/{}/runtime/invocation/{}/error",
252256
self.endpoint, RUNTIME_API_VERSION, request_id
253-
).parse()?;
257+
)
258+
.parse()?;
254259
trace!(
255260
"Posting error to runtime API for request {}: {}",
256261
request_id,
257262
e.to_response().error_message
258263
);
259-
let err_body = serde_json::to_vec(&e.to_response()).expect("Could not serialize error object");
260-
let req = self.get_runtime_post_request(&uri, err_body);
264+
let req = self.get_runtime_error_request(&uri, &e.to_response());
261265

262266
match self.http_client.request(req).wait() {
263267
Ok(resp) => {
@@ -298,19 +302,19 @@ impl RuntimeClient {
298302
.parse()
299303
.expect("Could not generate Runtime URI");
300304
error!("Calling fail_init Runtime API: {}", e.to_response().error_message);
301-
let err_body = serde_json::to_vec(&e.to_response()).expect("Could not serialize error object");
302-
let req = self.get_runtime_post_request(&uri, err_body);
305+
let req = self.get_runtime_error_request(&uri, &e.to_response());
303306

304-
let resp = self
305-
.http_client
307+
self.http_client
306308
.request(req)
309+
.wait()
307310
.map_err(|e| {
308311
error!("Error while sending init failed message: {}", e);
309312
panic!("Error while sending init failed message: {}", e);
310-
}).map(|resp| {
313+
})
314+
.map(|resp| {
311315
info!("Successfully sent error response to the runtime API: {:?}", resp);
312-
});
313-
tokio::spawn(resp);
316+
})
317+
.expect("Could not complete init_fail request");
314318
}
315319

316320
/// Returns the endpoint configured for this HTTP Runtime client.
@@ -339,6 +343,20 @@ impl RuntimeClient {
339343
.unwrap()
340344
}
341345

346+
fn get_runtime_error_request(&self, uri: &Uri, e: &ErrorResponse) -> Request<Body> {
347+
let body = serde_json::to_vec(e).expect("Could not turn error object into response JSON");
348+
Request::builder()
349+
.method(Method::POST)
350+
.uri(uri.clone())
351+
.header(
352+
header::CONTENT_TYPE,
353+
header::HeaderValue::from_static(API_ERROR_CONTENT_TYPE),
354+
)
355+
.header(RUNTIME_ERROR_HEADER, HeaderValue::from_static("RuntimeError")) // TODO: We should add this code to the error object.
356+
.body(Body::from(body))
357+
.unwrap()
358+
}
359+
342360
/// Creates an `EventContext` object based on the response returned by the Runtime
343361
/// API `/next` endpoint.
344362
///
@@ -383,7 +401,8 @@ impl RuntimeClient {
383401
error!("Response headers do not contain deadline header");
384402
return Err(ApiError::new(&format!("Missing {} header", LambdaHeaders::Deadline)));
385403
}
386-
}.parse::<u128>()?;
404+
}
405+
.parse::<u128>()?;
387406

388407
let mut ctx = EventContext {
389408
aws_request_id,

lambda-runtime/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ use serde_json;
1515
pub struct RuntimeError {
1616
msg: String,
1717
stack_trace: Option<backtrace::Backtrace>,
18+
/// The request id that generated this error
19+
pub(crate) request_id: Option<String>,
1820
/// Whether the error is recoverable or not.
1921
pub(crate) recoverable: bool,
2022
}
@@ -58,6 +60,7 @@ impl RuntimeError {
5860
msg: String::from(msg),
5961
stack_trace: trace,
6062
recoverable: true,
63+
request_id: None,
6164
}
6265
}
6366
}

lambda-runtime/src/runtime.rs

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ where
174174
fn start(&self) {
175175
debug!("Beginning main event loop");
176176
loop {
177-
let (event, ctx) = self.get_next_event(0);
177+
let (event, ctx) = self.get_next_event(0, None);
178178
let request_id = ctx.aws_request_id.clone();
179179
info!("Received new event with AWS request id: {}", request_id);
180180
let function_outcome = self.invoke(event, ctx);
@@ -247,7 +247,27 @@ where
247247
///
248248
/// # Return
249249
/// The next `Event` object to be processed.
250-
pub(super) fn get_next_event(&self, retries: i8) -> (E, Context) {
250+
pub(super) fn get_next_event(&self, retries: i8, e: Option<RuntimeError>) -> (E, Context) {
251+
if let Some(err) = e {
252+
if retries > self.max_retries {
253+
error!("Unrecoverable error while fetching next event: {}", err);
254+
match err.request_id.clone() {
255+
Some(req_id) => {
256+
self.runtime_client
257+
.event_error(&req_id, &err)
258+
.expect("Could not send event error response");
259+
}
260+
None => {
261+
self.runtime_client.fail_init(&err);
262+
}
263+
}
264+
265+
// these errors are not recoverable. Either we can't communicate with the runtie APIs
266+
// or we cannot parse the event. panic to restart the environment.
267+
panic!("Could not retrieve next event");
268+
}
269+
}
270+
251271
match self.runtime_client.next_event() {
252272
Ok((ev_data, invocation_ctx)) => {
253273
let parse_result = serde_json::from_slice(&ev_data);
@@ -263,29 +283,15 @@ where
263283

264284
(ev, handler_ctx)
265285
}
266-
Err(e) => {
286+
Err(mut e) => {
267287
error!("Could not parse event to type: {}", e);
268-
self.get_next_event(retries + 1)
288+
let mut runtime_err = RuntimeError::from(e);
289+
runtime_err.request_id = Option::from(invocation_ctx.aws_request_id);
290+
self.get_next_event(retries + 1, Option::from(runtime_err))
269291
}
270292
}
271293
}
272-
Err(e) => {
273-
if !e.recoverable {
274-
error!("Unrecoverable error while fetching next event: {}", e);
275-
self.runtime_client.fail_init(&e);
276-
panic!("Could not retrieve next event");
277-
}
278-
279-
// if the error is recoverable we retry up to max_retries time
280-
if retries <= self.max_retries {
281-
//let next_retries = retries + 1;
282-
self.get_next_event(retries + 1)
283-
} else {
284-
error!("Exceeded maximum number of retries: {}", e);
285-
self.runtime_client.fail_init(&e);
286-
panic!("Could not retrieve next event");
287-
}
288-
}
294+
Err(e) => self.get_next_event(retries + 1, Option::from(RuntimeError::from(e))),
289295
}
290296
}
291297
}

0 commit comments

Comments
 (0)