Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.aklivity.zilla.runtime.binding.asyncapi.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.types.String8FW;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.types.stream.HttpBeginExFW;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiServerView;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
import io.aklivity.zilla.runtime.engine.config.KindConfig;
Expand Down Expand Up @@ -222,39 +223,55 @@ private void attachProxyBinding(
(existingValue, newValue) -> existingValue,
Object2ObjectHashMap::new));

namespaceGenerator.init(binding);
final NamespaceConfig composite = namespaceGenerator.generateProxy(binding, asyncapis, schemaIdsByApiId::get);
composite.readURL = binding.readURL;
attach.accept(composite);
for (AsyncapiSchemaConfig config : configs)
{
Asyncapi asyncapi = config.asyncapi;
updateNamespace(config, composite, asyncapi);
}
updateNamespace(configs, composite, new ArrayList<>(asyncapis.values()));
}

private void attachServerClientBinding(
BindingConfig binding,
List<AsyncapiSchemaConfig> configs)
{
final Map<Integer, AsyncapiNamespaceConfig> namespaceConfigs = new HashMap<>();
for (AsyncapiSchemaConfig config : configs)
{
namespaceGenerator.init(binding);
Asyncapi asyncapi = config.asyncapi;
final NamespaceConfig composite = namespaceGenerator.generate(binding, asyncapi);
final List<AsyncapiServerView> servers =
namespaceGenerator.filterAsyncapiServers(asyncapi, options.asyncapis.stream()
.filter(a -> a.apiLabel.equals(config.apiLabel))
.flatMap(a -> a.servers.stream())
.collect(Collectors.toList()));

servers.stream().collect(Collectors.groupingBy(AsyncapiServerView::getPort)).forEach((k, v) ->
namespaceConfigs.computeIfAbsent(k, s -> new AsyncapiNamespaceConfig()).addSpecForNamespace(v, config, asyncapi));
}

for (AsyncapiNamespaceConfig namespaceConfig : namespaceConfigs.values())
{
namespaceConfig.servers.forEach(s -> s.setAsyncapiProtocol(
namespaceGenerator.resolveProtocol(s.protocol(), options, namespaceConfig.asyncapis, namespaceConfig.servers)));
final NamespaceConfig composite = namespaceGenerator.generate(binding, namespaceConfig);
composite.readURL = binding.readURL;
attach.accept(composite);
updateNamespace(config, composite, asyncapi);
updateNamespace(namespaceConfig.configs, composite, namespaceConfig.asyncapis);
}
}

private void updateNamespace(
AsyncapiSchemaConfig config,
List<AsyncapiSchemaConfig> configs,
NamespaceConfig composite,
Asyncapi asyncapi)
List<Asyncapi> asyncapis)
{
composites.put(config.schemaId, composite);
schemaIdsByApiId.put(config.apiLabel, config.schemaId);
extractChannels(asyncapi);
extractOperations(asyncapi);
configs.forEach(c ->
{
composites.put(c.schemaId, composite);
schemaIdsByApiId.put(c.apiLabel, c.schemaId);
});
asyncapis.forEach(this::extractChannels);
asyncapis.forEach(this::extractOperations);
}

private void extractNamespace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
import static java.util.Collections.emptyList;

import java.util.List;
import java.util.stream.Collectors;

import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiOptionsConfig;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiServerView;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
import io.aklivity.zilla.runtime.engine.config.MetricRefConfig;
Expand All @@ -32,33 +30,24 @@ public class AsyncapiClientNamespaceGenerator extends AsyncapiNamespaceGenerator
{
public NamespaceConfig generate(
BindingConfig binding,
Asyncapi asyncapi)
AsyncapiNamespaceConfig namespaceConfig)
{
List<AsyncapiServerView> servers = namespaceConfig.servers;
AsyncapiOptionsConfig options = binding.options != null ? (AsyncapiOptionsConfig) binding.options : EMPTY_OPTION;
final List<MetricRefConfig> metricRefs = binding.telemetryRef != null ?
binding.telemetryRef.metricRefs : emptyList();

this.asyncapi = asyncapi;
this.qname = binding.qname;
this.namespace = binding.namespace;
this.qvault = binding.qvault;
this.vault = binding.vault;
final List<AsyncapiServerView> servers =
filterAsyncapiServers(asyncapi.servers, options.asyncapis.stream()
.flatMap(a -> a.servers.stream())
.collect(Collectors.toList()));
servers.forEach(s -> s.setAsyncapiProtocol(resolveProtocol(s.protocol(), options, servers)));

//TODO: keep it until we support different protocols on the same composite binding
AsyncapiServerView serverView = servers.get(0);
this.protocol = serverView.getAsyncapiProtocol();
int[] compositeSecurePorts = resolvePorts(servers, true);
this.isTlsEnabled = compositeSecurePorts.length > 0;

final String namespace = String.join("+", namespaceConfig.asyncapiLabels);
return NamespaceConfig.builder()
.name(String.format("%s.%s", qname, "$composite"))
.name(String.format("%s.%s-%s", qname, "$composite", namespace))
.inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty()))
.inject(n -> this.injectCatalog(n, asyncapi))
.inject(n -> this.injectCatalog(n, namespaceConfig.asyncapis))
.inject(n -> protocol.injectProtocolClientCache(n, metricRefs))
.binding()
.name(String.format("%s_client0", protocol.scheme))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package io.aklivity.zilla.runtime.binding.asyncapi.internal.config;

import static io.aklivity.zilla.runtime.binding.asyncapi.internal.config.AsyncapiNamespaceGenerator.APPLICATION_JSON;
import static io.aklivity.zilla.runtime.binding.http.config.HttpPolicyConfig.CROSS_ORIGIN;

import java.util.List;
Expand All @@ -26,7 +27,6 @@
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiParameter;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiServer;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiChannelView;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiMessageView;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiServerView;
import io.aklivity.zilla.runtime.binding.http.config.HttpAuthorizationConfig;
import io.aklivity.zilla.runtime.binding.http.config.HttpConditionConfig;
Expand All @@ -35,7 +35,6 @@
import io.aklivity.zilla.runtime.binding.http.config.HttpRequestConfig.Method;
import io.aklivity.zilla.runtime.binding.http.config.HttpRequestConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.CatalogedConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.GuardedConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.ModelConfig;
import io.aklivity.zilla.runtime.engine.config.RouteConfigBuilder;
Expand All @@ -60,11 +59,11 @@ public class AsyncapiHttpProtocol extends AsyncapiProtocol

protected AsyncapiHttpProtocol(
String qname,
Asyncapi asyncApi,
List<Asyncapi> asyncapis,
AsyncapiOptionsConfig options,
String protocol)
{
super(qname, asyncApi, protocol, SCHEME);
super(qname, asyncapis, protocol, SCHEME);
this.securitySchemes = resolveSecuritySchemes();
this.isJwtEnabled = !securitySchemes.isEmpty();

Expand All @@ -91,26 +90,27 @@ public <C> BindingConfigBuilder<C> injectProtocolServerOptions(
public <C> BindingConfigBuilder<C> injectProtocolServerRoutes(
BindingConfigBuilder<C> binding)
{
for (Map.Entry<String, AsyncapiServer> entry : asyncApi.servers.entrySet())
for (Asyncapi asyncapi : asyncapis)
{
AsyncapiServerView server = AsyncapiServerView.of(entry.getValue());
for (String name : asyncApi.operations.keySet())
for (Map.Entry<String, AsyncapiServer> entry : asyncapi.servers.entrySet())
{
AsyncapiOperation operation = asyncApi.operations.get(name);
AsyncapiChannelView channel = AsyncapiChannelView.of(asyncApi.channels, operation.channel);
String path = channel.address().replaceAll("\\{[^}]+\\}", "*");
String method = operation.bindings.get("http").method;
binding
.route()
AsyncapiServerView server = AsyncapiServerView.of(entry.getValue());
for (String name : asyncapi.operations.keySet())
{
AsyncapiOperation operation = asyncapi.operations.get(name);
AsyncapiChannelView channel = AsyncapiChannelView.of(asyncapi.channels, operation.channel);
String path = channel.address().replaceAll("\\{[^}]+\\}", "*");
String method = operation.bindings.get("http").method;
binding
.route()
.exit(qname)
.when(HttpConditionConfig::builder)
.header(":scheme", server.scheme())
.header(":authority", server.authority())
.header(":path", path)
.header(":method", method)
.build()
.inject(route -> injectHttpServerRouteGuarded(route, server))
.build();
.build();
}
}
}
return binding;
Expand All @@ -135,64 +135,51 @@ private <C> HttpOptionsConfigBuilder<C> injectHttpServerOptions(
private <C> HttpOptionsConfigBuilder<C> injectHttpServerRequests(
HttpOptionsConfigBuilder<C> options)
{
for (String name : asyncApi.operations.keySet())
for (Asyncapi asyncapi : asyncapis)
{
AsyncapiOperation operation = asyncApi.operations.get(name);
AsyncapiChannelView channel = AsyncapiChannelView.of(asyncApi.channels, operation.channel);
String path = channel.address();
Method method = Method.valueOf(operation.bindings.get("http").method);
if (channel.messages() != null && !channel.messages().isEmpty() ||
channel.parameters() != null && !channel.parameters().isEmpty())
for (String name : asyncapi.operations.keySet())
{
options
.request()
.path(path)
.method(method)
.inject(request -> injectContent(request, channel.messages()))
.inject(request -> injectPathParams(request, channel.parameters()))
AsyncapiOperation operation = asyncapi.operations.get(name);
AsyncapiChannelView channel = AsyncapiChannelView.of(asyncapi.channels, operation.channel);
String path = channel.address();
Method method = Method.valueOf(operation.bindings.get("http").method);
if (channel.messages() != null && !channel.messages().isEmpty() ||
channel.parameters() != null && !channel.parameters().isEmpty())
{
options
.request()
.path(path)
.method(method)
.inject(request -> injectContent(request, asyncapi, channel.messages()))
.inject(request -> injectPathParams(request, channel.parameters()))
.build();
}
}
}
return options;
}

private <C> HttpRequestConfigBuilder<C> injectContent(
HttpRequestConfigBuilder<C> request,
Asyncapi asyncapi,
Map<String, AsyncapiMessage> messages)
{
if (messages != null)
{
if (hasJsonContentType())
if (hasJsonContentType(asyncapi))
{
request.
content(JsonModelConfig::builder)
.catalog()
.name(INLINE_CATALOG_NAME)
.inject(catalog -> injectSchemas(catalog, messages))
.build()
.build();
.catalog()
.name(INLINE_CATALOG_NAME)
.inject(cataloged -> injectJsonSchemas(cataloged, asyncapi, messages, APPLICATION_JSON))
.build()
.build();
}
}
return request;
}

private <C> CatalogedConfigBuilder<C> injectSchemas(
CatalogedConfigBuilder<C> catalog,
Map<String, AsyncapiMessage> messages)
{
for (String name : messages.keySet())
{
AsyncapiMessageView message = AsyncapiMessageView.of(asyncApi.components.messages, messages.get(name));
String subject = message.refKey() != null ? message.refKey() : name;
catalog
.schema()
.subject(subject)
.build()
.build();
}
return catalog;
}

private <C> HttpRequestConfigBuilder<C> injectPathParams(
HttpRequestConfigBuilder<C> request,
Map<String, AsyncapiParameter> parameters)
Expand Down
Loading