diff --git a/libraries/tests/e2e/InfraShared/BatchProcessingStack.cs b/libraries/tests/e2e/InfraShared/BatchProcessingStack.cs
new file mode 100644
index 000000000..e6b786a1c
--- /dev/null
+++ b/libraries/tests/e2e/InfraShared/BatchProcessingStack.cs
@@ -0,0 +1,198 @@
+using Amazon.CDK;
+using Amazon.CDK.AWS.DynamoDB;
+using Amazon.CDK.AWS.SQS;
+using Amazon.CDK.AWS.Kinesis;
+using Amazon.CDK.AWS.Lambda;
+using Amazon.CDK.AWS.Lambda.EventSources;
+using Constructs;
+using Attribute = Amazon.CDK.AWS.DynamoDB.Attribute;
+using Stream = Amazon.CDK.AWS.Kinesis.Stream;
+
+namespace InfraShared;
+
+public class BatchProcessingStackProps : PowertoolsDefaultStackProps
+{
+    // Any specific properties for batch processing
+}
+
+public class BatchProcessingStack : Stack
+{
+    public Table DynamoDbTable { get; set; }
+    public Queue StandardQueue { get; set; }
+    public Queue FifoQueue { get; set; }
+    public Stream KinesisStream { get; set; }
+
+    public BatchProcessingStack(Construct scope, string id, BatchProcessingStackProps props) : base(scope, id, props)
+    {
+        // DynamoDB table with streams enabled
+        DynamoDbTable = new Table(this, "BatchProcessingTable", new TableProps
+        {
+            PartitionKey = new Attribute
+            {
+                Name = "id",
+                Type = AttributeType.STRING
+            },
+            TableName = "BatchProcessingTable",
+            BillingMode = BillingMode.PAY_PER_REQUEST,
+            RemovalPolicy = RemovalPolicy.DESTROY,
+            Stream = StreamViewType.NEW_AND_OLD_IMAGES
+        });
+
+        // SQS Queues
+        StandardQueue = new Queue(this, "BatchProcessingStandardQueue", new QueueProps
+        {
+            QueueName = "BatchProcessingStandardQueue",
+            RemovalPolicy = RemovalPolicy.DESTROY
+        });
+
+        FifoQueue = new Queue(this, "BatchProcessingFifoQueue", new QueueProps
+        {
+            QueueName = "BatchProcessingFifoQueue.fifo",
+            Fifo = true,
+            RemovalPolicy = RemovalPolicy.DESTROY
+        });
+
+        // Kinesis Data Stream
+        KinesisStream = new Stream(this, "BatchProcessingKinesisStream", new StreamProps
+        {
+            StreamName = "BatchProcessingKinesisStream",
+            ShardCount = 1,
+            RemovalPolicy = RemovalPolicy.DESTROY
+        });
+
+        var utility = "batchprocessing";
+
+        if (props.IsAot)
+        {
+            CreateAotFunctions(this, utility, props);
+        }
+        else
+        {
+            CreateRegularFunctions(this, utility, props);
+        }
+    }
+
+    private void CreateAotFunctions(Construct scope, string utility, BatchProcessingStackProps props)
+    {
+        var sources = new[] { "DynamoDB", "SQS", "Kinesis" };
+
+        foreach (var source in sources)
+        {
+            var baseAotPath = $"../functions/{utility}/AOT-Function/src/AOT-Function{source}";
+            var distAotPath = $"../functions/{utility}/AOT-Function/dist/AOT-Function{source}";
+            var path = new Path(baseAotPath, distAotPath);
+            props.Handler = $"AOT-Function{source}";
+
+            var architecture = props.ArchitectureString == "arm64" ? Architecture.ARM_64 : Architecture.X86_64;
+            var arch = architecture == Architecture.X86_64 ? "X64" : "ARM";
+
+            var lambdaFunction = CreateFunctionConstruct(
+                scope,
+                $"{utility}_{arch}_aot_net8__{source}",
+                Runtime.DOTNET_8,
+                architecture,
+                $"E2ETestLambda_{arch}_AOT_NET8_{utility}_{source}",
+                path,
+                props
+            );
+
+            ConfigureEventSource(lambdaFunction, source);
+        }
+    }
+
+    private void CreateRegularFunctions(Construct scope, string utility, BatchProcessingStackProps props)
+    {
+        var sources = new[] { "DynamoDB", "SQS", "Kinesis" };
+
+        foreach (var source in sources)
+        {
+            var basePath = $"../functions/{utility}/Function/src/Function{source}";
+            var distPath = $"../functions/{utility}/Function/dist/Function{source}";
+            var path = new Path(basePath, distPath);
+            props.Handler = $"Function{source}::Function{source}.Function::FunctionHandler";
+
+            // Create Lambda functions for different runtimes and architectures
+            var runtimes = new[] {
+                (runtime: Runtime.DOTNET_8, arch: Architecture.X86_64, archStr: "X64", runtimeStr: "NET8"),
+                (runtime: Runtime.DOTNET_8, arch: Architecture.ARM_64, archStr: "ARM", runtimeStr: "NET8"),
+                (runtime: Runtime.DOTNET_6, arch: Architecture.X86_64, archStr: "X64", runtimeStr: "NET6"),
+                (runtime: Runtime.DOTNET_6, arch: Architecture.ARM_64, archStr: "ARM", runtimeStr: "NET6")
+            };
+
+            foreach (var (runtime, arch, archStr, runtimeStr) in runtimes)
+            {
+                var lambdaFunction = CreateFunctionConstruct(
+                    scope,
+                    $"{utility}_{archStr}_{runtimeStr}_{source}",
+                    runtime,
+                    arch,
+                    $"E2ETestLambda_{archStr}_{runtimeStr}_{utility}_{source}",
+                    path,
+                    props
+                );
+
+                ConfigureEventSource(lambdaFunction, source);
+            }
+        }
+    }
+
+    private FunctionConstruct CreateFunctionConstruct(Construct scope, string id, Runtime runtime, Architecture architecture,
+        string name, Path path, PowertoolsDefaultStackProps props)
+    {
+        var lambdaFunction = new FunctionConstruct(scope, id, new FunctionConstructProps
+        {
+            Runtime = runtime,
+            Architecture = architecture,
+            Name = name,
+            Handler = props.Handler!,
+            SourcePath = path.SourcePath,
+            DistPath = path.DistPath,
+            Environment = new Dictionary<string, string>
+            {
+                { "BATCH_PROCESSING_TABLE_NAME", DynamoDbTable.TableName },
+                { "BATCH_PROCESSING_STANDARD_QUEUE_URL", StandardQueue.QueueUrl },
+                { "BATCH_PROCESSING_FIFO_QUEUE_URL", FifoQueue.QueueUrl },
+                { "BATCH_PROCESSING_KINESIS_STREAM_NAME", KinesisStream.StreamName }
+            },
+            IsAot = props.IsAot
+        });
+
+        return lambdaFunction;
+    }
+
+    private void ConfigureEventSource(FunctionConstruct lambdaFunction, string source)
+    {
+        switch (source)
+        {
+            case "DynamoDB":
+                lambdaFunction.Function.AddEventSource(new DynamoEventSource(DynamoDbTable, new DynamoEventSourceProps
+                {
+                    StartingPosition = StartingPosition.LATEST,
+                    BatchSize = 10,
+                    RetryAttempts = 3
+                }));
+                break;
+
+            case "SQS":
+                lambdaFunction.Function.AddEventSource(new SqsEventSource(StandardQueue, new SqsEventSourceProps
+                {
+                    BatchSize = 10,
+                    MaxBatchingWindow = Duration.Seconds(5)
+                }));
+
+                // Add permissions for FIFO queue too
+                FifoQueue.GrantConsumeMessages(lambdaFunction.Function);
+                break;
+
+            case "Kinesis":
+                lambdaFunction.Function.AddEventSource(new KinesisEventSource(KinesisStream, new KinesisEventSourceProps
+                {
+                    StartingPosition = StartingPosition.LATEST,
+                    BatchSize = 10,
+                    RetryAttempts = 3,
+                    MaxBatchingWindow = Duration.Seconds(5)
+                }));
+                break;
+        }
+    }
+}
\ No newline at end of file
diff --git a/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionDynamoDB/AOT-FunctionDynamoDB.cs b/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionDynamoDB/AOT-FunctionDynamoDB.cs
new file mode 100644
index 000000000..0a21d82bf
--- /dev/null
+++ b/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionDynamoDB/AOT-FunctionDynamoDB.cs
@@ -0,0 +1,60 @@
+using System.Text.Json.Serialization;
+using Amazon.Lambda.Core;
+using Amazon.Lambda.DynamoDBEvents;
+using Amazon.Lambda.RuntimeSupport;
+using Amazon.Lambda.Serialization.SystemTextJson;
+using AWS.Lambda.Powertools.BatchProcessing;
+using AWS.Lambda.Powertools.BatchProcessing.DynamoDb;
+
+namespace AOT_FunctionDynamoDB;
+
+public class Program
+{
+    private static async Task Main()
+    {
+        Func<DynamoDBEvent, ILambdaContext, BatchItemFailuresResponse> handler = FunctionHandler;
+        await LambdaBootstrapBuilder.Create(handler, new SourceGeneratorLambdaJsonSerializer<CustomJsonSerializerContext>())
+            .Build()
+            .RunAsync();
+    }
+
+    [BatchProcessor(RecordHandler = typeof(DynamoDbRecordHandler))]
+    public static BatchItemFailuresResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
+    {
+        context.Logger.LogInformation($"Processing {dynamoEvent.Records.Count} records");
+        return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse;
+    }
+}
+
+public class DynamoDbRecordHandler : IRecordHandler<DynamoDBEvent.DynamodbStreamRecord>
+{
+    public Task<RecordHandlerResult> HandleAsync(DynamoDBEvent.DynamodbStreamRecord record, CancellationToken cancellationToken = default)
+    {
+        try
+        {
+            // Process the DynamoDB record
+            if (record.Dynamodb.NewImage != null && record.Dynamodb.NewImage.TryGetValue("id", out var idValue))
+            {
+                Console.WriteLine($"Processing record with id: {idValue.S}");
+
+                if (idValue.S?.Contains("fail") == true)
+                {
+                    throw new Exception($"Failed to process record with id: {idValue.S}");
+                }
+            }
+
+            return Task.FromResult(RecordHandlerResult.None);
+        }
+        catch (Exception ex)
+        {
+            Console.WriteLine($"Error processing record: {ex.Message}");
+            return Task.FromResult(RecordHandlerResult.None);
+        }
+    }
+}
+
+[JsonSerializable(typeof(DynamoDBEvent))]
+[JsonSerializable(typeof(BatchItemFailuresResponse))]
+public partial class CustomJsonSerializerContext : JsonSerializerContext
+{
+}
\ No newline at end of file
diff --git a/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionDynamoDB/AOT-FunctionDynamoDB.csproj b/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionDynamoDB/AOT-FunctionDynamoDB.csproj
new file mode 100644
index 000000000..b98c0fcbe
--- /dev/null
+++ b/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionDynamoDB/AOT-FunctionDynamoDB.csproj
@@ -0,0 +1,22 @@
+
+    Exe
+    net8.0
+    enable
+    enable
+    Lambda
+    true
+    true
+    true
+    partial
+    AOT_FunctionDynamoDB
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionDynamoDB/aws-lambda-tools-defaults.json b/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionDynamoDB/aws-lambda-tools-defaults.json
new file mode 100644
index 000000000..307a7dca4
--- /dev/null
+++ b/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionDynamoDB/aws-lambda-tools-defaults.json
@@ -0,0 +1,13 @@
+{
+  "Information": [
+    "This file provides default values for the deployment wizard inside Visual Studio and the AWS Lambda commands added to the .NET Core CLI.",
+    "To learn more about the Lambda commands with the .NET Core CLI execute the following command at the command line in the project root directory.",
+    "dotnet lambda help",
+    "All the command line options for the Lambda 
command can be specified in this file." + ], + "profile": "", + "region": "", + "configuration": "Release", + "function-memory-size": 512, + "function-timeout": 30 +} \ No newline at end of file diff --git a/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionKinesis/AOT-FunctionKinesis.cs b/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionKinesis/AOT-FunctionKinesis.cs new file mode 100644 index 000000000..e48edf1d6 --- /dev/null +++ b/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionKinesis/AOT-FunctionKinesis.cs @@ -0,0 +1,65 @@ +// AOT-FunctionKinesis/Program.cs +using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; +using System.Threading.Tasks; +using Amazon.Lambda.Core; +using Amazon.Lambda.KinesisEvents; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; +using AWS.Lambda.Powertools.BatchProcessing; +using AWS.Lambda.Powertools.BatchProcessing.Kinesis; + +namespace AOT_FunctionKinesis; + +public class Program +{ + private static async Task Main() + { + Func handler = FunctionHandler; + await LambdaBootstrapBuilder.Create(handler, new SourceGeneratorLambdaJsonSerializer()) + .Build() + .RunAsync(); + } + + [BatchProcessor(RecordHandler = typeof(KinesisRecordHandler))] + public static BatchItemFailuresResponse FunctionHandler(KinesisEvent kinesisEvent, ILambdaContext context) + { + context.Logger.LogInformation($"Processing {kinesisEvent.Records.Count} records"); + return KinesisEventBatchProcessor.Result.BatchItemFailuresResponse; + } +} + +public class KinesisRecordHandler : IRecordHandler +{ + public Task HandleAsync(KinesisEvent.KinesisEventRecord record, CancellationToken cancellationToken = default) + { + try + { + // Process the Kinesis record + Console.WriteLine($"Processing record: {record.EventId}"); + + // Decode and parse the data + string data = Encoding.UTF8.GetString(record.Kinesis.Data.ToArray()); + var dataJson = JsonSerializer.Deserialize(data); + + if (dataJson.TryGetProperty("status", out var status) && status.GetString() == "error") + { + throw new Exception($"Failed to process record with status error: {record.EventId}"); + } + + return Task.FromResult(RecordHandlerResult.None); + } + catch (Exception ex) + { + Console.WriteLine($"Error processing record: {ex.Message}"); + return Task.FromResult(RecordHandlerResult.None); + } + } +} + +[JsonSerializable(typeof(KinesisEvent))] +[JsonSerializable(typeof(BatchItemFailuresResponse))] +public partial class CustomJsonSerializerContext : JsonSerializerContext +{ +} \ No newline at end of file diff --git a/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionKinesis/AOT-FunctionKinesis.csproj b/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionKinesis/AOT-FunctionKinesis.csproj new file mode 100644 index 000000000..93467e062 --- /dev/null +++ b/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionKinesis/AOT-FunctionKinesis.csproj @@ -0,0 +1,22 @@ + + + Exe + net8.0 + enable + enable + Lambda + true + true + true + partial + AOT_FunctionKinesis + + + + + + + + + + \ No newline at end of file diff --git a/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionKinesis/aws-lambda-tools-defaults.json b/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionKinesis/aws-lambda-tools-defaults.json new file mode 100644 index 000000000..307a7dca4 --- /dev/null +++ 
b/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionKinesis/aws-lambda-tools-defaults.json @@ -0,0 +1,13 @@ +{ + "Information": [ + "This file provides default values for the deployment wizard inside Visual Studio and the AWS Lambda commands added to the .NET Core CLI.", + "To learn more about the Lambda commands with the .NET Core CLI execute the following command at the command line in the project root directory.", + "dotnet lambda help", + "All the command line options for the Lambda command can be specified in this file." + ], + "profile": "", + "region": "", + "configuration": "Release", + "function-memory-size": 512, + "function-timeout": 30 +} \ No newline at end of file diff --git a/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionSQS/AOT-FunctionSQS.cs b/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionSQS/AOT-FunctionSQS.cs new file mode 100644 index 000000000..75834cfba --- /dev/null +++ b/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionSQS/AOT-FunctionSQS.cs @@ -0,0 +1,64 @@ +// AOT-FunctionSQS/Program.cs +using System.Text.Json; +using System.Text.Json.Serialization; +using System.Threading.Tasks; +using Amazon.Lambda.Core; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; +using Amazon.Lambda.SQSEvents; +using AWS.Lambda.Powertools.BatchProcessing; +using AWS.Lambda.Powertools.BatchProcessing.Sqs; + +namespace AOT_FunctionSQS; + +public class Program +{ + private static async Task Main() + { + Func handler = FunctionHandler; + await LambdaBootstrapBuilder.Create(handler, new SourceGeneratorLambdaJsonSerializer()) + .Build() + .RunAsync(); + } + + [BatchProcessor(RecordHandler = typeof(SqsRecordHandler))] + public static BatchItemFailuresResponse FunctionHandler(SQSEvent sqsEvent, ILambdaContext context) + { + context.Logger.LogInformation($"Processing {sqsEvent.Records.Count} records"); + return SqsBatchProcessor.Result.BatchItemFailuresResponse; + } +} + +public class SqsRecordHandler : IRecordHandler +{ + public Task HandleAsync(SQSEvent.SQSMessage message, CancellationToken cancellationToken = default) + { + try + { + // Process the SQS message + Console.WriteLine($"Processing message: {message.MessageId}"); + + // Attempt to parse the message body as JSON + var messageBody = message.Body; + var messageJson = JsonSerializer.Deserialize(messageBody); + + if (messageJson.TryGetProperty("action", out var action) && action.GetString() == "fail") + { + throw new Exception($"Failed to process message with ID: {message.MessageId}"); + } + + return Task.FromResult(RecordHandlerResult.None); + } + catch (Exception ex) + { + Console.WriteLine($"Error processing message: {ex.Message}"); + return Task.FromResult(RecordHandlerResult.None); + } + } +} + +[JsonSerializable(typeof(SQSEvent))] +[JsonSerializable(typeof(BatchItemFailuresResponse))] +public partial class CustomJsonSerializerContext : JsonSerializerContext +{ +} \ No newline at end of file diff --git a/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionSQS/AOT-FunctionSQS.csproj b/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionSQS/AOT-FunctionSQS.csproj new file mode 100644 index 000000000..05c589797 --- /dev/null +++ b/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionSQS/AOT-FunctionSQS.csproj @@ -0,0 +1,22 @@ + + + Exe + net8.0 + enable + enable + Lambda + true + true + true + partial + AOT_FunctionSQS + + + 
+ + + + + + + \ No newline at end of file diff --git a/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionSQS/aws-lambda-tools-defaults.json b/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionSQS/aws-lambda-tools-defaults.json new file mode 100644 index 000000000..307a7dca4 --- /dev/null +++ b/libraries/tests/e2e/functions/batchprocessing/AOT-Function/src/AOT-FunctionSQS/aws-lambda-tools-defaults.json @@ -0,0 +1,13 @@ +{ + "Information": [ + "This file provides default values for the deployment wizard inside Visual Studio and the AWS Lambda commands added to the .NET Core CLI.", + "To learn more about the Lambda commands with the .NET Core CLI execute the following command at the command line in the project root directory.", + "dotnet lambda help", + "All the command line options for the Lambda command can be specified in this file." + ], + "profile": "", + "region": "", + "configuration": "Release", + "function-memory-size": 512, + "function-timeout": 30 +} \ No newline at end of file diff --git a/libraries/tests/e2e/functions/batchprocessing/BatchProcessingTests.sln b/libraries/tests/e2e/functions/batchprocessing/BatchProcessingTests.sln new file mode 100644 index 000000000..2c57787fb --- /dev/null +++ b/libraries/tests/e2e/functions/batchprocessing/BatchProcessingTests.sln @@ -0,0 +1,30 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.0.31903.59 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Function", "Function", "{FE3A26C9-5A8D-4DD3-A87B-2D7FC5BC15A8}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{28C61FF3-B4F5-44AC-9375-A4C6FC8579C8}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Function.Tests", "Function\test\Function.Tests\Function.Tests.csproj", "{8959B0AC-3B85-4E30-9C48-CAD5F72AD5BB}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {8959B0AC-3B85-4E30-9C48-CAD5F72AD5BB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8959B0AC-3B85-4E30-9C48-CAD5F72AD5BB}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8959B0AC-3B85-4E30-9C48-CAD5F72AD5BB}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8959B0AC-3B85-4E30-9C48-CAD5F72AD5BB}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(NestedProjects) = preSolution + {28C61FF3-B4F5-44AC-9375-A4C6FC8579C8} = {FE3A26C9-5A8D-4DD3-A87B-2D7FC5BC15A8} + {8959B0AC-3B85-4E30-9C48-CAD5F72AD5BB} = {28C61FF3-B4F5-44AC-9375-A4C6FC8579C8} + EndGlobalSection +EndGlobal diff --git a/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionDynamoDB/FunctionDynamoDB.cs b/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionDynamoDB/FunctionDynamoDB.cs new file mode 100644 index 000000000..833df9478 --- /dev/null +++ b/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionDynamoDB/FunctionDynamoDB.cs @@ -0,0 +1,55 @@ +// FunctionDynamoDB/Function.cs +using System.Text.Json; +using System.Threading.Tasks; +using Amazon.Lambda.Core; +using Amazon.Lambda.DynamoDBEvents; +using AWS.Lambda.Powertools.BatchProcessing; +using AWS.Lambda.Powertools.BatchProcessing.DynamoDb; + +// Assembly attribute to enable the Lambda 
function's JSON input to be converted into a .NET class +[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] + +namespace FunctionDynamoDB; + +public class Function +{ + /// + /// Process DynamoDB Stream events + /// + [BatchProcessor(RecordHandler = typeof(DynamoDbRecordHandler))] + public BatchItemFailuresResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context) + { + context.Logger.LogInformation($"Processing {dynamoEvent.Records.Count} records"); + return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; + } +} + +public class DynamoDbRecordHandler : IRecordHandler +{ + public Task HandleAsync(DynamoDBEvent.DynamodbStreamRecord record, CancellationToken cancellationToken = default) + { + try + { + // Process the DynamoDB record + // Example: Check if NewImage contains certain fields and process based on that + if (record.Dynamodb.NewImage != null && record.Dynamodb.NewImage.TryGetValue("id", out var idValue)) + { + // Process the record based on id + Console.WriteLine($"Processing record with id: {idValue.S}"); + + // Simulate failed processing for specific IDs + if (idValue.S?.Contains("fail") == true) + { + throw new Exception($"Failed to process record with id: {idValue.S}"); + } + } + + return Task.FromResult(RecordHandlerResult.None); + } + catch (Exception ex) + { + Console.WriteLine($"Error processing record: {ex.Message}"); + return Task.FromResult(RecordHandlerResult.None); + } + } +} \ No newline at end of file diff --git a/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionDynamoDB/FunctionDynamoDB.csproj b/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionDynamoDB/FunctionDynamoDB.csproj new file mode 100644 index 000000000..0bbf973d2 --- /dev/null +++ b/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionDynamoDB/FunctionDynamoDB.csproj @@ -0,0 +1,20 @@ + + + net6.0;net8.0 + enable + enable + true + Lambda + + true + + true + + + + + + + + + \ No newline at end of file diff --git a/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionDynamoDB/aws-lambda-tools-defaults.json b/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionDynamoDB/aws-lambda-tools-defaults.json new file mode 100644 index 000000000..307a7dca4 --- /dev/null +++ b/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionDynamoDB/aws-lambda-tools-defaults.json @@ -0,0 +1,13 @@ +{ + "Information": [ + "This file provides default values for the deployment wizard inside Visual Studio and the AWS Lambda commands added to the .NET Core CLI.", + "To learn more about the Lambda commands with the .NET Core CLI execute the following command at the command line in the project root directory.", + "dotnet lambda help", + "All the command line options for the Lambda command can be specified in this file." 
+ ], + "profile": "", + "region": "", + "configuration": "Release", + "function-memory-size": 512, + "function-timeout": 30 +} \ No newline at end of file diff --git a/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionKinesis/FunctionKinesis.cs b/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionKinesis/FunctionKinesis.cs new file mode 100644 index 000000000..5706d9e09 --- /dev/null +++ b/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionKinesis/FunctionKinesis.cs @@ -0,0 +1,54 @@ +using System.Text; +using System.Text.Json; +using System.Threading.Tasks; +using Amazon.Lambda.Core; +using Amazon.Lambda.KinesisEvents; +using AWS.Lambda.Powertools.BatchProcessing; +using AWS.Lambda.Powertools.BatchProcessing.Kinesis; + +// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class +[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] + +namespace FunctionKinesis; + +public class Function +{ + /// + /// Process Kinesis events + /// + [BatchProcessor(RecordHandler = typeof(KinesisRecordHandler))] + public BatchItemFailuresResponse FunctionHandler(KinesisEvent kinesisEvent, ILambdaContext context) + { + context.Logger.LogInformation($"Processing {kinesisEvent.Records.Count} records"); + return KinesisEventBatchProcessor.Result.BatchItemFailuresResponse; + } +} + +public class KinesisRecordHandler : IRecordHandler +{ + public Task HandleAsync(KinesisEvent.KinesisEventRecord record, CancellationToken cancellationToken = default) + { + try + { + // Process the Kinesis record + Console.WriteLine($"Processing record: {record.EventId}"); + + // Decode and parse the data + string data = Encoding.UTF8.GetString(record.Kinesis.Data.ToArray()); + var dataJson = JsonSerializer.Deserialize(data); + + // Example: Check if the data contains specific fields + if (dataJson.TryGetProperty("status", out var status) && status.GetString() == "error") + { + throw new Exception($"Failed to process record with status error: {record.EventId}"); + } + + return Task.FromResult(RecordHandlerResult.None); + } + catch (Exception ex) + { + Console.WriteLine($"Error processing record: {ex.Message}"); + return Task.FromResult(RecordHandlerResult.None); + } + } +} \ No newline at end of file diff --git a/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionKinesis/FunctionKinesis.csproj b/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionKinesis/FunctionKinesis.csproj new file mode 100644 index 000000000..0bbf973d2 --- /dev/null +++ b/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionKinesis/FunctionKinesis.csproj @@ -0,0 +1,20 @@ + + + net6.0;net8.0 + enable + enable + true + Lambda + + true + + true + + + + + + + + + \ No newline at end of file diff --git a/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionKinesis/aws-lambda-tools-defaults.json b/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionKinesis/aws-lambda-tools-defaults.json new file mode 100644 index 000000000..307a7dca4 --- /dev/null +++ b/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionKinesis/aws-lambda-tools-defaults.json @@ -0,0 +1,13 @@ +{ + "Information": [ + "This file provides default values for the deployment wizard inside Visual Studio and the AWS Lambda commands added to the .NET Core CLI.", + "To learn more about the Lambda commands with the .NET Core CLI execute the following command at the command line in the 
project root directory.", + "dotnet lambda help", + "All the command line options for the Lambda command can be specified in this file." + ], + "profile": "", + "region": "", + "configuration": "Release", + "function-memory-size": 512, + "function-timeout": 30 +} \ No newline at end of file diff --git a/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionSQS/FunctionSQS.cs b/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionSQS/FunctionSQS.cs new file mode 100644 index 000000000..61b9078be --- /dev/null +++ b/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionSQS/FunctionSQS.cs @@ -0,0 +1,54 @@ +// FunctionSQS/Function.cs +using System.Text.Json; +using System.Threading.Tasks; +using Amazon.Lambda.Core; +using Amazon.Lambda.SQSEvents; +using AWS.Lambda.Powertools.BatchProcessing; +using AWS.Lambda.Powertools.BatchProcessing.Sqs; + +// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class +[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] + +namespace FunctionSQS; + +public class Function +{ + /// + /// Process SQS events + /// + [BatchProcessor(RecordHandler = typeof(SqsRecordHandler))] + public BatchItemFailuresResponse FunctionHandler(SQSEvent sqsEvent, ILambdaContext context) + { + context.Logger.LogInformation($"Processing {sqsEvent.Records.Count} records"); + return SqsBatchProcessor.Result.BatchItemFailuresResponse; + } +} + +public class SqsRecordHandler : IRecordHandler +{ + public Task HandleAsync(SQSEvent.SQSMessage message, CancellationToken cancellationToken = default) + { + try + { + // Process the SQS message + Console.WriteLine($"Processing message: {message.MessageId}"); + + // Attempt to parse the message body as JSON + var messageBody = message.Body; + var messageJson = JsonSerializer.Deserialize(messageBody); + + // Example: Inspect messageJson for specific content and process accordingly + if (messageJson.TryGetProperty("action", out var action) && action.GetString() == "fail") + { + throw new Exception($"Failed to process message with ID: {message.MessageId}"); + } + + return Task.FromResult(RecordHandlerResult.None); + } + catch (Exception ex) + { + Console.WriteLine($"Error processing message: {ex.Message}"); + return Task.FromResult(RecordHandlerResult.None); + } + } +} \ No newline at end of file diff --git a/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionSQS/FunctionSQS.csproj b/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionSQS/FunctionSQS.csproj new file mode 100644 index 000000000..0bbf973d2 --- /dev/null +++ b/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionSQS/FunctionSQS.csproj @@ -0,0 +1,20 @@ + + + net6.0;net8.0 + enable + enable + true + Lambda + + true + + true + + + + + + + + + \ No newline at end of file diff --git a/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionSQS/aws-lambda-tools-defaults.json b/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionSQS/aws-lambda-tools-defaults.json new file mode 100644 index 000000000..307a7dca4 --- /dev/null +++ b/libraries/tests/e2e/functions/batchprocessing/Function/src/FunctionSQS/aws-lambda-tools-defaults.json @@ -0,0 +1,13 @@ +{ + "Information": [ + "This file provides default values for the deployment wizard inside Visual Studio and the AWS Lambda commands added to the .NET Core CLI.", + "To learn more about the Lambda commands with the .NET Core CLI execute the 
following command at the command line in the project root directory.", + "dotnet lambda help", + "All the command line options for the Lambda command can be specified in this file." + ], + "profile": "", + "region": "", + "configuration": "Release", + "function-memory-size": 512, + "function-timeout": 30 +} \ No newline at end of file diff --git a/libraries/tests/e2e/functions/batchprocessing/Function/test/Function.Tests/DynamoDBTests.cs b/libraries/tests/e2e/functions/batchprocessing/Function/test/Function.Tests/DynamoDBTests.cs new file mode 100644 index 000000000..368f30b8e --- /dev/null +++ b/libraries/tests/e2e/functions/batchprocessing/Function/test/Function.Tests/DynamoDBTests.cs @@ -0,0 +1,146 @@ +using Amazon.DynamoDBv2; +using Amazon.DynamoDBv2.Model; +using Amazon.Lambda; +using Xunit; +using Xunit.Abstractions; + +namespace Function.Tests; + +public class DynamoDbTests : TestBase +{ + private readonly string _tableName = "BatchProcessingTable"; + private readonly AmazonDynamoDBClient _dynamoDbClient; + private readonly AmazonLambdaClient _lambdaClient; + + public DynamoDbTests(ITestOutputHelper output) : base(output) + { + _dynamoDbClient = new AmazonDynamoDBClient(); + _lambdaClient = new AmazonLambdaClient(); + } + + [Theory] + [InlineData("E2ETestLambda_X64_NET8_batchprocessing_DynamoDB")] + [InlineData("E2ETestLambda_ARM_NET8_batchprocessing_DynamoDB")] + [InlineData("E2ETestLambda_X64_NET6_batchprocessing_DynamoDB")] + [InlineData("E2ETestLambda_ARM_NET6_batchprocessing_DynamoDB")] + public async Task TestSuccessfulProcessing(string functionName) + { + await TestSuccessfulBatchProcessing(functionName); + } + + [Theory] + [InlineData("E2ETestLambda_X64_NET8_batchprocessing_DynamoDB")] + [InlineData("E2ETestLambda_ARM_NET8_batchprocessing_DynamoDB")] + [InlineData("E2ETestLambda_X64_NET6_batchprocessing_DynamoDB")] + [InlineData("E2ETestLambda_ARM_NET6_batchprocessing_DynamoDB")] + public async Task TestFailedItems(string functionName) + { + await TestFailedItemsProcessing(functionName); + } + + [Trait("Category", "AOT")] + [Theory] + [InlineData("E2ETestLambda_ARM_AOT_NET8_batchprocessing_DynamoDB")] + [InlineData("E2ETestLambda_X64_AOT_NET8_batchprocessing_DynamoDB")] + public async Task TestAotFunctionProcessing(string functionName) + { + await TestAotProcessing(functionName); + } + + private async Task TestSuccessfulBatchProcessing(string functionName) + { + // Arrange - Create 3 items that should process successfully + for (int i = 1; i <= 3; i++) + { + var item = new Dictionary + { + ["id"] = new AttributeValue { S = $"success-{Guid.NewGuid()}" }, + ["name"] = new AttributeValue { S = $"Test Item {i}" }, + ["data"] = new AttributeValue { S = $"Some data {i}" }, + ["timestamp"] = new AttributeValue { N = DateTimeOffset.UtcNow.ToUnixTimeSeconds().ToString() } + }; + + await _dynamoDbClient.PutItemAsync(_tableName, item); + } + + // Wait for stream processing + await Task.Delay(TimeSpan.FromSeconds(10)); + + // Assert - Check CloudWatch logs to see successful processing + var success = await WaitForSuccessInLogs( + functionName, + "Processing record with id: success-", + TimeSpan.FromMinutes(2)); + + Assert.True(success, $"Failed to find successful processing in logs for {functionName}"); + } + + private async Task TestFailedItemsProcessing(string functionName) + { + // Arrange - Create items that should fail + var failItem = new Dictionary + { + ["id"] = new AttributeValue { S = "fail-item" }, + ["name"] = new AttributeValue { S = "Item that should fail" }, + ["data"] = 
new AttributeValue { S = "This should trigger failure logic" }, + ["timestamp"] = new AttributeValue { N = DateTimeOffset.UtcNow.ToUnixTimeSeconds().ToString() } + }; + + await _dynamoDbClient.PutItemAsync(_tableName, failItem); + + // Create successful item right after to verify partial batch processing + var successItem = new Dictionary + { + ["id"] = new AttributeValue { S = $"success-after-fail-{Guid.NewGuid()}" }, + ["name"] = new AttributeValue { S = "Success after failure" }, + ["data"] = new AttributeValue { S = "This should be processed" }, + ["timestamp"] = new AttributeValue { N = DateTimeOffset.UtcNow.ToUnixTimeSeconds().ToString() } + }; + + await _dynamoDbClient.PutItemAsync(_tableName, successItem); + + // Wait for processing + await Task.Delay(TimeSpan.FromSeconds(10)); + + // Assert - Verify both error handling and successful processing + var failureLogged = await WaitForSuccessInLogs( + functionName, + "Failed to process record with id: fail-item", + TimeSpan.FromMinutes(2)); + + Assert.True(failureLogged, $"Failed to find error handling in logs for {functionName}"); + + // Also verify the success item was processed + var successAfterFailure = await WaitForSuccessInLogs( + functionName, + "Processing record with id: success-after-fail", + TimeSpan.FromMinutes(2)); + + Assert.True(successAfterFailure, $"Failed to process items after failure for {functionName}"); + } + + private async Task TestAotProcessing(string functionName) + { + // Arrange - Create item for AOT function + var item = new Dictionary + { + ["id"] = new AttributeValue { S = $"aot-test-{Guid.NewGuid()}" }, + ["name"] = new AttributeValue { S = "AOT Test Item" }, + ["data"] = new AttributeValue { S = "Testing AOT compiled function" }, + ["timestamp"] = new AttributeValue { N = DateTimeOffset.UtcNow.ToUnixTimeSeconds().ToString() } + }; + + await _dynamoDbClient.PutItemAsync(_tableName, item); + + // Wait for processing + await Task.Delay(TimeSpan.FromSeconds(10)); + + // Assert - Check logs for AOT function + var aotSuccess = await WaitForSuccessInLogs( + functionName, + "Processing record with id: aot-test-", + TimeSpan.FromMinutes(2)); + + Assert.True(aotSuccess, $"Failed to find AOT function processing in logs for {functionName}"); + } +} \ No newline at end of file diff --git a/libraries/tests/e2e/functions/batchprocessing/Function/test/Function.Tests/Function.Tests.csproj b/libraries/tests/e2e/functions/batchprocessing/Function/test/Function.Tests/Function.Tests.csproj new file mode 100644 index 000000000..0b4a52d85 --- /dev/null +++ b/libraries/tests/e2e/functions/batchprocessing/Function/test/Function.Tests/Function.Tests.csproj @@ -0,0 +1,20 @@ + + + net8.0 + enable + enable + true + Logging.E2E.Tests + + + + + + + + + + + + + \ No newline at end of file diff --git a/libraries/tests/e2e/functions/batchprocessing/Function/test/Function.Tests/KinesisTests.cs b/libraries/tests/e2e/functions/batchprocessing/Function/test/Function.Tests/KinesisTests.cs new file mode 100644 index 000000000..ca7030310 --- /dev/null +++ b/libraries/tests/e2e/functions/batchprocessing/Function/test/Function.Tests/KinesisTests.cs @@ -0,0 +1,147 @@ +using System.Text; +using Amazon; +using Amazon.Kinesis; +using Amazon.Kinesis.Model; +using Amazon.Lambda; +using Xunit; +using Xunit.Abstractions; + +namespace Function.Tests; + +public class KinesisTests : TestBase +{ + private readonly AmazonKinesisClient _kinesisClient; + private readonly AmazonLambdaClient _lambdaClient; + private readonly string _streamName = 
"BatchProcessingKinesisStream"; + + public KinesisTests(ITestOutputHelper output) : base(output) + { + _kinesisClient = new AmazonKinesisClient(); + _lambdaClient = new AmazonLambdaClient(); + } + + [Theory] + [InlineData("E2ETestLambda_X64_NET8_batchprocessing_Kinesis")] + [InlineData("E2ETestLambda_ARM_NET8_batchprocessing_Kinesis")] + [InlineData("E2ETestLambda_X64_NET6_batchprocessing_Kinesis")] + [InlineData("E2ETestLambda_ARM_NET6_batchprocessing_Kinesis")] + public async Task TestSuccessfulProcessing(string functionName) + { + await TestSuccessfulKinesisProcessing(functionName); + } + + [Theory] + [InlineData("E2ETestLambda_X64_NET8_batchprocessing_Kinesis")] + [InlineData("E2ETestLambda_ARM_NET8_batchprocessing_Kinesis")] + [InlineData("E2ETestLambda_X64_NET6_batchprocessing_Kinesis")] + [InlineData("E2ETestLambda_ARM_NET6_batchprocessing_Kinesis")] + public async Task TestFailedItems(string functionName) + { + await TestFailedKinesisItems(functionName); + } + + [Trait("Category", "AOT")] + [Theory] + [InlineData("E2ETestLambda_ARM_AOT_NET8_batchprocessing_Kinesis")] + [InlineData("E2ETestLambda_X64_AOT_NET8_batchprocessing_Kinesis")] + public async Task TestAotFunctionProcessing(string functionName) + { + await TestAotKinesisProcessing(functionName); + } + + private async Task TestSuccessfulKinesisProcessing(string functionName) + { + // Arrange - Put records into the Kinesis stream + var putRecordsRequest = new PutRecordsRequest + { + StreamName = _streamName, + Records = new List() + }; + + for (int i = 1; i <= 3; i++) + { + var record = new { Id = i, Name = $"Kinesis Record {i}", Status = "success", Timestamp = DateTime.UtcNow }; + var data = Encoding.UTF8.GetBytes(Serialize(record)); + + putRecordsRequest.Records.Add(new PutRecordsRequestEntry + { + Data = new MemoryStream(data), + PartitionKey = $"partition-{i}" + }); + } + + await _kinesisClient.PutRecordsAsync(putRecordsRequest); + + // Wait for processing + await Task.Delay(TimeSpan.FromSeconds(10)); + + // Assert - Check logs for successful processing + var success = await WaitForSuccessInLogs( + functionName, + "Processing record:", + TimeSpan.FromMinutes(2)); + + Assert.True(success, $"Failed to find successful processing in logs for {functionName}"); + } + + private async Task TestFailedKinesisItems(string functionName) + { + // Create one record that should fail + var errorRecord = new { Id = 999, Name = "Error Record", Status = "error", Timestamp = DateTime.UtcNow }; + var errorData = Encoding.UTF8.GetBytes(Serialize(errorRecord)); + + await _kinesisClient.PutRecordAsync(new PutRecordRequest + { + StreamName = _streamName, + Data = new MemoryStream(errorData), + PartitionKey = "error-partition" + }); + + // Create a success record after + var successRecord = new { Id = 1000, Name = "Success After Error", Status = "success", Timestamp = DateTime.UtcNow }; + var successData = Encoding.UTF8.GetBytes(Serialize(successRecord)); + + await _kinesisClient.PutRecordAsync(new PutRecordRequest + { + StreamName = _streamName, + Data = new MemoryStream(successData), + PartitionKey = "success-partition" + }); + + // Wait for processing + await Task.Delay(TimeSpan.FromSeconds(10)); + + // Check for error handling + var failureLogged = await WaitForSuccessInLogs( + functionName, + "Failed to process record with status error", + TimeSpan.FromMinutes(2)); + + Assert.True(failureLogged, $"Failed to find error handling in logs for {functionName}"); + } + + private async Task TestAotKinesisProcessing(string functionName) + { + // Send a 
record to test AOT function + var aotRecord = new { Id = 500, Name = "AOT Test Record", Status = "success", Timestamp = DateTime.UtcNow }; + var aotData = Encoding.UTF8.GetBytes(Serialize(aotRecord)); + + await _kinesisClient.PutRecordAsync(new PutRecordRequest + { + StreamName = _streamName, + Data = new MemoryStream(aotData), + PartitionKey = "aot-test-partition" + }); + + // Wait for processing + await Task.Delay(TimeSpan.FromSeconds(10)); + + // Check AOT function logs + var aotSuccess = await WaitForSuccessInLogs( + functionName, + "Processing record:", + TimeSpan.FromMinutes(2)); + + Assert.True(aotSuccess, $"Failed to find AOT function processing in logs for {functionName}"); + } +} \ No newline at end of file diff --git a/libraries/tests/e2e/functions/batchprocessing/Function/test/Function.Tests/SqsTests.cs b/libraries/tests/e2e/functions/batchprocessing/Function/test/Function.Tests/SqsTests.cs new file mode 100644 index 000000000..67109b0f4 --- /dev/null +++ b/libraries/tests/e2e/functions/batchprocessing/Function/test/Function.Tests/SqsTests.cs @@ -0,0 +1,167 @@ +using Amazon; +using Amazon.Lambda; +using Amazon.SQS; +using Amazon.SQS.Model; +using Xunit; +using Xunit.Abstractions; + +namespace Function.Tests; + +public class SqsTests : TestBase +{ + private readonly AmazonSQSClient _sqsClient; + private readonly AmazonLambdaClient _lambdaClient; + private readonly string _standardQueueUrl; + private readonly string _fifoQueueUrl; + + public SqsTests(ITestOutputHelper output) : base(output) + { + _sqsClient = new AmazonSQSClient(); + _lambdaClient = new AmazonLambdaClient(); + _standardQueueUrl = "https://sqs.eu-west-1.amazonaws.com/746792595426/BatchProcessingStandardQueue"; + _fifoQueueUrl = "https://sqs.eu-west-1.amazonaws.com/746792595426/BatchProcessingFifoQueue.fifo"; + } + + [Theory] + [InlineData("E2ETestLambda_X64_NET8_batchprocessing_SQS")] + [InlineData("E2ETestLambda_ARM_NET8_batchprocessing_SQS")] + [InlineData("E2ETestLambda_X64_NET6_batchprocessing_SQS")] + [InlineData("E2ETestLambda_ARM_NET6_batchprocessing_SQS")] + public async Task TestStandardQueueSuccessfulProcessing(string functionName) + { + await TestStandardQueueSuccess(functionName); + } + + [Theory] + [InlineData("E2ETestLambda_X64_NET8_batchprocessing_SQS")] + [InlineData("E2ETestLambda_ARM_NET8_batchprocessing_SQS")] + [InlineData("E2ETestLambda_X64_NET6_batchprocessing_SQS")] + [InlineData("E2ETestLambda_ARM_NET6_batchprocessing_SQS")] + public async Task TestStandardQueueFailedItems(string functionName) + { + await TestStandardQueueFailures(functionName); + } + + [Theory] + [InlineData("E2ETestLambda_X64_NET8_batchprocessing_SQS")] + [InlineData("E2ETestLambda_ARM_NET8_batchprocessing_SQS")] + [InlineData("E2ETestLambda_X64_NET6_batchprocessing_SQS")] + [InlineData("E2ETestLambda_ARM_NET6_batchprocessing_SQS")] + public async Task TestFifoQueueStopOnFirstFailure(string functionName) + { + await TestFifoQueueFailureHandling(functionName); + } + + [Trait("Category", "AOT")] + [Theory] + [InlineData("E2ETestLambda_ARM_AOT_NET8_batchprocessing_SQS")] + [InlineData("E2ETestLambda_X64_AOT_NET8_batchprocessing_SQS")] + public async Task TestAotStandardQueueProcessing(string functionName) + { + await TestStandardQueueSuccess(functionName); + } + + private async Task TestStandardQueueSuccess(string functionName) + { + // Arrange - Send 3 messages to standard queue + for (int i = 1; i <= 3; i++) + { + var message = new { Id = i, Name = $"Test Item {i}", Action = "process" }; + var request = new 
SendMessageRequest + { + QueueUrl = _standardQueueUrl, + MessageBody = Serialize(message) + }; + + await _sqsClient.SendMessageAsync(request); + } + + // Wait for processing + await Task.Delay(TimeSpan.FromSeconds(10)); + + // Assert - Check logs for processing + var success = await WaitForSuccessInLogs( + functionName, + "Processing message:", + TimeSpan.FromMinutes(2)); + + Assert.True(success, $"Failed to find successful processing in logs for {functionName}"); + } + + private async Task TestStandardQueueFailures(string functionName) + { + // Arrange - Send a message that should fail + var failMessage = new { Id = 999, Name = "Failure Test", Action = "fail" }; + await _sqsClient.SendMessageAsync(new SendMessageRequest + { + QueueUrl = _standardQueueUrl, + MessageBody = Serialize(failMessage) + }); + + // Send another message that should succeed + var successMessage = new { Id = 1000, Name = "Success After Failure", Action = "process" }; + await _sqsClient.SendMessageAsync(new SendMessageRequest + { + QueueUrl = _standardQueueUrl, + MessageBody = Serialize(successMessage) + }); + + // Wait for processing + await Task.Delay(TimeSpan.FromSeconds(10)); + + // Assert - Check for failure handling + var failureLogged = await WaitForSuccessInLogs( + functionName, + "Failed to process message with ID:", + TimeSpan.FromMinutes(2)); + + Assert.True(failureLogged, $"Failed to find error handling in logs for {functionName}"); + } + + private async Task TestFifoQueueFailureHandling(string functionName) + { + // Arrange - Send messages to FIFO queue with a group ID + string messageGroupId = Guid.NewGuid().ToString(); + + // First message should succeed + var message1 = new { Id = 1, Name = "FIFO First Message", Action = "process" }; + await _sqsClient.SendMessageAsync(new SendMessageRequest + { + QueueUrl = _fifoQueueUrl, + MessageBody = Serialize(message1), + MessageGroupId = messageGroupId, + MessageDeduplicationId = Guid.NewGuid().ToString() + }); + + // Second message should fail + var message2 = new { Id = 2, Name = "FIFO Second Message", Action = "fail" }; + await _sqsClient.SendMessageAsync(new SendMessageRequest + { + QueueUrl = _fifoQueueUrl, + MessageBody = Serialize(message2), + MessageGroupId = messageGroupId, + MessageDeduplicationId = Guid.NewGuid().ToString() + }); + + // Third message should be returned as unprocessed since second failed + var message3 = new { Id = 3, Name = "FIFO Third Message", Action = "process" }; + await _sqsClient.SendMessageAsync(new SendMessageRequest + { + QueueUrl = _fifoQueueUrl, + MessageBody = Serialize(message3), + MessageGroupId = messageGroupId, + MessageDeduplicationId = Guid.NewGuid().ToString() + }); + + // Wait for processing + await Task.Delay(TimeSpan.FromSeconds(15)); + + // Look for error and also check that the third message was properly returned + var batchFailure = await WaitForSuccessInLogs( + functionName, + "BatchItemFailures", + TimeSpan.FromMinutes(2)); + + Assert.True(batchFailure, $"Failed to find batch failure handling for FIFO queue in {functionName}"); + } +} \ No newline at end of file diff --git a/libraries/tests/e2e/functions/batchprocessing/Function/test/Function.Tests/TestBase.cs b/libraries/tests/e2e/functions/batchprocessing/Function/test/Function.Tests/TestBase.cs new file mode 100644 index 000000000..3bcc61bc6 --- /dev/null +++ b/libraries/tests/e2e/functions/batchprocessing/Function/test/Function.Tests/TestBase.cs @@ -0,0 +1,79 @@ +using System.Text.Json; +using Amazon; +using Amazon.CloudWatchLogs; +using 
Amazon.CloudWatchLogs.Model;
+using Xunit.Abstractions;
+using InvalidOperationException = Amazon.CloudWatchLogs.Model.InvalidOperationException;
+
+namespace Function.Tests;
+
+public abstract class TestBase
+{
+    protected readonly ITestOutputHelper Output;
+
+    protected TestBase(ITestOutputHelper output)
+    {
+        Output = output;
+    }
+
+    protected async Task<bool> WaitForSuccessInLogs(string functionName, string messageToFind, TimeSpan timeout)
+    {
+        using var logsClient = new AmazonCloudWatchLogsClient();
+        var logGroupName = $"/aws/lambda/{functionName}";
+
+        var startTime = DateTime.UtcNow;
+        while (DateTime.UtcNow - startTime < timeout)
+        {
+            try
+            {
+                var response = await logsClient.DescribeLogStreamsAsync(new DescribeLogStreamsRequest
+                {
+                    LogGroupName = logGroupName,
+                    OrderBy = OrderBy.LastEventTime,
+                    Descending = true,
+                    Limit = 5
+                });
+
+                foreach (var stream in response.LogStreams)
+                {
+                    var events = await logsClient.GetLogEventsAsync(new GetLogEventsRequest
+                    {
+                        LogGroupName = logGroupName,
+                        LogStreamName = stream.LogStreamName,
+                        StartTime = DateTime.UtcNow.AddMinutes(-5)
+                    });
+
+                    if (events.Events.Any(e => e.Message.Contains(messageToFind)))
+                    {
+                        return true;
+                    }
+                }
+
+                await Task.Delay(TimeSpan.FromSeconds(5));
+            }
+            catch (Exception ex)
+            {
+                Output.WriteLine($"Error checking logs: {ex.Message}");
+                await Task.Delay(TimeSpan.FromSeconds(5));
+            }
+        }
+
+        return false;
+    }
+
+    protected T Deserialize<T>(string json)
+    {
+        return JsonSerializer.Deserialize<T>(json, new JsonSerializerOptions
+        {
+            PropertyNameCaseInsensitive = true
+        }) ?? throw new InvalidOperationException("Failed to deserialize");
+    }
+
+    protected string Serialize<T>(T obj)
+    {
+        return JsonSerializer.Serialize(obj, new JsonSerializerOptions
+        {
+            PropertyNamingPolicy = JsonNamingPolicy.CamelCase
+        });
+    }
+}
\ No newline at end of file
diff --git a/libraries/tests/e2e/infra/Program.cs b/libraries/tests/e2e/infra/Program.cs
index d56d83b2b..d5748248a 100644
--- a/libraries/tests/e2e/infra/Program.cs
+++ b/libraries/tests/e2e/infra/Program.cs
@@ -13,6 +13,8 @@ public static void Main(string[] args)
 
         _ = new IdempotencyStack(app, "IdempotencyStack", new IdempotencyStackProps { TableName = "IdempotencyTable" });
 
+        _ = new BatchProcessingStack(app, "BatchProcessingStack", new BatchProcessingStackProps { });
+
         app.Synth();
     }
 }