forked from dotnet/aspire
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathKafkaBuilderExtensions.cs
More file actions
236 lines (201 loc) · 12.6 KB
/
KafkaBuilderExtensions.cs
File metadata and controls
236 lines (201 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Utils;
using Confluent.Kafka;
using HealthChecks.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
namespace Aspire.Hosting;
/// <summary>
/// Provides extension methods for adding Kafka resources to the application model.
/// </summary>
public static class KafkaBuilderExtensions
{
    private const int KafkaBrokerPort = 9092;
    private const int KafkaInternalBrokerPort = 9093;
    private const int KafkaUIPort = 8080;
    // Path inside the container where Kafka writes its log segments; shared by the
    // volume and bind-mount helpers so data persistence targets the same directory.
    private const string Target = "/var/lib/kafka/data";

    /// <summary>
    /// Adds a Kafka resource to the application. A container is used for local development.
    /// </summary>
    /// <remarks>
    /// This version of the package defaults to the <inheritdoc cref="KafkaContainerImageTags.Tag"/> tag of the <inheritdoc cref="KafkaContainerImageTags.Image"/> container image.
    /// </remarks>
    /// <param name="builder">The <see cref="IDistributedApplicationBuilder"/>.</param>
    /// <param name="name">The name of the resource. This name will be used as the connection string name when referenced in a dependency</param>
    /// <param name="port">The host port of Kafka broker.</param>
    /// <returns>A reference to the <see cref="IResourceBuilder{KafkaServerResource}"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="builder"/> or <paramref name="name"/> is <see langword="null"/>.</exception>
    /// <exception cref="ArgumentException"><paramref name="name"/> is empty.</exception>
    public static IResourceBuilder<KafkaServerResource> AddKafka(this IDistributedApplicationBuilder builder, [ResourceName] string name, int? port = null)
    {
        ArgumentNullException.ThrowIfNull(builder);
        // An empty resource name is never valid; reject it up front instead of
        // letting it flow into the resource model (null still throws ArgumentNullException).
        ArgumentException.ThrowIfNullOrEmpty(name);

        var kafka = new KafkaServerResource(name);

        // Captured by the health-check factory below; populated once the
        // connection string becomes available for this specific resource.
        string? connectionString = null;

        builder.Eventing.Subscribe<ConnectionStringAvailableEvent>(kafka, async (@event, ct) =>
        {
            connectionString = await kafka.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false);

            if (connectionString == null)
            {
                throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{kafka.Name}' resource but the connection string was null.");
            }
        });

        var healthCheckKey = $"{name}_check";
        // NOTE: We cannot use AddKafka here because it registers the health check as a singleton
        //       which means if you have multiple Kafka resources the factory callback will end
        //       up using the connection string of the last Kafka resource that was added. The
        //       client packages also have to work around this issue.
        //
        // SEE: https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks/issues/2298
        var healthCheckRegistration = new HealthCheckRegistration(
            healthCheckKey,
            sp =>
            {
                var options = new KafkaHealthCheckOptions
                {
                    Configuration = new ProducerConfig
                    {
                        // Throws if the health check runs before ConnectionStringAvailableEvent fired.
                        BootstrapServers = connectionString ?? throw new InvalidOperationException("Connection string is unavailable")
                    }
                };
                return new KafkaHealthCheck(options);
            },
            failureStatus: default,
            tags: default);

        builder.Services.AddHealthChecks().Add(healthCheckRegistration);

        return builder.AddResource(kafka)
            .WithEndpoint(targetPort: KafkaBrokerPort, port: port, name: KafkaServerResource.PrimaryEndpointName)
            .WithEndpoint(targetPort: KafkaInternalBrokerPort, name: KafkaServerResource.InternalEndpointName)
            .WithImage(KafkaContainerImageTags.Image, KafkaContainerImageTags.Tag)
            .WithImageRegistry(KafkaContainerImageTags.Registry)
            .WithEnvironment(context => ConfigureKafkaContainer(context, kafka))
            .WithHealthCheck(healthCheckKey);
    }

    /// <summary>
    /// Adds a Kafka UI container to the application.
    /// </summary>
    /// <remarks>
    /// This version of the package defaults to the <inheritdoc cref="KafkaContainerImageTags.KafkaUiTag"/> tag of the <inheritdoc cref="KafkaContainerImageTags.KafkaUiImage"/> container image.
    /// Only a single Kafka UI container is created per application; subsequent calls
    /// reconfigure the existing container rather than adding another one.
    /// </remarks>
    /// <param name="builder">The Kafka server resource builder.</param>
    /// <param name="configureContainer">Configuration callback for KafkaUI container resource.</param>
    /// <param name="containerName">The name of the container (Optional).</param>
    /// <returns>A reference to the <see cref="IResourceBuilder{KafkaServerResource}"/>.</returns>
    public static IResourceBuilder<KafkaServerResource> WithKafkaUI(this IResourceBuilder<KafkaServerResource> builder, Action<IResourceBuilder<KafkaUIContainerResource>>? configureContainer = null, string? containerName = null)
    {
        ArgumentNullException.ThrowIfNull(builder);

        // Reuse the single Kafka UI container if one was already added by a previous call.
        if (builder.ApplicationBuilder.Resources.OfType<KafkaUIContainerResource>().SingleOrDefault() is { } existingKafkaUIResource)
        {
            var builderForExistingResource = builder.ApplicationBuilder.CreateResourceBuilder(existingKafkaUIResource);
            configureContainer?.Invoke(builderForExistingResource);
            return builder;
        }
        else
        {
            containerName ??= $"{builder.Resource.Name}-kafka-ui";

            var kafkaUi = new KafkaUIContainerResource(containerName);
            var kafkaUiBuilder = builder.ApplicationBuilder.AddResource(kafkaUi)
                .WithImage(KafkaContainerImageTags.KafkaUiImage, KafkaContainerImageTags.KafkaUiTag)
                .WithImageRegistry(KafkaContainerImageTags.Registry)
                .WithHttpEndpoint(targetPort: KafkaUIPort)
                .ExcludeFromManifest();

            // Wire every Kafka resource into the UI once endpoints are allocated; the
            // cluster index must stay stable across resources so the environment
            // variables (KAFKA_CLUSTERS_{i}_*) line up per cluster.
            builder.ApplicationBuilder.Eventing.Subscribe<AfterEndpointsAllocatedEvent>((e, ct) =>
            {
                var kafkaResources = builder.ApplicationBuilder.Resources.OfType<KafkaServerResource>();

                int i = 0;
                foreach (var kafkaResource in kafkaResources)
                {
                    if (kafkaResource.InternalEndpoint.IsAllocated)
                    {
                        var endpoint = kafkaResource.InternalEndpoint;
                        // Copy the loop counter so each environment callback captures
                        // its own index rather than the final value of `i`.
                        int index = i;
                        kafkaUiBuilder.WithEnvironment(context => ConfigureKafkaUIContainer(context, endpoint, index));
                    }

                    // Increment unconditionally so indices stay aligned with resource order
                    // even when some internal endpoints are not allocated.
                    i++;
                }

                return Task.CompletedTask;
            });

            configureContainer?.Invoke(kafkaUiBuilder);

            return builder;
        }

        // Emits the KAFKA_CLUSTERS_{index}_NAME / _BOOTSTRAPSERVERS pair for one Kafka cluster.
        static void ConfigureKafkaUIContainer(EnvironmentCallbackContext context, EndpointReference endpoint, int index)
        {
            var bootstrapServers = context.ExecutionContext.IsRunMode
                // In run mode, Kafka UI assumes Kafka is being accessed over a default Aspire container network and hardcodes the host as the Kafka resource name
                // This will need to be refactored once updated service discovery APIs are available
                ? ReferenceExpression.Create($"{endpoint.Resource.Name}:{endpoint.Property(EndpointProperty.TargetPort)}")
                : ReferenceExpression.Create($"{endpoint.Property(EndpointProperty.Host)}:{endpoint.Property(EndpointProperty.Port)}");

            context.EnvironmentVariables.Add($"KAFKA_CLUSTERS_{index}_NAME", endpoint.Resource.Name);
            context.EnvironmentVariables.Add($"KAFKA_CLUSTERS_{index}_BOOTSTRAPSERVERS", bootstrapServers);
        }
    }

    /// <summary>
    /// Configures the host port that the KafkaUI resource is exposed on instead of using randomly assigned port.
    /// </summary>
    /// <param name="builder">The resource builder for KafkaUI.</param>
    /// <param name="port">The port to bind on the host. If <see langword="null"/> is used random port will be assigned.</param>
    /// <returns>The resource builder for KafkaUI.</returns>
    public static IResourceBuilder<KafkaUIContainerResource> WithHostPort(this IResourceBuilder<KafkaUIContainerResource> builder, int? port)
    {
        ArgumentNullException.ThrowIfNull(builder);

        return builder.WithEndpoint("http", endpoint =>
        {
            endpoint.Port = port;
        });
    }

    /// <summary>
    /// Adds a named volume for the data folder to a Kafka container resource.
    /// </summary>
    /// <param name="builder">The resource builder.</param>
    /// <param name="name">The name of the volume. Defaults to an auto-generated name based on the application and resource names.</param>
    /// <param name="isReadOnly">A flag that indicates if this is a read-only volume.</param>
    /// <returns>The <see cref="IResourceBuilder{T}"/>.</returns>
    public static IResourceBuilder<KafkaServerResource> WithDataVolume(this IResourceBuilder<KafkaServerResource> builder, string? name = null, bool isReadOnly = false)
    {
        ArgumentNullException.ThrowIfNull(builder);

        return builder
            .WithEnvironment(ConfigureLogDirs)
            .WithVolume(name ?? VolumeNameGenerator.Generate(builder, "data"), Target, isReadOnly);
    }

    /// <summary>
    /// Adds a bind mount for the data folder to a Kafka container resource.
    /// </summary>
    /// <param name="builder">The resource builder.</param>
    /// <param name="source">The source directory on the host to mount into the container.</param>
    /// <param name="isReadOnly">A flag that indicates if this is a read-only mount.</param>
    /// <returns>The <see cref="IResourceBuilder{T}"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="builder"/> or <paramref name="source"/> is <see langword="null"/>.</exception>
    /// <exception cref="ArgumentException"><paramref name="source"/> is empty.</exception>
    public static IResourceBuilder<KafkaServerResource> WithDataBindMount(this IResourceBuilder<KafkaServerResource> builder, string source, bool isReadOnly = false)
    {
        ArgumentNullException.ThrowIfNull(builder);
        // An empty bind-mount path is never valid; fail fast with a clear exception.
        ArgumentException.ThrowIfNullOrEmpty(source);

        return builder
            .WithEnvironment(ConfigureLogDirs)
            .WithBindMount(source, Target, isReadOnly);
    }

    // Sets the broker listener/advertised-listener environment variables on the Kafka container.
    private static void ConfigureKafkaContainer(EnvironmentCallbackContext context, KafkaServerResource resource)
    {
        // confluentinc/confluent-local is a docker image that contains a Kafka broker started with KRaft to avoid pulling a separate image for ZooKeeper.
        // See https://github.com/confluentinc/kafka-images/blob/master/local/README.md.
        // When not explicitly set default configuration is applied.
        // See https://github.com/confluentinc/kafka-images/blob/master/local/include/etc/confluent/docker/configureDefaults for more details.

        // Define the default listeners + an internal listener for container-to-broker communication
        context.EnvironmentVariables.Add($"KAFKA_LISTENERS", $"PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:{KafkaBrokerPort},PLAINTEXT_INTERNAL://0.0.0.0:{KafkaInternalBrokerPort}");
        // Map the default listeners plus the internal listener to the PLAINTEXT security protocol
        context.EnvironmentVariables.Add("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT");

        // primaryEndpoint is the endpoint that is exposed to the host machine
        var primaryEndpoint = resource.PrimaryEndpoint;
        // internalEndpoint is the endpoint that is used for communication between containers
        var internalEndpoint = resource.InternalEndpoint;

        var advertisedListeners = context.ExecutionContext.IsRunMode
            // In run mode, PLAINTEXT_INTERNAL assumes kafka is being accessed over a default Aspire container network and hardcodes the resource address
            // This will need to be refactored once updated service discovery APIs are available
            ? ReferenceExpression.Create($"PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:{primaryEndpoint.Property(EndpointProperty.Port)},PLAINTEXT_INTERNAL://{resource.Name}:{internalEndpoint.Property(EndpointProperty.TargetPort)}")
            : ReferenceExpression.Create(
                $"PLAINTEXT://{primaryEndpoint.Property(EndpointProperty.Host)}:29092,PLAINTEXT_HOST://{primaryEndpoint.Property(EndpointProperty.Host)}:{primaryEndpoint.Property(EndpointProperty.Port)},PLAINTEXT_INTERNAL://{internalEndpoint.Property(EndpointProperty.Host)}:{internalEndpoint.Property(EndpointProperty.Port)}");

        context.EnvironmentVariables["KAFKA_ADVERTISED_LISTENERS"] = advertisedListeners;
    }

    /// <summary>
    /// Points Kafka's log directory at the mounted data path. Only needed when
    /// persisting Kafka data via <see cref="WithDataVolume"/> or <see cref="WithDataBindMount"/>.
    /// </summary>
    /// <param name="context">The environment callback context for the Kafka container.</param>
    private static void ConfigureLogDirs(EnvironmentCallbackContext context)
    {
        context.EnvironmentVariables["KAFKA_LOG_DIRS"] = Target;
    }
}