Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
refactor init
motivation: make initialization logic more robust, allowing setup at contructor time and also async bootstrap

changes:
* break apart "initialization" into two parts:
  * optional throwing constructor (provider) that takes an EventLoop
  * optional BootstrappedLambdaHandler protocol that takes an EventLoop and returns async
* update core API and logic to support new initialization logic
* add tests to various initialization flows
  • Loading branch information
tomerd committed Mar 10, 2020
commit 3a9f98ac7186c4b2426ec30aa949df2d3ef65fe8
3 changes: 2 additions & 1 deletion Sources/SwiftAwsLambda/Lambda+Codable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//===----------------------------------------------------------------------===//

import Foundation // for JSON
import NIO

/// Extension to the `Lambda` companion to enable execution of Lambdas that take and return `Codable` payloads.
/// This is the most common way to use this library in AWS Lambda, since its JSON based.
Expand All @@ -26,7 +27,7 @@ extension Lambda {

// for testing
internal static func run<In: Decodable, Out: Encodable>(configuration: Configuration = .init(), closure: @escaping LambdaCodableClosure<In, Out>) -> LambdaLifecycleResult {
return self.run(handler: LambdaClosureWrapper(closure), configuration: configuration)
return self.run(configuration: configuration, handler: LambdaClosureWrapper(closure))
}
}

Expand Down
4 changes: 3 additions & 1 deletion Sources/SwiftAwsLambda/Lambda+String.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
//
//===----------------------------------------------------------------------===//

import NIO

/// Extension to the `Lambda` companion to enable execution of Lambdas that take and return `String` payloads.
extension Lambda {
/// Run a Lambda defined by implementing the `LambdaStringClosure` protocol.
Expand All @@ -23,7 +25,7 @@ extension Lambda {

// for testing
internal static func run(configuration: Configuration = .init(), _ closure: @escaping LambdaStringClosure) -> LambdaLifecycleResult {
return self.run(handler: LambdaClosureWrapper(closure), configuration: configuration)
return self.run(configuration: configuration, handler: LambdaClosureWrapper(closure))
}
}

Expand Down
99 changes: 65 additions & 34 deletions Sources/SwiftAwsLambda/Lambda.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,32 +41,47 @@ public enum Lambda {
self.run(handler: handler)
}

/// Run a Lambda defined by implementing the `LambdaHandler` protocol.
///
/// - note: This is a blocking operation that will run forever, as it's lifecycle is managed by the AWS Lambda Runtime Engine.
@inlinable
public static func run(_ provider: @escaping LambdaHandlerProvider) {
self.run(provider: provider)
}

// for testing and internal use
@usableFromInline
@discardableResult
internal static func run(configuration: Configuration = .init(), closure: @escaping LambdaClosure) -> LambdaLifecycleResult {
return self.run(handler: LambdaClosureWrapper(closure), configuration: configuration)
return self.run(configuration: configuration, handler: LambdaClosureWrapper(closure))
}

// for testing and internal use
@usableFromInline
@discardableResult
internal static func run(handler: LambdaHandler, configuration: Configuration = .init()) -> LambdaLifecycleResult {
internal static func run(configuration: Configuration = .init(), handler: LambdaHandler) -> LambdaLifecycleResult {
return self.run(configuration: configuration, provider: { _ in handler })
}

// for testing and internal use
@usableFromInline
@discardableResult
internal static func run(configuration: Configuration = .init(), provider: @escaping LambdaHandlerProvider) -> LambdaLifecycleResult {
do {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) // only need one thread, will improve performance
defer { try! eventLoopGroup.syncShutdownGracefully() }
let result = try self.runAsync(eventLoopGroup: eventLoopGroup, handler: handler, configuration: configuration).wait()
let result = try self.runAsync(eventLoopGroup: eventLoopGroup, configuration: configuration, provider: provider).wait()
return .success(result)
} catch {
return .failure(error)
}
}

internal static func runAsync(eventLoopGroup: EventLoopGroup, handler: LambdaHandler, configuration: Configuration) -> EventLoopFuture<Int> {
internal static func runAsync(eventLoopGroup: EventLoopGroup, configuration: Configuration, provider: @escaping LambdaHandlerProvider) -> EventLoopFuture<Int> {
Backtrace.install()
var logger = Logger(label: "Lambda")
logger.logLevel = configuration.general.logLevel
let lifecycle = Lifecycle(eventLoop: eventLoopGroup.next(), logger: logger, configuration: configuration, handler: handler)
let lifecycle = Lifecycle(eventLoop: eventLoopGroup.next(), logger: logger, configuration: configuration, provider: provider)
let signalSource = trap(signal: configuration.lifecycle.stopSignal) { signal in
logger.info("intercepted signal: \(signal)")
lifecycle.stop()
Expand Down Expand Up @@ -132,31 +147,33 @@ public enum Lambda {
private let eventLoop: EventLoop
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should move this Lambda.Lifecycle into another file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do in another PR

private let logger: Logger
private let configuration: Configuration
private let handler: LambdaHandler
private let provider: LambdaHandlerProvider

private var _state = LifecycleState.idle
private var _state = State.idle
private let stateLock = Lock()

init(eventLoop: EventLoop, logger: Logger, configuration: Configuration, handler: LambdaHandler) {
init(eventLoop: EventLoop, logger: Logger, configuration: Configuration, provider: @escaping LambdaHandlerProvider) {
self.eventLoop = eventLoop
self.logger = logger
self.configuration = configuration
self.handler = handler
self.provider = provider
}

deinit {
precondition(self.state == .shutdown, "invalid state \(self.state)")
guard case .shutdown = self.state else {
preconditionFailure("invalid state \(self.state)")
}
}

private var state: LifecycleState {
private var state: State {
get {
return self.stateLock.withLock {
self._state
}
}
set {
self.stateLock.withLockVoid {
precondition(newValue.rawValue > _state.rawValue, "invalid state \(newValue) after \(self._state)")
precondition(newValue.order > _state.order, "invalid state \(newValue) after \(self._state)")
self._state = newValue
}
}
Expand All @@ -167,10 +184,10 @@ public enum Lambda {
self.state = .initializing
var logger = self.logger
logger[metadataKey: "lifecycleId"] = .string(self.configuration.lifecycle.id)
let runner = LambdaRunner(eventLoop: self.eventLoop, configuration: self.configuration, lambdaHandler: self.handler)
return runner.initialize(logger: logger).flatMap { _ in
self.state = .active
return self.run(runner: runner)
let runner = LambdaRunner(eventLoop: self.eventLoop, configuration: self.configuration)
return runner.initialize(logger: logger, provider: self.provider).flatMap { handler in
self.state = .active(runner, handler)
return self.run()
}
}

Expand All @@ -185,18 +202,18 @@ public enum Lambda {
}

@inline(__always)
private func run(runner: LambdaRunner) -> EventLoopFuture<Int> {
private func run() -> EventLoopFuture<Int> {
let promise = self.eventLoop.makePromise(of: Int.self)

func _run(_ count: Int) {
switch self.state {
case .active:
case .active(let runner, let handler):
if self.configuration.lifecycle.maxTimes > 0, count >= self.configuration.lifecycle.maxTimes {
return promise.succeed(count)
}
var logger = self.logger
logger[metadataKey: "lifecycleIteration"] = "\(count)"
runner.run(logger: logger).whenComplete { result in
runner.run(logger: logger, handler: handler).whenComplete { result in
switch result {
case .success:
// recursive! per aws lambda runtime spec the polling requests are to be done one at a time
Expand All @@ -216,6 +233,29 @@ public enum Lambda {

return promise.futureResult
}

private enum State {
case idle
case initializing
case active(LambdaRunner, LambdaHandler)
case stopping
case shutdown

internal var order: Int {
switch self {
case .idle:
return 0
case .initializing:
return 1
case .active:
return 2
case .stopping:
return 3
case .shutdown:
return 4
}
}
}
}

@usableFromInline
Expand Down Expand Up @@ -293,14 +333,6 @@ public enum Lambda {
return "\(Configuration.self)\n \(self.general))\n \(self.lifecycle)\n \(self.runtimeEngine)"
}
}

private enum LifecycleState: Int {
case idle
case initializing
case active
case stopping
case shutdown
}
}

/// A result type for a Lambda that returns a `[UInt8]`.
Expand All @@ -317,18 +349,17 @@ public typealias LambdaInitResult = Result<Void, Error>
/// A callback to provide the result of Lambda initialization.
public typealias LambdaInitCallBack = (LambdaInitResult) -> Void

public typealias LambdaHandlerProvider = (EventLoop) throws -> LambdaHandler

/// A processing protocol for a Lambda that takes a `[UInt8]` and returns a `LambdaResult` result type asynchronously.
public protocol LambdaHandler {
/// Initializes the `LambdaHandler`.
func initialize(callback: @escaping LambdaInitCallBack)
/// Handles the Lambda request.
func handle(context: Lambda.Context, payload: [UInt8], callback: @escaping LambdaCallback)
}

extension LambdaHandler {
@inlinable
public func initialize(callback: @escaping LambdaInitCallBack) {
callback(.success(()))
}
public protocol BootstrappedLambdaHandler: LambdaHandler {
/// Bootstraps the `LambdaHandler`.
func bootstrap(callback: @escaping LambdaInitCallBack)
}

@usableFromInline
Expand Down
59 changes: 37 additions & 22 deletions Sources/SwiftAwsLambda/LambdaRunner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,41 @@ import NIO
/// LambdaRunner manages the Lambda runtime workflow, or business logic.
internal struct LambdaRunner {
private let runtimeClient: LambdaRuntimeClient
private let lambdaHandler: LambdaHandler
private let eventLoop: EventLoop
private let lifecycleId: String
private let offload: Bool

init(eventLoop: EventLoop, configuration: Lambda.Configuration, lambdaHandler: LambdaHandler) {
init(eventLoop: EventLoop, configuration: Lambda.Configuration) {
self.eventLoop = eventLoop
self.runtimeClient = LambdaRuntimeClient(eventLoop: self.eventLoop, configuration: configuration.runtimeEngine)
self.lambdaHandler = lambdaHandler
self.lifecycleId = configuration.lifecycle.id
self.offload = configuration.runtimeEngine.offload
}

/// Run the user provided initializer. This *must* only be called once.
///
/// - Returns: An `EventLoopFuture<Void>` fulfilled with the outcome of the initialization.
func initialize(logger: Logger) -> EventLoopFuture<Void> {
/// - Returns: An `EventLoopFuture<LambdaHandler>` fulfilled with the outcome of the initialization.
func initialize(logger: Logger, provider: @escaping LambdaHandlerProvider) -> EventLoopFuture<LambdaHandler> {
logger.debug("initializing lambda")
// We need to use `flatMap` instead of `whenFailure` to ensure we complete reporting the result before stopping.
return self.lambdaHandler.initialize(eventLoop: self.eventLoop,
lifecycleId: self.lifecycleId,
offload: self.offload).peekError { error in

let future: EventLoopFuture<LambdaHandler>
do {
// 1. craete the handler from the provider
let handler = try provider(self.eventLoop)
// 2. bootstrap if needed
if let handler = handler as? BootstrappedLambdaHandler {
future = handler.bootstrap(eventLoop: self.eventLoop,
lifecycleId: self.lifecycleId,
offload: self.offload).map { handler }
} else {
future = self.eventLoop.makeSucceededFuture(handler)
}
} catch {
future = self.eventLoop.makeFailedFuture(error)
}

// 3. report initialization error if one occured
return future.peekError { error in
self.runtimeClient.reportInitializationError(logger: logger, error: error).peekError { reportingError in
// We're going to bail out because the init failed, so there's not a lot we can do other than log
// that we couldn't report this error back to the runtime.
Expand All @@ -49,24 +62,24 @@ internal struct LambdaRunner {
}
}

func run(logger: Logger) -> EventLoopFuture<Void> {
func run(logger: Logger, handler: LambdaHandler) -> EventLoopFuture<Void> {
logger.debug("lambda invocation sequence starting")
// 1. request work from lambda runtime engine
return self.runtimeClient.requestWork(logger: logger).peekError { error in
logger.error("could not fetch work from lambda runtime engine: \(error)")
}.flatMap { invocation, payload in
// 2. send work to handler
let context = Lambda.Context(logger: logger, eventLoop: self.eventLoop, invocation: invocation)
logger.debug("sending work to lambda handler \(self.lambdaHandler)")
logger.debug("sending work to lambda handler \(handler)")

// TODO: This is just for now, so that we can work with ByteBuffers only
// in the LambdaRuntimeClient
let bytes = [UInt8](payload.readableBytesView)
return self.lambdaHandler.handle(eventLoop: self.eventLoop,
lifecycleId: self.lifecycleId,
offload: self.offload,
context: context,
payload: bytes)
return handler.handle(eventLoop: self.eventLoop,
lifecycleId: self.lifecycleId,
offload: self.offload,
context: context,
payload: bytes)
.map {
// TODO: This mapping shall be removed as soon as the LambdaHandler protocol
// works with ByteBuffer? instead of [UInt8]
Expand All @@ -93,24 +106,26 @@ internal struct LambdaRunner {
}
}

private extension LambdaHandler {
func initialize(eventLoop: EventLoop, lifecycleId: String, offload: Bool) -> EventLoopFuture<Void> {
// offloading so user code never blocks the eventloop
private extension BootstrappedLambdaHandler {
func bootstrap(eventLoop: EventLoop, lifecycleId: String, offload: Bool) -> EventLoopFuture<Void> {
let promise = eventLoop.makePromise(of: Void.self)
if offload {
// offloading so user code never blocks the eventloop
DispatchQueue(label: "lambda-\(lifecycleId)").async {
self.initialize { promise.completeWith($0) }
self.bootstrap { promise.completeWith($0) }
}
} else {
self.initialize { promise.completeWith($0) }
self.bootstrap { promise.completeWith($0) }
}
return promise.futureResult
}
}

private extension LambdaHandler {
func handle(eventLoop: EventLoop, lifecycleId: String, offload: Bool, context: Lambda.Context, payload: [UInt8]) -> EventLoopFuture<LambdaResult> {
// offloading so user code never blocks the eventloop
let promise = eventLoop.makePromise(of: LambdaResult.self)
if offload {
// offloading so user code never blocks the eventloop
DispatchQueue(label: "lambda-\(lifecycleId)").async {
self.handle(context: context, payload: payload) { result in
promise.succeed(result)
Expand Down
6 changes: 4 additions & 2 deletions Tests/SwiftAwsLambdaTests/Lambda+CodeableTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ class CodableLambdaTest: XCTestCase {
callback(.success(Response(requestId: payload.requestId)))
}
}

let maxTimes = Int.random(in: 1 ... 10)
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(handler: Handler(), configuration: configuration)
let result = Lambda.run(configuration: configuration, handler: Handler())
assertLambdaLifecycleResult(result, shoudHaveRun: maxTimes)
}

Expand All @@ -43,9 +44,10 @@ class CodableLambdaTest: XCTestCase {
callback(.failure(TestError("boom")))
}
}

let maxTimes = Int.random(in: 1 ... 10)
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(handler: Handler(), configuration: configuration)
let result = Lambda.run(configuration: configuration, handler: Handler())
assertLambdaLifecycleResult(result, shoudHaveRun: maxTimes)
}

Expand Down
6 changes: 4 additions & 2 deletions Tests/SwiftAwsLambdaTests/Lambda+StringTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ class StringLambdaTest: XCTestCase {
callback(.success(payload))
}
}

let maxTimes = Int.random(in: 1 ... 10)
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(handler: Handler(), configuration: configuration)
let result = Lambda.run(configuration: configuration, handler: Handler())
assertLambdaLifecycleResult(result, shoudHaveRun: maxTimes)
}

Expand All @@ -43,9 +44,10 @@ class StringLambdaTest: XCTestCase {
callback(.failure(TestError("boom")))
}
}

let maxTimes = Int.random(in: 1 ... 10)
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(handler: Handler(), configuration: configuration)
let result = Lambda.run(configuration: configuration, handler: Handler())
assertLambdaLifecycleResult(result, shoudHaveRun: maxTimes)
}

Expand Down
Loading