From d6c8593ca7d764c500cab749058c83466d2be8fb Mon Sep 17 00:00:00 2001 From: John Fallows Date: Wed, 10 Apr 2024 22:33:43 -0700 Subject: [PATCH 1/2] Use per worker registration for composite namespaces --- .../config/AsyncapiBindingConfig.java | 16 ++- .../stream/AsyncapiClientFactory.java | 8 +- .../internal/stream/AsyncapiProxyFactory.java | 8 +- .../stream/AsyncapiServerFactory.java | 8 +- .../config/OpenapiAsyncapiBindingConfig.java | 13 +- .../streams/OpenapiAsyncapiProxyFactory.java | 8 +- .../internal/config/OpenapiBindingConfig.java | 10 +- .../streams/OpenapiClientFactory.java | 9 +- .../streams/OpenapiServerFactory.java | 8 +- .../internal/airline/ZillaDumpCommand.java | 4 +- .../aklivity/zilla/runtime/engine/Engine.java | 11 +- .../zilla/runtime/engine/EngineContext.java | 6 + .../runtime/engine/config/BindingConfig.java | 18 +-- .../engine/config/BindingConfigBuilder.java | 38 +----- .../config/CompositeBindingAdapterSpi.java | 24 ---- .../engine/config/NamespaceConfig.java | 4 - .../config/BindingConfigsAdapter.java | 41 ++---- .../internal/layouts/BindingsLayout.java | 25 ++-- .../internal/registry/BindingRegistry.java | 5 + .../internal/registry/EngineManager.java | 127 +++--------------- .../internal/registry/EngineRegistry.java | 53 ++++---- .../internal/registry/EngineWorker.java | 92 ++++++++++--- .../internal/registry/NamespaceRegistry.java | 6 + .../engine/reader/BindingsLayoutReader.java | 10 +- .../config/BindingConfigsAdapterTest.java | 2 - .../reader/BindingsLayoutReaderTest.java | 24 +++- .../TestCompositeBindingAdapterSpi.java | 52 ------- ...e.engine.config.CompositeBindingAdapterSpi | 1 - 28 files changed, 276 insertions(+), 355 deletions(-) delete mode 100644 runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/CompositeBindingAdapterSpi.java delete mode 100644 runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestCompositeBindingAdapterSpi.java delete mode 100644 runtime/engine/src/test/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi diff --git a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiBindingConfig.java b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiBindingConfig.java index 27c92e5405..9f026a3355 100644 --- a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiBindingConfig.java +++ b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiBindingConfig.java @@ -71,6 +71,7 @@ public final class AsyncapiBindingConfig private final Map operationIds; private final LongFunction supplyCatalog; private final ToLongFunction resolveId; + private final Consumer attach; private final Consumer detach; private final long overrideRouteId; private final HttpHeaderHelper helper; @@ -80,6 +81,8 @@ public AsyncapiBindingConfig( BindingConfig binding, AsyncapiNamespaceGenerator namespaceGenerator, LongFunction supplyCatalog, + Consumer attachComposite, + Consumer detachComposite, long overrideRouteId) { this.id = binding.id; @@ -99,7 +102,8 @@ public AsyncapiBindingConfig( this.operationIds = new TreeMap<>(CharSequence::compare); this.helper = new HttpHeaderHelper(); this.parser = new AsyncapiParser(); - this.detach = binding.detach; + this.attach = attachComposite; + this.detach = detachComposite; this.routes = binding.routes.stream().map(r -> new AsyncapiRouteConfig(r, schemaIdsByApiId::get)).collect(toList()); } @@ -218,9 +222,9 @@ private void attachProxyBinding( (existingValue, newValue) -> existingValue, Object2ObjectHashMap::new)); - final NamespaceConfig composite = binding.attach.apply(namespaceGenerator.generateProxy(binding, asyncapis, - schemaIdsByApiId::get)); - + final NamespaceConfig composite = namespaceGenerator.generateProxy(binding, asyncapis, schemaIdsByApiId::get); + composite.readURL = binding.readURL; + attach.accept(composite); for (AsyncapiSchemaConfig config : configs) { Asyncapi asyncapi = config.asyncapi; @@ -235,7 +239,9 @@ private void attachServerClientBinding( for (AsyncapiSchemaConfig config : configs) { Asyncapi asyncapi = config.asyncapi; - final NamespaceConfig composite = binding.attach.apply(namespaceGenerator.generate(binding, asyncapi)); + final NamespaceConfig composite = namespaceGenerator.generate(binding, asyncapi); + composite.readURL = binding.readURL; + attach.accept(composite); updateNamespace(config, composite, asyncapi); } } diff --git a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/AsyncapiClientFactory.java b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/AsyncapiClientFactory.java index 7f72238433..e8f848b4cc 100644 --- a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/AsyncapiClientFactory.java +++ b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/AsyncapiClientFactory.java @@ -14,6 +14,7 @@ */ package io.aklivity.zilla.runtime.binding.asyncapi.internal.stream; +import java.util.function.Consumer; import java.util.function.LongFunction; import java.util.function.LongSupplier; import java.util.function.LongUnaryOperator; @@ -43,6 +44,7 @@ import io.aklivity.zilla.runtime.engine.buffer.BufferPool; import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler; import io.aklivity.zilla.runtime.engine.config.BindingConfig; +import io.aklivity.zilla.runtime.engine.config.NamespaceConfig; public final class AsyncapiClientFactory implements AsyncapiStreamFactory { @@ -77,6 +79,8 @@ public final class AsyncapiClientFactory implements AsyncapiStreamFactory private final LongUnaryOperator supplyReplyId; private final LongSupplier supplyTraceId; private final LongFunction supplyCatalog; + private final Consumer attachComposite; + private final Consumer detachComposite; private final Long2ObjectHashMap bindings; private final int asyncapiTypeId; private final int mqttTypeId; @@ -98,6 +102,8 @@ public AsyncapiClientFactory( this.supplyReplyId = context::supplyReplyId; this.supplyTraceId = context::supplyTraceId; this.supplyCatalog = context::supplyCatalog; + this.attachComposite = context::attachComposite; + this.detachComposite = context::detachComposite; this.bindings = new Long2ObjectHashMap<>(); this.asyncapiTypeId = context.supplyTypeId(AsyncapiBinding.NAME); this.mqttTypeId = context.supplyTypeId(MQTT_TYPE_NAME); @@ -120,7 +126,7 @@ public void attach( BindingConfig binding) { AsyncapiBindingConfig asyncapiBinding = new AsyncapiBindingConfig(binding, namespaceGenerator, supplyCatalog, - config.targetRouteId()); + attachComposite, detachComposite, config.targetRouteId()); bindings.put(binding.id, asyncapiBinding); asyncapiBinding.attach(binding); diff --git a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/AsyncapiProxyFactory.java b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/AsyncapiProxyFactory.java index b0d9edc098..df5f5ffdc0 100644 --- a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/AsyncapiProxyFactory.java +++ b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/AsyncapiProxyFactory.java @@ -15,6 +15,7 @@ package io.aklivity.zilla.runtime.binding.asyncapi.internal.stream; import java.util.Optional; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongFunction; import java.util.function.LongSupplier; @@ -47,6 +48,7 @@ import io.aklivity.zilla.runtime.engine.buffer.BufferPool; import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler; import io.aklivity.zilla.runtime.engine.config.BindingConfig; +import io.aklivity.zilla.runtime.engine.config.NamespaceConfig; public final class AsyncapiProxyFactory implements AsyncapiStreamFactory { @@ -80,6 +82,8 @@ public final class AsyncapiProxyFactory implements AsyncapiStreamFactory private final LongSupplier supplyTraceId; private final Function supplyTypeId; private final LongFunction supplyCatalog; + private final Consumer attachComposite; + private final Consumer detachComposite; private final Long2ObjectHashMap bindings; private final Long2LongHashMap apiIds; private final AsyncapiConfiguration config; @@ -102,6 +106,8 @@ public AsyncapiProxyFactory( this.supplyTypeId = context::supplyTypeId; this.supplyTraceId = context::supplyTraceId; this.supplyCatalog = context::supplyCatalog; + this.attachComposite = context::attachComposite; + this.detachComposite = context::detachComposite; this.bindings = new Long2ObjectHashMap<>(); this.apiIds = new Long2LongHashMap(-1); this.asyncapiTypeId = context.supplyTypeId(AsyncapiBinding.NAME); @@ -119,7 +125,7 @@ public void attach( BindingConfig binding) { AsyncapiBindingConfig asyncapiBinding = new AsyncapiBindingConfig(binding, namespaceGenerator, - supplyCatalog, config.targetRouteId()); + supplyCatalog, attachComposite, detachComposite, config.targetRouteId()); bindings.put(binding.id, asyncapiBinding); asyncapiBinding.attach(binding); diff --git a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/AsyncapiServerFactory.java b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/AsyncapiServerFactory.java index a15bea4666..b32ebe11fd 100644 --- a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/AsyncapiServerFactory.java +++ b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/stream/AsyncapiServerFactory.java @@ -14,6 +14,7 @@ */ package io.aklivity.zilla.runtime.binding.asyncapi.internal.stream; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongFunction; import java.util.function.LongSupplier; @@ -47,6 +48,7 @@ import io.aklivity.zilla.runtime.engine.buffer.BufferPool; import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler; import io.aklivity.zilla.runtime.engine.config.BindingConfig; +import io.aklivity.zilla.runtime.engine.config.NamespaceConfig; public final class AsyncapiServerFactory implements AsyncapiStreamFactory { @@ -86,6 +88,8 @@ public final class AsyncapiServerFactory implements AsyncapiStreamFactory private final LongSupplier supplyTraceId; private final Function supplyTypeId; private final LongFunction supplyCatalog; + private final Consumer attachComposite; + private final Consumer detachComposite; private final Long2ObjectHashMap bindings; private final int asyncapiTypeId; private final int mqttTypeId; @@ -108,6 +112,8 @@ public AsyncapiServerFactory( this.supplyTypeId = context::supplyTypeId; this.supplyTraceId = context::supplyTraceId; this.supplyCatalog = context::supplyCatalog; + this.attachComposite = context::attachComposite; + this.detachComposite = context::detachComposite; this.bindings = new Long2ObjectHashMap<>(); this.asyncapiTypeId = context.supplyTypeId(AsyncapiBinding.NAME); this.mqttTypeId = context.supplyTypeId(MQTT_TYPE_NAME); @@ -131,7 +137,7 @@ public void attach( BindingConfig binding) { AsyncapiBindingConfig asyncapiBinding = new AsyncapiBindingConfig(binding, namespaceGenerator, supplyCatalog, - config.targetRouteId()); + attachComposite, detachComposite, config.targetRouteId()); bindings.put(binding.id, asyncapiBinding); asyncapiBinding.attach(binding); diff --git a/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiBindingConfig.java b/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiBindingConfig.java index 18d2f9baf8..d7ae8c04bd 100644 --- a/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiBindingConfig.java +++ b/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiBindingConfig.java @@ -62,6 +62,7 @@ public final class OpenapiAsyncapiBindingConfig private final Object2LongHashMap asyncapiSchemaIdsByApiId; private final AsyncapiParser asyncapiParser; private final OpenapiParser openapiParser; + private final Consumer attach; private final Consumer detach; private NamespaceConfig composite; @@ -70,14 +71,17 @@ public final class OpenapiAsyncapiBindingConfig public OpenapiAsyncapiBindingConfig( BindingConfig binding, OpenapiAsyncNamespaceGenerator namespaceGenerator, - LongFunction supplyCatalog) + LongFunction supplyCatalog, + Consumer attachComposite, + Consumer detachComposite) { this.id = binding.id; this.name = binding.name; this.kind = binding.kind; this.options = (OpenapiAsyncapiOptionsConfig) binding.options; this.resolvedIds = binding.resolveId; - this.detach = binding.detach; + this.attach = attachComposite; + this.detach = detachComposite; this.openapiSchemaIdsByApiId = new Object2LongHashMap<>(-1); this.asyncapiSchemaIdsByApiId = new Object2LongHashMap<>(-1); this.compositeResolvedIds = new Long2LongHashMap(-1); @@ -139,8 +143,9 @@ public void attach( (e, n) -> e, Object2ObjectHashMap::new)); - this.composite = binding.attach.apply(namespaceGenerator.generate(binding, openapis, - asyncapis, openapiSchemaIdsByApiId::get)); + this.composite = namespaceGenerator.generate(binding, openapis, asyncapis, openapiSchemaIdsByApiId::get); + this.composite.readURL = binding.readURL; + attach.accept(this.composite); BindingConfig mappingBinding = composite.bindings.stream() .filter(b -> b.type.equals("http-kafka")).findFirst().get(); diff --git a/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/streams/OpenapiAsyncapiProxyFactory.java b/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/streams/OpenapiAsyncapiProxyFactory.java index c22d80ebda..72f870044b 100644 --- a/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/streams/OpenapiAsyncapiProxyFactory.java +++ b/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/streams/OpenapiAsyncapiProxyFactory.java @@ -14,6 +14,7 @@ */ package io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.streams; +import java.util.function.Consumer; import java.util.function.LongFunction; import java.util.function.LongSupplier; import java.util.function.LongUnaryOperator; @@ -47,6 +48,7 @@ import io.aklivity.zilla.runtime.engine.buffer.BufferPool; import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler; import io.aklivity.zilla.runtime.engine.config.BindingConfig; +import io.aklivity.zilla.runtime.engine.config.NamespaceConfig; public final class OpenapiAsyncapiProxyFactory implements OpenapiAsyncapiStreamFactory { @@ -83,6 +85,8 @@ public final class OpenapiAsyncapiProxyFactory implements OpenapiAsyncapiStreamF private final LongUnaryOperator supplyReplyId; private final LongSupplier supplyTraceId; private final LongFunction supplyCatalog; + private final Consumer attachComposite; + private final Consumer detachComposite; private final Long2ObjectHashMap bindings; private final Long2LongHashMap apiIds; private final OpenapiAsyncapiConfiguration config; @@ -104,6 +108,8 @@ public OpenapiAsyncapiProxyFactory( this.supplyReplyId = context::supplyReplyId; this.supplyTraceId = context::supplyTraceId; this.supplyCatalog = context::supplyCatalog; + this.attachComposite = context::attachComposite; + this.detachComposite = context::detachComposite; this.bindings = new Long2ObjectHashMap<>(); this.apiIds = new Long2LongHashMap(-1); this.openapiTypeId = context.supplyTypeId(OpenapiBinding.NAME); @@ -122,7 +128,7 @@ public void attach( BindingConfig binding) { OpenapiAsyncapiBindingConfig apiBinding = new OpenapiAsyncapiBindingConfig(binding, - namespaceGenerator, supplyCatalog); + namespaceGenerator, supplyCatalog, attachComposite, detachComposite); bindings.put(binding.id, apiBinding); apiBinding.attach(binding); diff --git a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiBindingConfig.java b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiBindingConfig.java index 4b3f1aeaef..1b020b55a1 100644 --- a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiBindingConfig.java +++ b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiBindingConfig.java @@ -69,6 +69,7 @@ public final class OpenapiBindingConfig private final Long2LongHashMap apiIdsByNamespaceId; private final HttpHeaderHelper helper; private final OpenapiParser parser; + private final Consumer attach; private final Consumer detach; private final Long2LongHashMap resolvedIds; private final Object2ObjectHashMap paths; @@ -79,6 +80,8 @@ public OpenapiBindingConfig( BindingConfig binding, OpenapiNamespaceGenerator namespaceGenerator, LongFunction supplyCatalog, + Consumer attachComposite, + Consumer detachComposite, long overrideRouteId) { this.id = binding.id; @@ -96,7 +99,8 @@ public OpenapiBindingConfig( this.httpOrigins = new IntHashSet(-1); this.parser = new OpenapiParser(); this.helper = new HttpHeaderHelper(); - this.detach = binding.detach; + this.attach = attachComposite; + this.detach = detachComposite; this.routes = binding.routes.stream().map(OpenapiRouteConfig::new).collect(toList()); Map> resolversByMethod = new TreeMap<>(CharSequence::compare); @@ -118,7 +122,9 @@ public void attach( configs.forEach(c -> { Openapi openapi = c.openapi; - final NamespaceConfig composite = binding.attach.apply(namespaceGenerator.generate(binding, openapi)); + final NamespaceConfig composite = namespaceGenerator.generate(binding, openapi); + composite.readURL = binding.readURL; + attach.accept(composite); composites.put(c.schemaId, composite); openapi.paths.forEach((k, v) -> { diff --git a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/streams/OpenapiClientFactory.java b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/streams/OpenapiClientFactory.java index 62f4c6dd1a..c28f3272fb 100644 --- a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/streams/OpenapiClientFactory.java +++ b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/streams/OpenapiClientFactory.java @@ -14,6 +14,7 @@ */ package io.aklivity.zilla.runtime.binding.openapi.internal.streams; +import java.util.function.Consumer; import java.util.function.LongFunction; import java.util.function.LongSupplier; import java.util.function.LongUnaryOperator; @@ -43,6 +44,7 @@ import io.aklivity.zilla.runtime.engine.buffer.BufferPool; import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler; import io.aklivity.zilla.runtime.engine.config.BindingConfig; +import io.aklivity.zilla.runtime.engine.config.NamespaceConfig; public final class OpenapiClientFactory implements OpenapiStreamFactory { @@ -79,11 +81,12 @@ public final class OpenapiClientFactory implements OpenapiStreamFactory private final LongUnaryOperator supplyReplyId; private final LongSupplier supplyTraceId; private final LongFunction supplyCatalog; + private final Consumer attachComposite; + private final Consumer detachComposite; private final Long2ObjectHashMap bindings; private final int openapiTypeId; private final int httpTypeId; - public OpenapiClientFactory( OpenapiConfiguration config, EngineContext context) @@ -97,6 +100,8 @@ public OpenapiClientFactory( this.supplyReplyId = context::supplyReplyId; this.supplyTraceId = context::supplyTraceId; this.supplyCatalog = context::supplyCatalog; + this.attachComposite = context::attachComposite; + this.detachComposite = context::detachComposite; this.namespaceGenerator = new OpenapiClientNamespaceGenerator(); this.bindings = new Long2ObjectHashMap<>(); this.openapiTypeId = context.supplyTypeId(OpenapiBinding.NAME); @@ -120,7 +125,7 @@ public void attach( BindingConfig binding) { OpenapiBindingConfig openapiBinding = new OpenapiBindingConfig(binding, namespaceGenerator, supplyCatalog, - config.targetRouteId()); + attachComposite, detachComposite, config.targetRouteId()); bindings.put(binding.id, openapiBinding); openapiBinding.attach(binding); diff --git a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/streams/OpenapiServerFactory.java b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/streams/OpenapiServerFactory.java index 13faa7a88f..7f0cabcdee 100644 --- a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/streams/OpenapiServerFactory.java +++ b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/streams/OpenapiServerFactory.java @@ -14,6 +14,7 @@ */ package io.aklivity.zilla.runtime.binding.openapi.internal.streams; +import java.util.function.Consumer; import java.util.function.LongFunction; import java.util.function.LongSupplier; import java.util.function.LongUnaryOperator; @@ -45,6 +46,7 @@ import io.aklivity.zilla.runtime.engine.buffer.BufferPool; import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler; import io.aklivity.zilla.runtime.engine.config.BindingConfig; +import io.aklivity.zilla.runtime.engine.config.NamespaceConfig; public final class OpenapiServerFactory implements OpenapiStreamFactory { @@ -82,6 +84,8 @@ public final class OpenapiServerFactory implements OpenapiStreamFactory private final LongUnaryOperator supplyReplyId; private final LongSupplier supplyTraceId; private final LongFunction supplyCatalog; + private final Consumer attachComposite; + private final Consumer detachComposite; private final Long2ObjectHashMap bindings; private final int openapiTypeId; private final int httpTypeId; @@ -99,6 +103,8 @@ public OpenapiServerFactory( this.supplyReplyId = context::supplyReplyId; this.supplyTraceId = context::supplyTraceId; this.supplyCatalog = context::supplyCatalog; + this.attachComposite = context::attachComposite; + this.detachComposite = context::detachComposite; this.namespaceGenerator = new OpenapiServerNamespaceGenerator(); this.bindings = new Long2ObjectHashMap<>(); this.openapiTypeId = context.supplyTypeId(OpenapiBinding.NAME); @@ -122,7 +128,7 @@ public void attach( BindingConfig binding) { OpenapiBindingConfig openapiBinding = new OpenapiBindingConfig(binding, namespaceGenerator, supplyCatalog, - config.targetRouteId()); + attachComposite, detachComposite, config.targetRouteId()); bindings.put(binding.id, openapiBinding); openapiBinding.attach(binding); diff --git a/incubator/command-dump/src/main/java/io/aklivity/zilla/runtime/command/dump/internal/airline/ZillaDumpCommand.java b/incubator/command-dump/src/main/java/io/aklivity/zilla/runtime/command/dump/internal/airline/ZillaDumpCommand.java index 09a767e771..59c2fd5a23 100644 --- a/incubator/command-dump/src/main/java/io/aklivity/zilla/runtime/command/dump/internal/airline/ZillaDumpCommand.java +++ b/incubator/command-dump/src/main/java/io/aklivity/zilla/runtime/command/dump/internal/airline/ZillaDumpCommand.java @@ -314,7 +314,9 @@ public void run() final int streamBufferCount = streamBuffers.length; final IdleStrategy idleStrategy = new BackoffIdleStrategy(MAX_SPINS, MAX_YIELDS, MIN_PARK_NS, MAX_PARK_NS); - final BindingsLayoutReader bindings = BindingsLayoutReader.builder().directory(directory).build(); + final BindingsLayoutReader bindings = BindingsLayoutReader.builder() + .path(directory.resolve("bindings")) + .build(); final DumpHandler[] dumpHandlers = new DumpHandler[streamBufferCount]; for (int i = 0; i < streamBufferCount; i++) { diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/Engine.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/Engine.java index b0115b1b07..7c866f7707 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/Engine.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/Engine.java @@ -59,6 +59,7 @@ import io.aklivity.zilla.runtime.engine.binding.function.MessageReader; import io.aklivity.zilla.runtime.engine.catalog.Catalog; import io.aklivity.zilla.runtime.engine.config.KindConfig; +import io.aklivity.zilla.runtime.engine.config.NamespaceConfig; import io.aklivity.zilla.runtime.engine.event.EventFormatterFactory; import io.aklivity.zilla.runtime.engine.exporter.Exporter; import io.aklivity.zilla.runtime.engine.ext.EngineExtContext; @@ -95,6 +96,7 @@ public final class Engine implements Collector, AutoCloseable private final List workers; private final boolean readonly; private final EngineConfiguration config; + private final EngineManager manager; private Future watcherTaskRef; @@ -163,7 +165,7 @@ public final class Engine implements Collector, AutoCloseable EngineWorker worker = new EngineWorker(config, tasks, labels, errorHandler, tuning::affinity, bindings, exporters, guards, vaults, catalogs, models, metricGroups, this, this::supplyEventReader, - eventFormatterFactory, workerIndex, readonly); + eventFormatterFactory, workerIndex, readonly, this::process); workers.add(worker); } this.workers = workers; @@ -226,6 +228,7 @@ else if ("http".equals(protocol) || "https".equals(protocol)) this.extensions = extensions; this.context = context; this.readonly = readonly; + this.manager = manager; } public T binding( @@ -238,6 +241,12 @@ public T binding( .orElse(null); } + private void process( + NamespaceConfig config) + { + manager.process(config); + } + public void start() throws Exception { for (EngineWorker worker : workers) diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineContext.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineContext.java index 550cc6d876..15628a3b60 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineContext.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineContext.java @@ -74,6 +74,12 @@ MessageConsumer supplyReceiver( EventFormatter supplyEventFormatter(); + void attachComposite( + NamespaceConfig composite); + + void detachComposite( + NamespaceConfig composite); + void detachSender( long replyId); diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/BindingConfig.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/BindingConfig.java index 19c476e8ed..33faaec460 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/BindingConfig.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/BindingConfig.java @@ -19,11 +19,8 @@ import static java.util.function.Function.identity; import java.util.List; -import java.util.concurrent.ConcurrentMap; -import java.util.function.Consumer; import java.util.function.Function; import java.util.function.ToLongFunction; -import java.util.function.UnaryOperator; public class BindingConfig { @@ -37,8 +34,11 @@ public class BindingConfig public transient long[] metricIds; - public transient UnaryOperator attach; - public transient Consumer detach; + public transient long typeId; + public transient long kindId; + + public transient long originTypeId; + public transient long routedTypeId; public final String namespace; public final String name; @@ -51,7 +51,6 @@ public class BindingConfig public final List catalogs; public final List routes; public final TelemetryRefConfig telemetryRef; - public final ConcurrentMap composites; public static BindingConfigBuilder builder() { @@ -77,8 +76,7 @@ public static BindingConfigBuilder builder( .options(binding.options) .catalogs(binding.catalogs) .routes(binding.routes) - .telemetry(binding.telemetryRef) - .composites(binding.composites.values()); + .telemetry(binding.telemetryRef); } BindingConfig( @@ -91,8 +89,7 @@ public static BindingConfigBuilder builder( OptionsConfig options, List catalogs, List routes, - TelemetryRefConfig telemetryRef, - ConcurrentMap composites) + TelemetryRefConfig telemetryRef) { this.namespace = requireNonNull(namespace); this.name = requireNonNull(name); @@ -105,6 +102,5 @@ public static BindingConfigBuilder builder( this.routes = routes; this.catalogs = catalogs; this.telemetryRef = telemetryRef; - this.composites = composites; } } diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/BindingConfigBuilder.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/BindingConfigBuilder.java index ebcb99b920..c3b0bba650 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/BindingConfigBuilder.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/BindingConfigBuilder.java @@ -17,12 +17,9 @@ import static java.util.Collections.emptyList; -import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.function.Function; public final class BindingConfigBuilder extends ConfigBuilder> @@ -44,7 +41,6 @@ public final class BindingConfigBuilder extends ConfigBuilder routes; private List catalogs; private TelemetryRefConfig telemetryRef; - private List composites; BindingConfigBuilder( Function mapper) @@ -184,29 +180,6 @@ public BindingConfigBuilder telemetry( return this; } - public NamespaceConfigBuilder> composite() - { - return new NamespaceConfigBuilder<>(this::composite); - } - - public BindingConfigBuilder composite( - NamespaceConfig composite) - { - if (composites == null) - { - composites = new LinkedList<>(); - } - composites.add(composite); - return this; - } - - public BindingConfigBuilder composites( - Collection composites) - { - composites.forEach(this::composite); - return this; - } - @Override public T build() { @@ -227,15 +200,6 @@ public T build() options, Optional.ofNullable(catalogs).orElse(CATALOGS_DEFAULT), Optional.ofNullable(routes).orElse(ROUTES_DEFAULT), - telemetryRef, - asConcurrentMap(Optional.ofNullable(composites).orElse(COMPOSITES_DEFAULT)))); - } - - private static ConcurrentMap asConcurrentMap( - List namespaces) - { - ConcurrentMap composites = new ConcurrentHashMap<>(); - namespaces.forEach(n -> composites.put(n.name, n)); - return composites; + telemetryRef)); } } diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/CompositeBindingAdapterSpi.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/CompositeBindingAdapterSpi.java deleted file mode 100644 index 208c6941d1..0000000000 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/CompositeBindingAdapterSpi.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2021-2023 Aklivity Inc. - * - * Aklivity licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.aklivity.zilla.runtime.engine.config; - -public interface CompositeBindingAdapterSpi -{ - String type(); - - BindingConfig adapt( - BindingConfig binding); -} diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/NamespaceConfig.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/NamespaceConfig.java index 65499ee635..a98f862482 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/NamespaceConfig.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/NamespaceConfig.java @@ -19,7 +19,6 @@ import static java.util.function.Function.identity; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; public class NamespaceConfig @@ -34,8 +33,6 @@ public class NamespaceConfig public final List vaults; public final List catalogs; - public final AtomicInteger refs; - public static NamespaceConfigBuilder builder() { return new NamespaceConfigBuilder<>(identity()); @@ -55,6 +52,5 @@ public static NamespaceConfigBuilder builder() this.guards = requireNonNull(guards); this.vaults = requireNonNull(vaults); this.catalogs = requireNonNull(catalogs); - this.refs = new AtomicInteger(); } } diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/config/BindingConfigsAdapter.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/config/BindingConfigsAdapter.java index 74bba3d940..e35eda2bb7 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/config/BindingConfigsAdapter.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/config/BindingConfigsAdapter.java @@ -16,15 +16,10 @@ package io.aklivity.zilla.runtime.engine.internal.config; import static io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder.ROUTES_DEFAULT; -import static java.util.function.Function.identity; -import static java.util.stream.Collectors.toMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Optional; -import java.util.ServiceLoader; -import java.util.function.Supplier; import java.util.regex.Matcher; import jakarta.json.Json; @@ -38,7 +33,6 @@ import io.aklivity.zilla.runtime.engine.config.BindingConfig; import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; -import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi; import io.aklivity.zilla.runtime.engine.config.ConfigAdapterContext; import io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapter; import io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapterSpi; @@ -62,8 +56,6 @@ public class BindingConfigsAdapter implements JsonbAdapter composites; - private String namespace; public BindingConfigsAdapter( @@ -74,12 +66,6 @@ public BindingConfigsAdapter( this.options = new OptionsConfigAdapter(OptionsConfigAdapterSpi.Kind.BINDING, context); this.cataloged = new CatalogedAdapter(); this.telemetryRef = new TelemetryRefAdapter(); - - this.composites = ServiceLoader - .load(CompositeBindingAdapterSpi.class) - .stream() - .map(Supplier::get) - .collect(toMap(CompositeBindingAdapterSpi::type, identity())); } public BindingConfigsAdapter adaptNamespace( @@ -167,28 +153,23 @@ public BindingConfig[] adaptFromJson( for (String name : object.keySet()) { - JsonObject item = object.getJsonObject(name); - - String type = item.getString(TYPE_NAME); - route.adaptType(type); - options.adaptType(type); - - CompositeBindingAdapterSpi composite = composites.get(type); - - BindingConfigBuilder binding = composite != null - ? BindingConfig.builder(composite::adapt) - : BindingConfig.builder(); - Matcher matcher = NamespaceAdapter.PATTERN_NAME.matcher(name); if (!matcher.matches()) { throw new IllegalStateException(String.format("%s does not match pattern", name)); } - binding.namespace(Optional.ofNullable(matcher.group("namespace")).orElse(namespace)) - .name(matcher.group("name")) - .type(type) - .kind(kind.adaptFromJson(item.getJsonString(KIND_NAME))); + JsonObject item = object.getJsonObject(name); + + String type = item.getString(TYPE_NAME); + route.adaptType(type); + options.adaptType(type); + + BindingConfigBuilder binding = BindingConfig.builder() + .namespace(Optional.ofNullable(matcher.group("namespace")).orElse(namespace)) + .name(matcher.group("name")) + .type(type) + .kind(kind.adaptFromJson(item.getJsonString(KIND_NAME))); if (item.containsKey(ENTRY_NAME)) { diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/layouts/BindingsLayout.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/layouts/BindingsLayout.java index 150590ab71..7c8054b8ec 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/layouts/BindingsLayout.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/layouts/BindingsLayout.java @@ -28,6 +28,8 @@ import org.agrona.LangUtil; +import io.aklivity.zilla.runtime.engine.config.BindingConfig; + public final class BindingsLayout implements AutoCloseable { private static final int RECORD_SIZE = 5 * Long.BYTES; @@ -49,19 +51,16 @@ private BindingsLayout( } public void writeBindingInfo( - long id, - long typeId, - long kindId, - long originTypeId, - long routedTypeId) + BindingConfig binding) { byteBuf.clear(); - byteBuf.putLong(id); - byteBuf.putLong(typeId); - byteBuf.putLong(kindId); - byteBuf.putLong(originTypeId); - byteBuf.putLong(routedTypeId); + byteBuf.putLong(binding.id); + byteBuf.putLong(binding.typeId); + byteBuf.putLong(binding.kindId); + byteBuf.putLong(binding.originTypeId); + byteBuf.putLong(binding.routedTypeId); byteBuf.flip(); + while (byteBuf.hasRemaining()) { try @@ -101,10 +100,10 @@ public static final class Builder private Path path; private boolean readonly; - public Builder directory( - Path directory) + public Builder path( + Path path) { - this.path = directory.resolve("bindings"); + this.path = path; return this; } diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/BindingRegistry.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/BindingRegistry.java index 1c72c736c5..5901455c04 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/BindingRegistry.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/BindingRegistry.java @@ -40,6 +40,11 @@ final class BindingRegistry this.context = context; } + public BindingConfig config() + { + return binding; + } + public void attach() { attached = context.attach(binding); diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineManager.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineManager.java index cddd95b8b9..f19213d226 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineManager.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineManager.java @@ -22,7 +22,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -47,7 +46,6 @@ import io.aklivity.zilla.runtime.engine.config.ConfigException; import io.aklivity.zilla.runtime.engine.config.EngineConfig; import io.aklivity.zilla.runtime.engine.config.EngineConfigReader; -import io.aklivity.zilla.runtime.engine.config.EngineConfigWriter; import io.aklivity.zilla.runtime.engine.config.ExporterConfig; import io.aklivity.zilla.runtime.engine.config.GuardConfig; import io.aklivity.zilla.runtime.engine.config.GuardedConfig; @@ -64,7 +62,6 @@ import io.aklivity.zilla.runtime.engine.guard.Guard; import io.aklivity.zilla.runtime.engine.internal.Tuning; import io.aklivity.zilla.runtime.engine.internal.config.NamespaceAdapter; -import io.aklivity.zilla.runtime.engine.internal.layouts.BindingsLayout; import io.aklivity.zilla.runtime.engine.namespace.NamespacedId; import io.aklivity.zilla.runtime.engine.resolver.Resolver; @@ -168,6 +165,17 @@ public EngineConfig reconfigure( return newConfig; } + public void process( + NamespaceConfig namespace) + { + final List guards = current.namespaces.stream() + .map(n -> n.guards) + .flatMap(gs -> gs.stream()) + .collect(toList()); + + process(guards, namespace); + } + private EngineConfig parse( URL configURL, String configText) @@ -201,17 +209,9 @@ private EngineConfig parse( for (NamespaceConfig namespace : engine.namespaces) { - namespace.readURL = namespaceReadURL; + namespace.readURL = l -> readURL.apply(configURL, l); process(guards, namespace); } - - if (config.verboseComposites()) - { - EngineConfigWriter writer = new EngineConfigWriter(null); - engine.namespaces.stream() - .flatMap(n -> n.bindings.stream().flatMap(b -> b.composites.values().stream())) - .forEach(n -> System.out.println(writer.write(n))); - } } catch (Throwable ex) { @@ -264,8 +264,14 @@ private void process( binding.resolveId = resolver::resolve; binding.readURL = namespace.readURL; - binding.attach = n -> attachComposite(binding, n); - binding.detach = n -> detachComposite(binding, n); + binding.typeId = supplyId.applyAsInt(binding.type); + binding.kindId = supplyId.applyAsInt(binding.kind.name().toLowerCase()); + + Binding typed = bindingByType.apply(binding.type); + String originType = typed.originType(binding.kind); + String routedType = typed.routedType(binding.kind); + binding.originTypeId = originType != null ? supplyId.applyAsInt(originType) : 0L; + binding.routedTypeId = routedType != null ? supplyId.applyAsInt(routedType) : 0L; if (binding.vault != null) { @@ -349,12 +355,6 @@ private void process( } binding.metricIds = metricIds.stream().mapToLong(Long::longValue).toArray(); - for (NamespaceConfig composite : binding.composites.values()) - { - composite.readURL = binding.readURL; - process(guards, composite); - } - long affinity = tuning.affinity(binding.id); final long maxbits = maxWorkers.apply(binding.type.intern().hashCode()).applyAsInt(binding.kind); @@ -379,11 +379,6 @@ private void register( } extensions.forEach(e -> e.onRegistered(context)); - - if (config != null) - { - writeBindingTypes(config); - } } private void unregister( @@ -421,88 +416,6 @@ private void unregister( } } - private void writeBindingTypes( - EngineConfig engine) - { - try (BindingsLayout layout = BindingsLayout.builder() - .directory(config.directory()) - .build()) - { - LinkedList namespaces = new LinkedList<>(engine.namespaces); - for (int i = 0; i < namespaces.size(); i++) - { - NamespaceConfig namespace = namespaces.get(i); - for (BindingConfig binding : namespace.bindings) - { - long typeId = binding.resolveId.applyAsLong(binding.type); - long kindId = binding.resolveId.applyAsLong(binding.kind.name().toLowerCase()); - Binding typed = bindingByType.apply(binding.type); - long originTypeId = binding.resolveId.applyAsLong(typed.originType(binding.kind)); - long routedTypeId = binding.resolveId.applyAsLong(typed.routedType(binding.kind)); - layout.writeBindingInfo(binding.id, typeId, kindId, originTypeId, routedTypeId); - if (binding.composites != null) - { - namespaces.addAll(binding.composites.values()); - } - } - } - } - catch (Exception ex) - { - LangUtil.rethrowUnchecked(ex); - } - } - - private NamespaceConfig attachComposite( - BindingConfig binding, - NamespaceConfig composite) - { - NamespaceConfig attached = binding.composites.putIfAbsent(composite.name, composite); - - if (attached == null) - { - composite.readURL = binding.readURL; - process(composite); - writeBindingTypes(current); - register(composite); - - attached = composite; - - if (config.verboseComposites()) - { - EngineConfigWriter writer = new EngineConfigWriter(null); - System.out.println(writer.write(attached)); - } - } - - attached.refs.incrementAndGet(); - - return attached; - } - - private void detachComposite( - BindingConfig binding, - NamespaceConfig composite) - { - BiFunction remapper = (k, v) -> v.refs.decrementAndGet() == 0 ? null : v; - - if (binding.composites.computeIfPresent(composite.name, remapper) == null) - { - unregister(composite); - } - } - - private void process( - NamespaceConfig namespace) - { - final List guards = current.namespaces.stream() - .map(n -> n.guards) - .flatMap(gs -> gs.stream()) - .collect(toList()); - - process(guards, namespace); - } - private final class NameResolver { private final int namespaceId; diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineRegistry.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineRegistry.java index 94aa358d53..452cc3b183 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineRegistry.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineRegistry.java @@ -15,6 +15,8 @@ */ package io.aklivity.zilla.runtime.engine.internal.registry; +import java.util.Collection; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongConsumer; import java.util.function.ToIntFunction; @@ -23,7 +25,6 @@ import io.aklivity.zilla.runtime.engine.binding.BindingContext; import io.aklivity.zilla.runtime.engine.catalog.CatalogContext; -import io.aklivity.zilla.runtime.engine.config.BindingConfig; import io.aklivity.zilla.runtime.engine.config.NamespaceConfig; import io.aklivity.zilla.runtime.engine.exporter.ExporterContext; import io.aklivity.zilla.runtime.engine.guard.GuardContext; @@ -49,6 +50,7 @@ public class EngineRegistry private final Int2ObjectHashMap namespacesById; private final LongConsumer detachBinding; private final Collector collector; + private final Consumer process; public EngineRegistry( Function bindingsByType, @@ -62,7 +64,8 @@ public EngineRegistry( LongConsumer exporterDetached, ObjectLongLongFunction supplyMetricRecorder, LongConsumer detachBinding, - Collector collector) + Collector collector, + Consumer process) { this.bindingsByType = bindingsByType; this.guardsByType = guardsByType; @@ -77,6 +80,25 @@ public EngineRegistry( this.namespacesById = new Int2ObjectHashMap<>(); this.detachBinding = detachBinding; this.collector = collector; + this.process = process; + } + + public void process( + NamespaceConfig composite) + { + process.accept(composite); + } + + public void attachNow( + NamespaceConfig namespace) + { + attach(namespace).run(); + } + + public void detachNow( + NamespaceConfig namespace) + { + detach(namespace).run(); } public NamespaceTask attach( @@ -157,6 +179,11 @@ public void detachAll() namespacesById.clear(); } + public Collection namespaces() + { + return namespacesById.values(); + } + private NamespaceRegistry findNamespace( int namespaceId) { @@ -172,17 +199,6 @@ private void attachNamespace( supplyMetricRecorder, detachBinding, collector); namespacesById.put(registry.namespaceId(), registry); registry.attach(); - - for (BindingConfig binding : namespace.bindings) - { - for (NamespaceConfig composite : binding.composites.values()) - { - if (composite.refs.intValue() == 0) - { - attachNamespace(composite); - } - } - } } protected void detachNamespace( @@ -191,16 +207,5 @@ protected void detachNamespace( int namespaceId = supplyLabelId.applyAsInt(namespace.name); NamespaceRegistry registry = namespacesById.remove(namespaceId); registry.detach(); - - for (BindingConfig binding : namespace.bindings) - { - for (NamespaceConfig composite : binding.composites.values()) - { - if (composite.refs.intValue() == 0) - { - detachNamespace(composite); - } - } - } } } diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java index a484ff8a31..b08e60e369 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java @@ -56,6 +56,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.IntConsumer; import java.util.function.IntFunction; @@ -70,6 +71,7 @@ import org.agrona.DeadlineTimerWheel.TimerHandler; import org.agrona.DirectBuffer; import org.agrona.ErrorHandler; +import org.agrona.LangUtil; import org.agrona.MutableDirectBuffer; import org.agrona.collections.Int2ObjectHashMap; import org.agrona.collections.Long2ObjectHashMap; @@ -100,6 +102,7 @@ import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler; import io.aklivity.zilla.runtime.engine.concurrent.Signaler; import io.aklivity.zilla.runtime.engine.config.BindingConfig; +import io.aklivity.zilla.runtime.engine.config.EngineConfigWriter; import io.aklivity.zilla.runtime.engine.config.ModelConfig; import io.aklivity.zilla.runtime.engine.config.NamespaceConfig; import io.aklivity.zilla.runtime.engine.event.EventFormatter; @@ -114,6 +117,7 @@ import io.aklivity.zilla.runtime.engine.internal.budget.DefaultBudgetCreditor; import io.aklivity.zilla.runtime.engine.internal.budget.DefaultBudgetDebitor; import io.aklivity.zilla.runtime.engine.internal.exporter.ExporterAgent; +import io.aklivity.zilla.runtime.engine.internal.layouts.BindingsLayout; import io.aklivity.zilla.runtime.engine.internal.layouts.BudgetsLayout; import io.aklivity.zilla.runtime.engine.internal.layouts.BufferPoolLayout; import io.aklivity.zilla.runtime.engine.internal.layouts.EventsLayout; @@ -250,7 +254,8 @@ public EngineWorker( Supplier supplyEventReader, EventFormatterFactory eventFormatterFactory, int index, - boolean readonly) + boolean readonly, + Consumer process) { this.localIndex = index; this.config = config; @@ -422,7 +427,7 @@ public EngineWorker( this.registry = new EngineRegistry( bindingsByType::get, guardsByType::get, vaultsByType::get, catalogsByType::get, metricsByName::get, exportersByType::get, labels::supplyLabelId, this::onExporterAttached, this::onExporterDetached, - this::supplyMetricWriter, this::detachStreams, collector); + this::supplyMetricWriter, this::detachStreams, collector, process); this.taskQueue = new ConcurrentLinkedDeque<>(); this.correlations = new Long2ObjectHashMap<>(); @@ -748,6 +753,34 @@ public String roleName() return agentName; } + @Override + public void attachComposite( + NamespaceConfig composite) + { + assert thread == Thread.currentThread(); + + registry.process(composite); + registry.attachNow(composite); + writeBindingTypes(registry); + + if (localIndex == 0 && + config.verboseComposites()) + { + EngineConfigWriter writer = new EngineConfigWriter(null); + System.out.println(writer.write(composite)); + } + } + + @Override + public void detachComposite( + NamespaceConfig composite) + { + assert thread == Thread.currentThread(); + + registry.detachNow(composite); + writeBindingTypes(registry); + } + public void doStart() { thread = startOnThread(runner, Thread::new); @@ -864,17 +897,11 @@ public String toString() public CompletableFuture attach( NamespaceConfig namespace) { - NamespaceTask attachTask = registry.attach(namespace); + assert thread != Thread.currentThread(); - if (thread == Thread.currentThread()) - { - attachTask.run(); - } - else - { - taskQueue.offer(attachTask); - signaler.signalNow(0L, 0L, 0L, supplyTraceId(), SIGNAL_TASK_QUEUED, 0); - } + NamespaceTask attachTask = registry.attach(namespace); + taskQueue.offer(attachTask); + signaler.signalNow(0L, 0L, 0L, supplyTraceId(), SIGNAL_TASK_QUEUED, 0); return attachTask.future(); } @@ -882,17 +909,11 @@ public CompletableFuture attach( public CompletableFuture detach( NamespaceConfig namespace) { - NamespaceTask detachTask = registry.detach(namespace); + assert thread != Thread.currentThread(); - if (thread == Thread.currentThread()) - { - detachTask.run(); - } - else - { - taskQueue.offer(detachTask); - signaler.signalNow(0L, 0L, 0L, supplyTraceId(), SIGNAL_TASK_QUEUED, 0); - } + NamespaceTask detachTask = registry.detach(namespace); + taskQueue.offer(detachTask); + signaler.signalNow(0L, 0L, 0L, supplyTraceId(), SIGNAL_TASK_QUEUED, 0); return detachTask.future(); } @@ -975,6 +996,33 @@ public Clock clock() return Clock.systemUTC(); } + private void writeBindingTypes( + EngineRegistry engine) + { + // assumes all composite bindings are attached on worker 0 + if (localIndex == 0) + { + try (BindingsLayout layout = BindingsLayout.builder() + .path(config.directory().resolve("bindings")) + .build()) + { + for (NamespaceRegistry namespace : engine.namespaces()) + { + for (BindingRegistry registry : namespace.bindings()) + { + BindingConfig binding = registry.config(); + + layout.writeBindingInfo(binding); + } + } + } + catch (Exception ex) + { + LangUtil.rethrowUnchecked(ex); + } + } + } + private void onSystemMessage( int msgTypeId, DirectBuffer buffer, diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/NamespaceRegistry.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/NamespaceRegistry.java index 86bee85e32..ef49c2c383 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/NamespaceRegistry.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/NamespaceRegistry.java @@ -21,6 +21,7 @@ import static io.aklivity.zilla.runtime.engine.metrics.MetricContext.Direction.RECEIVED; import static io.aklivity.zilla.runtime.engine.metrics.MetricContext.Direction.SENT; +import java.util.Collection; import java.util.function.Function; import java.util.function.LongConsumer; import java.util.function.LongFunction; @@ -138,6 +139,11 @@ public void detach() namespace.telemetry.exporters.forEach(this::detachExporter); } + public Collection bindings() + { + return bindingsById.values(); + } + private void attachBinding( BindingConfig config) { diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/reader/BindingsLayoutReader.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/reader/BindingsLayoutReader.java index 11da32e507..44c992a30b 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/reader/BindingsLayoutReader.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/reader/BindingsLayoutReader.java @@ -42,19 +42,19 @@ public static Builder builder() public static final class Builder { - private Path directory; + private Path path; - public Builder directory( - Path directory) + public Builder path( + Path path) { - this.directory = directory; + this.path = path; return this; } public BindingsLayoutReader build() { BindingsLayout bindingsLayout = BindingsLayout.builder() - .directory(directory) + .path(path) .readonly(true) .build(); return new BindingsLayoutReader(bindingsLayout); diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/internal/config/BindingConfigsAdapterTest.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/internal/config/BindingConfigsAdapterTest.java index 5d6b1767c8..22fdabe61c 100644 --- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/internal/config/BindingConfigsAdapterTest.java +++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/internal/config/BindingConfigsAdapterTest.java @@ -42,7 +42,6 @@ import io.aklivity.zilla.runtime.engine.config.BindingConfig; import io.aklivity.zilla.runtime.engine.config.ConfigAdapterContext; -import io.aklivity.zilla.runtime.engine.config.NamespaceConfig; import io.aklivity.zilla.runtime.engine.config.RouteConfig; import io.aklivity.zilla.runtime.engine.test.internal.binding.config.TestBindingOptionsConfig; @@ -82,7 +81,6 @@ public void shouldReadBinding() assertThat(bindings[0], not(nullValue())); assertThat(bindings[0].kind, equalTo(PROXY)); assertThat(bindings[0].routes, emptyCollectionOf(RouteConfig.class)); - assertThat(bindings[0].composites.values(), not(emptyCollectionOf(NamespaceConfig.class))); } @Test diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/reader/BindingsLayoutReaderTest.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/reader/BindingsLayoutReaderTest.java index 7132ccce81..82c93792a2 100644 --- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/reader/BindingsLayoutReaderTest.java +++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/reader/BindingsLayoutReaderTest.java @@ -15,6 +15,7 @@ */ package io.aklivity.zilla.runtime.engine.reader; +import static io.aklivity.zilla.runtime.engine.config.KindConfig.SERVER; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -24,6 +25,7 @@ import org.junit.Test; +import io.aklivity.zilla.runtime.engine.config.BindingConfig; import io.aklivity.zilla.runtime.engine.internal.layouts.BindingsLayout; public class BindingsLayoutReaderTest @@ -33,9 +35,25 @@ public void shouldReadBindings() { // GIVEN Path directory = Paths.get("target/zilla-itests"); - BindingsLayout layout = BindingsLayout.builder().directory(directory).build(); - layout.writeBindingInfo(1L, 2L, 3L, 4L, 5L); - BindingsLayoutReader reader = BindingsLayoutReader.builder().directory(directory).build(); + BindingsLayout layout = BindingsLayout.builder() + .path(directory.resolve("bindings0")) + .build(); + BindingConfig binding = BindingConfig.builder() + .namespace("test") + .name("test0") + .type("test") + .kind(SERVER) + .build(); + binding.id = 1L; + binding.typeId = 2L; + binding.kindId = 3L; + binding.originTypeId = 4L; + binding.routedTypeId = 5L; + layout.writeBindingInfo(binding); + + BindingsLayoutReader reader = BindingsLayoutReader.builder() + .path(directory.resolve("bindings0")) + .build(); // WHEN Map result = reader.bindings(); diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestCompositeBindingAdapterSpi.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestCompositeBindingAdapterSpi.java deleted file mode 100644 index b2927481a1..0000000000 --- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestCompositeBindingAdapterSpi.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2021-2023 Aklivity Inc. - * - * Aklivity licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.aklivity.zilla.runtime.engine.test.internal.binding.config; - -import io.aklivity.zilla.runtime.engine.config.BindingConfig; -import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi; -import io.aklivity.zilla.runtime.engine.config.KindConfig; - -public class TestCompositeBindingAdapterSpi implements CompositeBindingAdapterSpi -{ - @Override - public String type() - { - return "test"; - } - - @Override - public BindingConfig adapt( - BindingConfig binding) - { - switch (binding.kind) - { - case PROXY: - return BindingConfig.builder(binding) - .composite() - .name(String.format(binding.qname, "$composite")) - .binding() - .name("test0") - .type("test") - .kind(KindConfig.SERVER) - .build() - .build() - .build(); - default: - return binding; - } - } - -} diff --git a/runtime/engine/src/test/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi b/runtime/engine/src/test/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi deleted file mode 100644 index a5029d1d87..0000000000 --- a/runtime/engine/src/test/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi +++ /dev/null @@ -1 +0,0 @@ -io.aklivity.zilla.runtime.engine.test.internal.binding.config.TestCompositeBindingAdapterSpi From 010e017be511e71f12922ecd8d8096a823d1835f Mon Sep 17 00:00:00 2001 From: John Fallows Date: Thu, 11 Apr 2024 10:56:20 -0700 Subject: [PATCH 2/2] Ensure name matcher is local to thread --- .../zilla/runtime/engine/internal/registry/EngineManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineManager.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineManager.java index f19213d226..db46a6bb17 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineManager.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineManager.java @@ -83,7 +83,6 @@ public class EngineManager private final List extensions; private final BiFunction readURL; private final Resolver expressions; - private final Matcher matchName; private EngineConfig current; @@ -116,7 +115,6 @@ public EngineManager( this.extensions = extensions; this.readURL = readURL; this.expressions = Resolver.instantiate(config); - this.matchName = NamespaceAdapter.PATTERN_NAME.matcher(""); } public EngineConfig reconfigure( @@ -419,11 +417,13 @@ private void unregister( private final class NameResolver { private final int namespaceId; + private final Matcher matchName; private NameResolver( int namespaceId) { this.namespaceId = namespaceId; + this.matchName = NamespaceAdapter.PATTERN_NAME.matcher(""); } private long resolve(