-
Notifications
You must be signed in to change notification settings - Fork 379
Handlers can be async #68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Result implements IntoFuture, so this is backwards compatible, assuming Handler (and its Event type) are Send and 'static, which they almost certainly always are.
|
Hi @illicitonion, thanks for the PR. We also planned to make the whole runtime async eventually. We were holding off for now because we have a pretty significant refactor in progress in #63. I'll @davidbarsky chime in here since he owns the async RFC. |
|
Thanks for cutting this CR! I think this is an excellent start from my cursory overview. I was also looking at replacing the panics with a top-level panic handler, but fell down a rabbit hole with Tower integration. |
README.md
Outdated
|
|
||
| `Handler` provides a default implementation that enables you to provide a Rust closure or function pointer to the `lambda!()` macro. | ||
|
|
||
| If your handler is synchronous, you can just return a `Result` from it; if your handler is asynchronous, you can return a `Future` from it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for these docs! Small phrasing nit—I'd prefer avoiding using “just” in documentation. I'd instead emphasize that due to the IntoFuture implementation on Result, most synchronous handlers won't need to make any code changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rephrased
|
|
||
| /// Functions serving as ALB and API Gateway handlers must conform to this type. | ||
| pub trait Handler<R> { | ||
| pub trait Handler<R>: Send { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is Send not inferred on Handler?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because Handler is a trait, there's no way for the type system to know that an implementation of it doesn't have some non-Send fields. We can either add the bound here to require that implementors are Send, or we can add a + Send bound in each of the places that requires Handler be Send, but doing the latter would require adding the bound to several core functions, and would lead to more confusing error messages if you had a non-Send trait.
| /// # Panics | ||
| /// The function panics if the Lambda environment variables are not set. | ||
| pub fn start<R>(f: impl Handler<R>, runtime: Option<TokioRuntime>) | ||
| pub fn start<R>(f: impl Handler<R> + 'static, runtime: Option<TokioRuntime>) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two things:
- I'm not completely sure that
impl Handlerneeds to be&'static. I wonder ifstart<R>can be refactored to take animpl Futureinstead of animpl Handler<R>. - If we're able to get rid of all
.waits()inside the client, we can remove the optionalTokioRuntimeparameter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll hold off on looking at this until #63 merges, if that's ok
lambda-runtime-client/src/client.rs
Outdated
| /// Creates a new instance of the Runtime APIclient SDK. The http client has timeouts disabled and | ||
| /// will always send a `Connection: keep-alive` header. | ||
| pub fn new(endpoint: String, runtime: Option<Runtime>) -> Result<Self, ApiError> { | ||
| pub fn new(endpoint: String, task_executor: TaskExecutor) -> Result<Self, ApiError> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The signature can be changed to just return Self—we're not returning ApiError anywhere now (nice job that!)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| resp.status() | ||
| ))); | ||
| let http_client = self.http_client.clone(); | ||
| uri.into_future() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't know that Uri had an IntoFuture implementation. Can I ask why you opted for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think uri here is a Result and all Result types get a free IntoFuture impl. Since the return type changed, the ? needed to be removed above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah; I could change this to future::done(uri) if that's more clear?
| where | ||
| E: serde::de::DeserializeOwned, | ||
| O: serde::Serialize, | ||
| E: serde::de::DeserializeOwned + Send + 'static, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm think I'm missing why the event needs to be static—aren't we taking ownership of it here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're taking ownership of the instance of E - the 'static here is stating that that instance's lifetime doesn't depend on any other (non-static) lifetimes; otherwise the E may have a reference to some other data in it which could be dropped, which would mean the E instance was no longer valid. It's a bound on whether E itself can have borrows, rather than on how long the E instance actually lives.
| runtime_client: client, | ||
| settings: config, | ||
| handler: f, | ||
| handler: Arc::new(Mutex::new(f)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not the biggest fan of having a mutex here. Can you elaborate why it's necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because run takes a &mut self, we're only allowed to call run from one thread at a time. This Mutex is how we guarantee that multiple threads don't call run.
If we wanted to drop the Mutex, we would need to either:
- Change
runfrom taking&mut selfto just taking&self(this would mean we could only implement it forFnnot forFnMutlike we currently do). This would be easy, but an API change. - Somehow guarantee that we only have one thread which calls a
Handler. This is what we were doing before (by blocking the executions); if we wanted to do this, we could probably use some kind of same thread executor, but I imagine that would be awkward for actually writing handlers, because they couldn't necessarily rely on the runtime they submit tasks to being able to do more than one thing at a time.
| F: Handler<E, O>, | ||
| E: serde::de::DeserializeOwned, | ||
| O: serde::Serialize, | ||
| E: serde::de::DeserializeOwned + Send + 'static, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See earlier comments about the 'static bound w.r.t. ownership.
| let settings = self.settings.clone(); | ||
| let handler = self.handler.clone(); | ||
|
|
||
| loop_fn((), move |()| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might have the opportunity to refactor this chunk of code into a bunch of smaller and more methods & futures.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, this could very reasonably become a series of nicely tightly scoped maps and and_thens in a follow-up change :)
| settings: FunctionSettings, | ||
| ) -> impl Future<Item = (E, Context), Error = String> { | ||
| loop_fn( | ||
| (0, None), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking of using tower-retry for retries in a slightly less ad-hoc way (e.g., https://gist.github.com/seanmonstar/2469a8e544ada2d31d13c8ee54f17d48). I'll talk to the Tower folks about getting them on crates.io. I've been doing that locally with some success.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll happily give this a go when the crate is published! :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good!
|
Futures retry exists and doesn't depend on anything but the futures crate https://mexus.gitlab.io/futures-retry/futures_retry/ |
|
Any progress on this? |
|
Sorry, didn't saw this PR before making mine... |
|
@nappa85 you're fine! don't worry about it. |
No worries @nappa85! I haven't had time to fix this up, and something that's done and works is definitely better than something that's part-finished! :) |
|
I think this can be closed now. First class async support now exists in master |
Fixes #14
Handlers now return an
IntoFuture(whichResultimplements), so that async handlers can be written. In particular, this is useful for usingrusoto, because it means that its Futures will be executed on the tokio runtime.This is mostly backwards compatible, because
ResultimplementsIntoFuture, but it adds aSend + 'staticbound to a few types (notablyHandler,Event) so should probably be considered a breaking change.It also stops us calling
wait()everywhere except the top-level, but it tries to make minimally invasive changes. At some point I'd be tempted to change all of the inlinepanic!calls to be Future error returns, and handle them at the top-level, rather thanpanic!ing all over the place.Each commit is useful to separately review, as they are separated out building blocks. Sorry that the example I added is so contrived...!
By submitting this pull request