Skip to content

Commit f496518

Browse files
committed
feat: implement AOT support for DynamoDB, Kinesis, and SQS batch processing functions
1 parent 447506b commit f496518

26 files changed

+1345
-0
lines changed
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
using Amazon.CDK;
2+
using Amazon.CDK.AWS.DynamoDB;
3+
using Amazon.CDK.AWS.SQS;
4+
using Amazon.CDK.AWS.Kinesis;
5+
using Amazon.CDK.AWS.Lambda;
6+
using Amazon.CDK.AWS.Lambda.EventSources;
7+
using Constructs;
8+
using Attribute = Amazon.CDK.AWS.DynamoDB.Attribute;
9+
using Stream = Amazon.CDK.AWS.Kinesis.Stream;
10+
11+
namespace InfraShared;
12+
13+
public class BatchProcessingStackProps : PowertoolsDefaultStackProps
14+
{
15+
// Any specific properties for batch processing
16+
}
17+
18+
public class BatchProcessingStack : Stack
19+
{
20+
public Table DynamoDbTable { get; set; }
21+
public Queue StandardQueue { get; set; }
22+
public Queue FifoQueue { get; set; }
23+
public Stream KinesisStream { get; set; }
24+
25+
public BatchProcessingStack(Construct scope, string id, BatchProcessingStackProps props) : base(scope, id, props)
26+
{
27+
// DynamoDB table with streams enabled
28+
DynamoDbTable = new Table(this, "BatchProcessingTable", new TableProps
29+
{
30+
PartitionKey = new Attribute
31+
{
32+
Name = "id",
33+
Type = AttributeType.STRING
34+
},
35+
TableName = "BatchProcessingTable",
36+
BillingMode = BillingMode.PAY_PER_REQUEST,
37+
RemovalPolicy = RemovalPolicy.DESTROY,
38+
Stream = StreamViewType.NEW_AND_OLD_IMAGES
39+
});
40+
41+
// SQS Queues
42+
StandardQueue = new Queue(this, "BatchProcessingStandardQueue", new QueueProps
43+
{
44+
QueueName = "BatchProcessingStandardQueue",
45+
RemovalPolicy = RemovalPolicy.DESTROY
46+
});
47+
48+
FifoQueue = new Queue(this, "BatchProcessingFifoQueue", new QueueProps
49+
{
50+
QueueName = "BatchProcessingFifoQueue.fifo",
51+
Fifo = true,
52+
RemovalPolicy = RemovalPolicy.DESTROY
53+
});
54+
55+
// Kinesis Data Stream
56+
KinesisStream = new Stream(this, "BatchProcessingKinesisStream", new StreamProps
57+
{
58+
StreamName = "BatchProcessingKinesisStream",
59+
ShardCount = 1,
60+
RemovalPolicy = RemovalPolicy.DESTROY
61+
});
62+
63+
var utility = "batchprocessing";
64+
65+
if (props.IsAot)
66+
{
67+
CreateAotFunctions(this, utility, props);
68+
}
69+
else
70+
{
71+
CreateRegularFunctions(this, utility, props);
72+
}
73+
}
74+
75+
private void CreateAotFunctions(Construct scope, string utility, BatchProcessingStackProps props)
76+
{
77+
var sources = new[] { "DynamoDB", "SQS", "Kinesis" };
78+
79+
foreach (var source in sources)
80+
{
81+
var baseAotPath = $"../functions/{utility}/AOT-Function/src/AOT-Function{source}";
82+
var distAotPath = $"../functions/{utility}/AOT-Function/dist/AOT-Function{source}";
83+
var path = new Path(baseAotPath, distAotPath);
84+
props.Handler = $"AOT-Function{source}";
85+
86+
var architecture = props.ArchitectureString == "arm64" ? Architecture.ARM_64 : Architecture.X86_64;
87+
var arch = architecture == Architecture.X86_64 ? "X64" : "ARM";
88+
89+
var lambdaFunction = CreateFunctionConstruct(
90+
scope,
91+
$"{utility}_{arch}_aot_net8__{source}",
92+
Runtime.DOTNET_8,
93+
architecture,
94+
$"E2ETestLambda_{arch}_AOT_NET8_{utility}_{source}",
95+
path,
96+
props
97+
);
98+
99+
ConfigureEventSource(lambdaFunction, source);
100+
}
101+
}
102+
103+
private void CreateRegularFunctions(Construct scope, string utility, BatchProcessingStackProps props)
104+
{
105+
var sources = new[] { "DynamoDB", "SQS", "Kinesis" };
106+
107+
foreach (var source in sources)
108+
{
109+
var basePath = $"../functions/{utility}/Function/src/Function{source}";
110+
var distPath = $"../functions/{utility}/Function/dist/Function{source}";
111+
var path = new Path(basePath, distPath);
112+
props.Handler = $"Function{source}::Function{source}.Function::FunctionHandler";
113+
114+
// Create Lambda functions for different runtimes and architectures
115+
var runtimes = new[] {
116+
(runtime: Runtime.DOTNET_8, arch: Architecture.X86_64, archStr: "X64", runtimeStr : "NET8"),
117+
(runtime: Runtime.DOTNET_8, arch: Architecture.ARM_64, archStr: "ARM", runtimeStr : "NET8"),
118+
(runtime: Runtime.DOTNET_6, arch: Architecture.X86_64, archStr: "X64", runtimeStr : "NET6"),
119+
(runtime: Runtime.DOTNET_6, arch: Architecture.ARM_64, archStr: "ARM", runtimeStr : "NET6")
120+
};
121+
122+
foreach (var (runtime, arch, archStr, runtimeStr) in runtimes)
123+
{
124+
var lambdaFunction = CreateFunctionConstruct(
125+
scope,
126+
$"{utility}_{archStr}_{runtimeStr}_{source}",
127+
runtime,
128+
arch,
129+
$"E2ETestLambda_{archStr}_{runtimeStr}_{utility}_{source}",
130+
path,
131+
props
132+
);
133+
134+
ConfigureEventSource(lambdaFunction, source);
135+
}
136+
}
137+
}
138+
139+
private FunctionConstruct CreateFunctionConstruct(Construct scope, string id, Runtime runtime, Architecture architecture,
140+
string name, Path path, PowertoolsDefaultStackProps props)
141+
{
142+
var lambdaFunction = new FunctionConstruct(scope, id, new FunctionConstructProps
143+
{
144+
Runtime = runtime,
145+
Architecture = architecture,
146+
Name = name,
147+
Handler = props.Handler!,
148+
SourcePath = path.SourcePath,
149+
DistPath = path.DistPath,
150+
Environment = new Dictionary<string, string>
151+
{
152+
{ "BATCH_PROCESSING_TABLE_NAME", DynamoDbTable.TableName },
153+
{ "BATCH_PROCESSING_STANDARD_QUEUE_URL", StandardQueue.QueueUrl },
154+
{ "BATCH_PROCESSING_FIFO_QUEUE_URL", FifoQueue.QueueUrl },
155+
{ "BATCH_PROCESSING_KINESIS_STREAM_NAME", KinesisStream.StreamName }
156+
},
157+
IsAot = props.IsAot
158+
});
159+
160+
return lambdaFunction;
161+
}
162+
163+
private void ConfigureEventSource(FunctionConstruct lambdaFunction, string source)
164+
{
165+
switch (source)
166+
{
167+
case "DynamoDB":
168+
lambdaFunction.Function.AddEventSource(new DynamoEventSource(DynamoDbTable, new DynamoEventSourceProps
169+
{
170+
StartingPosition = StartingPosition.LATEST,
171+
BatchSize = 10,
172+
RetryAttempts = 3
173+
}));
174+
break;
175+
176+
case "SQS":
177+
lambdaFunction.Function.AddEventSource(new SqsEventSource(StandardQueue, new SqsEventSourceProps
178+
{
179+
BatchSize = 10,
180+
MaxBatchingWindow = Duration.Seconds(5)
181+
}));
182+
183+
// Add permissions for FIFO queue too
184+
FifoQueue.GrantConsumeMessages(lambdaFunction.Function);
185+
break;
186+
187+
case "Kinesis":
188+
lambdaFunction.Function.AddEventSource(new KinesisEventSource(KinesisStream, new KinesisEventSourceProps
189+
{
190+
StartingPosition = StartingPosition.LATEST,
191+
BatchSize = 10,
192+
RetryAttempts = 3,
193+
MaxBatchingWindow = Duration.Seconds(5)
194+
}));
195+
break;
196+
}
197+
}
198+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
using System.Text.Json.Serialization;
2+
using Amazon.Lambda.Core;
3+
using Amazon.Lambda.DynamoDBEvents;
4+
using Amazon.Lambda.RuntimeSupport;
5+
using Amazon.Lambda.Serialization.SystemTextJson;
6+
using AWS.Lambda.Powertools.BatchProcessing;
7+
using AWS.Lambda.Powertools.BatchProcessing.DynamoDb;
8+
9+
namespace AOT_FunctionDynamoDB;
10+
11+
public class Program
12+
{
13+
private static async Task Main()
14+
{
15+
Func<DynamoDBEvent, ILambdaContext, BatchItemFailuresResponse> handler = FunctionHandler;
16+
await LambdaBootstrapBuilder.Create(handler, new SourceGeneratorLambdaJsonSerializer<CustomJsonSerializerContext>())
17+
.Build()
18+
.RunAsync();
19+
}
20+
21+
[BatchProcessor(RecordHandler = typeof(DynamoDbRecordHandler))]
22+
public static BatchItemFailuresResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
23+
{
24+
context.Logger.LogInformation($"Processing {dynamoEvent.Records.Count} records");
25+
return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse;
26+
}
27+
}
28+
29+
public class DynamoDbRecordHandler : IRecordHandler<DynamoDBEvent.DynamodbStreamRecord>
30+
{
31+
public Task<RecordHandlerResult> HandleAsync(DynamoDBEvent.DynamodbStreamRecord record, CancellationToken cancellationToken = default)
32+
{
33+
try
34+
{
35+
// Process the DynamoDB record
36+
if (record.Dynamodb.NewImage != null && record.Dynamodb.NewImage.TryGetValue("id", out var idValue))
37+
{
38+
Console.WriteLine($"Processing record with id: {idValue.S}");
39+
40+
if (idValue.S?.Contains("fail") == true)
41+
{
42+
throw new Exception($"Failed to process record with id: {idValue.S}");
43+
}
44+
}
45+
46+
return Task.FromResult(RecordHandlerResult.None);
47+
}
48+
catch (Exception ex)
49+
{
50+
Console.WriteLine($"Error processing record: {ex.Message}");
51+
return Task.FromResult(RecordHandlerResult.None);
52+
}
53+
}
54+
}
55+
56+
[JsonSerializable(typeof(DynamoDBEvent))]
57+
[JsonSerializable(typeof(BatchItemFailuresResponse))]
58+
public partial class CustomJsonSerializerContext : JsonSerializerContext
59+
{
60+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<OutputType>Exe</OutputType>
4+
<TargetFramework>net8.0</TargetFramework>
5+
<ImplicitUsings>enable</ImplicitUsings>
6+
<Nullable>enable</Nullable>
7+
<AWSProjectType>Lambda</AWSProjectType>
8+
<CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
9+
<PublishAot>true</PublishAot>
10+
<StripSymbols>true</StripSymbols>
11+
<TrimMode>partial</TrimMode>
12+
<RootNamespace>AOT_FunctionDynamoDB</RootNamespace>
13+
</PropertyGroup>
14+
<ItemGroup>
15+
<PackageReference Include="Amazon.Lambda.RuntimeSupport" Version="1.12.0"/>
16+
<PackageReference Include="Amazon.Lambda.Core" Version="2.5.0"/>
17+
<PackageReference Include="Amazon.Lambda.Serialization.SystemTextJson" Version="2.4.4"/>
18+
</ItemGroup>
19+
<ItemGroup>
20+
<ProjectReference Include="..\..\..\..\..\..\..\src\AWS.Lambda.Powertools.BatchProcessing\AWS.Lambda.Powertools.BatchProcessing.csproj" />
21+
</ItemGroup>
22+
</Project>
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
{
2+
"Information": [
3+
"This file provides default values for the deployment wizard inside Visual Studio and the AWS Lambda commands added to the .NET Core CLI.",
4+
"To learn more about the Lambda commands with the .NET Core CLI execute the following command at the command line in the project root directory.",
5+
"dotnet lambda help",
6+
"All the command line options for the Lambda command can be specified in this file."
7+
],
8+
"profile": "",
9+
"region": "",
10+
"configuration": "Release",
11+
"function-memory-size": 512,
12+
"function-timeout": 30
13+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// AOT-FunctionKinesis/Program.cs
2+
using System.Text;
3+
using System.Text.Json;
4+
using System.Text.Json.Serialization;
5+
using System.Threading.Tasks;
6+
using Amazon.Lambda.Core;
7+
using Amazon.Lambda.KinesisEvents;
8+
using Amazon.Lambda.RuntimeSupport;
9+
using Amazon.Lambda.Serialization.SystemTextJson;
10+
using AWS.Lambda.Powertools.BatchProcessing;
11+
using AWS.Lambda.Powertools.BatchProcessing.Kinesis;
12+
13+
namespace AOT_FunctionKinesis;
14+
15+
public class Program
16+
{
17+
private static async Task Main()
18+
{
19+
Func<KinesisEvent, ILambdaContext, BatchItemFailuresResponse> handler = FunctionHandler;
20+
await LambdaBootstrapBuilder.Create(handler, new SourceGeneratorLambdaJsonSerializer<CustomJsonSerializerContext>())
21+
.Build()
22+
.RunAsync();
23+
}
24+
25+
[BatchProcessor(RecordHandler = typeof(KinesisRecordHandler))]
26+
public static BatchItemFailuresResponse FunctionHandler(KinesisEvent kinesisEvent, ILambdaContext context)
27+
{
28+
context.Logger.LogInformation($"Processing {kinesisEvent.Records.Count} records");
29+
return KinesisEventBatchProcessor.Result.BatchItemFailuresResponse;
30+
}
31+
}
32+
33+
public class KinesisRecordHandler : IRecordHandler<KinesisEvent.KinesisEventRecord>
34+
{
35+
public Task<RecordHandlerResult> HandleAsync(KinesisEvent.KinesisEventRecord record, CancellationToken cancellationToken = default)
36+
{
37+
try
38+
{
39+
// Process the Kinesis record
40+
Console.WriteLine($"Processing record: {record.EventId}");
41+
42+
// Decode and parse the data
43+
string data = Encoding.UTF8.GetString(record.Kinesis.Data.ToArray());
44+
var dataJson = JsonSerializer.Deserialize<JsonElement>(data);
45+
46+
if (dataJson.TryGetProperty("status", out var status) && status.GetString() == "error")
47+
{
48+
throw new Exception($"Failed to process record with status error: {record.EventId}");
49+
}
50+
51+
return Task.FromResult(RecordHandlerResult.None);
52+
}
53+
catch (Exception ex)
54+
{
55+
Console.WriteLine($"Error processing record: {ex.Message}");
56+
return Task.FromResult(RecordHandlerResult.None);
57+
}
58+
}
59+
}
60+
61+
[JsonSerializable(typeof(KinesisEvent))]
62+
[JsonSerializable(typeof(BatchItemFailuresResponse))]
63+
public partial class CustomJsonSerializerContext : JsonSerializerContext
64+
{
65+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<OutputType>Exe</OutputType>
4+
<TargetFramework>net8.0</TargetFramework>
5+
<ImplicitUsings>enable</ImplicitUsings>
6+
<Nullable>enable</Nullable>
7+
<AWSProjectType>Lambda</AWSProjectType>
8+
<CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
9+
<PublishAot>true</PublishAot>
10+
<StripSymbols>true</StripSymbols>
11+
<TrimMode>partial</TrimMode>
12+
<RootNamespace>AOT_FunctionKinesis</RootNamespace>
13+
</PropertyGroup>
14+
<ItemGroup>
15+
<PackageReference Include="Amazon.Lambda.RuntimeSupport" Version="1.12.0"/>
16+
<PackageReference Include="Amazon.Lambda.Core" Version="2.5.0"/>
17+
<PackageReference Include="Amazon.Lambda.Serialization.SystemTextJson" Version="2.4.4"/>
18+
</ItemGroup>
19+
<ItemGroup>
20+
<ProjectReference Include="..\..\..\..\..\..\..\src\AWS.Lambda.Powertools.BatchProcessing\AWS.Lambda.Powertools.BatchProcessing.csproj" />
21+
</ItemGroup>
22+
</Project>

0 commit comments

Comments
 (0)