Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public abstract class AsyncapiProtocol
{
protected static final String INLINE_CATALOG_NAME = "catalog0";
protected static final Pattern JSON_CONTENT_TYPE = Pattern.compile("^application/(?:.+\\+)?json$");
private static final String VERSION_LATEST = "latest";
protected static final String VERSION_LATEST = "latest";

protected final Matcher jsonContentType = JSON_CONTENT_TYPE.matcher("");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiOperation;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiChannelView;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiMessageView;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiSchemaView;
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaOptionsConfig;
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaOptionsConfigBuilder;
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaSaslConfig;
Expand Down Expand Up @@ -194,10 +195,12 @@ private <C> CatalogedConfigBuilder<C> injectSchemas(
for (String name : messages.keySet())
{
AsyncapiMessageView message = AsyncapiMessageView.of(asyncApi.components.messages, messages.get(name));
String subject = message.refKey() != null ? message.refKey() : name;
AsyncapiSchemaView payload = AsyncapiSchemaView.of(asyncApi.components.schemas, message.payload());
String subject = payload.refKey() != null ? payload.refKey() : name;
catalog
.schema()
.subject(subject)
.version(VERSION_LATEST)
.build()
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class AsyncapiMessage
{
public AsyncapiSchema headers;
public String contentType;
public AsyncapiSchema payload;

@JsonbProperty("$ref")
public String ref;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public String contentType()
return message.contentType;
}

public AsyncapiSchema payload()
{
return message.payload;
}

public static AsyncapiMessageView of(
Map<String, AsyncapiMessage> messages,
AsyncapiMessage asyncapiMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public String getType()
return schema.type;
}

public String refKey()
{
return key;
}

public AsyncapiSchemaView getItems()
{
return schema.items == null ? null : AsyncapiSchemaView.of(schemas, schema.items);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import static java.util.stream.Collectors.toList;

import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiOperation;
Expand All @@ -26,19 +29,29 @@
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithConfig;
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithConfigBuilder;
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchConfig;
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchConfigBuilder;
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchMergeConfig;
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceConfig;
import io.aklivity.zilla.runtime.binding.openapi.asyncapi.config.OpenapiAsyncapiOptionsConfig;
import io.aklivity.zilla.runtime.binding.openapi.asyncapi.config.OpenapiAsyncapiSpecConfig;
import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.OpenapiAsyncapiBinding;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.Openapi;
import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenApiPathView;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiOperation;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiResponse;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiResponseByContentType;
import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiPathView;
import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiSchemaView;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi;
import io.aklivity.zilla.runtime.engine.config.RouteConfigBuilder;

public final class OpenapiAsyncCompositeBindingAdapter implements CompositeBindingAdapterSpi
{
private static final Pattern JSON_CONTENT_TYPE = Pattern.compile("^application/(?:.+\\+)?json$");

private final Matcher jsonContentType = JSON_CONTENT_TYPE.matcher("");

@Override
public String type()
{
Expand Down Expand Up @@ -89,20 +102,22 @@ private <C> BindingConfigBuilder<C> injectHttpKafkaRoutes(

for (String item : openapi.paths.keySet())
{
OpenApiPathView path = OpenApiPathView.of(openapi.paths.get(item));
OpenapiPathView path = OpenapiPathView.of(openapi.paths.get(item));
for (String method : path.methods().keySet())
{
final String operationId = condition.operationId != null ?
condition.operationId : path.methods().get(method).operationId;

final AsyncapiOperation operation = asyncapi.operations.entrySet().stream()
final OpenapiOperation openapiOperation = path.methods().get(method);

final AsyncapiOperation asyncapiOperation = asyncapi.operations.entrySet().stream()
.filter(f -> f.getKey().equals(operationId))
.map(v -> v.getValue())
.findFirst()
.get();

final AsyncapiChannelView channel = AsyncapiChannelView
.of(asyncapi.channels, operation.channel);
.of(asyncapi.channels, asyncapiOperation.channel);

binding
.route()
Expand All @@ -111,7 +126,8 @@ private <C> BindingConfigBuilder<C> injectHttpKafkaRoutes(
.method(method)
.path(item)
.build()
.inject(r -> injectHttpKafkaRouteWith(r, operation.action, channel.address()))
.inject(r -> injectHttpKafkaRouteWith(r, openapi, openapiOperation,
asyncapiOperation.action, channel.address()))
.build();
}
}
Expand All @@ -123,6 +139,8 @@ private <C> BindingConfigBuilder<C> injectHttpKafkaRoutes(

private <C> RouteConfigBuilder<C> injectHttpKafkaRouteWith(
RouteConfigBuilder<C> route,
Openapi openapi,
OpenapiOperation operation,
String action,
String address)
{
Expand All @@ -133,6 +151,7 @@ private <C> RouteConfigBuilder<C> injectHttpKafkaRouteWith(
case "receive":
newWith.fetch(HttpKafkaWithFetchConfig.builder()
.topic(address)
.inject(with -> this.injectHttpKafkaRouteFetchWith(with, openapi, operation))
.build());
break;
case "send":
Expand All @@ -147,4 +166,46 @@ private <C> RouteConfigBuilder<C> injectHttpKafkaRouteWith(

return route;
}

private <C> HttpKafkaWithFetchConfigBuilder<C> injectHttpKafkaRouteFetchWith(
HttpKafkaWithFetchConfigBuilder<C> fetch,
Openapi openapi,
OpenapiOperation operation)
{
merge:
for (Map.Entry<String, OpenapiResponseByContentType> response : operation.responses.entrySet())
{
OpenapiSchemaView schema = resolveSchemaForJsonContentType(response.getValue().content, openapi);

if ("array".equals(schema.getType()))
{
fetch.merged(HttpKafkaWithFetchMergeConfig.builder()
.contentType("application/json")
.build());
break merge;
}

}
return fetch;
}

private OpenapiSchemaView resolveSchemaForJsonContentType(
Map<String, OpenapiResponse> content,
Openapi openApi)
{
OpenapiResponse response = null;
if (content != null)
{
for (String contentType : content.keySet())
{
if (jsonContentType.reset(contentType).matches())
{
response = content.get(contentType);
break;
}
}
}

return response == null ? null : OpenapiSchemaView.of(openApi.components.schemas, response.schema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@
import io.aklivity.zilla.runtime.binding.openapi.config.OpenapiConfig;
import io.aklivity.zilla.runtime.binding.openapi.config.OpenapiOptionsConfig;
import io.aklivity.zilla.runtime.binding.openapi.internal.OpenapiBinding;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenApiHeader;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenApiResponse;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenApiServer;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.Openapi;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiHeader;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiResponse;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiResponseByContentType;
import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenApiOperationView;
import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenApiOperationsView;
import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenApiPathView;
import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenApiSchemaView;
import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenApiServerView;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiServer;
import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiOperationView;
import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiOperationsView;
import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiPathView;
import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiSchemaView;
import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiServerView;
import io.aklivity.zilla.runtime.binding.tls.config.TlsOptionsConfig;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
Expand Down Expand Up @@ -119,9 +119,9 @@ private URI findFirstServerUrlWithScheme(
{
requireNonNull(scheme);
URI result = null;
for (OpenApiServer item : openApi.servers)
for (OpenapiServer item : openApi.servers)
{
OpenApiServerView server = OpenApiServerView.of(item);
OpenapiServerView server = OpenapiServerView.of(item);
if (scheme.equals(server.url().getScheme()))
{
result = server.url();
Expand All @@ -135,7 +135,7 @@ private <C> BindingConfigBuilder<C> injectHttpClientOptions(
BindingConfigBuilder<C> binding,
Openapi openApi)
{
OpenApiOperationsView operations = OpenApiOperationsView.of(openApi.paths);
OpenapiOperationsView operations = OpenapiOperationsView.of(openApi.paths);
if (operations.hasResponses())
{
binding.
Expand All @@ -147,16 +147,16 @@ private <C> BindingConfigBuilder<C> injectHttpClientOptions(
}

private <C> HttpOptionsConfigBuilder<C> injectHttpClientRequests(
OpenApiOperationsView operations,
OpenapiOperationsView operations,
HttpOptionsConfigBuilder<C> options,
Openapi openApi)
{
for (String pathName : openApi.paths.keySet())
{
OpenApiPathView path = OpenApiPathView.of(openApi.paths.get(pathName));
OpenapiPathView path = OpenapiPathView.of(openApi.paths.get(pathName));
for (String methodName : path.methods().keySet())
{
OpenApiOperationView operation = operations.operation(pathName, methodName);
OpenapiOperationView operation = operations.operation(pathName, methodName);
if (operation.hasResponses())
{
options
Expand All @@ -174,7 +174,7 @@ private <C> HttpOptionsConfigBuilder<C> injectHttpClientRequests(

private <C> HttpRequestConfigBuilder<C> injectResponses(
HttpRequestConfigBuilder<C> request,
OpenApiOperationView operation,
OpenapiOperationView operation,
Openapi openApi)
{
if (operation != null && operation.responsesByStatus() != null)
Expand All @@ -183,11 +183,11 @@ private <C> HttpRequestConfigBuilder<C> injectResponses(
{
String status = responses0.getKey();
OpenapiResponseByContentType responses1 = responses0.getValue();
if (!(OpenApiOperationView.DEFAULT.equals(status)) && responses1.content != null)
if (!(OpenapiOperationView.DEFAULT.equals(status)) && responses1.content != null)
{
for (Map.Entry<String, OpenApiResponse> response2 : responses1.content.entrySet())
for (Map.Entry<String, OpenapiResponse> response2 : responses1.content.entrySet())
{
OpenApiSchemaView schema = OpenApiSchemaView.of(openApi.components.schemas, response2.getValue().schema);
OpenapiSchemaView schema = OpenapiSchemaView.of(openApi.components.schemas, response2.getValue().schema);
request
.response()
.status(Integer.parseInt(status))
Expand Down Expand Up @@ -215,7 +215,7 @@ private <C> HttpResponseConfigBuilder<C> injectResponseHeaders(
{
if (responses.headers != null && !responses.headers.isEmpty())
{
for (Map.Entry<String, OpenApiHeader> header : responses.headers.entrySet())
for (Map.Entry<String, OpenapiHeader> header : responses.headers.entrySet())
{
String name = header.getKey();
ModelConfig model = models.get(header.getValue().schema.type);
Expand Down
Loading