diff --git a/tests/Aspire.Hosting.Kafka.Tests/KafkaFunctionalTests.cs b/tests/Aspire.Hosting.Kafka.Tests/KafkaFunctionalTests.cs index d032142b298..907a7472423 100644 --- a/tests/Aspire.Hosting.Kafka.Tests/KafkaFunctionalTests.cs +++ b/tests/Aspire.Hosting.Kafka.Tests/KafkaFunctionalTests.cs @@ -8,6 +8,7 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using Polly; using Xunit; using Xunit.Abstractions; @@ -48,11 +49,18 @@ public async Task VerifyKafkaResource() await host.StartAsync(); var topic = "test-topic"; - var producer = host.Services.GetRequiredService>(); - for (var i = 0; i < 10; i++) + + var pipeline = new ResiliencePipelineBuilder() + .AddRetry(new() { MaxRetryAttempts = 10, Delay = TimeSpan.FromSeconds(1), ShouldHandle = new PredicateBuilder().Handle>() }) + .Build(); + await pipeline.ExecuteAsync(async token => { - await producer.ProduceAsync(topic, new Message { Key = "test-key", Value = $"test-value{i}" }); - } + var producer = host.Services.GetRequiredService>(); + for (var i = 0; i < 10; i++) + { + await producer.ProduceAsync(topic, new Message { Key = "test-key", Value = $"test-value{i}" }, token); + } + }, cts.Token); var consumer = host.Services.GetRequiredService>(); consumer.Subscribe(topic); @@ -115,11 +123,17 @@ public async Task WithDataShouldPersistStateBetweenUsages(bool useVolume) { await host.StartAsync(); - var producer = host.Services.GetRequiredService>(); - for (var i = 0; i < 10; i++) + var pipeline = new ResiliencePipelineBuilder() + .AddRetry(new() { MaxRetryAttempts = 10, Delay = TimeSpan.FromSeconds(1), ShouldHandle = new PredicateBuilder().Handle>() }) + .Build(); + await pipeline.ExecuteAsync(async token => { - await producer.ProduceAsync(topic, new Message { Key = "test-key", Value = $"test-value{i}" }); - } + var producer = host.Services.GetRequiredService>(); + for (var i = 0; i < 10; i++) + { + await producer.ProduceAsync(topic, new Message { Key = "test-key", Value = $"test-value{i}" }, token); + } + }, cts.Token); } } finally @@ -163,15 +177,21 @@ public async Task WithDataShouldPersistStateBetweenUsages(bool useVolume) { await host.StartAsync(); - var consumer = host.Services.GetRequiredService>(); - consumer.Subscribe(topic); - for (var i = 0; i < 10; i++) + var pipeline = new ResiliencePipelineBuilder() + .AddRetry(new() { MaxRetryAttempts = 10, Delay = TimeSpan.FromSeconds(1), ShouldHandle = new PredicateBuilder().Handle() }) + .Build(); + pipeline.Execute(() => { - var result = consumer.Consume(cts.Token); - - Assert.Equal($"test-key", result.Message.Key); - Assert.Equal($"test-value{i}", result.Message.Value); - } + var consumer = host.Services.GetRequiredService>(); + consumer.Subscribe(topic); + for (var i = 0; i < 10; i++) + { + var result = consumer.Consume(cts.Token); + + Assert.Equal($"test-key", result.Message.Key); + Assert.Equal($"test-value{i}", result.Message.Value); + } + }); } } finally