Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 0 additions & 244 deletions Sources/SwiftAwsLambda/Lambda.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ import Darwin.C
#endif

import Backtrace
import Dispatch
import Logging
import NIO
import NIOConcurrencyHelpers

public enum Lambda {
/// Run a Lambda defined by implementing the `LambdaClosure` closure.
Expand Down Expand Up @@ -113,248 +111,6 @@ public enum Lambda {
signalSource.cancel()
}
}

public class Context {
/// The request ID, which identifies the request that triggered the function invocation.
public let requestId: String

/// The AWS X-Ray tracing header.
public let traceId: String

/// The ARN of the Lambda function, version, or alias that's specified in the invocation.
public let invokedFunctionArn: String

/// The timestamp that the function times out
public let deadline: DispatchWallTime

/// For invocations from the AWS Mobile SDK, data about the Amazon Cognito identity provider.
public let cognitoIdentity: String?

/// For invocations from the AWS Mobile SDK, data about the client application and device.
public let clientContext: String?

/// a logger to log
public let logger: Logger

internal init(requestId: String,
traceId: String,
invokedFunctionArn: String,
deadline: DispatchWallTime,
cognitoIdentity: String? = nil,
clientContext: String? = nil,
logger: Logger) {
self.requestId = requestId
self.traceId = traceId
self.invokedFunctionArn = invokedFunctionArn
self.cognitoIdentity = cognitoIdentity
self.clientContext = clientContext
self.deadline = deadline
// mutate logger with context
var logger = logger
logger[metadataKey: "awsRequestId"] = .string(requestId)
logger[metadataKey: "awsTraceId"] = .string(traceId)
self.logger = logger
}

public func getRemainingTime() -> TimeAmount {
let deadline = self.deadline.millisSinceEpoch
let now = DispatchWallTime.now().millisSinceEpoch

let remaining = deadline - now
return .milliseconds(remaining)
}
}

private final class Lifecycle {
private let eventLoop: EventLoop
private let logger: Logger
private let configuration: Configuration
private let factory: LambdaHandlerFactory

private var _state = State.idle
private let stateLock = Lock()

init(eventLoop: EventLoop, logger: Logger, configuration: Configuration, factory: @escaping LambdaHandlerFactory) {
self.eventLoop = eventLoop
self.logger = logger
self.configuration = configuration
self.factory = factory
}

deinit {
guard case .shutdown = self.state else {
preconditionFailure("invalid state \(self.state)")
}
}

private var state: State {
get {
return self.stateLock.withLock {
self._state
}
}
set {
self.stateLock.withLockVoid {
precondition(newValue.order > _state.order, "invalid state \(newValue) after \(self._state)")
self._state = newValue
}
}
}

func start() -> EventLoopFuture<Int> {
logger.info("lambda lifecycle starting with \(self.configuration)")
self.state = .initializing
var logger = self.logger
logger[metadataKey: "lifecycleId"] = .string(self.configuration.lifecycle.id)
let runner = LambdaRunner(eventLoop: self.eventLoop, configuration: self.configuration)
return runner.initialize(logger: logger, factory: self.factory).flatMap { handler in
self.state = .active(runner, handler)
return self.run()
}
}

func stop() {
self.logger.debug("lambda lifecycle stopping")
self.state = .stopping
}

func shutdown() {
self.logger.debug("lambda lifecycle shutdown")
self.state = .shutdown
}

@inline(__always)
private func run() -> EventLoopFuture<Int> {
let promise = self.eventLoop.makePromise(of: Int.self)

func _run(_ count: Int) {
switch self.state {
case .active(let runner, let handler):
if self.configuration.lifecycle.maxTimes > 0, count >= self.configuration.lifecycle.maxTimes {
return promise.succeed(count)
}
var logger = self.logger
logger[metadataKey: "lifecycleIteration"] = "\(count)"
runner.run(logger: logger, handler: handler).whenComplete { result in
switch result {
case .success:
// recursive! per aws lambda runtime spec the polling requests are to be done one at a time
_run(count + 1)
case .failure(let error):
promise.fail(error)
}
}
case .stopping, .shutdown:
promise.succeed(count)
default:
preconditionFailure("invalid run state: \(self.state)")
}
}

_run(0)

return promise.futureResult
}

private enum State {
case idle
case initializing
case active(LambdaRunner, LambdaHandler)
case stopping
case shutdown

internal var order: Int {
switch self {
case .idle:
return 0
case .initializing:
return 1
case .active:
return 2
case .stopping:
return 3
case .shutdown:
return 4
}
}
}
}

@usableFromInline
internal struct Configuration: CustomStringConvertible {
let general: General
let lifecycle: Lifecycle
let runtimeEngine: RuntimeEngine

@usableFromInline
init() {
self.init(general: .init(), lifecycle: .init(), runtimeEngine: .init())
}

init(general: General? = nil, lifecycle: Lifecycle? = nil, runtimeEngine: RuntimeEngine? = nil) {
self.general = general ?? General()
self.lifecycle = lifecycle ?? Lifecycle()
self.runtimeEngine = runtimeEngine ?? RuntimeEngine()
}

struct General: CustomStringConvertible {
let logLevel: Logger.Level

init(logLevel: Logger.Level? = nil) {
self.logLevel = logLevel ?? env("LOG_LEVEL").flatMap(Logger.Level.init) ?? .info
}

var description: String {
return "\(General.self)(logLevel: \(self.logLevel))"
}
}

struct Lifecycle: CustomStringConvertible {
let id: String
let maxTimes: Int
let stopSignal: Signal

init(id: String? = nil, maxTimes: Int? = nil, stopSignal: Signal? = nil) {
self.id = id ?? "\(DispatchTime.now().uptimeNanoseconds)"
self.maxTimes = maxTimes ?? env("MAX_REQUESTS").flatMap(Int.init) ?? 0
self.stopSignal = stopSignal ?? env("STOP_SIGNAL").flatMap(Int32.init).flatMap(Signal.init) ?? Signal.TERM
precondition(self.maxTimes >= 0, "maxTimes must be equal or larger than 0")
}

var description: String {
return "\(Lifecycle.self)(id: \(self.id), maxTimes: \(self.maxTimes), stopSignal: \(self.stopSignal))"
}
}

struct RuntimeEngine: CustomStringConvertible {
let ip: String
let port: Int
let keepAlive: Bool
let requestTimeout: TimeAmount?
let offload: Bool

init(baseURL: String? = nil, keepAlive: Bool? = nil, requestTimeout: TimeAmount? = nil, offload: Bool? = nil) {
let ipPort = env("AWS_LAMBDA_RUNTIME_API")?.split(separator: ":") ?? ["127.0.0.1", "7000"]
guard ipPort.count == 2, let port = Int(ipPort[1]) else {
preconditionFailure("invalid ip+port configuration \(ipPort)")
}
self.ip = String(ipPort[0])
self.port = port
self.keepAlive = keepAlive ?? env("KEEP_ALIVE").flatMap(Bool.init) ?? true
self.requestTimeout = requestTimeout ?? env("REQUEST_TIMEOUT").flatMap(Int64.init).flatMap { .milliseconds($0) }
self.offload = offload ?? env("OFFLOAD").flatMap(Bool.init) ?? false
}

var description: String {
return "\(RuntimeEngine.self)(ip: \(self.ip), port: \(self.port), keepAlive: \(self.keepAlive), requestTimeout: \(String(describing: self.requestTimeout)), offload: \(self.offload)"
}
}

@usableFromInline
var description: String {
return "\(Configuration.self)\n \(self.general))\n \(self.lifecycle)\n \(self.runtimeEngine)"
}
}
}

public typealias LambdaResult = Result<[UInt8], Error>
Expand Down
95 changes: 95 additions & 0 deletions Sources/SwiftAwsLambda/LambdaConfiguration.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAwsLambda open source project
//
// Copyright (c) 2017-2020 Apple Inc. and the SwiftAwsLambda project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAwsLambda project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Dispatch
import Logging
import NIO

extension Lambda {
@usableFromInline
internal struct Configuration: CustomStringConvertible {
let general: General
let lifecycle: Lifecycle
let runtimeEngine: RuntimeEngine

@usableFromInline
init() {
self.init(general: .init(), lifecycle: .init(), runtimeEngine: .init())
}

init(general: General? = nil, lifecycle: Lifecycle? = nil, runtimeEngine: RuntimeEngine? = nil) {
self.general = general ?? General()
self.lifecycle = lifecycle ?? Lifecycle()
self.runtimeEngine = runtimeEngine ?? RuntimeEngine()
}

struct General: CustomStringConvertible {
let logLevel: Logger.Level

init(logLevel: Logger.Level? = nil) {
self.logLevel = logLevel ?? env("LOG_LEVEL").flatMap(Logger.Level.init) ?? .info
}

var description: String {
return "\(General.self)(logLevel: \(self.logLevel))"
}
}

struct Lifecycle: CustomStringConvertible {
let id: String
let maxTimes: Int
let stopSignal: Signal

init(id: String? = nil, maxTimes: Int? = nil, stopSignal: Signal? = nil) {
self.id = id ?? "\(DispatchTime.now().uptimeNanoseconds)"
self.maxTimes = maxTimes ?? env("MAX_REQUESTS").flatMap(Int.init) ?? 0
self.stopSignal = stopSignal ?? env("STOP_SIGNAL").flatMap(Int32.init).flatMap(Signal.init) ?? Signal.TERM
precondition(self.maxTimes >= 0, "maxTimes must be equal or larger than 0")
}

var description: String {
return "\(Lifecycle.self)(id: \(self.id), maxTimes: \(self.maxTimes), stopSignal: \(self.stopSignal))"
}
}

struct RuntimeEngine: CustomStringConvertible {
let ip: String
let port: Int
let keepAlive: Bool
let requestTimeout: TimeAmount?
let offload: Bool

init(baseURL: String? = nil, keepAlive: Bool? = nil, requestTimeout: TimeAmount? = nil, offload: Bool? = nil) {
let ipPort = env("AWS_LAMBDA_RUNTIME_API")?.split(separator: ":") ?? ["127.0.0.1", "7000"]
guard ipPort.count == 2, let port = Int(ipPort[1]) else {
preconditionFailure("invalid ip+port configuration \(ipPort)")
}
self.ip = String(ipPort[0])
self.port = port
self.keepAlive = keepAlive ?? env("KEEP_ALIVE").flatMap(Bool.init) ?? true
self.requestTimeout = requestTimeout ?? env("REQUEST_TIMEOUT").flatMap(Int64.init).flatMap { .milliseconds($0) }
self.offload = offload ?? env("OFFLOAD").flatMap(Bool.init) ?? false
}

var description: String {
return "\(RuntimeEngine.self)(ip: \(self.ip), port: \(self.port), keepAlive: \(self.keepAlive), requestTimeout: \(String(describing: self.requestTimeout)), offload: \(self.offload)"
}
}

@usableFromInline
var description: String {
return "\(Configuration.self)\n \(self.general))\n \(self.lifecycle)\n \(self.runtimeEngine)"
}
}
}
Loading