Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
checkpoint
  • Loading branch information
bmaidics committed Apr 24, 2024
commit affd86ea20cb93fe1de6662ca5837e81a1ace15e
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ info:
version: 1.0.0
servers:
plain:
host: localhost:7116
host: localhost:7115
protocol: http
protocolVersion: '2.0'
defaultContentType: application/json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,27 @@

import io.aklivity.zilla.runtime.engine.Configuration;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

import org.agrona.LangUtil;

public class AsyncapiConfiguration extends Configuration
{
private static final AtomicInteger COMPOSITE_NAMESPACE_COUNTER = new AtomicInteger(0);
public static final LongPropertyDef ASYNCAPI_TARGET_ROUTE_ID;
public static final PropertyDef<IntSupplier> COMPOSITE_NAMESPACE_POSTFIX;
private static final ConfigurationDef ASYNCAPI_CONFIG;

static
{
final ConfigurationDef config = new ConfigurationDef("zilla.binding.asyncapi");
ASYNCAPI_TARGET_ROUTE_ID = config.property("target.route.id", -1L);
COMPOSITE_NAMESPACE_POSTFIX = config.property(IntSupplier.class, "composite.namespace.postfix",
AsyncapiConfiguration::decodeIntSupplier, AsyncapiConfiguration::defaultCompositeNamespacePostfix);
ASYNCAPI_CONFIG = config;
}

Expand All @@ -38,4 +50,44 @@ public long targetRouteId()
{
return ASYNCAPI_TARGET_ROUTE_ID.getAsLong(this);
}

private static IntSupplier decodeIntSupplier(
String fullyQualifiedMethodName)
{
IntSupplier supplier = null;

try
{
MethodType signature = MethodType.methodType(int.class);
String[] parts = fullyQualifiedMethodName.split("::");
Class<?> ownerClass = Class.forName(parts[0]);
String methodName = parts[1];
MethodHandle method = MethodHandles.publicLookup().findStatic(ownerClass, methodName, signature);
supplier = () ->
{
int value = 0;
try
{
value = (int) method.invoke();
}
catch (Throwable ex)
{
LangUtil.rethrowUnchecked(ex);
}

return value;
};
}
catch (Throwable ex)
{
LangUtil.rethrowUnchecked(ex);
}

return supplier;
}

private static int defaultCompositeNamespacePostfix()
{
return COMPOSITE_NAMESPACE_COUNTER.getAndIncrement();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ private void attachServerClientBinding(
.collect(Collectors.toList()));

servers.stream().collect(Collectors.groupingBy(AsyncapiServerView::getPort)).forEach((k, v) ->
namespaceConfigs.computeIfAbsent(k, s -> new AsyncapiNamespaceConfig()).addServersForSpec(v, config, asyncapi));
namespaceConfigs.computeIfAbsent(k, s -> new AsyncapiNamespaceConfig()).addSpecForNamespace(v, config, asyncapi));
}

for (AsyncapiNamespaceConfig namespaceConfig : namespaceConfigs.values())
Expand Down Expand Up @@ -433,7 +433,7 @@ static class AsyncapiNamespaceConfig
configs = new ArrayList<>();
}

private void addServersForSpec(
private void addSpecForNamespace(
List<AsyncapiServerView> servers,
AsyncapiSchemaConfig config,
Asyncapi asyncapi)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToLongFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public NamespaceConfig generate(
this.protocol = serverView.getAsyncapiProtocol();

return NamespaceConfig.builder()
.name(String.format("%s/%s", qname, protocol.scheme))
.name(String.format("%s/%s-%d", qname, protocol.scheme))
.inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty()))
.inject(n -> this.injectCatalog(n, namespaceConfig.asyncapis))
.inject(n -> injectTcpServer(n, servers, options, metricRefs))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@
package io.aklivity.zilla.runtime.binding.asyncapi.internal;

import static io.aklivity.zilla.runtime.binding.asyncapi.internal.AsyncapiConfiguration.ASYNCAPI_TARGET_ROUTE_ID;
import static io.aklivity.zilla.runtime.binding.asyncapi.internal.AsyncapiConfiguration.COMPOSITE_NAMESPACE_POSTFIX;
import static org.junit.Assert.assertEquals;

import org.junit.Test;

public class AsyncapiConfigurationTest
{
public static final String ASYNCAPI_TARGET_ROUTE_ID_NAME = "zilla.binding.asyncapi.target.route.id";
public static final String COMPOSITE_NAMESPACE_POSTFIX_NAME = "zilla.binding.asyncapi.composite.namespace.postfix";

@Test
public void shouldVerifyConstants() throws Exception
{
assertEquals(ASYNCAPI_TARGET_ROUTE_ID.name(), ASYNCAPI_TARGET_ROUTE_ID_NAME);
assertEquals(COMPOSITE_NAMESPACE_POSTFIX.name(), COMPOSITE_NAMESPACE_POSTFIX_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
connect "zilla://streams/composite0"
option zilla:window 8192
option zilla:transmission "half-duplex"
option zilla:ephemeral "test:composite0/http"
option zilla:ephemeral "test:composite0/http-0"

write zilla:begin.ext ${http:beginEx()
.typeId(zilla:id("http"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
connect "zilla://streams/composite0"
option zilla:window 8192
option zilla:transmission "duplex"
option zilla:ephemeral "test:composite0/mqtt"
option zilla:ephemeral "test:composite0/mqtt-0"

write zilla:begin.ext ${mqtt:beginEx()
.typeId(zilla:id("mqtt"))
Expand All @@ -43,7 +43,7 @@ connect await RECEIVED_SESSION_STATE
"zilla://streams/composite0"
option zilla:window 8192
option zilla:transmission "duplex"
option zilla:ephemeral "test:composite0/mqtt"
option zilla:ephemeral "test:composite0/mqtt-0"

write zilla:begin.ext ${mqtt:beginEx()
.typeId(zilla:id("mqtt"))
Expand Down Expand Up @@ -75,7 +75,7 @@ connect await RECEIVED_SESSION_STATE
"zilla://streams/composite0"
option zilla:window 8192
option zilla:transmission "duplex"
option zilla:ephemeral "test:composite0/mqtt"
option zilla:ephemeral "test:composite0/mqtt-0"

write zilla:begin.ext ${mqtt:beginEx()
.typeId(zilla:id("mqtt"))
Expand Down